From 1973f5fd26f9025f09abb5a8220aabbd2623a4b2 Mon Sep 17 00:00:00 2001 From: Oleg Zhymolokhov Date: Wed, 14 Nov 2018 17:36:42 +0200 Subject: [PATCH] events-subscription-change: Added configurable margin for 20 blocks for Transfer event. Changed TransferEvent subscription to obtain transaction hash additionally to event data. Refactored error handling of some "json-rpc"-related functionality. Some other changes. --- .../walletservice/config/ConfigValues.java | 3 + .../walletservice/config/Web3jWarmUp.java | 14 ++-- .../controller/ControllerHelper.java | 2 +- .../controller/EthereumController.java | 9 +-- .../controller/TransactionController.java | 2 +- .../messagebroker/TokenBalanceChangeDto.java | 5 +- .../walletservice/service/TokenService.java | 12 ++- .../walletservice/service/Web3JService.java | 74 ++++++++++++------- .../service/listener/EventListener.java | 2 +- .../service/listener/MintEventHandler.java | 2 +- .../listener/TransferEventHandler.java | 63 ++++++++++++---- .../operation/TokenOperationFactory.java | 2 +- .../operation/impl/BalanceOperation.java | 8 +- src/main/resources/application.yml | 2 + 14 files changed, 133 insertions(+), 67 deletions(-) diff --git a/src/main/java/com/nynja/walletservice/config/ConfigValues.java b/src/main/java/com/nynja/walletservice/config/ConfigValues.java index 4eebc77..072cc96 100644 --- a/src/main/java/com/nynja/walletservice/config/ConfigValues.java +++ b/src/main/java/com/nynja/walletservice/config/ConfigValues.java @@ -34,6 +34,9 @@ public class ConfigValues { @Value("${wallet-discovery.search-job-pool-size}") private int poolSize; + @Value("${ethereum.event.margin}") + private BigInteger blockSubscriptionMargin; + public BigInteger gasPrice() { return BigInteger.valueOf(gasPrice); } diff --git a/src/main/java/com/nynja/walletservice/config/Web3jWarmUp.java b/src/main/java/com/nynja/walletservice/config/Web3jWarmUp.java index a109a64..c43cdf4 100644 --- a/src/main/java/com/nynja/walletservice/config/Web3jWarmUp.java +++ b/src/main/java/com/nynja/walletservice/config/Web3jWarmUp.java @@ -1,26 +1,25 @@ package com.nynja.walletservice.config; import com.nynja.walletservice.provider.Web3jProvider; +import com.nynja.walletservice.service.Web3JService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; -import org.web3j.protocol.core.DefaultBlockParameterName; -import org.web3j.protocol.core.methods.response.EthGetBalance; - -import java.math.BigInteger; @Configuration @Slf4j public class Web3jWarmUp implements ApplicationListener { + private Web3JService web3JService; private Web3jProvider web3jProvider; private ConfigValues configValues; @Autowired - public Web3jWarmUp(Web3jProvider web3jProvider, ConfigValues configValues) { + public Web3jWarmUp(Web3jProvider web3jProvider, Web3JService web3JService, ConfigValues configValues) { this.web3jProvider = web3jProvider; + this.web3JService = web3JService; this.configValues = configValues; } @@ -28,10 +27,7 @@ public class Web3jWarmUp implements ApplicationListener { public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) { log.info("Warming up the Web3j client"); web3jProvider.get().ethGasPrice().sendAsync(); - web3jProvider.get().ethGetBalance(configValues.getAdminAddress(), DefaultBlockParameterName.LATEST) - .sendAsync() - .thenApplyAsync(EthGetBalance::getBalance) - .thenApplyAsync(BigInteger::toString) + web3JService.getBalance(configValues.getAdminAddress()) .thenAcceptAsync(log::info) .exceptionally(ex -> { log.error(ex.getMessage(), ex); diff --git a/src/main/java/com/nynja/walletservice/controller/ControllerHelper.java b/src/main/java/com/nynja/walletservice/controller/ControllerHelper.java index 6698c15..fd6bb95 100644 --- a/src/main/java/com/nynja/walletservice/controller/ControllerHelper.java +++ b/src/main/java/com/nynja/walletservice/controller/ControllerHelper.java @@ -5,7 +5,7 @@ import org.springframework.http.ResponseEntity; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; -public class ControllerHelper { +class ControllerHelper { static CompletableFuture> handleAndRespondAsync(Supplier supplier) { return CompletableFuture.supplyAsync(supplier).thenApplyAsync(ResponseEntity::ok); diff --git a/src/main/java/com/nynja/walletservice/controller/EthereumController.java b/src/main/java/com/nynja/walletservice/controller/EthereumController.java index 9f077a9..f205f4e 100644 --- a/src/main/java/com/nynja/walletservice/controller/EthereumController.java +++ b/src/main/java/com/nynja/walletservice/controller/EthereumController.java @@ -10,11 +10,9 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.validation.annotation.Validated; import org.springframework.web.bind.annotation.*; -import org.web3j.protocol.core.methods.response.EthGetBalance; import org.web3j.utils.Convert; import java.math.BigDecimal; -import java.math.BigInteger; import java.util.concurrent.CompletableFuture; import static com.nynja.walletservice.constant.Constants.DataTypes.LONG; @@ -43,8 +41,9 @@ public class EthereumController { } @GetMapping(API_VERSION + "/version") - public ResponseEntity checkClientVersion() { - return ResponseEntity.ok(web3JService.getClientVersion()); + public CompletableFuture> checkClientVersion() { + return web3JService.getClientVersion() + .thenApplyAsync(ResponseEntity::ok); } @ApiOperation(value = "Get balance of an ethereum account", httpMethod = "GET") @@ -56,8 +55,6 @@ public class EthereumController { @GetMapping(API_VERSION + "/address-balance") public CompletableFuture> getBalanceForAddress(@NotBlank(message = ADDRESS_VALIDATION_MESSAGE) @RequestParam String address) { return web3JService.getBalance(address) - .thenApplyAsync(EthGetBalance::getBalance) - .thenApplyAsync(BigInteger::toString) .thenApplyAsync(ResponseEntity::ok); } diff --git a/src/main/java/com/nynja/walletservice/controller/TransactionController.java b/src/main/java/com/nynja/walletservice/controller/TransactionController.java index 49ae3ed..4e7587a 100644 --- a/src/main/java/com/nynja/walletservice/controller/TransactionController.java +++ b/src/main/java/com/nynja/walletservice/controller/TransactionController.java @@ -52,7 +52,7 @@ public class TransactionController { @ApiResponse(code = 200, message = RETRIEVED) }) @GetMapping(API_VERSION + "/balance-for-address") - public CompletableFuture> getBalanceByAddress(@NotBlank(message = ADDRESS_VALIDATION_MESSAGE) @RequestParam String walletAddress) { + public CompletableFuture> getBalanceByAddress(@NotBlank(message = ADDRESS_VALIDATION_MESSAGE) @RequestParam String walletAddress) { return tokenService.tokenBalanceByAddress(walletAddress).thenApplyAsync(ResponseEntity::ok); } diff --git a/src/main/java/com/nynja/walletservice/dto/messagebroker/TokenBalanceChangeDto.java b/src/main/java/com/nynja/walletservice/dto/messagebroker/TokenBalanceChangeDto.java index 60b125a..8c1b518 100644 --- a/src/main/java/com/nynja/walletservice/dto/messagebroker/TokenBalanceChangeDto.java +++ b/src/main/java/com/nynja/walletservice/dto/messagebroker/TokenBalanceChangeDto.java @@ -5,6 +5,8 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; +import java.math.BigInteger; + @Data @NoArgsConstructor @AllArgsConstructor @@ -13,5 +15,6 @@ public class TokenBalanceChangeDto { private String from; private String to; private String amount; - private Long balance; + private BigInteger balance; + private String txHash; } diff --git a/src/main/java/com/nynja/walletservice/service/TokenService.java b/src/main/java/com/nynja/walletservice/service/TokenService.java index de4fda0..491595f 100644 --- a/src/main/java/com/nynja/walletservice/service/TokenService.java +++ b/src/main/java/com/nynja/walletservice/service/TokenService.java @@ -14,6 +14,7 @@ import org.web3j.utils.Convert; import java.math.BigDecimal; import java.math.BigInteger; +import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.function.Function; @@ -51,9 +52,16 @@ public class TokenService { return contractProvider.deployNynjaCoinContract(); } - public CompletableFuture tokenBalanceByAddress(String address) { + public CompletableFuture tokenBalanceByAddress(String address) { return operationFactory.balanceOperation(ethConfig.getAdminCredentials(), address) - .execute().thenApplyAsync(BigInteger::longValue); + .execute() + .thenApplyAsync(balance -> { + if(Objects.isNull(balance)) { + throw new RuntimeException("SmartContract returned empty value"); + } else { + return balance; + } + }); } //TODO For demo purposes. Remove after. diff --git a/src/main/java/com/nynja/walletservice/service/Web3JService.java b/src/main/java/com/nynja/walletservice/service/Web3JService.java index df2819c..d24f140 100644 --- a/src/main/java/com/nynja/walletservice/service/Web3JService.java +++ b/src/main/java/com/nynja/walletservice/service/Web3JService.java @@ -13,14 +13,15 @@ import org.springframework.stereotype.Service; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameterName; +import org.web3j.protocol.core.Response; import org.web3j.protocol.core.methods.response.*; import org.web3j.tx.RawTransactionManager; import org.web3j.tx.Transfer; import org.web3j.utils.Convert; -import java.io.IOException; import java.math.BigDecimal; import java.math.BigInteger; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import static com.nynja.walletservice.constant.Constants.LogMessages.*; @@ -42,15 +43,11 @@ public class Web3JService { this.conf = conf; } - public String getClientVersion() { - Web3ClientVersion web3ClientVersion; - try { - web3ClientVersion = web3jProvider.get().web3ClientVersion().send(); - } catch (IOException e) { - log.error("Error obtaining web3j version."); - throw new RuntimeException(e); - } - return web3ClientVersion.getWeb3ClientVersion(); + public CompletableFuture getClientVersion() { + return web3j().web3ClientVersion() + .sendAsync() + .thenApplyAsync(this::validateResponse) + .thenApplyAsync(Web3ClientVersion::getWeb3ClientVersion); } public CompletableFuture> sendEtherAsync(Credentials credentials, String toAddress, @@ -66,20 +63,21 @@ public class Web3JService { }); } - public CompletableFuture getBalance(String address) { - return web3j().ethGetBalance(address, DefaultBlockParameterName.LATEST).sendAsync(); + public CompletableFuture getBalance(String address) { + return web3j().ethGetBalance(address, DefaultBlockParameterName.LATEST) + .sendAsync() + .thenApplyAsync(this::validateResponse) + .thenApplyAsync(EthGetBalance::getBalance) + .thenApplyAsync(BigInteger::toString); } public CompletableFuture> sendSignedTransaction(String transaction) { return web3j().ethSendRawTransaction(transaction) .sendAsync() + .thenApplyAsync(this::validateResponse) .thenApplyAsync(ethSendTransaction -> { - if (ethSendTransaction.getError() != null) { - throw new RuntimeException(ethSendTransaction.getError().getMessage()); - } else { - log.info(SIGNED_TRANSACTION_SENT, ethSendTransaction.getTransactionHash()); - return new TransactionResponseDto<>(ethSendTransaction.getTransactionHash(), ""); - } + log.info(SIGNED_TRANSACTION_SENT, ethSendTransaction.getTransactionHash()); + return new TransactionResponseDto<>(ethSendTransaction.getTransactionHash(), ""); }).exceptionally(ex -> { throw new TransactionFailedException(SIGNED_TRANSACTION_FAIL, ex.getCause().getMessage()); }); @@ -88,6 +86,7 @@ public class Web3JService { public CompletableFuture getSingleTransactionInfo(String hash) { return web3j().ethGetTransactionReceipt(hash) .sendAsync() + .thenApplyAsync(this::validateResponse) .thenApplyAsync(EthGetTransactionReceipt::getTransactionReceipt) .thenApplyAsync(trOpt -> trOpt.orElseThrow(() -> new TransactionNotFoundException(hash))); } @@ -95,6 +94,7 @@ public class Web3JService { public CompletableFuture getTransactionCount(String address) { return web3j().ethGetTransactionCount(address, PENDING) .sendAsync() + .thenApplyAsync(this::validateResponse) .thenApplyAsync(EthGetTransactionCount::getTransactionCount); } @@ -107,21 +107,43 @@ public class Web3JService { dto.getTo(), dto.getValue(), dto.getData() - )).sendAsync().thenComposeAsync(ethEstimateGas -> web3j().ethGasPrice() - .sendAsync() - .thenApplyAsync(ethGasPrice -> EstimateGasResponseDto.builder() - .gasConsumed(ethEstimateGas.getAmountUsed()) - .gasPrice(ethGasPrice.getGasPrice()) - .build() - )); + )).sendAsync() + .thenApplyAsync(this::validateResponse) + .thenComposeAsync(ethEstimateGas -> web3j().ethGasPrice() + .sendAsync() + .thenApplyAsync(this::validateResponse) + .thenApplyAsync(ethGasPrice -> EstimateGasResponseDto.builder() + .gasConsumed(ethEstimateGas.getAmountUsed()) + .gasPrice(ethGasPrice.getGasPrice()) + .build() + )); } public CompletableFuture getCurrentBlockNumber() { return web3j().ethBlockNumber().sendAsync() + .thenApplyAsync(this::validateResponse) .thenApplyAsync(EthBlockNumber::getBlockNumber); } - private Web3j web3j() { + public CompletableFuture getCurrentBlockWithMargin() { + return getCurrentBlockNumber() + .thenApplyAsync(blockNumber -> blockNumber.compareTo(conf.getBlockSubscriptionMargin()) < 0 + ? blockNumber + : blockNumber.subtract(conf.getBlockSubscriptionMargin()) + ); + } + + T validateResponse(T response) { + Optional.ofNullable(response) + .filter(Response::hasError) + .ifPresent(r -> { + throw new RuntimeException(r.getError().getMessage()); + }); + + return response; + } + + public Web3j web3j() { return web3jProvider.get(); } } diff --git a/src/main/java/com/nynja/walletservice/service/listener/EventListener.java b/src/main/java/com/nynja/walletservice/service/listener/EventListener.java index 0b6082c..8fa6f90 100644 --- a/src/main/java/com/nynja/walletservice/service/listener/EventListener.java +++ b/src/main/java/com/nynja/walletservice/service/listener/EventListener.java @@ -24,7 +24,7 @@ public class EventListener implements ApplicationListener subscribe(); } - public void subscribe() { + private void subscribe() { eventHandlers.forEach(EventHandler::subscribe); } } diff --git a/src/main/java/com/nynja/walletservice/service/listener/MintEventHandler.java b/src/main/java/com/nynja/walletservice/service/listener/MintEventHandler.java index 1763476..166f4c3 100644 --- a/src/main/java/com/nynja/walletservice/service/listener/MintEventHandler.java +++ b/src/main/java/com/nynja/walletservice/service/listener/MintEventHandler.java @@ -24,7 +24,7 @@ public class MintEventHandler extends EventHandler @Override public void subscribe() { - web3JService.getCurrentBlockNumber() + web3JService.getCurrentBlockWithMargin() .thenAcceptAsync(blockNumber -> provider.getNynjaCoin().mintEventObservable( DefaultBlockParameter.valueOf(blockNumber), diff --git a/src/main/java/com/nynja/walletservice/service/listener/TransferEventHandler.java b/src/main/java/com/nynja/walletservice/service/listener/TransferEventHandler.java index c630722..71907a9 100644 --- a/src/main/java/com/nynja/walletservice/service/listener/TransferEventHandler.java +++ b/src/main/java/com/nynja/walletservice/service/listener/TransferEventHandler.java @@ -15,17 +15,29 @@ import org.springframework.beans.factory.annotation.Value; import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; import org.springframework.web.client.RestTemplate; +import org.web3j.abi.EventEncoder; +import org.web3j.abi.EventValues; +import org.web3j.abi.TypeReference; +import org.web3j.abi.datatypes.Address; +import org.web3j.abi.datatypes.Event; +import org.web3j.abi.datatypes.generated.Uint256; import org.web3j.protocol.core.DefaultBlockParameter; import org.web3j.protocol.core.DefaultBlockParameterName; +import org.web3j.protocol.core.methods.request.EthFilter; +import org.web3j.protocol.core.methods.response.Log; +import java.math.BigInteger; +import java.util.Arrays; import java.util.Base64; +import static org.web3j.tx.Contract.staticExtractEventParameters; + /** * @author Oleg Zhymolokhov (oleg.zhimolokhov@dataart.com) */ @Slf4j @Component -public class TransferEventHandler extends EventHandler { +public class TransferEventHandler extends EventHandler { @Autowired private ContractProvider provider; @@ -40,32 +52,54 @@ public class TransferEventHandler extends EventHandler - provider.getNynjaCoin().transferEventObservable( - DefaultBlockParameter.valueOf(blockNumber), - DefaultBlockParameterName.LATEST - ).subscribe(this, ex -> log.error(ex.getMessage(), ex)) + web3JService.getCurrentBlockWithMargin() + .thenAcceptAsync(blockNumber -> { + event = new Event("Transfer", + Arrays.asList(new TypeReference
() {}, new TypeReference
() {}), + Arrays.asList(new TypeReference() {})); + + EthFilter filter = new EthFilter( + DefaultBlockParameter.valueOf(blockNumber), + DefaultBlockParameterName.LATEST, + provider.getNynjaAddress() + ); + filter.addSingleTopic(EventEncoder.encode(event)); + web3JService.web3j().ethLogObservable(filter) + .subscribe(this, ex -> log.error(ex.getMessage(), ex)); + } ); } @Override - public void call(NynjaCoin.TransferEventResponse response) { - log.info("{} Nynja-coins has been transferred from {} to {}", response.value, response.from, response.to ); + public void call(Log response) { + NynjaCoin.TransferEventResponse typedResponse = extractEvent(response); - tokenService.tokenBalanceByAddress(response.to) - .thenAcceptAsync(balance -> sendMessageToMessageBroker(response, balance)); + log.info("{} Nynja-coins has been transferred from {} to {}. Transaction hash: {}", + typedResponse.value, typedResponse.from, typedResponse.to, response.getTransactionHash()); + + tokenService.tokenBalanceByAddress(typedResponse.to).thenAcceptAsync(balance -> + sendMessageToMessageBroker(typedResponse, balance, response.getTransactionHash())); } + private NynjaCoin.TransferEventResponse extractEvent(Log response) { + EventValues eventValues = staticExtractEventParameters(event, response); + NynjaCoin.TransferEventResponse typedResponse = new NynjaCoin.TransferEventResponse(); + typedResponse.from = (String) eventValues.getIndexedValues().get(0).getValue(); + typedResponse.to = (String) eventValues.getIndexedValues().get(1).getValue(); + typedResponse.value = (BigInteger) eventValues.getNonIndexedValues().get(0).getValue(); + return typedResponse; + } - private void sendMessageToMessageBroker(NynjaCoin.TransferEventResponse response, long balance) { + private void sendMessageToMessageBroker(NynjaCoin.TransferEventResponse response, BigInteger balance, String txHash) { RestTemplate template = new RestTemplate(); ResponseEntity mbResponse = template.postForEntity( messageBrokerUrl, - prepareRequest(response, balance), + prepareRequest(response, balance, txHash), MessageBrokerResponse.class ); @@ -79,12 +113,13 @@ public class TransferEventHandler extends EventHandler { - private String ownerAddress; + private String address; - public BalanceOperation(Credentials senderCredentials, String ownerAddress) { + public BalanceOperation(Credentials senderCredentials, String address) { super(senderCredentials); - this.ownerAddress = ownerAddress; + this.address = address; } @Override protected CompletableFuture execute(NynjaCoin token) { - return token.balanceOf(ownerAddress).sendAsync(); + return token.balanceOf(address).sendAsync(); } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 420b7e2..011ecda 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -28,6 +28,8 @@ ethereum: nynja-coin-address: "0xF36c7AdAd65c39A4848F3c85001F67f31d00d207" gas-price: 41000000000 gas-limit: 400000 + event: + margin: 20 wallet-discovery: search-job-pool-size: 2 etherscan: -- GitLab