diff --git a/Dockerfile b/Dockerfile index 4b5b6d38b365df697279eb23952476f7dbc8b851..bc6bf03547fa0d087b58ead57c168461948b814c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ FROM openjdk:10-jdk-slim -ENV JAVA_OPTS=-XX:+UseContainerSupport  +ENV JAVA_OPTS=-XX:+UseContainerSupport EXPOSE 8080 diff --git a/pom.xml b/pom.xml index e98046e9b32baacde26fdd26b5d7107900fbf7e2..a223ce73731db96aebaa094686c40b107d251079 100644 --- a/pom.xml +++ b/pom.xml @@ -16,16 +16,14 @@ - - jar - org.springframework.boot spring-boot-starter-parent 2.0.3.RELEASE - + + jar bridge-service Bridge microservice @@ -34,10 +32,11 @@ UTF-8 UTF-8 10 - 3.6.3 - 1.6.4 + 3.7.0 + 1.7.26 1.2.3 0.6.0 + 1.18.8 @@ -65,13 +64,18 @@ ${vertx.version} + + io.vertx + vertx-config-yaml + ${vertx.version} + + io.vertx vertx-mqtt ${vertx.version} - io.vertx vertx-grpc @@ -80,7 +84,7 @@ io.vertx - vertx-config-yaml + vertx-circuit-breaker ${vertx.version} @@ -90,6 +94,12 @@ ${vertx.version} + + org.erlang.otp + jinterface + 1.6.1 + + io.prometheus simpleclient @@ -106,7 +116,7 @@ org.projectlombok lombok - 1.18.2 + ${lombok.version} provided @@ -125,14 +135,8 @@ org.slf4j - slf4j-api - 1.8.0-beta2 - - - - org.slf4j - slf4j-simple - 1.8.0-beta2 + log4j-over-slf4j + ${slf4j.version} @@ -141,6 +145,7 @@ ${logback.version} + io.vertx vertx-unit @@ -155,7 +160,6 @@ test - junit junit @@ -166,14 +170,14 @@ org.codehaus.groovy groovy-all - 2.5.4 + 2.5.7 pom libs-snapshot-local.biz.nynja.protos - bridge-service-ny-5863-bridge-service + bridge-service-intracoldev 1.0-SNAPSHOT diff --git a/releases/staging/bridge-service.yaml b/releases/staging/bridge-service.yaml index e2854693ee61bff70198b50a1fab333ec59c8854..ccb9b728fd911371534f96447b70ebcb74996062 100644 --- a/releases/staging/bridge-service.yaml +++ b/releases/staging/bridge-service.yaml @@ -1,4 +1,4 @@ -#apiVersion: flux.weave.works/v1beta1 +apiVersion: flux.weave.works/v1beta1 kind: HelmRelease metadata: name: bridge-service diff --git a/src/main/java/biz/nynja/bridge/bert/Bert.java b/src/main/java/biz/nynja/bridge/bert/Bert.java deleted file mode 100644 index 333656637b50c6f89d9a59bf3988bfef7950fde6..0000000000000000000000000000000000000000 --- a/src/main/java/biz/nynja/bridge/bert/Bert.java +++ /dev/null @@ -1,377 +0,0 @@ -package biz.nynja.bridge.bert; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.HashMap; - -public class Bert { - - private byte[] mFloatStr = new byte[31]; - private ByteBuffer mBuffer = null; - private ByteArrayOutputStream bao = null; - private Object mValue = null; - - public static class Atom { - public String name; - - public Atom(String name) { - this.name = name; - } - - public int hashCode() { - return name.hashCode(); - } - - public boolean equals(Object obj) { - if (!(obj instanceof Atom)) return false; - return name.compareTo(((Atom) obj).name) == 0; - } - - public String toString() { - return name; - } - } - - public static class Time { - - public Time() { - } - - public Time(long ts) { - timestamp = ts; - - microsecond = (int) ((ts % 1000) * 1000); - second = (int) ((ts / 1000) % 1000000); - megasecond = (int) ((ts / 1000) / 1000000); - } - - public long timestamp = 0; - - public int megasecond = 0; - public int second = 0; - public int microsecond = 0; - } - - public static class Tuple extends ArrayList { - } - - public static class List extends ArrayList { - public boolean isProper = true; - } - - public static class Dict extends HashMap { - } - - public Bert() { - } - - private void writeAtom(Atom a, ByteArrayOutputStream bao) throws BertException { - int len = a.name.length(); - if (len >= 65536) throw new BertException("Atom Name too Long"); - bao.write(100); - bao.write((byte) (len >> 8) & 0x00FF); - bao.write((byte) (len) & 0x00FF); - try { - bao.write(a.name.getBytes("ISO-8859-1")); - } catch (UnsupportedEncodingException ex) { - throw new BertException("ISO 8859-1 is not Supported at Your Java Environment"); - } catch (IOException ex) { - throw new BertException(ex.getMessage()); - } - } - - private void writeTuple(Tuple tuple) throws BertException { - int len = tuple.size(); - - if (len < 256) { - bao.write(104); - bao.write((byte) (len & 0x00FF)); - } else { - bao.write(105); - bao.write((byte) ((len >> 24) & 0x00FF)); - bao.write((byte) ((len >> 16) & 0x00FF)); - bao.write((byte) ((len >> 8) & 0x00FF)); - bao.write((byte) ((len) & 0x00FF)); - } - - for (int count = 0; count < tuple.size(); count++) { - encodeTerm(tuple.get(count)); - } - } - - private void writeList(List list) throws BertException { - int len = list.size(); - - bao.write(108); - bao.write((byte) ((len >> 24) & 0x00FF)); - bao.write((byte) ((len >> 16) & 0x00FF)); - bao.write((byte) ((len >> 8) & 0x00FF)); - bao.write((byte) ((len) & 0x00FF)); - - for (int count = 0; count < list.size(); count++) { - encodeTerm(list.get(count)); - } - - if (list.isProper) bao.write(106); - } - - private void encodeTerm(Object o) throws BertException { - - if (o == null) { - Atom bert = new Atom("bert"); - Atom nil = new Atom("nil"); - Tuple tup = new Tuple(); - tup.add(bert); - tup.add(nil); - writeTuple(tup); - } else if (o instanceof Boolean) { - Atom bert = new Atom("bert"); - Atom nil = new Atom((boolean) o ? "true" : "false"); - Tuple tup = new Tuple(); - tup.add(bert); - tup.add(nil); - writeTuple(tup); - } else if (o instanceof Integer) { - int value = (int) o; - if (value >= 0 && value <= 255) { - bao.write(97); - bao.write((byte) (value & 0x00FF)); - } else { - bao.write(98); - bao.write((byte) ((value >> 24) & 0x00FF)); - bao.write((byte) ((value >> 16) & 0x00FF)); - bao.write((byte) ((value >> 8) & 0x00FF)); - bao.write((byte) ((value) & 0x00FF)); - } - } else if (o instanceof Double || o instanceof Float) { - double d = (double) o; - byte[] val = String.format("%.20e", o).getBytes(); - try { - bao.write(99); - bao.write(val); - if (val.length < 31) { - for (int count = 0; count < 31 - val.length; count++) bao.write(0); - } - } catch (IOException ex) { - throw new BertException(ex.getMessage()); - } - } else if (o instanceof Tuple) { - writeTuple((Tuple) o); - } else if (o instanceof String) { - try { - byte[] str = ((String) o).getBytes("UTF-8"); - bao.write(107); - bao.write((byte) ((str.length >> 8) & 0x00FF)); - bao.write((byte) ((str.length) & 0x00FF)); - bao.write(str); - } catch (UnsupportedEncodingException ex) { - new BertException("String not in UTF-8"); - } catch (IOException ex) { - new BertException(ex.getMessage()); - } - } else if (o instanceof Atom) { - writeAtom((Atom) o, bao); - } else if (o instanceof byte[]) { - int value = ((byte[]) o).length; - bao.write(109); - bao.write((byte) ((value >> 24) & 0x00FF)); - bao.write((byte) ((value >> 16) & 0x00FF)); - bao.write((byte) ((value >> 8) & 0x00FF)); - bao.write((byte) ((value) & 0x00FF)); - try { - bao.write((byte[]) o); - } catch (IOException ex) { - new BertException(ex.getMessage()); - } - } else if (o instanceof Time) { - Time time = (Time) o; - - Tuple tuple = new Tuple(); - tuple.add(new Atom("bert")); - tuple.add(new Atom("time")); - tuple.add(time.megasecond); - tuple.add(time.second); - tuple.add(time.microsecond); - - writeTuple(tuple); - } else if (o instanceof List) { - List list = (List) o; - if (list.size() == 0) { - bao.write(106); - } else { - writeList((List) o); - } - - } - - } - - public byte[] encode(Object o) throws BertException { - bao = new ByteArrayOutputStream(); - bao.write(-125); - - encodeTerm(o); - - return bao.toByteArray(); - } - - - public Bert(final byte[] data) throws BertException { - mBuffer = ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN); - - byte value = mBuffer.get(); - if (value != -125) - throw new BertException("Invalid Bert Data"); - - mValue = decode(); - } - - private Object decodeBertTerm(Tuple t) throws BertException { - if (t.get(0) instanceof Atom && ((Atom) t.get(0)).name.compareTo("bert") == 0) { - if (t.size() == 5) { - if (t.get(0) instanceof Atom && t.get(1) instanceof Atom && - ((Atom) t.get(0)).name.compareTo("bert") == 0 && - ((Atom) t.get(1)).name.compareTo("time") == 0 && - t.get(2) instanceof Integer && - t.get(3) instanceof Integer && - t.get(4) instanceof Integer) { - - Time time = new Time(); - - time.timestamp = ((int) t.get(2) * (long) 1000000 * (long) 1000) + ((int) t.get(3) * (long) 1000) + ((int) t.get(4) / 1000); - time.megasecond = (int) t.get(2); - time.second = (int) t.get(3); - time.microsecond = (int) t.get(4); - - return time; - } - } else if (t.size() == 2) { - String v = ((Atom) t.get(1)).name; - if (v.compareTo("nil") == 0) { - return null; - } else if (v.compareTo("true") == 0) { - return true; - } else if (v.compareTo("false") == 0) { - return false; - } - } else if (t.size() == 3) { - if (t.get(0) instanceof Atom && t.get(1) instanceof Atom && - ((Atom) t.get(0)).name.compareTo("bert") == 0 && - ((Atom) t.get(1)).name.compareTo("dict") == 0 && - t.get(2) instanceof List) { - Dict d = new Dict(); - List l = (List) t.get(2); - - for (int count = 0; count < l.size(); count++) { - Tuple tup = (Tuple) l.get(count); - if (tup.size() != 2) - throw new BertException("Invalid Dict Entry"); - d.put(tup.get(0), tup.get(1)); - } - - return d; - } - } - } - - return t; - } - - private Object decodeSmallTuple() throws BertException { - int len = mBuffer.get() & 0x00FFFFFFFF; - - Tuple tuple = new Tuple(); - for (int count = 0; count < len; count++) { - tuple.add(decode()); - } - - return decodeBertTerm(tuple); - } - - private Object decodeLargeTuple() throws BertException { - int len = mBuffer.getInt() & 0x00FF; - - Tuple tuple = new Tuple(); - for (int count = 0; count < len; count++) { - tuple.add(decode()); - } - - return decodeBertTerm(tuple); - } - - public List decodeList() throws BertException { - int len = mBuffer.getInt() & 0x00FF; - - List list = new List(); - for (int count = 0; count < len; count++) { - list.add(decode()); - } - - Object o = decode(); - if (!(o instanceof List)) { - list.add(o); - list.isProper = false; - } - - return list; - } - - private Object decode() throws BertException { - int tag = mBuffer.get() & 0x00FF; - byte[] val = null; - long len = 0; - - switch (tag) { - case 97: // SmallInt Tag - return (int) (mBuffer.get() & 0x00FF); - case 98: // Int Tag - return mBuffer.getInt(); - case 99: // FloatTag - mBuffer.get(mFloatStr); - return Double.parseDouble(new String(mFloatStr)); - case 100: // AtomTag - len = mBuffer.getShort() & 0x00FFFF; - val = new byte[(int) len]; - mBuffer.get(val); - Atom atom = new Atom(new String(val)); - return atom; - case 104: // SmallTupleTag - return decodeSmallTuple(); - case 105: // LargeTupleTag - return decodeLargeTuple(); - case 106: // NilTag - return new List(); - case 107: // StringTag - len = mBuffer.getShort() & 0x00FFFF; - val = new byte[(int) len]; - mBuffer.get(val); - try { - return new String(val, "UTF-8"); - } catch (UnsupportedEncodingException ex) { - return new String(val); - } - case 108: // ListTag - return decodeList(); - case 109: // BinTag - len = mBuffer.getInt() & 0x00FFFFFFFF; - val = new byte[(int) len]; - mBuffer.get(val); - return val; - default: - throw new BertException("Not Supported Bert Tag"); - } - } - - public Object getValue() { - return mValue; - } - - public String toString() { - return mValue == null ? null : mValue.toString(); - } - -} diff --git a/src/main/java/biz/nynja/bridge/bert/BertException.java b/src/main/java/biz/nynja/bridge/bert/BertException.java deleted file mode 100644 index bb72da05311e62c1ca47272076c3bedf2ec90427..0000000000000000000000000000000000000000 --- a/src/main/java/biz/nynja/bridge/bert/BertException.java +++ /dev/null @@ -1,21 +0,0 @@ -package biz.nynja.bridge.bert; - -public class BertException extends Exception { - - private static final long serialVersionUID = 0; - - private Throwable cause; - - public BertException(String message) { - super(message); - } - - public BertException(Throwable cause) { - super(cause.getMessage()); - this.cause = cause; - } - - public Throwable getCause() { - return this.cause; - } -} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java new file mode 100644 index 0000000000000000000000000000000000000000..67b9ba3b17e9becf4e43f3d47729a5f9b603804b --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBinstr.java @@ -0,0 +1,35 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangDecodeException; +import com.ericsson.otp.erlang.OtpInputStream; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Created by @author mapuo on 18/06/19. + */ +public class OtpErlangBinstr extends OtpErlangBinary { + + public OtpErlangBinstr(byte[] bin) { + super(bin); + } + + public OtpErlangBinstr(OtpInputStream buf) throws OtpErlangDecodeException { + super(buf); + } + + public OtpErlangBinstr(Object o) { + super(o); + } + + @Override + public String toString() { + String string = new String(bin, UTF_8); + if (string.isEmpty()) { + return String.valueOf(0); + } + return "#BinStr<'" + string + "'>"; + } + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..6cdaf18bbfffec8501da043d88ce1e5c425e0c72 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangBuilder.java @@ -0,0 +1,278 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; +import lombok.Getter; +import lombok.ToString; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static java.nio.charset.StandardCharsets.UTF_8; + +/** + * Created by @author mapuo on 18/05/19. + */ +@ToString +public class OtpErlangBuilder { + + @Getter + private OtpErlangObject object; + + public OtpOutputStream asOtpOutputStream() { + return new OtpOutputStream(object); + } + + public ByteBuffer asByteBuffer() { + return ByteBuffer.wrap(asOtpOutputStream().toByteArray()); + } + + public ByteBuffer asByteBufferWithVersionTag() { + ByteBuffer buffer = asByteBuffer(); + return ByteBuffer.allocate(buffer.capacity() + 1) + .put((byte) OtpExternal.versionTag) + .put(buffer) + .flip(); + } + + private OtpErlangBuilder(Builder builder) { + object = builder.object; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static TupleBuilder newTupleBuilder() { + return new TupleBuilder(); + } + + public static ListBuilder newListBuilder() { + return new ListBuilder(); + } + + public static final class Builder { + private OtpErlangObject object; + + private Builder() { + } + + public Builder setObject(OtpErlangObject object) { + this.object = object; + return this; + } + + public OtpErlangBuilder build() { + return new OtpErlangBuilder(this); + } + } + + public static final class TupleBuilder extends TermBuilder { + private OtpErlangTuple tuple; + + public TupleBuilder() { + } + + public TupleBuilder setTuple(OtpErlangTuple tuple) { + this.tuple = tuple; + return this; + } + + @Override + TupleBuilder instance() { + return this; + } + + public OtpErlangBuilder build() { + tuple = new OtpErlangTuple(objects.toArray(new OtpErlangObject[0])); + Builder builder = new Builder().setObject(tuple); + return new OtpErlangBuilder(builder); + } + + } + + public static final class ListBuilder extends TermBuilder { + OtpErlangList list; + + public ListBuilder() { + } + + @Override + ListBuilder instance() { + return this; + } + + public OtpErlangBuilder build() { + list = new OtpErlangList(objects.toArray(new OtpErlangObject[0])); + Builder builder = new Builder().setObject(list); + return new OtpErlangBuilder(builder); + } + + } + + public static abstract class TermBuilder { + + protected List objects = new ArrayList<>(); + + private TermBuilder() { + } + + abstract T instance(); + + public T addObject(OtpErlangObject object) { + objects.add(object); + return instance(); + } + + public T addList() { + objects.add(new OtpErlangList()); + return instance(); + } + + public T addList(OtpErlangList list) { + objects.add(list); + return instance(); + } + + public T addList(OtpErlangObject[] elems) { + objects.add(new OtpErlangList(elems)); + return instance(); + } + + public T addList(ListBuilder builder) { + objects.add(builder.build().object); + return instance(); + } + + public T addList(OtpErlangBuilder formatter) { + objects.add(formatter.object); + return instance(); + } + + public T addTuple(OtpErlangTuple tuple) { + objects.add(tuple); + return instance(); + } + + public T addTuple(OtpErlangObject[] elems) { + objects.add(new OtpErlangTuple(elems)); + return instance(); + } + + public T addTuple(TupleBuilder builder) { + objects.add(builder.build().object); + return instance(); + } + + public T addTuple(OtpErlangBuilder formatter) { + objects.add(formatter.object); + return instance(); + } + + public T addAtom(String atom) { + objects.add(new OtpErlangAtom(atom)); + return instance(); + } + + public T addAtom(boolean atom) { + objects.add(new OtpErlangAtom(atom)); + return instance(); + } + + + public T addBinary(byte[] bin) { + objects.add(new OtpErlangBinary(bin)); + return instance(); + } + + public T addBoolean(boolean bool) { + objects.add(new OtpErlangBoolean(bool)); + return instance(); + } + + public T addByte(byte bin) { + objects.add(new OtpErlangByte(bin)); + return instance(); + } + + public T addBitstr(byte[] bin) { + objects.add(new OtpErlangBitstr(bin)); + return instance(); + } + + public T addChar(char c) { + objects.add(new OtpErlangChar(c)); + return instance(); + } + + public T addDouble(double d) { + objects.add(new OtpErlangDouble(d)); + return instance(); + } + + public T addFloat(float f) { + objects.add(new OtpErlangFloat(f)); + return instance(); + } + + public T addInt() { + objects.add(new OtpErlangInt(0)); + return instance(); + } + + public T addInt(int i) { + objects.add(new OtpErlangInt(i)); + return instance(); + } + + public T addLong(long l) { + objects.add(new OtpErlangLong(l)); + return instance(); + } + + public T addShort(short s) { + objects.add(new OtpErlangShort(s)); + return instance(); + } + + public T addUnsignedInt(int i) { + try { + objects.add(new OtpErlangUInt(i)); + } catch (OtpErlangRangeException e) { + e.printStackTrace(); + } + return instance(); + } + + public T addUnsignedShort(short s) { + try { + objects.add(new OtpErlangUShort(s)); + } catch (OtpErlangRangeException e) { + e.printStackTrace(); + } + return instance(); + } + + public T addString(String s) { + objects.add(new OtpErlangString(s)); + return instance(); + } + + public T addStringAsBinary(String s) { + objects.add(new OtpErlangBinstr(s.getBytes(UTF_8))); + return instance(); + } + + public T addStringAsBinary(String s, OtpErlangObject defaultIfNull) { + if (Objects.nonNull(s)) { + objects.add(new OtpErlangBinstr(s.getBytes(UTF_8))); + } else { + objects.add(defaultIfNull); + } + return instance(); + } + + } + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java new file mode 100644 index 0000000000000000000000000000000000000000..92d3baf013621d01783a7de9f753da322a9a7f45 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangHelper.java @@ -0,0 +1,49 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.OtpErlangAtom; +import com.ericsson.otp.erlang.OtpErlangBinary; +import com.ericsson.otp.erlang.OtpErlangObject; +import com.ericsson.otp.erlang.OtpErlangTuple; + +import java.util.UUID; + +/** + * Created by @author mapuo on 20/05/19. + */ +public class OtpErlangHelper { + + public static UUID getUuid(OtpErlangTuple profileOrAccount) { + return UUID.fromString(getId(profileOrAccount)); + } + + public static String getId(OtpErlangTuple profileOrAccount) { + OtpErlangObject elementAt = profileOrAccount.elementAt(1); + if (elementAt instanceof OtpErlangBinary) { + OtpErlangBinary uuidBin = (OtpErlangBinary) elementAt; + return new String(uuidBin.binaryValue()); + } + return null; + } + + public static boolean headEquals(OtpErlangTuple tuple, OtpErlangObject object) { + if (tuple == null || tuple.arity() == 0) { + return false; + } + + return tuple.elementAt(0).equals(object); + } + + public static boolean headEqualsAtom(OtpErlangTuple tuple, String atom) { + return headEqualsAtom(tuple, new OtpErlangAtom(atom)); + } + + public static boolean headEqualsAtom(OtpErlangTuple tuple, OtpErlangAtom atom) { + return headEquals(tuple, atom); + } + + public static String fromBin(OtpErlangBinary bin) { + return new String(bin.binaryValue()); + } + + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java new file mode 100644 index 0000000000000000000000000000000000000000..8e4ac9ed3174f671645a9955ea98b66408982086 --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpErlangParser.java @@ -0,0 +1,66 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import static biz.nynja.bridge.bert.OtpErlangHelper.headEqualsAtom; + +/** + * Created by @author mapuo on 23/05/19. + */ +@Slf4j +@Getter +public class OtpErlangParser { + + + private final OtpErlangTuple tuple; + private final boolean isErrorMessage; + private final String id; + + public static OtpErlangParser parse(byte[] bytes) { + + try (OtpInputParser inputStream = new OtpInputParser(bytes)) { + + OtpErlangObject parsedObject = inputStream.read_any(); + log.trace("parsedObject: {}", parsedObject); + + if (!(parsedObject instanceof OtpErlangTuple)) { + throw new OtpErlangDecodeException("Message is not tuple!"); + } + + return new OtpErlangParser((OtpErlangTuple) parsedObject); + + } catch (Exception exception) { + exception.printStackTrace(); + } + + return null; + } + + private OtpErlangParser(OtpErlangTuple tuple) { + this.tuple = tuple; + this.isErrorMessage = headEqualsAtom(tuple, "errors"); + this.id = OtpErlangHelper.getId( + (isErrorMessage) + ? extractProfileOrAccount() + : tuple); + } + + private OtpErlangTuple extractProfileOrAccount() { + // error code + // String code = fromBin((OtpErlangBinary) ((OtpErlangList) errorsTuple.elementAt(1)).getHead()); + + // list of errors + OtpErlangList listOfErrors = (OtpErlangList) tuple.elementAt(2); + // first error + OtpErlangTuple head = (OtpErlangTuple) listOfErrors.getHead(); + + // atom with the element that is in error + // OtpErlangObject errorField = head.elementAt(0); + + // tuple - Profile | Account + return (OtpErlangTuple) head.elementAt(1); + } + +} diff --git a/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java b/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java new file mode 100644 index 0000000000000000000000000000000000000000..25a03ef1ffac0c4ef86d359b847560fb1ed8077b --- /dev/null +++ b/src/main/java/biz/nynja/bridge/bert/OtpInputParser.java @@ -0,0 +1,35 @@ +package biz.nynja.bridge.bert; + +import com.ericsson.otp.erlang.*; + +/** + * Created by @author mapuo on 18/06/19. + */ +public class OtpInputParser extends OtpInputStream { + public OtpInputParser(byte[] buf) { + super(buf); + } + + public OtpInputParser(byte[] buf, int flags) { + super(buf, flags); + } + + public OtpInputParser(byte[] buf, int offset, int length, int flags) { + super(buf, offset, length, flags); + } + + @Override + public OtpErlangObject read_any() throws OtpErlangDecodeException { + OtpErlangObject object = super.read_any(); + if (object instanceof OtpErlangBinary) { + OtpErlangBinary bytes = (OtpErlangBinary) object; + return new OtpErlangBinstr(bytes.binaryValue()); + } + if (object instanceof OtpErlangBitstr) { + OtpErlangBitstr bytes = (OtpErlangBitstr) object; + return new OtpErlangBinstr(bytes.binaryValue()); + } + return object; + } + +} diff --git a/src/main/java/biz/nynja/bridge/model/Account.java b/src/main/java/biz/nynja/bridge/model/Account.java index 5caf5fc38fdfac1df2748e2840288c2d426306b9..231b6b42dab5e86012fde00beaede8e05767994b 100644 --- a/src/main/java/biz/nynja/bridge/model/Account.java +++ b/src/main/java/biz/nynja/bridge/model/Account.java @@ -1,80 +1,63 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.Bert; +import biz.nynja.bridge.bert.OtpErlangBuilder; import biz.nynja.bridge.grpc.AccountData; +import com.ericsson.otp.erlang.OtpErlangList; import lombok.Getter; import lombok.SneakyThrows; +import lombok.ToString; -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.Objects; import java.util.UUID; -import static java.nio.charset.StandardCharsets.UTF_8; - -//Roster in ENC +@ToString public class Account implements MQTTModelBase { + + private static final String TYPE = "Roster"; + @Getter private UUID id; - private String names; - private String surnames; - private String email; - private String nick; - private UUID profileId; - private String avatar; - private Long update; - private StatusType status; - - public Account(AccountData accountData, StatusType status) { - this.id = UUID.fromString(accountData.getAccountId()); - this.names = accountData.getFirstName(); - this.surnames = accountData.getLastName(); - this.profileId = UUID.fromString(accountData.getProfileId()); - this.avatar = accountData.getAvatar(); - this.update = Long.valueOf(accountData.getLastUpdateTimestamp()); - this.status = status; - } - public Bert.Tuple getBertTupleWithData() { - Bert.Tuple tuple = new Bert.Tuple(); - tuple.add(new Bert.Atom("Roster")); - tuple.add(id.toString().getBytes(UTF_8)); - tuple.add(Objects.nonNull(names) ? names.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(surnames) ? surnames.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(email) ? email.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(nick) ? nick.getBytes(UTF_8) : new Bert.List()); + @Getter + private OtpErlangBuilder account; - tuple.add(new Bert.List());//userlist - tuple.add(new Bert.List());//roomlist - tuple.add(new Bert.List());//favorite - tuple.add(new Bert.List());//Tags - tuple.add(profileId.toString().getBytes(UTF_8)); - tuple.add(avatar.getBytes(UTF_8)); - tuple.add(new Bert.List());//update - tuple.add(new Bert.Atom(status.toString())); - return tuple; + public Account(AccountData accountData, StatusType status) { + id = UUID.fromString(accountData.getAccountId()); + + account = OtpErlangBuilder.newTupleBuilder() + .addAtom(TYPE) + // id - + .addStringAsBinary(id.toString()) + // names + .addStringAsBinary(accountData.getFirstName(), new OtpErlangList()) + // surnames + .addStringAsBinary(accountData.getLastName(), new OtpErlangList()) + // email + .addStringAsBinary(accountData.getEmail(), new OtpErlangList()) + // nick + .addStringAsBinary(accountData.getUsername(), new OtpErlangList()) + // userlist + .addList() + // roomlist + .addList() + // favorite + .addList() + // tags + .addList() + // phone - should be the profile_uuid + .addStringAsBinary(accountData.getProfileId()) + // avatar + .addStringAsBinary(accountData.getAvatar(), new OtpErlangList()) + // update + .addLong(Long.valueOf(accountData.getLastUpdateTimestamp())) + // status + .addAtom(status.toString()) + .build(); } @Override @SneakyThrows public byte[] getBertFormat() { - Bert.Tuple tuple = new Bert.Tuple(); - tuple.add("Roster".getBytes(UTF_8)); - tuple.add(id.toString().getBytes(UTF_8)); - tuple.add(Objects.nonNull(names) ? names.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(surnames) ? surnames.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(email) ? email.getBytes(UTF_8) : new Bert.List()); - tuple.add(Objects.nonNull(nick) ? nick.getBytes(UTF_8) : new Bert.List()); - - tuple.add(new Bert.List());//userlist - tuple.add(new Bert.List());//roomlist - tuple.add(new Bert.List());//favorite - tuple.add(new Bert.List());//Tags - tuple.add(profileId.toString().getBytes(UTF_8)); - tuple.add(avatar.getBytes(UTF_8)); - tuple.add(new Bert.Time(update)); - tuple.add(new Bert.Atom(status.toString())); - - return new Bert().encode(tuple); + return account.asByteBufferWithVersionTag().array(); } + } diff --git a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java index 976ed0d97d46f56586f4ea2918567ab0500b643f..524fa4f99b64369d15a4c52f0ad0c8a1296d1396 100644 --- a/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java +++ b/src/main/java/biz/nynja/bridge/model/MQTTModelBase.java @@ -1,8 +1,5 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.BertException; - -import java.io.UnsupportedEncodingException; import java.util.UUID; public interface MQTTModelBase { @@ -10,4 +7,9 @@ public interface MQTTModelBase { UUID getId(); byte[] getBertFormat(); + + default String toLogString() { + return getClass().getSimpleName() + " [id:" + getId() + "]"; + } + } diff --git a/src/main/java/biz/nynja/bridge/model/Profile.java b/src/main/java/biz/nynja/bridge/model/Profile.java index 09dd125e8afd120629613c36865ce9fb8f5cd4a4..6b91b67187ed09b84e20783ce1ad37b19295e067 100644 --- a/src/main/java/biz/nynja/bridge/model/Profile.java +++ b/src/main/java/biz/nynja/bridge/model/Profile.java @@ -1,65 +1,89 @@ package biz.nynja.bridge.model; -import biz.nynja.bridge.bert.Bert; +import biz.nynja.bridge.bert.OtpErlangBuilder; +import biz.nynja.bridge.bert.OtpErlangBuilder.ListBuilder; import biz.nynja.bridge.grpc.ProfileData; +import com.ericsson.otp.erlang.OtpErlangList; import lombok.Getter; import lombok.SneakyThrows; +import lombok.ToString; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; import java.util.UUID; -import java.util.stream.Collectors; - -import static java.nio.charset.StandardCharsets.*; +@ToString public class Profile implements MQTTModelBase { + private static final String TYPE = "Profile"; + @Getter - private UUID id; //phone in erlang - private Account account; - private List accounts; - private StatusType status; - private Long update; + private UUID id; // phone in erlang + + private String accountId; + + private OtpErlangBuilder profile; public Profile(ProfileData profileData, StatusType status) { - this.id = UUID.fromString(profileData.getProfileId()); - if (status.equals(StatusType.CREATE)) { - this.account = new Account(profileData.getDefaultAccount(), status); + id = UUID.fromString(profileData.getProfileId()); + + ListBuilder accountBuilder = OtpErlangBuilder.newListBuilder(); + if (status == StatusType.CREATE) { + Account account = new Account(profileData.getDefaultAccount(), status); + accountBuilder.addTuple(account.getAccount()); + accountId = account.getId().toString(); } else { - this.accounts = profileData.getAccountsIdsList().stream().map(UUID::fromString).collect(Collectors.toList()); + profileData.getAccountsIdsList() + .forEach(accountBuilder::addStringAsBinary); + accountId = String.join(",", profileData.getAccountsIdsList()); + } + + OtpErlangList phone = new OtpErlangList(); + if (profileData.getPhoneNumber() != null && !profileData.getPhoneNumber().trim().isEmpty()) { + // #'Feature'{id = <<"PHONE_12134">>, key = <<"PHONE">>, value = <<"380566432134">>, group = <<"PROFILE_DATA">>}. + phone = (OtpErlangList) OtpErlangBuilder.newListBuilder() + .addTuple( + OtpErlangBuilder.newTupleBuilder() + .addAtom("Feature") + .addStringAsBinary(profileData.getProfileId()) + .addStringAsBinary("PHONE") + .addStringAsBinary(profileData.getPhoneNumber()) + .addStringAsBinary("PROFILE_DATA") + .build() + ) + .build() + .getObject(); } - this.status = status; - this.update = Long.valueOf(profileData.getLastUpdateTimestamp()); + + profile = OtpErlangBuilder.newTupleBuilder() + .addAtom(TYPE) + // phone + .addStringAsBinary(id.toString()) + // services + .addList() + // rosters + .addList(accountBuilder) + // settings - should have the phone number + // id - uniq id + .addObject(phone) + // update + .addLong(Long.valueOf(profileData.getLastUpdateTimestamp())) + // balance + .addInt() + // presence + .addList() + // status + .addAtom(status.toString()) + .build(); } @Override @SneakyThrows public byte[] getBertFormat() { - Bert.Tuple tuple = new Bert.Tuple(); - - tuple.add(new Bert.Atom("Profile")); - tuple.add(id.toString().getBytes(UTF_8));//phone - tuple.add(new Bert.List());//services - if (status.equals(StatusType.CREATE)) { - Bert.List accounts = new Bert.List(); - accounts.add(account.getBertTupleWithData()); - tuple.add(accounts); - } else { - Bert.List accountsIdsBertFormat = new Bert.List(); - accounts.forEach(uuid -> { - accountsIdsBertFormat.add(uuid.toString().getBytes(UTF_8)); - }); - tuple.add(accountsIdsBertFormat); - } - tuple.add(new Bert.List());//settings - tuple.add(new Bert.List());//update - tuple.add(new Bert.List());//balance - tuple.add(new Bert.List());//presence - tuple.add(new Bert.Atom(status.toString())); + return profile.asByteBufferWithVersionTag().array(); + } - Bert bert = new Bert(); - return new Bert().encode(tuple); + @Override + public String toLogString() { + return MQTTModelBase.super.toLogString() + " / Account [id:" + accountId + "]"; } -} \ No newline at end of file + +} diff --git a/src/main/java/biz/nynja/bridge/model/StatusType.java b/src/main/java/biz/nynja/bridge/model/StatusType.java index d61cf5e12cbbc68f4c7ba1fbe8b08478874bdfb2..6078ed31c8a78942e028394237a61ba03e951fa1 100644 --- a/src/main/java/biz/nynja/bridge/model/StatusType.java +++ b/src/main/java/biz/nynja/bridge/model/StatusType.java @@ -1,19 +1,19 @@ package biz.nynja.bridge.model; - import lombok.AllArgsConstructor; -import lombok.ToString; @AllArgsConstructor -@ToString public enum StatusType { + CREATE("create"), UPDATE("update"), - DELETE("del"); + DELETE("remove"); + private final String text; @Override public String toString() { return text; } + } diff --git a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java similarity index 55% rename from src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java rename to src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java index 27f4ea1a5992a7c1eb0b5b43eabc408225566948..88cffa0007dd27c6a6bbcecd7c2abf695af63424 100644 --- a/src/main/java/biz/nynja/bridge/properties/ErlangBridgeConfiguration.java +++ b/src/main/java/biz/nynja/bridge/properties/MqttConfiguration.java @@ -1,20 +1,23 @@ package biz.nynja.bridge.properties; - -import io.vertx.core.json.JsonObject; import lombok.Getter; import lombok.Setter; -import lombok.extern.slf4j.Slf4j; +import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration; +@ToString @Getter @Setter @Configuration -@ConfigurationProperties(prefix = "erlang-bridge") -public class ErlangBridgeConfiguration { +@ConfigurationProperties(prefix = "mqtt") +public class MqttConfiguration { private String host; private int port; + private String clientId; + private String topic; + private int keepAliveInterval; + private long messageResponseTimeout; } diff --git a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java index 54b4cf025df8b68d59c9d143b3e374a73848020e..bb4478eda86d20333e62f4558db944312ba88cba 100644 --- a/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/GRPCVerticle.java @@ -16,8 +16,13 @@ import io.vertx.core.Handler; import io.vertx.core.eventbus.Message; import io.vertx.grpc.VertxServer; import io.vertx.grpc.VertxServerBuilder; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import static biz.nynja.bridge.verticle.MQTTVerticle.MQTT_BUS; +import static com.google.protobuf.TextFormat.shortDebugString; + +@Slf4j @Component public class GRPCVerticle extends AbstractVerticle { @@ -61,35 +66,40 @@ public class GRPCVerticle extends AbstractVerticle { public void createProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.CREATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + log.debug("Received [ProfileData: {}] Sending [Profile: {}]", shortDebugString(request), profile); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void deleteProfile(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + log.debug("Received [ProfileData: {}] Sending [Profile: {}]", shortDebugString(request), profile); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void createAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.CREATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void updateAccount(AccountData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Account profile = new Account(request, StatusType.UPDATE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } @Override public void deleteAccount(ProfileData request, Future response) { Histogram.Timer timer = requestLatency.startTimer(); Profile profile = new Profile(request, StatusType.DELETE); - vertx.eventBus().send("mqtt-bus", profile, new ReplyHandler(response, timer)); + log.debug("Received [AccountData: {}] Sending [Account: {}]", shortDebugString(request), profile); + vertx.eventBus().send(MQTT_BUS, profile, new ReplyHandler(response, timer)); } }; diff --git a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java index 00e3f3053c3a0cb5d7d86ad43613735355217f69..df65a9938ad587140e08bdaa973e6245fe24e430 100644 --- a/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/MQTTVerticle.java @@ -1,161 +1,226 @@ package biz.nynja.bridge.verticle; +import biz.nynja.bridge.bert.OtpErlangParser; import biz.nynja.bridge.cache.DataEventsStatus; import biz.nynja.bridge.model.MQTTModelBase; -import biz.nynja.bridge.properties.ErlangBridgeConfiguration; +import biz.nynja.bridge.properties.MqttConfiguration; import io.netty.handler.codec.mqtt.MqttQoS; +import io.vertx.circuitbreaker.CircuitBreaker; +import io.vertx.circuitbreaker.CircuitBreakerOptions; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; import io.vertx.core.buffer.Buffer; import io.vertx.mqtt.MqttClient; import io.vertx.mqtt.MqttClientOptions; +import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.slf4j.Logger; import org.springframework.stereotype.Component; +import java.util.Arrays; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; - -import static java.nio.charset.StandardCharsets.UTF_8; +import java.util.function.BiConsumer; +import java.util.function.Function; @Slf4j @Component public class MQTTVerticle extends AbstractVerticle { - private static final String MQTT_TOPIC = "events/1//api/anon//"; - private MqttClient client; - private final ErlangBridgeConfiguration erlangBridgeConfiguration; + public static final String MQTT_BUS = "mqtt-bus"; + private static final int DISCONNECT_DELAY = 5000; + private static final int RECONNECT_DELAY = 5000; + private final MqttConfiguration configuration; private final ConcurrentHashMap dataEventsStatusCache; - public MQTTVerticle(ErlangBridgeConfiguration erlangBridgeConfiguration) { - this.erlangBridgeConfiguration = erlangBridgeConfiguration; + private MqttClient client; + private MqttClientOptions clientOptions; + private CircuitBreaker circuitBreaker; + + public MQTTVerticle(MqttConfiguration configuration) { + this.configuration = configuration; + log.info("Configuration: {}", this.configuration); dataEventsStatusCache = new ConcurrentHashMap<>(); + boolean keepAliveEnabled = configuration.getKeepAliveInterval() > 10; + clientOptions = new MqttClientOptions() + .setAutoKeepAlive(keepAliveEnabled) + .setKeepAliveTimeSeconds(configuration.getKeepAliveInterval()) + .setCleanSession(false) + .setClientId(configuration.getClientId()); } @Override - public void start() throws Exception { + public void start(Future startFuture) + throws Exception { + initMqttClient(); initBus(); + initSubscriber(); + connectMqtt(startFuture); } - private void initMqttClient() { - MqttClientOptions options = new MqttClientOptions().setKeepAliveTimeSeconds(2).setClientId("sys_micro_bridge"); - - client = MqttClient.create(vertx, options); - initSubscriber(); + CircuitBreakerOptions options = new CircuitBreakerOptions() + .setMaxFailures(5) + .setTimeout(15000) + .setMaxRetries(10) + .setResetTimeout(30000); + circuitBreaker = CircuitBreaker.create("reconnect", vertx, options); + + client = MqttClient.create(vertx, clientOptions); client.subscribeCompletionHandler(h -> { - log.info("Receive SUBACK from server with granted QoS : " + h.grantedQoSLevels()); - + log.info("Receive SUBACK from server with granted QoS: {}", h.grantedQoSLevels()); }); client.unsubscribeCompletionHandler(h -> { log.debug("Receive UNSUBACK from server"); - vertx.setTimer(5000, l -> - client.disconnect(d -> log.error("Disconnected form server")) - ); + vertx.setTimer(DISCONNECT_DELAY, l -> + client.disconnect(d -> log.error("Disconnected form server"))); }); - - client.connect(erlangBridgeConfiguration.getPort(), erlangBridgeConfiguration.getHost(), ch -> { - if (ch.succeeded()) { - log.debug("Connected to a server"); - client.subscribe(MQTT_TOPIC, 0); - } else { - log.error("Failed to connect to a server, cause {}", ch.cause()); - } + client.pingResponseHandler(event -> log.debug("Pong received!")); + client.closeHandler(event -> { + log.warn("Connection lost!"); + circuitBreaker + .retryPolicy(integer -> { + log.debug("integer: {}", integer); + long delay = ((integer + 1) * RECONNECT_DELAY); + log.info("Reconnecting in: {}ms", delay); + return delay; + }) + .execute(this::connectMqtt); }); } + private void connectMqtt(Future future) { + log.debug("Connecting to server '{}:{}'...", + configuration.getHost(), configuration.getPort()); + + vertx.executeBlocking( + event -> { + client.connect(configuration.getPort(), configuration.getHost(), ch -> { + if (ch.succeeded()) { + log.debug("Connected to a server"); + client.subscribe(configuration.getTopic(), 0); + event.complete(); + } else { + log.error("Failed to connect to a server!", ch.cause()); + event.fail(ch.cause()); + } + }); + }, + result -> { + if (result.succeeded()) { + log.trace("calling future.complete"); + future.complete(); + } else { + log.trace("calling future.fail"); + future.fail(result.cause()); + } + }); + + } + private void initBus() { - vertx.eventBus().consumer("mqtt-bus", message -> { + vertx.eventBus().consumer(MQTT_BUS, message -> { MQTTModelBase model = message.body(); - log.info("Publish message with model id = {}", model.getId()); + + log.info("Publish message with model {}", model.toLogString()); + Future publishFuture = Future.future(); publishMqttMessage(publishFuture, model); + Future callBackFuture = Future.future(); - //because if we added after publishing, we can have race condition (event loop flow) - dataEventsStatusCache.put(model.getId().toString(), new DataEventsStatus(System.currentTimeMillis(), callBackFuture, DataEventsStatus.DataStatus.SENT)); + // because if we added after publishing, we can have race condition (event loop flow) + long currentTimeMillis = System.currentTimeMillis(); + DataEventsStatus dataEventsStatus = new DataEventsStatus( + currentTimeMillis, callBackFuture, DataEventsStatus.DataStatus.SENT); + dataEventsStatusCache.put(model.getId().toString(), dataEventsStatus); + publishFuture.setHandler(booleanAsyncResult -> { if (booleanAsyncResult.failed()) { message.reply(false); return; } callBackFuture.setHandler(result -> { - message.reply(true); - } - ); - vertx.setTimer(2000, timer -> { + message.reply(true); + }); + vertx.setTimer(configuration.getMessageResponseTimeout(), timer -> { if (!callBackFuture.isComplete()) { dataEventsStatusCache.remove(model.getId().toString()); - log.debug("callBack future for {} not complete after 2 second", model.getId()); + log.warn("Callback future for [id:{}] not complete after {} milliseconds", + model.getId(), configuration.getMessageResponseTimeout()); callBackFuture.complete(false); } }); - }); }); } private void publishMqttMessage(Future complete, MQTTModelBase mqttModelBase) { client.publish( - MQTT_TOPIC, + configuration.getTopic(), Buffer.buffer(mqttModelBase.getBertFormat()), MqttQoS.EXACTLY_ONCE, false, false, - s -> { + event -> { complete.complete(); }); - } private void initSubscriber() { client.publishHandler(publish -> { - String response = publish.payload().toString(UTF_8).replace("\r", ""); - log.info("Just received message on [ {} ] payload [ {} ] with QoS [ {} ]", publish.topicName(), response, publish.qosLevel()); - if (response.contains("error")) { - erlangErrorHandler(response); - } else { - erlangSuccessHandler(getIdFromSuccesResponse(response)); + final int messageId = publish.messageId(); + + log.info("[messageId:{}] Received on [topic:{}] with QoS [qos:{}]", + messageId, publish.topicName(), publish.qosLevel()); + + byte[] responseBytes = publish.payload().getBytes(); + log.trace("responseBytes: {}", Arrays.toString(responseBytes)); + + final OtpErlangParser parse = OtpErlangParser.parse(responseBytes); + + if (parse == null) { + log.error("[messageId:{}] Could not read response!", messageId); + return; + } + + log.debug("[messageId:{}] response: {}", + messageId, parse.getTuple()); + + String id = parse.getId(); + boolean isErrorResponse = parse.isErrorMessage(); + LogLevel logLevel = isErrorResponse ? LogLevel.ERROR : LogLevel.INFO; + + + DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); + + if (Objects.isNull(eventStatus)) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}] without duplicating in cache", + messageId, id); + return; } + if (eventStatus.getCallBack().isComplete()) { + logLevel.log(log, "[messageId:{}] Response from erlang [id:{}]. Future is complete ", + messageId, id); + return; + } + logLevel.log(log, "[messageId:{}] Calling callback for [id:{}] with result = {}", + messageId, id, !isErrorResponse); + + eventStatus.getCallBack().complete(!isErrorResponse); }); } - private void erlangSuccessHandler(String id) { - log.debug("Success response from erlang with id = {}", id); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); - if (Objects.isNull(eventStatus)) { - log.error("Success response from erlang with id = {} without duplicating in cache", id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - log.error("Success response from erlang with id = {}. Future is complete ", id); - return; - } - eventStatus.getCallBack().complete(true); - } + @RequiredArgsConstructor + private enum LogLevel { + INFO(l -> l::info), + ERROR(l -> l::error); - private void erlangErrorHandler(String response) { - String id = response.substring(response.length() - 36); - log.debug("Error from erlang with id = {}", id); - DataEventsStatus eventStatus = dataEventsStatusCache.remove(id); - if (Objects.isNull(eventStatus)) { - log.error("Error from erlang with id = {} without duplicating in cache", id); - return; - } - if (eventStatus.getCallBack().isComplete()) { - log.error("Error from erlang with id = {}. Future is complete ", id); - return; - } - eventStatus.getCallBack().complete(false); - } + private final Function> function; - private String getIdFromSuccesResponse(String response) { - if (response.contains("Profile")) { - //see bert payload structure - return response.substring(18, 18 + 36); - } else { - int necessaryIndex = response.indexOf("Roster") + "Roster".length(); - return response.substring(necessaryIndex + 5, necessaryIndex + 41); + public void log(Logger logger, String message, Object... objects) { + function.apply(logger).accept(message, objects); } } diff --git a/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java b/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java index 1a3bf2e524c607163692e583f1d189aa2fd1e81f..86680e060b1be7888dd65f71300da9ef9a01fd4b 100644 --- a/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java +++ b/src/main/java/biz/nynja/bridge/verticle/PrometheusVerticle.java @@ -14,33 +14,33 @@ import org.springframework.stereotype.Component; @Component public class PrometheusVerticle extends AbstractVerticle { - private final MetricsConfiguration configuration; - private HttpServer server; - - public PrometheusVerticle(MetricsConfiguration configuration) { - this.configuration = configuration; - } - - @Override - public void start(Future startFuture) throws Exception { - - HttpServerOptions serverOptions = new HttpServerOptions().setPort(configuration.getPort()); - server = vertx.createHttpServer(serverOptions); - - Router router = Router.router(vertx); - - router.route("/metrics").handler(new MetricsHandler()); - - server.requestHandler(router); - server.listen(handler -> { - if (handler.succeeded()) { - startFuture.complete(); - log.info("Metrics listening on port: {}", server.actualPort()); - } else { - startFuture.fail(handler.cause()); - log.error("Exception starting the Metrics server!", handler.cause()); - } - }); - } + private final MetricsConfiguration configuration; + private HttpServer server; + + public PrometheusVerticle(MetricsConfiguration configuration) { + this.configuration = configuration; + } + + @Override + public void start(Future startFuture) throws Exception { + + HttpServerOptions serverOptions = new HttpServerOptions().setPort(configuration.getPort()); + server = vertx.createHttpServer(serverOptions); + + Router router = Router.router(vertx); + + router.route("/metrics").handler(new MetricsHandler()); + + server.requestHandler(router); + server.listen(handler -> { + if (handler.succeeded()) { + startFuture.complete(); + log.info("Metrics listening on port: {}", server.actualPort()); + } else { + startFuture.fail(handler.cause()); + log.error("Exception starting the Metrics server!", handler.cause()); + } + }); + } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 4a37524e809781c1a407a59f1c4f1c18d7f3bc94..ad88aed9484ce78b3855c0de235728003d97c50f 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -1,9 +1,23 @@ +spring: + output: + ansi: + enabled: always + +logging: + level: + biz: + nynja: trace + grpc: port: 6580 -erlang-bridge: - host: 35.234.110.93 +mqtt: + host: 34.221.152.42 port: 1883 + clientId: sys_micro_bridge_dev + topic: events/1//api/anon// + keepAliveInterval: 20 + messageResponseTimeout: 2000 metrics: port: 6680 diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index 2887c911d2b3c480705ad60285dedf1223cc7a77..ca3cd4f6e3a2d647e274af5d0e8ac53dcd36ee73 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -1,9 +1,22 @@ +spring: + output: + ansi: + enabled: never + +logging: + level: + root: info + grpc: port: ${GRPC_SERVER_PORT:6570} -erlang-bridge: +mqtt: host: ${ERLANG_HOST:35.234.110.93} port: ${ERLANG_PORT:1883} + clientId: ${MQTT_CLIENT_ID:sys_micro_bridge} + topic: ${MQTT_TOPIC:events/1//api/anon//} + keepAliveInterval: ${MQTT_KEEP_ALIVE:20} + messageResponseTimeout: ${MQTT_RESP_WAIT:2000} metrics: port: ${METRICS_PORT:6680} diff --git a/src/test/java/biz/nynja/bridge/test/BertTest.java b/src/test/java/biz/nynja/bridge/test/BertTest.java deleted file mode 100644 index 2cd9f0cd9486479d1557b03275c8e174611f1fa7..0000000000000000000000000000000000000000 --- a/src/test/java/biz/nynja/bridge/test/BertTest.java +++ /dev/null @@ -1,47 +0,0 @@ -package biz.nynja.bridge.test; - -import biz.nynja.bridge.grpc.AccountData; -import biz.nynja.bridge.grpc.ProfileData; -import biz.nynja.bridge.model.Account; -import biz.nynja.bridge.model.MQTTModelBase; -import biz.nynja.bridge.model.Profile; -import biz.nynja.bridge.model.StatusType; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; - -import java.util.Arrays; -import java.util.UUID; - -@RunWith(VertxUnitRunner.class) -public class BertTest { - - @Test - public void testProfileToBert(TestContext context) { - MQTTModelBase profile = new Profile(buildTestProfileData(), StatusType.CREATE); - Assert.assertArrayEquals(profile.getBertFormat(), new byte[]{-125, 104, 9, 100, 0, 7, 80, 114, 111, 102, 105, 108, 101, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 106, 108, 0, 0, 0, 1, 104, 14, 100, 0, 6, 82, 111, 115, 116, 101, 114, 109, 0, 0, 0, 36, 57, 51, 49, 98, 57, 50, 100, 51, 45, 53, 55, 98, 99, 45, 52, 49, 55, 54, 45, 98, 57, 49, 52, 45, 51, 54, 57, 54, 54, 53, 51, 48, 48, 99, 57, 100, 109, 0, 0, 0, 13, 116, 101, 115, 116, 70, 105, 114, 115, 116, 78, 97, 109, 101, 109, 0, 0, 0, 12, 116, 101, 115, 116, 76, 97, 115, 116, 78, 97, 109, 101, 106, 106, 106, 106, 106, 106, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 109, 0, 0, 0, 60, 104, 116, 116, 112, 115, 58, 47, 47, 98, 105, 112, 98, 97, 112, 46, 114, 117, 47, 119, 112, 45, 99, 111, 110, 116, 101, 110, 116, 47, 117, 112, 108, 111, 97, 100, 115, 47, 50, 48, 49, 56, 47, 48, 51, 47, 55, 87, 70, 80, 113, 71, 68, 122, 45, 48, 115, 46, 106, 112, 103, 106, 100, 0, 6, 99, 114, 101, 97, 116, 101, 106, 106, 106, 106, 106, 100, 0, 6, 99, 114, 101, 97, 116, 101}); - } - - @Test - public void testAccountToBert(TestContext context) { - MQTTModelBase account = new Account(buildTestAccountData("aac8ee94-d81c-45a0-8320-2af0498917de"), StatusType.CREATE); - Assert.assertArrayEquals(account.getBertFormat(), new byte[]{-125, 104, 14, 109, 0, 0, 0, 6, 82, 111, 115, 116, 101, 114, 109, 0, 0, 0, 36, 57, 51, 49, 98, 57, 50, 100, 51, 45, 53, 55, 98, 99, 45, 52, 49, 55, 54, 45, 98, 57, 49, 52, 45, 51, 54, 57, 54, 54, 53, 51, 48, 48, 99, 57, 100, 109, 0, 0, 0, 13, 116, 101, 115, 116, 70, 105, 114, 115, 116, 78, 97, 109, 101, 109, 0, 0, 0, 12, 116, 101, 115, 116, 76, 97, 115, 116, 78, 97, 109, 101, 106, 106, 106, 106, 106, 106, 109, 0, 0, 0, 36, 97, 97, 99, 56, 101, 101, 57, 52, 45, 100, 56, 49, 99, 45, 52, 53, 97, 48, 45, 56, 51, 50, 48, 45, 50, 97, 102, 48, 52, 57, 56, 57, 49, 55, 100, 101, 109, 0, 0, 0, 60, 104, 116, 116, 112, 115, 58, 47, 47, 98, 105, 112, 98, 97, 112, 46, 114, 117, 47, 119, 112, 45, 99, 111, 110, 116, 101, 110, 116, 47, 117, 112, 108, 111, 97, 100, 115, 47, 50, 48, 49, 56, 47, 48, 51, 47, 55, 87, 70, 80, 113, 71, 68, 122, 45, 48, 115, 46, 106, 112, 103, 104, 5, 100, 0, 4, 98, 101, 114, 116, 100, 0, 4, 116, 105, 109, 101, 98, 0, 0, 6, 10, 98, 0, 6, -31, 84, 98, 0, 3, 9, 88, 100, 0, 6, 99, 114, 101, 97, 116, 101}); - } - - private static ProfileData buildTestProfileData() { - String profileId = "aac8ee94-d81c-45a0-8320-2af0498917de"; - return ProfileData.newBuilder().setProfileId(profileId) - .setDefaultAccount(buildTestAccountData(profileId)) - .setLastUpdateTimestamp(Long.toString(System.currentTimeMillis())).build(); - } - - - private static AccountData buildTestAccountData(String profileId) { - return AccountData.newBuilder().setAccountId("931b92d3-57bc-4176-b914-369665300c9d") - .setFirstName("testFirstName").setLastName("testLastName").setProfileId(profileId) - .setUsername("username").setAvatar("https://bipbap.ru/wp-content/uploads/2018/03/7WFPqGDz-0s.jpg") - .setLastUpdateTimestamp("1546450900199").build(); - } -} diff --git a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java index 7390f6c61e8617702f742b08c12a84a5c0a06090..949f892d8cb03635896323b277ea3b82918fff7b 100644 --- a/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java +++ b/src/test/java/biz/nynja/bridge/verticle/AccountBridgeClient.java @@ -6,34 +6,36 @@ import biz.nynja.bridge.grpc.BridgeSuccessResponse; import biz.nynja.bridge.grpc.ProfileData; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; +import io.grpc.StatusRuntimeException; +import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; -import static biz.nynja.bridge.grpc.AccountBridgeGrpc.newBlockingStub; -import static biz.nynja.bridge.grpc.AccountBridgeGrpc.newFutureStub; import static io.grpc.ManagedChannelBuilder.forAddress; public class AccountBridgeClient { - private final AccountBridgeGrpc.AccountBridgeBlockingStub bridgeStub; + private static final Logger logger = Logger.getLogger(AccountBridgeClient.class.getName()); + private final static int GRPC_PORT = 6580; + private final ManagedChannel channel; - private AccountBridgeGrpc.AccountBridgeFutureStub futureStub; + private final AccountBridgeGrpc.AccountBridgeBlockingStub bridgeStub; public AccountBridgeClient(String host, int port) { - this(forAddress(host, port)); + this(ManagedChannelBuilder.forAddress(host, port).usePlaintext()); } public AccountBridgeClient(ManagedChannelBuilder channelBuilder) { channel = channelBuilder.build(); - bridgeStub = newBlockingStub(channel); - futureStub = newFutureStub(channel); + bridgeStub = AccountBridgeGrpc.newBlockingStub(channel); } public void shutdown() throws InterruptedException { channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); } - public BridgeSuccessResponse createProfile(ProfileData request) { return bridgeStub.createProfile(request); } @@ -54,4 +56,124 @@ public class AccountBridgeClient { return bridgeStub.deleteAccount(request); } + public static void main(String[] args) + throws Exception { + + ManagedChannelBuilder channelBuilder = forAddress("localhost", GRPC_PORT).usePlaintext(); + AccountBridgeClient client = new AccountBridgeClient(channelBuilder); + +// String accountId = "d0025ee6-7acd-4e77-9095-cc5910aa8bc2"; + String accountId = UUID.randomUUID().toString(); +// String profileId = "63a7bafd-28e3-46dd-a25e-be194f99496e"; + String profileId = UUID.randomUUID().toString(); + + logger.log(Level.INFO, "accountId: " + accountId + ", profileId: " + profileId); + + try { + createProfile(client, profileId, accountId); +// createAccount(client, profileId, accountId); +// updateAccount(client, profileId, accountId); +// deleteAccount(client, profileId, accountId); + deleteProfile(client, profileId, accountId); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "RPC Response failed: {0}", e.getStatus()); + return; + } + + client.shutdown(); + } + + private static void createProfile(AccountBridgeClient client, String profileId, String accountId) + throws Exception { + + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setAccountId(accountId) + .setProfileId(profileId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + client.createProfile(request); + } + + private static void createAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { + + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + AccountData request = AccountData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setAccountId(accountId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .build(); + + client.createAccount(request); + } + + private static void updateAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { + + String currentTimeMillis = Long.toString(System.currentTimeMillis()); + AccountData request = AccountData.newBuilder() + .setLastUpdateTimestamp(currentTimeMillis) + .setProfileId(profileId) + .setAccountId(accountId) + .setFirstName("Test3") + .setLastName("Mest4") + .setUsername("test") + .build(); + + client.updateAccount(request); + } + + private static void deleteAccount(AccountBridgeClient client, String profileId, String accountId) + throws Exception { + + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setProfileId(profileId) + .setAccountId(accountId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + client.deleteAccount(request); + } + + private static void deleteProfile(AccountBridgeClient client, String profileId, String accountId) + throws Exception { + + String currentTimeMillis = String.valueOf(System.currentTimeMillis()); + ProfileData request = ProfileData.newBuilder() + .setProfileId(profileId) + .addAccountsIds(accountId) + .setLastUpdateTimestamp(currentTimeMillis) + .setDefaultAccount(AccountData.newBuilder() + .setProfileId(profileId) + .setAccountId(accountId) + .setFirstName("Test") + .setLastName("Mest") + .setUsername("test") + .setLastUpdateTimestamp(currentTimeMillis) + .build()) + .build(); + + client.deleteProfile(request); + } + } diff --git a/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java b/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java index d2b356c7386548962e254faf52d16e9408c99704..aae1d683d98dc6bbc252c5bf74f3905409bbf1c6 100644 --- a/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java +++ b/src/test/java/biz/nynja/bridge/verticle/GRPCVerticleTest.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import static biz.nynja.bridge.verticle.MQTTVerticle.MQTT_BUS; import static io.grpc.ManagedChannelBuilder.forAddress; @RunWith(VertxUnitRunner.class) @@ -52,7 +53,7 @@ public class GRPCVerticleTest { vertx.eventBus().registerDefaultCodec(Profile.class, new ProfileCodec()); vertx.eventBus().registerDefaultCodec(Account.class, new AccountCodec()); - vertx.eventBus().consumer("mqtt-bus", message -> { + vertx.eventBus().consumer(MQTT_BUS, message -> { vertx.setTimer(500, timer -> { message.reply(true); });