diff --git a/charts/account-service/Chart.yaml b/charts/account-service/Chart.yaml index 5f041fe4151402092bee5f4ebab5819f4698b57e..0d67982822af27e0788532f4bf0f8ab41e8071c3 100644 --- a/charts/account-service/Chart.yaml +++ b/charts/account-service/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v1 appVersion: "1.0" description: Deployment of the nynja account service. name: account-service -version: 0.1.8 +version: 0.1.9 diff --git a/pom.xml b/pom.xml index bca61689965759ab68561d55244fb3607567964c..e3c914b8c50481e00def69206f602fdf6187ffaf 100644 --- a/pom.xml +++ b/pom.xml @@ -184,6 +184,11 @@ 1.4.0 + + org.springframework.kafka + spring-kafka + 2.2.7.RELEASE + diff --git a/releases/staging/account-service.yaml b/releases/staging/account-service.yaml index 913f070897881fbb54c330ac25b9eca6acb02118..c4341b66ac6d85faeb6a26ac44f8216369ff5318 100644 --- a/releases/staging/account-service.yaml +++ b/releases/staging/account-service.yaml @@ -8,7 +8,7 @@ spec: chart: repository: https://nynjagroup.jfrog.io/nynjagroup/helm/ name: account-service - version: 0.1.8 + version: 0.1.9 values: replicaCount: 2 diff --git a/src/main/java/biz/nynja/account/Application.java b/src/main/java/biz/nynja/account/Application.java index cffdaaf1b5f28ec8a9924d0f0389a0cd29676a73..135dda199aaf2782d8e9b958983107f0fa995a66 100644 --- a/src/main/java/biz/nynja/account/Application.java +++ b/src/main/java/biz/nynja/account/Application.java @@ -8,11 +8,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; /** * Main entry point class. */ -@SpringBootApplication +@SpringBootApplication(exclude = KafkaAutoConfiguration.class) public class Application { private static final Logger LOGGER = LogManager.getLogger(Application.class); diff --git a/src/main/java/biz/nynja/account/kafka/Constants.java b/src/main/java/biz/nynja/account/kafka/Constants.java new file mode 100644 index 0000000000000000000000000000000000000000..0205d5c6b54eddf28cbedeaf8e5329e7427fd259 --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/Constants.java @@ -0,0 +1,11 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +public interface Constants { + + interface EventType { + String SIGN_UP = "signup"; + } +} diff --git a/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000000000000000000000000000000000000..b9f2ebc2cd176cf6e2e1e84a0dec842941df02fe --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaProducerConfig { + + @Value("${kafka.host}") + private String KAFKA_HOST; + @Value("${kafka.port}") + private String KAFKA_PORT; + @Value("${kafka.topic}") + private String KAFKA_TOPIC; + + @Bean(name = "kafkaTopic") + public String kafkaTopic() { + return KAFKA_TOPIC; + } + + // ======= KAFKA PRODUCER CONFIGURATION =========== + + @Bean + public ProducerFactory producerFactory() { + + Map configProps = new HashMap(3); + + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + return new DefaultKafkaProducerFactory(configProps); + + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); + } + +} diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java new file mode 100644 index 0000000000000000000000000000000000000000..0572237b3c552d78cd5266262cb4dedd016759a6 --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +public class UserEventsMessageTemplate { + + private String account_id; + private String event_type; + private String refercode; + + public UserEventsMessageTemplate(String account_id, String event_type, String refercode) { + this.account_id = account_id; + this.event_type = event_type; + this.refercode = refercode; + } + + public String getRefercode() { + return refercode; + } + + public void setRefercode(String refercode) { + this.refercode = refercode; + } + + public String getAccount_id() { + return account_id; + } + + public void setAccount_id(String account_id) { + this.account_id = account_id; + } + + public String getEvent_type() { + return event_type; + } + + public void setEvent_type(String event_type) { + this.event_type = event_type; + } + + @Override + public String toString() { + return "UserEventsMessageTemplate{" + "account_id='" + account_id + '\'' + ", event_type='" + event_type + '\'' + + ", refercode='" + refercode + '\'' + '}'; + } +} diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java new file mode 100644 index 0000000000000000000000000000000000000000..1d087de05f2c48a2f21a9befd276b234b86e962c --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class UserEventsProducer { + + private static final Logger logger = LoggerFactory.getLogger(UserEventsProducer.class); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + @Qualifier("kafkaTopic") + private String kafkaTopic; + + public void sendKafkaMessage(UserEventsMessageTemplate userEventsMessageTemplate) { + kafkaTemplate.send(kafkaTopic, userEventsMessageTemplate); + + logger.info("Message sent to kafka topic : {}", kafkaTopic); + + } +} diff --git a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java index 59a3347677cb6e4866e57fad34ccde53a87f0ccf..1af73ba703e2c3bdc35e70c494c7a97d3eb63bf5 100644 --- a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java +++ b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java @@ -20,6 +20,9 @@ import biz.nynja.account.grpc.CreatePendingAccountRequest; import biz.nynja.account.grpc.CreatePendingAccountResponse; import biz.nynja.account.grpc.ErrorResponse; import biz.nynja.account.grpc.ErrorResponse.Cause; +import biz.nynja.account.kafka.Constants; +import biz.nynja.account.kafka.UserEventsMessageTemplate; +import biz.nynja.account.kafka.UserEventsProducer; import biz.nynja.account.grpc.Role; import biz.nynja.account.models.Account; import biz.nynja.account.models.AuthenticationProvider; @@ -38,12 +41,15 @@ public class AccountCreator { private final PendingAccountRepository pendingAccountRepository; private final AccountRepositoryAdditional accountRepositoryAdditional; private final PhoneNumberNormalizer phoneNumberNormalizer; + private final UserEventsProducer userEventsProducer; public AccountCreator(PendingAccountRepository pendingAccountRepository, - AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer) { + AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer, + UserEventsProducer userEventsProducer) { this.pendingAccountRepository = pendingAccountRepository; this.accountRepositoryAdditional = accountRepositoryAdditional; this.phoneNumberNormalizer = phoneNumberNormalizer; + this.userEventsProducer = userEventsProducer; } /** @@ -159,6 +165,8 @@ public class AccountCreator { logger.debug("Account \"{}\" saved into the DB", createdAccount); AccountDetails details = createdAccount.toProto(); logger.debug("Account: \"{}\" created successfully.", response); + logger.debug("Send event for new account to Kafka."); + sendEventToKafka(details.getAccountId()); return AccountResponse.newBuilder().setAccountDetails(details).build(); } } @@ -191,4 +199,14 @@ public class AccountCreator { logger.error(logMessage, logValue); return newBuilder.setError(ErrorResponse.newBuilder().setCause(cause).setMessage(errorMessage)).build(); } + + private void sendEventToKafka(String accountId) { + // send event for new user to Kafka + try { + userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(accountId, Constants.EventType.SIGN_UP, "")); + }catch (Exception e) { + logger.error("Error sending event to Kafka for account id {}", accountId); + logger.debug("Error sending event to Kafka for account id {}, {}", accountId, e.getMessage()); + } + } } diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3e295124d4b43769e013e270b0cbd6ee539e35c6..879dcbaa39ecec1afa858e09adc1da683f827298 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -43,6 +43,11 @@ erlang-bridge: host: localhost port: 6580 +kafka: + host: 127.0.0.1 + port: 9092 + topic: userEventss + #Metrics related configurations management: endpoint: diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index c76cc62a22c1964068e5402212ad11f24b644236..f97fa408ddd1337766799ba344708638a7ba8351 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -36,6 +36,11 @@ erlang-bridge: host: ${BRIDGE_HOST} port: ${BRIDGE_PORT} +kafka: + host: ${KAFKA_HOST:kafka.kafka.svc.cluster.local} + port: ${KAFKA_PORT:9092} + topic: ${KAFKA_TOPIC:userEvents} + #Metrics related configurations management: endpoint: