From fe84b28211adcb6afabd124992d009557daba1e5 Mon Sep 17 00:00:00 2001 From: Angel Botev Date: Thu, 25 Jul 2019 16:10:45 +0300 Subject: [PATCH 1/2] NY-7892_publicsh_kafka_event - add kafka config, constants and template; - add send event in complete pending account; - add configurations; Signed-off-by: Angel Botev --- pom.xml | 5 ++ .../java/biz/nynja/account/Application.java | 3 +- .../biz/nynja/account/kafka/Constants.java | 11 ++++ .../account/kafka/KafkaProducerConfig.java | 54 +++++++++++++++++++ .../kafka/UserEventsMessageTemplate.java | 47 ++++++++++++++++ .../account/kafka/UserEventsProducer.java | 31 +++++++++++ .../account/services/AccountServiceImpl.java | 17 +++++- src/main/resources/application-dev.yml | 5 ++ src/main/resources/application-production.yml | 5 ++ 9 files changed, 176 insertions(+), 2 deletions(-) create mode 100644 src/main/java/biz/nynja/account/kafka/Constants.java create mode 100644 src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java create mode 100644 src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java create mode 100644 src/main/java/biz/nynja/account/kafka/UserEventsProducer.java diff --git a/pom.xml b/pom.xml index bca6168..e3c914b 100644 --- a/pom.xml +++ b/pom.xml @@ -184,6 +184,11 @@ 1.4.0 + + org.springframework.kafka + spring-kafka + 2.2.7.RELEASE + diff --git a/src/main/java/biz/nynja/account/Application.java b/src/main/java/biz/nynja/account/Application.java index cffdaaf..135dda1 100644 --- a/src/main/java/biz/nynja/account/Application.java +++ b/src/main/java/biz/nynja/account/Application.java @@ -8,11 +8,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; /** * Main entry point class. */ -@SpringBootApplication +@SpringBootApplication(exclude = KafkaAutoConfiguration.class) public class Application { private static final Logger LOGGER = LogManager.getLogger(Application.class); diff --git a/src/main/java/biz/nynja/account/kafka/Constants.java b/src/main/java/biz/nynja/account/kafka/Constants.java new file mode 100644 index 0000000..0205d5c --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/Constants.java @@ -0,0 +1,11 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +public interface Constants { + + interface EventType { + String SIGN_UP = "signup"; + } +} diff --git a/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java new file mode 100644 index 0000000..b9f2ebc --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/KafkaProducerConfig.java @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +@Configuration +public class KafkaProducerConfig { + + @Value("${kafka.host}") + private String KAFKA_HOST; + @Value("${kafka.port}") + private String KAFKA_PORT; + @Value("${kafka.topic}") + private String KAFKA_TOPIC; + + @Bean(name = "kafkaTopic") + public String kafkaTopic() { + return KAFKA_TOPIC; + } + + // ======= KAFKA PRODUCER CONFIGURATION =========== + + @Bean + public ProducerFactory producerFactory() { + + Map configProps = new HashMap(3); + + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT); + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + + return new DefaultKafkaProducerFactory(configProps); + + } + + @Bean + public KafkaTemplate kafkaTemplate() { + return new KafkaTemplate(producerFactory()); + } + +} diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java new file mode 100644 index 0000000..0572237 --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/UserEventsMessageTemplate.java @@ -0,0 +1,47 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +public class UserEventsMessageTemplate { + + private String account_id; + private String event_type; + private String refercode; + + public UserEventsMessageTemplate(String account_id, String event_type, String refercode) { + this.account_id = account_id; + this.event_type = event_type; + this.refercode = refercode; + } + + public String getRefercode() { + return refercode; + } + + public void setRefercode(String refercode) { + this.refercode = refercode; + } + + public String getAccount_id() { + return account_id; + } + + public void setAccount_id(String account_id) { + this.account_id = account_id; + } + + public String getEvent_type() { + return event_type; + } + + public void setEvent_type(String event_type) { + this.event_type = event_type; + } + + @Override + public String toString() { + return "UserEventsMessageTemplate{" + "account_id='" + account_id + '\'' + ", event_type='" + event_type + '\'' + + ", refercode='" + refercode + '\'' + '}'; + } +} diff --git a/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java new file mode 100644 index 0000000..1d087de --- /dev/null +++ b/src/main/java/biz/nynja/account/kafka/UserEventsProducer.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2018 Nynja Inc. All rights reserved. + */ +package biz.nynja.account.kafka; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.stereotype.Component; + +@Component +public class UserEventsProducer { + + private static final Logger logger = LoggerFactory.getLogger(UserEventsProducer.class); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + @Qualifier("kafkaTopic") + private String kafkaTopic; + + public void sendKafkaMessage(UserEventsMessageTemplate userEventsMessageTemplate) { + kafkaTemplate.send(kafkaTopic, userEventsMessageTemplate); + + logger.info("Message sent to kafka topic : {}", kafkaTopic); + + } +} diff --git a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java index 457c784..7d0da1a 100644 --- a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java +++ b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java @@ -55,6 +55,9 @@ import biz.nynja.account.grpc.StatusResponse; import biz.nynja.account.grpc.UpdateAccountRequest; import biz.nynja.account.grpc.UpdateAuthenticationProviderRequest; import biz.nynja.account.grpc.UpdateSearchableOptionRequest; +import biz.nynja.account.kafka.Constants; +import biz.nynja.account.kafka.UserEventsMessageTemplate; +import biz.nynja.account.kafka.UserEventsProducer; import biz.nynja.account.models.Account; import biz.nynja.account.models.AccountByProfileId; import biz.nynja.account.models.AccountByQrCode; @@ -107,6 +110,7 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas private final PermissionsValidator permissionsValidator; private final ProfileDataConfiguration profileDataConfiguration; private final AccessPointService accessPointService; + private final UserEventsProducer userEventsProducer; public AccountServiceImpl(AccountRepositoryAdditional accountRepositoryAdditional, ProfileRepository profileRepository, @@ -115,7 +119,8 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas AccountByUsernameRepository accountByUsernameRepository, AccountProvider accountProvider, AccountByProfileIdRepository accountByProfileIdRepository, PhoneNumberNormalizer phoneNumberNormalizer, AccountCreator accountCreator, ProfileProvider profileProvider, PermissionsValidator permissionsValidator, - ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService) { + ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService, + UserEventsProducer userEventsProducer) { this.accountRepositoryAdditional = accountRepositoryAdditional; this.profileRepository = profileRepository; this.profileByAutheticationProviderRepository = profileByAutheticationProviderRepository; @@ -129,6 +134,7 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas this.permissionsValidator = permissionsValidator; this.profileDataConfiguration = profileDataConfiguration; this.accessPointService = accessPointService; + this.userEventsProducer = userEventsProducer; } @Override @@ -499,6 +505,15 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas logger.debug("Complete pending account creation...: {} ...", request); AccountResponse response = accountCreator.retrieveCompletePendingAccountResponse(request); + // send event for new user to Kafka + try { + userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(response.getAccountDetails().getAccountId(), + Constants.EventType.SIGN_UP, "")); + }catch (Exception e) { + logger.error("Error sending event to Kafka for account id {}", response.getAccountDetails().getAccountId()); + logger.debug("Error sending event to Kafka for account id {}, {}", response.getAccountDetails().getAccountId(), + e.getMessage()); + } logger.info("SUCCESS: Completed pending account creation for account ID {}.", request.getAccountId()); responseObserver.onNext(response); diff --git a/src/main/resources/application-dev.yml b/src/main/resources/application-dev.yml index 3e29512..879dcba 100644 --- a/src/main/resources/application-dev.yml +++ b/src/main/resources/application-dev.yml @@ -43,6 +43,11 @@ erlang-bridge: host: localhost port: 6580 +kafka: + host: 127.0.0.1 + port: 9092 + topic: userEventss + #Metrics related configurations management: endpoint: diff --git a/src/main/resources/application-production.yml b/src/main/resources/application-production.yml index c76cc62..f97fa40 100644 --- a/src/main/resources/application-production.yml +++ b/src/main/resources/application-production.yml @@ -36,6 +36,11 @@ erlang-bridge: host: ${BRIDGE_HOST} port: ${BRIDGE_PORT} +kafka: + host: ${KAFKA_HOST:kafka.kafka.svc.cluster.local} + port: ${KAFKA_PORT:9092} + topic: ${KAFKA_TOPIC:userEvents} + #Metrics related configurations management: endpoint: -- GitLab From b8198ff611db725a3fc24a60e6f8aa5ba0152f9a Mon Sep 17 00:00:00 2001 From: Angel Botev Date: Thu, 25 Jul 2019 17:51:00 +0300 Subject: [PATCH 2/2] NY-7892_publish_kafka_event - bump version and move logic in AccountCreator; Signed-off-by: Angel Botev --- charts/account-service/Chart.yaml | 2 +- releases/staging/account-service.yaml | 2 +- .../account/services/AccountServiceImpl.java | 17 +--------------- .../decomposition/AccountCreator.java | 20 ++++++++++++++++++- 4 files changed, 22 insertions(+), 19 deletions(-) diff --git a/charts/account-service/Chart.yaml b/charts/account-service/Chart.yaml index 5f041fe..0d67982 100644 --- a/charts/account-service/Chart.yaml +++ b/charts/account-service/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v1 appVersion: "1.0" description: Deployment of the nynja account service. name: account-service -version: 0.1.8 +version: 0.1.9 diff --git a/releases/staging/account-service.yaml b/releases/staging/account-service.yaml index 913f070..c4341b6 100644 --- a/releases/staging/account-service.yaml +++ b/releases/staging/account-service.yaml @@ -8,7 +8,7 @@ spec: chart: repository: https://nynjagroup.jfrog.io/nynjagroup/helm/ name: account-service - version: 0.1.8 + version: 0.1.9 values: replicaCount: 2 diff --git a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java index 7d0da1a..457c784 100644 --- a/src/main/java/biz/nynja/account/services/AccountServiceImpl.java +++ b/src/main/java/biz/nynja/account/services/AccountServiceImpl.java @@ -55,9 +55,6 @@ import biz.nynja.account.grpc.StatusResponse; import biz.nynja.account.grpc.UpdateAccountRequest; import biz.nynja.account.grpc.UpdateAuthenticationProviderRequest; import biz.nynja.account.grpc.UpdateSearchableOptionRequest; -import biz.nynja.account.kafka.Constants; -import biz.nynja.account.kafka.UserEventsMessageTemplate; -import biz.nynja.account.kafka.UserEventsProducer; import biz.nynja.account.models.Account; import biz.nynja.account.models.AccountByProfileId; import biz.nynja.account.models.AccountByQrCode; @@ -110,7 +107,6 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas private final PermissionsValidator permissionsValidator; private final ProfileDataConfiguration profileDataConfiguration; private final AccessPointService accessPointService; - private final UserEventsProducer userEventsProducer; public AccountServiceImpl(AccountRepositoryAdditional accountRepositoryAdditional, ProfileRepository profileRepository, @@ -119,8 +115,7 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas AccountByUsernameRepository accountByUsernameRepository, AccountProvider accountProvider, AccountByProfileIdRepository accountByProfileIdRepository, PhoneNumberNormalizer phoneNumberNormalizer, AccountCreator accountCreator, ProfileProvider profileProvider, PermissionsValidator permissionsValidator, - ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService, - UserEventsProducer userEventsProducer) { + ProfileDataConfiguration profileDataConfiguration, AccessPointService accessPointService) { this.accountRepositoryAdditional = accountRepositoryAdditional; this.profileRepository = profileRepository; this.profileByAutheticationProviderRepository = profileByAutheticationProviderRepository; @@ -134,7 +129,6 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas this.permissionsValidator = permissionsValidator; this.profileDataConfiguration = profileDataConfiguration; this.accessPointService = accessPointService; - this.userEventsProducer = userEventsProducer; } @Override @@ -505,15 +499,6 @@ public class AccountServiceImpl extends AccountServiceGrpc.AccountServiceImplBas logger.debug("Complete pending account creation...: {} ...", request); AccountResponse response = accountCreator.retrieveCompletePendingAccountResponse(request); - // send event for new user to Kafka - try { - userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(response.getAccountDetails().getAccountId(), - Constants.EventType.SIGN_UP, "")); - }catch (Exception e) { - logger.error("Error sending event to Kafka for account id {}", response.getAccountDetails().getAccountId()); - logger.debug("Error sending event to Kafka for account id {}, {}", response.getAccountDetails().getAccountId(), - e.getMessage()); - } logger.info("SUCCESS: Completed pending account creation for account ID {}.", request.getAccountId()); responseObserver.onNext(response); diff --git a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java index 59a3347..1af73ba 100644 --- a/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java +++ b/src/main/java/biz/nynja/account/services/decomposition/AccountCreator.java @@ -20,6 +20,9 @@ import biz.nynja.account.grpc.CreatePendingAccountRequest; import biz.nynja.account.grpc.CreatePendingAccountResponse; import biz.nynja.account.grpc.ErrorResponse; import biz.nynja.account.grpc.ErrorResponse.Cause; +import biz.nynja.account.kafka.Constants; +import biz.nynja.account.kafka.UserEventsMessageTemplate; +import biz.nynja.account.kafka.UserEventsProducer; import biz.nynja.account.grpc.Role; import biz.nynja.account.models.Account; import biz.nynja.account.models.AuthenticationProvider; @@ -38,12 +41,15 @@ public class AccountCreator { private final PendingAccountRepository pendingAccountRepository; private final AccountRepositoryAdditional accountRepositoryAdditional; private final PhoneNumberNormalizer phoneNumberNormalizer; + private final UserEventsProducer userEventsProducer; public AccountCreator(PendingAccountRepository pendingAccountRepository, - AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer) { + AccountRepositoryAdditional accountRepositoryAdditional, PhoneNumberNormalizer phoneNumberNormalizer, + UserEventsProducer userEventsProducer) { this.pendingAccountRepository = pendingAccountRepository; this.accountRepositoryAdditional = accountRepositoryAdditional; this.phoneNumberNormalizer = phoneNumberNormalizer; + this.userEventsProducer = userEventsProducer; } /** @@ -159,6 +165,8 @@ public class AccountCreator { logger.debug("Account \"{}\" saved into the DB", createdAccount); AccountDetails details = createdAccount.toProto(); logger.debug("Account: \"{}\" created successfully.", response); + logger.debug("Send event for new account to Kafka."); + sendEventToKafka(details.getAccountId()); return AccountResponse.newBuilder().setAccountDetails(details).build(); } } @@ -191,4 +199,14 @@ public class AccountCreator { logger.error(logMessage, logValue); return newBuilder.setError(ErrorResponse.newBuilder().setCause(cause).setMessage(errorMessage)).build(); } + + private void sendEventToKafka(String accountId) { + // send event for new user to Kafka + try { + userEventsProducer.sendKafkaMessage(new UserEventsMessageTemplate(accountId, Constants.EventType.SIGN_UP, "")); + }catch (Exception e) { + logger.error("Error sending event to Kafka for account id {}", accountId); + logger.debug("Error sending event to Kafka for account id {}, {}", accountId, e.getMessage()); + } + } } -- GitLab