From 162c15359f334d23f67f0f611b42eb39c3be3057 Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Mon, 29 Jul 2019 15:05:38 +0300 Subject: [PATCH 1/2] Connector: implement connect complete timeout. --- .../nynja/bridge/verticle/MqttConnector.java | 257 ++++++++++++++---- 1 file changed, 208 insertions(+), 49 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java index 5953263..d069f0f 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -9,6 +9,54 @@ import lombok.extern.slf4j.Slf4j; @Slf4j class MqttConnector { + /** + * INITIAL + * --> onStart + * --> connect, start connect timer + * --> CONNECTING + * --> X + * CONNECTING + * --> onConnectSuccess + * --> stop connect timer + * --> CONNECTED + * --> onConnectFailure + * --> stop connect timer, start reconnect timer + * --> DISCONNECTED + * --> onConnectTimeout + * --> disconnect, start reconnect timer + * --> DISCONNECTED + * --> onConnectionLost + * --> start reconnect timer + * --> DISCONNECTED + * --> X + * CONNECTED + * --> onConnectionLost + * --> start reconnect timer + * --> DISCONNECTED + * --> X + * DISCONNECTED + * --> onReconnectTimeout + * --> connect, start connect timer + * --> CONNECTING + * --> X + * + * STOPPED + * --> All Events + * --> STOPPED + * + * CONNECTING, CONNECTED, DISCONNECTED + * --> onStop + * --> disconnect, stop all timers + * --> STOPPED + * */ + private enum State { + INITAIL, + CONNECTING, + CONNECTED, + DISCONNECTED, + STOPPED, + } + private static final int MAX_RECONNECT_DELAY = 20000; private static final int INITIAL_RECONNECT_DELAY = 100; @@ -16,19 +64,19 @@ class MqttConnector { private final MqttClient client; private final MqttConfiguration configuration; private int reconnectDelay; + private State state; + private Long connectTimerId; private Long reconnectTimerId; - private boolean shutdown; - private boolean reconnectPending; MqttConnector(Vertx vertx, MqttClient client, MqttConfiguration configuration) { this.vertx = vertx; this.client = client; this.configuration = configuration; this.client.pingResponseHandler(this::onPong); - this.client.closeHandler(this::onClose); + this.client.closeHandler(this::onConnectionLost); + this.state = State.INITAIL; + this.connectTimerId = null; this.reconnectTimerId = null; - this.shutdown = false; - this.reconnectPending = false; reset(); } @@ -45,83 +93,194 @@ class MqttConnector { future.complete(); } }); - connect(connectFuture); + onStart(connectFuture); } void stop() { - shutdown = true; - if (reconnectTimerId != null) { - vertx.cancelTimer(reconnectTimerId); - } - client.disconnect(); + onStop(); } void reset() { reconnectDelay = INITIAL_RECONNECT_DELAY; } - private void connect(Future future) { + private void onStart(Future future) { + final String event = "onStart"; + switch (state) { + case INITAIL: + enterConnecting(future, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onStop() { + final String event = "onStop"; + switch (state) { + case STOPPED: + case INITAIL: + handleUnexpectedEvent(event); + break; + default: + stopConnectTimer(); + stopReconnectTimer(); + client.disconnect(); + setState(State.STOPPED, event); + break; + } + } + + private void onConnectSuccess() { + final String event = "onConnectSuccess"; + switch (state) { + case CONNECTING: + stopConnectTimer(); + setState(State.CONNECTED, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectFailure() { + final String event = "onConnectFailure"; + switch (state) { + case CONNECTING: + stopConnectTimer(); + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectTimeout(Long timerId) { + final String event = "onConnectTimeout"; + if (!timerId.equals(connectTimerId)) { + log.error("Unexpected connect timer ignored"); + return; + } + switch (state) { + case CONNECTING: + client.disconnect(); + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectionLost(Void ignore) { + final String event = "onConnectionLost"; + log.error("Connection lost!"); + switch (state) { + case CONNECTED: + case CONNECTING: + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onReconnectTimeout(Long timerId) { + final String event = "onReconnectTimeout"; + if (!timerId.equals(reconnectTimerId)) { + log.error("Unexpected reconnect timer ignored"); + return; + } + reconnectTimerId = null; + switch (state) { + case DISCONNECTED: + log.info("Reconnecting ..."); + Future f = Future.future(); + enterConnecting(f, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void handleUnexpectedEvent(String event) { + log.error("Unexpected event {} in state {} ignored", event, state); + } + + private void enterConnecting(Future future, String event) { log.debug("Connecting '{}@{}:{}'...", configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); + connectTimerId = vertx.setTimer(40000, this::onConnectTimeout); + client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { - log.debug("Connected"); future.complete(); + onConnectSuccess(); } else { - log.error("Failed to connect", ch.cause()); - checkReconnect(); future.fail(ch.cause()); + log.error("Failed to connect", ch.cause()); + onConnectFailure(); } }); + setState(State.CONNECTING, event); } - private void onPong(Void ignore) { - log.debug("Pong received!"); - reset(); + private void stopConnectTimer() { + stopTimer(connectTimerId); + reconnectTimerId = null; } - private void onClose(Void ignore) { - log.error("Connection lost!"); - checkReconnect(); + private void enterDisconnected(String event) { + startReconnectTimer(); + setState(State.DISCONNECTED, event); } - private void checkReconnect() { - if (shutdown) { - log.info("No reconnect due to shutdown"); - return; - } - if (reconnectPending) { - log.warn("Reconnect already pending"); - return; - } - reconnectPending = true; - scheduleReconnect(); + private void startReconnectTimer() { + log.info("Scheduling reconnect in {} msec", reconnectDelay); + assert reconnectTimerId == null; + reconnectTimerId = vertx.setTimer(reconnectDelay, this::onReconnectTimeout); reconnectDelay = 2 * reconnectDelay; if (reconnectDelay > MAX_RECONNECT_DELAY) { reconnectDelay = MAX_RECONNECT_DELAY; } } - private void scheduleReconnect() { - log.info("Scheduling reconnect in {} msec", reconnectDelay); - assert reconnectTimerId == null; - reconnectTimerId = vertx.setTimer(reconnectDelay, this::reconnect); + private void stopReconnectTimer() { + stopTimer(reconnectTimerId ); + reconnectTimerId = null; } - private void reconnect(Long timerId) { - if (shutdown) { - log.info("Timer ignored due to shutdown"); - return; + private void stopTimer(Long timerId) { + if (timerId != null) { + vertx.cancelTimer(timerId); } - if (!timerId.equals(reconnectTimerId)) { - log.error("Unexpected timer ignored"); - return; - } - log.info("Reconnecting ..."); - reconnectTimerId = null; - reconnectPending = false; - Future f = Future.future(); - connect(f); + } + + private void onPong(Void ignore) { + log.debug("Pong received!"); + reset(); + } + + private void setState(State state, String event) { + log.info("Transition {} =={}==> {}", this.state, event, state); + this.state = state; } } -- GitLab From 0f5c3872073b4f91c8f886e897ed3fc7bb729056 Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Mon, 29 Jul 2019 15:32:30 +0300 Subject: [PATCH 2/2] Connector: runtime exception from mqtt client.disconnect() --- .../biz/nynja/bridge/verticle/MqttConnector.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java index d069f0f..9ecb2dc 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -128,7 +128,7 @@ class MqttConnector { default: stopConnectTimer(); stopReconnectTimer(); - client.disconnect(); + disconnectClient(); setState(State.STOPPED, event); break; } @@ -172,7 +172,7 @@ class MqttConnector { } switch (state) { case CONNECTING: - client.disconnect(); + disconnectClient(); enterDisconnected(event); break; case STOPPED: @@ -283,4 +283,12 @@ class MqttConnector { log.info("Transition {} =={}==> {}", this.state, event, state); this.state = state; } + + private void disconnectClient() { + try { + client.disconnect(); + } catch (RuntimeException e) { + log.debug("Exception disconnecting client", e); + } + } } -- GitLab