From 0e110871e38960fecd81da27ac38dd35a31c5533 Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Sun, 28 Jul 2019 22:07:41 +0300 Subject: [PATCH 1/4] Refactor reconnect. --- .../nynja/bridge/verticle/MQTTVerticle.java | 106 ++++------------ .../nynja/bridge/verticle/MqttConnector.java | 114 ++++++++++++++++++ 2 files changed, 135 insertions(+), 85 deletions(-) create mode 100644 src/main/java/biz/nynja/bridge/verticle/MqttConnector.java diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 8610832..09e9839 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -4,8 +4,6 @@ import biz.nynja.bridge.bert.OtpErlangParser; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.MqttConfiguration; import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.circuitbreaker.CircuitBreaker; -import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -13,106 +11,56 @@ import io.vertx.core.eventbus.Message; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.messages.MqttPublishMessage; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiConsumer; -import java.util.function.Function; @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { - public static final String MQTT_BUS = "mqtt-bus"; - private static final int RECONNECT_DELAY = 5000; + static final String MQTT_BUS = "mqtt-bus"; private final MqttConfiguration configuration; private final ConcurrentHashMap pendingRequests; private MqttClient client; - private MqttClientOptions clientOptions; - private CircuitBreaker circuitBreaker; + private MqttConnector connector; public MQTTVerticle(MqttConfiguration configuration) { this.configuration = configuration; log.info("Configuration: {}", this.configuration); pendingRequests = new ConcurrentHashMap<>(); - boolean keepAliveEnabled = this.configuration.getKeepAliveInterval() > 10; - clientOptions = new MqttClientOptions() + } + + @Override + public void start(Future startFuture) { + boolean keepAliveEnabled = configuration.getKeepAliveInterval() > 10; + MqttClientOptions clientOptions = new MqttClientOptions() .setAutoKeepAlive(keepAliveEnabled) .setUsername(this.configuration.getUsername()) .setKeepAliveTimeSeconds(this.configuration.getKeepAliveInterval()) .setCleanSession(false) .setClientId(this.configuration.getUniqueClientId()); - } - - @Override - public void start(Future startFuture) - throws Exception { - - initMqttClient(); - vertx.eventBus().consumer(MQTT_BUS, this::processBusMessage); - client.publishHandler(this::processMqttMessage); - connectMqtt(startFuture); - } - - private void initMqttClient() { - CircuitBreakerOptions options = new CircuitBreakerOptions() - .setMaxFailures(5) - .setTimeout(15000) - .setMaxRetries(10) - .setResetTimeout(30000); - circuitBreaker = CircuitBreaker - .create("reconnect", vertx, options) - .retryPolicy(integer -> { - log.debug("integer: {}", integer); - long delay = ((integer + 1) * RECONNECT_DELAY); - log.info("Reconnecting in: {}ms", delay); - return delay; - }); - client = MqttClient.create(vertx, clientOptions); - client.pingResponseHandler(event -> log.debug("Pong received!")); - client.closeHandler(event -> { - log.warn("Connection lost!"); - circuitBreaker.execute(this::connectMqtt); - }); + client.publishHandler(this::processMqttMessage); + connector = new MqttConnector(vertx, client, configuration); + vertx.eventBus().consumer(MQTT_BUS, this::processBusMessage); + connector.start(startFuture); } - private void connectMqtt(Future future) { - log.debug("Connecting to server '{}@{}:{}'...", - configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); - - vertx.executeBlocking( - event -> { - client.connect(configuration.getPort(), configuration.getHost(), ch -> { - if (ch.succeeded()) { - log.debug("Connected to a server"); - event.complete(); - } else { - log.error("Failed to connect to a server!", ch.cause()); - event.fail(ch.cause()); - } - }); - }, - result -> { - if (result.succeeded()) { - log.trace("calling future.complete"); - future.complete(); - } else { - log.trace("calling future.fail"); - future.fail(result.cause()); - } - }); - + @Override + public void stop() { + connector.stop(); } private void processBusMessage(Message busMessage) { + // received something, so the connection is good and we can reset connector delays + connector.reset(); + final MQTTModelBase model = busMessage.body(); final String modelId = model.getId().toString(); @@ -132,9 +80,9 @@ public class MQTTVerticle extends AbstractVerticle { PendingRequest pendingRequest = new PendingRequest(currentTimeMillis, callBackFuture); pendingRequests.put(modelId, pendingRequest); - vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { - respond(modelId, Boolean.FALSE, "timeout", false); - }); + vertx.setTimer( + configuration.getMessageResponseTimeout(), + timer -> respond(modelId, Boolean.FALSE, "timeout", false)); try { byte[] publishBytes = model.getBertFormat(); @@ -207,16 +155,4 @@ public class MQTTVerticle extends AbstractVerticle { respond(id, !parse.isErrorMessage(), "erlang response", true); } - @RequiredArgsConstructor - private enum LogLevel { - INFO(l -> l::info), - ERROR(l -> l::error); - - private final Function> function; - - public void log(Logger logger, String message, Object... objects) { - function.apply(logger).accept(message, objects); - } - } - } diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java new file mode 100644 index 0000000..b73cb93 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -0,0 +1,114 @@ +package biz.nynja.bridge.verticle; + +import biz.nynja.bridge.properties.MqttConfiguration; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttClient; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class MqttConnector { + + private static final int MAX_RECONNECT_DELAY = 20000; + private static final int INITIAL_RECONNECT_DELAY = 100; + + private final Vertx vertx; + private final MqttClient client; + private final MqttConfiguration configuration; + private int reconnectDelay; + private Long reconnectTimerId; + private boolean shutdown; + + MqttConnector(Vertx vertx, MqttClient client, MqttConfiguration configuration) { + this.vertx = vertx; + this.client = client; + this.configuration = configuration; + this.client.pingResponseHandler(this::onPong); + this.client.closeHandler(this::onClose); + this.reconnectTimerId = null; + this.shutdown = false; + reset(); + } + + void start(Future future) { + // wait some time for the initial connect and then let then reconnect run in background + Future connectFuture = Future.future(); + connectFuture.setHandler(event -> { + if (!future.isComplete()) { + future.complete(); + } + }); + vertx.setTimer(5000, timerId -> { + if (!future.isComplete()) { + future.complete(); + } + }); + connect(connectFuture); + } + + void stop() { + shutdown = true; + if (reconnectTimerId != null) { + vertx.cancelTimer(reconnectTimerId); + } + client.disconnect(); + } + + void reset() { + reconnectDelay = INITIAL_RECONNECT_DELAY; + } + + private void connect(Future future) { + log.debug("Connecting '{}@{}:{}'...", + configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); + + client.connect(configuration.getPort(), configuration.getHost(), ch -> { + if (ch.succeeded()) { + log.debug("Connected"); + future.complete(); + } else { + log.error("Failed to connect", ch.cause()); + future.fail(ch.cause()); + } + }); + } + + private void onPong(Void ignore) { + log.debug("Pong received!"); + reset(); + } + + private void onClose(Void ignore) { + log.error("Connection lost!"); + if (shutdown) { + log.info("No reconnect due to shutdown"); + return; + } + scheduleReconnect(); + reconnectDelay = 2 * reconnectDelay; + if (reconnectDelay > MAX_RECONNECT_DELAY) { + reconnectDelay = MAX_RECONNECT_DELAY; + } + } + + private void scheduleReconnect() { + log.info("Scheduling reconnect in {} msec", reconnectDelay); + assert reconnectTimerId == null; + reconnectTimerId = vertx.setTimer(reconnectDelay, this::reconnect); + } + + private void reconnect(Long timerId) { + if (shutdown) { + log.info("Timer ignored due to shutdown"); + return; + } + if (!timerId.equals(reconnectTimerId)) { + log.error("Unexpected timer ignored"); + return; + } + log.info("Reconnecting ..."); + reconnectTimerId = null; + Future f = Future.future(); + connect(f); + } +} -- GitLab From bfc2b49cad9b8d1d703f606a2df1146135db21da Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Sun, 28 Jul 2019 23:11:42 +0300 Subject: [PATCH 2/4] Fix: connector reset on the wrong place. --- src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 09e9839..db1cb28 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -58,9 +58,6 @@ public class MQTTVerticle extends AbstractVerticle { } private void processBusMessage(Message busMessage) { - // received something, so the connection is good and we can reset connector delays - connector.reset(); - final MQTTModelBase model = busMessage.body(); final String modelId = model.getId().toString(); @@ -133,6 +130,9 @@ public class MQTTVerticle extends AbstractVerticle { } private void processMqttMessage(MqttPublishMessage mqttMessage) { + // received something, so the connection is good and we can reset connector delays + connector.reset(); + final int messageId = mqttMessage.messageId(); log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", -- GitLab From c875bf502c7187df5b49c823cce802b1951e456e Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Sun, 28 Jul 2019 23:44:22 +0300 Subject: [PATCH 3/4] Fix: initiate reconnect after connect failure --- .../biz/nynja/bridge/verticle/MqttConnector.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java index b73cb93..5953263 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -18,6 +18,7 @@ class MqttConnector { private int reconnectDelay; private Long reconnectTimerId; private boolean shutdown; + private boolean reconnectPending; MqttConnector(Vertx vertx, MqttClient client, MqttConfiguration configuration) { this.vertx = vertx; @@ -27,6 +28,7 @@ class MqttConnector { this.client.closeHandler(this::onClose); this.reconnectTimerId = null; this.shutdown = false; + this.reconnectPending = false; reset(); } @@ -68,6 +70,7 @@ class MqttConnector { future.complete(); } else { log.error("Failed to connect", ch.cause()); + checkReconnect(); future.fail(ch.cause()); } }); @@ -80,10 +83,19 @@ class MqttConnector { private void onClose(Void ignore) { log.error("Connection lost!"); + checkReconnect(); + } + + private void checkReconnect() { if (shutdown) { log.info("No reconnect due to shutdown"); return; } + if (reconnectPending) { + log.warn("Reconnect already pending"); + return; + } + reconnectPending = true; scheduleReconnect(); reconnectDelay = 2 * reconnectDelay; if (reconnectDelay > MAX_RECONNECT_DELAY) { @@ -108,6 +120,7 @@ class MqttConnector { } log.info("Reconnecting ..."); reconnectTimerId = null; + reconnectPending = false; Future f = Future.future(); connect(f); } -- GitLab From d89e853fdb6ab92495f4e693065d9f7ad954115c Mon Sep 17 00:00:00 2001 From: Ilia Krustev Date: Sun, 28 Jul 2019 23:45:48 +0300 Subject: [PATCH 4/4] Use clean session and QoS level 1. --- src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index db1cb28..04c2614 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -43,7 +43,7 @@ public class MQTTVerticle extends AbstractVerticle { .setAutoKeepAlive(keepAliveEnabled) .setUsername(this.configuration.getUsername()) .setKeepAliveTimeSeconds(this.configuration.getKeepAliveInterval()) - .setCleanSession(false) + .setCleanSession(true) .setClientId(this.configuration.getUniqueClientId()); client = MqttClient.create(vertx, clientOptions); client.publishHandler(this::processMqttMessage); @@ -88,7 +88,7 @@ public class MQTTVerticle extends AbstractVerticle { client.publish( configuration.getTopic(), Buffer.buffer(publishBytes), - MqttQoS.EXACTLY_ONCE, + MqttQoS.AT_LEAST_ONCE, false, false, event -> { -- GitLab