From 029b6aef45d51691c32053aedf5cecba6affc4b8 Mon Sep 17 00:00:00 2001 From: Hans Svensson Date: Wed, 8 Apr 2020 21:26:28 +0200 Subject: [PATCH] Adding comments, better layout, better names, etc No functional changes --- apps/roster/src/protocol/roster_history.erl | 105 ++++++++++++-------- apps/roster/src/protocol/roster_room.erl | 75 ++++++++------ apps/roster/src/roster.erl | 47 +++++++-- 3 files changed, 144 insertions(+), 83 deletions(-) diff --git a/apps/roster/src/protocol/roster_history.erl b/apps/roster/src/protocol/roster_history.erl index 40613e416..31a00d5bd 100644 --- a/apps/roster/src/protocol/roster_history.erl +++ b/apps/roster/src/protocol/roster_history.erl @@ -59,61 +59,84 @@ info(#'History'{status = get, roster_id = Roster0, feed = Feed, size = N, entity PhoneId = roster:phone_id(ClientId), ?LOG_INFO("~p:~p:History/get:~p:~p", [PhoneId, ClientId, Feed, History]), {UID, R, LastMsgId, _Unit} = - case PhoneId of Roster0 -> roster:get_feed_data(Feed, PhoneId); - _ -> {error, permission_denied, [],[]} end, + case PhoneId == Roster0 of + true -> roster:get_feed_data(Feed, PhoneId); + _ -> {error, permission_denied, [],[]} + end, IO = case UID of error -> ?LOG_INFO("History/get.Error:~p", [R]), #io{code = #error{code = R}}; _ -> - {N2, FId} = case {N, MsgData} of - {_, [#'Message'{id = MId}]} when is_integer(MId) -> throw({error, invalid_data}); - {_, [#'Message'{id = FinishId}]} when is_integer(FinishId), FinishId /= MId -> - K = FinishId-MId, - {case is_integer(N) of true -> round(abs(N)*abs(K)/K); _ -> K end, FinishId}; - {[], []} -> #writer{count = Count} = kvs_stream:load_writer(Feed), - {case MId > 0 of true -> Count;_-> -Count end, []}; - _ -> {N, []} end, + {N2, FId} = + case {N, MsgData} of + {_, [#'Message'{id = MId}]} when is_integer(MId) -> + throw({error, invalid_data}); + {_, [#'Message'{id = FinishId}]} when is_integer(FinishId), FinishId /= MId -> + K = FinishId-MId, + {case is_integer(N) of true -> round(abs(N)*abs(K)/K); _ -> K end, FinishId}; + {[], []} -> + #writer{count = Count} = kvs_stream:load_writer(Feed), + {case MId > 0 of true -> Count; _ -> -Count end, []}; + _ -> + {N, []} + end, Iter = case N2 > 0 of true -> #iterator.prev; _ -> #iterator.next end, %% determine the chain direction - Reader=#reader{cache=Cache} = kvs_stream:load_reader(R), - MId2 = case MId of - 0 when N>0 andalso N/=[] andalso Cache/=[] -> {_,MsId}=Cache,MsId; - 0 when N>0 andalso N/=[] -> case kvs:get(writer,Feed) of - {ok, #writer{first=#'Message'{id = MsId}}}-> MsId; - _ -> LastMsgId end; + Reader = #reader{cache = Cache} = kvs_stream:load_reader(R), %% load the reader + %% Set starting point, 0 means do it automatically + MId2 = + case MId of + %% N > 0 means going forward find cached or first MsgId (if any) + 0 when N>0 andalso N/=[] andalso Cache/=[] -> {_, MsId} = Cache, MsId; + 0 when N>0 andalso N/=[] -> + case kvs:get(writer, Feed) of + {ok, #writer{first=#'Message'{id = MsId}}} -> MsId; + _ -> LastMsgId %% Last resort, there are no messages in this Feed?! + end; 0 -> LastMsgId; - _ -> MId end, + _ -> MId + end, + %% Normalize if MId was too large... MaxReadId = case MId2 > LastMsgId of true -> LastMsgId; _ -> MId2 end, - Filter = case N of [] -> msg_update; _ -> msg_filter end, %% select filtration function - {InnerFilterFun, Mime2} = case MsgData of - [#'Message'{files = [#'Desc'{mime = Mime = <<_:8, _/binary>>}]}] -> - {fun(#'Message'{files = Descs} = Msg, UIDLocal) -> %% filter by mime type - case roster:msg_filter(Msg, UIDLocal) of - 1 -> roster:bool_to_int(lists:keyfind(Mime, #'Desc'.mime, Descs)); - 0 -> 0 end end, Mime}; - [] -> {fun roster:Filter/2, []}; - _ -> {fun roster:msg_filter/2, []} - end, + Filter = case N of [] -> msg_update; _ -> msg_filter end, %% select filter function + {InnerFilterFun, Mime2} = + case MsgData of + [#'Message'{files = [#'Desc'{mime = Mime = <<_:8, _/binary>>}]}] -> + {fun(#'Message'{files = Descs} = Msg, UIDLocal) -> %% filter by mime type + case roster:msg_filter(Msg, UIDLocal) of + 1 -> roster:bool_to_int(lists:keyfind(Mime, #'Desc'.mime, Descs)); + 0 -> 0 + end end, Mime}; + [] -> {fun roster:Filter/2, []}; + _ -> {fun roster:msg_filter/2, []} + end, + + StartId = + case Reader of + #reader{cache = {'Message', ReadMsg}} when N2 < 0, MaxReadId < ReadMsg -> + ReadMsg; %% if from message is lower then reader is placed + _ -> + MaxReadId + end, - StartId = case Reader of - #reader{cache = {'Message', ReadMsg}} - when N2 < 0, MaxReadId < ReadMsg -> ReadMsg; %% if from message is lower then reader is placed - _ -> MaxReadId end, + AccFun = fun (_, #'Message'{id = Id}, Acc, Dir) when Dir < 0, Id > MaxReadId -> Acc; %% if from message is lower then reader + (1, Msg, Acc, Dir) when Dir < 0 -> Acc ++ [roster:wrap_msg(Msg)]; + (0, _Msg, Acc, _Dir) -> Acc; + (1, Msg, Acc, _Dir) -> [roster:wrap_msg(Msg) | Acc] + end, - AccFun = fun(_, #'Message'{id = Id}, Acc, Dir) when Dir < 0, Id > MaxReadId -> Acc; %% if from message is lower then reader - (1, Msg, Acc, Dir) when Dir < 0 -> Acc++[roster:wrap_msg(Msg)]; - (0,_Msg, Acc,_Dir) -> Acc; - (1, Msg, Acc,_Dir) -> [roster:wrap_msg(Msg)|Acc] end, + FilterFun = + fun (Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> + (roster:msg_filter_fun(N2, InnerFilterFun, UID, AccFun))(Msg, A); + (_, Acc) -> Acc + end, - FilterFun = - fun(Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> - (roster:msg_filter_fun(N2, InnerFilterFun, UID, AccFun))(Msg, A); - (_, Acc) -> Acc end, StopFun = fun(Msg, {Acc, _} = A) when length(Acc) < abs(N2) -> (roster:msg_stop_fun([], UID))(Msg, A); (_, Acc) -> Acc end, - {Msgs, _} = roster:fold(FilterFun, {[], roster:start_reader(Reader)}, - 'Message', StartId, FId, #kvs{mod = store_mnesia}, Iter, StopFun), + {Msgs, _} = + roster:fold(FilterFun, {[], roster:start_reader(Reader)}, + 'Message', StartId, FId, #kvs{mod = store_mnesia}, Iter, StopFun), Msgs2 = case is_integer(MId) of diff --git a/apps/roster/src/protocol/roster_room.erl b/apps/roster/src/protocol/roster_room.erl index 0c4aeccdb..b9aba8bd5 100644 --- a/apps/roster/src/protocol/roster_room.erl +++ b/apps/roster/src/protocol/roster_room.erl @@ -146,7 +146,7 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers Mmbr#'Member'.status == removed andalso Prefix == <<"sys">>) -> {Mmbrs, Aliases, NewMmbrs, StoredRoom2} = lists:foldl( - fun(#'Member'{phone_id = PhoneId, status = Status} = Member, {Ms, Alss, NewMs, TmpRoom} = MAcc) + fun(#'Member'{phone_id = PhoneId, status = Status} = Member, {Ms, Alss, NewMs, TmpRoom}) when Status == admin; Status == member -> MmbrRoom = case roster:muc_member(PhoneId, Room, presence) of @@ -163,14 +163,12 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers Member#'Member'{feed_id = #muc{name = Room}, reader = 0}, {no_muc_message, HL}), {M2, UpdRoom} end, - case MmbrRoom of error -> - ?LOG_INFO("ERROR:add_member:RosterNotFound:~p", [PhoneId]), - MAcc; + case MmbrRoom of {ignore, M5} -> {Ms ++ [M5#'Member'{presence = roster:is_online(PhoneId)}], Alss, NewMs, StoredRoom}; {#'Member'{alias = Alias} = M4, UpdRoom2} -> {Ms ++ [M4#'Member'{presence = roster:is_online(PhoneId)}], - Alss ++ [Alias, <<",">>], NewMs ++ [M4], UpdRoom2} end + Alss ++ [Alias, <<",">>], NewMs ++ [M4], UpdRoom2} end end, {[], [], [], StoredRoom}, Admins ++ Members), {Admins2, Members2} = roster:split_members(Mmbrs), {Payload, RStatus2} = @@ -191,30 +189,44 @@ info(#'Room'{status = St, id = Room, members = Members, admins = Admins, readers R = StoredRoom2#'Room'{update = roster:now_msec()}, - R2 = case Mmbrs of + R2 = + case Mmbrs of [] -> R; - _ -> {R3, R4} = - case Aliases of - [] -> - {R#'Room'{last_msg = element(2, kvs:get('Message', R#'Room'.last_msg))}, R}; - _ -> Msg = roster:add_message( - #'Message'{status = [], type = [sys], feed_id = #muc{name = Room}, from = APId, - to = Room, msg_id = roster:msg_id(), - files = [#'Desc'{payload = Payload}]}, Reader), - {_, PutRoom} = roster:put_readers(write_top, AdmMember, R), - n2o_async:pid(system, ?MODULE) ! {send_push, NewMmbrs, Msg, RStatus2}, - {PutRoom#'Room'{last_msg = Msg}, PutRoom#'Room'{last_msg = Msg#'Message'.id}} end, - R5 = R3#'Room'{status = RStatus2, members = Members2, admins = Admins3, links=[roster_link:get_link({ok,R3})]}, - [ begin roster:send_ses(C, roster:phone(PhId), - roster:reader_cache(R5#'Room'{unread = - case {RStatus, lists:keymember(MembId, #'Member'.id, Mmbrs)} of - {create, _} when PhId == APId -> 0; {create, true} -> 1; - _ -> element(1, roster:unread_msg(R3#'Room'.last_msg, Rdr, PhId)) - end})) end - || #'Member'{id = MembId, phone_id = PhId, reader = Rdr, status = SST} - <- roster:members(#muc{name = Room}), SST /= removed], - R4 end, - kvs:put(R2), <<>>; + _ -> + {Rx1, LastMsg} = + case Aliases of + [] -> + {ok, LMsg} = kvs:get('Message', R#'Room'.last_msg), + {R, LMsg}; + _ -> + Msg0 = #'Message'{ status = [], type = [sys], feed_id = #muc{name = Room}, + from = APId, to = Room, msg_id = roster:msg_id(), + files = [#'Desc'{payload = Payload}] }, + Msg = roster:add_message(Msg0, Reader), + {_, PutRoom} = roster:put_readers(write_top, AdmMember, R), + n2o_async:pid(system, ?MODULE) ! {send_push, NewMmbrs, Msg, RStatus2}, + {PutRoom, Msg} + end, + Rx2 = Rx1#'Room'{status = RStatus2, members = Members2, admins = Admins3, + links = [roster_link:get_link({ok, Rx1})], last_msg = LastMsg}, + + [ begin + Unread = + case {RStatus, lists:keymember(MembId, #'Member'.id, Mmbrs)} of + {create, _} when PhId == APId -> 0; + {create, true} -> 1; + _ -> + {Unread0, _, _} = roster:unread_msg(LastMsg, Rdr, PhId), + Unread0 + end, + roster:send_ses(C, roster:phone(PhId), roster:reader_cache(Rx2#'Room'{unread = Unread})) + end || #'Member'{id = MembId, phone_id = PhId, reader = Rdr, status = SST} + <- roster:members(#muc{name = Room}), SST /= removed], + + Rx1#'Room'{last_msg = LastMsg#'Message'.id} + end, + kvs:put(R2), + <<>>; _ -> #io{code = #error{code = permission_denied}} end; {error, _} -> #io{code = #error{code = room_not_found}} end, {reply, {bert, IO}, Req, State}; @@ -239,11 +251,12 @@ info(#'Room'{status = remove, members = Members, admins = Admins0, id = Room}, R roster:unsubscribe_room(M), %% Unsubscribe this member from the room. RosterId = roster:roster_id(PhoneId), {ok, #'Roster'{roomlist = Rooms} = Roster} = kvs:get('Roster', RosterId), - roster:unsubscribe_muc(Roster#'Roster'{roomlist = - lists:ukeymerge(#'Room'.id, [StoredRoom#'Room'{status = removed}], Rooms)}, Feed), + RoomList = lists:ukeymerge(#'Room'.id, [StoredRoom#'Room'{status = removed}], Rooms), + roster:unsubscribe_muc(Roster#'Roster'{roomlist = RoomList}, Feed), kvs:put(M2 = M#'Member'{status = removed, update = roster:now_msec()}), {[M2 | Ms], [Alias, <<",">> | Alss], [RosterId | RIds]}; - false -> Acc end end, {[], [], []}, Members ++ Admins), + false -> Acc + end end, {[], [], []}, Members ++ Admins), {Admins2, Members2} = roster:split_members(Mmbrs), Admins3 = lists:ukeymerge(#'Member'.id, lists:ukeysort(#'Member'.id, Admins2), [AdmMember]), Msg = roster:add_message(#'Message'{feed_id = #muc{name = Room}, from = APId, to = Room, diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index a4ef16b3c..c4dafab74 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -695,11 +695,14 @@ del_jobs(Name, PhoneId) -> ok. add_message(#'Message'{feed_id = Feed} = Msg) -> - kvs_stream:save(kvs_stream:add( - (kvs_stream:load_writer(Feed))#writer{args = - desc_id(Msg#'Message'{created = now_msec()})})). + Writer = kvs_stream:load_writer(Feed), + DescId = desc_id(Msg#'Message'{created = now_msec()}), + kvs_stream:save(kvs_stream:add(Writer#writer{ args = DescId })). + add_message(#'Message'{} = Msg, Reader) -> - kvs_stream:save(offset_reader(#writer{cache = LMsg} = add_message(Msg), Reader)), LMsg. + Writer = add_message(Msg), + kvs_stream:save(offset_reader(Writer, Reader)), + Writer#writer.cache. -spec unread_msg(#muc{} |#p2p{} |#writer{} |#error{}, integer() | #reader{}, @@ -744,6 +747,20 @@ unread_msg(F, R, U) -> ?LOG_INFO("unread_msg2: unexpected data format: ~p", [{F, R, U}]), {0, [], []}. +%% Counting unread messages in feed +%% Writer has MaxReadId (last message written) and Count = number of messages +%% Reader has ReadMsgId (possibly []) and ReadCurPos. +%% +%% We traverse the message chain starting from MaxReadId. +%% We increment Unread if: +%% - the message is an active message (not edit/delete) AND +%% - MsgId > ReadMsgId OR ReadMsgId == [] +%% +%% We stop if: +%% - Pos (in reader) becomes negative OR +%% - MsgId =< ReadMsgId AND ReadCurPos >= Pos. +%% +%% ReadCurPos is always >= Pos so that condition isn't needed. unread_msg(#writer{cache = #'Message'{id = MaxReadId}, count = Count}, #reader{cache = {'Message', ReadMsgId}, pos = ReadCurPos} = Reader, UID, GetFun) -> AccFun = @@ -1070,7 +1087,7 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ {ok, #'Roster'{nick = Nick, names = Names, surnames = Surnames, avatar = Avatar}} -> Alias2 = case Alias of EmptyAlias when EmptyAlias == []; EmptyAlias == <<>> -> Nick; _ -> Alias end, Rdr = #reader{id = NewR} = kvs_stream:reader(Feed), %% Get reader, cached to first msg in feed - HistLimit = case HL of [] -> 0; [H | _] -> H-1 end, + HistLimit = case HL of [] -> 0; [H | _] -> H - 1 end, Rdr2 = find_bot(kvs_stream:load_writer(Feed), Rdr, MId, HistLimit), %% Move reader, to history-limit Reader = kvs_stream:save(Rdr2), {NewMId, AddFun} = case MId of [] -> {kvs:next_id('Member', 1), add};_ -> {MId, put} end, @@ -1083,14 +1100,20 @@ add_member(#'Room'{} = R, #'Member'{id = MId, reader = 0, feed_id = Feed, phone_ {M2, Msg, Room}; #error{} -> {error, roster_not_found} %% TODO add error handler end; -add_member(#'Room'{} = R, #'Member'{feed_id = #muc{name = Room} = Feed, alias = Alias, - phone_id = #'Roster'{id = RosterId, phone = Phone, - names = Name, surnames = Surname}} = M, {MsgPayload, _}) -> +add_member(#'Room'{} = R, + #'Member'{feed_id = #muc{name = Room} = Feed, alias = Alias, phone_id = Roster } = M, + {MsgPayload, _HL}) when is_record(Roster, 'Roster') -> + #'Roster'{id = RosterId, phone = Phone, names = Name, surnames = Surname} = Roster, PhoneId = phone_id(Phone, RosterId), Msg = case MsgPayload of no_muc_message -> []; - _ -> #writer{cache = LastMsg} = add_message(#'Message'{status = [], type = [sys], feed_id = Feed, from = PhoneId, to = Room, - files = [#'Desc'{payload = MsgPayload}]}), LastMsg end, + _ -> + MucMsg = #'Message'{status = [], type = [sys], feed_id = Feed, + from = PhoneId, to = Room, + files = [#'Desc'{payload = MsgPayload}]}, + #writer{cache = LastMsg} = add_message(MucMsg), + LastMsg + end, update_rooms(RosterId, R), subscribe_room(M2 = M#'Member'{phone_id = PhoneId}), subscribe_muc(PhoneId, Feed), {M2#'Member'{alias = member_alias(Alias, Name, Surname)}, Msg, R}; @@ -1126,7 +1149,9 @@ put_readers(TopAction, #'Member'{id = Id, feed_id = #muc{name = RoomId}, phone_i case kvs_stream:load_reader(RId) of #reader{cache = {'Message', RMsgId}} when MsgId > RMsgId -> [WMemberId, Id]; - _ -> Readers end; _ -> Readers end; + _ -> Readers + end; + _ -> Readers end; Res -> ?LOG_INFO("invalid put_readers:~p", [Res]), Readers end, Room2 = case Readers2 of Readers -> Room;_ -> -- GitLab