diff --git a/app/focus/build.gradle b/app/focus/build.gradle index 75726506deba793e8fa685a69c55366392ec0546..0edddce39da09d69d2552ba3ff2b1271bf953cd0 100644 --- a/app/focus/build.gradle +++ b/app/focus/build.gradle @@ -20,7 +20,8 @@ task confereceFocus(type: CreateStartScripts) { outputDir = new File(project.buildDir, 'tmp') classpath = jar.outputs.files + project.configurations.runtime defaultJvmOpts = [ - "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager" + "-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager", + "-enableassertions:com.nynjacoin.nccs.conferencefactory..." ] } diff --git a/app/focus/src/main/java/com/nynjacoin/nccs/app/focus/FocusApp.java b/app/focus/src/main/java/com/nynjacoin/nccs/app/focus/FocusApp.java index 7e919efd3c20665fcbb52a8434a63e39290fc98d..cfb9997fa9c4e094dd4e7ebfd88bb410c0e1de0a 100644 --- a/app/focus/src/main/java/com/nynjacoin/nccs/app/focus/FocusApp.java +++ b/app/focus/src/main/java/com/nynjacoin/nccs/app/focus/FocusApp.java @@ -35,6 +35,7 @@ import com.nynjacoin.nccs.lib.grpcutil.MetadataServerInterceptor; import com.nynjacoin.nccs.lib.vertxutil.restclient.push.PushNotificationSender; import com.nynjacoin.nccs.lib.vertxutil.app.VertxApp; import com.nynjacoin.nccs.lib.grpcutil.WebApiServer; +import com.nynjacoin.nccs.lib.vertxutil.restclient.roomlink.RoomLinkClient; import com.nynjacoin.nccs.mediacontroller.MediaController; import com.nynjacoin.nccs.protocol.chp.CallHistoryDeletion; import com.nynjacoin.nccs.protocol.chp.CallHistoryRecord; @@ -44,6 +45,7 @@ import com.nynjacoin.nccs.protocol.metadata.CommonMetadataKey; import com.nynjacoin.nccs.stateholder.api.StateHolderGrpcApi; import com.nynjacoin.nccs.stateholder.api.StateHolderWebApi; import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; +import com.nynjacoin.nccs.stateholder.statepackage.RoomPublisher; import com.nynjacoin.nccs.stateholder.statepackage.UserActivityPublisher; import io.grpc.ServerInterceptors; import io.vertx.core.Vertx; @@ -91,8 +93,15 @@ public class FocusApp extends VertxApp { JoinLinkComposer joinLinkComposer = new JoinLinkComposer(config.getString(ConfigKey.JOIN_LINK_BASE_URL.key())); + ChatRoomClient chatRoomClient = ChatRoomClient.create(vertx, config); + ConferenceFactoryVerticle conferenceFactoryVerticle = - new ConferenceFactoryVerticle(config,joinLinkComposer); + new ConferenceFactoryVerticle( + config, + joinLinkComposer, + new RoomPublisher(stateHolderVerticle), + RoomLinkClient.create(vertx, config), + chatRoomClient); vertx.deployVerticle(conferenceFactoryVerticle); PushNotificationSender pushNotificationSender = @@ -107,11 +116,6 @@ public class FocusApp extends VertxApp { FocusInternalApiClient focusInternalApiClient = new FocusInternalApiClient(); - ChatRoomClient chatRoomClient = - config.getBoolean(ConfigKey.CHAT_ROOM_ENABLED.key(), true) - ? ChatRoomClient.create(vertx, config) - : null; - ConferenceControlVerticle conferenceControlVerticle = new ConferenceControlVerticle( conferenceFactoryVerticle, @@ -121,9 +125,12 @@ public class FocusApp extends VertxApp { joinLinkComposer, historyEventsProcessor, bubbleSender, - chatRoomClient); + chatRoomClient, + config.getBoolean(ConfigKey.CHAT_ROOM_ENABLED.key(), true)); vertx.deployVerticle(conferenceControlVerticle); + conferenceFactoryVerticle.setConferenceControlVerticle(conferenceControlVerticle); + MediaController mediaController = new MediaController(vertx, config); Focus focus = new Focus(conferenceControlVerticle, stateHolderVerticle, mediaController); CallVerticle callVerticle = new CallVerticle(focus); diff --git a/app/focus/src/main/resources/nccs-config.json b/app/focus/src/main/resources/nccs-config.json index 91286186db00ba11b878edfd6e6d09f82fed7fc7..7446cca590b95185d53670e1f163c8b276126619 100644 --- a/app/focus/src/main/resources/nccs-config.json +++ b/app/focus/src/main/resources/nccs-config.json @@ -16,5 +16,7 @@ "chat_room_key": "nynja:nynjaTS", "bubble_enabled": false, "bubble_url": "http://nynja-dev-uw1-messaging01.dev-eu.nynja.net:8888/cri/bubbles", - "bubble_key": "nynja:nynjaTS" + "bubble_key": "nynja:nynjaTS", + "room_link_url": "http://nynja-dev-uw1-messaging01.dev-eu.nynja.net:8888/link", + "room_link_key": "nynja:nynjaTS" } diff --git a/charts/calling-service/Chart.yaml b/charts/calling-service/Chart.yaml index 02a88a1c57755946ffb202023b25f710244fe439..c352b3d6c1986ddbb57f0a4e2b1e0fc69c11f80a 100644 --- a/charts/calling-service/Chart.yaml +++ b/charts/calling-service/Chart.yaml @@ -2,4 +2,4 @@ apiVersion: v1 appVersion: "1.0" description: Calling service Helm chart name: calling-service -version: 0.2.0 +version: 0.2.1 diff --git a/charts/calling-service/templates/deployment.yaml b/charts/calling-service/templates/deployment.yaml index ff33e35ddbb3ce8871863a26ebb89da02384d640..52bfbf123b5010bbca7973450f74ea203fef5786 100644 --- a/charts/calling-service/templates/deployment.yaml +++ b/charts/calling-service/templates/deployment.yaml @@ -73,6 +73,12 @@ spec: value: "{{ .Values.chatroom.url }}" - name: chat_room_key value: "{{ .Values.chatroom.key }}" + - name: room_link_enabled + value: "{{ .Values.roomlink.enabled }}" + - name: room_link_url + value: "{{ .Values.roomlink.url }}" + - name: room_link_key + value: "{{ .Values.roomlink.key }}" args: - "sleep 5;cd /app; bin/nccs-focus" diff --git a/charts/calling-service/values.yaml b/charts/calling-service/values.yaml index 8fa88687c1e8b8610e9e25d034db6e997095fdf6..c3ae0259b01f82a4ec24e93438704f6f08d1ba84 100644 --- a/charts/calling-service/values.yaml +++ b/charts/calling-service/values.yaml @@ -40,6 +40,14 @@ push: host: messaging-service.messaging.svc.cluster.local apiKey: nynja apiSecret: secret +chatroom: + enabled: true + url: http://messaging-service.messaging.svc.cluster.local:8888/cri/rooms + key: nynja:nynjaTS +roomlink: + enabled: true + url: http://messaging-service.messaging.svc.cluster.local:8888/link + key: nynja:nynjaTS nodeSelector: {} diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlGrpcApi.java b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlGrpcApi.java index bf3e3e6bff33bf4822ba9c9e56539a4e40ff37a3..84ee54bd4d63359b7b8e401f993c2f8bf5172c85 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlGrpcApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlGrpcApi.java @@ -2,8 +2,6 @@ package com.nynjacoin.nccs.conferencecontrol.api; import com.nynjacoin.nccs.conferencecontrol.service.ConferenceControlVerticle; import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; -import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; -import com.nynjacoin.nccs.protocol.ccp.AddMemberResponse; import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.ccp.ConferenceGrpc; @@ -15,6 +13,8 @@ import com.nynjacoin.nccs.protocol.ccp.ManageParticipanRequest; import com.nynjacoin.nccs.protocol.ccp.RejectRequest; import com.nynjacoin.nccs.protocol.ccp.RemoveMemberRequest; import com.nynjacoin.nccs.protocol.ccp.UpdateConferenceInfoRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; import com.nynjacoin.nccs.protocol.def.MemberInfo; import com.nynjacoin.nccs.protocol.def.Void; import io.vertx.core.Future; @@ -43,11 +43,17 @@ public class ConferenceControlGrpcApi @Override public void addMember(AddMemberRequest request, Future response) { - String accountId = GrpcContextKeys.ACCOUNT_ID.get(); String conferenceId = getConferenceId(); - - conferenceControlVerticle.addMember(accountId, conferenceId, request, response); + conferenceControlVerticle.runOnContext( + () -> conferenceControlVerticle.conferenceControl.addMember( + accountId, + conferenceId, + request, + response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); } @Override diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlWebApi.java b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlWebApi.java index f7f57317a049a71a0b060b402097969e48651641..b2e8bbec1c0099c1354644c94e058baecb25ef27 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlWebApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/api/ConferenceControlWebApi.java @@ -9,8 +9,6 @@ import com.nynjacoin.nccs.lib.grpcutil.WebApi; import com.nynjacoin.nccs.lib.grpcutil.WebEventTarget; import com.nynjacoin.nccs.lib.grpcutil.WebMetadata; import com.nynjacoin.nccs.lib.grpcutil.WebResponseHandler; -import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; -import com.nynjacoin.nccs.protocol.ccp.AddMemberResponse; import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.ccp.GetMembersRequest; @@ -21,6 +19,8 @@ import com.nynjacoin.nccs.protocol.ccp.ManageParticipanRequest; import com.nynjacoin.nccs.protocol.ccp.RejectRequest; import com.nynjacoin.nccs.protocol.ccp.RemoveMemberRequest; import com.nynjacoin.nccs.protocol.ccp.UpdateConferenceInfoRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; import com.nynjacoin.nccs.protocol.def.MemberInfo; import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.protocol.error.NccsWrappingException; @@ -127,18 +127,18 @@ public class ConferenceControlWebApi implements WebApi { AddMemberRequest request = AddMemberRequest.parseFrom(bodyBuffer.getBytes()); Future response = WebResponseHandler.future(ctx); - Context logGrpcContext = Context.current() - .withValue(GrpcContextKeys.LOG_CONTEXT, webMetadata.getLogContext()); - Context prevGrpcContext = logGrpcContext.attach(); - - try (final CloseableThreadContext.Instance tc = - CloseableThreadContext.putAll(webMetadata.getLogContext())) { - - LOGGER.debug("AddMember {{}}", TextFormat.shortDebugString(request)); - conferenceControlVerticle.addMember(accountId, conferenceId, request, response); - } finally { - logGrpcContext.detach(prevGrpcContext); - } + conferenceControlVerticle.runOnContext( + () -> { + LOGGER.debug("AddMember {{}}", TextFormat.shortDebugString(request)); + conferenceControlVerticle.conferenceControl.addMember( + accountId, + conferenceId, + request, + response); + }, + response, + webMetadata.getLogContext() + ); })); } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/Conference.java b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/Conference.java index a62902c4391771285436b6096d4e934cc9e27a10..e395e9bb73d2e4a071d529f4c200b59bcc1f9976 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/Conference.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/Conference.java @@ -10,8 +10,7 @@ import com.nynjacoin.nccs.lib.util.InstanceLogger; import com.nynjacoin.nccs.lib.util.UuidGenerator; import com.nynjacoin.nccs.lib.vertxutil.restclient.bubble.BubbleSender.AnswerStatus; import com.nynjacoin.nccs.lib.vertxutil.restclient.bubble.BubbleSender.ContentType; -import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; -import com.nynjacoin.nccs.protocol.ccp.AddMemberResponse; +import com.nynjacoin.nccs.lib.vertxutil.restclient.chatroom.ChatRoomClient; import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.ccp.GetMembersRequest; @@ -22,6 +21,8 @@ import com.nynjacoin.nccs.protocol.ccp.ManageParticipanRequest; import com.nynjacoin.nccs.protocol.ccp.RejectRequest; import com.nynjacoin.nccs.protocol.ccp.RemoveMemberRequest; import com.nynjacoin.nccs.protocol.ccp.UpdateConferenceInfoRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; import com.nynjacoin.nccs.protocol.def.Address; import com.nynjacoin.nccs.protocol.def.Address.Type; import com.nynjacoin.nccs.protocol.def.CallEndedBy; @@ -46,6 +47,7 @@ import io.vertx.core.Vertx; import io.vertx.grpc.GrpcWriteStream; import io.prometheus.client.Gauge; +import java.util.ArrayList; import java.util.Base64; import java.util.Collections; import java.util.Date; @@ -61,7 +63,7 @@ class Conference { private static final Logger LOGGER = LogManager.getLogger(Conference.class); - static final Gauge CONFERENCE_ACTIVE = Gauge.build() + private static final Gauge CONFERENCE_ACTIVE = Gauge.build() .name("nccs_conference_active") .help("Active conference calls") .create(); @@ -76,6 +78,7 @@ class Conference { private final StateHolderVerticle stateHolderVerticle; private final ConferencePublisher conferencePublisher; private final FocusInternalApiClient focusInternalApiClient; + private final ChatRoomClient chatRoomClient; private final Map members; private final Map participants; private ChatRoomManager chatRoomManager; @@ -86,13 +89,15 @@ class Conference { ConferenceControl parent, ConferenceInfo conferenceInfo, StateHolderVerticle stateHolderVerticle, - FocusInternalApiClient focusInternalApiClient) { + FocusInternalApiClient focusInternalApiClient, + ChatRoomClient chatRoomClient) { this.parent = parent; this.conferenceInfo = conferenceInfo; this.stateHolderVerticle = stateHolderVerticle; this.conferencePublisher = new ConferencePublisher(stateHolderVerticle); this.focusInternalApiClient = focusInternalApiClient; + this.chatRoomClient = chatRoomClient; this.members = new HashMap<>(); this.participants = new HashMap<>(); this.userConferenceTemplate = null; @@ -120,6 +125,34 @@ class Conference { return hasReadAccess(accountId); } + // TODO: addMember() updateSubject()... should follow end() suite by calling this method + // TODO: and letting room members do changes; at that point we should remove sync methods + // TODO: hasChangeAccess and authorizeChangeAccess and rename xxxAsync() to xxx() + private Future authorizeChangeAccessAsync(String accountId) { + return hasChangeAccessAsync(accountId) + .compose(hasChangeAccess -> + hasChangeAccess ? + Future.succeededFuture() : Future.failedFuture(NccsError.UNAUTHORIZED.exception())); + } + + private Future hasChangeAccessAsync(String accountId) { + String roomId = conferenceInfo.getRoomId(); + if (roomId.isEmpty()) { + return Future.succeededFuture(hasChangeAccess(accountId)); + } + if (isOwner(accountId)) { + return Future.succeededFuture(Boolean.TRUE); + } + return chatRoomClient.isMember(conferenceInfo.getOwnerAccountId(), roomId, accountId); + } + + private void authorizeChangeAccess(String accountId) { + if (hasChangeAccess(accountId)) { + return; + } + throw NccsError.UNAUTHORIZED.exception(); + } + private boolean authorizeJoin(String accountId, Member member) { return member != null && @@ -150,24 +183,9 @@ class Conference { response.complete(Void.getDefaultInstance()); } - void addMember(String accountId, AddMemberRequest request, - Future response) { - - if (!hasChangeAccess(accountId)) { - response.fail(NccsError.UNAUTHORIZED.exception()); - return; - } - Future addFuture = Future.future(); - addFuture.setHandler(event -> { - if (event.succeeded()) { - response.complete(AddMemberResponse.newBuilder() - .setMemberId(event.result().getMemberId()) - .build()); - } else { - response.fail(event.cause()); - } - }); - addMember(request, addFuture); + String addMember(String accountId, AddMemberRequest request) { + authorizeChangeAccess(accountId); + return addMember(request).getMemberId(); } void bindByLink( @@ -188,17 +206,8 @@ class Conference { logger.info("Bound by link: {{}}", TextFormat.shortDebugString(address)); } - Future addFuture = Future.future(); - addFuture.setHandler(event -> { - if (event.succeeded()) { - response.complete(BindByLinkResponse.newBuilder() - .setMemberId(event.result().getMemberId()) - .build()); - } else { - response.fail(event.cause()); - } - }); - addMember(addMemberRequest, addFuture); + String memberId = addMember(addMemberRequest).getMemberId(); + response.complete(BindByLinkResponse.newBuilder().setMemberId(memberId).build()); } void removeMember(String accountId, RemoveMemberRequest request, Future response) { @@ -275,17 +284,17 @@ class Conference { } public void end(String accountId, Void request, Future response) { - if (!isOwner(accountId)) { - response.fail(NccsError.UNAUTHORIZED.exception()); - return; - } - if (conferenceInfo.isOngoing()) { - conferenceCompleted(); - focusInternalApiClient.endConference(conferenceInfo.getConferenceId()); - } else { - selfRemove(); - } - response.complete(Void.getDefaultInstance()); + authorizeChangeAccessAsync(accountId).compose( + ignoredVoid -> { + if (conferenceInfo.isOngoing()) { + conferenceCompleted(); + focusInternalApiClient.endConference(conferenceInfo.getConferenceId()); + } else { + selfRemove(); + } + response.complete(Void.getDefaultInstance()); + }, + response); } void manageParticipant(String accountId, ManageParticipanRequest request, Future response) { @@ -379,32 +388,41 @@ class Conference { CONFERENCE_ACTIVE.dec(); } - private void addMember(AddMemberRequest request, Future response) { - getMemberByAddress(request.getAddress()).ifPresentOrElse( - member -> response.complete(member.getInfo()) - , - () -> { + List addMembers(String accountId, List memberRequests) { + authorizeChangeAccess(accountId); + List result = new ArrayList<>(); + for (var request : memberRequests) { + MemberInfo member = addMember(request); + result.add(AddMemberResponse.newBuilder() + .setMemberId(member.getMemberId()) + .setAddress(request.getAddress()) + .build()); + } + return result; + } + + private MemberInfo addMember(AddMemberRequest request) { + Member member = getMemberByAddress(request.getAddress()) + .orElseGet(() -> { + if (conferenceInfo.getMembersCountLimit() > 0 + && members.size() >= conferenceInfo.getMembersCountLimit()) { + + throw NccsError.CONFERENCE_MEMBERS_LIMIT_REACHED.exception(); + } MemberInfo memberInfo = MemberInfo.newBuilder() .setMemberId("mbr_" + UuidGenerator.generate()) .setAddress(request.getAddress()) .setDisplayName(request.getDisplayName()) .addAllOption(request.getOptionList()) .build(); - if (conferenceInfo.getMembersCountLimit() > 0 - && members.size() >= conferenceInfo.getMembersCountLimit()) { - response.fail(NccsError.CONFERENCE_MEMBERS_LIMIT_REACHED.exception()); - return; - } - addMember(memberInfo); - response.complete(memberInfo); + return addMember(memberInfo); }); + return member.getInfo(); } private Address getBindByLinkAddress(String accountId) { - final boolean isAnonymous = accountId.toLowerCase() - .startsWith(ConferenceInfo.ANONYMOUS_PREFIX); return Address.newBuilder() - .setType(isAnonymous ? Type.ANONYMOUS : Type.ACCOUNT) + .setType(ConferenceInfo.isAnonymous(accountId) ? Type.ANONYMOUS : Type.ACCOUNT) .setValue(accountId) .build(); } @@ -443,7 +461,7 @@ class Conference { .anyMatch(memberInfo -> isMatchingAccountAddress(memberInfo.getAddress(), accountId)); } - private void addMember(MemberInfo memberInfo) { + private Member addMember(MemberInfo memberInfo) { Member member = new Member(memberInfo, conferenceVersion); members.put(memberInfo.getMemberId(), member); if (conferenceInfo.isOngoing()) { @@ -452,6 +470,7 @@ class Conference { .addMember(conferenceInfo.getConferenceId(), memberInfo, conferenceVersion); chatRoomManager.onMemberAdded(member.getAddress()); } + return member; } private void notifyMembersForStart() { diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControl.java b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControl.java index 46a898b7b15199cc12da03c8c7f6167cb68c43e3..0a2f81cb3c86ceb4b6d31216e083957b0ee461e7 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControl.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControl.java @@ -7,8 +7,6 @@ import com.nynjacoin.nccs.focus.internal.api.client.FocusInternalApiClient; import com.nynjacoin.nccs.lib.vertxutil.restclient.bubble.BubbleSender; import com.nynjacoin.nccs.lib.vertxutil.restclient.chatroom.ChatRoomClient; import com.nynjacoin.nccs.lib.vertxutil.restclient.push.PushNotificationSender; -import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; -import com.nynjacoin.nccs.protocol.ccp.AddMemberResponse; import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.ccp.GetMembersRequest; @@ -19,6 +17,8 @@ import com.nynjacoin.nccs.protocol.ccp.ManageParticipanRequest; import com.nynjacoin.nccs.protocol.ccp.RejectRequest; import com.nynjacoin.nccs.protocol.ccp.RemoveMemberRequest; import com.nynjacoin.nccs.protocol.ccp.UpdateConferenceInfoRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; import com.nynjacoin.nccs.protocol.def.MemberInfo; import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.protocol.error.NccsError; @@ -28,11 +28,16 @@ import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; import io.vertx.core.Future; import io.vertx.grpc.GrpcWriteStream; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class ConferenceControl { + private static final Logger LOGGER = LogManager.getLogger(ConferenceControl.class); + private final ConferenceFactoryVerticle conferenceFactoryVerticle; private final StateHolderVerticle stateHolderVerticle; @@ -43,15 +48,17 @@ public class ConferenceControl { private final HistoryEventsProcessor history; private final BubbleSender bubbleSender; private final ChatRoomClient chatRoomClient; + private final Boolean isChatRoomIntegrationEnabled; - public ConferenceControl( + ConferenceControl( ConferenceFactoryVerticle conferenceFactoryVerticle, StateHolderVerticle stateHolderVerticle, FocusInternalApiClient focusInternalApiClient, PushNotificationSender pushClient, JoinLinkComposer joinLinkComposer, HistoryEventsProcessor history, BubbleSender bubbleSender, - ChatRoomClient chatRoomClient) { + ChatRoomClient chatRoomClient, + Boolean isChatRoomIntegrationEnabled) { this.conferenceFactoryVerticle = conferenceFactoryVerticle; this.stateHolderVerticle = stateHolderVerticle; @@ -61,6 +68,7 @@ public class ConferenceControl { this.history = history; this.bubbleSender = bubbleSender; this.chatRoomClient = chatRoomClient; + this.isChatRoomIntegrationEnabled = isChatRoomIntegrationEnabled; Conference.registerMetrics(); } @@ -77,8 +85,6 @@ public class ConferenceControl { return history; } - ChatRoomClient getChatRoomClient() { return chatRoomClient; } - public void getConferenceInfo(String conferenceId, Future future) { @@ -90,7 +96,7 @@ public class ConferenceControl { } } - public void getMembers(String accountId, String conferenceId, GetMembersRequest request, + void getMembers(String accountId, String conferenceId, GetMembersRequest request, GrpcWriteStream response) { Conference conference = conferences.get(conferenceId); @@ -101,7 +107,7 @@ public class ConferenceControl { } } - public void getParticipant(String conferenceId, String participantId, + void getParticipant(String conferenceId, String participantId, Future future) { Conference conference = conferences.get(conferenceId); @@ -117,7 +123,7 @@ public class ConferenceControl { } } - public void updateConferenceInfo(String accountId, String conferenceId, + void updateConferenceInfo(String accountId, String conferenceId, UpdateConferenceInfoRequest request, Future response) { @@ -140,7 +146,11 @@ public class ConferenceControl { return; } Conference conference = conferenceResponse.result(); - conference.addMember(accountId, request, response); + String memberId = conference.addMember(accountId, request); + response.complete(AddMemberResponse.newBuilder() + .setMemberId(memberId) + .setAddress(request.getAddress()) + .build()); }); } @@ -158,7 +168,7 @@ public class ConferenceControl { } } - public void removeMember(String accountId, String conferenceId, RemoveMemberRequest request, + void removeMember(String accountId, String conferenceId, RemoveMemberRequest request, Future response) { Conference conference = conferences.get(conferenceId); @@ -221,7 +231,7 @@ public class ConferenceControl { } } - Future getConference(String accountId, String conferenceId) { + private Future getConference(String accountId, String conferenceId) { Future conferenceFuture = Future.future(); Conference conference = conferences.get(conferenceId); if (conference != null) { @@ -232,7 +242,7 @@ public class ConferenceControl { return conferenceFuture; } - void handleConference(String accountId, Conference conference, + private void handleConference(String accountId, Conference conference, Future future) { if (!conference.getConferenceInfo().getOwnerAccountId().equals(accountId)) { @@ -255,14 +265,7 @@ public class ConferenceControl { handleConference(accountId, conference, conferenceFuture); } else { ConferenceInfo conferenceInfo = event.result(); - conference = - new Conference( - this, - conferenceInfo, - stateHolderVerticle, - focusInternalApiClient); - conference.setChatRoomManager(getChatRoomManager(conference)); - conferences.put(conference.getConferenceInfo().getConferenceId(), conference); + conference = createAndStoreConference(conferenceInfo); conferenceFuture.complete(conference); } } @@ -270,6 +273,71 @@ public class ConferenceControl { conferenceFactoryVerticle.getConferenceInfo(conferenceId, conferenceInfoFuture); } + private Conference createAndStoreConference(ConferenceInfo conferenceInfo) { + Conference conference = + new Conference( + this, + conferenceInfo, + stateHolderVerticle, + focusInternalApiClient, + chatRoomClient); + conference.setChatRoomManager(getChatRoomManager(conference)); + conferences.put(conference.getConferenceInfo().getConferenceId(), conference); + return conference; + } + + void createAddMembersStart( + ConferenceInfo conferenceInfo, + List members, + boolean start, + Future> result) { + + Conference conference = getOrCreate(conferenceInfo); + String ownerAccountId = conferenceInfo.getOwnerAccountId(); + List addMembersResult; + try { + addMembersResult = conference.addMembers(ownerAccountId, members); + } catch (Exception e) { + cleanupConference(conferenceInfo.getConferenceId()); + result.fail(e); + return; + } + if (start) { + Future startFuture = Future.future(); + conference.start(ownerAccountId, Void.getDefaultInstance(), startFuture); + startFuture.setHandler(startResult -> { + if (startResult.succeeded()) { + result.complete(addMembersResult); + } else { + cleanupConference(conferenceInfo.getConferenceId()); + result.fail(startResult.cause()); + } + }); + } else { + result.complete(addMembersResult); + } + } + + private void cleanupConference(String conferenceId) { + Conference conference = conferences.remove(conferenceId); + try { + conference.conferenceCompleted(); + } catch (Exception e) { + LOGGER.error("Failed to complete created conference after another failure", e); + } + } + + private Conference getOrCreate(ConferenceInfo ci) { + Conference conference = conferences.get(ci.getConferenceId()); + if (conference == null) { + return createAndStoreConference(ci); + } + if (!ci.getOwnerAccountId().equals(conference.getConferenceInfo().getOwnerAccountId())) { + throw NccsError.INTERNAL.exception("Unexpected owner mismatch"); + } + return conference; + } + public void join(String accountId, String conferenceId, JoinRequest request, Future response) { @@ -292,7 +360,7 @@ public class ConferenceControl { } } - public void conferenceCompleted(String conferenceId) { + void conferenceCompleted(String conferenceId) { Conference conference = conferences.get(conferenceId); if (conference == null) { return; @@ -307,7 +375,7 @@ public class ConferenceControl { private ChatRoomManager getChatRoomManager(Conference conference) { ConferenceInfo ci = conference.getConferenceInfo(); - if (Objects.isNull(chatRoomClient)) { + if (!isChatRoomIntegrationEnabled) { // chat room management disabled by server config // TODO: remove this backward compatibility diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControlVerticle.java b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControlVerticle.java index b944eb6450c92f85cc889fdfc7b1d64c82b0a312..79a47ebe836bd36719cf913820ebde53177c0481 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControlVerticle.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencecontrol/service/ConferenceControlVerticle.java @@ -9,14 +9,16 @@ import com.nynjacoin.nccs.lib.vertxdispatch.NccsVerticle; import com.nynjacoin.nccs.lib.vertxutil.restclient.bubble.BubbleSender; import com.nynjacoin.nccs.lib.vertxutil.restclient.chatroom.ChatRoomClient; import com.nynjacoin.nccs.lib.vertxutil.restclient.push.PushNotificationSender; -import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; -import com.nynjacoin.nccs.protocol.ccp.AddMemberResponse; +import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; +import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.ccp.GetMembersRequest; import com.nynjacoin.nccs.protocol.ccp.JoinRequest; import com.nynjacoin.nccs.protocol.ccp.JoinResponse; import com.nynjacoin.nccs.protocol.ccp.RejectRequest; import com.nynjacoin.nccs.protocol.ccp.RemoveMemberRequest; import com.nynjacoin.nccs.protocol.ccp.UpdateConferenceInfoRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; import com.nynjacoin.nccs.protocol.def.MemberInfo; import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; @@ -24,6 +26,7 @@ import io.vertx.core.Context; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.grpc.GrpcWriteStream; +import java.util.List; import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.ThreadContext; @@ -40,7 +43,8 @@ public class ConferenceControlVerticle extends NccsVerticle { JoinLinkComposer joinLinkComposer, HistoryEventsProcessor history, BubbleSender bubbleSender, - ChatRoomClient chatRoomClient) { + ChatRoomClient chatRoomClient, + Boolean isChatRoomIntegrationEnabled) { this.focusInternalApiClient = focusInternalApiClient; this.conferenceControl = new ConferenceControl( @@ -51,7 +55,8 @@ public class ConferenceControlVerticle extends NccsVerticle { joinLinkComposer, history, bubbleSender, - chatRoomClient); + chatRoomClient, + isChatRoomIntegrationEnabled); } public void updateConferenceInfo(String accountId, String conferenceId, @@ -65,17 +70,6 @@ public class ConferenceControlVerticle extends NccsVerticle { }); } - public void addMember(String accountId, String conferenceId, AddMemberRequest request, - Future response) { - - final var logContext = GrpcContextKeys.LOG_CONTEXT.get(); - context.runOnContext(v -> { - try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { - conferenceControl.addMember(accountId, conferenceId, request, response); - } - }); - } - public void removeMember(String accountId, String conferenceId, RemoveMemberRequest request, Future response) { @@ -194,4 +188,33 @@ public class ConferenceControlVerticle extends NccsVerticle { } }); } + + public void createAddMembersStart( + ConferenceInfo conferenceInfo, + List members, + boolean startConference, + Future> result) { + + final var logContext = ThreadContext.getContext(); + context.runOnContext(v -> { + try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { + conferenceControl.createAddMembersStart(conferenceInfo, members, startConference, result); + } + }); + } + + public void bindByLink( + String accountId, + String conferenceId, + BindByLinkRequest request, + Future response) { + + final var logContext = ThreadContext.getContext(); + context.runOnContext(v -> { + try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { + conferenceControl.bindByLink(accountId, conferenceId, request, response); + } + }); + } + } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryGrpcApi.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryGrpcApi.java index e570967de250e41dc42a31330f51a435dc7c3d0a..fbf72fae97f80256804d75576f2e7ec4b06f29f0 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryGrpcApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryGrpcApi.java @@ -3,29 +3,67 @@ package com.nynjacoin.nccs.conferencefactory.api; import com.nynjacoin.nccs.conferencefactory.service.ConferenceFactoryVerticle; import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; import com.nynjacoin.nccs.protocol.cfp.ConferenceFactoryGrpc; +import com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest; +import com.nynjacoin.nccs.protocol.cfp.CreateConferenceResponse; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomRequest; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomResponse; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdRequest; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdResponse; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsRequest; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsResponse; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoRequest; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoResponse; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceRequest; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceResponse; import io.vertx.core.Future; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class ConferenceFactoryGrpcApi extends ConferenceFactoryGrpc.ConferenceFactoryVertxImplBase { - private static final Logger LOGGER = LogManager.getLogger(ConferenceFactoryGrpcApi.class); - private final ConferenceFactoryVerticle conferenceFactoryVerticle; + private final ConferenceFactoryVerticle verticle; public ConferenceFactoryGrpcApi(ConferenceFactoryVerticle conferenceFactoryVerticle) { - this.conferenceFactoryVerticle = conferenceFactoryVerticle; + this.verticle = conferenceFactoryVerticle; } @Override - public void createConference(com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest request, - io.vertx.core.Future response) { + public void getLimits(GetLimitsRequest request, Future response) { String accountId = GrpcContextKeys.ACCOUNT_ID.get(); - conferenceFactoryVerticle.createConference(accountId, request, response); + verticle.runOnContext( + () -> verticle.conferenceFactory.getLimits(accountId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); + } + + @Override + public void createConference( + CreateConferenceRequest request, + Future response) { + + String accountId = GrpcContextKeys.ACCOUNT_ID.get(); + + verticle.runOnContext( + () -> verticle.conferenceFactory.createConference(accountId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); + } + + @Override + public void startConference( + StartConferenceRequest request, + Future response) { + + String accountId = GrpcContextKeys.ACCOUNT_ID.get(); + + verticle.runOnContext( + () -> verticle.conferenceFactory.startConference(accountId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); } @Override @@ -35,12 +73,38 @@ public class ConferenceFactoryGrpcApi String accountId = GrpcContextKeys.ACCOUNT_ID.get(); - conferenceFactoryVerticle.runOnContext( - () -> { - conferenceFactoryVerticle.conferenceFactory.getConferenceId(accountId, request, response); - }, + verticle.runOnContext( + () -> verticle.conferenceFactory.getConferenceId(accountId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); + } + + @Override + public void getRoomInfo( + GetRoomInfoRequest request, + Future response) { + + String accountId = GrpcContextKeys.ACCOUNT_ID.get(); + + verticle.runOnContext( + () -> verticle.conferenceFactory.getRoomInfo(accountId, request, response), response, GrpcContextKeys.LOG_CONTEXT.get() ); } + + @Override + public void enterRoom( + EnterRoomRequest request, + Future response) { + String accountId = GrpcContextKeys.ACCOUNT_ID.get(); + + verticle.runOnContext( + () -> verticle.conferenceFactory.enterRoom(accountId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); + } + } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryWebApi.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryWebApi.java index 59dcff8a465e87975aa82e261eac89d642a7ef33..ccf15672c08d1f1d711f9cbaa32878f922de29b0 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryWebApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/api/ConferenceFactoryWebApi.java @@ -1,17 +1,24 @@ package com.nynjacoin.nccs.conferencefactory.api; +import com.google.protobuf.InvalidProtocolBufferException; import com.nynjacoin.nccs.conferencefactory.service.ConferenceFactoryVerticle; -import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; +import com.nynjacoin.nccs.lib.grpcutil.PathHandler; import com.nynjacoin.nccs.lib.grpcutil.WebApi; import com.nynjacoin.nccs.lib.grpcutil.WebMetadata; import com.nynjacoin.nccs.lib.grpcutil.WebResponseHandler; import com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest; import com.nynjacoin.nccs.protocol.cfp.CreateConferenceResponse; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomRequest; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomResponse; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdRequest; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdResponse; -import com.nynjacoin.nccs.protocol.error.NccsWrappingException; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsRequest; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsResponse; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoRequest; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoResponse; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceRequest; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceResponse; import com.nynjacoin.nccs.protocol.metadata.CommonMetadataKey; -import io.grpc.Context; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.ext.web.Router; @@ -20,47 +27,105 @@ import io.vertx.ext.web.RoutingContext; public class ConferenceFactoryWebApi implements WebApi { public ConferenceFactoryWebApi(ConferenceFactoryVerticle conferenceFactoryVerticle) { - this.conferenceFactoryVerticle = conferenceFactoryVerticle; + this.verticle = conferenceFactoryVerticle; } - private final ConferenceFactoryVerticle conferenceFactoryVerticle; - - private void createConference(RoutingContext ctx) { - ctx.request().bodyHandler(NccsWrappingException.wrap(bodyBuffer -> { - WebMetadata webMetadata = new WebMetadata(ctx); - String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); - - CreateConferenceRequest request = CreateConferenceRequest.parseFrom(bodyBuffer.getBytes()); - Future response = WebResponseHandler.future(ctx); - - Context logGrpcContext = Context.current() - .withValue(GrpcContextKeys.LOG_CONTEXT, webMetadata.getLogContext()); - Context prevGrpcContext = logGrpcContext.attach(); - try { - conferenceFactoryVerticle.createConference(accountId, request, response); - } finally { - logGrpcContext.detach(prevGrpcContext); - } - })); + private final ConferenceFactoryVerticle verticle; + + private void getLimits(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + GetLimitsRequest request = GetLimitsRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.getLimits(accountId, request, response), + response, + webMetadata.getLogContext() + ); + } + + private void createConference(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + CreateConferenceRequest request = CreateConferenceRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.createConference(accountId, request, response), + response, + webMetadata.getLogContext() + ); + } + + private void startConference(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + StartConferenceRequest request = StartConferenceRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.startConference(accountId, request, response), + response, + webMetadata.getLogContext() + ); } - private void getConferenceId(RoutingContext ctx) { - ctx.request().bodyHandler(NccsWrappingException.wrap(bodyBuffer -> { - WebMetadata webMetadata = new WebMetadata(ctx); - String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); - - GetConferenceIdRequest request = GetConferenceIdRequest.parseFrom(bodyBuffer.getBytes()); - Future response = WebResponseHandler.future(ctx); - - conferenceFactoryVerticle.runOnContext( - () -> { - conferenceFactoryVerticle.conferenceFactory - .getConferenceId(accountId, request, response); - }, - response, - webMetadata.getLogContext() - ); - })); + private void getConferenceId(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + GetConferenceIdRequest request = GetConferenceIdRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.getConferenceId(accountId, request, response), + response, + webMetadata.getLogContext() + ); + } + + private void getRoomInfo(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + var request = GetRoomInfoRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.getRoomInfo(accountId, request, response), + response, + webMetadata.getLogContext() + ); + } + + private void enterRoom(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + + var request = EnterRoomRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.conferenceFactory.enterRoom(accountId, request, response), + response, + webMetadata.getLogContext() + ); } @Override @@ -72,12 +137,12 @@ public class ConferenceFactoryWebApi implements WebApi { public Router createRouter(Vertx vertx) { Router router = Router.router(vertx); - router - .route("/CreateConference") - .handler(this::createConference); - router - .route("/GetConferenceId") - .handler(this::getConferenceId); + PathHandler.route(router, "/GetLimits", this::getLimits); + PathHandler.route(router, "/CreateConference", this::createConference); + PathHandler.route(router, "/StartConference", this::startConference); + PathHandler.route(router, "/GetConferenceId", this::getConferenceId); + PathHandler.route(router, "/GetRoomInfo", this::getRoomInfo); + PathHandler.route(router, "/EnterRoom", this::enterRoom); return router; } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/AccountConferences.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/AccountConferences.java new file mode 100644 index 0000000000000000000000000000000000000000..2adc0634fcad4d22200b0ebc59ce8fc054e3d07d --- /dev/null +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/AccountConferences.java @@ -0,0 +1,51 @@ +package com.nynjacoin.nccs.conferencefactory.service; + +import com.nynjacoin.nccs.protocol.cfp.CreateConferenceResponse; +import java.util.HashMap; + +class AccountConferences { + + private static class CachedCreateResponse { + + private final CreateConferenceResponse response; + private final String externalId; + CachedCreateResponse(CreateConferenceResponse response, String externalId) { + this.response = response; + this.externalId = externalId; + } + + CreateConferenceResponse getResponse() { + return response; + } + + String getExternalId() { + return externalId; + } + + } + private final HashMap container; + + AccountConferences() { + container = new HashMap<>(); + } + + boolean isEmpty() { + return container.isEmpty(); + } + + void put(ConferenceInfo ci, CreateConferenceResponse response) { + container.put(ci.getConferenceId(), new CachedCreateResponse(response, ci.getExternalId())); + } + + void remove(String conferenceId) { + container.remove(conferenceId); + } + + CreateConferenceResponse getByExternalId(String externalId) { + return container.values().stream() + .filter(cachedResponse -> cachedResponse.getExternalId().equals(externalId)) + .findFirst() + .map(CachedCreateResponse::getResponse) + .orElse(null); + } +} diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactory.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactory.java index c1929922b7f3f555c58ea204849f408d703218b7..10e8503a54691d9759ecf9973b267d0895fa341e 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactory.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactory.java @@ -1,75 +1,199 @@ package com.nynjacoin.nccs.conferencefactory.service; +import com.nynjacoin.nccs.conferencecontrol.service.ConferenceControlVerticle; import com.nynjacoin.nccs.conferencecontrol.service.JoinLinkComposer; import com.nynjacoin.nccs.lib.config.ConfigKey; +import com.nynjacoin.nccs.lib.grpcutil.GrpcTimestamp; import com.nynjacoin.nccs.lib.util.UuidGenerator; +import com.nynjacoin.nccs.lib.vertxutil.restclient.chatroom.ChatRoomClient; +import com.nynjacoin.nccs.lib.vertxutil.restclient.roomlink.RoomLinkClient; +import com.nynjacoin.nccs.protocol.ccp.BindByLinkRequest; +import com.nynjacoin.nccs.protocol.ccp.BindByLinkResponse; import com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest; import com.nynjacoin.nccs.protocol.cfp.CreateConferenceResponse; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomRequest; +import com.nynjacoin.nccs.protocol.cfp.EnterRoomResponse; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdRequest; import com.nynjacoin.nccs.protocol.cfp.GetConferenceIdResponse; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsRequest; +import com.nynjacoin.nccs.protocol.cfp.GetLimitsResponse; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoRequest; +import com.nynjacoin.nccs.protocol.cfp.GetRoomInfoResponse; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceRequest; +import com.nynjacoin.nccs.protocol.cfp.StartConferenceResponse; +import com.nynjacoin.nccs.protocol.def.AddMemberRequest; +import com.nynjacoin.nccs.protocol.def.AddMemberResponse; +import com.nynjacoin.nccs.protocol.def.Address; +import com.nynjacoin.nccs.protocol.def.Address.Type; import com.nynjacoin.nccs.protocol.error.NccsError; +import com.nynjacoin.nccs.protocol.metadata.CommonMetadataKey; +import com.nynjacoin.nccs.protocol.sp.conference.RoomConferenceRecord; +import com.nynjacoin.nccs.protocol.validation.NccsValidator; +import com.nynjacoin.nccs.stateholder.statepackage.RoomPublisher; import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class ConferenceFactory { - static class AccountConferences extends HashMap { - - public Optional getByExternalId(String externalId) { - return this.values().stream() - .filter(conferenceInfo -> conferenceInfo.getExternalId().equals(externalId)) - .findFirst(); - } - - } - + private static Logger LOGGER = LogManager.getLogger(ConferenceFactory.class); private final Map accountConferences; - private final Map conferences; private final Map conferencesByLink; + private final Map rooms; private final int membersCountLimit; private final JoinLinkComposer joinLinkComposer; private final LinkIdGenerator linkIdGenerator; + private final RoomPublisher roomPublisher; + private final RoomLinkClient roomLinkClient; + private final ChatRoomClient chatRoomClient; + private ConferenceControlVerticle conferenceControlVerticle; + ConferenceFactory( + JsonObject config, + JoinLinkComposer joinLinkComposer, + RoomPublisher roomPublisher, + RoomLinkClient roomLinkClient, + ChatRoomClient chatRoomClient) { - public ConferenceFactory(JsonObject config, JoinLinkComposer joinLinkComposer) { this.accountConferences = new HashMap<>(); this.conferences = new HashMap<>(); this.conferencesByLink = new HashMap<>(); + this.rooms = new HashMap<>(); this.membersCountLimit = config.getInteger( ConfigKey.CONFERENCE_MEMBERS_COUNT_LIMIT.key(), 30); this.joinLinkComposer = joinLinkComposer; this.linkIdGenerator = new LinkIdGenerator(); + this.roomPublisher = roomPublisher; + this.roomLinkClient = roomLinkClient; + this.chatRoomClient = chatRoomClient; + } + + public void getLimits( + String accountId, + GetLimitsRequest request, + Future response) { + + response.complete(GetLimitsResponse.newBuilder().setMaxMembers(membersCountLimit).build()); + } + + void setConferenceControlVerticle(ConferenceControlVerticle conferenceControlVerticle) { + this.conferenceControlVerticle = conferenceControlVerticle; + } + + public void createConference( + String accountId, + CreateConferenceRequest request, + Future response) { + + createAndStartConference(accountId, request, false, "") + .compose(response::complete, response); } - void createConference(String accountId, - com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest request, - io.vertx.core.Future response) { + private Future createAndStartConference( + String accountId, + CreateConferenceRequest request, + boolean start, + String roomId) { AccountConferences accountConferences = getOrCreateAccountConferences(accountId); final String externalId = request.getExternalId(); if (!externalId.isEmpty()) { - accountConferences.getByExternalId(externalId).ifPresent(ci -> { + var cachedResponse = accountConferences.getByExternalId(externalId); + if (cachedResponse != null) { // found by external id, so treat it as a retry - response.complete(toCreateConferenceResponse(ci)); - return; - }); + return Future.succeededFuture(cachedResponse); + } } - ConferenceInfo ci = createNewConference(accountId, request); + ConferenceInfo ci = createNewConference(accountId, request, roomId); conferences.put(ci.getConferenceId(), ci); conferencesByLink.put(ci.getLinkId(), ci); - if (!ci.getExternalId().isEmpty()) { - accountConferences.put(ci.getExternalId(), ci); + if (request.getAddMemberCount() == 0 && !start) { + assert roomId.isEmpty(); + var createResponse = toCreateConferenceResponse(ci, List.of()); + if (!ci.getExternalId().isEmpty()) { + accountConferences.put(ci, createResponse); + } + return Future.succeededFuture(createResponse); + } + + ConferenceRoomState room; + if (roomId.isEmpty()) { + room = null; + } else { + assert start; + ConferenceRoomState existingRoom = rooms.get(roomId); + if (existingRoom != null) { + if (existingRoom.conferenceStarted()) { + return Future.failedFuture(NccsError.CONFERENCE_ROOM_BUSY.exception()); + } + Future waitCompletionFuture = Future.future(); + existingRoom.addCompletionWaiter(() -> + createAndStartConference(accountId, request, start, roomId) + .compose(waitCompletionFuture::complete, waitCompletionFuture) + ); + return waitCompletionFuture; + } + room = new ConferenceRoomState(roomId, ci.getConferenceId()); + rooms.put(roomId, room); + } + + Future> ccResult = Future.future(); + conferenceControlVerticle.createAddMembersStart( + ci, + request.getAddMemberList(), + start, + ccResult); + Future result = Future.future(); + ccResult.setHandler(ar -> { + if (ar.succeeded()) { + var createResponse = toCreateConferenceResponse(ci, ar.result()); + if (!ci.getExternalId().isEmpty()) { + accountConferences.put(ci, createResponse); + } + if (room != null) { + roomPublisher.publishConference( + roomId, + RoomConferenceRecord.newBuilder() + .setOwnerAccountId(ci.getOwnerAccountId()) + .setSubject(ci.getSubject()) + .setStartTime(GrpcTimestamp.create(ci.getStartTime())) + ); + room.onConferenceStarted(); + } + result.complete(createResponse); + } else { + discardConference(ci.getConferenceId()); + result.fail(ar.cause()); + } + }); + return result; + } + + public void startConference( + String accountId, + StartConferenceRequest request, + Future response) { + + Future f; + if (request.getRoomId().isEmpty()) { + f = createAndStartConference(accountId, request.getCreateRequest(), true, ""); + } else { + f = createAndStartRoomConference(accountId, request.getCreateRequest(), request.getRoomId()); } - response.complete(toCreateConferenceResponse(ci)); + f.compose( + createResponse -> response.complete( + StartConferenceResponse.newBuilder().setCreateResponse(createResponse).build()), + response); } public void getConferenceId( @@ -91,36 +215,164 @@ public class ConferenceFactory { .setConferenceId(ci.getConferenceId()) ; if (Objects.isNull(accountId) || accountId.isEmpty()) { - responseBuilder.setAnonymousId(ConferenceInfo.ANONYMOUS_PREFIX + UuidGenerator.generate()); + responseBuilder.setAnonymousId(ConferenceInfo.generateAnonymousId()); } response.complete(responseBuilder.build()); } - public void discardConference(String conferenceId) { - ConferenceInfo conferenceInfo = conferences.remove(conferenceId); - if (conferenceInfo == null) { + public void getRoomInfo( + String accountId, + GetRoomInfoRequest request, + Future response) { + + roomLinkClient.getRoomInfo(request.getLinkId()) + .map(roomInfo -> { + var responseBuilder = GetRoomInfoResponse.newBuilder() + .setRoomId(roomInfo.getId()) + .setRoomName(roomInfo.getName()); + if (Objects.isNull(accountId) || accountId.isEmpty()) { + responseBuilder.setAnonymousId(ConferenceInfo.generateAnonymousId()); + } + return responseBuilder.build(); + }) + .setHandler(response); + } + + public void enterRoom( + String accountId, + EnterRoomRequest request, + Future responseFuture) { + + String roomId = request.getRoomId(); + NccsValidator.requireNonEmpty(roomId, "room_id"); + NccsValidator.requireNonEmpty(accountId, CommonMetadataKey.ACCOUNT_ID.name()); + + checkAccessToRoom(accountId, roomId) + .compose(accessType -> { + ConferenceRoomState room = rooms.get(roomId); + switch (accessType) { + case START_CONFERENCE: + if (room == null) { + // no conference in the room, so create one + CreateConferenceRequest createReq = CreateConferenceRequest.newBuilder() + .addAddMember(AddMemberRequest.newBuilder() + .setAddress(Address.newBuilder().setType(Type.ACCOUNT).setValue(accountId))) + .setChatRoomId(roomId) + .build(); + createAndStartConference(accountId, createReq, true, roomId) + .map(createResponse -> EnterRoomResponse.newBuilder() + .setConferenceId(createResponse.getConferenceId()) + .setMemberId(createResponse.getMemberResult(0).getMemberId()) + .build()) + .compose(responseFuture::complete, responseFuture); + return; + } + if (!room.conferenceStarted()) { + room.addCompletionWaiter(() -> enterRoom(accountId, request, responseFuture)); + return; + } + break; + case JOIN: + if (room == null || !room.conferenceStarted()) { + responseFuture.complete(EnterRoomResponse.getDefaultInstance()); + return; + } + break; + default: + throw NccsError.UNAUTHORIZED.exception( + "Access to conference room not allowed"); + } + String conferenceId = room.getConferenceId(); + BindByLinkRequest bindReq = BindByLinkRequest.newBuilder() + .setDisplayName(request.getDisplayName()) + .setPii(request.getPii()) + .build(); + Future bindFuture = Future.future(); + conferenceControlVerticle.bindByLink(accountId, conferenceId, bindReq, bindFuture); + bindFuture + .map(bindResp -> EnterRoomResponse.newBuilder() + .setConferenceId(conferenceId) + .setMemberId(bindResp.getMemberId()) + .build()) + .compose(responseFuture::complete, responseFuture); + }, responseFuture); + } + + void discardConference(String conferenceId) { + ConferenceInfo ci = conferences.remove(conferenceId); + if (ci == null) { return; } - AccountConferences ac = accountConferences.get(conferenceInfo.getOwnerAccountId()); + assert ci.getConferenceId().equals(conferenceId); + AccountConferences ac = accountConferences.get(ci.getOwnerAccountId()); if (ac == null) { return; } ac.remove(conferenceId); if (ac.isEmpty()) { - accountConferences.remove(conferenceInfo.getOwnerAccountId()); + accountConferences.remove(ci.getOwnerAccountId()); } - final String linkId = conferenceInfo.getLinkId(); + final String linkId = ci.getLinkId(); // make the link to point to a dummy conference, to mark the fact it existed conferencesByLink.put(linkId, ConferenceInfo.newBuilder().build()); // and remove this mark latter: - Vertx.currentContext().owner().setTimer(5 * 60000, v -> { - conferencesByLink.remove(linkId); - }); + Vertx.currentContext().owner().setTimer(5 * 60000, v -> conferencesByLink.remove(linkId)); + + String roomId = ci.getRoomId(); + if (roomId.isEmpty()) { + return; + } + ConferenceRoomState room = rooms.get(roomId); + if (room == null) { + LOGGER.warn("Discard room conference {} with empty room {}", conferenceId, roomId); + return; + } + if (!room.getConferenceId().equals(conferenceId)) { + LOGGER.warn("Discard conference {} room {} mismatch (room conference id {})", + conferenceId, + roomId, + room.getConferenceId()); + return; + } + if (room.conferenceStarted()) { + roomPublisher.discardConference(roomId); + } + rooms.remove(roomId); + room.onConferenceDiscarded(); + } + + private Future + createAndStartRoomConference(String accountId, CreateConferenceRequest request, String roomId) { + assert !roomId.isEmpty(); + return checkAccessToRoom(accountId, roomId) + .compose(accessType -> { + if (accessType == RoomAccess.START_CONFERENCE) { + return createAndStartConference(accountId, request, true, roomId); + } + throw NccsError.UNAUTHORIZED.exception(); + }); } - private ConferenceInfo createNewConference(String accountId, CreateConferenceRequest request) { + private enum RoomAccess { + JOIN, + START_CONFERENCE; + } + + private Future checkAccessToRoom(String accountId, String roomId) { + if (ConferenceInfo.isAnonymous(accountId)) { + // TODO: real authorization based on access to the particular room in the access token + return Future.succeededFuture(RoomAccess.JOIN); + } + return chatRoomClient.isMember(accountId, roomId, accountId) + .map(isMember -> isMember ? RoomAccess.START_CONFERENCE : RoomAccess.JOIN); + } + + private ConferenceInfo createNewConference( + String accountId, + CreateConferenceRequest request, + String roomId) { return ConferenceInfo.newBuilder() .setAccountId(accountId) .setConferenceId(UuidGenerator.generate()) @@ -132,14 +384,18 @@ public class ConferenceFactory { .setChatRoomId(request.getChatRoomId()) .setChatRoomOptions(request.getChatRoomOptions()) .setReplaceRef(request.getReplaceRef()) + .setRoomId(roomId) .build(); } - private CreateConferenceResponse toCreateConferenceResponse(ConferenceInfo ci) { + private CreateConferenceResponse toCreateConferenceResponse( + ConferenceInfo ci, + List addedMembers) { return CreateConferenceResponse.newBuilder() .setConferenceId(ci.getConferenceId()) .setMembersCountLimit(ci.getMembersCountLimit()) .setJoinLink(joinLinkComposer.compose(ci.getLinkId())) + .addAllMemberResult(addedMembers) .build(); } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactoryVerticle.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactoryVerticle.java index 2c92b13a0a654c1a0ca54c926ad73823e12102c2..24ccc04af91c3388343029c584a0481eceed0c2a 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactoryVerticle.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceFactoryVerticle.java @@ -1,8 +1,11 @@ package com.nynjacoin.nccs.conferencefactory.service; +import com.nynjacoin.nccs.conferencecontrol.service.ConferenceControlVerticle; import com.nynjacoin.nccs.conferencecontrol.service.JoinLinkComposer; -import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; import com.nynjacoin.nccs.lib.vertxdispatch.NccsVerticle; +import com.nynjacoin.nccs.lib.vertxutil.restclient.chatroom.ChatRoomClient; +import com.nynjacoin.nccs.lib.vertxutil.restclient.roomlink.RoomLinkClient; +import com.nynjacoin.nccs.stateholder.statepackage.RoomPublisher; import io.vertx.core.Future; import io.vertx.core.json.JsonObject; import org.apache.logging.log4j.CloseableThreadContext; @@ -12,20 +15,23 @@ public class ConferenceFactoryVerticle extends NccsVerticle { public final ConferenceFactory conferenceFactory; - public ConferenceFactoryVerticle(JsonObject config, JoinLinkComposer joinLinkComposer) { - this.conferenceFactory = new ConferenceFactory(config, joinLinkComposer); + public ConferenceFactoryVerticle( + JsonObject config, + JoinLinkComposer joinLinkComposer, + RoomPublisher roomPublisher, + RoomLinkClient roomLinkClient, + ChatRoomClient chatRoomClient) { + + this.conferenceFactory = new ConferenceFactory( + config, + joinLinkComposer, + roomPublisher, + roomLinkClient, + chatRoomClient); } - public void createConference(String accountId, - com.nynjacoin.nccs.protocol.cfp.CreateConferenceRequest request, - io.vertx.core.Future response) { - - final var logContext = GrpcContextKeys.LOG_CONTEXT.get(); - context.runOnContext(v -> { - try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { - conferenceFactory.createConference(accountId, request, response); - } - }); + public void setConferenceControlVerticle(ConferenceControlVerticle conferenceControlVerticle) { + conferenceFactory.setConferenceControlVerticle(conferenceControlVerticle); } public void getConferenceInfo(String conferenceId, Future response) { diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfo.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfo.java index 017fb741be673f2bcbc68e9191eca2178c8f5e64..7a21becc7903477f0cb12a08967e1d67f0af3da5 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfo.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfo.java @@ -1,5 +1,6 @@ package com.nynjacoin.nccs.conferencefactory.service; +import com.nynjacoin.nccs.lib.util.UuidGenerator; import com.nynjacoin.nccs.protocol.cfp.ChatRoomOptions; import com.nynjacoin.nccs.protocol.error.NccsError; import com.nynjacoin.nccs.protocol.sp.conference.ChatRoomType; @@ -7,12 +8,19 @@ import java.util.Date; public class ConferenceInfo { - public static final String ANONYMOUS_PREFIX = "ann_"; + private static final String ANONYMOUS_PREFIX = "ann_"; + + public static boolean isAnonymous(String accountId) { + return accountId.toLowerCase().startsWith(ConferenceInfo.ANONYMOUS_PREFIX); + } + + static String generateAnonymousId() { + return ConferenceInfo.ANONYMOUS_PREFIX + UuidGenerator.generate(); + } private final String ownerAccountId; private final String conferenceId; private final String externalId; - private final String replaceRef; private String externalInfo; private final int membersCountLimit; private final String linkId; @@ -20,6 +28,8 @@ public class ConferenceInfo { private String chatRoomId; private ChatRoomType chatRoomType; private final ChatRoomOptions chatRoomOptions; + private final String replaceRef; + private final String roomId; private Date startTime; private ConferenceState state; @@ -33,7 +43,8 @@ public class ConferenceInfo { String subject, String chatRoomId, ChatRoomOptions chatRoomOptions, - String replaceRef) { + String replaceRef, + String roomId) { this.ownerAccountId = ownerAccountId; this.conferenceId = conferenceId; @@ -48,6 +59,7 @@ public class ConferenceInfo { this.chatRoomOptions = chatRoomOptions; this.replaceRef = replaceRef; this.state = ConferenceState.CREATED; + this.roomId = roomId; } public static ConferenceInfoBuilder newBuilder() { return new ConferenceInfoBuilder(); } @@ -107,6 +119,10 @@ public class ConferenceInfo { return replaceRef; } + public String getRoomId() { + return roomId; + } + public Date getStartTime() { return startTime; } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfoBuilder.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfoBuilder.java index b6e10e7f9b1566900f331cc8893305b06e161f42..8b41e61c3a68383738846efb61a612e39398ff25 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfoBuilder.java +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceInfoBuilder.java @@ -1,6 +1,5 @@ package com.nynjacoin.nccs.conferencefactory.service; -import com.nynjacoin.nccs.conferencecontrol.service.ParticipantInfo.Builder; import com.nynjacoin.nccs.protocol.cfp.ChatRoomOptions; public class ConferenceInfoBuilder { @@ -15,6 +14,7 @@ public class ConferenceInfoBuilder { private String chatRoomId = ""; private ChatRoomOptions chatRoomOptions; private String replaceRef; + private String roomId = ""; public ConferenceInfoBuilder setAccountId(String accountId) { this.accountId = accountId; @@ -66,6 +66,11 @@ public class ConferenceInfoBuilder { return this; } + public ConferenceInfoBuilder setRoomId(String roomId) { + this.roomId = roomId; + return this; + } + public ConferenceInfo build() { return new ConferenceInfo( accountId, @@ -77,6 +82,7 @@ public class ConferenceInfoBuilder { subject, chatRoomId, chatRoomOptions, - replaceRef); + replaceRef, + roomId); } } diff --git a/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceRoomState.java b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceRoomState.java new file mode 100644 index 0000000000000000000000000000000000000000..57479a021bb6f83418b16dd8d3405aad9361dd9f --- /dev/null +++ b/focus/src/main/java/com/nynjacoin/nccs/conferencefactory/service/ConferenceRoomState.java @@ -0,0 +1,73 @@ +package com.nynjacoin.nccs.conferencefactory.service; + +import java.util.ArrayList; +import java.util.List; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ConferenceRoomState { + + private static Logger LOGGER = LogManager.getLogger(ConferenceRoomState.class); + + private final String conferenceId; + private final String roomId; + private boolean started; + private boolean completed; + private List waitingForCompletion; + + ConferenceRoomState(String roomId, String conferenceId) { + this.roomId = roomId; + this.conferenceId = conferenceId; + this.started = false; + this.completed = false; + this.waitingForCompletion = new ArrayList<>(); + } + + boolean conferenceStarted() { + return started; + } + + public String getConferenceId() { + return conferenceId; + } + + void onConferenceStarted() { + assert !started; + assert !completed; + this.started = true; + this.completed = true; + callWaiting(); + } + + void onConferenceDiscarded() { + if (!completed) { + this.completed = true; + callWaiting(); + } + } + + void addCompletionWaiter(Runnable waiting) { + assert !completed; + waitingForCompletion.add(waiting); + } + + private void callWaiting() { + assert completed; + List localWaiting = waitingForCompletion; + waitingForCompletion = null; + localWaiting.forEach(this::callWaiting); + } + + private void callWaiting(Runnable waiting) { + try { + LOGGER.debug("Calling {}", waiting); + waiting.run(); + } catch (Throwable e) { + LOGGER.error("Exception calling waiting for a room {} conference {} started: {}", + roomId, + conferenceId, + started, + e); + } + } +} diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderGrpcApi.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderGrpcApi.java index 1b745f1cdb86699603abdadcbb26aa21d4ec8dbb..70d97122f2a1de42de8f25431ac740cd754fe7df 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderGrpcApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderGrpcApi.java @@ -1,18 +1,21 @@ package com.nynjacoin.nccs.stateholder.api; import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; +import com.nynjacoin.nccs.protocol.def.Void; +import com.nynjacoin.nccs.protocol.shp.CancelRequest; import com.nynjacoin.nccs.protocol.shp.FetchRequest; import com.nynjacoin.nccs.protocol.shp.StateHolderGrpc; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; +import io.vertx.core.Future; import io.vertx.grpc.GrpcWriteStream; public class StateHolderGrpcApi extends StateHolderGrpc.StateHolderVertxImplBase { - private final StateHolderVerticle stateHolderVerticle; + private final StateHolderVerticle verticle; - public StateHolderGrpcApi(StateHolderVerticle stateHolderVerticle) { - this.stateHolderVerticle = stateHolderVerticle; + public StateHolderGrpcApi(StateHolderVerticle verticle) { + this.verticle = verticle; } @Override @@ -21,7 +24,23 @@ public class StateHolderGrpcApi extends StateHolderGrpc.StateHolderVertxImplBase String accountId = GrpcContextKeys.ACCOUNT_ID.get(); String instanceId = GrpcContextKeys.INSTANCE_ID.get(); - stateHolderVerticle.fetch(accountId, instanceId, request, response); + verticle.runOnContext( + () -> verticle.stateHolderService.fetch(accountId, instanceId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); + } + + @Override + public void cancelLongPoll(CancelRequest request, Future response) { + String accountId = GrpcContextKeys.ACCOUNT_ID.get(); + String instanceId = GrpcContextKeys.INSTANCE_ID.get(); + + verticle.runOnContext( + () -> verticle.stateHolderService.cancelLongPoll(accountId, instanceId, request, response), + response, + GrpcContextKeys.LOG_CONTEXT.get() + ); } } diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderWebApi.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderWebApi.java index 8720e9895b3cebe7d700caac33e9066a35604e81..d1d8d294dd01085cde6c39b4fd6e9caf302c9ff7 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderWebApi.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/api/StateHolderWebApi.java @@ -1,34 +1,31 @@ package com.nynjacoin.nccs.stateholder.api; import com.google.protobuf.InvalidProtocolBufferException; -import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; import com.nynjacoin.nccs.lib.grpcutil.GrpcWriteStreamEventSourceAdapter; +import com.nynjacoin.nccs.lib.grpcutil.PathHandler; import com.nynjacoin.nccs.lib.grpcutil.WebApi; -import com.nynjacoin.nccs.lib.grpcutil.WebEventTarget; import com.nynjacoin.nccs.lib.grpcutil.WebMetadata; +import com.nynjacoin.nccs.lib.grpcutil.WebResponseHandler; +import com.nynjacoin.nccs.protocol.def.Void; +import com.nynjacoin.nccs.protocol.error.NccsWebEndpointException; import com.nynjacoin.nccs.protocol.metadata.CallingMetadataKey; import com.nynjacoin.nccs.protocol.metadata.CommonMetadataKey; +import com.nynjacoin.nccs.protocol.shp.CancelRequest; import com.nynjacoin.nccs.protocol.shp.FetchRequest; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; -import io.grpc.Context; -import io.vertx.core.MultiMap; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; import io.vertx.grpc.GrpcWriteStream; -import java.io.IOException; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; public class StateHolderWebApi implements WebApi { - private static final Logger LOGGER = LogManager.getLogger(StateHolderWebApi.class); + private final StateHolderVerticle verticle; - private final StateHolderVerticle stateHolderVerticle; - - public StateHolderWebApi(StateHolderVerticle stateHolderVerticle) { - this.stateHolderVerticle = stateHolderVerticle; + public StateHolderWebApi(StateHolderVerticle verticle) { + this.verticle = verticle; } @Override @@ -40,54 +37,44 @@ public class StateHolderWebApi implements WebApi { public Router createRouter(Vertx vertx) { Router router = Router.router(vertx); - router - .route("/Fetch") - .handler(this::fetch); + PathHandler.routeSSE(router, "/Fetch", this::fetch); + PathHandler.route(router, "/CancelLongPoll", this::cancelLongPoll); return router; } - private void fetch(RoutingContext ctx) { + private void fetch(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException, NccsWebEndpointException { + WebMetadata webMetadata = new WebMetadata(ctx); String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); String instanceId = webMetadata.get(CallingMetadataKey.INSTANCE_ID); - FetchRequest request; - try { - request = parseFetchRequest(ctx.request().params()); - } catch (InvalidProtocolBufferException e) { - ctx.fail(e); - return; - } - - WebEventTarget target = new WebEventTarget(ctx); - - try { - target.open(); - } catch (IOException e) { - LOGGER.warn("Error while opening event source!", e); - return; - } - - GrpcWriteStream response = - new GrpcWriteStreamEventSourceAdapter(target, Vertx.currentContext()); - - LOGGER.debug("Fetch account: {} instance: {} request {}", accountId, instanceId, request); - - Context logGrpcContext = Context.current() - .withValue(GrpcContextKeys.LOG_CONTEXT, webMetadata.getLogContext()); - Context prevGrpcContext = logGrpcContext.attach(); - try { - stateHolderVerticle.fetch(accountId, instanceId, request, response); - } finally { - logGrpcContext.detach(prevGrpcContext); - } + FetchRequest request = FetchRequest.parseFrom(body); + GrpcWriteStream response = GrpcWriteStreamEventSourceAdapter.create(ctx); + + verticle.runOnContext( + () -> verticle.stateHolderService.fetch(accountId, instanceId, request, response), + response, + webMetadata.getLogContext() + ); } - private FetchRequest parseFetchRequest(MultiMap params) throws InvalidProtocolBufferException { - byte[] body = WebEventTarget.getEventSourceRequestBody(params); - FetchRequest fetchRequest = FetchRequest.parseFrom(body); - return fetchRequest; + private void cancelLongPoll(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException { + + WebMetadata webMetadata = new WebMetadata(ctx); + String accountId = webMetadata.get(CommonMetadataKey.ACCOUNT_ID); + String instanceId = webMetadata.get(CallingMetadataKey.INSTANCE_ID); + + var request = CancelRequest.parseFrom(body); + Future response = WebResponseHandler.future(ctx); + + verticle.runOnContext( + () -> verticle.stateHolderService.cancelLongPoll(accountId, instanceId, request, response), + response, + webMetadata.getLogContext() + ); } } diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolder.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolder.java index 6ff40163f0cb40ab25862a9660973a23d773d8a3..0ef92bd97178c6df3e9b970793ee1d377c02d474 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolder.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolder.java @@ -1,10 +1,12 @@ package com.nynjacoin.nccs.stateholder.service; import com.nynjacoin.nccs.lib.util.UuidGenerator; +import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.protocol.error.NccsError; import com.nynjacoin.nccs.protocol.shp.FetchRequest; import com.nynjacoin.nccs.protocol.shp.Record; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; +import io.vertx.core.Future; import io.vertx.grpc.GrpcWriteStream; import java.util.HashMap; import java.util.Map; @@ -54,6 +56,15 @@ public class StateHolder { .fetch(request.getVersion(), request.getLongPoll(), response, subscriberInfo); } + void cancelLongPoll(String topic, Future response, String subscriberId) { + TopicDb topicDb = topicDbs.get(topic); + if (topicDb == null) { + response.complete(); + return; + } + topicDb.cancelLongPoll(response, subscriberId); + } + void publish(String topic, Record record) { getOrCreateTopicDb(topic).publish(record); } diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderService.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderService.java index cef45a635f0bd7f938beb86122635cfc3f4f3991..35280a0982f8bd80217295cbb0bf09916b1cbc5a 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderService.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderService.java @@ -1,9 +1,12 @@ package com.nynjacoin.nccs.stateholder.service; +import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.protocol.error.NccsError; +import com.nynjacoin.nccs.protocol.shp.CancelRequest; import com.nynjacoin.nccs.protocol.shp.FetchRequest; import com.nynjacoin.nccs.protocol.shp.Record; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.grpc.GrpcWriteStream; import java.util.HashMap; @@ -62,7 +65,24 @@ public class StateHolderService { FetchRequest request, GrpcWriteStream response) { getOrCreateAndAuthorizeStateHolder(accountId, instanceId, request.getTopic()) - .fetch(request, response, accountId + '@' + request.hashCode()); + .fetch(request, response, getSubscriberId(accountId, request)); + } + + public void cancelLongPoll( + String accountId, + String instanceId, + CancelRequest request, + Future response) { + + StateHolder stateHolder = stateHolders.get(instanceId); + if (stateHolder == null) { + response.complete(); + return; + } + stateHolder.cancelLongPoll( + request.getTopic(), + response, + getSubscriberId(accountId, request.getClientId())); } public void publish(String instanceId, String topic, Record record) { @@ -77,6 +97,18 @@ public class StateHolderService { return longPollMaxTime; } + private String getSubscriberId(String accountId, FetchRequest request) { + String clientId = request.getClientId(); + if (clientId.isEmpty()) { + clientId = Integer.toString(request.hashCode()); + } + return getSubscriberId(accountId, clientId); + } + + private String getSubscriberId(String accountId, String clientId) { + return accountId + '@' + clientId; + } + private StateHolder getOrCreateStateHolder(String instanceId, String topic) { StateHolder stateHolder = stateHolders.get(instanceId); if (stateHolder == null) { diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderVerticle.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderVerticle.java index 2ae73f07b9160f0162c190c05e81659ffc0a5805..9cab5161d1dec4c2779e1370016bf8afdaa7f945 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderVerticle.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/StateHolderVerticle.java @@ -1,59 +1,24 @@ package com.nynjacoin.nccs.stateholder.service; -import com.nynjacoin.nccs.lib.grpcutil.GrpcContextKeys; +import com.nynjacoin.nccs.lib.vertxdispatch.NccsVerticle; import com.nynjacoin.nccs.protocol.def.Void; -import com.nynjacoin.nccs.protocol.shp.FetchRequest; import com.nynjacoin.nccs.protocol.shp.Record; -import com.nynjacoin.nccs.protocol.shp.UpdatePacket; -import io.vertx.core.AbstractVerticle; -import io.vertx.core.Context; import io.vertx.core.Future; -import io.vertx.core.Vertx; -import io.vertx.grpc.GrpcWriteStream; import org.apache.logging.log4j.CloseableThreadContext; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.ThreadContext; -public class StateHolderVerticle extends AbstractVerticle { +public class StateHolderVerticle extends NccsVerticle { - private static final Logger LOGGER = LogManager.getLogger(StateHolderVerticle.class); - - private final StateHolderService stateHolderService; - private Context verticleContext; + public final StateHolderService stateHolderService; public StateHolderVerticle(long expireInterval, long longPollMaxTime) { stateHolderService = new StateHolderService(expireInterval, longPollMaxTime); } - @Override - public void start() { - LOGGER.info("Starting ..."); - verticleContext = Vertx.currentContext(); - stateHolderService.start(); - } - - @Override - public void stop() { - stateHolderService.stop(); - LOGGER.info("Stopped."); - } - - public void fetch(String accountId, String instanceId, - FetchRequest request, GrpcWriteStream response) { - - final var logContext = GrpcContextKeys.LOG_CONTEXT.get(); - verticleContext.runOnContext(v -> { - try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { - stateHolderService.fetch(accountId, instanceId, request, response); - } - }); - } - public Future publish(String instanceId, String topic, Record record) { Future f = Future.future(); final var logContext = ThreadContext.getContext(); - verticleContext.runOnContext(v -> { + context.runOnContext(v -> { try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { stateHolderService.publish(instanceId, topic, record); f.complete(Void.getDefaultInstance()); @@ -66,7 +31,7 @@ public class StateHolderVerticle extends AbstractVerticle { public void recant(String instanceId, String topic, String key) { final var logContext = ThreadContext.getContext(); - verticleContext.runOnContext(v -> { + context.runOnContext(v -> { try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { stateHolderService.recant(instanceId, topic, key); } @@ -76,7 +41,7 @@ public class StateHolderVerticle extends AbstractVerticle { public Future createTopic(String instanceId, String topic) { Future f = Future.future(); final var logContext = ThreadContext.getContext(); - verticleContext.runOnContext(v -> { + context.runOnContext(v -> { try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { stateHolderService.createTopic(instanceId, topic); f.complete(Void.getDefaultInstance()); @@ -89,7 +54,7 @@ public class StateHolderVerticle extends AbstractVerticle { public void discardTopic(String instanceId, String topic) { final var logContext = ThreadContext.getContext(); - verticleContext.runOnContext(v -> { + context.runOnContext(v -> { try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { stateHolderService.discardTopic(instanceId, topic); } diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/Topic.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/Topic.java index 75c7c63cb872eea340140e8c40183dd193303e1c..0fe80893df907bfb41af49c08d4c09a14e1a1b44 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/Topic.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/Topic.java @@ -6,6 +6,7 @@ public enum Topic { USER_ACTIVITY("user-activity", "account", true), @Deprecated USER_CONFERENCE("user-conference", "account", true), + ROOM("room", "room", true), ; private String wireName; diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicDb.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicDb.java index 1f006e7d392b27bf02259f2a44cfe4b4144b50d2..129dddb8fa7ad6e2e62e001b5881e428d1f33bfd 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicDb.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicDb.java @@ -2,6 +2,7 @@ package com.nynjacoin.nccs.stateholder.service; import com.google.protobuf.TextFormat; import com.nynjacoin.nccs.lib.util.InstanceLogger; +import com.nynjacoin.nccs.protocol.def.Void; import com.nynjacoin.nccs.protocol.error.NccsError; import com.nynjacoin.nccs.protocol.shp.Deletion; import com.nynjacoin.nccs.protocol.shp.EndMarker; @@ -10,6 +11,7 @@ import com.nynjacoin.nccs.protocol.shp.StartMarker; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; import com.nynjacoin.nccs.protocol.shp.UpdateType; import com.nynjacoin.nccs.protocol.shp.Version; +import io.vertx.core.Future; import io.vertx.grpc.GrpcWriteStream; import java.util.ArrayDeque; import java.util.ArrayList; @@ -96,29 +98,40 @@ public class TopicDb { Version version, boolean longPoll, GrpcWriteStream response, - String subsInfo) { + String subsId) { logger.debug("{} fetch version: {{}} longPoll: {}", - subsInfo, TextFormat.shortDebugString(version), longPoll); + subsId, TextFormat.shortDebugString(version), longPoll); logNextVersion(); if (!version.getGeneration().equals(generation)) { - sendFullUpdate(response, subsInfo); + sendFullUpdate(response, subsId); } else if (version.getSequence() > nextDelta.getSequence()) { - logger.warn("{} invalid version sequence", subsInfo); + logger.warn("{} invalid version sequence", subsId); logNextVersion(); response.fail(NccsError.INVALID_ARGUMENT.exception("Invalid version sequence")); } else if (version.getSequence() == nextDelta.getSequence()) { nextDelta.releaseReference(); if (longPoll) { - registerSubscriber(response, subsInfo); + registerSubscriber(response, subsId); } else { - sendEmptyUpdate(response, subsInfo); + sendEmptyUpdate(response, subsId); } } else { - fetchDelta(version.getSequence(), response, subsInfo); + fetchDelta(version.getSequence(), response, subsId); } } + void cancelLongPoll(Future response, String subsId) { + List toCancel = subscribers.stream() + .filter(subscriber -> subsId.equals(subscriber.getSubsId())) + .collect(Collectors.toList()); + toCancel.forEach(subscriber -> { + subscribers.remove(subscriber); + sendSpecialUpdate(subscriber, UpdateType.POLL_CANCEL); + }); + response.complete(); + } + void publish(Record record) { logger.debug("publish {}", record); logNextVersion(); @@ -155,7 +168,7 @@ public class TopicDb { .collect(Collectors.toList()); expired.forEach(subscriber -> { subscribers.remove(subscriber); - sendKeepAliveUpdate(subscriber); + sendSpecialUpdate(subscriber, UpdateType.KEEP_ALIVE); }); } @@ -209,15 +222,15 @@ public class TopicDb { badSubscribers.forEach(subscribers::remove); } - private void registerSubscriber(GrpcWriteStream response, String subsInfo) { - TopicSubscriber topicSubscriber = new TopicSubscriber(response, subsInfo); + private void registerSubscriber(GrpcWriteStream response, String subsId) { + TopicSubscriber topicSubscriber = new TopicSubscriber(response, subsId); logger.info("Subscriber added: {}", topicSubscriber); subscribers.add(topicSubscriber); } - private void fetchDelta(int sequence, GrpcWriteStream response, String subsInfo) { + private void fetchDelta(int sequence, GrpcWriteStream response, String subsId) { if (deltas.isEmpty() || sequence < deltas.getLast().getSequence()) { - sendFullUpdate(response, subsInfo); + sendFullUpdate(response, subsId); return; } @@ -233,7 +246,7 @@ public class TopicDb { } if (versionDelta == null) { - sendFullUpdate(response, subsInfo); + sendFullUpdate(response, subsId); return; } @@ -243,19 +256,19 @@ public class TopicDb { sendDeltas(it.next(), response); } response.write(createEndMarker()); - logger.info("{} got delta", subsInfo); + logger.info("{} got delta", subsId); logNextVersion(); response.end(); } - private void sendFullUpdate(GrpcWriteStream response, String subsInfo) { + private void sendFullUpdate(GrpcWriteStream response, String subsId) { response.write(createStartMarker(UpdateType.FULL)); records.values().forEach(record -> { UpdatePacket updatePacket = createUpdate(record); response.write(updatePacket); }); response.write(createEndMarker()); - logger.info("{} got full", subsInfo); + logger.info("{} got full", subsId); logNextVersion(); response.end(); } @@ -266,20 +279,21 @@ public class TopicDb { ); } - private void sendEmptyUpdate(GrpcWriteStream response, String subsInfo) { + private void sendEmptyUpdate(GrpcWriteStream response, String subsId) { response.write(createStartMarker(UpdateType.INCREMENTAL)); response.write(createEndMarker()); - logger.info("req: {} got empty", subsInfo); + logger.info("{} got empty", subsId); logNextVersion(); response.end(); } - private void sendKeepAliveUpdate(TopicSubscriber subscriber) { + private void sendSpecialUpdate(TopicSubscriber subscriber, UpdateType type) { + assert type == UpdateType.KEEP_ALIVE || type == UpdateType.POLL_CANCEL; try { var response = subscriber.getResponseStream(); - response.write(createStartMarker(UpdateType.KEEP_ALIVE)); + response.write(createStartMarker(type)); response.write(createEndMarker()); - logger.info("{} got keep-alive", subscriber); + logger.info("{} got {}", subscriber.getSubsId(), type); logNextVersion(); response.end(); } catch (Throwable ignored) { diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicSubscriber.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicSubscriber.java index f854dadf678c0b01a01f4a5ffe4ee83a690d26cc..82eb6fb3430211b344853926b0c102ef75923902 100644 --- a/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicSubscriber.java +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/service/TopicSubscriber.java @@ -2,29 +2,34 @@ package com.nynjacoin.nccs.stateholder.service; import com.nynjacoin.nccs.protocol.shp.UpdatePacket; import io.vertx.grpc.GrpcWriteStream; -import java.util.Date; public class TopicSubscriber { private final GrpcWriteStream responseStream; - private final long requestTime; - private final String logId; + private final String subsId; + private final long requestTime; + private final String logId; - TopicSubscriber(GrpcWriteStream responseStream, String subsInfo) { - this.responseStream = responseStream; - this.requestTime = System.currentTimeMillis(); - this.logId = subsInfo + '/' + String.valueOf(requestTime); - } + TopicSubscriber(GrpcWriteStream responseStream, String subsId) { + this.responseStream = responseStream; + this.subsId = subsId; + this.requestTime = System.currentTimeMillis(); + this.logId = subsId + '/' + String.valueOf(requestTime); + } - GrpcWriteStream getResponseStream() { - return responseStream; - } + GrpcWriteStream getResponseStream() { + return responseStream; + } - long getRequestTime() { - return requestTime; - } + long getRequestTime() { + return requestTime; + } - @Override - public String toString() { - return logId; - } + @Override + public String toString() { + return logId; + } + + String getSubsId() { + return subsId; + } } diff --git a/focus/src/main/java/com/nynjacoin/nccs/stateholder/statepackage/RoomPublisher.java b/focus/src/main/java/com/nynjacoin/nccs/stateholder/statepackage/RoomPublisher.java new file mode 100644 index 0000000000000000000000000000000000000000..be373782279ce9515cf48d66f087cf83c2bc1185 --- /dev/null +++ b/focus/src/main/java/com/nynjacoin/nccs/stateholder/statepackage/RoomPublisher.java @@ -0,0 +1,32 @@ +package com.nynjacoin.nccs.stateholder.statepackage; + +import com.nynjacoin.nccs.protocol.def.Void; +import com.nynjacoin.nccs.protocol.sp.conference.RoomConferenceRecord; +import com.nynjacoin.nccs.protocol.sp.conference.RoomRecord; +import com.nynjacoin.nccs.stateholder.service.StateHolderVerticle; +import com.nynjacoin.nccs.stateholder.service.Topic; +import io.vertx.core.Future; + +public class RoomPublisher extends StatePublisher { + + public static final Topic TOPIC = Topic.ROOM; + public static final String ROOM_CONFERENCE_KEY = "room-conference"; + + public RoomPublisher(StateHolderVerticle stateHolderVerticle) { + super(stateHolderVerticle, TOPIC); + } + + public Future publishConference( + String roomId, + RoomConferenceRecord.Builder roomConference) { + + return publish(roomId, ROOM_CONFERENCE_KEY, + RoomRecord.newBuilder() + .setConference(roomConference) + .build()); + } + + public void discardConference(String roomId) { + discard(roomId, ROOM_CONFERENCE_KEY); + } +} diff --git a/grpcurl-commands.txt b/grpcurl-commands.txt new file mode 100644 index 0000000000000000000000000000000000000000..04c04e913e5ec3b3c0673e1b027570b052c0a969 --- /dev/null +++ b/grpcurl-commands.txt @@ -0,0 +1,37 @@ +grpcurl --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"some-room-id"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.GetLimits + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.CreateConference + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"add_member":[{"address":{"value":"peshko"}}, {"address":{"value":"meshko"}}]}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.CreateConference + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"create_request":{"add_member":[{"address":{"value":"peshko"}}, {"address":{"value":"meshko"}}]}}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.StartConference + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"my_room", "create_request":{"add_member":[{"address":{"value":"peshko"}}, {"address":{"value":"meshko"}}]}}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.StartConference + +grpcurl -H 'Account-ID: 37936e07-1930-4317-8719-aadac2524184_2806' -H 'Instance-ID: aXaNH0BUH2DfAVa2Xi9kJ6' --proto conference_control.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{}' localhost:41414 nynjacoin.nccs.ccp.Conference.End + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"link_id":"some-room-id"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.GetRoomId + +grpcurl -H 'Account-ID: acc1' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"link_id":"a5ad9dc2"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.GetRoomId + +grpcurl -H 'Account-ID: 4d13c5cc-a153-4dd3-98e4-effdd69f30c9' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"link_id":"81d5db27"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.GetRoomId + +// guest enter +grpcurl -H 'Account-ID: 5ef289e7-3c9b-44e2-9dce-637e30f3f02a_2807' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"4d13c5cc-a153-4dd3-98e4-effdd69f30c9"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.EnterRoom + +// host enter +grpcurl -H 'Account-ID: 37936e07-1930-4317-8719-aadac2524184_2806' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"4d13c5cc-a153-4dd3-98e4-effdd69f30c9"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.EnterRoom + +// owner and non-owner hosts enter +grpcurl -H 'Account-ID: 37936e07-1930-4317-8719-aadac2524184_2806' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"d57061b1-3313-43bb-b745-7612677d3195"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.EnterRoom +grpcurl -H 'Account-ID: 5ef289e7-3c9b-44e2-9dce-637e30f3f02a_2807' --proto conference_factory.proto --import-path protocol/public/src/main/proto --plaintext --emit-defaults -d '{"room_id":"d57061b1-3313-43bb-b745-7612677d3195"}' localhost:41414 nynjacoin.nccs.cfp.ConferenceFactory.EnterRoom + + +ilia account id: +37936e07-1930-4317-8719-aadac2524184_2806 + +ilia_email account id: +5ef289e7-3c9b-44e2-9dce-637e30f3f02a_2807 + +4d13c5cc-a153-4dd3-98e4-effdd69f30c9, 81d5db27 - ilia +d57061b1-3313-43bb-b745-7612677d3195, 1f3869fe - ilia, ilia_email diff --git a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/GrpcWriteStreamEventSourceAdapter.java b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/GrpcWriteStreamEventSourceAdapter.java index 8e81401d2d7e8cb0f30e1d971c1ddd2b03d1c0ea..0efd13d747982ff978265f857fbf52f1254fdebf 100644 --- a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/GrpcWriteStreamEventSourceAdapter.java +++ b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/GrpcWriteStreamEventSourceAdapter.java @@ -1,12 +1,16 @@ package com.nynjacoin.nccs.lib.grpcutil; import com.google.protobuf.GeneratedMessageV3; +import com.nynjacoin.nccs.protocol.error.NccsWebEndpointException; import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.stub.StreamObserver; import io.vertx.core.Context; import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.ext.web.RoutingContext; import io.vertx.grpc.GrpcWriteStream; +import java.io.IOException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -18,6 +22,20 @@ public class GrpcWriteStreamEventSourceAdapter GrpcWriteStream create(RoutingContext ctx) + throws NccsWebEndpointException { + + WebEventTarget target = new WebEventTarget(ctx); + + try { + target.open(); + } catch (IOException e) { + throw new NccsWebEndpointException("Error while opening event source!", e); + } + + return new GrpcWriteStreamEventSourceAdapter<>(target, Vertx.currentContext()); + } + public GrpcWriteStreamEventSourceAdapter(WebEventTarget target, Context vertxContext) { this.target = target; this.vertxContext = vertxContext; diff --git a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/PathHandler.java b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/PathHandler.java new file mode 100644 index 0000000000000000000000000000000000000000..3ed872b23f80f96153796261d533a480db02c0dc --- /dev/null +++ b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/PathHandler.java @@ -0,0 +1,58 @@ +package com.nynjacoin.nccs.lib.grpcutil; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.nynjacoin.nccs.protocol.error.NccsWebEndpointException; +import io.vertx.core.Handler; +import io.vertx.ext.web.Router; +import io.vertx.ext.web.RoutingContext; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class PathHandler { + + private static Logger LOGGER = LogManager.getLogger(PathHandler.class); + + public static void route(Router router, String path, WebApiCallConsumer consumer) { + router.route(path).handler(compose(path, consumer)); + } + + public static void routeSSE(Router router, String path, WebApiCallConsumer consumer) { + router.route(path).handler(composeSSE(path, consumer)); + } + + private static Handler compose(String path, WebApiCallConsumer consumer) { + var invoker = new ConsumerInvoker(path, consumer); + return ctx -> ctx.request().bodyHandler( + bodyBuffer -> invoker.invoke(ctx, bodyBuffer.getBytes()) + ); + } + + private static Handler composeSSE(String path, WebApiCallConsumer consumer) { + var invoker = new ConsumerInvoker(path, consumer); + return ctx -> { + byte[] body = WebEventTarget.getEventSourceRequestBody(ctx.request().params()); + invoker.invoke(ctx, body); + }; + } + + private static class ConsumerInvoker { + private final String path; + private final WebApiCallConsumer consumer; + + private ConsumerInvoker(String path, WebApiCallConsumer consumer) { + this.path = path; + this.consumer = consumer; + } + + void invoke(RoutingContext ctx, byte[] body) { + try { + consumer.accept(ctx, body); + } catch (InvalidProtocolBufferException e) { + LOGGER.warn("{}: body is not a valid proto.3: {}", path, body); + } catch (NccsWebEndpointException e) { + LOGGER.error("{}: failure processing request: {}", path, body, e); + } + } + } + +} diff --git a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebApiCallConsumer.java b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebApiCallConsumer.java new file mode 100644 index 0000000000000000000000000000000000000000..a47b18756d4cf30f3604bd83af3a4b5e35daac2e --- /dev/null +++ b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebApiCallConsumer.java @@ -0,0 +1,13 @@ +package com.nynjacoin.nccs.lib.grpcutil; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.nynjacoin.nccs.protocol.error.NccsWebEndpointException; +import io.vertx.core.buffer.Buffer; +import io.vertx.ext.web.RoutingContext; + +public interface WebApiCallConsumer { + + void accept(RoutingContext ctx, byte[] body) + throws InvalidProtocolBufferException, NccsWebEndpointException; + +} diff --git a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebResponseHandler.java b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebResponseHandler.java index 604983181225aa89a2a8e0640e021aa3d4dc4a4f..b138dab9a20caebb6d1668e45cf7f8e639be8865 100644 --- a/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebResponseHandler.java +++ b/lib/grpc-util/src/main/java/com/nynjacoin/nccs/lib/grpcutil/WebResponseHandler.java @@ -26,7 +26,7 @@ public class WebResponseHandler implements Handler private final RoutingContext ctx; - public WebResponseHandler(RoutingContext ctx) { + private WebResponseHandler(RoutingContext ctx) { this.ctx = ctx; } diff --git a/lib/nccs-config/src/main/java/com/nynjacoin/nccs/lib/config/ConfigKey.java b/lib/nccs-config/src/main/java/com/nynjacoin/nccs/lib/config/ConfigKey.java index 008c967ac4cd770247e508ff353c1e1bbf134f87..9b519fe7b0aa77bc6730bb8df61849c483938595 100644 --- a/lib/nccs-config/src/main/java/com/nynjacoin/nccs/lib/config/ConfigKey.java +++ b/lib/nccs-config/src/main/java/com/nynjacoin/nccs/lib/config/ConfigKey.java @@ -51,6 +51,12 @@ public enum ConfigKey { CHAT_ROOM_RETRY_AFTER_MILLISECONDS("chat_room_retry_after_milliseconds"), CHAT_ROOM_PER_REQUEST_POOL_SIZE("chat_room_per_request_pool_size"), CHAT_ROOM_KEY("chat_room_key"), + + ROOM_LINK_URL("room_link_url"), + ROOM_LINK_MAX_RETRIES("room_link_max_retries"), + ROOM_LINK_RETRY_AFTER_MILLISECONDS("room_link_retry_after_milliseconds"), + ROOM_LINK_PER_REQUEST_POOL_SIZE("room_link_per_request_pool_size"), + ROOM_LINK_KEY("room_link_key"), ; private final String key; diff --git a/lib/vertx-dispatch/src/main/java/com/nynjacoin/nccs/lib/vertxdispatch/NccsVerticle.java b/lib/vertx-dispatch/src/main/java/com/nynjacoin/nccs/lib/vertxdispatch/NccsVerticle.java index 2d1dc5b749c6b97f9f656e8d47a452972846a200..3455eb1bf8db5f79d2883ea33cacf6f1b11ba0f0 100644 --- a/lib/vertx-dispatch/src/main/java/com/nynjacoin/nccs/lib/vertxdispatch/NccsVerticle.java +++ b/lib/vertx-dispatch/src/main/java/com/nynjacoin/nccs/lib/vertxdispatch/NccsVerticle.java @@ -3,7 +3,7 @@ package com.nynjacoin.nccs.lib.vertxdispatch; import com.nynjacoin.nccs.protocol.error.NccsException; import io.vertx.core.AbstractVerticle; import io.vertx.core.Future; -import io.vertx.grpc.GrpcBidiExchange; +import io.vertx.grpc.GrpcWriteStream; import java.util.Map; import org.apache.logging.log4j.CloseableThreadContext; import org.apache.logging.log4j.ThreadContext; @@ -24,20 +24,20 @@ public class NccsVerticle extends AbstractVerticle { }); } - public void runOnContext(Runnable action, GrpcBidiExchange exchange) { - runOnContext(action, exchange, ThreadContext.getContext()); + public void runOnContext(Runnable action, GrpcWriteStream response) { + runOnContext(action, response, ThreadContext.getContext()); } public void runOnContext( Runnable action, - GrpcBidiExchange exchange, + GrpcWriteStream response, Map logContext) { context.runOnContext(v -> { try (final CloseableThreadContext.Instance tc = CloseableThreadContext.putAll(logContext)) { action.run(); } catch (NccsException e) { - exchange.fail(e); + response.fail(e); } }); } diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/HttpException.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/HttpException.java new file mode 100644 index 0000000000000000000000000000000000000000..62cdad18ade0147266398eb7db0732e86166ba6e --- /dev/null +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/HttpException.java @@ -0,0 +1,19 @@ +package com.nynjacoin.nccs.lib.vertxutil.restclient; + +public class HttpException extends RuntimeException { + private final int statusCode; + private final String statusMessage; + + public HttpException(int statusCode, String statusMessage) { + this.statusCode = statusCode; + this.statusMessage = statusMessage; + } + + public int getStatusCode() { + return statusCode; + } + + public String getStatusMessage() { + return statusMessage; + } +} diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/JsonHttpException.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/JsonHttpException.java new file mode 100644 index 0000000000000000000000000000000000000000..fdac0d9cd55df52cc6676de1841aed23b87f9880 --- /dev/null +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/JsonHttpException.java @@ -0,0 +1,17 @@ +package com.nynjacoin.nccs.lib.vertxutil.restclient; + +import io.vertx.core.json.JsonObject; + +public class JsonHttpException extends HttpException { + + private final JsonObject body; + + public JsonHttpException(int statusCode, String statusMessage, JsonObject body) { + super(statusCode, statusMessage); + this.body = body; + } + + public JsonObject getBody() { + return body; + } +} diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/RetryExecutor.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/RetryExecutor.java index bfd5646fe0bb11e30841e3e4a63f78a0356fd03c..a4e5bf2fcc647dc097ade66a0a2f7333fd957ee0 100644 --- a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/RetryExecutor.java +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/RetryExecutor.java @@ -169,6 +169,13 @@ public class RetryExecutor { if (ar.failed()) { return ar.cause(); } + if (ar.result() != null) { + JsonObject body = ar.result().body(); + if (body == null) { + return new HttpException(ar.result().statusCode(), ar.result().statusMessage()); + } + return new JsonHttpException(ar.result().statusCode(), ar.result().statusMessage(), body); + } return new RuntimeException("HTTP error: " + getError(ar)); } diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClient.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClient.java index d88b0512d46b8549be12f841ab1edced98390cc0..1372c936ec58b5f19aa6887fff2c7de3a2d0070f 100644 --- a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClient.java +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClient.java @@ -86,6 +86,10 @@ public class ChatRoomClient extends RestClientBase { return result; } + public Future isMember(String ownerAccountId, String roomId, String accountId) { + return areAllMembers(ownerAccountId, roomId, List.of(accountId)); + } + public Future areAllMembers(String ownerAccountId, String roomId, List accountIds) { var request = getWebClient() .getAbs(membersPath) diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomInfo.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomInfo.java new file mode 100644 index 0000000000000000000000000000000000000000..9ec4fe089a542a0508520055b23af4137f5ec181 --- /dev/null +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomInfo.java @@ -0,0 +1,20 @@ +package com.nynjacoin.nccs.lib.vertxutil.restclient.roomlink; + +public class RoomInfo { + + private final String id; + private final String name; + + RoomInfo(String id, String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id; + } + + public String getName() { + return name; + } +} diff --git a/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomLinkClient.java b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomLinkClient.java new file mode 100644 index 0000000000000000000000000000000000000000..df8d248fc17c4619a736236fcd62d7a4c2b42727 --- /dev/null +++ b/lib/vertx-util/src/main/java/com/nynjacoin/nccs/lib/vertxutil/restclient/roomlink/RoomLinkClient.java @@ -0,0 +1,83 @@ +package com.nynjacoin.nccs.lib.vertxutil.restclient.roomlink; + +import com.nynjacoin.nccs.lib.config.ConfigKey; +import com.nynjacoin.nccs.lib.vertxutil.restclient.AuthenticationHandler; +import com.nynjacoin.nccs.lib.vertxutil.restclient.JsonHttpException; +import com.nynjacoin.nccs.lib.vertxutil.restclient.RestClientBase; +import com.nynjacoin.nccs.lib.vertxutil.restclient.RetryExecutor; +import com.nynjacoin.nccs.lib.vertxutil.restclient.ValidatingJsonObject; +import com.nynjacoin.nccs.lib.vertxutil.restclient.WebClientFactory; +import com.nynjacoin.nccs.protocol.error.NccsError; +import io.vertx.core.Future; +import io.vertx.core.Vertx; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.client.WebClient; +import java.util.Objects; + +public class RoomLinkClient extends RestClientBase { + + private static final String FIELD_ROOM_ID = "room_id"; + private static final String FIELD_ROOM_NAME = "room_name"; + + public static RoomLinkClient create(Vertx vertx, JsonObject config) { + int poolSize = config.getInteger(ConfigKey.ROOM_LINK_PER_REQUEST_POOL_SIZE.key(), 32); + WebClient webClient = WebClientFactory.create(vertx, poolSize); + + String auth = config.getString(ConfigKey.ROOM_LINK_KEY.key()); + AuthenticationHandler authenticationHandler = AuthenticationHandler.create(auth); + + RetryExecutor retryExecutor = RetryExecutor.newBuilder(vertx) + .setOrDefaultMaxRetries(config.getInteger( + ConfigKey.ROOM_LINK_MAX_RETRIES.key())) + .setOrDefaultRetryMilliseconds(config.getLong( + ConfigKey.ROOM_LINK_RETRY_AFTER_MILLISECONDS.key())) + .build(); + + String rootUrl = config.getString(ConfigKey.ROOM_LINK_URL.key()); + Objects.requireNonNull(rootUrl); + + return new RoomLinkClient(webClient, authenticationHandler, retryExecutor, rootUrl); + } + + private RoomLinkClient(WebClient webClient, + AuthenticationHandler authenticationHandler, + RetryExecutor retryExecutor, + String rootUrl) { + + super(webClient, authenticationHandler, retryExecutor, rootUrl); + } + + public Future getRoomInfo(String linkId) { + var request = getWebClient().getAbs(getRoomUrl(linkId)); + request = getAuthenticationHandler().apply(request); + var response = getRetryExecutor().execute(request); + return response + .map(rawBody -> { + ValidatingJsonObject body = ValidatingJsonObject.wrap(rawBody); + return new RoomInfo( + body.getRequiredString(FIELD_ROOM_ID), + rawBody.getString(FIELD_ROOM_NAME, "")); + }) + .recover(cause -> Future.failedFuture(translateLinkNotFound(cause))); + } + + private static Throwable translateLinkNotFound(Throwable cause) { + if (cause instanceof JsonHttpException) { + JsonHttpException jsonHttpException = (JsonHttpException) cause; + if (jsonHttpException.getStatusCode() == 404) { + JsonObject body = jsonHttpException.getBody(); + try { + if ("not_found".equals(body.getString("error"))) { + return NccsError.INVALID_ARGUMENT.exception("Invalid link_id"); + } + } catch (Exception ignored) { + } + } + } + return cause; + } + + private String getRoomUrl(String linkId) { + return getRootUrl() + '/' + linkId + "/room_id"; + } +} diff --git a/lib/vertx-util/src/test/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClientManualTest.java b/lib/vertx-util/src/test/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClientManualTest.java index 32f408d12f563a4a77c9aa46eb405b0af4990f5e..8ee8a26fb65efbd11542b79310dc046a1a16ed76 100644 --- a/lib/vertx-util/src/test/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClientManualTest.java +++ b/lib/vertx-util/src/test/java/com/nynjacoin/nccs/lib/vertxutil/restclient/chatroom/ChatRoomClientManualTest.java @@ -18,7 +18,7 @@ import org.junit.Test; import org.junit.runner.RunWith; @RunWith(VertxUnitRunner.class) -//@Ignore +@Ignore public class ChatRoomClientManualTest { private ChatRoomClient chatRoomClient; diff --git a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsError.java b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsError.java index 439e0af4ec328338bf73293e3d91d43e01712063..70af5dfb0879d3e935c4992e31bfc47b924028e2 100644 --- a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsError.java +++ b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsError.java @@ -10,10 +10,15 @@ public enum NccsError { FAILED_PRECONDITION(Status.FAILED_PRECONDITION), UNIMPLEMENTED(Status.UNIMPLEMENTED), + SERVICE_UNAVAILABLE_RETRY( + "service-unavailable-retry", + Status.DEADLINE_EXCEEDED, + "Operation failed, please retry"), BAD_PROTO3("bad-proto3", Status.INVALID_ARGUMENT, "Failed to parse proto.3 message"), CONFERENCE_NOT_FOUND("conf-not-found", Status.NOT_FOUND, "Conference not found"), CONFERENCE_BAD_STATE("conf-bad-state", Status.FAILED_PRECONDITION, "Invalid conference state"), CONFERENCE_MEMBERS_LIMIT_REACHED("conf-max-members", Status.FAILED_PRECONDITION, "Conference members limit reached"), + CONFERENCE_ROOM_BUSY("conf-room-busy", Status.FAILED_PRECONDITION, "The room already hosts a conference"), LINK_NOT_FOUND("link-not-found", Status.NOT_FOUND, "Unknown link id"), LINK_EXPIRED("link-expired", Status.OUT_OF_RANGE, "Link expired"), MEMBER_NOT_FOUND("conf-member-not-found", Status.NOT_FOUND, "Member not found"), @@ -64,4 +69,5 @@ public enum NccsError { } public String getNccsReason() { return nccsReason; } + } diff --git a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsWebEndpointException.java b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsWebEndpointException.java new file mode 100644 index 0000000000000000000000000000000000000000..47edd3f364dff909f015572c6cb37411ac57a1fd --- /dev/null +++ b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/error/NccsWebEndpointException.java @@ -0,0 +1,7 @@ +package com.nynjacoin.nccs.protocol.error; + +public class NccsWebEndpointException extends Exception { + public NccsWebEndpointException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/metadata/CommonMetadataKey.java b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/metadata/CommonMetadataKey.java index 86e184dc7bf8ccd50697d22d123caf9ae3cc284b..5e63b96735d86077d614193710ccdd72dafbce71 100644 --- a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/metadata/CommonMetadataKey.java +++ b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/metadata/CommonMetadataKey.java @@ -1,6 +1,5 @@ package com.nynjacoin.nccs.protocol.metadata; -import io.grpc.Context; import io.grpc.Metadata; import io.grpc.Metadata.Key; diff --git a/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/validation/NccsValidator.java b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/validation/NccsValidator.java new file mode 100644 index 0000000000000000000000000000000000000000..c2d7f3871495bdb473ae6261ae284c9f127b8c9a --- /dev/null +++ b/protocol/public/src/main/java/com/nynjacoin/nccs/protocol/validation/NccsValidator.java @@ -0,0 +1,25 @@ +package com.nynjacoin.nccs.protocol.validation; + +import com.nynjacoin.nccs.protocol.error.NccsError; + +public class NccsValidator { + + public static void requireNonNull(String value, String what) { + if (value == null) { + throw NccsError.INVALID_ARGUMENT.exception("Required but missing: " + what); + } + } + + public static void requireNonEmpty(String value, String what) { + if (value == null) { + throw NccsError.INVALID_ARGUMENT.exception("Required non empty value: " + what); + } + } + + public static void checkRequiredHeader(String value, String header) { + if (value == null || value.isEmpty()) { + throw NccsError.INVALID_ARGUMENT.exception("Missing required header: " + header); + } + } + +} diff --git a/protocol/public/src/main/proto b/protocol/public/src/main/proto index 30370c0a2ee2e314c9b3dfac124182f9ab7d3789..e93c9e597e9d4b25f6ab45436183a16aee905d27 160000 --- a/protocol/public/src/main/proto +++ b/protocol/public/src/main/proto @@ -1 +1 @@ -Subproject commit 30370c0a2ee2e314c9b3dfac124182f9ab7d3789 +Subproject commit e93c9e597e9d4b25f6ab45436183a16aee905d27 diff --git a/sandbox/grpc/src/test/java/com/nynjacoin/nccs/sandbox/grpc/NccsClientTest.java b/sandbox/grpc/src/test/java/com/nynjacoin/nccs/sandbox/grpc/NccsClientTest.java index ef5859b3de8272b0aa63b6d8440c05fd664e1fda..99bd3b438f18edbe565b7fa969b6edcb717cefbe 100644 --- a/sandbox/grpc/src/test/java/com/nynjacoin/nccs/sandbox/grpc/NccsClientTest.java +++ b/sandbox/grpc/src/test/java/com/nynjacoin/nccs/sandbox/grpc/NccsClientTest.java @@ -1,5 +1,5 @@ package com.nynjacoin.nccs.sandbox.grpc; - +/* import com.nynjacoin.nccs.protocol.ccp.AddMemberRequest; import com.nynjacoin.nccs.protocol.ccp.JoinResponse; import com.nynjacoin.nccs.protocol.chp.Range; @@ -364,3 +364,4 @@ public class NccsClientTest extends BaseTest { } } +*/