From e9de846fd25c5cad6a36cb43ae666166d5099d76 Mon Sep 17 00:00:00 2001 From: mapuo Date: Fri, 19 Apr 2019 15:23:35 +0300 Subject: [PATCH 01/25] fix formatting --- .../bridge/verticle/PrometheusVerticle.java | 56 +++++++++---------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java b/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java index 1a3bf2e..86680e0 100644 --- a/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java @@ -14,33 +14,33 @@ import org.springframework.stereotype.Component; @Component public class PrometheusVerticle extends AbstractVerticle { - private final MetricsConfiguration configuration; - private HttpServer server; - - public PrometheusVerticle(MetricsConfiguration configuration) { - this.configuration = configuration; - } - - @Override - public void start(Future startFuture) throws Exception { - - HttpServerOptions serverOptions = new HttpServerOptions().setPort(configuration.getPort()); - server = vertx.createHttpServer(serverOptions); - - Router router = Router.router(vertx); - - router.route("/metrics").handler(new MetricsHandler()); - - server.requestHandler(router); - server.listen(handler -> { - if (handler.succeeded()) { - startFuture.complete(); - log.info("Metrics listening on port: {}", server.actualPort()); - } else { - startFuture.fail(handler.cause()); - log.error("Exception starting the Metrics server!", handler.cause()); - } - }); - } + private final MetricsConfiguration configuration; + private HttpServer server; + + public PrometheusVerticle(MetricsConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void start(Future startFuture) throws Exception { + + HttpServerOptions serverOptions = new HttpServerOptions().setPort(configuration.getPort()); + server = vertx.createHttpServer(serverOptions); + + Router router = Router.router(vertx); + + router.route("/metrics").handler(new MetricsHandler()); + + server.requestHandler(router); + server.listen(handler -> { + if (handler.succeeded()) { + startFuture.complete(); + log.info("Metrics listening on port: {}", server.actualPort()); + } else { + startFuture.fail(handler.cause()); + log.error("Exception starting the Metrics server!", handler.cause()); + } + }); + } } -- GitLab From c52d1f10e449b08dd86e537fb007dae6798a7369 Mon Sep 17 00:00:00 2001 From: Zahari Date: Wed, 24 Apr 2019 12:01:25 +0300 Subject: [PATCH 02/25] In class AccountBridgeClient added main(). --- .../bridge/verticle/AccountBridgeClient.java | 214 +++++++++++++++++- 1 file changed, 210 insertions(+), 4 deletions(-) diff --git a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java index 7390f6c..8304fb7 100644 --- a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java +++ b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java @@ -6,33 +6,45 @@ import biz.nynja.bridge.grpc.BridgeSuccessResponse; import biz.nynja.bridge.grpc.ProfileData; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; -import static biz.nynja.bridge.grpc.AccountBridgeGrpc.newBlockingStub; import static biz.nynja.bridge.grpc.AccountBridgeGrpc.newFutureStub; import static io.grpc.ManagedChannelBuilder.forAddress; + public class AccountBridgeClient { + private static final Logger logger = Logger.getLogger(AccountBridgeClient.class.getName()); private final AccountBridgeGrpc.AccountBridgeBlockingStub bridgeStub; + private static AccountBridgeGrpc.AccountBridgeBlockingStub blockingStub; + private final ManagedChannel channel; private AccountBridgeGrpc.AccountBridgeFutureStub futureStub; + private final static int GRPC_PORT = 42253; + + public AccountBridgeClient(String host, int port) { - this(forAddress(host, port)); + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext()); } public AccountBridgeClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); - bridgeStub = newBlockingStub(channel); + bridgeStub = AccountBridgeGrpc.newBlockingStub(channel); + blockingStub = AccountBridgeGrpc.newBlockingStub(channel); futureStub = newFutureStub(channel); + } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } + } public BridgeSuccessResponse createProfile(ProfileData request) { return bridgeStub.createProfile(request); @@ -54,4 +66,198 @@ public class AccountBridgeClient { return bridgeStub.deleteAccount(request); } + public static void main(String[] args) throws Exception { + + createProfileData(); + deleteProfileData(); + createAccountData(); + updateAccount(); + deleteAccount(); + } + + private static void createProfileData() throws InterruptedException { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + + String profileId = UUID.randomUUID().toString(); + String accountId = UUID.randomUUID().toString(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + try { + blockingStub.createProfile(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + + try { + client.createProfile(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } + + private static void deleteProfileData() throws InterruptedException { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + + String profileId = UUID.randomUUID().toString(); + String accountId = UUID.randomUUID().toString(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + try { + blockingStub.deleteProfile(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + + try { + client.deleteProfile(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } + + private static void createAccountData() throws InterruptedException { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + + String profileId = UUID.randomUUID().toString(); + String accountId = UUID.randomUUID().toString(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + AccountData request = AccountData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build(); + + try { + blockingStub.createAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + + try { + client.createAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } + + private static void updateAccount() throws InterruptedException { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + + String profileId = UUID.randomUUID().toString(); + String accountId = UUID.randomUUID().toString(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + AccountData request = AccountData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build(); + + try { + blockingStub.updateAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + + try { + client.updateAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } + + private static void deleteAccount() throws InterruptedException { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + + String profileId = UUID.randomUUID().toString(); + String accountId = UUID.randomUUID().toString(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + try { + blockingStub.deleteAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); + return; + } + + try { + client.deleteAccount(request); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } } -- GitLab From 457915ceb92e3e7c8d73841d901cd24197663e71 Mon Sep 17 00:00:00 2001 From: Zahari Date: Wed, 24 Apr 2019 17:14:50 +0300 Subject: [PATCH 03/25] AccountBridgeClient VM options tested. --- .../java/biz/nynja/bridge/verticle/AccountBridgeClient.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java index 8304fb7..1938233 100644 --- a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java +++ b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java @@ -26,7 +26,7 @@ public class AccountBridgeClient { private final ManagedChannel channel; private AccountBridgeGrpc.AccountBridgeFutureStub futureStub; - private final static int GRPC_PORT = 42253; + private final static int GRPC_PORT = 6580; public AccountBridgeClient(String host, int port) { -- GitLab From 76608a3545b2b94288724e35412b2f5430f8b1dc Mon Sep 17 00:00:00 2001 From: mapuo Date: Mon, 20 May 2019 16:16:13 +0300 Subject: [PATCH 04/25] replace Bert with OTP Jinterface --- Dockerfile | 2 +- pom.xml | 14 +- src/main/java/biz/nynja/bridge/bert/Bert.java | 377 ------------------ .../biz/nynja/bridge/bert/BertException.java | 21 - .../nynja/bridge/bert/OtpErlangBuilder.java | 273 +++++++++++++ .../nynja/bridge/bert/OtpErlangHelper.java | 45 +++ .../java/biz/nynja/bridge/model/Account.java | 90 ++--- .../biz/nynja/bridge/model/MQTTModelBase.java | 4 +- .../java/biz/nynja/bridge/model/Profile.java | 78 ++-- .../biz/nynja/bridge/model/StatusType.java | 8 +- .../properties/ErlangBridgeConfiguration.java | 5 +- .../nynja/bridge/verticle/MQTTVerticle.java | 170 +++++--- .../java/biz/nynja/bridge/test/BertTest.java | 47 --- .../bridge/verticle/AccountBridgeClient.java | 186 +++------ 14 files changed, 554 insertions(+), 766 deletions(-) delete mode 100644 src/main/java/biz/nynja/bridge/bert/Bert.java delete mode 100644 src/main/java/biz/nynja/bridge/bert/BertException.java create mode 100644 src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java create mode 100644 src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java delete mode 100644 src/test/java/biz/nynja/bridge/test/BertTest.java diff --git a/Dockerfile b/Dockerfile index 4b5b6d3..bc6bf03 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM openjdk:10-jdk-slim -ENV JAVA_OPTS=-XX:+UseContainerSupport  +ENV JAVA_OPTS=-XX:+UseContainerSupport EXPOSE 8080 diff --git a/pom.xml b/pom.xml index e98046e..b925894 100644 --- a/pom.xml +++ b/pom.xml @@ -16,16 +16,14 @@ - - jar - org.springframework.boot spring-boot-starter-parent 2.0.3.RELEASE - + + jar bridge-service Bridge microservice @@ -71,7 +69,6 @@ ${vertx.version} - io.vertx vertx-grpc @@ -90,6 +87,12 @@ ${vertx.version} + + org.erlang.otp + jinterface + 1.6.1 + + io.prometheus simpleclient @@ -155,7 +158,6 @@ test - junit junit diff --git a/src/main/java/biz/nynja/bridge/bert/Bert.java b/src/main/java/biz/nynja/bridge/bert/Bert.java deleted file mode 100644 index 3336566..0000000 --- a/src/main/java/biz/nynja/bridge/bert/Bert.java +++ /dev/null @@ -1,377 +0,0 @@ -package biz.nynja.bridge.bert; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.HashMap; - -public class Bert { - - private byte[] mFloatStr = new byte[31]; - private ByteBuffer mBuffer = null; - private ByteArrayOutputStream bao = null; - private Object mValue = null; - - public static class Atom { - public String name; - - public Atom(String name) { - this.name = name; - } - - public int hashCode() { - return name.hashCode(); - } - - public boolean equals(Object obj) { - if (!(obj instanceof Atom)) return false; - return name.compareTo(((Atom) obj).name) == 0; - } - - public String toString() { - return name; - } - } - - public static class Time { - - public Time() { - } - - public Time(long ts) { - timestamp = ts; - - microsecond = (int) ((ts % 1000) * 1000); - second = (int) ((ts / 1000) % 1000000); - megasecond = (int) ((ts / 1000) / 1000000); - } - - public long timestamp = 0; - - public int megasecond = 0; - public int second = 0; - public int microsecond = 0; - } - - public static class Tuple extends ArrayList { - } - - public static class List extends ArrayList { - public boolean isProper = true; - } - - public static class Dict extends HashMap { - } - - public Bert() { - } - - private void writeAtom(Atom a, ByteArrayOutputStream bao) throws BertException { - int len = a.name.length(); - if (len >= 65536) throw new BertException("Atom Name too Long"); - bao.write(100); - bao.write((byte) (len >> 8) & 0x00FF); - bao.write((byte) (len) & 0x00FF); - try { - bao.write(a.name.getBytes("ISO-8859-1")); - } catch (UnsupportedEncodingException ex) { - throw new BertException("ISO 8859-1 is not Supported at Your Java Environment"); - } catch (IOException ex) { - throw new BertException(ex.getMessage()); - } - } - - private void writeTuple(Tuple tuple) throws BertException { - int len = tuple.size(); - - if (len < 256) { - bao.write(104); - bao.write((byte) (len & 0x00FF)); - } else { - bao.write(105); - bao.write((byte) ((len >> 24) & 0x00FF)); - bao.write((byte) ((len >> 16) & 0x00FF)); - bao.write((byte) ((len >> 8) & 0x00FF)); - bao.write((byte) ((len) & 0x00FF)); - } - - for (int count = 0; count < tuple.size(); count++) { - encodeTerm(tuple.get(count)); - } - } - - private void writeList(List list) throws BertException { - int len = list.size(); - - bao.write(108); - bao.write((byte) ((len >> 24) & 0x00FF)); - bao.write((byte) ((len >> 16) & 0x00FF)); - bao.write((byte) ((len >> 8) & 0x00FF)); - bao.write((byte) ((len) & 0x00FF)); - - for (int count = 0; count < list.size(); count++) { - encodeTerm(list.get(count)); - } - - if (list.isProper) bao.write(106); - } - - private void encodeTerm(Object o) throws BertException { - - if (o == null) { - Atom bert = new Atom("bert"); - Atom nil = new Atom("nil"); - Tuple tup = new Tuple(); - tup.add(bert); - tup.add(nil); - writeTuple(tup); - } else if (o instanceof Boolean) { - Atom bert = new Atom("bert"); - Atom nil = new Atom((boolean) o ? "true" : "false"); - Tuple tup = new Tuple(); - tup.add(bert); - tup.add(nil); - writeTuple(tup); - } else if (o instanceof Integer) { - int value = (int) o; - if (value >= 0 && value <= 255) { - bao.write(97); - bao.write((byte) (value & 0x00FF)); - } else { - bao.write(98); - bao.write((byte) ((value >> 24) & 0x00FF)); - bao.write((byte) ((value >> 16) & 0x00FF)); - bao.write((byte) ((value >> 8) & 0x00FF)); - bao.write((byte) ((value) & 0x00FF)); - } - } else if (o instanceof Double || o instanceof Float) { - double d = (double) o; - byte[] val = String.format("%.20e", o).getBytes(); - try { - bao.write(99); - bao.write(val); - if (val.length < 31) { - for (int count = 0; count < 31 - val.length; count++) bao.write(0); - } - } catch (IOException ex) { - throw new BertException(ex.getMessage()); - } - } else if (o instanceof Tuple) { - writeTuple((Tuple) o); - } else if (o instanceof String) { - try { - byte[] str = ((String) o).getBytes("UTF-8"); - bao.write(107); - bao.write((byte) ((str.length >> 8) & 0x00FF)); - bao.write((byte) ((str.length) & 0x00FF)); - bao.write(str); - } catch (UnsupportedEncodingException ex) { - new BertException("String not in UTF-8"); - } catch (IOException ex) { - new BertException(ex.getMessage()); - } - } else if (o instanceof Atom) { - writeAtom((Atom) o, bao); - } else if (o instanceof byte[]) { - int value = ((byte[]) o).length; - bao.write(109); - bao.write((byte) ((value >> 24) & 0x00FF)); - bao.write((byte) ((value >> 16) & 0x00FF)); - bao.write((byte) ((value >> 8) & 0x00FF)); - bao.write((byte) ((value) & 0x00FF)); - try { - bao.write((byte[]) o); - } catch (IOException ex) { - new BertException(ex.getMessage()); - } - } else if (o instanceof Time) { - Time time = (Time) o; - - Tuple tuple = new Tuple(); - tuple.add(new Atom("bert")); - tuple.add(new Atom("time")); - tuple.add(time.megasecond); - tuple.add(time.second); - tuple.add(time.microsecond); - - writeTuple(tuple); - } else if (o instanceof List) { - List list = (List) o; - if (list.size() == 0) { - bao.write(106); - } else { - writeList((List) o); - } - - } - - } - - public byte[] encode(Object o) throws BertException { - bao = new ByteArrayOutputStream(); - bao.write(-125); - - encodeTerm(o); - - return bao.toByteArray(); - } - - - public Bert(final byte[] data) throws BertException { - mBuffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN); - - byte value = mBuffer.get(); - if (value != -125) - throw new BertException("Invalid Bert Data"); - - mValue = decode(); - } - - private Object decodeBertTerm(Tuple t) throws BertException { - if (t.get(0) instanceof Atom && ((Atom) t.get(0)).name.compareTo("bert") == 0) { - if (t.size() == 5) { - if (t.get(0) instanceof Atom && t.get(1) instanceof Atom && - ((Atom) t.get(0)).name.compareTo("bert") == 0 && - ((Atom) t.get(1)).name.compareTo("time") == 0 && - t.get(2) instanceof Integer && - t.get(3) instanceof Integer && - t.get(4) instanceof Integer) { - - Time time = new Time(); - - time.timestamp = ((int) t.get(2) * (long) 1000000 * (long) 1000) + ((int) t.get(3) * (long) 1000) + ((int) t.get(4) / 1000); - time.megasecond = (int) t.get(2); - time.second = (int) t.get(3); - time.microsecond = (int) t.get(4); - - return time; - } - } else if (t.size() == 2) { - String v = ((Atom) t.get(1)).name; - if (v.compareTo("nil") == 0) { - return null; - } else if (v.compareTo("true") == 0) { - return true; - } else if (v.compareTo("false") == 0) { - return false; - } - } else if (t.size() == 3) { - if (t.get(0) instanceof Atom && t.get(1) instanceof Atom && - ((Atom) t.get(0)).name.compareTo("bert") == 0 && - ((Atom) t.get(1)).name.compareTo("dict") == 0 && - t.get(2) instanceof List) { - Dict d = new Dict(); - List l = (List) t.get(2); - - for (int count = 0; count < l.size(); count++) { - Tuple tup = (Tuple) l.get(count); - if (tup.size() != 2) - throw new BertException("Invalid Dict Entry"); - d.put(tup.get(0), tup.get(1)); - } - - return d; - } - } - } - - return t; - } - - private Object decodeSmallTuple() throws BertException { - int len = mBuffer.get() & 0x00FFFFFFFF; - - Tuple tuple = new Tuple(); - for (int count = 0; count < len; count++) { - tuple.add(decode()); - } - - return decodeBertTerm(tuple); - } - - private Object decodeLargeTuple() throws BertException { - int len = mBuffer.getInt() & 0x00FF; - - Tuple tuple = new Tuple(); - for (int count = 0; count < len; count++) { - tuple.add(decode()); - } - - return decodeBertTerm(tuple); - } - - public List decodeList() throws BertException { - int len = mBuffer.getInt() & 0x00FF; - - List list = new List(); - for (int count = 0; count < len; count++) { - list.add(decode()); - } - - Object o = decode(); - if (!(o instanceof List)) { - list.add(o); - list.isProper = false; - } - - return list; - } - - private Object decode() throws BertException { - int tag = mBuffer.get() & 0x00FF; - byte[] val = null; - long len = 0; - - switch (tag) { - case 97: // SmallInt Tag - return (int) (mBuffer.get() & 0x00FF); - case 98: // Int Tag - return mBuffer.getInt(); - case 99: // FloatTag - mBuffer.get(mFloatStr); - return Double.parseDouble(new String(mFloatStr)); - case 100: // AtomTag - len = mBuffer.getShort() & 0x00FFFF; - val = new byte[(int) len]; - mBuffer.get(val); - Atom atom = new Atom(new String(val)); - return atom; - case 104: // SmallTupleTag - return decodeSmallTuple(); - case 105: // LargeTupleTag - return decodeLargeTuple(); - case 106: // NilTag - return new List(); - case 107: // StringTag - len = mBuffer.getShort() & 0x00FFFF; - val = new byte[(int) len]; - mBuffer.get(val); - try { - return new String(val, "UTF-8"); - } catch (UnsupportedEncodingException ex) { - return new String(val); - } - case 108: // ListTag - return decodeList(); - case 109: // BinTag - len = mBuffer.getInt() & 0x00FFFFFFFF; - val = new byte[(int) len]; - mBuffer.get(val); - return val; - default: - throw new BertException("Not Supported Bert Tag"); - } - } - - public Object getValue() { - return mValue; - } - - public String toString() { - return mValue == null ? null : mValue.toString(); - } - -} diff --git a/src/main/java/biz/nynja/bridge/bert/BertException.java b/src/main/java/biz/nynja/bridge/bert/BertException.java deleted file mode 100644 index bb72da0..0000000 --- a/src/main/java/biz/nynja/bridge/bert/BertException.java +++ /dev/null @@ -1,21 +0,0 @@ -package biz.nynja.bridge.bert; - -public class BertException extends Exception { - - private static final long serialVersionUID = 0; - - private Throwable cause; - - public BertException(String message) { - super(message); - } - - public BertException(Throwable cause) { - super(cause.getMessage()); - this.cause = cause; - } - - public Throwable getCause() { - return this.cause; - } -} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java new file mode 100644 index 0000000..4534bd3 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java @@ -0,0 +1,273 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; +import lombok.Getter; +import lombok.ToString; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Created by @author mapuo on 18/05/19. + */ +@ToString +public class OtpErlangBuilder { + + @Getter + private OtpErlangObject object; + + public OtpOutputStream asOtpOutputStream() { + return new OtpOutputStream(object); + } + + public ByteBuffer asByteBuffer() { + return ByteBuffer.wrap(asOtpOutputStream().toByteArray()); + } + + public ByteBuffer asByteBufferWithVersionTag() { + ByteBuffer buffer = asByteBuffer(); + return ByteBuffer.allocate(buffer.capacity() + 1) + .put((byte) OtpExternal.versionTag) + .put(buffer) + .flip(); + } + + private OtpErlangBuilder(Builder builder) { + object = builder.object; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static TupleBuilder newTupleBuilder() { + return new TupleBuilder(); + } + + public static ListBuilder newListBuilder() { + return new ListBuilder(); + } + + public static final class Builder { + private OtpErlangObject object; + + private Builder() { + } + + public Builder setObject(OtpErlangObject object) { + this.object = object; + return this; + } + + public OtpErlangBuilder build() { + return new OtpErlangBuilder(this); + } + } + + public static final class TupleBuilder extends TermBuilder { + private OtpErlangTuple tuple; + + public TupleBuilder() { + } + + public TupleBuilder setTuple(OtpErlangTuple tuple) { + this.tuple = tuple; + return this; + } + + @Override + TupleBuilder instance() { + return this; + } + + public OtpErlangBuilder build() { + tuple = new OtpErlangTuple(objects.toArray(new OtpErlangObject[0])); + Builder builder = new Builder().setObject(tuple); + return new OtpErlangBuilder(builder); + } + + } + + public static final class ListBuilder extends TermBuilder { + OtpErlangList list; + + public ListBuilder() { + } + + @Override + ListBuilder instance() { + return this; + } + + public OtpErlangBuilder build() { + list = new OtpErlangList(objects.toArray(new OtpErlangObject[0])); + Builder builder = new Builder().setObject(list); + return new OtpErlangBuilder(builder); + } + + } + + public static abstract class TermBuilder { + + protected List objects = new ArrayList<>(); + + private TermBuilder() { + } + + abstract T instance(); + + public T addList() { + objects.add(new OtpErlangList()); + return instance(); + } + + public T addList(OtpErlangList list) { + objects.add(list); + return instance(); + } + + public T addList(OtpErlangObject[] elems) { + objects.add(new OtpErlangList(elems)); + return instance(); + } + + public T addList(ListBuilder builder) { + objects.add(builder.build().object); + return instance(); + } + + public T addList(OtpErlangBuilder formatter) { + objects.add(formatter.object); + return instance(); + } + + public T addTuple(OtpErlangTuple tuple) { + objects.add(tuple); + return instance(); + } + + public T addTuple(OtpErlangObject[] elems) { + objects.add(new OtpErlangTuple(elems)); + return instance(); + } + + public T addTuple(TupleBuilder builder) { + objects.add(builder.build().object); + return instance(); + } + + public T addTuple(OtpErlangBuilder formatter) { + objects.add(formatter.object); + return instance(); + } + + public T addAtom(String atom) { + objects.add(new OtpErlangAtom(atom)); + return instance(); + } + + public T addAtom(boolean atom) { + objects.add(new OtpErlangAtom(atom)); + return instance(); + } + + + public T addBinary(byte[] bin) { + objects.add(new OtpErlangBinary(bin)); + return instance(); + } + + public T addBoolean(boolean bool) { + objects.add(new OtpErlangBoolean(bool)); + return instance(); + } + + public T addByte(byte bin) { + objects.add(new OtpErlangByte(bin)); + return instance(); + } + + public T addBitstr(byte[] bin) { + objects.add(new OtpErlangBitstr(bin)); + return instance(); + } + + public T addChar(char c) { + objects.add(new OtpErlangChar(c)); + return instance(); + } + + public T addDouble(double d) { + objects.add(new OtpErlangDouble(d)); + return instance(); + } + + public T addFloat(float f) { + objects.add(new OtpErlangFloat(f)); + return instance(); + } + + public T addInt() { + objects.add(new OtpErlangInt(0)); + return instance(); + } + + public T addInt(int i) { + objects.add(new OtpErlangInt(i)); + return instance(); + } + + public T addLong(long l) { + objects.add(new OtpErlangLong(l)); + return instance(); + } + + public T addShort(short s) { + objects.add(new OtpErlangShort(s)); + return instance(); + } + + public T addUnsignedInt(int i) { + try { + objects.add(new OtpErlangUInt(i)); + } catch (OtpErlangRangeException e) { + e.printStackTrace(); + } + return instance(); + } + + public T addUnsignedShort(short s) { + try { + objects.add(new OtpErlangUShort(s)); + } catch (OtpErlangRangeException e) { + e.printStackTrace(); + } + return instance(); + } + + public T addString(String s) { + objects.add(new OtpErlangString(s)); + return instance(); + } + + public T addStringAsBinary(String s) { + objects.add(new OtpErlangBinary(s.getBytes(UTF_8))); + return instance(); + } + + public T addStringAsBinary(String s, OtpErlangObject defaultIfNull) { + if (Objects.nonNull(s)) { + objects.add(new OtpErlangBinary(s.getBytes(UTF_8))); + } else { + objects.add(defaultIfNull); + } + return instance(); + } + + } + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java new file mode 100644 index 0000000..5133695 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java @@ -0,0 +1,45 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.OtpErlangAtom; +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangObject; +import com.ericsson.otp.erlang.OtpErlangTuple; + +import java.util.UUID; + +/** + * Created by @author mapuo on 20/05/19. + */ +public class OtpErlangHelper { + + public static UUID getUuid(OtpErlangTuple profileOrAccount) { + return UUID.fromString(getId(profileOrAccount)); + } + + public static String getId(OtpErlangTuple profileOrAccount) { + OtpErlangBinary uuidBin = (OtpErlangBinary) profileOrAccount.elementAt(1); + return new String(uuidBin.binaryValue()); + } + + public static boolean headEquals(OtpErlangTuple tuple, OtpErlangObject object) { + if (tuple == null || tuple.arity() == 0) { + return false; + } + + return tuple.elementAt(0).equals(object); + } + + public static boolean headEqualsAtom(OtpErlangTuple tuple, String atom) { + return headEqualsAtom(tuple, new OtpErlangAtom(atom)); + } + + public static boolean headEqualsAtom(OtpErlangTuple tuple, OtpErlangAtom atom) { + return headEquals(tuple, atom); + } + + public static String fromBin(OtpErlangBinary bin) { + return new String(bin.binaryValue()); + } + + +} diff --git a/src/main/java/biz/nynja/bridge/model/Account.java b/src/main/java/biz/nynja/bridge/model/Account.java index 5caf5fc..2a5af2b 100644 --- a/src/main/java/biz/nynja/bridge/model/Account.java +++ b/src/main/java/biz/nynja/bridge/model/Account.java @@ -1,80 +1,50 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.Bert; +import biz.nynja.bridge.bert.OtpErlangBuilder; import biz.nynja.bridge.grpc.AccountData; +import com.ericsson.otp.erlang.OtpErlangList; import lombok.Getter; import lombok.SneakyThrows; +import lombok.ToString; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Objects; import java.util.UUID; -import static java.nio.charset.StandardCharsets.UTF_8; - -//Roster in ENC +@ToString public class Account implements MQTTModelBase { + + private static final String TYPE = "Roster"; + @Getter private UUID id; - private String names; - private String surnames; - private String email; - private String nick; - private UUID profileId; - private String avatar; - private Long update; - private StatusType status; - - public Account(AccountData accountData, StatusType status) { - this.id = UUID.fromString(accountData.getAccountId()); - this.names = accountData.getFirstName(); - this.surnames = accountData.getLastName(); - this.profileId = UUID.fromString(accountData.getProfileId()); - this.avatar = accountData.getAvatar(); - this.update = Long.valueOf(accountData.getLastUpdateTimestamp()); - this.status = status; - } - public Bert.Tuple getBertTupleWithData() { - Bert.Tuple tuple = new Bert.Tuple(); - tuple.add(new Bert.Atom("Roster")); - tuple.add(id.toString().getBytes(UTF_8)); - tuple.add(Objects.nonNull(names) ? names.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(surnames) ? surnames.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(email) ? email.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(nick) ? nick.getBytes(UTF_8) : new Bert.List()); + @Getter + private OtpErlangBuilder account; - tuple.add(new Bert.List());//userlist - tuple.add(new Bert.List());//roomlist - tuple.add(new Bert.List());//favorite - tuple.add(new Bert.List());//Tags - tuple.add(profileId.toString().getBytes(UTF_8)); - tuple.add(avatar.getBytes(UTF_8)); - tuple.add(new Bert.List());//update - tuple.add(new Bert.Atom(status.toString())); - return tuple; + public Account(AccountData accountData, StatusType status) { + id = UUID.fromString(accountData.getAccountId()); + account = OtpErlangBuilder.newTupleBuilder() + .addAtom(TYPE) + .addStringAsBinary(id.toString()) + .addStringAsBinary(accountData.getFirstName(), new OtpErlangList()) + .addStringAsBinary(accountData.getLastName(), new OtpErlangList()) + .addStringAsBinary(null, new OtpErlangList()) // email + .addStringAsBinary(null, new OtpErlangList()) // nickname + .addList() + .addList() + .addList() + .addList() + .addStringAsBinary(accountData.getProfileId()) + .addStringAsBinary(accountData.getAvatar(), new OtpErlangList()) + .addLong(Long.valueOf(accountData.getLastUpdateTimestamp())) + .addAtom(status.toString()) + .build(); } @Override @SneakyThrows public byte[] getBertFormat() { - Bert.Tuple tuple = new Bert.Tuple(); - tuple.add("Roster".getBytes(UTF_8)); - tuple.add(id.toString().getBytes(UTF_8)); - tuple.add(Objects.nonNull(names) ? names.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(surnames) ? surnames.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(email) ? email.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(nick) ? nick.getBytes(UTF_8) : new Bert.List()); - - tuple.add(new Bert.List());//userlist - tuple.add(new Bert.List());//roomlist - tuple.add(new Bert.List());//favorite - tuple.add(new Bert.List());//Tags - tuple.add(profileId.toString().getBytes(UTF_8)); - tuple.add(avatar.getBytes(UTF_8)); - tuple.add(new Bert.Time(update)); - tuple.add(new Bert.Atom(status.toString())); - - return new Bert().encode(tuple); + return account.asByteBufferWithVersionTag() + .array(); } + } diff --git a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java index 976ed0d..2c7f355 100644 --- a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java +++ b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java @@ -1,8 +1,5 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.BertException; - -import java.io.UnsupportedEncodingException; import java.util.UUID; public interface MQTTModelBase { @@ -10,4 +7,5 @@ public interface MQTTModelBase { UUID getId(); byte[] getBertFormat(); + } diff --git a/src/main/java/biz/nynja/bridge/model/Profile.java b/src/main/java/biz/nynja/bridge/model/Profile.java index 09dd125..ca01e65 100644 --- a/src/main/java/biz/nynja/bridge/model/Profile.java +++ b/src/main/java/biz/nynja/bridge/model/Profile.java @@ -1,65 +1,55 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.Bert; +import biz.nynja.bridge.bert.OtpErlangBuilder; +import biz.nynja.bridge.bert.OtpErlangBuilder.ListBuilder; import biz.nynja.bridge.grpc.ProfileData; import lombok.Getter; import lombok.SneakyThrows; +import lombok.ToString; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; import java.util.UUID; -import java.util.stream.Collectors; - -import static java.nio.charset.StandardCharsets.*; +@ToString public class Profile implements MQTTModelBase { + private static final String TYPE = "Profile"; + @Getter - private UUID id; //phone in erlang - private Account account; - private List accounts; - private StatusType status; - private Long update; + private UUID id; // phone in erlang + + private OtpErlangBuilder profile; public Profile(ProfileData profileData, StatusType status) { - this.id = UUID.fromString(profileData.getProfileId()); - if (status.equals(StatusType.CREATE)) { - this.account = new Account(profileData.getDefaultAccount(), status); + id = UUID.fromString(profileData.getProfileId()); + + ListBuilder accountBuilder = OtpErlangBuilder.newListBuilder(); + if (status == StatusType.CREATE) { + Account account = new Account(profileData.getDefaultAccount(), status); + accountBuilder.addTuple(account.getAccount()); } else { - this.accounts = profileData.getAccountsIdsList().stream().map(UUID::fromString).collect(Collectors.toList()); + profileData.getAccountsIdsList() + .forEach(accountBuilder::addStringAsBinary); } - this.status = status; - this.update = Long.valueOf(profileData.getLastUpdateTimestamp()); + + profile = OtpErlangBuilder.newTupleBuilder() + .addAtom(TYPE) + .addStringAsBinary(id.toString()) + .addList() + .addList(accountBuilder) + .addList() + // update + .addLong(Long.valueOf(profileData.getLastUpdateTimestamp())) + // balance + .addInt() + .addList() + .addAtom(status.toString()) + .build(); } @Override @SneakyThrows public byte[] getBertFormat() { - Bert.Tuple tuple = new Bert.Tuple(); - - tuple.add(new Bert.Atom("Profile")); - tuple.add(id.toString().getBytes(UTF_8));//phone - tuple.add(new Bert.List());//services - if (status.equals(StatusType.CREATE)) { - Bert.List accounts = new Bert.List(); - accounts.add(account.getBertTupleWithData()); - tuple.add(accounts); - } else { - Bert.List accountsIdsBertFormat = new Bert.List(); - accounts.forEach(uuid -> { - accountsIdsBertFormat.add(uuid.toString().getBytes(UTF_8)); - }); - tuple.add(accountsIdsBertFormat); - } - tuple.add(new Bert.List());//settings - tuple.add(new Bert.List());//update - tuple.add(new Bert.List());//balance - tuple.add(new Bert.List());//presence - tuple.add(new Bert.Atom(status.toString())); - - Bert bert = new Bert(); - return new Bert().encode(tuple); + return profile.asByteBufferWithVersionTag().array(); } -} \ No newline at end of file + +} diff --git a/src/main/java/biz/nynja/bridge/model/StatusType.java b/src/main/java/biz/nynja/bridge/model/StatusType.java index d61cf5e..6078ed3 100644 --- a/src/main/java/biz/nynja/bridge/model/StatusType.java +++ b/src/main/java/biz/nynja/bridge/model/StatusType.java @@ -1,19 +1,19 @@ package biz.nynja.bridge.model; - import lombok.AllArgsConstructor; -import lombok.ToString; @AllArgsConstructor -@ToString public enum StatusType { + CREATE("create"), UPDATE("update"), - DELETE("del"); + DELETE("remove"); + private final String text; @Override public String toString() { return text; } + } diff --git a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java b/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java index 27f4ea1..b991a44 100644 --- a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java +++ b/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java @@ -1,13 +1,12 @@ package biz.nynja.bridge.properties; - -import io.vertx.core.json.JsonObject; import lombok.Getter; import lombok.Setter; -import lombok.extern.slf4j.Slf4j; +import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +@ToString @Getter @Setter @Configuration diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 00e3f30..8ef8d40 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -3,64 +3,75 @@ package biz.nynja.bridge.verticle; import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.ErlangBridgeConfiguration; +import com.ericsson.otp.erlang.*; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; import org.springframework.stereotype.Component; +import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Function; -import static java.nio.charset.StandardCharsets.UTF_8; +import static biz.nynja.bridge.bert.OtpErlangHelper.*; @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { - private static final String MQTT_TOPIC = "events/1//api/anon//"; - private MqttClient client; - private final ErlangBridgeConfiguration erlangBridgeConfiguration; + private static final String MQTT_TOPIC = "events/1//api/anon//"; + private static final long MESSAGE_TIMEOUT_MILLIS = 2000; + private final ErlangBridgeConfiguration configuration; private final ConcurrentHashMap dataEventsStatusCache; + private MqttClient client; + public MQTTVerticle(ErlangBridgeConfiguration erlangBridgeConfiguration) { - this.erlangBridgeConfiguration = erlangBridgeConfiguration; + configuration = erlangBridgeConfiguration; dataEventsStatusCache = new ConcurrentHashMap<>(); } @Override - public void start() throws Exception { + public void start() + throws Exception { + initMqttClient(); initBus(); } - private void initMqttClient() { - MqttClientOptions options = new MqttClientOptions().setKeepAliveTimeSeconds(2).setClientId("sys_micro_bridge"); + MqttClientOptions options = new MqttClientOptions() + .setKeepAliveTimeSeconds(2) + .setClientId("sys_micro_bridge"); client = MqttClient.create(vertx, options); + initSubscriber(); - client.subscribeCompletionHandler(h -> { - log.info("Receive SUBACK from server with granted QoS : " + h.grantedQoSLevels()); + client.subscribeCompletionHandler(h -> { + log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); }); client.unsubscribeCompletionHandler(h -> { log.debug("Receive UNSUBACK from server"); vertx.setTimer(5000, l -> - client.disconnect(d -> log.error("Disconnected form server")) - ); + client.disconnect(d -> log.error("Disconnected form server"))); }); - - client.connect(erlangBridgeConfiguration.getPort(), erlangBridgeConfiguration.getHost(), ch -> { + client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { log.debug("Connected to a server"); client.subscribe(MQTT_TOPIC, 0); } else { - log.error("Failed to connect to a server, cause {}", ch.cause()); + log.error("Failed to connect to a server, cause", ch.cause()); } }); } @@ -68,29 +79,36 @@ public class MQTTVerticle extends AbstractVerticle { private void initBus() { vertx.eventBus().consumer("mqtt-bus", message -> { MQTTModelBase model = message.body(); - log.info("Publish message with model id = {}", model.getId()); + + log.info("Publish message with model {} [id:{}]", + model.getClass().getSimpleName(), model.getId()); + Future publishFuture = Future.future(); publishMqttMessage(publishFuture, model); + Future callBackFuture = Future.future(); - //because if we added after publishing, we can have race condition (event loop flow) - dataEventsStatusCache.put(model.getId().toString(), new DataEventsStatus(System.currentTimeMillis(), callBackFuture, DataEventsStatus.DataStatus.SENT)); + // because if we added after publishing, we can have race condition (event loop flow) + long currentTimeMillis = System.currentTimeMillis(); + DataEventsStatus dataEventsStatus = new DataEventsStatus( + currentTimeMillis, callBackFuture, DataEventsStatus.DataStatus.SENT); + dataEventsStatusCache.put(model.getId().toString(), dataEventsStatus); + publishFuture.setHandler(booleanAsyncResult -> { if (booleanAsyncResult.failed()) { message.reply(false); return; } callBackFuture.setHandler(result -> { - message.reply(true); - } - ); - vertx.setTimer(2000, timer -> { + message.reply(true); + }); + vertx.setTimer(MESSAGE_TIMEOUT_MILLIS, timer -> { if (!callBackFuture.isComplete()) { dataEventsStatusCache.remove(model.getId().toString()); - log.debug("callBack future for {} not complete after 2 second", model.getId()); + log.warn("Callback future for [id:{}] not complete after {} second(s)", + model.getId(), TimeUnit.MILLISECONDS.toSeconds(MESSAGE_TIMEOUT_MILLIS)); callBackFuture.complete(false); } }); - }); }); } @@ -102,60 +120,82 @@ public class MQTTVerticle extends AbstractVerticle { MqttQoS.EXACTLY_ONCE, false, false, - s -> { + event -> { complete.complete(); }); - } private void initSubscriber() { client.publishHandler(publish -> { - String response = publish.payload().toString(UTF_8).replace("\r", ""); - log.info("Just received message on [ {} ] payload [ {} ] with QoS [ {} ]", publish.topicName(), response, publish.qosLevel()); - if (response.contains("error")) { - erlangErrorHandler(response); - } else { - erlangSuccessHandler(getIdFromSuccesResponse(response)); + final int messageId = publish.messageId(); + + log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", + messageId, publish.topicName(), publish.qosLevel()); + + try { + byte[] responseBytes = publish.payload().getBytes(); + log.debug("responseBytes: {}", Arrays.toString(responseBytes)); + + OtpInputStream inputStream = new OtpInputStream(responseBytes); + OtpErlangObject erlangObject = inputStream.read_any(); + log.debug("[messageId:{}] response: {}", messageId, erlangObject); + + if (erlangObject instanceof OtpErlangTuple) { + OtpErlangTuple tuple = (OtpErlangTuple) erlangObject; + boolean isErrorResponse = headEqualsAtom(tuple, "errors"); + LogLevel logLevel = isErrorResponse ? LogLevel.ERROR : LogLevel.INFO; + + String id = (isErrorResponse) + ? getId(extractProfileOrAccount(tuple)) + : getId(tuple); + + DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); + + if (Objects.isNull(eventStatus)) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}] without duplicating in cache", + messageId, id); + return; + } + if (eventStatus.getCallBack().isComplete()) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}]. Future is complete ", + messageId, id); + return; + } + logLevel.log(log, "[messageId:{}] Calling callback for [id:{}] with result = {}", + messageId, id, !isErrorResponse); + eventStatus.getCallBack().complete(!isErrorResponse); + } + } catch (OtpErlangDecodeException e) { + log.error("[messageId:{}] Could not read response!", messageId); } }); } - private void erlangSuccessHandler(String id) { - log.debug("Success response from erlang with id = {}", id); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); - if (Objects.isNull(eventStatus)) { - log.error("Success response from erlang with id = {} without duplicating in cache", id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - log.error("Success response from erlang with id = {}. Future is complete ", id); - return; - } - eventStatus.getCallBack().complete(true); - } + private OtpErlangTuple extractProfileOrAccount(OtpErlangTuple errorsTuple) { + // error code + // String code = fromBin((OtpErlangBinary) ((OtpErlangList) errorsTuple.elementAt(1)).getHead()); - private void erlangErrorHandler(String response) { - String id = response.substring(response.length() - 36); - log.debug("Error from erlang with id = {}", id); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); - if (Objects.isNull(eventStatus)) { - log.error("Error from erlang with id = {} without duplicating in cache", id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - log.error("Error from erlang with id = {}. Future is complete ", id); - return; - } - eventStatus.getCallBack().complete(false); + // list of errors + OtpErlangList listOfErrors = (OtpErlangList) errorsTuple.elementAt(2); + // first error + OtpErlangTuple head = (OtpErlangTuple) listOfErrors.getHead(); + + // atom with the element that is in error + // OtpErlangObject errorField = head.elementAt(0); + + // tuple - Profile | Account + return (OtpErlangTuple) head.elementAt(1); } - private String getIdFromSuccesResponse(String response) { - if (response.contains("Profile")) { - //see bert payload structure - return response.substring(18, 18 + 36); - } else { - int necessaryIndex = response.indexOf("Roster") + "Roster".length(); - return response.substring(necessaryIndex + 5, necessaryIndex + 41); + @RequiredArgsConstructor + private enum LogLevel { + INFO(l -> l::info), + ERROR(l -> l::error); + + private final Function> function; + + public void log(Logger logger, String message, Object... objects) { + function.apply(logger).accept(message, objects); } } diff --git a/src/test/java/biz/nynja/bridge/test/BertTest.java b/src/test/java/biz/nynja/bridge/test/BertTest.java deleted file mode 100644 index 2cd9f0c..0000000 --- a/src/test/java/biz/nynja/bridge/test/BertTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package biz.nynja.bridge.test; - -import biz.nynja.bridge.grpc.AccountData; -import biz.nynja.bridge.grpc.ProfileData; -import biz.nynja.bridge.model.Account; -import biz.nynja.bridge.model.MQTTModelBase; -import biz.nynja.bridge.model.Profile; -import biz.nynja.bridge.model.StatusType; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; - -import java.util.Arrays; -import java.util.UUID; - -@RunWith(VertxUnitRunner.class) -public class BertTest { - - @Test - public void testProfileToBert(TestContext context) { - MQTTModelBase profile = new Profile(buildTestProfileData(), StatusType.CREATE); - Assert.assertArrayEquals(profile.getBertFormat(), new byte[]{-125, 104, 9, 100, 0, 7, 80, 114, 111, 102, 105, 108, 101, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 106, 108, 0, 0, 0, 1, 104, 14, 100, 0, 6, 82, 111, 115, 116, 101, 114, 109, 0, 0, 0, 36, 57, 51, 49, 98, 57, 50, 100, 51, 45, 53, 55, 98, 99, 45, 52, 49, 55, 54, 45, 98, 57, 49, 52, 45, 51, 54, 57, 54, 54, 53, 51, 48, 48, 99, 57, 100, 109, 0, 0, 0, 13, 116, 101, 115, 116, 70, 105, 114, 115, 116, 78, 97, 109, 101, 109, 0, 0, 0, 12, 116, 101, 115, 116, 76, 97, 115, 116, 78, 97, 109, 101, 106, 106, 106, 106, 106, 106, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 109, 0, 0, 0, 60, 104, 116, 116, 112, 115, 58, 47, 47, 98, 105, 112, 98, 97, 112, 46, 114, 117, 47, 119, 112, 45, 99, 111, 110, 116, 101, 110, 116, 47, 117, 112, 108, 111, 97, 100, 115, 47, 50, 48, 49, 56, 47, 48, 51, 47, 55, 87, 70, 80, 113, 71, 68, 122, 45, 48, 115, 46, 106, 112, 103, 106, 100, 0, 6, 99, 114, 101, 97, 116, 101, 106, 106, 106, 106, 106, 100, 0, 6, 99, 114, 101, 97, 116, 101}); - } - - @Test - public void testAccountToBert(TestContext context) { - MQTTModelBase account = new Account(buildTestAccountData("aac8ee94-d81c-45a0-8320-2af0498917de"), StatusType.CREATE); - Assert.assertArrayEquals(account.getBertFormat(), new byte[]{-125, 104, 14, 109, 0, 0, 0, 6, 82, 111, 115, 116, 101, 114, 109, 0, 0, 0, 36, 57, 51, 49, 98, 57, 50, 100, 51, 45, 53, 55, 98, 99, 45, 52, 49, 55, 54, 45, 98, 57, 49, 52, 45, 51, 54, 57, 54, 54, 53, 51, 48, 48, 99, 57, 100, 109, 0, 0, 0, 13, 116, 101, 115, 116, 70, 105, 114, 115, 116, 78, 97, 109, 101, 109, 0, 0, 0, 12, 116, 101, 115, 116, 76, 97, 115, 116, 78, 97, 109, 101, 106, 106, 106, 106, 106, 106, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 109, 0, 0, 0, 60, 104, 116, 116, 112, 115, 58, 47, 47, 98, 105, 112, 98, 97, 112, 46, 114, 117, 47, 119, 112, 45, 99, 111, 110, 116, 101, 110, 116, 47, 117, 112, 108, 111, 97, 100, 115, 47, 50, 48, 49, 56, 47, 48, 51, 47, 55, 87, 70, 80, 113, 71, 68, 122, 45, 48, 115, 46, 106, 112, 103, 104, 5, 100, 0, 4, 98, 101, 114, 116, 100, 0, 4, 116, 105, 109, 101, 98, 0, 0, 6, 10, 98, 0, 6, -31, 84, 98, 0, 3, 9, 88, 100, 0, 6, 99, 114, 101, 97, 116, 101}); - } - - private static ProfileData buildTestProfileData() { - String profileId = "aac8ee94-d81c-45a0-8320-2af0498917de"; - return ProfileData.newBuilder().setProfileId(profileId) - .setDefaultAccount(buildTestAccountData(profileId)) - .setLastUpdateTimestamp(Long.toString(System.currentTimeMillis())).build(); - } - - - private static AccountData buildTestAccountData(String profileId) { - return AccountData.newBuilder().setAccountId("931b92d3-57bc-4176-b914-369665300c9d") - .setFirstName("testFirstName").setLastName("testLastName").setProfileId(profileId) - .setUsername("username").setAvatar("https://bipbap.ru/wp-content/uploads/2018/03/7WFPqGDz-0s.jpg") - .setLastUpdateTimestamp("1546450900199").build(); - } -} diff --git a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java index 1938233..949f892 100644 --- a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java +++ b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java @@ -13,21 +13,15 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; -import static biz.nynja.bridge.grpc.AccountBridgeGrpc.newFutureStub; import static io.grpc.ManagedChannelBuilder.forAddress; - public class AccountBridgeClient { - private static final Logger logger = Logger.getLogger(AccountBridgeClient.class.getName()); - - private final AccountBridgeGrpc.AccountBridgeBlockingStub bridgeStub; - private static AccountBridgeGrpc.AccountBridgeBlockingStub blockingStub; - - private final ManagedChannel channel; - private AccountBridgeGrpc.AccountBridgeFutureStub futureStub; + private static final Logger logger = Logger.getLogger(AccountBridgeClient.class.getName()); private final static int GRPC_PORT = 6580; + private final ManagedChannel channel; + private final AccountBridgeGrpc.AccountBridgeBlockingStub bridgeStub; public AccountBridgeClient(String host, int port) { this(ManagedChannelBuilder.forAddress(host, port).usePlaintext()); @@ -36,14 +30,10 @@ public class AccountBridgeClient { public AccountBridgeClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); bridgeStub = AccountBridgeGrpc.newBlockingStub(channel); - blockingStub = AccountBridgeGrpc.newBlockingStub(channel); - futureStub = newFutureStub(channel); - } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); - } public BridgeSuccessResponse createProfile(ProfileData request) { @@ -66,45 +56,25 @@ public class AccountBridgeClient { return bridgeStub.deleteAccount(request); } - public static void main(String[] args) throws Exception { - - createProfileData(); - deleteProfileData(); - createAccountData(); - updateAccount(); - deleteAccount(); - } - - private static void createProfileData() throws InterruptedException { + public static void main(String[] args) + throws Exception { ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); AccountBridgeClient client = new AccountBridgeClient(channelBuilder); - String profileId = UUID.randomUUID().toString(); +// String accountId = "d0025ee6-7acd-4e77-9095-cc5910aa8bc2"; String accountId = UUID.randomUUID().toString(); - String currentTimeMillis = String.valueOf(System.currentTimeMillis()); - ProfileData request = ProfileData.newBuilder() - .setProfileId(profileId) - .setLastUpdateTimestamp(currentTimeMillis) - .setDefaultAccount(AccountData.newBuilder() - .setAccountId(accountId) - .setProfileId(profileId) - .setFirstName("Test") - .setLastName("Mest") - .setUsername("test") - .setLastUpdateTimestamp(currentTimeMillis) - .build()) - .build(); +// String profileId = "63a7bafd-28e3-46dd-a25e-be194f99496e"; + String profileId = UUID.randomUUID().toString(); - try { - blockingStub.createProfile(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); - return; - } + logger.log(Level.INFO, "accountId: " + accountId + ", profileId: " + profileId); try { - client.createProfile(request); + createProfile(client, profileId, accountId); +// createAccount(client, profileId, accountId); +// updateAccount(client, profileId, accountId); +// deleteAccount(client, profileId, accountId); + deleteProfile(client, profileId, accountId); } catch (StatusRuntimeException e) { logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); return; @@ -113,13 +83,9 @@ public class AccountBridgeClient { client.shutdown(); } - private static void deleteProfileData() throws InterruptedException { - - ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); - AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + private static void createProfile(AccountBridgeClient client, String profileId, String accountId) + throws Exception { - String profileId = UUID.randomUUID().toString(); - String accountId = UUID.randomUUID().toString(); String currentTimeMillis = String.valueOf(System.currentTimeMillis()); ProfileData request = ProfileData.newBuilder() .setProfileId(profileId) @@ -134,109 +100,51 @@ public class AccountBridgeClient { .build()) .build(); - try { - blockingStub.deleteProfile(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); - return; - } - - try { - client.deleteProfile(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); - return; - } - - client.shutdown(); + client.createProfile(request); } - private static void createAccountData() throws InterruptedException { - - ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); - AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + private static void createAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { - String profileId = UUID.randomUUID().toString(); - String accountId = UUID.randomUUID().toString(); String currentTimeMillis = String.valueOf(System.currentTimeMillis()); AccountData request = AccountData.newBuilder() .setProfileId(profileId) .setLastUpdateTimestamp(currentTimeMillis) .setAccountId(accountId) - .setProfileId(profileId) .setFirstName("Test") .setLastName("Mest") .setUsername("test") - .setLastUpdateTimestamp(currentTimeMillis) .build(); - try { - blockingStub.createAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); - return; - } - - try { - client.createAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); - return; - } - - client.shutdown(); + client.createAccount(request); } - private static void updateAccount() throws InterruptedException { - - ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); - AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + private static void updateAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { - String profileId = UUID.randomUUID().toString(); - String accountId = UUID.randomUUID().toString(); - String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + String currentTimeMillis = Long.toString(System.currentTimeMillis()); AccountData request = AccountData.newBuilder() - .setProfileId(profileId) .setLastUpdateTimestamp(currentTimeMillis) - .setAccountId(accountId) .setProfileId(profileId) - .setFirstName("Test") - .setLastName("Mest") + .setAccountId(accountId) + .setFirstName("Test3") + .setLastName("Mest4") .setUsername("test") - .setLastUpdateTimestamp(currentTimeMillis) .build(); - try { - blockingStub.updateAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); - return; - } - - try { - client.updateAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); - return; - } - - client.shutdown(); + client.updateAccount(request); } - private static void deleteAccount() throws InterruptedException { + private static void deleteAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { - ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); - AccountBridgeClient client = new AccountBridgeClient(channelBuilder); - - String profileId = UUID.randomUUID().toString(); - String accountId = UUID.randomUUID().toString(); String currentTimeMillis = String.valueOf(System.currentTimeMillis()); ProfileData request = ProfileData.newBuilder() .setProfileId(profileId) .setLastUpdateTimestamp(currentTimeMillis) .setDefaultAccount(AccountData.newBuilder() - .setAccountId(accountId) .setProfileId(profileId) + .setAccountId(accountId) .setFirstName("Test") .setLastName("Mest") .setUsername("test") @@ -244,20 +152,28 @@ public class AccountBridgeClient { .build()) .build(); - try { - blockingStub.deleteAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); - return; - } + client.deleteAccount(request); + } - try { - client.deleteAccount(request); - } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); - return; - } + private static void deleteProfile(AccountBridgeClient client, String profileId, String accountId) + throws Exception { - client.shutdown(); + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .addAccountsIds(accountId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setProfileId(profileId) + .setAccountId(accountId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + client.deleteProfile(request); } + } -- GitLab From 6e9dc491561feae7ca8257a4b25861eff699b9c2 Mon Sep 17 00:00:00 2001 From: mapuo Date: Tue, 21 May 2019 21:25:21 +0300 Subject: [PATCH 05/25] log mqtt configuration & successful connection --- src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 8ef8d40..b58c6e6 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -38,6 +38,7 @@ public class MQTTVerticle extends AbstractVerticle { public MQTTVerticle(ErlangBridgeConfiguration erlangBridgeConfiguration) { configuration = erlangBridgeConfiguration; + log.info("mqtt configuration: {}", configuration); dataEventsStatusCache = new ConcurrentHashMap<>(); } @@ -68,7 +69,7 @@ public class MQTTVerticle extends AbstractVerticle { }); client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { - log.debug("Connected to a server"); + log.info("Connected to a server"); client.subscribe(MQTT_TOPIC, 0); } else { log.error("Failed to connect to a server, cause", ch.cause()); -- GitLab From 28b5602bff4b5fff9ce532ee056b8e99461d39ef Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 22 May 2019 18:35:50 +0300 Subject: [PATCH 06/25] add auto keep alive for the mqtt connection --- src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index b58c6e6..b3b3dfb 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -30,6 +30,7 @@ public class MQTTVerticle extends AbstractVerticle { private static final String MQTT_TOPIC = "events/1//api/anon//"; private static final long MESSAGE_TIMEOUT_MILLIS = 2000; + private static final int KEEP_ALIVE_TIME_SECONDS = 20; private final ErlangBridgeConfiguration configuration; private final ConcurrentHashMap dataEventsStatusCache; @@ -52,7 +53,9 @@ public class MQTTVerticle extends AbstractVerticle { private void initMqttClient() { MqttClientOptions options = new MqttClientOptions() - .setKeepAliveTimeSeconds(2) + .setAutoKeepAlive(true) + .setKeepAliveTimeSeconds(KEEP_ALIVE_TIME_SECONDS) + .setCleanSession(false) .setClientId("sys_micro_bridge"); client = MqttClient.create(vertx, options); -- GitLab From 914b522e67caa3f7ff71ed5787e63265f879a99a Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 22 May 2019 22:02:14 +0300 Subject: [PATCH 07/25] implement reconnect for the MQTT client add better logging --- pom.xml | 18 +++---- .../nynja/bridge/verticle/GRPCVerticle.java | 12 +++-- .../nynja/bridge/verticle/MQTTVerticle.java | 49 +++++++++++-------- src/main/resources/application-dev.yml | 5 ++ src/main/resources/application-production.yml | 9 ++++ .../bridge/verticle/GRPCVerticleTest.java | 3 +- 6 files changed, 58 insertions(+), 38 deletions(-) diff --git a/pom.xml b/pom.xml index b925894..4a8c898 100644 --- a/pom.xml +++ b/pom.xml @@ -33,9 +33,10 @@ UTF-8 10 3.6.3 - 1.6.4 + 1.7.26 1.2.3 0.6.0 + 1.18.8 @@ -109,7 +110,7 @@ org.projectlombok lombok - 1.18.2 + ${lombok.version} provided @@ -128,14 +129,8 @@ org.slf4j - slf4j-api - 1.8.0-beta2 - - - - org.slf4j - slf4j-simple - 1.8.0-beta2 + log4j-over-slf4j + ${slf4j.version} @@ -144,6 +139,7 @@ ${logback.version} + io.vertx vertx-unit @@ -168,7 +164,7 @@ org.codehaus.groovy groovy-all - 2.5.4 + 2.5.7 pom diff --git a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java index 54b4cf0..3227934 100644 --- a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java @@ -18,6 +18,8 @@ import io.vertx.grpc.VertxServer; import io.vertx.grpc.VertxServerBuilder; import org.springframework.stereotype.Component; +import static biz.nynja.bridge.verticle.MQTTVerticle.MQTT_BUS; + @Component public class GRPCVerticle extends AbstractVerticle { @@ -61,35 +63,35 @@ public class GRPCVerticle extends AbstractVerticle { public void createProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.CREATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void deleteProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void createAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.CREATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void updateAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.UPDATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void deleteAccount(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } }; diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index b3b3dfb..f79c8d9 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -28,7 +28,10 @@ import static biz.nynja.bridge.bert.OtpErlangHelper.*; @Component public class MQTTVerticle extends AbstractVerticle { + public static final String MQTT_BUS = "mqtt-bus"; + private static final String MQTT_TOPIC = "events/1//api/anon//"; + private static final String CLIENT_ID = "sys_micro_bridge"; private static final long MESSAGE_TIMEOUT_MILLIS = 2000; private static final int KEEP_ALIVE_TIME_SECONDS = 20; @@ -36,32 +39,24 @@ public class MQTTVerticle extends AbstractVerticle { private final ConcurrentHashMap dataEventsStatusCache; private MqttClient client; + private MqttClientOptions clientOptions; - public MQTTVerticle(ErlangBridgeConfiguration erlangBridgeConfiguration) { - configuration = erlangBridgeConfiguration; - log.info("mqtt configuration: {}", configuration); + public MQTTVerticle(ErlangBridgeConfiguration configuration) { + this.configuration = configuration; + log.info("mqtt configuration: {}", this.configuration); dataEventsStatusCache = new ConcurrentHashMap<>(); + clientOptions = new MqttClientOptions() + .setAutoKeepAlive(true) + .setKeepAliveTimeSeconds(KEEP_ALIVE_TIME_SECONDS) + .setCleanSession(false) + .setClientId(CLIENT_ID); } @Override public void start() throws Exception { - initMqttClient(); - initBus(); - } - - private void initMqttClient() { - MqttClientOptions options = new MqttClientOptions() - .setAutoKeepAlive(true) - .setKeepAliveTimeSeconds(KEEP_ALIVE_TIME_SECONDS) - .setCleanSession(false) - .setClientId("sys_micro_bridge"); - - client = MqttClient.create(vertx, options); - - initSubscriber(); - + client = MqttClient.create(vertx, clientOptions); client.subscribeCompletionHandler(h -> { log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); }); @@ -70,18 +65,30 @@ public class MQTTVerticle extends AbstractVerticle { vertx.setTimer(5000, l -> client.disconnect(d -> log.error("Disconnected form server"))); }); + client.pingResponseHandler(event -> log.debug("Pong received!")); + client.closeHandler(event -> { + log.warn("Connection lost! Reconnecting..."); + connectMqtt(); + }); + + connectMqtt(); + initSubscriber(); + initBus(); + } + + private void connectMqtt() { client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { - log.info("Connected to a server"); + log.debug("Connected to a server"); client.subscribe(MQTT_TOPIC, 0); } else { - log.error("Failed to connect to a server, cause", ch.cause()); + log.error("Failed to connect to a server!", ch.cause()); } }); } private void initBus() { - vertx.eventBus().consumer("mqtt-bus", message -> { + vertx.eventBus().consumer(MQTT_BUS, message -> { MQTTModelBase model = message.body(); log.info("Publish message with model {} [id:{}]", diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 4a37524..a65f433 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -1,3 +1,8 @@ +logging: + level: + biz: + nynja: debug + grpc: port: 6580 diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index 2887c91..a7fae12 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -1,3 +1,12 @@ +spring: + output: + ansi: + enabled: always + +logging: + level: + root: info + grpc: port: ${GRPC_SERVER_PORT:6570} diff --git a/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java b/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java index d2b356c..aae1d68 100644 --- a/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java +++ b/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static biz.nynja.bridge.verticle.MQTTVerticle.MQTT_BUS; import static io.grpc.ManagedChannelBuilder.forAddress; @RunWith(VertxUnitRunner.class) @@ -52,7 +53,7 @@ public class GRPCVerticleTest { vertx.eventBus().registerDefaultCodec(Profile.class, new ProfileCodec()); vertx.eventBus().registerDefaultCodec(Account.class, new AccountCodec()); - vertx.eventBus().consumer("mqtt-bus", message -> { + vertx.eventBus().consumer(MQTT_BUS, message -> { vertx.setTimer(500, timer -> { message.reply(true); }); -- GitLab From ef0d4d46e87a1991fb533fe45930341a27aaa121 Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 22 May 2019 22:50:10 +0300 Subject: [PATCH 08/25] add delay before reconnecting disable log colors in production --- src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java | 7 +++++-- src/main/resources/application-dev.yml | 5 +++++ src/main/resources/application-production.yml | 2 +- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index f79c8d9..e5448ab 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -67,8 +67,11 @@ public class MQTTVerticle extends AbstractVerticle { }); client.pingResponseHandler(event -> log.debug("Pong received!")); client.closeHandler(event -> { - log.warn("Connection lost! Reconnecting..."); - connectMqtt(); + log.warn("Connection lost! Waiting..."); + vertx.setTimer(5000, timer -> { + log.warn("Reconnecting..."); + connectMqtt(); + }); }); connectMqtt(); diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index a65f433..5929902 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -1,3 +1,8 @@ +spring: + output: + ansi: + enabled: always + logging: level: biz: diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index a7fae12..8c5db52 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -1,7 +1,7 @@ spring: output: ansi: - enabled: always + enabled: never logging: level: -- GitLab From bda567995b18b3566f6d0676e2d078bd157fb07c Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 22 May 2019 23:37:56 +0300 Subject: [PATCH 09/25] start verticle with future --- .../biz/nynja/bridge/verticle/MQTTVerticle.java | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index e5448ab..d03e900 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -53,7 +53,7 @@ public class MQTTVerticle extends AbstractVerticle { } @Override - public void start() + public void start(Future startFuture) throws Exception { client = MqttClient.create(vertx, clientOptions); @@ -74,18 +74,30 @@ public class MQTTVerticle extends AbstractVerticle { }); }); - connectMqtt(); + connectMqtt(startFuture); initSubscriber(); initBus(); } private void connectMqtt() { + connectMqtt(null); + } + + private void connectMqtt(Future startFuture) { + log.debug("Connecting to server '{}:{}'...", + configuration.getHost(), configuration.getPort()); client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { log.debug("Connected to a server"); client.subscribe(MQTT_TOPIC, 0); + if (startFuture != null) { + startFuture.complete(); + } } else { log.error("Failed to connect to a server!", ch.cause()); + if (startFuture != null) { + startFuture.fail(ch.cause()); + } } }); } -- GitLab From 2cc67cea7b5cd835d07e5403527a7ebb19ebd84c Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 23 May 2019 12:46:01 +0300 Subject: [PATCH 10/25] rename ErlangBridgeConfiguration to MqttConfiguration add more configuration fields for the Mqtt client --- ...figuration.java => MqttConfiguration.java} | 8 ++- .../nynja/bridge/verticle/MQTTVerticle.java | 51 ++++++++++--------- src/main/resources/application-dev.yml | 8 ++- src/main/resources/application-production.yml | 6 ++- 4 files changed, 43 insertions(+), 30 deletions(-) rename src/main/java/biz/nynja/bridge/properties/{ErlangBridgeConfiguration.java => MqttConfiguration.java} (61%) diff --git a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java similarity index 61% rename from src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java rename to src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java index b991a44..88cffa0 100644 --- a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java +++ b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java @@ -10,10 +10,14 @@ import org.springframework.context.annotation.Configuration; @Getter @Setter @Configuration -@ConfigurationProperties(prefix = "erlang-bridge") -public class ErlangBridgeConfiguration { +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfiguration { private String host; private int port; + private String clientId; + private String topic; + private int keepAliveInterval; + private long messageResponseTimeout; } diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index d03e900..dc702e2 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -2,7 +2,7 @@ package biz.nynja.bridge.verticle; import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; -import biz.nynja.bridge.properties.ErlangBridgeConfiguration; +import biz.nynja.bridge.properties.MqttConfiguration; import com.ericsson.otp.erlang.*; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.AbstractVerticle; @@ -18,65 +18,66 @@ import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Function; -import static biz.nynja.bridge.bert.OtpErlangHelper.*; +import static biz.nynja.bridge.bert.OtpErlangHelper.getId; +import static biz.nynja.bridge.bert.OtpErlangHelper.headEqualsAtom; @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { public static final String MQTT_BUS = "mqtt-bus"; + private static final int DISCONNECT_DELAY = 5000; + private static final int RECONNECT_DELAY = 5000; - private static final String MQTT_TOPIC = "events/1//api/anon//"; - private static final String CLIENT_ID = "sys_micro_bridge"; - private static final long MESSAGE_TIMEOUT_MILLIS = 2000; - private static final int KEEP_ALIVE_TIME_SECONDS = 20; - - private final ErlangBridgeConfiguration configuration; + private final MqttConfiguration configuration; private final ConcurrentHashMap dataEventsStatusCache; private MqttClient client; private MqttClientOptions clientOptions; - public MQTTVerticle(ErlangBridgeConfiguration configuration) { + public MQTTVerticle(MqttConfiguration configuration) { this.configuration = configuration; - log.info("mqtt configuration: {}", this.configuration); + log.info("Configuration: {}", this.configuration); dataEventsStatusCache = new ConcurrentHashMap<>(); + boolean keepAliveEnabled = configuration.getKeepAliveInterval() > 10; clientOptions = new MqttClientOptions() - .setAutoKeepAlive(true) - .setKeepAliveTimeSeconds(KEEP_ALIVE_TIME_SECONDS) + .setAutoKeepAlive(keepAliveEnabled) + .setKeepAliveTimeSeconds(configuration.getKeepAliveInterval()) .setCleanSession(false) - .setClientId(CLIENT_ID); + .setClientId(configuration.getClientId()); } @Override public void start(Future startFuture) throws Exception { + initMqttClient(); + initBus(); + initSubscriber(); + connectMqtt(startFuture); + } + + private void initMqttClient() { client = MqttClient.create(vertx, clientOptions); client.subscribeCompletionHandler(h -> { log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); }); client.unsubscribeCompletionHandler(h -> { log.debug("Receive UNSUBACK from server"); - vertx.setTimer(5000, l -> + vertx.setTimer(DISCONNECT_DELAY, l -> client.disconnect(d -> log.error("Disconnected form server"))); }); client.pingResponseHandler(event -> log.debug("Pong received!")); client.closeHandler(event -> { log.warn("Connection lost! Waiting..."); - vertx.setTimer(5000, timer -> { + vertx.setTimer(RECONNECT_DELAY, timer -> { log.warn("Reconnecting..."); connectMqtt(); }); }); - - connectMqtt(startFuture); - initSubscriber(); - initBus(); } private void connectMqtt() { @@ -89,7 +90,7 @@ public class MQTTVerticle extends AbstractVerticle { client.connect(configuration.getPort(), configuration.getHost(), ch -> { if (ch.succeeded()) { log.debug("Connected to a server"); - client.subscribe(MQTT_TOPIC, 0); + client.subscribe(configuration.getTopic(), 0); if (startFuture != null) { startFuture.complete(); } @@ -127,11 +128,11 @@ public class MQTTVerticle extends AbstractVerticle { callBackFuture.setHandler(result -> { message.reply(true); }); - vertx.setTimer(MESSAGE_TIMEOUT_MILLIS, timer -> { + vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { if (!callBackFuture.isComplete()) { dataEventsStatusCache.remove(model.getId().toString()); - log.warn("Callback future for [id:{}] not complete after {} second(s)", - model.getId(), TimeUnit.MILLISECONDS.toSeconds(MESSAGE_TIMEOUT_MILLIS)); + log.warn("Callback future for [id:{}] not complete after {} milliseconds", + model.getId(), configuration.getMessageResponseTimeout()); callBackFuture.complete(false); } }); @@ -141,7 +142,7 @@ public class MQTTVerticle extends AbstractVerticle { private void publishMqttMessage(Future complete, MQTTModelBase mqttModelBase) { client.publish( - MQTT_TOPIC, + configuration.getTopic(), Buffer.buffer(mqttModelBase.getBertFormat()), MqttQoS.EXACTLY_ONCE, false, diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 5929902..d8de02c 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -11,9 +11,13 @@ logging: grpc: port: 6580 -erlang-bridge: - host: 35.234.110.93 +mqtt: + host: 34.221.152.42 port: 1883 + clientId: sys_micro_bridge_dev + topic: events/1//api/anon// + keepAliveInterval: 20 + messageResponseTimeout: 2000 metrics: port: 6680 diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index 8c5db52..ca3cd4f 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -10,9 +10,13 @@ logging: grpc: port: ${GRPC_SERVER_PORT:6570} -erlang-bridge: +mqtt: host: ${ERLANG_HOST:35.234.110.93} port: ${ERLANG_PORT:1883} + clientId: ${MQTT_CLIENT_ID:sys_micro_bridge} + topic: ${MQTT_TOPIC:events/1//api/anon//} + keepAliveInterval: ${MQTT_KEEP_ALIVE:20} + messageResponseTimeout: ${MQTT_RESP_WAIT:2000} metrics: port: ${METRICS_PORT:6680} -- GitLab From 34bde87c275eb54ff43f756d2499cca688408630 Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 23 May 2019 13:35:29 +0300 Subject: [PATCH 11/25] move parsing of the response to own class --- .../nynja/bridge/bert/OtpErlangParser.java | 63 +++++++++++++++ .../nynja/bridge/verticle/MQTTVerticle.java | 77 +++++++------------ 2 files changed, 91 insertions(+), 49 deletions(-) create mode 100644 src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java new file mode 100644 index 0000000..a895da1 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java @@ -0,0 +1,63 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; +import lombok.Getter; + +import static biz.nynja.bridge.bert.OtpErlangHelper.headEqualsAtom; + +/** + * Created by @author mapuo on 23/05/19. + */ +@Getter +public class OtpErlangParser { + + + private final OtpErlangTuple tuple; + private final boolean isErrorMessage; + private final String id; + + public static OtpErlangParser parse(byte[] bytes) { + + try (OtpInputStream inputStream = new OtpInputStream(bytes)) { + + OtpErlangObject parsedObject = inputStream.read_any(); + + if (!(parsedObject instanceof OtpErlangTuple)) { + throw new OtpErlangDecodeException("Message is not tuple!"); + } + + return new OtpErlangParser((OtpErlangTuple) parsedObject); + + } catch (Exception exception) { + exception.printStackTrace(); + } + + return null; + } + + private OtpErlangParser(OtpErlangTuple tuple) { + this.tuple = tuple; + this.isErrorMessage = headEqualsAtom(tuple, "errors"); + this.id = OtpErlangHelper.getId( + (isErrorMessage) + ? extractProfileOrAccount() + : tuple); + } + + private OtpErlangTuple extractProfileOrAccount() { + // error code + // String code = fromBin((OtpErlangBinary) ((OtpErlangList) errorsTuple.elementAt(1)).getHead()); + + // list of errors + OtpErlangList listOfErrors = (OtpErlangList) tuple.elementAt(2); + // first error + OtpErlangTuple head = (OtpErlangTuple) listOfErrors.getHead(); + + // atom with the element that is in error + // OtpErlangObject errorField = head.elementAt(0); + + // tuple - Profile | Account + return (OtpErlangTuple) head.elementAt(1); + } + +} diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index dc702e2..6f7ecd3 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -1,9 +1,9 @@ package biz.nynja.bridge.verticle; +import biz.nynja.bridge.bert.OtpErlangParser; import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.MqttConfiguration; -import com.ericsson.otp.erlang.*; import io.netty.handler.codec.mqtt.MqttQoS; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; @@ -21,9 +21,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; import java.util.function.Function; -import static biz.nynja.bridge.bert.OtpErlangHelper.getId; -import static biz.nynja.bridge.bert.OtpErlangHelper.headEqualsAtom; - @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { @@ -159,59 +156,41 @@ public class MQTTVerticle extends AbstractVerticle { log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", messageId, publish.topicName(), publish.qosLevel()); - try { - byte[] responseBytes = publish.payload().getBytes(); - log.debug("responseBytes: {}", Arrays.toString(responseBytes)); - - OtpInputStream inputStream = new OtpInputStream(responseBytes); - OtpErlangObject erlangObject = inputStream.read_any(); - log.debug("[messageId:{}] response: {}", messageId, erlangObject); - - if (erlangObject instanceof OtpErlangTuple) { - OtpErlangTuple tuple = (OtpErlangTuple) erlangObject; - boolean isErrorResponse = headEqualsAtom(tuple, "errors"); - LogLevel logLevel = isErrorResponse ? LogLevel.ERROR : LogLevel.INFO; - - String id = (isErrorResponse) - ? getId(extractProfileOrAccount(tuple)) - : getId(tuple); + byte[] responseBytes = publish.payload().getBytes(); + log.debug("responseBytes: {}", Arrays.toString(responseBytes)); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); + final OtpErlangParser parse = OtpErlangParser.parse(responseBytes); - if (Objects.isNull(eventStatus)) { - logLevel.log(log, "[messageId:{}] Response from erlang [id:{}] without duplicating in cache", - messageId, id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - logLevel.log(log, "[messageId:{}] Response from erlang [id:{}]. Future is complete ", - messageId, id); - return; - } - logLevel.log(log, "[messageId:{}] Calling callback for [id:{}] with result = {}", - messageId, id, !isErrorResponse); - eventStatus.getCallBack().complete(!isErrorResponse); - } - } catch (OtpErlangDecodeException e) { + if (parse == null) { log.error("[messageId:{}] Could not read response!", messageId); + return; } - }); - } - private OtpErlangTuple extractProfileOrAccount(OtpErlangTuple errorsTuple) { - // error code - // String code = fromBin((OtpErlangBinary) ((OtpErlangList) errorsTuple.elementAt(1)).getHead()); + log.debug("[messageId:{}] response: {}", + messageId, parse.getTuple()); - // list of errors - OtpErlangList listOfErrors = (OtpErlangList) errorsTuple.elementAt(2); - // first error - OtpErlangTuple head = (OtpErlangTuple) listOfErrors.getHead(); + String id = parse.getId(); + boolean isErrorResponse = parse.isErrorMessage(); + LogLevel logLevel = isErrorResponse ? LogLevel.ERROR : LogLevel.INFO; - // atom with the element that is in error - // OtpErlangObject errorField = head.elementAt(0); - // tuple - Profile | Account - return (OtpErlangTuple) head.elementAt(1); + DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); + + if (Objects.isNull(eventStatus)) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}] without duplicating in cache", + messageId, id); + return; + } + if (eventStatus.getCallBack().isComplete()) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}]. Future is complete ", + messageId, id); + return; + } + logLevel.log(log, "[messageId:{}] Calling callback for [id:{}] with result = {}", + messageId, id, !isErrorResponse); + + eventStatus.getCallBack().complete(!isErrorResponse); + }); } @RequiredArgsConstructor -- GitLab From 42c4b24fac66c7c59474d826a2f2a025a7d1891b Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 23 May 2019 14:26:03 +0300 Subject: [PATCH 12/25] add CircuitBreaker when reconnecting to MQTT --- pom.xml | 8 ++- .../nynja/bridge/verticle/MQTTVerticle.java | 68 ++++++++++++------- src/main/resources/application-dev.yml | 2 +- 3 files changed, 52 insertions(+), 26 deletions(-) diff --git a/pom.xml b/pom.xml index 4a8c898..6fa588b 100644 --- a/pom.xml +++ b/pom.xml @@ -64,6 +64,12 @@ ${vertx.version} + + io.vertx + vertx-config-yaml + ${vertx.version} + + io.vertx vertx-mqtt @@ -78,7 +84,7 @@ io.vertx - vertx-config-yaml + vertx-circuit-breaker ${vertx.version} diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 6f7ecd3..f5347a5 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -5,6 +5,8 @@ import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; import biz.nynja.bridge.properties.MqttConfiguration; import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.circuitbreaker.CircuitBreaker; +import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; @@ -34,6 +36,7 @@ public class MQTTVerticle extends AbstractVerticle { private MqttClient client; private MqttClientOptions clientOptions; + private CircuitBreaker circuitBreaker; public MQTTVerticle(MqttConfiguration configuration) { this.configuration = configuration; @@ -58,6 +61,13 @@ public class MQTTVerticle extends AbstractVerticle { } private void initMqttClient() { + CircuitBreakerOptions options = new CircuitBreakerOptions() + .setMaxFailures(5) + .setTimeout(15000) + .setMaxRetries(10) + .setResetTimeout(30000); + circuitBreaker = CircuitBreaker.create("reconnect", vertx, options); + client = MqttClient.create(vertx, clientOptions); client.subscribeCompletionHandler(h -> { log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); @@ -69,35 +79,45 @@ public class MQTTVerticle extends AbstractVerticle { }); client.pingResponseHandler(event -> log.debug("Pong received!")); client.closeHandler(event -> { - log.warn("Connection lost! Waiting..."); - vertx.setTimer(RECONNECT_DELAY, timer -> { - log.warn("Reconnecting..."); - connectMqtt(); - }); + log.warn("Connection lost!"); + circuitBreaker + .retryPolicy(integer -> { + log.debug("integer: {}", integer); + long delay = ((integer + 1) * RECONNECT_DELAY); + log.info("Reconnecting in: {}ms", delay); + return delay; + }) + .execute(this::connectMqtt); }); } - private void connectMqtt() { - connectMqtt(null); - } - - private void connectMqtt(Future startFuture) { + private void connectMqtt(Future future) { log.debug("Connecting to server '{}:{}'...", configuration.getHost(), configuration.getPort()); - client.connect(configuration.getPort(), configuration.getHost(), ch -> { - if (ch.succeeded()) { - log.debug("Connected to a server"); - client.subscribe(configuration.getTopic(), 0); - if (startFuture != null) { - startFuture.complete(); - } - } else { - log.error("Failed to connect to a server!", ch.cause()); - if (startFuture != null) { - startFuture.fail(ch.cause()); - } - } - }); + + vertx.executeBlocking( + event -> { + client.connect(configuration.getPort(), configuration.getHost(), ch -> { + if (ch.succeeded()) { + log.debug("Connected to a server"); + client.subscribe(configuration.getTopic(), 0); + event.complete(); + } else { + log.error("Failed to connect to a server!", ch.cause()); + event.fail(ch.cause()); + } + }); + }, + result -> { + if (result.succeeded()) { + log.trace("calling future.complete"); + future.complete(); + } else { + log.trace("calling future.fail"); + future.fail(result.cause()); + } + }); + } private void initBus() { diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index d8de02c..ad88aed 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -6,7 +6,7 @@ spring: logging: level: biz: - nynja: debug + nynja: trace grpc: port: 6580 -- GitLab From 20f7d91908c82b7eec44f8bfcfc1393f21957560 Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 23 May 2019 14:26:17 +0300 Subject: [PATCH 13/25] upgrade to Vertx 3.7.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6fa588b..b38554b 100644 --- a/pom.xml +++ b/pom.xml @@ -32,7 +32,7 @@ UTF-8 UTF-8 10 - 3.6.3 + 3.7.0 1.7.26 1.2.3 0.6.0 -- GitLab From d5ff0800a8d86092f6c835d2c3885836ed2cdada Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 6 Jun 2019 19:03:41 +0300 Subject: [PATCH 14/25] when logging Profile ID also log the Account ID --- src/main/java/biz/nynja/bridge/model/Account.java | 3 +-- src/main/java/biz/nynja/bridge/model/MQTTModelBase.java | 4 ++++ src/main/java/biz/nynja/bridge/model/Profile.java | 9 +++++++++ .../java/biz/nynja/bridge/verticle/MQTTVerticle.java | 3 +-- 4 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/model/Account.java b/src/main/java/biz/nynja/bridge/model/Account.java index 2a5af2b..83b3007 100644 --- a/src/main/java/biz/nynja/bridge/model/Account.java +++ b/src/main/java/biz/nynja/bridge/model/Account.java @@ -43,8 +43,7 @@ public class Account implements MQTTModelBase { @Override @SneakyThrows public byte[] getBertFormat() { - return account.asByteBufferWithVersionTag() - .array(); + return account.asByteBufferWithVersionTag().array(); } } diff --git a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java index 2c7f355..524fa4f 100644 --- a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java +++ b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java @@ -8,4 +8,8 @@ public interface MQTTModelBase { byte[] getBertFormat(); + default String toLogString() { + return getClass().getSimpleName() + " [id:" + getId() + "]"; + } + } diff --git a/src/main/java/biz/nynja/bridge/model/Profile.java b/src/main/java/biz/nynja/bridge/model/Profile.java index ca01e65..ed09788 100644 --- a/src/main/java/biz/nynja/bridge/model/Profile.java +++ b/src/main/java/biz/nynja/bridge/model/Profile.java @@ -17,6 +17,8 @@ public class Profile implements MQTTModelBase { @Getter private UUID id; // phone in erlang + private String accountId; + private OtpErlangBuilder profile; public Profile(ProfileData profileData, StatusType status) { @@ -26,9 +28,11 @@ public class Profile implements MQTTModelBase { if (status == StatusType.CREATE) { Account account = new Account(profileData.getDefaultAccount(), status); accountBuilder.addTuple(account.getAccount()); + accountId = account.getId().toString(); } else { profileData.getAccountsIdsList() .forEach(accountBuilder::addStringAsBinary); + accountId = String.join(",", profileData.getAccountsIdsList()); } profile = OtpErlangBuilder.newTupleBuilder() @@ -52,4 +56,9 @@ public class Profile implements MQTTModelBase { return profile.asByteBufferWithVersionTag().array(); } + @Override + public String toLogString() { + return MQTTModelBase.super.toLogString() + " / Account [id:" + accountId + "]"; + } + } diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index f5347a5..7079de5 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -124,8 +124,7 @@ public class MQTTVerticle extends AbstractVerticle { vertx.eventBus().consumer(MQTT_BUS, message -> { MQTTModelBase model = message.body(); - log.info("Publish message with model {} [id:{}]", - model.getClass().getSimpleName(), model.getId()); + log.info("Publish message with model {}", model.toLogString()); Future publishFuture = Future.future(); publishMqttMessage(publishFuture, model); -- GitLab From d58f246ee4baed54f65203927e7086c391cf14f1 Mon Sep 17 00:00:00 2001 From: mapuo Date: Thu, 13 Jun 2019 12:08:04 +0300 Subject: [PATCH 15/25] username should be added to the BERT --- src/main/java/biz/nynja/bridge/model/Account.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/biz/nynja/bridge/model/Account.java b/src/main/java/biz/nynja/bridge/model/Account.java index 83b3007..f39b969 100644 --- a/src/main/java/biz/nynja/bridge/model/Account.java +++ b/src/main/java/biz/nynja/bridge/model/Account.java @@ -22,13 +22,14 @@ public class Account implements MQTTModelBase { public Account(AccountData accountData, StatusType status) { id = UUID.fromString(accountData.getAccountId()); + account = OtpErlangBuilder.newTupleBuilder() .addAtom(TYPE) .addStringAsBinary(id.toString()) .addStringAsBinary(accountData.getFirstName(), new OtpErlangList()) .addStringAsBinary(accountData.getLastName(), new OtpErlangList()) .addStringAsBinary(null, new OtpErlangList()) // email - .addStringAsBinary(null, new OtpErlangList()) // nickname + .addStringAsBinary(accountData.getUsername(), new OtpErlangList()) // nickname .addList() .addList() .addList() -- GitLab From 8fc85846583f13b98f3818eb046776586b096ae3 Mon Sep 17 00:00:00 2001 From: mapuo Date: Fri, 14 Jun 2019 00:22:42 +0300 Subject: [PATCH 16/25] [NY-7528] Pass phone number and email values from account microservice to the messaging server --- pom.xml | 2 +- .../nynja/bridge/bert/OtpErlangBuilder.java | 5 ++++ .../java/biz/nynja/bridge/model/Account.java | 17 ++++++++++++-- .../java/biz/nynja/bridge/model/Profile.java | 23 ++++++++++++++++++- 4 files changed, 43 insertions(+), 4 deletions(-) diff --git a/pom.xml b/pom.xml index b38554b..a223ce7 100644 --- a/pom.xml +++ b/pom.xml @@ -177,7 +177,7 @@ libs-snapshot-local.biz.nynja.protos - bridge-service-ny-5863-bridge-service + bridge-service-intracoldev 1.0-SNAPSHOT diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java index 4534bd3..a933041 100644 --- a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java @@ -120,6 +120,11 @@ public class OtpErlangBuilder { abstract T instance(); + public T addObject(OtpErlangObject object) { + objects.add(object); + return instance(); + } + public T addList() { objects.add(new OtpErlangList()); return instance(); diff --git a/src/main/java/biz/nynja/bridge/model/Account.java b/src/main/java/biz/nynja/bridge/model/Account.java index f39b969..231b6b4 100644 --- a/src/main/java/biz/nynja/bridge/model/Account.java +++ b/src/main/java/biz/nynja/bridge/model/Account.java @@ -25,18 +25,31 @@ public class Account implements MQTTModelBase { account = OtpErlangBuilder.newTupleBuilder() .addAtom(TYPE) + // id - .addStringAsBinary(id.toString()) + // names .addStringAsBinary(accountData.getFirstName(), new OtpErlangList()) + // surnames .addStringAsBinary(accountData.getLastName(), new OtpErlangList()) - .addStringAsBinary(null, new OtpErlangList()) // email - .addStringAsBinary(accountData.getUsername(), new OtpErlangList()) // nickname + // email + .addStringAsBinary(accountData.getEmail(), new OtpErlangList()) + // nick + .addStringAsBinary(accountData.getUsername(), new OtpErlangList()) + // userlist .addList() + // roomlist .addList() + // favorite .addList() + // tags .addList() + // phone - should be the profile_uuid .addStringAsBinary(accountData.getProfileId()) + // avatar .addStringAsBinary(accountData.getAvatar(), new OtpErlangList()) + // update .addLong(Long.valueOf(accountData.getLastUpdateTimestamp())) + // status .addAtom(status.toString()) .build(); } diff --git a/src/main/java/biz/nynja/bridge/model/Profile.java b/src/main/java/biz/nynja/bridge/model/Profile.java index ed09788..4aeb85a 100644 --- a/src/main/java/biz/nynja/bridge/model/Profile.java +++ b/src/main/java/biz/nynja/bridge/model/Profile.java @@ -3,6 +3,8 @@ package biz.nynja.bridge.model; import biz.nynja.bridge.bert.OtpErlangBuilder; import biz.nynja.bridge.bert.OtpErlangBuilder.ListBuilder; import biz.nynja.bridge.grpc.ProfileData; +import com.ericsson.otp.erlang.OtpErlangList; +import com.ericsson.otp.erlang.OtpErlangObject; import lombok.Getter; import lombok.SneakyThrows; import lombok.ToString; @@ -35,17 +37,36 @@ public class Profile implements MQTTModelBase { accountId = String.join(",", profileData.getAccountsIdsList()); } + OtpErlangObject phone = new OtpErlangList(); + if (profileData.getPhoneNumber() != null) { + // #'Feature'{id = <<"PHONE_12134">>, key = <<"PHONE">>, value = <<"380566432134">>, group = <<"PROFILE_DATA">>}. + phone = OtpErlangBuilder.newTupleBuilder() + .addStringAsBinary(profileData.getProfileId()) + .addStringAsBinary("PHONE") + .addStringAsBinary(profileData.getPhoneNumber()) + .addStringAsBinary("PROFILE_DATA") + .build() + .getObject(); + } + profile = OtpErlangBuilder.newTupleBuilder() .addAtom(TYPE) + // phone .addStringAsBinary(id.toString()) + // services .addList() + // rosters .addList(accountBuilder) - .addList() + // settings - should have the phone number + // id - uniq id + .addObject(phone) // update .addLong(Long.valueOf(profileData.getLastUpdateTimestamp())) // balance .addInt() + // presence .addList() + // status .addAtom(status.toString()) .build(); } -- GitLab From 32b2ec497e2f1d6232d265a27f9a5b9f555183ca Mon Sep 17 00:00:00 2001 From: mapuo Date: Fri, 14 Jun 2019 15:40:10 +0300 Subject: [PATCH 17/25] [NY-7528] Pass phone number and email values from account microservice to the messaging server fix Feature building --- .../java/biz/nynja/bridge/model/Profile.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/model/Profile.java b/src/main/java/biz/nynja/bridge/model/Profile.java index 4aeb85a..6b91b67 100644 --- a/src/main/java/biz/nynja/bridge/model/Profile.java +++ b/src/main/java/biz/nynja/bridge/model/Profile.java @@ -4,7 +4,6 @@ import biz.nynja.bridge.bert.OtpErlangBuilder; import biz.nynja.bridge.bert.OtpErlangBuilder.ListBuilder; import biz.nynja.bridge.grpc.ProfileData; import com.ericsson.otp.erlang.OtpErlangList; -import com.ericsson.otp.erlang.OtpErlangObject; import lombok.Getter; import lombok.SneakyThrows; import lombok.ToString; @@ -37,14 +36,19 @@ public class Profile implements MQTTModelBase { accountId = String.join(",", profileData.getAccountsIdsList()); } - OtpErlangObject phone = new OtpErlangList(); - if (profileData.getPhoneNumber() != null) { + OtpErlangList phone = new OtpErlangList(); + if (profileData.getPhoneNumber() != null && !profileData.getPhoneNumber().trim().isEmpty()) { // #'Feature'{id = <<"PHONE_12134">>, key = <<"PHONE">>, value = <<"380566432134">>, group = <<"PROFILE_DATA">>}. - phone = OtpErlangBuilder.newTupleBuilder() - .addStringAsBinary(profileData.getProfileId()) - .addStringAsBinary("PHONE") - .addStringAsBinary(profileData.getPhoneNumber()) - .addStringAsBinary("PROFILE_DATA") + phone = (OtpErlangList) OtpErlangBuilder.newListBuilder() + .addTuple( + OtpErlangBuilder.newTupleBuilder() + .addAtom("Feature") + .addStringAsBinary(profileData.getProfileId()) + .addStringAsBinary("PHONE") + .addStringAsBinary(profileData.getPhoneNumber()) + .addStringAsBinary("PROFILE_DATA") + .build() + ) .build() .getObject(); } -- GitLab From d81570802b16d15f4f42ef0fbe3b3540e3c880d2 Mon Sep 17 00:00:00 2001 From: mapuo Date: Tue, 18 Jun 2019 20:01:47 +0300 Subject: [PATCH 18/25] make BERT messages easy to read in log --- .../nynja/bridge/bert/OtpErlangBinstr.java | 35 +++++++++++++++++++ .../nynja/bridge/bert/OtpErlangBuilder.java | 4 +-- .../nynja/bridge/bert/OtpErlangParser.java | 2 +- .../biz/nynja/bridge/bert/OtpInputParser.java | 35 +++++++++++++++++++ .../nynja/bridge/verticle/GRPCVerticle.java | 5 +++ .../nynja/bridge/verticle/MQTTVerticle.java | 2 +- 6 files changed, 79 insertions(+), 4 deletions(-) create mode 100644 src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java create mode 100644 src/main/java/biz/nynja/bridge/bert/OtpInputParser.java diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java new file mode 100644 index 0000000..67b9ba3 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java @@ -0,0 +1,35 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangDecodeException; +import com.ericsson.otp.erlang.OtpInputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Created by @author mapuo on 18/06/19. + */ +public class OtpErlangBinstr extends OtpErlangBinary { + + public OtpErlangBinstr(byte[] bin) { + super(bin); + } + + public OtpErlangBinstr(OtpInputStream buf) throws OtpErlangDecodeException { + super(buf); + } + + public OtpErlangBinstr(Object o) { + super(o); + } + + @Override + public String toString() { + String string = new String(bin, UTF_8); + if (string.isEmpty()) { + return String.valueOf(0); + } + return "#BinStr<'" + string + "'>"; + } + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java index a933041..6cdaf18 100644 --- a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java @@ -260,13 +260,13 @@ public class OtpErlangBuilder { } public T addStringAsBinary(String s) { - objects.add(new OtpErlangBinary(s.getBytes(UTF_8))); + objects.add(new OtpErlangBinstr(s.getBytes(UTF_8))); return instance(); } public T addStringAsBinary(String s, OtpErlangObject defaultIfNull) { if (Objects.nonNull(s)) { - objects.add(new OtpErlangBinary(s.getBytes(UTF_8))); + objects.add(new OtpErlangBinstr(s.getBytes(UTF_8))); } else { objects.add(defaultIfNull); } diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java index a895da1..9bb1adb 100644 --- a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java @@ -18,7 +18,7 @@ public class OtpErlangParser { public static OtpErlangParser parse(byte[] bytes) { - try (OtpInputStream inputStream = new OtpInputStream(bytes)) { + try (OtpInputParser inputStream = new OtpInputParser(bytes)) { OtpErlangObject parsedObject = inputStream.read_any(); diff --git a/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java b/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java new file mode 100644 index 0000000..25a03ef --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java @@ -0,0 +1,35 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; + +/** + * Created by @author mapuo on 18/06/19. + */ +public class OtpInputParser extends OtpInputStream { + public OtpInputParser(byte[] buf) { + super(buf); + } + + public OtpInputParser(byte[] buf, int flags) { + super(buf, flags); + } + + public OtpInputParser(byte[] buf, int offset, int length, int flags) { + super(buf, offset, length, flags); + } + + @Override + public OtpErlangObject read_any() throws OtpErlangDecodeException { + OtpErlangObject object = super.read_any(); + if (object instanceof OtpErlangBinary) { + OtpErlangBinary bytes = (OtpErlangBinary) object; + return new OtpErlangBinstr(bytes.binaryValue()); + } + if (object instanceof OtpErlangBitstr) { + OtpErlangBitstr bytes = (OtpErlangBitstr) object; + return new OtpErlangBinstr(bytes.binaryValue()); + } + return object; + } + +} diff --git a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java index 3227934..770b866 100644 --- a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java @@ -16,10 +16,13 @@ import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.grpc.VertxServer; import io.vertx.grpc.VertxServerBuilder; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import static biz.nynja.bridge.verticle.MQTTVerticle.MQTT_BUS; +import static com.google.protobuf.TextFormat.shortDebugString; +@Slf4j @Component public class GRPCVerticle extends AbstractVerticle { @@ -63,6 +66,7 @@ public class GRPCVerticle extends AbstractVerticle { public void createProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.CREATE); + log.debug("Received [ProfileData: {}] Sending [Profile: {}]", shortDebugString(request), profile); vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @@ -70,6 +74,7 @@ public class GRPCVerticle extends AbstractVerticle { public void deleteProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); + log.debug("Received [ProfileData: {}] Sending [Profile: {}]", shortDebugString(request), profile); vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 7079de5..df65a99 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -176,7 +176,7 @@ public class MQTTVerticle extends AbstractVerticle { messageId, publish.topicName(), publish.qosLevel()); byte[] responseBytes = publish.payload().getBytes(); - log.debug("responseBytes: {}", Arrays.toString(responseBytes)); + log.trace("responseBytes: {}", Arrays.toString(responseBytes)); final OtpErlangParser parse = OtpErlangParser.parse(responseBytes); -- GitLab From 21637a243270284a8c4db5ba2e1b3c754af6a51c Mon Sep 17 00:00:00 2001 From: mapuo Date: Tue, 18 Jun 2019 20:02:18 +0300 Subject: [PATCH 19/25] make debug logs appear in production (should be temporary) --- src/main/resources/application-production.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index ca3cd4f..7364ee8 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -5,7 +5,7 @@ spring: logging: level: - root: info + root: debug grpc: port: ${GRPC_SERVER_PORT:6570} -- GitLab From cc394f2654adb85d18d0a9733cc555a20ecbe058 Mon Sep 17 00:00:00 2001 From: mapuo Date: Tue, 18 Jun 2019 20:24:13 +0300 Subject: [PATCH 20/25] make debug logs appear in production (should be temporary) --- src/main/resources/application-production.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index 7364ee8..3209b1f 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -5,7 +5,9 @@ spring: logging: level: - root: debug + root: info + biz: + nynja: debug grpc: port: ${GRPC_SERVER_PORT:6570} -- GitLab From 29b6e3e305d8fffefd522865a0d62a2c192ac975 Mon Sep 17 00:00:00 2001 From: mapuo Date: Tue, 18 Jun 2019 20:43:13 +0300 Subject: [PATCH 21/25] make BERT messages easy to read in log --- src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java index 770b866..bb4478e 100644 --- a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java @@ -82,6 +82,7 @@ public class GRPCVerticle extends AbstractVerticle { public void createAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.CREATE); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @@ -89,6 +90,7 @@ public class GRPCVerticle extends AbstractVerticle { public void updateAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.UPDATE); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @@ -96,6 +98,7 @@ public class GRPCVerticle extends AbstractVerticle { public void deleteAccount(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } }; -- GitLab From ccf651e011ca3fee6eccf2922bf9fefc22bd83f8 Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 19 Jun 2019 13:06:28 +0300 Subject: [PATCH 22/25] don't throw exception if there is no ID --- src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java index 5133695..92d3baf 100644 --- a/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java @@ -17,8 +17,12 @@ public class OtpErlangHelper { } public static String getId(OtpErlangTuple profileOrAccount) { - OtpErlangBinary uuidBin = (OtpErlangBinary) profileOrAccount.elementAt(1); - return new String(uuidBin.binaryValue()); + OtpErlangObject elementAt = profileOrAccount.elementAt(1); + if (elementAt instanceof OtpErlangBinary) { + OtpErlangBinary uuidBin = (OtpErlangBinary) elementAt; + return new String(uuidBin.binaryValue()); + } + return null; } public static boolean headEquals(OtpErlangTuple tuple, OtpErlangObject object) { -- GitLab From 786899d57a78bd5c1c541efbdc5664f4897a63e6 Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 19 Jun 2019 13:06:57 +0300 Subject: [PATCH 23/25] log parsed erlang object --- src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java index 9bb1adb..8e4ac9e 100644 --- a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java @@ -2,12 +2,14 @@ package biz.nynja.bridge.bert; import com.ericsson.otp.erlang.*; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import static biz.nynja.bridge.bert.OtpErlangHelper.headEqualsAtom; /** * Created by @author mapuo on 23/05/19. */ +@Slf4j @Getter public class OtpErlangParser { @@ -21,6 +23,7 @@ public class OtpErlangParser { try (OtpInputParser inputStream = new OtpInputParser(bytes)) { OtpErlangObject parsedObject = inputStream.read_any(); + log.trace("parsedObject: {}", parsedObject); if (!(parsedObject instanceof OtpErlangTuple)) { throw new OtpErlangDecodeException("Message is not tuple!"); -- GitLab From ba943bc0c9fa135e7980bd0848c5d0b24c863c51 Mon Sep 17 00:00:00 2001 From: mapuo Date: Wed, 19 Jun 2019 13:08:10 +0300 Subject: [PATCH 24/25] revert: make debug logs appear in production (should be temporary) --- src/main/resources/application-production.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index 3209b1f..ca3cd4f 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -6,8 +6,6 @@ spring: logging: level: root: info - biz: - nynja: debug grpc: port: ${GRPC_SERVER_PORT:6570} -- GitLab From 3617e1efeba48b6327947ca4ef47981ea1d258b3 Mon Sep 17 00:00:00 2001 From: Nicolas Berthet Date: Wed, 19 Jun 2019 19:57:37 +0800 Subject: [PATCH 25/25] Update staging deployment to use flux --- releases/staging/bridge-service.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releases/staging/bridge-service.yaml b/releases/staging/bridge-service.yaml index e285469..ccb9b72 100644 --- a/releases/staging/bridge-service.yaml +++ b/releases/staging/bridge-service.yaml @@ -1,4 +1,4 @@ -#apiVersion: flux.weave.works/v1beta1 +apiVersion: flux.weave.works/v1beta1 kind: HelmRelease metadata: name: bridge-service -- GitLab