diff --git a/charts/account-service/Chart.yaml b/charts/account-service/Chart.yaml
index b0c2b6318cb25894ebc1539b201fe3287d32a489..5f041fe4151402092bee5f4ebab5819f4698b57e 100644
--- a/charts/account-service/Chart.yaml
+++ b/charts/account-service/Chart.yaml
@@ -2,4 +2,4 @@ apiVersion: v1
appVersion: "1.0"
description: Deployment of the nynja account service.
name: account-service
-version: 0.1.7
+version: 0.1.8
diff --git a/pom.xml b/pom.xml
index bca61689965759ab68561d55244fb3607567964c..e3c914b8c50481e00def69206f602fdf6187ffaf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,6 +184,11 @@
1.4.0
+
+ org.springframework.kafka
+ spring-kafka
+ 2.2.7.RELEASE
+
diff --git a/releases/staging/account-service.yaml b/releases/staging/account-service.yaml
index 17d4d94ec5d27fb6718e2e6b7a2f13e93067303e..913f070897881fbb54c330ac25b9eca6acb02118 100644
--- a/releases/staging/account-service.yaml
+++ b/releases/staging/account-service.yaml
@@ -8,7 +8,7 @@ spec:
chart:
repository: https://nynjagroup.jfrog.io/nynjagroup/helm/
name: account-service
- version: 0.1.7
+ version: 0.1.8
values:
replicaCount: 2
diff --git a/src/main/java/biz/nynja/account/Application.java b/src/main/java/biz/nynja/account/Application.java
index cffdaaf1b5f28ec8a9924d0f0389a0cd29676a73..135dda199aaf2782d8e9b958983107f0fa995a66 100644
--- a/src/main/java/biz/nynja/account/Application.java
+++ b/src/main/java/biz/nynja/account/Application.java
@@ -8,11 +8,12 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
/**
* Main entry point class.
*/
-@SpringBootApplication
+@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class Application {
private static final Logger LOGGER = LogManager.getLogger(Application.class);
diff --git a/src/main/java/biz/nynja/account/kafka/Constants.java b/src/main/java/biz/nynja/account/kafka/Constants.java
new file mode 100644
index 0000000000000000000000000000000000000000..0205d5c6b54eddf28cbedeaf8e5329e7427fd259
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/Constants.java
@@ -0,0 +1,11 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+public interface Constants {
+
+ interface EventType {
+ String SIGN_UP = "signup";
+ }
+}
diff --git a/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..b9f2ebc2cd176cf6e2e1e84a0dec842941df02fe
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+public class KafkaProducerConfig {
+
+ @Value("${kafka.host}")
+ private String KAFKA_HOST;
+ @Value("${kafka.port}")
+ private String KAFKA_PORT;
+ @Value("${kafka.topic}")
+ private String KAFKA_TOPIC;
+
+ @Bean(name = "kafkaTopic")
+ public String kafkaTopic() {
+ return KAFKA_TOPIC;
+ }
+
+ // ======= KAFKA PRODUCER CONFIGURATION ===========
+
+ @Bean
+ public ProducerFactory producerFactory() {
+
+ Map configProps = new HashMap(3);
+
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ return new DefaultKafkaProducerFactory(configProps);
+
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate(producerFactory());
+ }
+
+}
diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java
new file mode 100644
index 0000000000000000000000000000000000000000..0572237b3c552d78cd5266262cb4dedd016759a6
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+public class UserEventsMessageTemplate {
+
+ private String account_id;
+ private String event_type;
+ private String refercode;
+
+ public UserEventsMessageTemplate(String account_id, String event_type, String refercode) {
+ this.account_id = account_id;
+ this.event_type = event_type;
+ this.refercode = refercode;
+ }
+
+ public String getRefercode() {
+ return refercode;
+ }
+
+ public void setRefercode(String refercode) {
+ this.refercode = refercode;
+ }
+
+ public String getAccount_id() {
+ return account_id;
+ }
+
+ public void setAccount_id(String account_id) {
+ this.account_id = account_id;
+ }
+
+ public String getEvent_type() {
+ return event_type;
+ }
+
+ public void setEvent_type(String event_type) {
+ this.event_type = event_type;
+ }
+
+ @Override
+ public String toString() {
+ return "UserEventsMessageTemplate{" + "account_id='" + account_id + '\'' + ", event_type='" + event_type + '\''
+ + ", refercode='" + refercode + '\'' + '}';
+ }
+}
diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..1d087de05f2c48a2f21a9befd276b234b86e962c
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+public class UserEventsProducer {
+
+ private static final Logger logger = LoggerFactory.getLogger(UserEventsProducer.class);
+
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ @Autowired
+ @Qualifier("kafkaTopic")
+ private String kafkaTopic;
+
+ public void sendKafkaMessage(UserEventsMessageTemplate userEventsMessageTemplate) {
+ kafkaTemplate.send(kafkaTopic, userEventsMessageTemplate);
+
+ logger.info("Message sent to kafka topic : {}", kafkaTopic);
+
+ }
+}
diff --git a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java
index 457c7841b693d981ebf92951980815dc5ca4e339..7d0da1a9ecb51414f0d34916d83927069f43f5d3 100644
--- a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java
+++ b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java
@@ -55,6 +55,9 @@ import biz.nynja.account.grpc.StatusResponse;
import biz.nynja.account.grpc.UpdateAccountRequest;
import biz.nynja.account.grpc.UpdateAuthenticationProviderRequest;
import biz.nynja.account.grpc.UpdateSearchableOptionRequest;
+import biz.nynja.account.kafka.Constants;
+import biz.nynja.account.kafka.UserEventsMessageTemplate;
+import biz.nynja.account.kafka.UserEventsProducer;
import biz.nynja.account.models.Account;
import biz.nynja.account.models.AccountByProfileId;
import biz.nynja.account.models.AccountByQrCode;
@@ -107,6 +110,7 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas
private final PermissionsValidator permissionsValidator;
private final ProfileDataConfiguration profileDataConfiguration;
private final AccessPointService accessPointService;
+ private final UserEventsProducer userEventsProducer;
public AccountServiceImpl(AccountRepositoryAdditional accountRepositoryAdditional,
ProfileRepository profileRepository,
@@ -115,7 +119,8 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas
AccountByUsernameRepository accountByUsernameRepository, AccountProvider accountProvider,
AccountByProfileIdRepository accountByProfileIdRepository, PhoneNumberNormalizer phoneNumberNormalizer,
AccountCreator accountCreator, ProfileProvider profileProvider, PermissionsValidator permissionsValidator,
- ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService) {
+ ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService,
+ UserEventsProducer userEventsProducer) {
this.accountRepositoryAdditional = accountRepositoryAdditional;
this.profileRepository = profileRepository;
this.profileByAutheticationProviderRepository = profileByAutheticationProviderRepository;
@@ -129,6 +134,7 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas
this.permissionsValidator = permissionsValidator;
this.profileDataConfiguration = profileDataConfiguration;
this.accessPointService = accessPointService;
+ this.userEventsProducer = userEventsProducer;
}
@Override
@@ -499,6 +505,15 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas
logger.debug("Complete pending account creation...: {} ...", request);
AccountResponse response = accountCreator.retrieveCompletePendingAccountResponse(request);
+ // send event for new user to Kafka
+ try {
+ userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(response.getAccountDetails().getAccountId(),
+ Constants.EventType.SIGN_UP, ""));
+ }catch (Exception e) {
+ logger.error("Error sending event to Kafka for account id {}", response.getAccountDetails().getAccountId());
+ logger.debug("Error sending event to Kafka for account id {}, {}", response.getAccountDetails().getAccountId(),
+ e.getMessage());
+ }
logger.info("SUCCESS: Completed pending account creation for account ID {}.", request.getAccountId());
responseObserver.onNext(response);
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 3e295124d4b43769e013e270b0cbd6ee539e35c6..879dcbaa39ecec1afa858e09adc1da683f827298 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -43,6 +43,11 @@ erlang-bridge:
host: localhost
port: 6580
+kafka:
+ host: 127.0.0.1
+ port: 9092
+ topic: userEventss
+
#Metrics related configurations
management:
endpoint:
diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml
index c76cc62a22c1964068e5402212ad11f24b644236..f97fa408ddd1337766799ba344708638a7ba8351 100644
--- a/src/main/resources/application-production.yml
+++ b/src/main/resources/application-production.yml
@@ -36,6 +36,11 @@ erlang-bridge:
host: ${BRIDGE_HOST}
port: ${BRIDGE_PORT}
+kafka:
+ host: ${KAFKA_HOST:kafka.kafka.svc.cluster.local}
+ port: ${KAFKA_PORT:9092}
+ topic: ${KAFKA_TOPIC:userEvents}
+
#Metrics related configurations
management:
endpoint: