diff --git a/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java index 88cffa0007dd27c6a6bbcecd7c2abf695af63424..46504c9fcdc8c004e88dd4c1573c27ffef24b00e 100644 --- a/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java +++ b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java @@ -6,6 +6,8 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +import java.util.UUID; + @ToString @Getter @Setter @@ -16,8 +18,36 @@ public class MqttConfiguration { private String host; private int port; private String clientId; + private String clientIdSuffix; + private String username; private String topic; private int keepAliveInterval; private long messageResponseTimeout; + public String getUniqueClientId() { + return clientId + "_" + sanitizeSuffix(clientIdSuffix); + } + + private String sanitizeSuffix(String suffix) { + if (suffix.contains("bridge-service-")) { + return suffix.replace("bridge-service-", "") + .replace("-", "_"); + } else if (isUuid(suffix)) { + return suffix.replace("-", ""); + } + + return suffix; + } + + private boolean isUuid(String suffix) { + try { + UUID uuid = UUID.fromString(suffix); + return true; + } catch (IllegalArgumentException e) { + // Nothing to do + } + + return false; + } + } diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 820409b50f40d60e7fe4b4cd56aa1f1a5f4207b3..86252da1e679a275ab20f0b4e4c1484a90d0b26c 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -19,7 +19,6 @@ import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Function; @@ -29,7 +28,6 @@ import java.util.function.Function; public class MQTTVerticle extends AbstractVerticle { public static final String MQTT_BUS = "mqtt-bus"; - private static final int DISCONNECT_DELAY = 5000; private static final int RECONNECT_DELAY = 5000; private final MqttConfiguration configuration; @@ -43,13 +41,13 @@ public class MQTTVerticle extends AbstractVerticle { this.configuration = configuration; log.info("Configuration: {}", this.configuration); dataEventsStatusCache = new ConcurrentHashMap<>(); - boolean keepAliveEnabled = configuration.getKeepAliveInterval() > 10; + boolean keepAliveEnabled = this.configuration.getKeepAliveInterval() > 10; clientOptions = new MqttClientOptions() .setAutoKeepAlive(keepAliveEnabled) - .setUsername("micro") - .setKeepAliveTimeSeconds(configuration.getKeepAliveInterval()) + .setUsername(this.configuration.getUsername()) + .setKeepAliveTimeSeconds(this.configuration.getKeepAliveInterval()) .setCleanSession(false) - .setClientId("sys_micro_bridge_" + UUID.randomUUID().toString().replace("-", "")); + .setClientId(this.configuration.getUniqueClientId()); } @Override @@ -86,15 +84,14 @@ public class MQTTVerticle extends AbstractVerticle { } private void connectMqtt(Future future) { - log.debug("Connecting to server '{}:{}'...", - configuration.getHost(), configuration.getPort()); + log.debug("Connecting to server '{}@{}:{}'...", + configuration.getUniqueClientId(), configuration.getHost(), configuration.getPort()); vertx.executeBlocking( event -> { client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { log.debug("Connected to a server"); - // client.subscribe(configuration.getTopic(), 0); event.complete(); } else { log.error("Failed to connect to a server!", ch.cause()); diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index ed2c534d564a40248478355f0226de16039c8415..12e0f29c5ce77851c6e6f71f372a316eb3e3c580 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -15,6 +15,8 @@ mqtt: host: nynja-dev-uw1-messaging01.dev-eu.nynja.net port: 1883 clientId: sys_micro_bridge_dev + clientIdSuffix: ${MQTT_CLIENT_ID:${HOSTNAME:${random.uuid}}} + username: micro topic: events/1//api/anon// keepAliveInterval: 20 messageResponseTimeout: 2000 diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index ca3cd4f6e3a2d647e274af5d0e8ac53dcd36ee73..1926443ec48f0506b326993db8e2b6c9766b8821 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -11,9 +11,11 @@ grpc: port: ${GRPC_SERVER_PORT:6570} mqtt: - host: ${ERLANG_HOST:35.234.110.93} + host: ${ERLANG_HOST:nynja-staging-ew3-messaging01.staging.nynja.net} port: ${ERLANG_PORT:1883} - clientId: ${MQTT_CLIENT_ID:sys_micro_bridge} + clientId: sys_micro_bridge + clientIdSuffix: ${MQTT_CLIENT_ID:${HOSTNAME:${random.uuid}}} + username: ${MQTT_USERNAME:micro} topic: ${MQTT_TOPIC:events/1//api/anon//} keepAliveInterval: ${MQTT_KEEP_ALIVE:20} messageResponseTimeout: ${MQTT_RESP_WAIT:2000}