diff --git a/pom.xml b/pom.xml
index 7a41f5d74bc1599557bcc375794b4a8ce12f2456..4dff9bb490a86de420406dcc92276ca73dcd03af 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,18 @@
+
+ libs-snapshot-local.biz.nynja.protos
+ bridge-service-ny-5863-bridge-service
+ 1.0-SNAPSHOT
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
com.googlecode.libphonenumber
libphonenumber
diff --git a/src/main/java/biz/nynja/account/repositories/AccountRepositoryAdditionalImpl.java b/src/main/java/biz/nynja/account/repositories/AccountRepositoryAdditionalImpl.java
index 79ca704f5c96171532ff02a1b6269651434300a7..e480ba45f1736527bc7317226bdca976c938e4b0 100644
--- a/src/main/java/biz/nynja/account/repositories/AccountRepositoryAdditionalImpl.java
+++ b/src/main/java/biz/nynja/account/repositories/AccountRepositoryAdditionalImpl.java
@@ -15,6 +15,9 @@ import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
+import biz.nynja.account.repositories.batch.SagaTransaction;
+import biz.nynja.account.repositories.batch.Transaction;
+import biz.nynja.account.services.erlang.ErlangAccountBridge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.cassandra.core.CassandraBatchOperations;
@@ -71,7 +74,7 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
private final PendingAccountRepository pendingAccountRepository;
private final AccountServiceHelper accountServiceHelper;
private final Session session;
- private final ErlangAccountHttpBridge erlangAccountBridge;
+ private final ErlangAccountBridge erlangAccountBridge;
private final PermissionsValidator permissionsValidator;
public AccountRepositoryAdditionalImpl(PendingAccountConfiguration pendingAccountConfiguration,
@@ -82,7 +85,7 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
ProfileByAuthenticationProviderRepository profileByAuthenticationProviderRepository,
PendingAccountByAuthenticationProviderRepository pendingAccountByAuthenticationProviderRepository,
PendingAccountRepository pendingAccountRepository, AccountServiceHelper accountServiceHelper,
- Session session, ErlangAccountHttpBridge erlangAccountBridge, PermissionsValidator permissionsValidator,
+ Session session, ErlangAccountBridge erlangAccountBridge, PermissionsValidator permissionsValidator,
AccountDataConfiguration accountDataConfiguration) {
this.pendingAccountConfiguration = pendingAccountConfiguration;
this.cassandraTemplate = cassandraTemplate;
@@ -103,7 +106,7 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
}
public Account completePendingAccountCreation(CompletePendingAccountCreationRequest request) {
- CassandraBatchOperations batchOperations = cassandraTemplate.batchOps();
+ Transaction sagaTransaction = new SagaTransaction(cassandraTemplate);
PendingAccount pendingAccount = pendingAccountRepository
.findByAccountId(UUID.fromString(request.getAccountId()));
if (pendingAccount == null) {
@@ -134,10 +137,15 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
Long timeCreated = Instant.now().toEpochMilli();
WriteResult wr;
try {
- newAccountInsert(batchOperations, request, pendingAccount, timeCreated);
- newProfileByAuthenticationProviderInsert(batchOperations, pendingAccount);
- newProfileInsert(batchOperations, request, pendingAccount, timeCreated);
- wr = batchOperations.execute();
+ Account account = newAccountInsert(sagaTransaction, request, pendingAccount, timeCreated);
+ newProfileByAuthenticationProviderInsert(sagaTransaction, pendingAccount);
+ Profile profile = newProfileInsert(sagaTransaction, request, pendingAccount, timeCreated);
+ wr = sagaTransaction.execute();
+ if (!erlangAccountBridge.createProfile(profile, account)) {
+ logger.error("Internal error with erlang");
+ sagaTransaction.rollBack();
+ return null;
+ }
} catch (IllegalArgumentException | IllegalStateException e) {
logger.info("Exception while completing pending account creation.");
logger.debug("Exception while completing pending account creation: {} ...", e.getMessage());
@@ -219,7 +227,7 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
}
public boolean deleteAccount(UUID accountId) {
- CassandraBatchOperations batchOperations = cassandraTemplate.batchOps();
+ Transaction sagaTransaction = new SagaTransaction(cassandraTemplate);
Account existingAccount = accountRepository.findByAccountId(accountId);
if (!doExistAccountAndProfileToDelete(accountId)) {
return false;
@@ -235,28 +243,34 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
boolean alsoDeleteProfile = existingAccountsForProfile.size() == 1;
WriteResult wr = null;
try {
- deleteAccountData(batchOperations, existingAccount);
+ deleteAccountData(sagaTransaction, existingAccount);
if (alsoDeleteProfile) {
if (existingProfile.getAuthenticationProviders() != null
&& existingProfile.getAuthenticationProviders().size() > 0) {
- deleteAuthenticationProvidersFromProfile(batchOperations, existingProfile.getProfileId(),
+ deleteAuthenticationProvidersFromProfile(sagaTransaction, existingProfile.getProfileId(),
existingProfile.getAuthenticationProviders());
}
- deleteProfileData(batchOperations, existingProfile);
+ deleteProfileData(sagaTransaction, existingProfile);
} else {
if (existingAccount.getAuthenticationProviderType() != null
&& existingAccount.getAuthenticationProvider() != null) {
- deleteProfileByAuthenticationProvider(batchOperations, existingAccount.getProfileId(),
+ deleteProfileByAuthenticationProvider(sagaTransaction, existingAccount.getProfileId(),
AuthenticationProvider.createAuthenticationProviderFromStringsWithDefaultSearchableOption(
existingAccount.getAuthenticationProviderType(),
existingAccount.getAuthenticationProvider()));
}
- if (!removeCreationProvider(batchOperations, existingAccount, existingProfile)) {
+ if (!removeCreationProvider(sagaTransaction, existingAccount, existingProfile)) {
logger.error("Error deleting account {}", existingAccount.getAccountId());
return false;
}
}
- wr = batchOperations.execute();
+ wr = sagaTransaction.execute();
+
+ if (!erlangAccountBridge.deleteAccount(existingProfile.getProfileId(), accountId)) {
+ logger.error("Internal error with erlang");
+ sagaTransaction.rollBack();
+ return false;
+ }
} catch (IllegalArgumentException | IllegalStateException e) {
logger.error("Exception while deleting account: {}.", e.getMessage());
return false;
@@ -461,7 +475,7 @@ public class AccountRepositoryAdditionalImpl implements AccountRepositoryAdditio
return Optional.of(Cause.ERROR_DELETING_PROFILE);
}
- private void deleteProfileAccountsWhenDeletingProfile(CassandraBatchOperations batchOperations,
+ private void deleteProfileAccountsWhenDeletingProfile(UUID profileId, CassandraBatchOperations batchOperations,
List existingAccountsForProfile) {
for (AccountByProfileId accountByProfileId : existingAccountsForProfile) {
Account existingAccount = accountRepository.findByAccountId(accountByProfileId.getAccountId());
diff --git a/src/main/java/biz/nynja/account/services/erlang/ErlangAccountBridge.java b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountBridge.java
index 803db0ed0bfad8c77cd95b663424be7e786790a9..f30b96de8e78a525154c2a33ca21fdac1631e159 100644
--- a/src/main/java/biz/nynja/account/services/erlang/ErlangAccountBridge.java
+++ b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountBridge.java
@@ -6,18 +6,18 @@ package biz.nynja.account.services.erlang;
import biz.nynja.account.models.Account;
import biz.nynja.account.models.Profile;
+import java.util.List;
import java.util.UUID;
-
public interface ErlangAccountBridge {
boolean createProfile(Profile profile, Account defaultAccount);
- boolean deleteProfile(UUID profileId);
+ boolean deleteProfile(UUID profileId, List accountsIds);
boolean createAccount(Account account);
boolean updateAccount(Account account);
- boolean deleteAccount(UUID accountId);
+ boolean deleteAccount(UUID profileId, UUID accountId);
}
diff --git a/src/main/java/biz/nynja/account/services/erlang/ErlangAccountHttpBridge.java b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountHttpBridge.java
index aa88d2f435dbf10bc6552b1611965abf54070b8b..84f6e7ebac755f554f89a2e472f853d86eef771c 100644
--- a/src/main/java/biz/nynja/account/services/erlang/ErlangAccountHttpBridge.java
+++ b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountHttpBridge.java
@@ -6,6 +6,7 @@ package biz.nynja.account.services.erlang;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
+import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
@@ -22,8 +23,6 @@ import biz.nynja.account.models.Account;
import biz.nynja.account.models.Profile;
import biz.nynja.account.services.erlang.connector.HttpClient;
-
-
@Service
// TODO: 11/19/2018 change boolean response when ENC will implement return error part
public class ErlangAccountHttpBridge implements ErlangAccountBridge {
@@ -46,9 +45,9 @@ public class ErlangAccountHttpBridge implements ErlangAccountBridge {
profileURL = null;
return;
}
- accountURL = new URL(
+ accountURL = new URL("http://" +
erlangBridgeConfiguration.getHost() + ":" + erlangBridgeConfiguration.getPort() + "/swm/account");
- profileURL = new URL(
+ profileURL = new URL("http://" +
erlangBridgeConfiguration.getHost() + ":" + erlangBridgeConfiguration.getPort() + "/swm/profile");
}
@@ -66,7 +65,7 @@ public class ErlangAccountHttpBridge implements ErlangAccountBridge {
}
@Override
- public boolean deleteProfile(UUID profileId) {
+ public boolean deleteProfile(UUID profileId, List accountsIds) {
if (!erlangBridgeConfiguration.isEnabled())
return true;
JsonObject profileObject = new JsonObject();
@@ -77,7 +76,7 @@ public class ErlangAccountHttpBridge implements ErlangAccountBridge {
return false;
}
- return false;
+ return true;
}
@Override
@@ -99,7 +98,7 @@ public class ErlangAccountHttpBridge implements ErlangAccountBridge {
}
@Override
- public boolean deleteAccount(UUID accountId) {
+ public boolean deleteAccount(UUID profileId, UUID accountId) {
if (!erlangBridgeConfiguration.isEnabled())
return true;
JsonObject accountObject = new JsonObject();
@@ -109,7 +108,7 @@ public class ErlangAccountHttpBridge implements ErlangAccountBridge {
} catch (IOException e) {
return false;
}
- return false;
+ return true;
}
private JsonObject prepareProfileJsonObject(Profile profile, Account account) {
diff --git a/src/main/java/biz/nynja/account/services/erlang/ErlangAccountMqttBridge.java b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountMqttBridge.java
new file mode 100644
index 0000000000000000000000000000000000000000..0bd0d70e562d81ed6c5f5945bb484cd1bb4bf46d
--- /dev/null
+++ b/src/main/java/biz/nynja/account/services/erlang/ErlangAccountMqttBridge.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.services.erlang;
+
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import org.springframework.context.annotation.Primary;
+import org.springframework.stereotype.Component;
+
+import biz.nynja.account.configuration.ErlangBridgeConfiguration;
+import biz.nynja.account.models.Account;
+import biz.nynja.account.models.Profile;
+import biz.nynja.account.services.erlang.interceptor.TokenInterceptorConstants;
+import biz.nynja.bridge.grpc.AccountBridgeGrpc;
+import biz.nynja.bridge.grpc.AccountData;
+import biz.nynja.bridge.grpc.BridgeSuccessResponse;
+import biz.nynja.bridge.grpc.ProfileData;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.Metadata;
+import io.grpc.stub.MetadataUtils;
+
+@Component
+@Primary
+public class ErlangAccountMqttBridge implements ErlangAccountBridge {
+
+ private final ErlangBridgeConfiguration erlangBridgeConfiguration;
+
+ public ErlangAccountMqttBridge(ErlangBridgeConfiguration erlangBridgeConfiguration) throws MalformedURLException {
+ this.erlangBridgeConfiguration = erlangBridgeConfiguration;
+ }
+
+ @Override
+ public boolean createProfile(Profile profile, Account account) {
+
+ if (!erlangBridgeConfiguration.isEnabled())
+ return true;
+ ProfileData profileData = buildProfileData(profile, account);
+ // todo update after testing with real connection
+ BridgeSuccessResponse response = buildGrpcConnection().createProfile(profileData);
+ return true;
+ }
+
+ @Override
+ public boolean deleteProfile(UUID profileId, List accountsIds) {
+ if (!erlangBridgeConfiguration.isEnabled())
+ return true;
+ BridgeSuccessResponse response = buildGrpcConnection()
+ .deleteProfile(buildDeleteProfileData(profileId, (UUID[]) accountsIds.toArray()));
+ return true;
+ }
+
+ @Override
+ public boolean createAccount(Account account) {
+ if (!erlangBridgeConfiguration.isEnabled())
+ return true;
+ BridgeSuccessResponse response = buildGrpcConnection().createAccount(buildAccountData(account));
+
+ return true;
+ }
+
+ @Override
+ public boolean updateAccount(Account account) {
+ if (!erlangBridgeConfiguration.isEnabled())
+ return true;
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteAccount(UUID profileId, UUID accountId) {
+ if (!erlangBridgeConfiguration.isEnabled())
+ return true;
+ BridgeSuccessResponse response = buildGrpcConnection()
+ .deleteAccount(buildDeleteProfileData(profileId, accountId));
+ return true;
+ }
+
+ private ProfileData buildProfileData(Profile profile, Account account) {
+ return ProfileData.newBuilder().setProfileId(profile.getProfileId().toString())
+ .setDefaultAccount(buildAccountData(account))
+ .setLastUpdateTimestamp(profile.getCreationTimestamp().toString()).build();
+ }
+
+ private AccountData buildAccountData(Account account) {
+ return AccountData.newBuilder().setAccountId(account.getAccountId().toString())
+ .setFirstName(account.getFirstName()).setLastName(account.getLastName())
+ .setProfileId(account.getProfileId().toString())
+ .setUsername(account.getUsername()).setAvatar(account.getAvatar())
+ .setLastUpdateTimestamp(
+ Objects.isNull(account.getLastUpdateTimestamp()) ? Long.toString(System.currentTimeMillis())
+ : account.getLastUpdateTimestamp().toString())
+ .build();
+ }
+
+ // Erlang protocol
+ private ProfileData buildDeleteProfileData(UUID profileId, UUID... accountsId) {
+ return ProfileData.newBuilder().setProfileId(profileId.toString())
+ .addAllAccountsIds(Arrays.stream(accountsId).map(UUID::toString).collect(Collectors.toList())).build();
+ }
+
+ private AccountBridgeGrpc.AccountBridgeBlockingStub buildGrpcConnection() {
+ ManagedChannel managedChannel = ManagedChannelBuilder.forAddress(erlangBridgeConfiguration.getHost(),
+ Integer.parseInt(erlangBridgeConfiguration.getPort())).usePlaintext().build();
+ AccountBridgeGrpc.AccountBridgeBlockingStub bridgeServiceBlockingStub = AccountBridgeGrpc
+ .newBlockingStub(managedChannel);
+ return MetadataUtils.attachHeaders(bridgeServiceBlockingStub, getHeaders());
+ }
+
+ /*
+ * public StatusResponse updateAuthProvider(String profileId, AuthProviderDetails details, SidType sidTypeToRemove,
+ * String sidToRemove) {
+ *
+ * UpdateAuthenticationProviderRequest request = UpdateAuthenticationProviderRequest.newBuilder()
+ * .setProfileId(profileId)
+ * .setOldAuthProvider(AuthProviderDetails.newBuilder().setAuthenticationProvider(sidToRemove)
+ * .setAuthenticationType(AuthenticationType.valueOf(sidTypeToRemove.name()))) .setUpdatedAuthProvider(
+ * AuthProviderDetails.newBuilder().setAuthenticationProvider(details.getAuthenticationProvider())
+ * .setAuthenticationType(details.getAuthenticationType())) .build(); ManagedChannel managedChannel =
+ * ManagedChannelBuilder.forAddress(accountServiceAddress, accountServicePort) .usePlaintext().build();
+ *
+ * AccountServiceGrpc.AccountServiceBlockingStub accountServiceBlockingStub = AccountServiceGrpc
+ * .newBlockingStub(managedChannel);
+ *
+ * Metadata headers = getHeaders(); accountServiceBlockingStub =
+ * MetadataUtils.attachHeaders(accountServiceBlockingStub, headers);
+ *
+ * StatusResponse response = accountServiceBlockingStub.updateAuthenticationProviderForProfile(request); return
+ * response; }
+ *
+ */
+
+ /**
+ * Attaches the access token to the rpc header
+ *
+ * @return
+ * @throws InternalError
+ */
+ private Metadata getHeaders() throws InternalError {
+ Metadata headers = new Metadata();
+ Metadata.Key key = Metadata.Key.of("accessToken", Metadata.ASCII_STRING_MARSHALLER);
+ headers.put(key, "Bearer " + TokenInterceptorConstants.ACCESS_TOKEN_CTX.get());
+ return headers;
+ }
+}
diff --git a/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptor.java b/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptor.java
new file mode 100644
index 0000000000000000000000000000000000000000..67de571509ab08d80f99da93b900722d325cdaed
--- /dev/null
+++ b/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptor.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.services.erlang.interceptor;
+
+import io.grpc.*;
+import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
+
+@GRpcGlobalInterceptor
+public class TokenInterceptor implements ServerInterceptor {
+
+ @Override
+ public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata,
+ ServerCallHandler serverCallHandler) {
+ String accessToken = metadata.get(TokenInterceptorConstants.ACCESS_TOKEN_METADATA);
+ Context ctx = Context.current().withValue(TokenInterceptorConstants.ACCESS_TOKEN_CTX, accessToken);
+ return Contexts.interceptCall(ctx, serverCall, metadata, serverCallHandler);
+ }
+}
diff --git a/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptorConstants.java b/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptorConstants.java
new file mode 100644
index 0000000000000000000000000000000000000000..5df55f124e6950e9266d867261ea6aefe2cf3f02
--- /dev/null
+++ b/src/main/java/biz/nynja/account/services/erlang/interceptor/TokenInterceptorConstants.java
@@ -0,0 +1,11 @@
+package biz.nynja.account.services.erlang.interceptor;
+
+import io.grpc.Context;
+import io.grpc.Metadata;
+
+import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
+
+public class TokenInterceptorConstants {
+ public static final Metadata.Key ACCESS_TOKEN_METADATA = Metadata.Key.of("accessToken", ASCII_STRING_MARSHALLER);
+ public static final Context.Key ACCESS_TOKEN_CTX = Context.key("accessToken");
+}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index a958a6a4086e848ddca21f3bbe63bb2a3d8ec0e3..86f0ed5b5e06b20cbb0b7718a29f79e41c339dc7 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -41,9 +41,9 @@ profile-data:
max-authenticationproviders-per-profile: 20
erlang-bridge:
- enable: false;
- ip:
- port:
+ enabled: true
+ host: localhost
+ port: 6580
#Metrics related configurations
management: