From bc40e4c305c85e17a9d5622bbdab539ee6ef9379 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 14 Apr 2020 14:05:43 +0200 Subject: [PATCH 01/19] Fix history limit bug when searching backwards --- apps/roster/src/protocol/roster_history.erl | 4 ++-- apps/roster/src/roster.erl | 17 +++++++++++++++++ 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 31a00d5bd..1f721d9c0 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -97,7 +97,7 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity _ -> MId end, %% Normalize if MId was too large... - MaxReadId = case MId2 > LastMsgId of true -> LastMsgId; _ -> MId2 end, + MaxReadId = min(MId2, LastMsgId), Filter = case N of [] -> msg_update; _ -> msg_filter end, %% select filter function {InnerFilterFun, Mime2} = case MsgData of @@ -127,7 +127,7 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity FilterFun = fun (Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> - (roster:msg_filter_fun(N2, InnerFilterFun, UID, AccFun))(Msg, A); + (roster:msg_filter_fun2(N2, InnerFilterFun, UID, AccFun))(Msg, A); (_, Acc) -> Acc end, diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index c4dafab74..530b1bd0c 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1845,6 +1845,23 @@ msg_filter_fun(DirSize, InnerFilterFun, UID, AccFun) -> Rdr#reader{cache = {'Message', element(Iter, Msg)}, pos = Pos + Sign}; _ -> Rdr end}; (_, A) -> A end. +msg_filter_fun2(DirSize, InnerFilterFun, UID, AccFun) -> + {Iter, Sign} = case DirSize > 0 of true -> {#iterator.prev, 1}; _ -> {#iterator.next, -1} end, + fun(#'Message'{id = Id} = Msg, {Acc, #reader{cache = {'Message', LastReadId}, pos = Pos} = Rdr}) -> + %% Run AccFun if not yet caught up with reader or reader pos > 0 + Acc1 = case Id /= LastReadId orelse Pos > 0 of + true -> AccFun(InnerFilterFun(Msg, UID), Msg, Acc, DirSize); + false -> Acc + end, + %% Update reader if in sync + Rdr1 = case Id == LastReadId andalso Pos >= 0 of + true -> Rdr#reader{cache = {'Message', element(Iter, Msg)}, pos = Pos + Sign}; + false -> Rdr + end, + {Acc1, Rdr1}; + (_, A) -> A + end. + msg_stop_fun(MaxReadId, UID) -> msg_stop_fun(MaxReadId, UID, 1). msg_stop_fun(MaxReadId, UID, Dir) -> -- GitLab From f947a2ec6bb0ae1dc6b3e8bdc0fd97d527e44e96 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 14 Apr 2020 17:07:34 +0200 Subject: [PATCH 02/19] Fix sort_readers to not include the member that just left --- apps/roster/src/roster.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 530b1bd0c..4d5be3743 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1175,7 +1175,9 @@ put_readers(TopAction, #'Member'{feed_id = #muc{name = R}} = M) -> sort_readers(#'Room'{id = Room, type = group} = R) -> {MembIds, _} = lists:unzip(lists:sublist(lists:reverse(lists:keysort(2, - [{MembId, get_reader(Member)} || #'Member'{id = MembId} = Member<-members(#muc{name = Room})])), 2)), + [{MembId, MsgId} + || #'Member'{id = MembId} = Member<-members(#muc{name = Room}), + MsgId <- [get_reader(Member)], MsgId > 0])), 2)), R#'Room'{readers = MembIds}; sort_readers(#'Room'{} = R) -> R; sort_readers(Room) -> -- GitLab From 33bb10b1481a1d05ec3ff2d401b29b797d47e337 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Thu, 16 Apr 2020 10:09:31 +0200 Subject: [PATCH 03/19] Simplify sort_readers when leaving group (i.e. don't sort) Replace the left member by '0' so it will be bumped out when others act on the group. --- apps/roster/src/protocol/roster_room.erl | 2 +- apps/roster/src/roster.erl | 30 ++++++++++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index b9aba8bd5..e6a475559 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -317,7 +317,7 @@ info(#'Room'{status = leave, id = RoomId}, Req, #cx{params = ClientId, client_pi #'Member'{phone_id = PhoneId, status = Status, reader = Reader} = M when [M] /= Admins -> roster:remove_rooms(ClientId, [RoomId]), kvs:delete(reader, Reader), - Room1 = roster:sort_readers(Room0), + Room1 = roster:sort_readers(M#'Member'.id, Room0), Room2 = case Status of admin -> Room1#'Room'{ admins = [M] }; _ -> Room1#'Room'{ members = [M] } diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 4d5be3743..e8d82582d 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1139,6 +1139,7 @@ put_readers(TopAction, #'Member'{id = Id, feed_id = #muc{name = RoomId}, phone_i case {Readers, TopAction} of {[Id | _], _} -> Readers; {[], _} -> [Id]; + {[0 | Rest], write_top} -> [Id | Rest -- [Id]]; {[_ | _], write_top} -> lists:sublist([Id | Readers--[Id]], 2); {[_], read_top} -> Readers ++ [Id]; {[_, Id], read_top} -> Readers; @@ -1147,11 +1148,12 @@ put_readers(TopAction, #'Member'{id = Id, feed_id = #muc{name = RoomId}, phone_i case kvs:get('Member', RMemberId) of {ok, #'Member'{reader = RId}} -> case kvs_stream:load_reader(RId) of - #reader{cache = {'Message', RMsgId}} when MsgId > RMsgId -> - [WMemberId, Id]; - _ -> Readers + #reader{cache = {'Message', RMsgId}} when MsgId =< RMsgId -> + Readers; + _ -> + [WMemberId, Id] end; - _ -> Readers end; + _ -> [WMemberId, Id] end; Res -> ?LOG_INFO("invalid put_readers:~p", [Res]), Readers end, Room2 = case Readers2 of Readers -> Room;_ -> @@ -1173,15 +1175,17 @@ put_readers(TopAction, #'Member'{feed_id = #muc{name = R}} = M) -> {ok, Room} = kvs:get('Room', R), put_readers(TopAction, M, Room). -sort_readers(#'Room'{id = Room, type = group} = R) -> - {MembIds, _} = lists:unzip(lists:sublist(lists:reverse(lists:keysort(2, - [{MembId, MsgId} - || #'Member'{id = MembId} = Member<-members(#muc{name = Room}), - MsgId <- [get_reader(Member)], MsgId > 0])), 2)), - R#'Room'{readers = MembIds}; -sort_readers(#'Room'{} = R) -> R; -sort_readers(Room) -> - case kvs:get('Room', Room) of {ok, R} -> sort_readers(R); Err -> Err end. +sort_readers(MId, #'Room'{id = Room, type = group, readers = Rs} = R) -> + Rs1 = case Rs of + [MId] -> []; + [MId, 0] -> []; + [0, MId] -> []; + [MId, X] -> [0, X]; + [X, MId] -> [X, 0]; + _ -> Rs + end, + R#'Room'{readers = Rs1}; +sort_readers(_, #'Room'{} = R) -> R. readmsgs(#'Contact'{phone_id = R}=C, LocR) -> C#'Contact'{reader = p2p_readmsgs(LocR, R)}; readmsgs(M, _) -> M. -- GitLab From 447d8ff4094bec964a35985278f80b777df6d3d3 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Fri, 17 Apr 2020 16:51:45 +0200 Subject: [PATCH 04/19] Clean up history/delete Minor changes: - remove unnecessary setting of seenby - don't count "History removed" message as unread for remover --- apps/roster/src/protocol/roster_history.erl | 33 +++++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 1f721d9c0..14e2a339b 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -275,24 +275,31 @@ clean_history(Feed, From, To, Rs) -> files = [#'Desc'{payload = <<"History was removed">>}]}), case kvs:get(writer, Feed) of {ok, #writer{count = Top, id = #muc{name = MUC}} = W} -> - NewW = #writer{cache = #'Message'{next = NewFirst} = M} = + NewW = #writer{cache = #'Message'{id = NewFirst, next = OldLast} = M} = kvs_stream:add(W#writer{args = Msg#'Message'{created = roster:now_msec()}}), - roster:set_seenby(kvs:get('Message', NewFirst), -Top - 1, [-1]), - [kvs_stream:save((kvs_stream:load_reader(R))#reader{pos = 0, cache = {'Message', NewFirst}}) || #'Member'{reader = R} <- Rs, R > 0], + [begin + Reader0 = kvs_stream:load_reader(R), + %% Mark "History removed" as read for the remover and unread for the others + Reader1 = case PhoneId == From of + true -> Reader0#reader{pos = 1, cache = {'Message', NewFirst}}; + false -> Reader0#reader{pos = 0, cache = {'Message', OldLast}} + end, + kvs_stream:save(Reader1) + end || #'Member'{phone_id = PhoneId, reader = R} <- Rs, R > 0], kvs_stream:save(NewW#writer{count = 1, first = M}), {ok, Room} = kvs:get('Room', MUC), kvs:put(Room#'Room'{readers = []}), {0, M}; - {ok, #writer{count = Top, id = #p2p{}} = W} -> - #writer{cache = #'Message'{id = NewFirst, next = NextId} = M} = - kvs_stream:save(kvs_stream:add(W#writer{args = Msg#'Message'{seenby = [To], created = roster:now_msec()}})), - [begin - case kvs:get('Message', NextId) of - {ok, NextM} -> roster:set_seenby({ok, NextM}, -Top - 1, [From]); - %% TODO No need roster:set_seenby/4. History funs must stop on "clear" message - _ -> skip end, - kvs_stream:save((kvs_stream:load_reader(R))#reader{pos = 1, cache = {'Message', NewFirst}}) - end || #'Contact'{reader = R} <- Rs], + {ok, #writer{count = Top, id = #p2p{}} = Writer} -> + %% The other party should not see the "History removed" message. + Msg1 = Msg#'Message'{seenby = [To || To /= From], created = roster:now_msec()}, + Writer1 = Writer#writer{args = Msg1}, + #writer{cache = #'Message'{id = NewFirst} = M} = kvs_stream:save(kvs_stream:add(Writer1)), + + %% Update From's reader to make the "History removed" message the first in the history. + [#'Contact'{reader = R}] = Rs, + kvs_stream:save((kvs_stream:load_reader(R))#reader{pos = 1, cache = {'Message', NewFirst}}), + {0, M}; _ -> {#io{code = #error{code = invalid_data}}, <<>>} end. -- GitLab From ed96efe33990fcf2d120b2a24b1dc6666476f25a Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Mon, 20 Apr 2020 09:33:19 +0200 Subject: [PATCH 05/19] Properly don't count "History removed" message as unread --- apps/roster/src/protocol/roster_history.erl | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 14e2a339b..debc0b321 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -277,18 +277,20 @@ clean_history(Feed, From, To, Rs) -> {ok, #writer{count = Top, id = #muc{name = MUC}} = W} -> NewW = #writer{cache = #'Message'{id = NewFirst, next = OldLast} = M} = kvs_stream:add(W#writer{args = Msg#'Message'{created = roster:now_msec()}}), + {ok, Room} = kvs:get('Room', MUC), [begin Reader0 = kvs_stream:load_reader(R), %% Mark "History removed" as read for the remover and unread for the others Reader1 = case PhoneId == From of - true -> Reader0#reader{pos = 1, cache = {'Message', NewFirst}}; + true -> + %% Set the remover to the last writer of the feed + kvs:put(Room#'Room'{readers = [MemberId]}), + Reader0#reader{pos = 1, cache = {'Message', NewFirst}}; false -> Reader0#reader{pos = 0, cache = {'Message', OldLast}} end, kvs_stream:save(Reader1) - end || #'Member'{phone_id = PhoneId, reader = R} <- Rs, R > 0], + end || #'Member'{id = MemberId, phone_id = PhoneId, reader = R} <- Rs, R > 0], kvs_stream:save(NewW#writer{count = 1, first = M}), - {ok, Room} = kvs:get('Room', MUC), - kvs:put(Room#'Room'{readers = []}), {0, M}; {ok, #writer{count = Top, id = #p2p{}} = Writer} -> %% The other party should not see the "History removed" message. -- GitLab From 31041341534f14bf2c4f1e2e7c3ba2f5016d0037 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Mon, 20 Apr 2020 11:39:46 +0200 Subject: [PATCH 06/19] Return invalid_data on History/get on message before history limit --- apps/roster/src/protocol/roster_history.erl | 11 +++++++++++ apps/roster/src/roster.erl | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index debc0b321..2be9f2f1b 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -68,6 +68,7 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity error -> ?LOG_INFO("History/get.Error:~p", [R]), #io{code = #error{code = R}}; _ -> + try %% Fix indentation later! {N2, FId} = case {N, MsgData} of {_, [#'Message'{id = MId}]} when is_integer(MId) -> @@ -98,6 +99,13 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity end, %% Normalize if MId was too large... MaxReadId = min(MId2, LastMsgId), + + %% Check that MaxReadId is not before history limit + case roster:message_within_history_limit(MaxReadId, Reader) of + true -> ok; + false -> throw({error, invalid_data}) + end, + Filter = case N of [] -> msg_update; _ -> msg_filter end, %% select filter function {InnerFilterFun, Mime2} = case MsgData of @@ -161,6 +169,9 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity %% CheckedResponseMsgs = [roster:check_message(Msg) || Msg <- ResponseMsgs], case Msgs2 of #error{} = E -> #io{code = E}; _ -> History#'History'{data = Msgs2, size = length(Msgs2), status = InitialStatus} end + catch throw:#error{} = Err -> + #io{code = Err} + end end, {reply, {bert, IO}, Req, State}; diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index e8d82582d..d1b4f4ec7 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1790,6 +1790,21 @@ find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = #kvs{mod = store_mnesia}, #iterator.next, StopFun2), norm_reader(Rdr#reader{pos = 0}). +%% Check if a message is within the history limit of the given reader. +message_within_history_limit(MsgId, Reader) -> + case Reader of + #reader{cache = []} -> true; + #reader{cache = {'Message', ReadMsg}, pos = P} when P > 0, MsgId >= ReadMsg -> true; + #reader{cache = {'Message', ReadMsg}, pos = 0} when MsgId > ReadMsg -> true; + #reader{cache = {'Message', ReadMsg}, pos = Pos} -> + FilterFun = fun(#'Message'{id = MId}, {InLimit, Rd = #reader{ pos = P }}) -> + {InLimit andalso P > 0, Rd#reader{ pos = P - 1 }} + end, + {InLimit, _} = fold(FilterFun, {true, Reader}, 'Message', ReadMsg, MsgId - 1, + #kvs{mod = store_mnesia}, #iterator.next, Pos + 1), + InLimit + end. + start_reader(#reader{cache = []} = Reader) -> Reader#reader{cache = {'Message', []}}; start_reader(Reader) -> Reader. norm_reader(#reader{cache = {'Message', []}} = Reader) -> Reader#reader{cache = []}; -- GitLab From 446ee2e2f978cf9e7c95eaa276dffba7e64d2898 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Mon, 20 Apr 2020 16:31:27 +0200 Subject: [PATCH 07/19] Update history limit when rejoining a room Previously the new limit was ignored --- apps/roster/src/roster.erl | 43 +++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index d1b4f4ec7..9ad8c8384 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1087,9 +1087,7 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ {ok, #'Roster'{nick = Nick, names = Names, surnames = Surnames, avatar = Avatar}} -> Alias2 = case Alias of EmptyAlias when EmptyAlias == []; EmptyAlias == <<>> -> Nick; _ -> Alias end, Rdr = #reader{id = NewR} = kvs_stream:reader(Feed), %% Get reader, cached to first msg in feed - HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, - Rdr2 = find_bot(kvs_stream:load_writer(Feed), Rdr, MId, HistLimit), %% Move reader, to history-limit - Reader = kvs_stream:save(Rdr2), + Reader = kvs_stream:save(Rdr), {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add};_ -> {MId, put} end, {ok, #'Profile'{settings = Settings}} = kvs:get('Profile', roster:phone(PhoneId)), {M2, Msg, R2} = add_member(R, M#'Member'{id = NewMId, reader = NewR, container = chain, feed_id = Feed, @@ -1101,22 +1099,35 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ #error{} -> {error, roster_not_found} %% TODO add error handler end; add_member(#'Room'{} = R, - #'Member'{feed_id = #muc{name = Room} = Feed, alias = Alias, phone_id = Roster } = M, - {MsgPayload, _HL}) when is_record(Roster, 'Roster') -> + #'Member'{id = MemberId, feed_id = #muc{name = Room} = Feed, reader = ReaderId, alias = Alias, phone_id = Roster } = M, + {MsgPayload, HL}) when is_record(Roster, 'Roster') -> #'Roster'{id = RosterId, phone = Phone, names = Name, surnames = Surname} = Roster, PhoneId = phone_id(Phone, RosterId), - Msg = case MsgPayload of - no_muc_message -> []; + %% Update reader to new history limit + HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, + Writer = kvs_stream:load_writer(Feed), + LastRead = case kvs:get(reader, ReaderId) of + {ok, #reader{ cache = {'Message', MsgId} }} -> MsgId; + _ -> [] + end, + %% TODO: Update the existing reader instead of generating a new one + Reader1 = #reader{id = NewReaderId} + = kvs_stream:reader(Feed), %% Get reader, cached to first msg in feed + Reader2 = find_bot(Writer, Reader1, MemberId, LastRead, HistLimit), + Reader = kvs_stream:save(Reader2), + {Msg, R1} = case MsgPayload of + no_muc_message -> {[], R}; _ -> MucMsg = #'Message'{status = [], type = [sys], feed_id = Feed, from = PhoneId, to = Room, files = [#'Desc'{payload = MsgPayload}]}, #writer{cache = LastMsg} = add_message(MucMsg), - LastMsg + {_, NewRoom} = put_readers(write_top, M#'Member'{reader = Reader}, R), + {LastMsg, NewRoom} end, - update_rooms(RosterId, R), subscribe_room(M2 = M#'Member'{phone_id = PhoneId}), + update_rooms(RosterId, R1), subscribe_room(M2 = M#'Member'{phone_id = PhoneId}), subscribe_muc(PhoneId, Feed), - {M2#'Member'{alias = member_alias(Alias, Name, Surname)}, Msg, R}; + {M2#'Member'{reader = NewReaderId, alias = member_alias(Alias, Name, Surname)}, Msg, R1}; add_member(#'Room'{} = R, #'Member'{phone_id = PhoneId} = M, Args) -> case kvs:get('Roster', roster_id(PhoneId)) of {ok, Roster} -> add_member(R, M#'Member'{phone_id = Roster}, Args); @@ -1764,12 +1775,10 @@ status_msg(T, UID, FS) -> LStatus = [[], clear], true -> erlang:setelement(FS, T, update); R -> R end; _ -> false end. -find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _HistLimit) -> Reader; -find_bot(#writer{count = Count}, #reader{} = Reader, _UID, HistLimit) when HistLimit >= Count -> - Reader#reader{pos = 0, cache = []}; -find_bot(#writer{cache = #'Message'{id = LastMsgId}}, Reader, _UID, 0) -> +find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _StopMsg, _HistLimit) -> Reader; +find_bot(#writer{cache = #'Message'{id = LastMsgId}}, Reader, _UID, _StopMsg, 0) -> Reader#reader{pos = 0, cache = {'Message', LastMsgId}}; -find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, HistLimit) -> +find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, StopMsg, HistLimit) -> StartReader = Reader#reader{pos = Count, cache = {'Message', LastMsgId}}, AccFun = fun%%(_, #'Message'{type = [sys|_]}, C, _Dir) -> C; (1, _Msg, C, _Dir) when C > 0 -> C-1; @@ -1782,11 +1791,11 @@ find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = end, StopFun = roster:msg_stop_fun([], UID), - StopFun2 = fun(_Msg, {C, #reader{}}) when C =< 0 -> 0; + StopFun2 = fun(_Msg, {C, #reader{pos = Pos}}) when C =< 0; Pos =< 1 -> 0; (Msg, A) -> StopFun(Msg, A) end, - {_, Rdr} = roster:fold(FilterFun2, {HistLimit, StartReader}, 'Message', LastMsgId, [], + {_, Rdr} = roster:fold(FilterFun2, {HistLimit, StartReader}, 'Message', LastMsgId, StopMsg, #kvs{mod = store_mnesia}, #iterator.next, StopFun2), norm_reader(Rdr#reader{pos = 0}). -- GitLab From 5c28e35ffc1d6f6a06294bfe25edee6086d13fbf Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Mon, 20 Apr 2020 16:32:57 +0200 Subject: [PATCH 08/19] When removing users from a room, also remove them from the "readers" list This will cause some additional read notifications to be sent. --- apps/roster/src/protocol/roster_room.erl | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index e6a475559..758157f03 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -159,7 +159,8 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers #'Member'{} = M -> kvs:put(M2 = roster:patch_member(M#'Member'{status = Status}, Member)), {ignore, M2}; - _ -> {M2, _, UpdRoom} = roster:add_member(TmpRoom, + _ -> + {M2, _, UpdRoom} = roster:add_member(TmpRoom, Member#'Member'{feed_id = #muc{name = Room}, reader = 0}, {no_muc_message, HL}), {M2, UpdRoom} end, @@ -262,7 +263,9 @@ info(#'Room'{status = remove, members = Members, admins = Admins0, id = Room}, R Msg = roster:add_message(#'Message'{feed_id = #muc{name = Room}, from = APId, to = Room, msg_id = roster:msg_id(), status = [], type = [sys], files = [#'Desc'{payload = iolist_to_binary([APId, <<" removed: ">>, lists:droplast(Aliases)])}]}, Reader), - {_, StoredRoom2} = roster:put_readers(write_top, AdmMember, StoredRoom), + StoredRoom1 = lists:foldl(fun(#'Member'{id = Id}, R) -> roster:sort_readers(Id, R) end, + StoredRoom, Mmbrs), + {_, StoredRoom2} = roster:put_readers(write_top, AdmMember, StoredRoom1), kvs:put(R = StoredRoom2#'Room'{update = roster:now_msec(), last_msg = Msg#'Message'.id}), [roster:update_rooms(RId, R#'Room'{status = removed}) || RId <- RosterIds], R2 = roster:reader_cache(R#'Room'{status = remove, last_msg = [], -- GitLab From 89ef09ef1abfeb7d1665db5058da18eb39e9f088 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 10:22:21 +0200 Subject: [PATCH 09/19] Add fold_stream: replacement for roster:fold --- apps/roster/src/roster.erl | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 9ad8c8384..d5bea7d2b 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -2069,6 +2069,31 @@ next_key(Table, CurrentId, Count) -> NextId -> next_key(Table, NextId, Count-1) end. +%% Traverse a stream in direction Dir, starting from Id. Stops when Fun returns +%% {stop, Res} or when the stream runs out (returning {eos, Acc}). For the +%% direction, 'fwd' means increasing ids and 'bwd' is decreasing ids. +-spec fold_stream(Table :: atom(), + Fun :: fun((Element, Acc) -> {stop, Res} | {cont, Acc}), + Id :: ElementId, + Acc :: Acc, + Dir :: fwd | bwd) -> {stop, Res} | {eos, Acc} when + ElementId :: non_neg_integer(), + Element :: tuple(). +fold_stream(_Table, _Fun, [], Acc, _Dir) -> + %% Stream ran out + {eos, Acc}; +fold_stream(Table, Fun, Id, Acc, Dir) -> + {ok, Elem} = kvs:get(Table, Id), + case Fun(Elem, Acc) of + {stop, Res} -> {ok, Res}; + {cont, Acc1} -> + Next = case Dir of + fwd -> element(#iterator.prev, Elem); + bwd -> element(#iterator.next, Elem) + end, + fold_stream(Table, Fun, Next, Acc1, Dir) + end. + %% kvs utils %% TODO add to kvs fold(_, Acc, _, [], _, _) -> Acc; fold(_, Acc, _, _, [], _) -> ?LOG_INFO("uncertain direction", []), Acc; -- GitLab From 9f735a48a4adddccd287d5c04ace7a42d410ca14 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 10:22:59 +0200 Subject: [PATCH 10/19] Implement find_bot using fold_stream --- apps/roster/src/roster.erl | 32 +++++++++++--------------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index d5bea7d2b..8e1064463 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1776,28 +1776,18 @@ status_msg(T, UID, FS) -> LStatus = [[], clear], _ -> false end. find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _StopMsg, _HistLimit) -> Reader; -find_bot(#writer{cache = #'Message'{id = LastMsgId}}, Reader, _UID, _StopMsg, 0) -> - Reader#reader{pos = 0, cache = {'Message', LastMsgId}}; find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, StopMsg, HistLimit) -> - StartReader = Reader#reader{pos = Count, cache = {'Message', LastMsgId}}, - AccFun = fun%%(_, #'Message'{type = [sys|_]}, C, _Dir) -> C; - (1, _Msg, C, _Dir) when C > 0 -> C-1; - (_, _, Acc, _) -> Acc - end, - - FilterFun = roster:msg_filter_fun(-Count, fun msg_filter/2, UID, AccFun), - FilterFun2 = fun(Msg, {C, _} = Acc) when C > 0 -> FilterFun(Msg, Acc); - (_Msg, Acc) -> Acc - end, - - StopFun = roster:msg_stop_fun([], UID), - StopFun2 = fun(_Msg, {C, #reader{pos = Pos}}) when C =< 0; Pos =< 1 -> 0; - (Msg, A) -> StopFun(Msg, A) - end, - - {_, Rdr} = roster:fold(FilterFun2, {HistLimit, StartReader}, 'Message', LastMsgId, StopMsg, - #kvs{mod = store_mnesia}, #iterator.next, StopFun2), - norm_reader(Rdr#reader{pos = 0}). + Fun = fun(#'Message'{id = MsgId} = Msg, {Pos, HL}) -> + if Pos == 0; HL == 0 -> {stop, {MsgId, 0}}; + MsgId == StopMsg -> {stop, {MsgId, Pos}}; + true -> {cont, {Pos - 1, HL - msg_filter(Msg, UID)}} + end end, + {Cache, Pos} = + case fold_stream('Message', Fun, LastMsgId, {Count, HistLimit}, bwd) of + {eos, _} -> {[], 0}; + {ok, {MId, P}} -> {{'Message', MId}, P} + end, + Reader#reader{pos = Pos, cache = Cache}. %% Check if a message is within the history limit of the given reader. message_within_history_limit(MsgId, Reader) -> -- GitLab From c5fc6fd3c1eb338af19ac3fa6a41e38b621b9355 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 10:23:30 +0200 Subject: [PATCH 11/19] Fix bug with last read when adding a new member --- apps/roster/src/roster.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 8e1064463..5f1c4d636 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1107,7 +1107,7 @@ add_member(#'Room'{} = R, HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, Writer = kvs_stream:load_writer(Feed), LastRead = case kvs:get(reader, ReaderId) of - {ok, #reader{ cache = {'Message', MsgId} }} -> MsgId; + {ok, #reader{ pos = Pos, cache = {'Message', MsgId} }} when Pos > 0 -> MsgId; _ -> [] end, %% TODO: Update the existing reader instead of generating a new one -- GitLab From 96bc803c637c3aaaebc3e235307ed008f7db8b49 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 10:57:58 +0200 Subject: [PATCH 12/19] Don't delete reader when leaving a room This makes it behave as Room/remove and allows us to remember which messages are read if rejoining. --- apps/roster/src/protocol/roster_room.erl | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index 758157f03..ef9cbacd2 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -319,7 +319,6 @@ info(#'Room'{status = leave, id = RoomId}, Req, #cx{params = ClientId, client_pi case lists:keyfind(roster:phone_id(ClientId), #'Member'.phone_id, MUCMembers) of #'Member'{phone_id = PhoneId, status = Status, reader = Reader} = M when [M] /= Admins -> roster:remove_rooms(ClientId, [RoomId]), - kvs:delete(reader, Reader), Room1 = roster:sort_readers(M#'Member'.id, Room0), Room2 = case Status of admin -> Room1#'Room'{ admins = [M] }; @@ -344,7 +343,7 @@ info(#'Room'{status = leave, id = RoomId}, Req, #cx{params = ClientId, client_pi n2o_async:pid(system, ?MODULE) ! {send_push, [], Msg, leave}, Room2#'Room'{update = roster:now_msec(), last_msg = Msg#'Message'.id} end, - kvs:put(M#'Member'{reader = 0, update = roster:now_msec(), status = removed}), + kvs:put(M#'Member'{update = roster:now_msec(), status = removed}), kvs:put(Room5), <<>>; #'Member'{} -> #io{code = #error{ code = last_admin_cant_leave }}; -- GitLab From e36feed1710f9bcbb7963cf8778960bc9b9ba30a Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 10:59:53 +0200 Subject: [PATCH 13/19] Don't create additional readers in add_member --- apps/roster/src/roster.erl | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 5f1c4d636..aea290392 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1099,22 +1099,20 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ #error{} -> {error, roster_not_found} %% TODO add error handler end; add_member(#'Room'{} = R, - #'Member'{id = MemberId, feed_id = #muc{name = Room} = Feed, reader = ReaderId, alias = Alias, phone_id = Roster } = M, + #'Member'{id = MemberId, feed_id = #muc{name = Room} = Feed, reader = ReaderId, alias = Alias, phone_id = Roster } = Member, {MsgPayload, HL}) when is_record(Roster, 'Roster') -> #'Roster'{id = RosterId, phone = Phone, names = Name, surnames = Surname} = Roster, PhoneId = phone_id(Phone, RosterId), %% Update reader to new history limit - HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, + HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, Writer = kvs_stream:load_writer(Feed), - LastRead = case kvs:get(reader, ReaderId) of - {ok, #reader{ pos = Pos, cache = {'Message', MsgId} }} when Pos > 0 -> MsgId; + {ok, Reader0} = kvs:get(reader, ReaderId), + LastRead = case Reader0 of + #reader{ pos = Pos, cache = {'Message', MsgId} } when Pos > 0 -> MsgId; _ -> [] end, - %% TODO: Update the existing reader instead of generating a new one - Reader1 = #reader{id = NewReaderId} - = kvs_stream:reader(Feed), %% Get reader, cached to first msg in feed - Reader2 = find_bot(Writer, Reader1, MemberId, LastRead, HistLimit), - Reader = kvs_stream:save(Reader2), + Reader = find_bot(Writer, Reader0, MemberId, LastRead, HistLimit), + kvs:put(Reader), {Msg, R1} = case MsgPayload of no_muc_message -> {[], R}; _ -> @@ -1122,12 +1120,14 @@ add_member(#'Room'{} = R, from = PhoneId, to = Room, files = [#'Desc'{payload = MsgPayload}]}, #writer{cache = LastMsg} = add_message(MucMsg), - {_, NewRoom} = put_readers(write_top, M#'Member'{reader = Reader}, R), + {_, NewRoom} = put_readers(write_top, Member#'Member'{reader = Reader}, R), {LastMsg, NewRoom} end, - update_rooms(RosterId, R1), subscribe_room(M2 = M#'Member'{phone_id = PhoneId}), + Member2 = Member#'Member'{phone_id = PhoneId, alias = member_alias(Alias, Name, Surname)}, + update_rooms(RosterId, R1), + subscribe_room(Member2), subscribe_muc(PhoneId, Feed), - {M2#'Member'{reader = NewReaderId, alias = member_alias(Alias, Name, Surname)}, Msg, R1}; + {Member2, Msg, R1}; add_member(#'Room'{} = R, #'Member'{phone_id = PhoneId} = M, Args) -> case kvs:get('Roster', roster_id(PhoneId)) of {ok, Roster} -> add_member(R, M#'Member'{phone_id = Roster}, Args); -- GitLab From 74f5c0e05f1324cd8bfddb8955bca2b304af5545 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 15:09:14 +0200 Subject: [PATCH 14/19] Simplify read notification logic - Changes internal representation of #Room.readers to none | {last_read, MsgId} - Send read notification on History/update with MsgId > last_read --- apps/roster/include/roster.hrl | 10 +- apps/roster/src/protocol/roster_history.erl | 2 +- apps/roster/src/protocol/roster_message.erl | 3 - apps/roster/src/protocol/roster_room.erl | 11 +- apps/roster/src/roster.erl | 130 ++++++++++---------- 5 files changed, 71 insertions(+), 85 deletions(-) diff --git a/apps/roster/include/roster.hrl b/apps/roster/include/roster.hrl index bbd71b7e2..f8375165b 100644 --- a/apps/roster/include/roster.hrl +++ b/apps/roster/include/roster.hrl @@ -140,7 +140,7 @@ -record('Link', {id = [] :: [] | binary(), %% link value name = [] :: [] | binary(), %% unused atm room_id = [] :: [] | binary(), %% parent Room id - created = [] :: [] | integer(), + created = [] :: [] | integer(), type = [] :: [] | group | channel, status = [] :: [] | gen | check | add %% old unused fields | get | join | update | delete}). %% new fields @@ -158,7 +158,7 @@ tos_update = 0 :: [] | integer(), unread = 0 :: [] | integer(), mentions = [] :: list(integer()), - readers = [] :: list(integer()), + readers = [] :: list(integer()) | none | {last_read, integer()}, %% last_read :: MsgId last_msg = [] :: [] | integer() | #'Message'{}, update = 0 :: [] | integer(), created = 0 :: [] | integer(), @@ -230,7 +230,7 @@ status = [] :: [] | get | create | del | remove | nick | add | update | list | patch | last_msg }). --record('Profile', +-record('Profile', { phone = [] :: [] | binary(), services = [] :: [] | list(#'Service'{}), @@ -239,7 +239,7 @@ update = 0 :: integer(), balance = 0 :: integer(), presence = [] :: [] | offline | online | binary(), - status = [] :: [] | remove | get | patch | update| delete | create + status = [] :: [] | remove | get | patch | update| delete | create }). -record('Presence', {uid = <<>> :: binary(), @@ -301,7 +301,7 @@ -record('MessageErr', {feed_id = [] :: #muc{} | #p2p{}, msg_id = [] :: [] | binary(), error = [] :: [] | #error{}}). - + -record(io, {code = [] :: [] | #ok{} | #error{} | #ok2{} | #error2{} | transcribe | #'MessageErr'{}, data = <<>> :: [] | <<>> | #'Roster'{} | { atom(), binary() | integer() }}). diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 2be9f2f1b..11aafafa3 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -210,7 +210,7 @@ info(#'History'{status = update, feed = Feed, entity_id = MId}, Req, _ when element(1, Unit) == error -> #io{code = Unit}; true -> - {ReadMsg, UpdRoom} = roster:put_readers(read_top, Unit, UpdReader), + {ReadMsg, UpdRoom} = roster:put_readers(Unit, UpdReader), roster:send_feed(C, Feed, ReadMsg), {Record, Readers} = case Feed of diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 0469b1a96..972de20e2 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -115,7 +115,6 @@ info(#'Message'{status = [], id = [], feed_id = F, from=From0, to = To, Type -> kvs_stream:save(roster:offset_reader(Writer, R)); _ -> kvs_stream:load_reader(R) end, - roster:put_readers(write_top, UID, Reader), LnkRes3 = case LnkRes of #'Message'{prev = Prev, repliedby = ReplBy} -> LnkRes2 = LnkRes#'Message'{repliedby = [MsgId | ReplBy], @@ -198,7 +197,6 @@ info(#'Message'{status = edit, id = Id, msg_id = ClMID, feed_id = Feed, from = F R0 = kvs_stream:load_reader(Reader), R = roster:offset_reader(Writer, R0), kvs_stream:save(R), - roster:put_readers(write_top, Ent, R), n2o_async:pid(system, ?MODULE) ! {send_push, From1, To1, Internal, ?MSG_EDIT_ACTION}, %% publish @@ -277,7 +275,6 @@ info(#'Message'{id = Id, msg_id = ClMID, feed_id = Feed, from = From0, seenby = R0 = kvs_stream:load_reader(Reader), R = roster:offset_reader(Writer, R0), kvs_stream:save(R), - roster:put_readers(write_top, Ent, R), %% publish %% TODO update last_msg in Room if the current deleted message is really last message in room diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index ef9cbacd2..4ce77546c 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -204,9 +204,8 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers from = APId, to = Room, msg_id = roster:msg_id(), files = [#'Desc'{payload = Payload}] }, Msg = roster:add_message(Msg0, Reader), - {_, PutRoom} = roster:put_readers(write_top, AdmMember, R), n2o_async:pid(system, ?MODULE) ! {send_push, NewMmbrs, Msg, RStatus2}, - {PutRoom, Msg} + {R, Msg} end, Rx2 = Rx1#'Room'{status = RStatus2, members = Members2, admins = Admins3, links = [roster_link:get_link({ok, Rx1})], last_msg = LastMsg}, @@ -263,10 +262,7 @@ info(#'Room'{status = remove, members = Members, admins = Admins0, id = Room}, R Msg = roster:add_message(#'Message'{feed_id = #muc{name = Room}, from = APId, to = Room, msg_id = roster:msg_id(), status = [], type = [sys], files = [#'Desc'{payload = iolist_to_binary([APId, <<" removed: ">>, lists:droplast(Aliases)])}]}, Reader), - StoredRoom1 = lists:foldl(fun(#'Member'{id = Id}, R) -> roster:sort_readers(Id, R) end, - StoredRoom, Mmbrs), - {_, StoredRoom2} = roster:put_readers(write_top, AdmMember, StoredRoom1), - kvs:put(R = StoredRoom2#'Room'{update = roster:now_msec(), last_msg = Msg#'Message'.id}), + kvs:put(R = StoredRoom#'Room'{update = roster:now_msec(), last_msg = Msg#'Message'.id}), [roster:update_rooms(RId, R#'Room'{status = removed}) || RId <- RosterIds], R2 = roster:reader_cache(R#'Room'{status = remove, last_msg = [], members = Members2, admins = Admins3}), @@ -313,13 +309,12 @@ info(#'Room'{status = leave, id = RoomId}, Req, #cx{params = ClientId, client_pi ?LOG_INFO("~p:Room/leave:~p", [ClientId, RoomId]), {reply, {bert, case kvs:get('Room', RoomId) of - {ok, #'Room'{} = Room0} -> + {ok, #'Room'{} = Room1} -> MUCMembers = roster:members(#muc{ name = RoomId }), Admins = roster:filter_members(MUCMembers, admin), case lists:keyfind(roster:phone_id(ClientId), #'Member'.phone_id, MUCMembers) of #'Member'{phone_id = PhoneId, status = Status, reader = Reader} = M when [M] /= Admins -> roster:remove_rooms(ClientId, [RoomId]), - Room1 = roster:sort_readers(M#'Member'.id, Room0), Room2 = case Status of admin -> Room1#'Room'{ admins = [M] }; _ -> Room1#'Room'{ members = [M] } diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index aea290392..b99a54454 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1090,11 +1090,9 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ Reader = kvs_stream:save(Rdr), {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add};_ -> {MId, put} end, {ok, #'Profile'{settings = Settings}} = kvs:get('Profile', roster:phone(PhoneId)), - {M2, Msg, R2} = add_member(R, M#'Member'{id = NewMId, reader = NewR, container = chain, feed_id = Feed, + {M2, Msg, Room} = add_member(R, M#'Member'{id = NewMId, reader = NewR, container = chain, feed_id = Feed, alias = Alias2, names = Names, surnames = Surnames, avatar = Avatar, settings = Settings}, Args), kvs:AddFun(M2), - Room = case MsgPayload of no_muc_message -> R2; - _ -> {_, R3} = put_readers(write_top, M#'Member'{reader = Reader}, R), R3 end, {M2, Msg, Room}; #error{} -> {error, roster_not_found} %% TODO add error handler end; @@ -1113,21 +1111,20 @@ add_member(#'Room'{} = R, end, Reader = find_bot(Writer, Reader0, MemberId, LastRead, HistLimit), kvs:put(Reader), - {Msg, R1} = case MsgPayload of - no_muc_message -> {[], R}; + Msg = case MsgPayload of + no_muc_message -> []; _ -> MucMsg = #'Message'{status = [], type = [sys], feed_id = Feed, from = PhoneId, to = Room, files = [#'Desc'{payload = MsgPayload}]}, #writer{cache = LastMsg} = add_message(MucMsg), - {_, NewRoom} = put_readers(write_top, Member#'Member'{reader = Reader}, R), - {LastMsg, NewRoom} + LastMsg end, Member2 = Member#'Member'{phone_id = PhoneId, alias = member_alias(Alias, Name, Surname)}, - update_rooms(RosterId, R1), + update_rooms(RosterId, R), subscribe_room(Member2), subscribe_muc(PhoneId, Feed), - {Member2, Msg, R1}; + {Member2, Msg, R}; add_member(#'Room'{} = R, #'Member'{phone_id = PhoneId} = M, Args) -> case kvs:get('Roster', roster_id(PhoneId)) of {ok, Roster} -> add_member(R, M#'Member'{phone_id = Roster}, Args); @@ -1137,66 +1134,52 @@ member_alias([], [], []) -> []; member_alias([], N, []) -> N; member_alias([], [ member_alias([], Name, Surname) when Name /= [], Surname /= [] -> <>/binary, Surname/binary>>; member_alias(Alias, _, _) -> Alias. -put_readers(_TopAction, _Memeber, #'Room'{type = channel} = Room) -> {[], Room}; -put_readers(TopAction, #'Member'{reader = ReaderId} = M, #reader{id = ReaderId} = Reader) when is_integer(ReaderId) -> - put_readers(TopAction, M#'Member'{reader = Reader}); -put_readers(_, #'Member'{reader = #reader{cache = []}}, #'Room'{} = Room) -> {[], Room}; -put_readers(TopAction, #'Member'{reader = ReaderId} = M, Room) when is_integer(ReaderId) -> - put_readers(TopAction, M#'Member'{reader = kvs_stream:load_reader(ReaderId)}, Room); -put_readers(TopAction, #'Member'{id = Id, feed_id = #muc{name = RoomId}, phone_id = PhoneId, - reader = #reader{cache = {'Message', MsgId}}}, #'Room'{id = RoomId, readers = Readers} = Room) - when TopAction == write_top; TopAction == read_top -> - Readers2 = - case {Readers, TopAction} of - {[Id | _], _} -> Readers; - {[], _} -> [Id]; - {[0 | Rest], write_top} -> [Id | Rest -- [Id]]; - {[_ | _], write_top} -> lists:sublist([Id | Readers--[Id]], 2); - {[_], read_top} -> Readers ++ [Id]; - {[_, Id], read_top} -> Readers; - {[WMemberId, 0], read_top} -> [WMemberId, Id]; - {[WMemberId, RMemberId], read_top} -> - case kvs:get('Member', RMemberId) of - {ok, #'Member'{reader = RId}} -> - case kvs_stream:load_reader(RId) of - #reader{cache = {'Message', RMsgId}} when MsgId =< RMsgId -> - Readers; - _ -> - [WMemberId, Id] - end; - _ -> [WMemberId, Id] end; - Res -> ?LOG_INFO("invalid put_readers:~p", [Res]), Readers +put_readers(_Member, #'Room'{type = channel} = Room) -> {[], Room}; +put_readers(#'Member'{reader = #reader{cache = []}}, + #'Room'{} = Room) -> + {[], Room}; +put_readers(#'Member'{reader = #reader{cache = {'Message', MsgId}}, + feed_id = #muc{}, + phone_id = PhoneId}, + #'Room'{readers = Readers} = Room) -> + Max = fun([], Id) -> Id; + (Id, []) -> Id; + (Id1, Id2) -> max(Id1, Id2) + end, + NewReaders = + case Readers of + none -> {last_read, MsgId}; + {last_read, MsgId1} -> {last_read, Max(MsgId1, MsgId)}; + %% Deal with old representation + [] -> {last_read, MsgId}; + [_W] -> {last_read, MsgId}; + [_W, RMemId] -> + try + {ok, #'Member'{reader = RId}} = kvs:get('Member', RMemId), + #reader{cache = {'Message', MsgId1}} = kvs_stream:load_reader(RId), + {last_read, Max(MsgId1, MsgId)} + catch _:_ -> + {last_read, MsgId} + end end, - Room2 = case Readers2 of Readers -> Room;_ -> - kvs:put(R2 = Room#'Room'{readers = Readers2}), R2 end, - {case {TopAction, lists:member(Id, Readers2)} of - {read_top, true} -> + case NewReaders == Readers of + true -> {[], Room}; + false -> + NewRoom = Room#'Room'{readers = NewReaders}, + kvs:put(NewRoom), {ok, Msg} = kvs:get('Message', MsgId), - Msg#'Message'{type = [read], files = [], from = PhoneId}; - _ -> [] end, Room2}; - -put_readers(read_top, #'Contact'{phone_id = PhoneId}, #reader{cache = {'Message', MsgId}}) -> + {Msg#'Message'{type = [read], files = [], from = PhoneId}, NewRoom} + end; +put_readers(#'Member'{reader = ReaderId, feed_id = #muc{name = R}} = M, + #reader{id = ReaderId} = Reader) when is_integer(ReaderId) -> + {ok, Room} = kvs:get('Room', R), + put_readers(M#'Member'{reader = Reader}, Room); +put_readers(#'Contact'{phone_id = PhoneId}, #reader{cache = {'Message', MsgId}}) -> {ok, Msg} = kvs:get('Message', MsgId), {Msg#'Message'{type = [read], files = [], from = PhoneId}, []}; -put_readers(_TopAction, #'Contact'{} = _C, _Reader) -> {[], []}; -put_readers(T, M, R) -> - ?LOG_INFO("invalid put_readers: ~p", [{T, M, R}]), {[], []}. - -put_readers(TopAction, #'Member'{feed_id = #muc{name = R}} = M) -> - {ok, Room} = kvs:get('Room', R), - put_readers(TopAction, M, Room). - -sort_readers(MId, #'Room'{id = Room, type = group, readers = Rs} = R) -> - Rs1 = case Rs of - [MId] -> []; - [MId, 0] -> []; - [0, MId] -> []; - [MId, X] -> [0, X]; - [X, MId] -> [X, 0]; - _ -> Rs - end, - R#'Room'{readers = Rs1}; -sort_readers(_, #'Room'{} = R) -> R. +put_readers(#'Contact'{} = _C, _Reader) -> {[], []}; +put_readers(M, R) -> + ?LOG_INFO("invalid put_readers: ~p", [{M, R}]), {[], []}. readmsgs(#'Contact'{phone_id = R}=C, LocR) -> C#'Contact'{reader = p2p_readmsgs(LocR, R)}; readmsgs(M, _) -> M. @@ -1724,9 +1707,20 @@ split_members(Members) -> reader_cache(ReaderId) when is_integer(ReaderId)-> reader_cache(kvs_stream:load_reader(ReaderId)); reader_cache(#reader{pos = 0}) -> 0; reader_cache(#reader{cache = {'Message', MsgId}}) -> MsgId; -reader_cache(#'Room'{admins = Admins, members = Members, readers = MembIds} = R) -> - R#'Room'{admins = reader_cache(Admins), members = reader_cache(Members), - readers = lists:sublist(reader_cache(MembIds)++[0,0], 2)}; +reader_cache(#'Room'{admins = Admins, id = RoomId, members = Members, readers = Readers} = R) -> + LastMsg = case kvs:get(writer, #muc{name = RoomId}) of + {ok, #writer{cache = #'Message'{id = MId}}} -> MId; + _ -> 0 + end, + Readers1 = + case Readers of + none -> [LastMsg, 0]; + {last_read, MsgId} -> [LastMsg, MsgId]; + MembIds -> lists:sublist(reader_cache(MembIds) ++ [0, 0], 2) + end, + R#'Room'{admins = reader_cache(Admins), + members = reader_cache(Members), + readers = Readers}; reader_cache(#'Member'{reader = 0} = Member) -> Member; reader_cache(#'Member'{reader = ReaderId}=Member) -> Member#'Member'{reader = reader_cache(ReaderId)}; -- GitLab From 0a1e097d36b982305c71acb7e39b3062452c1541 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 21 Apr 2020 15:46:05 +0200 Subject: [PATCH 15/19] Fix warnings --- apps/roster/src/protocol/roster_history.erl | 4 ++-- apps/roster/src/protocol/roster_message.erl | 12 ++++++------ apps/roster/src/protocol/roster_room.erl | 4 ++-- apps/roster/src/roster.erl | 10 ++++------ 4 files changed, 14 insertions(+), 16 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 11aafafa3..a33749071 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -285,7 +285,7 @@ clean_history(Feed, From, To, Rs) -> msg_id = iolist_to_binary([<<"rmv_history_">>, roster:msg_id()]), files = [#'Desc'{payload = <<"History was removed">>}]}), case kvs:get(writer, Feed) of - {ok, #writer{count = Top, id = #muc{name = MUC}} = W} -> + {ok, #writer{id = #muc{name = MUC}} = W} -> NewW = #writer{cache = #'Message'{id = NewFirst, next = OldLast} = M} = kvs_stream:add(W#writer{args = Msg#'Message'{created = roster:now_msec()}}), {ok, Room} = kvs:get('Room', MUC), @@ -303,7 +303,7 @@ clean_history(Feed, From, To, Rs) -> end || #'Member'{id = MemberId, phone_id = PhoneId, reader = R} <- Rs, R > 0], kvs_stream:save(NewW#writer{count = 1, first = M}), {0, M}; - {ok, #writer{count = Top, id = #p2p{}} = Writer} -> + {ok, #writer{id = #p2p{}} = Writer} -> %% The other party should not see the "History removed" message. Msg1 = Msg#'Message'{seenby = [To || To /= From], created = roster:now_msec()}, Writer1 = Writer#writer{args = Msg1}, diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 972de20e2..4472cfac5 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -111,10 +111,10 @@ info(#'Message'{status = [], id = [], feed_id = F, from=From0, to = To, _ -> skip end, %% move cursor - Reader = case lists:subtract(Type, [cursor]) of - Type -> kvs_stream:save(roster:offset_reader(Writer, R)); - _ -> kvs_stream:load_reader(R) - end, + case lists:member(cursor, Type) of + false -> kvs_stream:save(roster:offset_reader(Writer, R)); + true -> ok + end, LnkRes3 = case LnkRes of #'Message'{prev = Prev, repliedby = ReplBy} -> LnkRes2 = LnkRes#'Message'{repliedby = [MsgId | ReplBy], @@ -176,7 +176,7 @@ info(#'Message'{status = edit, id = Id, msg_id = ClMID, feed_id = Feed, from = F IO = case Data of #io{} -> Data; - {#'Message'{from=From1, to = To1, type = Type, prev = Prev} = Message, Reader, Ent} -> + {#'Message'{from=From1, to = To1, type = Type, prev = Prev} = Message, Reader, _Ent} -> %% create system message NewType = case Type of @@ -259,7 +259,7 @@ info(#'Message'{id = Id, msg_id = ClMID, feed_id = Feed, from = From0, seenby = {reply, {bert, case D of {_, _, _, _, <<>>} -> #io{code = #error{code = invalid_data}}; - {#'Message'{id = MsgId, from = From1, to = To1, prev = Prev, type = Type, link = Link} = Message, Topic, Reader, Ent, NewSeen} -> + {#'Message'{id = MsgId, from = From1, to = To1, prev = Prev, type = Type, link = Link} = Message, Topic, Reader, _Ent, NewSeen} -> %% create system message SMsg = roster:desc_id(#'Message'{status = delete, msg_id = ClMID, feed_id = Feed, from = From1, to = To1, link = MsgId, seenby = NewSeen, diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index 4ce77546c..3c4b1e4d9 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -180,7 +180,7 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers {_, _, join} -> {<>, join}; _ -> {iolist_to_binary(["added by ", APId, ": ", lists:droplast(Aliases)]), add} end, - {AdmMember = #'Member'{reader = Reader}, Admins3} = + {#'Member'{reader = Reader}, Admins3} = case {Mmbr, lists:keyfind(APId, #'Member'.phone_id, Admins2)} of {[], false} when St == join -> {hd(Members2), []}; {#'Member'{status = removed}, _} when St == join -> {hd(Members2), []}; @@ -313,7 +313,7 @@ info(#'Room'{status = leave, id = RoomId}, Req, #cx{params = ClientId, client_pi MUCMembers = roster:members(#muc{ name = RoomId }), Admins = roster:filter_members(MUCMembers, admin), case lists:keyfind(roster:phone_id(ClientId), #'Member'.phone_id, MUCMembers) of - #'Member'{phone_id = PhoneId, status = Status, reader = Reader} = M when [M] /= Admins -> + #'Member'{phone_id = PhoneId, status = Status} = M when [M] /= Admins -> roster:remove_rooms(ClientId, [RoomId]), Room2 = case Status of admin -> Room1#'Room'{ admins = [M] }; diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index b99a54454..f22a21b8a 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1081,13 +1081,11 @@ member_lastmsg2(PhoneId, Room) -> R -> R end. add_member(#'Room'{} = R, #'Member'{} = M) -> add_member(R, M, {no_muc_message, []}). -add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_id = PhoneId, alias = Alias} = M, - {MsgPayload, HL} = Args) -> +add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_id = PhoneId, alias = Alias} = M, Args) -> case kvs:get('Roster', roster_id(PhoneId)) of {ok, #'Roster'{nick = Nick, names = Names, surnames = Surnames, avatar = Avatar}} -> Alias2 = case Alias of EmptyAlias when EmptyAlias == []; EmptyAlias == <<>> -> Nick; _ -> Alias end, - Rdr = #reader{id = NewR} = kvs_stream:reader(Feed), %% Get reader, cached to first msg in feed - Reader = kvs_stream:save(Rdr), + #reader{id = NewR} = kvs_stream:save(kvs_stream:reader(Feed)), %% Get reader, cached to first msg in feed {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add};_ -> {MId, put} end, {ok, #'Profile'{settings = Settings}} = kvs:get('Profile', roster:phone(PhoneId)), {M2, Msg, Room} = add_member(R, M#'Member'{id = NewMId, reader = NewR, container = chain, feed_id = Feed, @@ -1720,7 +1718,7 @@ reader_cache(#'Room'{admins = Admins, id = RoomId, members = Members, readers = end, R#'Room'{admins = reader_cache(Admins), members = reader_cache(Members), - readers = Readers}; + readers = Readers1}; reader_cache(#'Member'{reader = 0} = Member) -> Member; reader_cache(#'Member'{reader = ReaderId}=Member) -> Member#'Member'{reader = reader_cache(ReaderId)}; @@ -1790,7 +1788,7 @@ message_within_history_limit(MsgId, Reader) -> #reader{cache = {'Message', ReadMsg}, pos = P} when P > 0, MsgId >= ReadMsg -> true; #reader{cache = {'Message', ReadMsg}, pos = 0} when MsgId > ReadMsg -> true; #reader{cache = {'Message', ReadMsg}, pos = Pos} -> - FilterFun = fun(#'Message'{id = MId}, {InLimit, Rd = #reader{ pos = P }}) -> + FilterFun = fun(#'Message'{}, {InLimit, Rd = #reader{ pos = P }}) -> {InLimit andalso P > 0, Rd#reader{ pos = P - 1 }} end, {InLimit, _} = fold(FilterFun, {true, Reader}, 'Message', ReadMsg, MsgId - 1, -- GitLab From bdebc9b63b40ec0d06089944d63f58719cfe0317 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Wed, 22 Apr 2020 11:41:05 +0200 Subject: [PATCH 16/19] find_bot really needs to get to the end of the chain (or the history limit) --- apps/roster/src/roster.erl | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index f22a21b8a..b95b7aaed 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1769,15 +1769,20 @@ status_msg(T, UID, FS) -> LStatus = [[], clear], find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _StopMsg, _HistLimit) -> Reader; find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, StopMsg, HistLimit) -> - Fun = fun(#'Message'{id = MsgId} = Msg, {Pos, HL}) -> - if Pos == 0; HL == 0 -> {stop, {MsgId, 0}}; - MsgId == StopMsg -> {stop, {MsgId, Pos}}; - true -> {cont, {Pos - 1, HL - msg_filter(Msg, UID)}} + Fun = fun(#'Message'{id = MsgId} = Msg, {Pos, HL, NewPos}) -> + HL1 = HL - msg_filter(Msg, UID), + if Pos == 0; HL == 0 -> {stop, {if NewPos > 0 -> StopMsg; + true -> MsgId + end, NewPos}}; + MsgId == StopMsg -> {cont, {Pos - 1, HL1, 1}}; + NewPos > 0 -> {cont, {Pos - 1, HL1, NewPos + 1}}; + true -> {cont, {Pos - 1, HL1, 0}} end end, {Cache, Pos} = - case fold_stream('Message', Fun, LastMsgId, {Count, HistLimit}, bwd) of - {eos, _} -> {[], 0}; - {ok, {MId, P}} -> {{'Message', MId}, P} + case fold_stream('Message', Fun, LastMsgId, {Count, HistLimit, 0}, bwd) of + {eos, {_, _, 0}} -> {[], 0}; + {eos, {_, _, NewPos}} -> {{'Message', StopMsg}, NewPos}; + {ok, {MId, P}} -> {{'Message', MId}, P} end, Reader#reader{pos = Pos, cache = Cache}. -- GitLab From ed4aaa5e972e189a1db797aff77d381118f2902a Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Thu, 23 Apr 2020 08:39:14 +0200 Subject: [PATCH 17/19] Rewrite message_within_history_limit to use fold_stream --- apps/roster/src/roster.erl | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index b95b7aaed..d643f3607 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1793,12 +1793,15 @@ message_within_history_limit(MsgId, Reader) -> #reader{cache = {'Message', ReadMsg}, pos = P} when P > 0, MsgId >= ReadMsg -> true; #reader{cache = {'Message', ReadMsg}, pos = 0} when MsgId > ReadMsg -> true; #reader{cache = {'Message', ReadMsg}, pos = Pos} -> - FilterFun = fun(#'Message'{}, {InLimit, Rd = #reader{ pos = P }}) -> - {InLimit andalso P > 0, Rd#reader{ pos = P - 1 }} - end, - {InLimit, _} = fold(FilterFun, {true, Reader}, 'Message', ReadMsg, MsgId - 1, - #kvs{mod = store_mnesia}, #iterator.next, Pos + 1), - InLimit + Fun = fun(#'Message'{id = MsgId1}, P) -> + if P == 0 -> {stop, false}; %% Reached the limit + MsgId1 =< MsgId -> {stop, true}; %% Found the msg within the limit + true -> {cont, P - 1} + end end, + case fold_stream('Message', Fun, ReadMsg, Pos, bwd) of + {eos, _} -> false; + {ok, InLimit} -> InLimit + end end. start_reader(#reader{cache = []} = Reader) -> Reader#reader{cache = {'Message', []}}; -- GitLab From 60f1370d7ea3a8036617a3f6c2ac198394362a8a Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 12 May 2020 09:09:00 +0200 Subject: [PATCH 18/19] Formatting and comments --- apps/roster/src/protocol/roster_room.erl | 4 +- apps/roster/src/roster.erl | 60 ++++++++++++++++-------- 2 files changed, 43 insertions(+), 21 deletions(-) diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index 3c4b1e4d9..7ceb5789b 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -160,8 +160,8 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers kvs:put(M2 = roster:patch_member(M#'Member'{status = Status}, Member)), {ignore, M2}; _ -> - {M2, _, UpdRoom} = roster:add_member(TmpRoom, - Member#'Member'{feed_id = #muc{name = Room}, reader = 0}, {no_muc_message, HL}), + Member1 = Member#'Member'{feed_id = #muc{name = Room}, reader = 0}, + {M2, _, UpdRoom} = roster:add_member(TmpRoom, Member1, {no_muc_message, HL}), {M2, UpdRoom} end, case MmbrRoom of diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index d643f3607..0c97391fe 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1085,8 +1085,12 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ case kvs:get('Roster', roster_id(PhoneId)) of {ok, #'Roster'{nick = Nick, names = Names, surnames = Surnames, avatar = Avatar}} -> Alias2 = case Alias of EmptyAlias when EmptyAlias == []; EmptyAlias == <<>> -> Nick; _ -> Alias end, - #reader{id = NewR} = kvs_stream:save(kvs_stream:reader(Feed)), %% Get reader, cached to first msg in feed - {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add};_ -> {MId, put} end, + %% Reader is 0, so this is a new member. Create a fresh reader + %% pointing to the first message in the feed. History limit is + %% taken care of in the next clause. + #reader{id = NewR} = kvs_stream:save(kvs_stream:reader(Feed)), + {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add}; + _ -> {MId, put} end, {ok, #'Profile'{settings = Settings}} = kvs:get('Profile', roster:phone(PhoneId)), {M2, Msg, Room} = add_member(R, M#'Member'{id = NewMId, reader = NewR, container = chain, feed_id = Feed, alias = Alias2, names = Names, surnames = Surnames, avatar = Avatar, settings = Settings}, Args), @@ -1132,14 +1136,20 @@ member_alias([], [], []) -> []; member_alias([], N, []) -> N; member_alias([], [ member_alias([], Name, Surname) when Name /= [], Surname /= [] -> <>/binary, Surname/binary>>; member_alias(Alias, _, _) -> Alias. +%% put_readers(Member, Room) updates last read message for Room after getting a +%% read notification from Member. Looks at the reader of Member to get the read +%% message, and set #'Room'.readers :: none | {last_read, MsgId} accordingly. +%% We also have to deal with the old representation which might still exist in the +%% database. In that case readers is a list of length at most two, recording the member +%% id of the last writer and the furthest reader. put_readers(_Member, #'Room'{type = channel} = Room) -> {[], Room}; put_readers(#'Member'{reader = #reader{cache = []}}, #'Room'{} = Room) -> {[], Room}; -put_readers(#'Member'{reader = #reader{cache = {'Message', MsgId}}, - feed_id = #muc{}, - phone_id = PhoneId}, - #'Room'{readers = Readers} = Room) -> +put_readers(#'Member'{reader = #reader{cache = {'Message', MsgId}}, + feed_id = #muc{}, + phone_id = PhoneId}, + #'Room'{readers = Readers} = Room) -> Max = fun([], Id) -> Id; (Id, []) -> Id; (Id1, Id2) -> max(Id1, Id2) @@ -1169,7 +1179,7 @@ put_readers(#'Member'{reader = #reader{cache = {'Message', MsgId}}, {Msg#'Message'{type = [read], files = [], from = PhoneId}, NewRoom} end; put_readers(#'Member'{reader = ReaderId, feed_id = #muc{name = R}} = M, - #reader{id = ReaderId} = Reader) when is_integer(ReaderId) -> + #reader{id = ReaderId} = Reader) when is_integer(ReaderId) -> {ok, Room} = kvs:get('Room', R), put_readers(M#'Member'{reader = Reader}, Room); put_readers(#'Contact'{phone_id = PhoneId}, #reader{cache = {'Message', MsgId}}) -> @@ -1706,6 +1716,9 @@ reader_cache(ReaderId) when is_integer(ReaderId)-> reader_cache(kvs_stream:load_ reader_cache(#reader{pos = 0}) -> 0; reader_cache(#reader{cache = {'Message', MsgId}}) -> MsgId; reader_cache(#'Room'{admins = Admins, id = RoomId, members = Members, readers = Readers} = R) -> + %% Here we are preparing a Room to be sent to the client. In this case + %% readers should contain two message ids: the last written message and the + %% latest read message. LastMsg = case kvs:get(writer, #muc{name = RoomId}) of {ok, #writer{cache = #'Message'{id = MId}}} -> MId; _ -> 0 @@ -1767,21 +1780,30 @@ status_msg(T, UID, FS) -> LStatus = [[], clear], true -> erlang:setelement(FS, T, update); R -> R end; _ -> false end. -find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _StopMsg, _HistLimit) -> Reader; -find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, StopMsg, HistLimit) -> - Fun = fun(#'Message'{id = MsgId} = Msg, {Pos, HL, NewPos}) -> - HL1 = HL - msg_filter(Msg, UID), - if Pos == 0; HL == 0 -> {stop, {if NewPos > 0 -> StopMsg; - true -> MsgId - end, NewPos}}; - MsgId == StopMsg -> {cont, {Pos - 1, HL1, 1}}; - NewPos > 0 -> {cont, {Pos - 1, HL1, NewPos + 1}}; - true -> {cont, {Pos - 1, HL1, 0}} - end end, +%% The job of the aptly named find_bot(Writer, Reader, UID, LastRead, HistLimit) +%% is to compute the reader for a new member of a room. It needs to consider the +%% history limit of the member, and if the member has been in the room before, +%% the last read message. The new reader should start at the earliest message +%% the member is allowed to read and point to the last read message, if this +%% exists and is inside the history limit. +find_bot(#writer{cache = []}, #reader{} = Reader, _UID, _LastRead, _HistLimit) -> Reader; +find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader, UID, LastRead, HistLimit) -> + Fun = fun(#'Message'{id = MsgId} = Msg, {Pos, HL, NewPos}) -> + HL1 = HL - msg_filter(Msg, UID), + %% To get the correct pos for the reader we need to step all + %% the way back to the history limit or the beginning of the + %% feed. + if Pos == 0; HL == 0 -> {stop, {if NewPos > 0 -> LastRead; + true -> MsgId + end, NewPos}}; + MsgId == LastRead -> {cont, {Pos - 1, HL1, 1}}; + NewPos > 0 -> {cont, {Pos - 1, HL1, NewPos + 1}}; + true -> {cont, {Pos - 1, HL1, 0}} + end end, {Cache, Pos} = case fold_stream('Message', Fun, LastMsgId, {Count, HistLimit, 0}, bwd) of {eos, {_, _, 0}} -> {[], 0}; - {eos, {_, _, NewPos}} -> {{'Message', StopMsg}, NewPos}; + {eos, {_, _, NewPos}} -> {{'Message', LastRead}, NewPos}; {ok, {MId, P}} -> {{'Message', MId}, P} end, Reader#reader{pos = Pos, cache = Cache}. -- GitLab From 8d84dc0231a1a0b388c7cdbec5b93f70962aa833 Mon Sep 17 00:00:00 2001 From: Ulf Norell Date: Tue, 12 May 2020 12:01:03 +0200 Subject: [PATCH 19/19] Implement History/get using roster:fold_stream --- apps/roster/src/protocol/roster_history.erl | 109 ++++++++++++-------- apps/roster/src/roster.erl | 35 ++----- 2 files changed, 75 insertions(+), 69 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index a33749071..39307e7b4 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -82,7 +82,6 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity _ -> {N, []} end, - Iter = case N2 > 0 of true -> #iterator.prev; _ -> #iterator.next end, %% determine the chain direction Reader = #reader{cache = Cache} = kvs_stream:load_reader(R), %% load the reader %% Set starting point, 0 means do it automatically MId2 = @@ -98,57 +97,79 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity _ -> MId end, %% Normalize if MId was too large... - MaxReadId = min(MId2, LastMsgId), + StartId = min(MId2, LastMsgId), - %% Check that MaxReadId is not before history limit - case roster:message_within_history_limit(MaxReadId, Reader) of - true -> ok; - false -> throw({error, invalid_data}) - end, + %% History limits are encoded in the reader of a member. The + %% message corresponding to #reader.pos = 1 is the first + %% message the member is allowed to read. So, to make sure we + %% respect the history limit we need to know the reader + %% position of StartId, if any. Fail if StartId is before + %% history limit. If StartId is unread we handle reader pos in + %% FoldFun. + Reader1 = + case roster:message_within_history_limit(StartId, Reader) of + unread -> Reader; + {reader_pos, P} -> Reader#reader{pos = P, cache = {'Message', StartId}}; + forbidden -> throw({error, invalid_data}) + end, - Filter = case N of [] -> msg_update; _ -> msg_filter end, %% select filter function - {InnerFilterFun, Mime2} = - case MsgData of - [#'Message'{files = [#'Desc'{mime = Mime = <<_:8, _/binary>>}]}] -> - {fun(#'Message'{files = Descs} = Msg, UIDLocal) -> %% filter by mime type - case roster:msg_filter(Msg, UIDLocal) of - 1 -> roster:bool_to_int(lists:keyfind(Mime, #'Desc'.mime, Descs)); - 0 -> 0 - end end, Mime}; - [] -> {fun roster:Filter/2, []}; - _ -> {fun roster:msg_filter/2, []} - end, + {Mime, MatchMime} = + case MsgData of + [#'Message'{files = [#'Desc'{mime = Mime1 = <<_:8, _/binary>>}]}] -> + {Mime1, fun(#'Message'{files = Descs}) -> + lists:keymember(Mime1, #'Desc'.mime, Descs) + end}; + _ -> + {[], fun(_) -> true end} + end, - StartId = - case Reader of - #reader{cache = {'Message', ReadMsg}} when N2 < 0, MaxReadId < ReadMsg -> - ReadMsg; %% if from message is lower then reader is placed - _ -> - MaxReadId - end, + Dir = case N2 > 0 of true -> fwd; false -> bwd end, - AccFun = fun (_, #'Message'{id = Id}, Acc, Dir) when Dir < 0, Id > MaxReadId -> Acc; %% if from message is lower then reader - (1, Msg, Acc, Dir) when Dir < 0 -> Acc ++ [roster:wrap_msg(Msg)]; - (0, _Msg, Acc, _Dir) -> Acc; - (1, Msg, Acc, _Dir) -> [roster:wrap_msg(Msg) | Acc] - end, + AccFun = fun(Msg, Count, Acc) -> + case roster:msg_filter(Msg, UID) > 0 andalso MatchMime(Msg) of + false -> {Count, Acc}; + true -> {Count - 1, [roster:wrap_msg(Msg) | Acc]} + end end, - FilterFun = - fun (Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> - (roster:msg_filter_fun2(N2, InnerFilterFun, UID, AccFun))(Msg, A); - (_, Acc) -> Acc - end, + FoldFun = + case Dir of + fwd -> + fun(_, {0, _, Acc}) -> {stop, Acc}; + (#'Message'{id = Id}, {_, _, Acc}) when Id > FId -> {stop, Acc}; + (Msg, {Count, _Unused, Acc}) -> + {Count1, Acc1} = AccFun(Msg, Count, Acc), + {cont, {Count1, unused, Acc1}} + end; + bwd -> + IsInSync = + case Reader1 of + #reader{cache = {'Message', CursorId}} -> + fun(#'Message'{id = Id}) -> Id =< CursorId end; + _ -> + fun(_) -> false end + end, + fun(_, {0, _, Acc}) -> {stop, Acc}; + (#'Message'{id = Id}, {_, _, Acc}) when is_integer(FId), Id < FId -> {stop, Acc}; + (Msg, {Count, Pos, Acc}) -> + {Count1, Acc1} = AccFun(Msg, Count, Acc), + InSync = IsInSync(Msg), %% If we started on an unread message we might + Pos1 = if InSync -> Pos - 1; true -> Pos end, %% not yet be in sync with the reader pos. + if InSync, Pos =< 0 -> {stop, Acc}; %% Reached history limit + true -> {cont, {Count1, Pos1, Acc1}} + end + end + end, - StopFun = fun(Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> - (roster:msg_stop_fun([], UID))(Msg, A); - (_, Acc) -> Acc end, - {Msgs, _} = - roster:fold(FilterFun, {[], roster:start_reader(Reader)}, - 'Message', StartId, FId, #kvs{mod = store_mnesia}, Iter, StopFun), + Msgs0 = + case roster:fold_stream('Message', FoldFun, StartId, {abs(N2), Reader1#reader.pos, []}, Dir) of + {ok, Ms} -> Ms; + {eos, {_, _, Ms}} -> Ms + end, + Msgs = case Dir of bwd -> lists:reverse(Msgs0); fwd -> Msgs0 end, Msgs2 = case is_integer(MId) of - true when MId > 0, Mime2 == [] -> + true when MId > 0, Mime == [] -> case kvs:get('Message', MId) of {ok, #'Message'{} = EntityMsg} -> lists:reverse(lists:ukeymerge(#'Message'.id, lists:reverse(Msgs), @@ -168,7 +189,7 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity %% _ -> Msgs2 end, %% CheckedResponseMsgs = [roster:check_message(Msg) || Msg <- ResponseMsgs], case Msgs2 of #error{} = E -> #io{code = E}; _ -> - History#'History'{data = Msgs2, size = length(Msgs2), status = InitialStatus} end + History#'History'{data = Msgs2, size = length(Msgs2), status = InitialStatus} end catch throw:#error{} = Err -> #io{code = Err} end diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 0c97391fe..2bb908fca 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -1809,20 +1809,22 @@ find_bot(#writer{count = Count, cache = #'Message'{id = LastMsgId}}, #reader{} = Reader#reader{pos = Pos, cache = Cache}. %% Check if a message is within the history limit of the given reader. +-spec message_within_history_limit(non_neg_integer(), #reader{}) -> + forbidden | {reader_pos, non_neg_integer()} | unread. message_within_history_limit(MsgId, Reader) -> case Reader of - #reader{cache = []} -> true; - #reader{cache = {'Message', ReadMsg}, pos = P} when P > 0, MsgId >= ReadMsg -> true; - #reader{cache = {'Message', ReadMsg}, pos = 0} when MsgId > ReadMsg -> true; + #reader{cache = []} -> unread; + #reader{cache = {'Message', ReadMsg}, pos = P} when P > 0, MsgId >= ReadMsg -> unread; + #reader{cache = {'Message', ReadMsg}, pos = 0} when MsgId > ReadMsg -> unread; #reader{cache = {'Message', ReadMsg}, pos = Pos} -> Fun = fun(#'Message'{id = MsgId1}, P) -> - if P == 0 -> {stop, false}; %% Reached the limit - MsgId1 =< MsgId -> {stop, true}; %% Found the msg within the limit + if P == 0 -> {stop, forbidden}; %% Reached the limit + MsgId1 == MsgId -> {stop, {reader_pos, P}}; %% Found the msg within the limit true -> {cont, P - 1} end end, case fold_stream('Message', Fun, ReadMsg, Pos, bwd) of - {eos, _} -> false; - {ok, InLimit} -> InLimit + {eos, _} -> forbidden; + {ok, Res} -> Res end end. @@ -1887,23 +1889,6 @@ msg_filter_fun(DirSize, InnerFilterFun, UID, AccFun) -> Rdr#reader{cache = {'Message', element(Iter, Msg)}, pos = Pos + Sign}; _ -> Rdr end}; (_, A) -> A end. -msg_filter_fun2(DirSize, InnerFilterFun, UID, AccFun) -> - {Iter, Sign} = case DirSize > 0 of true -> {#iterator.prev, 1}; _ -> {#iterator.next, -1} end, - fun(#'Message'{id = Id} = Msg, {Acc, #reader{cache = {'Message', LastReadId}, pos = Pos} = Rdr}) -> - %% Run AccFun if not yet caught up with reader or reader pos > 0 - Acc1 = case Id /= LastReadId orelse Pos > 0 of - true -> AccFun(InnerFilterFun(Msg, UID), Msg, Acc, DirSize); - false -> Acc - end, - %% Update reader if in sync - Rdr1 = case Id == LastReadId andalso Pos >= 0 of - true -> Rdr#reader{cache = {'Message', element(Iter, Msg)}, pos = Pos + Sign}; - false -> Rdr - end, - {Acc1, Rdr1}; - (_, A) -> A - end. - msg_stop_fun(MaxReadId, UID) -> msg_stop_fun(MaxReadId, UID, 1). msg_stop_fun(MaxReadId, UID, Dir) -> @@ -2088,7 +2073,7 @@ next_key(Table, CurrentId, Count) -> Fun :: fun((Element, Acc) -> {stop, Res} | {cont, Acc}), Id :: ElementId, Acc :: Acc, - Dir :: fwd | bwd) -> {stop, Res} | {eos, Acc} when + Dir :: fwd | bwd) -> {ok, Res} | {eos, Acc} when ElementId :: non_neg_integer(), Element :: tuple(). fold_stream(_Table, _Fun, [], Acc, _Dir) -> -- GitLab