From 5a18a3e76f29842f7afe66be911139b81ca40547 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Thu, 23 Apr 2020 13:25:44 +0200 Subject: [PATCH 1/5] Don't send messages to subscribers until the DB-transaction is done --- apps/roster/src/protocol/roster_message.erl | 216 +++++++++++--------- 1 file changed, 115 insertions(+), 101 deletions(-) diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 00f1c4be3..c56d6d4c1 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -44,111 +44,125 @@ info(#'Message'{feed_id = #muc{name = To}, to = []} = RequestData, Req, State) w info(#'Message'{status = [], id = [], feed_id = F, from=From0, to = To, type = Type, msg_id = ClientMsgId, files = Descs} = Msg, Req, #cx{client_pid = C, params = ClientId, state=ack} = State) -> - From = case hd(binary:split(ClientId, <<"_">>)) of - <<"sys">> -> From0; - <<"emqttd">> -> roster:phone_id(ClientId) - end, + try + From = case hd(binary:split(ClientId, <<"_">>)) of + <<"sys">> -> From0; + <<"emqttd">> -> roster:phone_id(ClientId) + end, - FId = roster:roster_id(From), - - FeedData = {R, UID} = case Feed = roster:feed_key(F) of - _ when From == <<>> -> #error{code = permission_denied}; - #muc{name = To} -> - case roster:muc_member(From, To) of - #'Member'{status = removed} -> #error{code = permission_denied}; - #'Member'{reader = ReaderId} = Member -> {ReaderId, Member}; - _ -> #error{code = member_not_found} - end; - #p2p{} -> - case roster:get_contact(FId, To) of - #'Contact'{reader = ReaderId, status = friend} = Contact -> - {ReaderId, Contact}; - _ -> #error{code = permission_denied} - end; - _E -> #error{code = permission_denied} - end, - %% Without the transaction there is a race condition between loading the - %% writer and storing the updated writer. This caused the broken message - %% chains observed in NY-7274. - {atomic, IO} = mnesia:transaction(fun() -> - case R of - error -> - case has_flag(Descs, message_ack) of - true -> #io{code = #'MessageErr'{feed_id = F, - msg_id = ClientMsgId, - error = FeedData}}; - false -> #io{code = FeedData} - end; - _ when is_integer(R),R > 0-> - case roster:link_msg(Msg, UID) of - #error{} = Err -> - case has_flag(Descs, message_ack) of - true -> - #'MessageErr'{feed_id = F, - msg_id = ClientMsgId, - error = Err}; - false -> - #io{code = Err} - end; - LnkRes -> - PrevWriter2 = - case {Feed, PrevWriter = kvs_stream:load_writer(Feed)} of - {#p2p{from = From0, to = From0}, #writer{first = [], cache = []}} -> - ClnMsg = roster:desc_id(#'Message'{status = clear, feed_id = Feed, from = From0, to = From0, type = [sys], - files = [#'Desc'{payload = <<"History was removed">>}], created = roster:now_msec()}), - W = kvs_stream:save(kvs_stream:add(PrevWriter#writer{args = ClnMsg})), - roster:send_feed(C, Feed, W#writer.cache), W; - _ -> PrevWriter - end, -%% create message amd save to db - Writer = #writer{cache = #'Message'{id = MsgId} = M} = - kvs_stream:save(kvs_stream:add(PrevWriter2#writer{args = Msg#'Message'{container = chain, - feed_id = Feed, created = roster:now_msec()}})), - case Feed of - #muc{name = Room} -> - {ok, Room2} = kvs:get('Room', Room), - kvs:put(Room2#'Room'{last_msg = MsgId}); - _ -> skip end, - -%% move cursor - case lists:member(cursor, Type) of - false -> kvs_stream:save(roster:offset_reader(Writer, R)); - true -> ok + {R, UID} = + case Feed = roster:feed_key(F) of + _ when From == <<>> -> + throw({error, permission_denied}); + #muc{name = To} -> + case roster:muc_member(From, To) of + #'Member'{status = removed} -> throw({error, permission_denied}); + #'Member'{reader = ReaderId} = Member -> {ReaderId, Member}; + _ -> throw({error, member_not_found}) + end; + #p2p{} -> + FId = roster:roster_id(From), + case roster:get_contact(FId, To) of + #'Contact'{reader = ReaderId, status = friend} = Contact -> + {ReaderId, Contact}; + _ -> + throw({error, permission_denied}) + end; + _E -> + throw({error, permission_denied}) + end, + + if R =< 0 -> throw({error, reader_not_found}); + true -> ok + end, + + + case roster:link_msg(Msg, UID) of + #error{} = Err1 -> + throw(Err1); + LinkRes -> + %% Without the transaction there is a race condition between loading the + %% writer and storing the updated writer. This caused the broken message + %% chains observed in NY-7274. + {atomic, {ToSend, Ack}} = mnesia:transaction(fun() -> + Writer0 = kvs_stream:load_writer(Feed), + {ToSend1, Writer1} = + case {Feed, Writer0} of + %% If this is the first message in the MeChat, add an initial message + {#p2p{from = From0, to = From0}, #writer{first = [], cache = []}} -> + FstMsg = #'Message'{status = clear, feed_id = Feed, + from = From0, to = From0, type = [sys], + files = [#'Desc'{payload = <<"History was removed">>}], + created = roster:now_msec()}, + W1 = kvs_stream:save( + kvs_stream:add(Writer0#writer{args = roster:desc_id(FstMsg)})), + {[W1#writer.cache], W1}; + _ -> + {[], Writer0} end, - LnkRes3 = case LnkRes of - #'Message'{prev = Prev, repliedby = ReplBy} -> - LnkRes2 = LnkRes#'Message'{repliedby = [MsgId | ReplBy], - prev = case Prev of [] -> MsgId; _ -> - Prev end}, %% get valid prev iterator - kvs:put(LnkRes2), LnkRes2; - _ -> LnkRes end, - roster:send_feed(C, F, Msg2 = M#'Message'{link = LnkRes3}), - ?LOG_INFO("~p:Message/new:~p", [From, To]), -%% have to skip push notifications for call bubbles - case [lists:keyfind(BubbleContentType, #'Desc'.mime, Descs) || BubbleContentType <- [?CONTENT_TYPE_VIDEOCALL, ?CONTENT_TYPE_AUDIOCALL]] of - [false, false] -> n2o_async:pid(system, ?MODULE) ! {send_push, From, To, Msg2, []}; - _ -> skip + + %% Create message and write it + Msg1 = Msg#'Message'{container = chain, feed_id = Feed, created = roster:now_msec()}, + Writer2 = #writer{cache = #'Message'{id = MsgId} = Msg2} = + kvs_stream:save(kvs_stream:add(Writer1#writer{args = Msg1})), + + %% If this is a MUC update last_msg + case Feed of + #muc{name = RoomId} -> + {ok, Room} = kvs:get('Room', RoomId), + kvs:put(Room#'Room'{last_msg = MsgId}); + #p2p{} -> + ok + end, + + %% Move cursor + case lists:member(cursor, Type) of + false -> kvs_stream:save(roster:offset_reader(Writer2, R)); + true -> ok + end, + + %% Update linked message + LinkRes1 = + case LinkRes of + #'Message'{ prev = Next0, repliedby = ReplBy } -> + Next = case Next0 of [] -> MsgId; _ -> Next0 end, + LM = LinkRes#'Message'{ repliedby = [MsgId | ReplBy], + prev = Next }, + kvs:put(LM), + LM; + _ -> + LinkRes end, + Msg3 = Msg2#'Message'{link = LinkRes1}, + + Ack1 = case has_flag(Descs, message_ack) of - true -> - #'MessageAck'{id = MsgId, - next = Msg2#'Message'.next, - feed_id = Msg2#'Message'.feed_id, - msg_id = ClientMsgId, - created = Msg2#'Message'.created}; - false -> - <<>> - end - end; - _ -> - case has_flag(Descs, message_ack) of - true -> #io{code = #'MessageErr'{feed_id = F, - msg_id = ClientMsgId, - error = FeedData}}; - false -> #io{code = FeedData} - end - end end), - {reply, {bert, IO}, Req, State}; + true -> #'MessageAck'{id = MsgId, + next = Msg3#'Message'.next, + feed_id = Msg3#'Message'.feed_id, + msg_id = ClientMsgId, + created = Msg3#'Message'.created}; + false -> <<>> + end, + {ToSend1 ++ [Msg3], Ack1} + end), + + %% Send the message(s) after DB transaction + [ roster:send_feed(C, Feed, M) || M <- ToSend ], + + %% Maybe also push the message + PushM = lists:last(ToSend), + case [lists:keyfind(BubbleContentType, #'Desc'.mime, Descs) + || BubbleContentType <- [?CONTENT_TYPE_VIDEOCALL, ?CONTENT_TYPE_AUDIOCALL]] of + [false, false] -> n2o_async:pid(system, ?MODULE) ! {send_push, From, To, PushM, []}; + _ -> skip + end, + + {reply, {bert, Ack}, Req, State} + end + catch throw:#error{} = Err -> + {reply, {bert, #io{code = Err}}, Req, State} + end; info(#'Message'{status = edit, id = Id, msg_id = ClMID, feed_id = Feed, from = From, to = To, mentioned = Mentioned, files = [#'Desc'{} | _] = Descs}, Req, -- GitLab From 9c7ee011914e8f53bc635f5e89801b2327075555 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Fri, 24 Apr 2020 16:33:59 +0200 Subject: [PATCH 2/5] Add access checks in clean_history (room/p2p-channel delete) --- apps/roster/src/protocol/roster_history.erl | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 39307e7b4..cd0284d26 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -290,17 +290,18 @@ info(#'History'{} = Data, Req, State) -> % Helper Section -clean_history(#muc{name = Room} = Feed, <<"emqttd_", _/binary>> = ClientId) -> - case roster:muc_member(PhoneId = roster:phone_id(ClientId), Room) of - #'Member'{status = admin} -> - clean_history(Feed, PhoneId); - _ -> #error{code = permission_denied} - end; clean_history(#muc{name = Room} = Feed, PhoneId) -> - clean_history(Feed, PhoneId, Room, roster:members(Feed)); -clean_history(#p2p{} = Feed, PhoneId) -> - Friend = roster:friend(PhoneId, Feed), - clean_history(Feed, PhoneId, Friend, [roster:get_contact(PhoneId, Friend)]). + case roster:muc_member(PhoneId, Room) of + #'Member'{status = admin} -> clean_history(Feed, PhoneId, Room, roster:members(Feed)); + _ -> #error{code = permission_denied} + end; +clean_history(#p2p{from = From, to = To} = Feed, PhoneId) -> + case PhoneId of + From -> clean_history(Feed, From, To, [roster:get_contact(From, To)]); + To -> clean_history(Feed, To, From, [roster:get_contact(To, From)]); + _ -> #error{code = permission_denied} + end. + clean_history(Feed, From, To, Rs) -> Msg = roster:desc_id(#'Message'{status = clear, feed_id = Feed, from = From, to = To, type = [sys], msg_id = iolist_to_binary([<<"rmv_history_">>, roster:msg_id()]), -- GitLab From ec02ec96ccd7d3fdf18d21fa71c2b5f96a381c82 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Fri, 24 Apr 2020 16:38:38 +0200 Subject: [PATCH 3/5] Don't send notification on Room/delete to removed members + general cleanup of Room/delete clause --- apps/roster/src/protocol/roster_room.erl | 68 +++++++++++++++--------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index 7ceb5789b..4410a2d1d 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -276,32 +276,48 @@ info(#'Room'{status = remove, members = Members, admins = Admins0, id = Room}, R {error, _} -> #io{code = #error{code = room_not_found}} end, {reply, {bert, IO}, Req, State}; -info(#'Room'{status = delete, members = [], admins = [], id = Room} = R, Req, - #cx{params = <<"sys_", _/binary>> = ClientId, client_pid = C} = State) -> - ?LOG_INFO("~p:Room/delete:~p", [ClientId, Room]), - case kvs_stream:load_writer(Feed = #muc{name = Room}) of - #writer{first = #'Message'{id = FirstMsgId}, cache = #'Message'{id = LastMsgId}} -> - FilterFun = fun(#'Message'{type = [sys|_]}, _Acc) -> true; (_, _Acc) -> false end, - StopFun = fun(_, false) -> 0; (_M, _A) -> 1 end, - Now = roster:now_msec(), - IsMsg = roster:fold(FilterFun, true, 'Message', LastMsgId, FirstMsgId, - #kvs{mod = store_mnesia}, #iterator.prev, StopFun), - [case IsMsg of - true -> - roster:remove_rooms(PhoneId, [Room]), - kvs:delete(reader, Reader), - kvs:put(Member#'Member'{reader = 0, update = roster:now_msec(), status = removed}), - roster:send_ses(C, roster:phone(PhoneId), R#'Room'{update = Now}); - _ -> - RosterId = roster:roster_id(PhoneId), - {ok, #'Roster'{roomlist = Rooms}} = kvs:get('Roster', RosterId), - RoomRoster = lists:keyfind(Room, #'Room'.id, Rooms), - roster:update_rooms(RosterId, RoomRoster#'Room'{type = group}) - end || #'Member'{reader = Reader, phone_id = PhoneId} = Member <- roster:members(Feed), Reader/=0], - {ok, R2} = kvs:get('Room', Room), - kvs:put(case IsMsg of true -> R2#'Room'{status = delete}; _-> R2#'Room'{type = group} end), - skip; - _ -> skip +info(#'Room'{status = delete, members = [], admins = [], id = RoomId} = Room, Req, + #cx{params = <<"sys_", _/binary>> = ClientId, client_pid = C} = State) -> + ?LOG_INFO("~p:Room/delete:~p", [ClientId, RoomId]), + case kvs_stream:load_writer(Feed = #muc{name = RoomId}) of + #writer{cache = #'Message'{id = LastMsgId}} -> + Fun = fun(#'Message'{type = [sys | _]}, _) -> {cont, false}; + (#'Message'{}, _) -> {stop, true} + end, + {_, HasMsg} = roster:fold_stream('Message', Fun, LastMsgId, false, bwd), + Now = roster:now_msec(), + {ok, Room2} = kvs:get('Room', RoomId), + [case HasMsg of + false -> + roster:remove_rooms(PhoneId, [RoomId]), + kvs:delete(reader, Reader), + kvs:put(Member#'Member'{reader = 0, update = Now, status = removed}), + [ roster:send_ses(C, roster:phone(PhoneId), + Room#'Room'{update = Now, name = Room2#'Room'.name}) + || Member#'Member'.status /= removed ]; + true -> + %% Don't delete the room if it has messages. Why not? + %% Theory: because when disconnecting a call the corresponding + %% room is automatically deleted, but we don't want that + %% if there are messages. + RosterId = roster:roster_id(PhoneId), + {ok, #'Roster'{roomlist = Rooms}} = kvs:get('Roster', RosterId), + case lists:keyfind(RoomId, #'Room'.id, Rooms) of + RoomR = #'Room'{} -> + roster:update_rooms(RosterId, RoomR#'Room'{type = group}); + _ -> + %% Member already left the room + ok + end + end || #'Member'{reader = Reader, phone_id = PhoneId} = Member <- roster:members(Feed), + Reader /= 0], + + Room3 = if HasMsg -> Room2#'Room'{type = group, update = Now}; + true -> Room2#'Room'{status = delete, update = Now} + end, + kvs:put(Room3); + _ -> + ok end, {reply, {bert, <<>>}, Req, State}; -- GitLab From ee86d7453847e9e8fcae3a1c14d7693385fa1c13 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Mon, 11 May 2020 15:16:06 +0200 Subject: [PATCH 4/5] Fix issue with non-validating packets still being processed See also https://github.com/NYNJA-MC/n2o/pull/1 --- apps/roster/src/roster.erl | 9 +++++---- rebar.lock | 2 +- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index d27314fb1..da83c466b 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -990,10 +990,11 @@ validate(Payload, ClientId) -> Term = prevalidate(binary_to_term(Payload)), case roster_validator:validate(Term) of [] -> ok; - Res -> ?LOG_INFO("validate error:~n~p", [Res]), - emqttd:publish(emqttd_message:make( - ClientId, 2, action_topic(ClientId), - term_to_binary(#errors{code = [<<"666">>], data = Res}))), error + Res -> + ?LOG_INFO("validate error:~n~p", [Res]), + ErrMsg = emqttd_message:make(ClientId, 2, action_topic(ClientId), + term_to_binary(#errors{code = [<<"666">>], data = Res})), + {error, ErrMsg} end. %% MUC API diff --git a/rebar.lock b/rebar.lock index dae57e48f..32a75bd4a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -143,7 +143,7 @@ 1}, {<<"n2o">>, {git,"git://github.com/NYNJA-MC/n2o", - {ref,"e215ca7168b41f9e0061ea7162c078f44b53029e"}}, + {ref,"a90d3660196abe9dc6f260680dd85f44ca6ad755"}}, 0}, {<<"nitro">>, {git,"git://github.com/synrc/nitro", -- GitLab From 2fecd61db31c133dd617e45fc3f3c2731134d21a Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 19 May 2020 10:15:24 +0200 Subject: [PATCH 5/5] Update rebar.lock --- rebar.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.lock b/rebar.lock index 32a75bd4a..930b18f4a 100644 --- a/rebar.lock +++ b/rebar.lock @@ -156,7 +156,7 @@ {<<"parse_trans">>,{pkg,<<"parse_trans">>,<<"3.3.0">>},2}, {<<"prometheus">>, {git,"https://github.com/deadtrickster/prometheus.erl", - {ref,"f8619006f945eeaeb1725206209ec89a1409575c"}}, + {ref,"e40e92d811cc057e31cdd74dc0a2049932ab56b9"}}, 0}, {<<"qdate">>, {git,"https://github.com/enterprizing/qdate.git", -- GitLab