diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 86252da1e679a275ab20f0b4e4c1484a90d0b26c..8610832ec57fd5da8d6fd06cf92489353f698729 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -1,7 +1,6 @@ package biz.nynja.bridge.verticle; import biz.nynja.bridge.bert.OtpErlangParser; -import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.MqttConfiguration; import io.netty.handler.codec.mqtt.MqttQoS; @@ -10,8 +9,10 @@ import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; +import io.vertx.core.eventbus.Message; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; +import io.vertx.mqtt.messages.MqttPublishMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; @@ -31,7 +32,7 @@ public class MQTTVerticle extends AbstractVerticle { private static final int RECONNECT_DELAY = 5000; private final MqttConfiguration configuration; - private final ConcurrentHashMap dataEventsStatusCache; + private final ConcurrentHashMap pendingRequests; private MqttClient client; private MqttClientOptions clientOptions; @@ -40,7 +41,7 @@ public class MQTTVerticle extends AbstractVerticle { public MQTTVerticle(MqttConfiguration configuration) { this.configuration = configuration; log.info("Configuration: {}", this.configuration); - dataEventsStatusCache = new ConcurrentHashMap<>(); + pendingRequests = new ConcurrentHashMap<>(); boolean keepAliveEnabled = this.configuration.getKeepAliveInterval() > 10; clientOptions = new MqttClientOptions() .setAutoKeepAlive(keepAliveEnabled) @@ -55,8 +56,8 @@ public class MQTTVerticle extends AbstractVerticle { throws Exception { initMqttClient(); - initBus(); - initSubscriber(); + vertx.eventBus().consumer(MQTT_BUS, this::processBusMessage); + client.publishHandler(this::processMqttMessage); connectMqtt(startFuture); } @@ -111,104 +112,99 @@ public class MQTTVerticle extends AbstractVerticle { } - private void initBus() { - vertx.eventBus().consumer(MQTT_BUS, message -> { - MQTTModelBase model = message.body(); - - log.info("Publish message with model {}", model.toLogString()); - - Future publishFuture = Future.future(); - - try { - byte[] publishBytes = model.getBertFormat(); - log.trace("publishBytes: {}", Arrays.toString(publishBytes)); - - client.publish( - configuration.getTopic(), - Buffer.buffer(publishBytes), - MqttQoS.EXACTLY_ONCE, - false, - false, - event -> { - publishFuture.complete(); - }); - } catch (RuntimeException e) { - log.error("Error publishing message!", e); - message.reply(false); - return; + private void processBusMessage(Message busMessage) { + final MQTTModelBase model = busMessage.body(); + final String modelId = model.getId().toString(); + + log.info("Publish message with model {}", model.toLogString()); + + Future callBackFuture = Future.future(); + callBackFuture.setHandler(asyncResult -> { + if (asyncResult.succeeded()) { + busMessage.reply(asyncResult.result()); + } else { + log.error("Callback failure for [id:{}]", modelId, asyncResult.cause()); + busMessage.reply(false); } + }); - Future callBackFuture = Future.future(); - // because if we added after publishing, we can have race condition (event loop flow) - long currentTimeMillis = System.currentTimeMillis(); - DataEventsStatus dataEventsStatus = new DataEventsStatus( - currentTimeMillis, callBackFuture, DataEventsStatus.DataStatus.SENT); - dataEventsStatusCache.put(model.getId().toString(), dataEventsStatus); - - publishFuture.setHandler(booleanAsyncResult -> { - if (booleanAsyncResult.failed()) { - message.reply(false); - return; - } - - callBackFuture.setHandler(asyncResult -> { - log.debug("asyncResult: {}", asyncResult); - message.reply(asyncResult.result()); - }); + long currentTimeMillis = System.currentTimeMillis(); + PendingRequest pendingRequest = new PendingRequest(currentTimeMillis, callBackFuture); + pendingRequests.put(modelId, pendingRequest); + + vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { + respond(modelId, Boolean.FALSE, "timeout", false); + }); - vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { - if (!callBackFuture.isComplete()) { - dataEventsStatusCache.remove(model.getId().toString()); - log.warn("Callback future for [id:{}] not complete after {} milliseconds", - model.getId(), configuration.getMessageResponseTimeout()); - callBackFuture.complete(false); + try { + byte[] publishBytes = model.getBertFormat(); + log.trace("publishBytes: {}", Arrays.toString(publishBytes)); + + client.publish( + configuration.getTopic(), + Buffer.buffer(publishBytes), + MqttQoS.EXACTLY_ONCE, + false, + false, + event -> { + if (event.failed()) { + log.error("Async failure publishing message!", event.cause()); + respondFailure(modelId, "async publish"); } }); - }); - }); + } catch (RuntimeException e) { + log.error("Exception publishing message!", e); + respondFailure(modelId, "sync publish"); + } } - private void initSubscriber() { - client.publishHandler(publish -> { - final int messageId = publish.messageId(); - - log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", - messageId, publish.topicName(), publish.qosLevel()); + private void respondFailure(String id, String reason) { + respond(id, Boolean.FALSE, reason, true); + } - byte[] responseBytes = publish.payload().getBytes(); - log.trace("responseBytes: {}", Arrays.toString(responseBytes)); + private void respond(String id, Boolean result, String reason, boolean logMissing) { + PendingRequest pendingRequest = pendingRequests.remove(id); + if (Objects.isNull(pendingRequest)) { + if (logMissing) { + log.warn("Callback for message [id:{}]: MISSING. Result: {} {}", + id, result, result ? "" : reason); + } + return; + } + if (pendingRequest.getCallback().isComplete()) { + log.error("Callback for message [id:{}]: ALREADY COMPLETE. Result: {} {}", + id, result, result ? "" : reason); + return; + } + if (Boolean.TRUE.equals(result)) { + log.info("Callback for message [id:{}]: OK", id); + } else { + log.error("Callback for message [id:{}]: FAILURE: {}", id, reason); + } + pendingRequest.getCallback().complete(result); + } - final OtpErlangParser parse = OtpErlangParser.parse(responseBytes); + private void processMqttMessage(MqttPublishMessage mqttMessage) { + final int messageId = mqttMessage.messageId(); - if (parse == null) { - log.error("[messageId:{}] Could not read response!", messageId); - return; - } + log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", + messageId, mqttMessage.topicName(), mqttMessage.qosLevel()); - log.debug("[messageId:{}] response: {}", - messageId, parse.getTuple()); + byte[] responseBytes = mqttMessage.payload().getBytes(); + log.trace("responseBytes: {}", Arrays.toString(responseBytes)); - String id = parse.getId(); - boolean isErrorResponse = parse.isErrorMessage(); - LogLevel logLevel = isErrorResponse ? LogLevel.ERROR : LogLevel.INFO; + final OtpErlangParser parse = OtpErlangParser.parse(responseBytes); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); + if (parse == null) { + log.error("[messageId:{}] Could not read response!", messageId); + return; + } - if (Objects.isNull(eventStatus)) { - logLevel.log(log, "[messageId:{}] Response from erlang [id:{}] without duplicating in cache", - messageId, id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - logLevel.log(log, "[messageId:{}] Response from erlang [id:{}]. Future is complete ", - messageId, id); - return; - } - logLevel.log(log, "[messageId:{}] Calling callback for [id:{}] with result = {}", - messageId, id, !isErrorResponse); + log.debug("[messageId:{}] response: {}", + messageId, parse.getTuple()); - eventStatus.getCallBack().complete(!isErrorResponse); - }); + String id = parse.getId(); + respond(id, !parse.isErrorMessage(), "erlang response", true); } @RequiredArgsConstructor diff --git a/src/main/java/biz/nynja/bridge/cache/DataEventsStatus.java b/src/main/java/biz/nynja/bridge/verticle/PendingRequest.java similarity index 51% rename from src/main/java/biz/nynja/bridge/cache/DataEventsStatus.java rename to src/main/java/biz/nynja/bridge/verticle/PendingRequest.java index 09ae9b4f6484bc8f8d5662ed9b6906b948d71b57..13f96d3aa4e2c7a08f8a7a7fbac4016a3c435515 100644 --- a/src/main/java/biz/nynja/bridge/cache/DataEventsStatus.java +++ b/src/main/java/biz/nynja/bridge/verticle/PendingRequest.java @@ -1,4 +1,4 @@ -package biz.nynja.bridge.cache; +package biz.nynja.bridge.verticle; import io.vertx.core.Future; import lombok.AllArgsConstructor; @@ -10,17 +10,9 @@ import lombok.Setter; @Getter @EqualsAndHashCode @AllArgsConstructor -public class DataEventsStatus { - +public class PendingRequest { private long timestamp; - private Future callBack; - - @Setter - private DataStatus status; - + private Future callback; - public enum DataStatus { - SENT, FAILED, SUCCESS - } }