From 65ed750382041a7cecbd4bae126a0504521a6de8 Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 4 Jul 2019 17:55:12 +0300 Subject: [PATCH 1/2] on publish exception answer the message --- .../nynja/bridge/verticle/MQTTVerticle.java | 39 ++++++++++--------- 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index ab0e871..eb6da2b 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -127,7 +127,24 @@ public class MQTTVerticle extends AbstractVerticle { log.info("Publish message with model {}", model.toLogString()); Future publishFuture = Future.future(); - publishMqttMessage(publishFuture, model); + + try { + byte[] publishBytes = model.getBertFormat(); + log.trace("publishBytes: {}", Arrays.toString(publishBytes)); + + client.publish( + configuration.getTopic(), + Buffer.buffer(publishBytes), + MqttQoS.EXACTLY_ONCE, + false, + false, + event -> { + publishFuture.complete(); + }); + } catch (RuntimeException e) { + message.reply(false); + return; + } Future callBackFuture = Future.future(); // because if we added after publishing, we can have race condition (event loop flow) @@ -142,8 +159,9 @@ public class MQTTVerticle extends AbstractVerticle { return; } - callBackFuture.setHandler(result -> { - message.reply(result.result()); + callBackFuture.setHandler(asyncResult -> { + log.debug("asyncResult: {}", asyncResult); + message.reply(asyncResult.result()); }); vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { @@ -158,21 +176,6 @@ public class MQTTVerticle extends AbstractVerticle { }); } - private void publishMqttMessage(Future complete, MQTTModelBase mqttModelBase) { - byte[] publishBytes = mqttModelBase.getBertFormat(); - log.trace("publishBytes: {}", Arrays.toString(publishBytes)); - - client.publish( - configuration.getTopic(), - Buffer.buffer(publishBytes), - MqttQoS.EXACTLY_ONCE, - false, - false, - event -> { - complete.complete(); - }); - } - private void initSubscriber() { client.publishHandler(publish -> { final int messageId = publish.messageId(); -- GitLab From 07966323c5d5d2cdf959fac51a34a4c7eda67aaf Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 4 Jul 2019 18:28:58 +0300 Subject: [PATCH 2/2] remove unused code add error log on publish exception --- .../java/biz/nynja/bridge/verticle/MQTTVerticle.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index eb6da2b..20503e4 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -69,14 +69,6 @@ public class MQTTVerticle extends AbstractVerticle { circuitBreaker = CircuitBreaker.create("reconnect", vertx, options); client = MqttClient.create(vertx, clientOptions); - client.subscribeCompletionHandler(h -> { - log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); - }); - client.unsubscribeCompletionHandler(h -> { - log.debug("Receive UNSUBACK from server"); - vertx.setTimer(DISCONNECT_DELAY, l -> - client.disconnect(d -> log.error("Disconnected form server"))); - }); client.pingResponseHandler(event -> log.debug("Pong received!")); client.closeHandler(event -> { log.warn("Connection lost!"); @@ -142,6 +134,7 @@ public class MQTTVerticle extends AbstractVerticle { publishFuture.complete(); }); } catch (RuntimeException e) { + log.error("Error publishing message!", e); message.reply(false); return; } -- GitLab