diff --git a/charts/account-service/Chart.yaml b/charts/account-service/Chart.yaml
index 5f041fe4151402092bee5f4ebab5819f4698b57e..0d67982822af27e0788532f4bf0f8ab41e8071c3 100644
--- a/charts/account-service/Chart.yaml
+++ b/charts/account-service/Chart.yaml
@@ -2,4 +2,4 @@ apiVersion: v1
appVersion: "1.0"
description: Deployment of the nynja account service.
name: account-service
-version: 0.1.8
+version: 0.1.9
diff --git a/pom.xml b/pom.xml
index bca61689965759ab68561d55244fb3607567964c..e3c914b8c50481e00def69206f602fdf6187ffaf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,6 +184,11 @@
1.4.0
+
+ org.springframework.kafka
+ spring-kafka
+ 2.2.7.RELEASE
+
diff --git a/releases/staging/account-service.yaml b/releases/staging/account-service.yaml
index 913f070897881fbb54c330ac25b9eca6acb02118..c4341b66ac6d85faeb6a26ac44f8216369ff5318 100644
--- a/releases/staging/account-service.yaml
+++ b/releases/staging/account-service.yaml
@@ -8,7 +8,7 @@ spec:
chart:
repository: https://nynjagroup.jfrog.io/nynjagroup/helm/
name: account-service
- version: 0.1.8
+ version: 0.1.9
values:
replicaCount: 2
diff --git a/src/main/java/biz/nynja/account/Application.java b/src/main/java/biz/nynja/account/Application.java
index cffdaaf1b5f28ec8a9924d0f0389a0cd29676a73..135dda199aaf2782d8e9b958983107f0fa995a66 100644
--- a/src/main/java/biz/nynja/account/Application.java
+++ b/src/main/java/biz/nynja/account/Application.java
@@ -8,11 +8,12 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
/**
* Main entry point class.
*/
-@SpringBootApplication
+@SpringBootApplication(exclude = KafkaAutoConfiguration.class)
public class Application {
private static final Logger LOGGER = LogManager.getLogger(Application.class);
diff --git a/src/main/java/biz/nynja/account/kafka/Constants.java b/src/main/java/biz/nynja/account/kafka/Constants.java
new file mode 100644
index 0000000000000000000000000000000000000000..0205d5c6b54eddf28cbedeaf8e5329e7427fd259
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/Constants.java
@@ -0,0 +1,11 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+public interface Constants {
+
+ interface EventType {
+ String SIGN_UP = "signup";
+ }
+}
diff --git a/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java
new file mode 100644
index 0000000000000000000000000000000000000000..b9f2ebc2cd176cf6e2e1e84a0dec842941df02fe
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+@Configuration
+public class KafkaProducerConfig {
+
+ @Value("${kafka.host}")
+ private String KAFKA_HOST;
+ @Value("${kafka.port}")
+ private String KAFKA_PORT;
+ @Value("${kafka.topic}")
+ private String KAFKA_TOPIC;
+
+ @Bean(name = "kafkaTopic")
+ public String kafkaTopic() {
+ return KAFKA_TOPIC;
+ }
+
+ // ======= KAFKA PRODUCER CONFIGURATION ===========
+
+ @Bean
+ public ProducerFactory producerFactory() {
+
+ Map configProps = new HashMap(3);
+
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ return new DefaultKafkaProducerFactory(configProps);
+
+ }
+
+ @Bean
+ public KafkaTemplate kafkaTemplate() {
+ return new KafkaTemplate(producerFactory());
+ }
+
+}
diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java
new file mode 100644
index 0000000000000000000000000000000000000000..0572237b3c552d78cd5266262cb4dedd016759a6
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+public class UserEventsMessageTemplate {
+
+ private String account_id;
+ private String event_type;
+ private String refercode;
+
+ public UserEventsMessageTemplate(String account_id, String event_type, String refercode) {
+ this.account_id = account_id;
+ this.event_type = event_type;
+ this.refercode = refercode;
+ }
+
+ public String getRefercode() {
+ return refercode;
+ }
+
+ public void setRefercode(String refercode) {
+ this.refercode = refercode;
+ }
+
+ public String getAccount_id() {
+ return account_id;
+ }
+
+ public void setAccount_id(String account_id) {
+ this.account_id = account_id;
+ }
+
+ public String getEvent_type() {
+ return event_type;
+ }
+
+ public void setEvent_type(String event_type) {
+ this.event_type = event_type;
+ }
+
+ @Override
+ public String toString() {
+ return "UserEventsMessageTemplate{" + "account_id='" + account_id + '\'' + ", event_type='" + event_type + '\''
+ + ", refercode='" + refercode + '\'' + '}';
+ }
+}
diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java
new file mode 100644
index 0000000000000000000000000000000000000000..1d087de05f2c48a2f21a9befd276b234b86e962c
--- /dev/null
+++ b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2018 Nynja Inc. All rights reserved.
+ */
+package biz.nynja.account.kafka;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+@Component
+public class UserEventsProducer {
+
+ private static final Logger logger = LoggerFactory.getLogger(UserEventsProducer.class);
+
+ @Autowired
+ private KafkaTemplate kafkaTemplate;
+
+ @Autowired
+ @Qualifier("kafkaTopic")
+ private String kafkaTopic;
+
+ public void sendKafkaMessage(UserEventsMessageTemplate userEventsMessageTemplate) {
+ kafkaTemplate.send(kafkaTopic, userEventsMessageTemplate);
+
+ logger.info("Message sent to kafka topic : {}", kafkaTopic);
+
+ }
+}
diff --git a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java
index 59a3347677cb6e4866e57fad34ccde53a87f0ccf..1af73ba703e2c3bdc35e70c494c7a97d3eb63bf5 100644
--- a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java
+++ b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java
@@ -20,6 +20,9 @@ import biz.nynja.account.grpc.CreatePendingAccountRequest;
import biz.nynja.account.grpc.CreatePendingAccountResponse;
import biz.nynja.account.grpc.ErrorResponse;
import biz.nynja.account.grpc.ErrorResponse.Cause;
+import biz.nynja.account.kafka.Constants;
+import biz.nynja.account.kafka.UserEventsMessageTemplate;
+import biz.nynja.account.kafka.UserEventsProducer;
import biz.nynja.account.grpc.Role;
import biz.nynja.account.models.Account;
import biz.nynja.account.models.AuthenticationProvider;
@@ -38,12 +41,15 @@ public class AccountCreator {
private final PendingAccountRepository pendingAccountRepository;
private final AccountRepositoryAdditional accountRepositoryAdditional;
private final PhoneNumberNormalizer phoneNumberNormalizer;
+ private final UserEventsProducer userEventsProducer;
public AccountCreator(PendingAccountRepository pendingAccountRepository,
- AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer) {
+ AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer,
+ UserEventsProducer userEventsProducer) {
this.pendingAccountRepository = pendingAccountRepository;
this.accountRepositoryAdditional = accountRepositoryAdditional;
this.phoneNumberNormalizer = phoneNumberNormalizer;
+ this.userEventsProducer = userEventsProducer;
}
/**
@@ -159,6 +165,8 @@ public class AccountCreator {
logger.debug("Account \"{}\" saved into the DB", createdAccount);
AccountDetails details = createdAccount.toProto();
logger.debug("Account: \"{}\" created successfully.", response);
+ logger.debug("Send event for new account to Kafka.");
+ sendEventToKafka(details.getAccountId());
return AccountResponse.newBuilder().setAccountDetails(details).build();
}
}
@@ -191,4 +199,14 @@ public class AccountCreator {
logger.error(logMessage, logValue);
return newBuilder.setError(ErrorResponse.newBuilder().setCause(cause).setMessage(errorMessage)).build();
}
+
+ private void sendEventToKafka(String accountId) {
+ // send event for new user to Kafka
+ try {
+ userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(accountId, Constants.EventType.SIGN_UP, ""));
+ }catch (Exception e) {
+ logger.error("Error sending event to Kafka for account id {}", accountId);
+ logger.debug("Error sending event to Kafka for account id {}, {}", accountId, e.getMessage());
+ }
+ }
}
diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml
index 3e295124d4b43769e013e270b0cbd6ee539e35c6..879dcbaa39ecec1afa858e09adc1da683f827298 100644
--- a/src/main/resources/application-dev.yml
+++ b/src/main/resources/application-dev.yml
@@ -43,6 +43,11 @@ erlang-bridge:
host: localhost
port: 6580
+kafka:
+ host: 127.0.0.1
+ port: 9092
+ topic: userEventss
+
#Metrics related configurations
management:
endpoint:
diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml
index c76cc62a22c1964068e5402212ad11f24b644236..f97fa408ddd1337766799ba344708638a7ba8351 100644
--- a/src/main/resources/application-production.yml
+++ b/src/main/resources/application-production.yml
@@ -36,6 +36,11 @@ erlang-bridge:
host: ${BRIDGE_HOST}
port: ${BRIDGE_PORT}
+kafka:
+ host: ${KAFKA_HOST:kafka.kafka.svc.cluster.local}
+ port: ${KAFKA_PORT:9092}
+ topic: ${KAFKA_TOPIC:userEvents}
+
#Metrics related configurations
management:
endpoint: