diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 8610832ec57fd5da8d6fd06cf92489353f698729..04c261497d027f03dda93270266dfd553e385dbe 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -4,8 +4,6 @@ import biz.nynja.bridge.bert.OtpErlangParser; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.MqttConfiguration; import io.netty.handler.codec.mqtt.MqttQoS; -import io.vertx.circuitbreaker.CircuitBreaker; -import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -13,103 +11,50 @@ import io.vertx.core.eventbus.Message; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; import io.vertx.mqtt.messages.MqttPublishMessage; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.slf4j.Logger; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.function.BiConsumer; -import java.util.function.Function; @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { - public static final String MQTT_BUS = "mqtt-bus"; - private static final int RECONNECT_DELAY = 5000; + static final String MQTT_BUS = "mqtt-bus"; private final MqttConfiguration configuration; private final ConcurrentHashMap pendingRequests; private MqttClient client; - private MqttClientOptions clientOptions; - private CircuitBreaker circuitBreaker; + private MqttConnector connector; public MQTTVerticle(MqttConfiguration configuration) { this.configuration = configuration; log.info("Configuration: {}", this.configuration); pendingRequests = new ConcurrentHashMap<>(); - boolean keepAliveEnabled = this.configuration.getKeepAliveInterval() > 10; - clientOptions = new MqttClientOptions() + } + + @Override + public void start(Future startFuture) { + boolean keepAliveEnabled = configuration.getKeepAliveInterval() > 10; + MqttClientOptions clientOptions = new MqttClientOptions() .setAutoKeepAlive(keepAliveEnabled) .setUsername(this.configuration.getUsername()) .setKeepAliveTimeSeconds(this.configuration.getKeepAliveInterval()) - .setCleanSession(false) + .setCleanSession(true) .setClientId(this.configuration.getUniqueClientId()); - } - - @Override - public void start(Future startFuture) - throws Exception { - - initMqttClient(); - vertx.eventBus().consumer(MQTT_BUS, this::processBusMessage); - client.publishHandler(this::processMqttMessage); - connectMqtt(startFuture); - } - - private void initMqttClient() { - CircuitBreakerOptions options = new CircuitBreakerOptions() - .setMaxFailures(5) - .setTimeout(15000) - .setMaxRetries(10) - .setResetTimeout(30000); - circuitBreaker = CircuitBreaker - .create("reconnect", vertx, options) - .retryPolicy(integer -> { - log.debug("integer: {}", integer); - long delay = ((integer + 1) * RECONNECT_DELAY); - log.info("Reconnecting in: {}ms", delay); - return delay; - }); - client = MqttClient.create(vertx, clientOptions); - client.pingResponseHandler(event -> log.debug("Pong received!")); - client.closeHandler(event -> { - log.warn("Connection lost!"); - circuitBreaker.execute(this::connectMqtt); - }); + client.publishHandler(this::processMqttMessage); + connector = new MqttConnector(vertx, client, configuration); + vertx.eventBus().consumer(MQTT_BUS, this::processBusMessage); + connector.start(startFuture); } - private void connectMqtt(Future future) { - log.debug("Connecting to server '{}@{}:{}'...", - configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); - - vertx.executeBlocking( - event -> { - client.connect(configuration.getPort(), configuration.getHost(), ch -> { - if (ch.succeeded()) { - log.debug("Connected to a server"); - event.complete(); - } else { - log.error("Failed to connect to a server!", ch.cause()); - event.fail(ch.cause()); - } - }); - }, - result -> { - if (result.succeeded()) { - log.trace("calling future.complete"); - future.complete(); - } else { - log.trace("calling future.fail"); - future.fail(result.cause()); - } - }); - + @Override + public void stop() { + connector.stop(); } private void processBusMessage(Message busMessage) { @@ -132,9 +77,9 @@ public class MQTTVerticle extends AbstractVerticle { PendingRequest pendingRequest = new PendingRequest(currentTimeMillis, callBackFuture); pendingRequests.put(modelId, pendingRequest); - vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { - respond(modelId, Boolean.FALSE, "timeout", false); - }); + vertx.setTimer( + configuration.getMessageResponseTimeout(), + timer -> respond(modelId, Boolean.FALSE, "timeout", false)); try { byte[] publishBytes = model.getBertFormat(); @@ -143,7 +88,7 @@ public class MQTTVerticle extends AbstractVerticle { client.publish( configuration.getTopic(), Buffer.buffer(publishBytes), - MqttQoS.EXACTLY_ONCE, + MqttQoS.AT_LEAST_ONCE, false, false, event -> { @@ -185,6 +130,9 @@ public class MQTTVerticle extends AbstractVerticle { } private void processMqttMessage(MqttPublishMessage mqttMessage) { + // received something, so the connection is good and we can reset connector delays + connector.reset(); + final int messageId = mqttMessage.messageId(); log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", @@ -207,16 +155,4 @@ public class MQTTVerticle extends AbstractVerticle { respond(id, !parse.isErrorMessage(), "erlang response", true); } - @RequiredArgsConstructor - private enum LogLevel { - INFO(l -> l::info), - ERROR(l -> l::error); - - private final Function> function; - - public void log(Logger logger, String message, Object... objects) { - function.apply(logger).accept(message, objects); - } - } - } diff --git a/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java new file mode 100644 index 0000000000000000000000000000000000000000..59532636dd41631055acf4c0c17044e3348c18c1 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/verticle/MqttConnector.java @@ -0,0 +1,127 @@ +package biz.nynja.bridge.verticle; + +import biz.nynja.bridge.properties.MqttConfiguration; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.mqtt.MqttClient; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +class MqttConnector { + + private static final int MAX_RECONNECT_DELAY = 20000; + private static final int INITIAL_RECONNECT_DELAY = 100; + + private final Vertx vertx; + private final MqttClient client; + private final MqttConfiguration configuration; + private int reconnectDelay; + private Long reconnectTimerId; + private boolean shutdown; + private boolean reconnectPending; + + MqttConnector(Vertx vertx, MqttClient client, MqttConfiguration configuration) { + this.vertx = vertx; + this.client = client; + this.configuration = configuration; + this.client.pingResponseHandler(this::onPong); + this.client.closeHandler(this::onClose); + this.reconnectTimerId = null; + this.shutdown = false; + this.reconnectPending = false; + reset(); + } + + void start(Future future) { + // wait some time for the initial connect and then let then reconnect run in background + Future connectFuture = Future.future(); + connectFuture.setHandler(event -> { + if (!future.isComplete()) { + future.complete(); + } + }); + vertx.setTimer(5000, timerId -> { + if (!future.isComplete()) { + future.complete(); + } + }); + connect(connectFuture); + } + + void stop() { + shutdown = true; + if (reconnectTimerId != null) { + vertx.cancelTimer(reconnectTimerId); + } + client.disconnect(); + } + + void reset() { + reconnectDelay = INITIAL_RECONNECT_DELAY; + } + + private void connect(Future future) { + log.debug("Connecting '{}@{}:{}'...", + configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); + + client.connect(configuration.getPort(), configuration.getHost(), ch -> { + if (ch.succeeded()) { + log.debug("Connected"); + future.complete(); + } else { + log.error("Failed to connect", ch.cause()); + checkReconnect(); + future.fail(ch.cause()); + } + }); + } + + private void onPong(Void ignore) { + log.debug("Pong received!"); + reset(); + } + + private void onClose(Void ignore) { + log.error("Connection lost!"); + checkReconnect(); + } + + private void checkReconnect() { + if (shutdown) { + log.info("No reconnect due to shutdown"); + return; + } + if (reconnectPending) { + log.warn("Reconnect already pending"); + return; + } + reconnectPending = true; + scheduleReconnect(); + reconnectDelay = 2 * reconnectDelay; + if (reconnectDelay > MAX_RECONNECT_DELAY) { + reconnectDelay = MAX_RECONNECT_DELAY; + } + } + + private void scheduleReconnect() { + log.info("Scheduling reconnect in {} msec", reconnectDelay); + assert reconnectTimerId == null; + reconnectTimerId = vertx.setTimer(reconnectDelay, this::reconnect); + } + + private void reconnect(Long timerId) { + if (shutdown) { + log.info("Timer ignored due to shutdown"); + return; + } + if (!timerId.equals(reconnectTimerId)) { + log.error("Unexpected timer ignored"); + return; + } + log.info("Reconnecting ..."); + reconnectTimerId = null; + reconnectPending = false; + Future f = Future.future(); + connect(f); + } +}