diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java index 59532636dd41631055acf4c0c17044e3348c18c1..9ecb2dc9fd82d95dafe36f621e4ad669e230b763 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -9,6 +9,54 @@ import lombok.extern.slf4j.Slf4j; @Slf4j class MqttConnector { + /** + * INITIAL + * --> onStart + * --> connect, start connect timer + * --> CONNECTING + * --> X + * CONNECTING + * --> onConnectSuccess + * --> stop connect timer + * --> CONNECTED + * --> onConnectFailure + * --> stop connect timer, start reconnect timer + * --> DISCONNECTED + * --> onConnectTimeout + * --> disconnect, start reconnect timer + * --> DISCONNECTED + * --> onConnectionLost + * --> start reconnect timer + * --> DISCONNECTED + * --> X + * CONNECTED + * --> onConnectionLost + * --> start reconnect timer + * --> DISCONNECTED + * --> X + * DISCONNECTED + * --> onReconnectTimeout + * --> connect, start connect timer + * --> CONNECTING + * --> X + * + * STOPPED + * --> All Events + * --> STOPPED + * + * CONNECTING, CONNECTED, DISCONNECTED + * --> onStop + * --> disconnect, stop all timers + * --> STOPPED + * */ + private enum State { + INITAIL, + CONNECTING, + CONNECTED, + DISCONNECTED, + STOPPED, + } + private static final int MAX_RECONNECT_DELAY = 20000; private static final int INITIAL_RECONNECT_DELAY = 100; @@ -16,19 +64,19 @@ class MqttConnector { private final MqttClient client; private final MqttConfiguration configuration; private int reconnectDelay; + private State state; + private Long connectTimerId; private Long reconnectTimerId; - private boolean shutdown; - private boolean reconnectPending; MqttConnector(Vertx vertx, MqttClient client, MqttConfiguration configuration) { this.vertx = vertx; this.client = client; this.configuration = configuration; this.client.pingResponseHandler(this::onPong); - this.client.closeHandler(this::onClose); + this.client.closeHandler(this::onConnectionLost); + this.state = State.INITAIL; + this.connectTimerId = null; this.reconnectTimerId = null; - this.shutdown = false; - this.reconnectPending = false; reset(); } @@ -45,83 +93,202 @@ class MqttConnector { future.complete(); } }); - connect(connectFuture); + onStart(connectFuture); } void stop() { - shutdown = true; - if (reconnectTimerId != null) { - vertx.cancelTimer(reconnectTimerId); - } - client.disconnect(); + onStop(); } void reset() { reconnectDelay = INITIAL_RECONNECT_DELAY; } - private void connect(Future future) { + private void onStart(Future future) { + final String event = "onStart"; + switch (state) { + case INITAIL: + enterConnecting(future, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onStop() { + final String event = "onStop"; + switch (state) { + case STOPPED: + case INITAIL: + handleUnexpectedEvent(event); + break; + default: + stopConnectTimer(); + stopReconnectTimer(); + disconnectClient(); + setState(State.STOPPED, event); + break; + } + } + + private void onConnectSuccess() { + final String event = "onConnectSuccess"; + switch (state) { + case CONNECTING: + stopConnectTimer(); + setState(State.CONNECTED, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectFailure() { + final String event = "onConnectFailure"; + switch (state) { + case CONNECTING: + stopConnectTimer(); + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectTimeout(Long timerId) { + final String event = "onConnectTimeout"; + if (!timerId.equals(connectTimerId)) { + log.error("Unexpected connect timer ignored"); + return; + } + switch (state) { + case CONNECTING: + disconnectClient(); + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onConnectionLost(Void ignore) { + final String event = "onConnectionLost"; + log.error("Connection lost!"); + switch (state) { + case CONNECTED: + case CONNECTING: + enterDisconnected(event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void onReconnectTimeout(Long timerId) { + final String event = "onReconnectTimeout"; + if (!timerId.equals(reconnectTimerId)) { + log.error("Unexpected reconnect timer ignored"); + return; + } + reconnectTimerId = null; + switch (state) { + case DISCONNECTED: + log.info("Reconnecting ..."); + Future f = Future.future(); + enterConnecting(f, event); + break; + case STOPPED: + break; + default: + handleUnexpectedEvent(event); + break; + } + } + + private void handleUnexpectedEvent(String event) { + log.error("Unexpected event {} in state {} ignored", event, state); + } + + private void enterConnecting(Future future, String event) { log.debug("Connecting '{}@{}:{}'...", configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); + connectTimerId = vertx.setTimer(40000, this::onConnectTimeout); + client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { - log.debug("Connected"); future.complete(); + onConnectSuccess(); } else { - log.error("Failed to connect", ch.cause()); - checkReconnect(); future.fail(ch.cause()); + log.error("Failed to connect", ch.cause()); + onConnectFailure(); } }); + setState(State.CONNECTING, event); } - private void onPong(Void ignore) { - log.debug("Pong received!"); - reset(); + private void stopConnectTimer() { + stopTimer(connectTimerId); + reconnectTimerId = null; } - private void onClose(Void ignore) { - log.error("Connection lost!"); - checkReconnect(); + private void enterDisconnected(String event) { + startReconnectTimer(); + setState(State.DISCONNECTED, event); } - private void checkReconnect() { - if (shutdown) { - log.info("No reconnect due to shutdown"); - return; - } - if (reconnectPending) { - log.warn("Reconnect already pending"); - return; - } - reconnectPending = true; - scheduleReconnect(); + private void startReconnectTimer() { + log.info("Scheduling reconnect in {} msec", reconnectDelay); + assert reconnectTimerId == null; + reconnectTimerId = vertx.setTimer(reconnectDelay, this::onReconnectTimeout); reconnectDelay = 2 * reconnectDelay; if (reconnectDelay > MAX_RECONNECT_DELAY) { reconnectDelay = MAX_RECONNECT_DELAY; } } - private void scheduleReconnect() { - log.info("Scheduling reconnect in {} msec", reconnectDelay); - assert reconnectTimerId == null; - reconnectTimerId = vertx.setTimer(reconnectDelay, this::reconnect); + private void stopReconnectTimer() { + stopTimer(reconnectTimerId ); + reconnectTimerId = null; } - private void reconnect(Long timerId) { - if (shutdown) { - log.info("Timer ignored due to shutdown"); - return; + private void stopTimer(Long timerId) { + if (timerId != null) { + vertx.cancelTimer(timerId); } - if (!timerId.equals(reconnectTimerId)) { - log.error("Unexpected timer ignored"); - return; + } + + private void onPong(Void ignore) { + log.debug("Pong received!"); + reset(); + } + + private void setState(State state, String event) { + log.info("Transition {} =={}==> {}", this.state, event, state); + this.state = state; + } + + private void disconnectClient() { + try { + client.disconnect(); + } catch (RuntimeException e) { + log.debug("Exception disconnecting client", e); } - log.info("Reconnecting ..."); - reconnectTimerId = null; - reconnectPending = false; - Future f = Future.future(); - connect(f); } }