diff --git a/eqc/ebin/Emakefile b/eqc/ebin/Emakefile new file mode 100644 index 0000000000000000000000000000000000000000..410149bd665c167f854291c25822ccc446296134 --- /dev/null +++ b/eqc/ebin/Emakefile @@ -0,0 +1,2 @@ +{"../*", []}. +{"../../test/*", []}. diff --git a/eqc/ebin/shell b/eqc/ebin/shell new file mode 100644 index 0000000000000000000000000000000000000000..0baadcb075beda29b108c09f1fb2515da9fb61bf --- /dev/null +++ b/eqc/ebin/shell @@ -0,0 +1 @@ +erl -pz ../../_build/test/lib/*/ebin diff --git a/eqc/nynja_eqc.erl b/eqc/nynja_eqc.erl index f391ea97ff1c8d53cc3d151e39d74569829d923a..627f3ecf31b1be68e04b2ff71c6b744b47194014 100644 --- a/eqc/nynja_eqc.erl +++ b/eqc/nynja_eqc.erl @@ -4,7 +4,7 @@ -include_lib("eqc/include/eqc.hrl"). -include_lib("eqc/include/eqc_statem.hrl"). -include_lib("emqttc/include/emqttc_packet.hrl"). --include("roster.hrl"). +-include_lib("roster/include/roster.hrl"). -compile([export_all, nowarn_export_all]). diff --git a/eqc/server_eqc.erl b/eqc/server_eqc.erl index 32b042906bd8d9ad875e8b6a4e0442aa2a1cd3fe..90c314da6250e41dfea3ea8c8261284828c2aace 100644 --- a/eqc/server_eqc.erl +++ b/eqc/server_eqc.erl @@ -10,7 +10,7 @@ -include_lib("eqc/include/eqc_component.hrl"). -import(eqc_statem, [conj/1, tag/2]). -include_lib("emqttc/include/emqttc_packet.hrl"). --include("roster.hrl"). +-include_lib("roster/include/roster.hrl"). -define(mqtt(Payload), #mqtt_packet{payload = Payload}). @@ -19,13 +19,15 @@ -define(message(Handle, From, Msg, Status, Type, Args), ?CALLOUT(mock, message, [Handle, From, Msg, Status, Type, Args], ok)). -define(contact(Handle, PhoneId, Status, Args), ?CALLOUT(mock, contact, [Handle, PhoneId, Status, Args], ok)). -define(message_ack(Handle, Id, Args), ?CALLOUT(mock, message_ack, [Handle, Id, Args], ok)). +-define(error(Err), ?CALLOUT(mock, error, [Err], ok)). %% -- TODO ------------------------------------------------------------------- %% - get_history with MsgId == 0 %% - get_history after leaving room %% - get_history with N == [] -%% - clear messages +%% - #'Link' (roster_link) +%% - search contact %% -- State ------------------------------------------------------------------ @@ -47,9 +49,10 @@ }). -record(message, - { id :: symbolic(msg_id()) - , payload :: list(string()) - , from = 0 :: phone() | sys }). + { id :: symbolic(msg_id()) + , payload :: list(string()) + , visible_to = all :: all | [phone()] + , from = sys :: phone() | sys }). -record(contact, { id :: binary() @@ -71,16 +74,17 @@ , readers :: list(msg_id()) , unread :: nat() }). --record(feed, - { id :: {p2p, phone(), phone()} | {muc, binary()} - , history = [] :: [#message{}] - , seen = #{} :: #{phone() => symbolic(msg_id())} - , write = none :: none | {phone(), symbolic(msg_id())} - , read = none :: none | {phone(), symbolic(msg_id())} - , limit = #{} :: #{phone() => symbolic(msg_id())}}). +-record(feed_info, + { id :: {p2p, phone(), phone()} | {muc, binary()} + , history = [] :: [#message{}] + , seen = #{} :: #{phone() => symbolic(msg_id())} + , last_read = 0 :: symbolic(msg_id()) + , start = 0 :: symbolic(msg_id()) + , limit = #{} :: #{phone() => symbolic(msg_id())}}). -record(room, { id :: binary() + , name = <<>> :: binary() %% set when room is patched otherwise == id , members = [] :: list(phone()) , admins = [] :: list(phone()) }). @@ -88,7 +92,7 @@ { clients = [] :: [#client{}] , users = [] :: [#user{}] , friend_requests = [] :: [{phone(), phone()}] - , feeds = [] :: [#feed{}] + , feeds = [] :: [#feed_info{}] , rooms = [] :: [#room{}] }). @@ -134,28 +138,32 @@ phone_id_phone(S, PhoneId) -> (get_user_by_id(S, PhoneId))#user.phone. add_feed(FeedId, S) -> - Feed = #feed{ id = FeedId }, + Feed = #feed_info{ id = FeedId }, add(#state.feeds, Feed, S). get_feed(S, FeedId) -> - lists:keyfind(FeedId, #feed.id, S#state.feeds). + lists:keyfind(FeedId, #feed_info.id, S#state.feeds). feed_history(S, FeedId) -> - (get_feed(S, FeedId))#feed.history. + (get_feed(S, FeedId))#feed_info.history. feed_history(S, FeedId, Phone) -> Feed = get_feed(S, FeedId), History = - case maps:get(Phone, Feed#feed.limit, 0) of - 0 -> Feed#feed.history; + case maps:get(Phone, Feed#feed_info.limit, 0) of + 0 -> Feed#feed_info.history; MsgId -> - {Hist, Rest} = lists:splitwith(fun(#message{id = Id}) -> Id > MsgId end, Feed#feed.history), + {Hist, Rest} = lists:splitwith(fun(#message{id = Id}) -> Id > MsgId end, Feed#feed_info.history), case Rest of [#message{id = MsgId} = Msg | _] -> Hist ++ [Msg]; _ -> Hist end end, - [ M || M = #message{ payload = P } <- History, P /= deleted ]. + [ M || M <- History, visible_to(Phone, M) ]. + +in_history_limit(S, FeedId, Phone, MsgId) -> + Feed = get_feed(S, FeedId), + MsgId >= maps:get(Phone, Feed#feed_info.limit, 0). feed_last_msg_id(S, FeedId) -> case feed_history(S, FeedId) of @@ -173,16 +181,12 @@ feed_members(S, {muc, RoomId}) -> lists:usort(Room#room.members ++ Room#room.admins). feed_last_seen(S, Phone, FeedId) -> - maps:get(Phone, (get_feed(S, FeedId))#feed.seen, 0). - -is_first_read_of_msg_id(S, FeedId, User, MsgId) -> - #feed{ read = Read, write = Write } = get_feed(S, FeedId), - case {Write, Read} of - {{User, MsgId2}, _} when MsgId > MsgId2 -> true; - {_, {_, MsgId2}} when MsgId > MsgId2 -> true; - {_, none} -> true; - _ -> false - end. + maps:get(Phone, (get_feed(S, FeedId))#feed_info.seen, 0). + +is_first_read_of_msg_id(S, {p2p, _, _} = FeedId, Phone, MsgId) -> + feed_last_seen(S, Phone, FeedId) < MsgId; +is_first_read_of_msg_id(S, FeedId, _Phone, MsgId) -> + MsgId > (get_feed(S, FeedId))#feed_info.last_read. user_feeds(S, Phone) -> User = get_user(S, Phone), @@ -203,78 +207,63 @@ make_feed_id(_S, {muc, Name}) -> {muc, Name}. set_feed(FeedId, NewFeed, S = #state{ feeds = Feeds }) -> - S#state{ feeds = lists:keyreplace(FeedId, #feed.id, Feeds, NewFeed) }. + S#state{ feeds = lists:keyreplace(FeedId, #feed_info.id, Feeds, NewFeed) }. + +visible_to(Phone, #message{ visible_to = Visible }) -> + case Visible of + all -> true; + Phones -> lists:member(Phone, Phones) + end. + +intersect_visibility(all, V) -> V; +intersect_visibility(V, all) -> V; +intersect_visibility(Ps1, Ps2) -> Ps1 -- (Ps1 -- Ps2). feed_see_msg(FeedId, User, MsgId, S) -> - Feed0 = #feed{ seen = Seen } = get_feed(S, FeedId), - set_feed(FeedId, Feed0#feed{ seen = Seen#{ User => MsgId } }, S). - -feed_read_msg(FeedId, User, MsgId, S) -> - Feed0 = #feed{ write = Write, read = Read } = get_feed(S, FeedId), - Write1 = case Write of - none when Read == none -> {User, MsgId}; - {User, MsgIdX} -> {User, max(MsgId, MsgIdX)}; - _ -> Write - end, - Read1 = case {Write1, Read} of - {{User, _}, _} -> Read; - {_, {_, MsgId2}} when MsgId =< MsgId2 -> Read; - _ -> {User, MsgId} - end, - set_feed(FeedId, Feed0#feed{ read = Read1, write = Write1 }, S). + Feed0 = #feed_info{ seen = Seen } = get_feed(S, FeedId), + set_feed(FeedId, Feed0#feed_info{ seen = Seen#{ User => MsgId } }, S). -feed_write_msg(FeedId, User, MsgId, S) -> - Feed0 = #feed{ write = Write, read = Read } = get_feed(S, FeedId), - Feed1 = case Read of - {User, _} -> Feed0#feed{ read = none }; - _ -> Feed0 - end, - Feed2 = case Write of - none -> Feed1; - {User, _} -> Feed1; - _ -> Feed1#feed{ read = Write } - end, - set_feed(FeedId, Feed2#feed{ write = {User, MsgId} }, S). +feed_read_msg(FeedId, MsgId, S) -> + Feed0 = #feed_info{ last_read = LastMsg } = get_feed(S, FeedId), + set_feed(FeedId, Feed0#feed_info{ last_read = max(MsgId, LastMsg) }, S). -feed_add_msg(FeedId, Msg = #message{}, S) -> +feed_write_msg(FeedId, Msg = #message{}, S) -> Feed0 = get_feed(S, FeedId), - set_feed(FeedId, Feed0#feed{ history = [Msg | Feed0#feed.history] }, S). - -feed_edit_msg(FeedId, Msg = #message{ id = MsgId }, S) -> - Feed0 = #feed{ history = Hist0 } = get_feed(S, FeedId), - Hist1 = lists:keyreplace(MsgId, #message.id, Hist0, Msg), - set_feed(FeedId, Feed0#feed{ history = Hist1 }, S). - -feed_delete_all_msgs(FeedId, S) -> - Feed0 = #feed{ history = Hist0 } = get_feed(S, FeedId), - Hist1 = [ case M of - #message{} -> M#message{ payload = deleted }; - _ -> io:format("BADNESS: ~p\n", [Hist0]) - end || M <- Hist0 ], - set_feed(FeedId, Feed0#feed{ history = Hist1 }, S). - -feed_remove_user(FeedId, User, S) -> - Feed0 = get_feed(S, FeedId), - Feed1 = Feed0#feed{ write = case Feed0#feed.write of {User, _} -> none; Other -> Other end, - read = case Feed0#feed.read of {User, _} -> none; Other -> Other end, - limit = maps:remove(User, Feed0#feed.limit), - seen = maps:remove(User, Feed0#feed.seen) }, - set_feed(FeedId, Feed1, S). + set_feed(FeedId, Feed0#feed_info{ history = [Msg | Feed0#feed_info.history] }, S). + +feed_edit_msg(FeedId, #message{ id = MsgId, payload = Payload, visible_to = Vis0 }, S) -> + Feed0 = #feed_info{ history = Hist0 } = get_feed(S, FeedId), + Msg0 = lists:keyfind(MsgId, #message.id, Hist0), + Vis1 = intersect_visibility(Vis0, Msg0#message.visible_to), + Msg1 = Msg0#message{ payload = Payload, visible_to = Vis1}, + Hist1 = lists:keyreplace(MsgId, #message.id, Hist0, Msg1), + set_feed(FeedId, Feed0#feed_info{ history = Hist1 }, S). + +feed_delete_all_msgs(FeedId, Phone, MsgId, S) -> + Feed0 = #feed_info{ history = Hist0 } = get_feed(S, FeedId), + Visibility = + case FeedId of + {muc, _} -> []; + {p2p, P, P} -> []; + {p2p, _, _} -> [p2p_other_party(Phone, FeedId)] + end, + Hist1 = [ M#message{ visible_to = intersect_visibility(Visibility, M#message.visible_to) } + || M <- Hist0 ], + set_feed(FeedId, Feed0#feed_info{ history = Hist1, start = MsgId }, S). feed_set_history_limit(FeedId, User, HL, S) -> - Feed0 = get_feed(S, FeedId), - History = [ M || M = #message{ payload = Payload } <- Feed0#feed.history, - Payload /= deleted ], - Feed1 = case maps:is_key(User, Feed0#feed.limit) of - true -> Feed0; - false -> - Ix = case HL of [] -> 1; [N] -> N end, - Lim = if Ix =< length(History) -> - (lists:nth(Ix, History))#message.id; - true -> 0 - end, - Feed0#feed{ limit = (Feed0#feed.limit)#{ User => Lim } } + Feed0 = #feed_info{ seen = Seen0, limit = Limit0 } = get_feed(S, FeedId), + History = [ M || M <- Feed0#feed_info.history, visible_to(User, M) ], + Ix = case HL of [] -> 1; [N] -> N end, + Lim = if Ix =< length(History) -> + (lists:nth(Ix, History))#message.id; + true -> Feed0#feed_info.start + end, + Seen1 = case maps:get(User, Seen0, 0) < Lim of + true -> maps:remove(User, Seen0); + false -> Seen0 end, + Feed1 = Feed0#feed_info{ limit = Limit0#{ User => Lim }, seen = Seen1 }, set_feed(FeedId, Feed1, S). on_user(Phone, S, Fun) -> @@ -285,6 +274,10 @@ is_friend(_, Phone, Phone) -> true; is_friend(S, Phone1, Phone2) -> maps:is_key(Phone2, (get_user(S, Phone1))#user.friends). +is_admin(_, {p2p, A, B}, Phone) -> lists:member(Phone, [A, B]); +is_admin(S, {muc, RoomId}, Phone) -> + lists:member(Phone, (get_room(S, RoomId))#room.admins). + add_friend1(Phone1, Phone2, S) -> on_user(Phone1, S, fun(U) -> U#user{ friends = (U#user.friends)#{ Phone2 => true } } end). @@ -292,7 +285,7 @@ add_friend(Phone1, Phone2, S) -> Feed = p2p_feed(Phone1, Phone2), add_friend1(Phone1, Phone2, add_friend1(Phone2, Phone1, - add(#state.feeds, #feed{id = Feed, history = []}, S))). + add(#state.feeds, #feed_info{id = Feed, history = []}, S))). is_room(S, RoomId) -> lists:keymember(RoomId, #room.id, S#state.rooms). @@ -300,6 +293,18 @@ is_room(S, RoomId) -> get_room(S, Id) -> lists:keyfind(Id, #room.id, S#state.rooms). +get_room_id_by_name(S, Name) -> + case get_room(S, Name) of + false -> (lists:keyfind(Name, #room.name, S#state.rooms))#room.id; + #room{} -> Name + end. + +room_name(S, RoomId) -> + case get_room(S, RoomId) of + #room{ name = <<>> } -> RoomId; + #room{ name = Name } -> Name + end. + room_members(S, Id) -> #room{ members = Ms, admins = As } = get_room(S, Id), Ms ++ As. @@ -324,6 +329,11 @@ del_room_member(RoomId, Member, Role, S) -> end, S#state{ rooms = lists:keyreplace(RoomId, #room.id, S#state.rooms, Room1) }. +rename_room(OldRoom, NewRoom, S) -> + Room0 = get_room(S, OldRoom), + Room1 = Room0#room{ name = NewRoom }, + S#state{ rooms = lists:keyreplace(OldRoom, #room.id, S#state.rooms, Room1) }. + %% -- Generators ------------------------------------------------------------- gen_phone() -> noshrink(choose(1, 1000)). @@ -361,9 +371,16 @@ gen_message_id(S, FeedId) -> gen_unread_msg_id(S, Phone, FeedId) -> Feed = get_feed(S, FeedId), - LastSeen = maps:get(Phone, Feed#feed.seen, 0), + LastSeen = maps:get(Phone, Feed#feed_info.seen, 0), elements([ Id || #message{id = Id} <- feed_history(S, FeedId), Id > LastSeen ]). +gen_room_patch() -> + ?LET(PTs, eqc_gen:sublist([room, avatar]), + [{PT, gen_string(atom_to_list(PT))} || PT <- PTs]). + +gen_admin_type() -> + weighted_default({9, admin}, {1, any}). + %% -- Operations ------------------------------------------------------------- %% --- register_user --- @@ -380,7 +397,7 @@ register(Phone) -> register_next(S, V, [Phone]) -> add(#state.users, #user{ phone = Phone, phone_id = V }, - add(#state.feeds, #feed{ id = {p2p, Phone, Phone} }, S)). + add(#state.feeds, #feed_info{ id = {p2p, Phone, Phone} }, S)). %% --- connect --- @@ -442,7 +459,8 @@ check_friend(S, Phone, #contact{ id = FriendId, reader = [_, Reader], unread = Unread }) -> check_feed(S, Phone, p2p_feed(Phone, phone_id_phone(S, FriendId)), Reader, Unread). -check_room(S, Phone, #room_info{ name = Room, unread = Unread, admins = As, members = Ms }) -> +check_room(S, Phone, #room_info{ name = Room0, unread = Unread, admins = As, members = Ms }) -> + Room = get_room_id_by_name(S, Room0), #member{ reader = Reader } = lists:keyfind(user_phone_id(S, Phone), #member.phone_id, As ++ Ms), check_feed(S, Phone, {muc, Room}, Reader, Unread). @@ -625,9 +643,9 @@ send_message_callouts(S, [Handle, Feed, _FeedId, Message, Ack]) -> editable_messages(S) -> [ {Handle, Feed, MsgId} || #client{ handle = Handle, phone = Phone } <- S#state.clients, - #feed{ id = Feed, history = Msgs } <- S#state.feeds, - #message{ id = MsgId, from = Phone1, payload = Payload } <- Msgs, - Phone == Phone1, Payload /= deleted ]. + #feed_info{ id = Feed, history = Msgs } <- S#state.feeds, + #message{ id = MsgId, from = Phone1 } = Msg <- Msgs, + Phone == Phone1, visible_to(Phone, Msg) ]. edit_message_pre(S) -> [] /= editable_messages(S). @@ -690,17 +708,30 @@ delete_message_callouts(S, [Handle, Feed, _FeedId, MsgId0, Ack]) -> %% --- get_history --- -clients_with_nonempty_feeds(S) -> +clients_with_nonempty_feeds(S, Type) -> [ {Handle, Feed} || #client{ handle = Handle, phone = Phone } <- S#state.clients, - Feed <- user_feeds(S, Phone), [] /= feed_history(S, Feed) ]. + Feed <- user_feeds(S, Phone), + [] /= feed_history(S, Feed), + Type == any orelse (Type == admin) == is_admin(S, Feed, Phone) ]. + +clients_with_nonempty_feeds(S) -> + clients_with_nonempty_feeds(S, any). get_history_pre(S) -> [] /= clients_with_nonempty_feeds(S). +%% TODO: Test with MsgId = 0 and Count = []/0 +%% - MsgId = 0 means +%% * start from top if Count < 0 +%% * start from last seen or bottom if Count > 0 +%% +%% - Count = []/0 means +%% return "all history", possibly this means including edit +%% and delete "placeholders" get_history_args(S) -> ?LET({Handle, Feed}, elements(clients_with_nonempty_feeds(S)), - ?LET(MsgId, gen_message_id(S, Feed), %% TODO: also test with MsgId == 0 - [Handle, Feed, make_feed_id(S, Feed), MsgId, ?SUCHTHAT(N, int(), N < 0)])). + ?LET(MsgId, gen_message_id(S, Feed), + [Handle, Feed, make_feed_id(S, Feed), MsgId, ?SUCHTHAT(N, int(), N /= 0)])). get_history_pre(S, [Handle, Feed, _FeedId, MsgId, _Count]) -> lists:member({Handle, Feed}, clients_with_nonempty_feeds(S)) andalso @@ -711,24 +742,36 @@ get_history(Handle, _Feed, FeedId, MsgId, Count) -> get_history_post(S, [Handle, Feed, _FeedId, MsgId, Count], V) -> Phone = client_phone(S, Handle), - ExpMsgList0 = - case slice(MsgId, #message.id, -Count, feed_history(S, Feed, Phone)) of - Slice = [#message{id = MsgId} | _] -> Slice; - Slice -> - M = lists:keyfind(MsgId, #message.id, feed_history(S, Feed)), - [M#message{payload = []} | Slice] + Expect = + case in_history_limit(S, Feed, Phone, MsgId) of + false -> {error, invalid_data}; + true -> + Slice1 = + case slice(MsgId, #message.id, -Count, feed_history(S, Feed, Phone)) of + Slice0 = [#message{id = MsgId} | _] -> Slice0; + Slice0 -> + M = lists:keyfind(MsgId, #message.id, feed_history(S, Feed)), + [M#message{payload = []} | Slice0] + end, + Slice2 = + case Count < 0 of + true -> Slice1; + false -> lists:reverse(Slice1) + end, + [ M#message{ from = try user_phone_id(S, P) catch _:_ -> P end, + visible_to = all } + || M = #message{ from = P } <- Slice2 ] end, - ExpMsgList = [ M#message{ from = try user_phone_id(S, P) catch _:_ -> P end } - || M = #message{ from = P } <- ExpMsgList0 ], - eq(V, ExpMsgList). + eq(V, Expect). %% --- delete_history --- -delete_history_pre(S) -> [] /= clients_with_nonempty_feeds(S). +delete_history_pre(S) -> [] /= clients_with_nonempty_feeds(S, admin). delete_history_args(S) -> - ?LET({Handle, Feed}, elements(clients_with_nonempty_feeds(S)), - [Handle, Feed, make_feed_id(S, Feed)]). + ?LET(Type, gen_admin_type(), + ?LET({Handle, Feed}, elements(clients_with_nonempty_feeds(S, Type)), + [Handle, Feed, make_feed_id(S, Feed)])). delete_history_pre(S, [Handle, Feed, _FeedId]) -> lists:member({Handle, Feed}, clients_with_nonempty_feeds(S)). @@ -740,12 +783,22 @@ delete_history_callouts(S, [Handle, Feed, _FeedId]) -> Phone1 = client_phone(S, Handle), Phones = feed_members(S, Feed), HistoryMessage = <<"History was removed">>, - ?PAR([ ?CALLOUTS( - ?MATCH({MsgId, ok}, ?message(Handle, ?VAR, [HistoryMessage], '_', [sys], '_')), - ?APPLY(feed_delete_all_msgs, [Feed]), - ?APPLY(feed_write_msg, [Feed, Phone1, sys, MsgId, [HistoryMessage]]) )] ++ - [ ?message(H1, '_', [HistoryMessage], '_', [sys], '_') - || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]). + Visibility = case Feed of + {muc, _} -> all; + {p2p, _, _} -> [Phone1] + end, + case is_admin(S, Feed, Phone1) of + false -> ?error(permission_denied); + true -> + ?PAR([ ?CALLOUTS( + ?MATCH({MsgId, ok}, ?message(Handle, ?VAR, [HistoryMessage], '_', [sys], '_')), + ?APPLY(feed_delete_all_msgs, [Feed, MsgId, Phone1]), + ?APPLY(feed_write_msg, [Feed, Phone1, #message{id = MsgId, visible_to = Visibility, + payload = [HistoryMessage]}]), + ?APPLY(set_history_limit_on_delete, [Feed, Phone1, MsgId]) )] ++ + [ ?message(H1, '_', [HistoryMessage], '_', [sys], '_') + || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]) + end. %% --- read_message --- @@ -782,7 +835,7 @@ read_message_callouts(S, [Handle, Feed, _FeedId, MsgId]) -> read_message_next(S, _V, [Handle, Feed, _FeedId, MsgId]) -> Phone = client_phone(S, Handle), feed_see_msg(Feed, Phone, MsgId, - feed_read_msg(Feed, Phone, MsgId, S)). + feed_read_msg(Feed, MsgId, S)). %% --- create_room --- @@ -823,6 +876,56 @@ create_room_callouts(S, [Handle, Members0, Room]) -> || Phone2 <- Members, Phone2 /= Phone1, H1 <- get_clients(S, Phone2) ]). + +%% --- patch_room --- + +patchable_rooms(S) -> + [ {Handle, Room} + || #client{ handle = Handle, phone = Phone } <- S#state.clients, + #room{ id = Room, admins = As } <- S#state.rooms, + lists:member(Phone, As) ]. + +patch_room_pre(S) -> + [] /= patchable_rooms(S). + +patch_room_args(S) -> + ?LET({Handle, Room}, elements(patchable_rooms(S)), + [Handle, Room, gen_room_patch()]). + +patch_room_pre(S, [Handle, Room, _Patch]) -> + lists:member({Handle, Room}, patchable_rooms(S)). + +patch_room(Handle, Room, Patch) -> + call_client(Handle, {patch_room, room_name(Room), Patch}), timer:sleep(100). + +patch_room_callouts(S, [Handle, RoomId, Patch]) -> + FeedId = {muc, RoomId}, + Room = room_name(S, RoomId), + Phone1 = client_phone(S, Handle), + Phones = feed_members(S, FeedId), + NewRoom = proplists:get_value(room, Patch, Room), + Renamed = NewRoom /= Room, + NewAvatar = lists:keymember(avatar, 1, Patch), + AvatarMsg = [<<"Group avatar is updated">>], + NameMsg = [<<"Group is renamed to \"", NewRoom/binary, "\"">>], + ?PAR([ ?CALLOUTS( + ?MATCH({MsgId, ok}, ?message(Handle, ?VAR, NameMsg, [], [sys], '_')), + ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, NameMsg]), + ?APPLY(rename_room, [RoomId, NewRoom]) + ) || Renamed] ++ + [ ?CALLOUTS( + ?MATCH({MsgId, ok}, ?message(Handle, ?VAR, AvatarMsg, [], [sys], '_')), + ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, AvatarMsg]) + ) || NewAvatar] ++ + [ ?message(H, '_', NameMsg, [], [sys], '_') + || Renamed, Phone <- Phones, H <- get_clients(S, Phone), H /= Handle ] ++ + [ ?message(H, '_', AvatarMsg, [], [sys], '_') + || NewAvatar, Phone <- Phones, H <- get_clients(S, Phone), H /= Handle ] ++ + [ ?room(H, NewRoom, patch, '_', '_', '_') + || Phone <- Phones, H <- get_clients(S, Phone) ] + ). + + %% --- join_room --- joinable_clients(S) -> @@ -843,13 +946,14 @@ join_room_pre(S, [Handle, Room]) -> join_room(Handle, Room) -> call_client(Handle, {join_room, room_name(Room)}). -join_room_callouts(S, [Handle, Room]) -> +join_room_callouts(S, [Handle, RoomId]) -> + FeedId = {muc, RoomId}, + Room = room_name(S, RoomId), Phone1 = client_phone(S, Handle), - Phones = [Phone1 | feed_members(S, {muc, Room})], - FeedId = {muc, Room}, + Phones = [Phone1 | feed_members(S, FeedId)], ?PAR([ ?CALLOUTS( ?MATCH({MsgId, Msg, ok}, ?room(Handle, Room, join, ?VAR, ?VAR, '_')), - ?APPLY(add_room_member, [Room, Phone1, member]), + ?APPLY(add_room_member, [RoomId, Phone1, member]), ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, Msg]), ?APPLY(feed_set_history_limit, [FeedId, Phone1, []]) )] ++ [ ?room(H1, Room, join, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]). @@ -876,39 +980,36 @@ leave_room_pre(S, [Handle, Handle2, Room, Role]) -> leave_room(Handle, _Handle2, Room, Role) -> call_client(Handle, {leave_room, room_name(Room), Role}). -leave_room_callouts(S, [Handle, Handle2, Room, Role]) -> +leave_room_callouts(S, [Handle, Handle2, RoomId, Role]) -> + FeedId = {muc, RoomId}, + Room = room_name(S, RoomId), Phone1 = client_phone(S, Handle), - Phones = feed_members(S, {muc, Room}), - FeedId = {muc, Room}, + Phones = feed_members(S, FeedId), ?PAR([ ?room(H1, Room, leave, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ] ++ [ ?CALLOUTS( ?MATCH({MsgId, Msg, ok}, ?message(Handle2, ?VAR, ?VAR, '_', [sys], '_')), ?APPLY(feed_add_msg, [FeedId, sys, MsgId, Msg]), - ?APPLY(remove_member, [Room, Phone1, Role]), - %% NOTE: When leaving a room this history limit is cleared, but - %% when being removed it is not. This also clears the reader, - %% which will cause read notifications to be sent for - %% messages read by this user. This is also what the server - %% does. - ?APPLY(feed_remove_user, [FeedId, Phone1]) )] ++ + ?APPLY(remove_member, [RoomId, Phone1, Role]) )] ++ [ ?message(H1, '_', '_', '_', [sys], '_') || Phone <- Phones, Phone /= Phone1, H1 <- get_clients(S, Phone), H1 /= Handle2 ]). %% --- add_to_room --- -possible_new_members(S) -> +possible_new_members(S) -> possible_new_members(S, any). +possible_new_members(S, Type) -> [ {Handle, Room, Member} || #client{ handle = Handle, phone = Phone } <- S#state.clients, #room{ id = Room, admins = As, members = Ms } <- S#state.rooms, - lists:member(Phone, As), + lists:member(Phone, As ++ [ M || Type == any, M <- Ms ]), #user{ phone = P, phone_id = Member } <- S#state.users, not lists:member(P, Ms ++ As) ]. add_to_room_pre(S) -> - [] /= possible_new_members(S). + [] /= possible_new_members(S, admin). add_to_room_args(S) -> - ?LET({Handle, Room, Member}, elements(possible_new_members(S)), - [Handle, Room, Member, elements([member, admin]), gen_history_limit()]). + ?LET(Type, gen_admin_type(), + ?LET({Handle, Room, Member}, elements(possible_new_members(S, Type)), + [Handle, Room, Member, elements([member, admin]), gen_history_limit()])). add_to_room_pre(S, [Handle, Room, Member, _Role, _HL]) -> lists:member({Handle, Room, Member}, possible_new_members(S)). @@ -916,34 +1017,41 @@ add_to_room_pre(S, [Handle, Room, Member, _Role, _HL]) -> add_to_room(Handle, Room, Member, Role, HL) -> call_client(Handle, {add_to_room, room_name(Room), Member, Role, HL}). -add_to_room_callouts(S, [Handle, Room, MemberId, Role, HL]) -> +add_to_room_callouts(S, [Handle, RoomId, MemberId, Role, HL]) -> + FeedId = {muc, RoomId}, + Room = room_name(S, RoomId), Phone1 = client_phone(S, Handle), Member = phone_id_phone(S, MemberId), - FeedId = {muc, Room}, Phones = [Member | feed_members(S, FeedId)], - ?PAR([ ?CALLOUTS( - ?MATCH({MsgId, Msg, ok}, ?room(Handle, Room, add, ?VAR, ?VAR, '_')), - ?APPLY(add_room_member, [Room, Member, Role]), - ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, Msg]), - ?APPLY(feed_set_history_limit, [FeedId, Member, HL]) )] ++ - [ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]). + case is_admin(S, FeedId, Phone1) of + false -> ?error(permission_denied); + true -> + ?PAR([ ?CALLOUTS( + ?MATCH({MsgId, Msg, ok}, ?room(Handle, Room, add, ?VAR, ?VAR, '_')), + ?APPLY(add_room_member, [RoomId, Member, Role]), + ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, Msg]), + ?APPLY(feed_set_history_limit, [FeedId, Member, HL]) )] ++ + [ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]) + end. %% --- remove_from_room --- -removable_members(S) -> +removable_members(S) -> removable_members(S, any). +removable_members(S, Type) -> [ {Handle, Room, Member, case lists:member(P, As) of true -> admin; false -> member end} || #client{ handle = Handle, phone = Phone } <- S#state.clients, #room{ id = Room, admins = As, members = Ms } <- S#state.rooms, - lists:member(Phone, As), + lists:member(Phone, As ++ [ M || Type == any, M <- Ms ]), #user{ phone = P, phone_id = Member } <- S#state.users, P /= Phone, lists:member(P, Ms ++ As) ]. remove_from_room_pre(S) -> - [] /= removable_members(S). + [] /= removable_members(S, admin). remove_from_room_args(S) -> - ?LET({Handle, Room, Member, Role}, elements(removable_members(S)), - [Handle, Room, Member, Role]). + ?LET(Type, gen_admin_type(), + ?LET({Handle, Room, Member, Role}, elements(removable_members(S, Type)), + [Handle, Room, Member, Role])). remove_from_room_pre(S, [Handle, Room, Member, Role]) -> lists:member({Handle, Room, Member, Role}, removable_members(S)). @@ -951,37 +1059,45 @@ remove_from_room_pre(S, [Handle, Room, Member, Role]) -> remove_from_room(Handle, Room, Member, Role) -> call_client(Handle, {remove_from_room, room_name(Room), Member, Role}). -remove_from_room_callouts(S, [Handle, Room, MemberId, Role]) -> +remove_from_room_callouts(S, [Handle, RoomId, MemberId, Role]) -> + FeedId = {muc, RoomId}, + Room = room_name(S, RoomId), Phone1 = client_phone(S, Handle), Member = phone_id_phone(S, MemberId), - FeedId = {muc, Room}, Phones = feed_members(S, FeedId), - ?PAR([ ?CALLOUTS( - ?MATCH({MsgId, Msg, ok}, ?message(Handle, ?VAR, ?VAR, '_', [sys], '_')), - ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, Msg]), - ?APPLY(remove_member, [Room, Member, Role]))] ++ - [ ?room(H1, Room, remove, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ] ++ - [ ?message(H1, '_', '_', '_', [sys], '_') || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]). + case is_admin(S, FeedId, Phone1) of + false -> ?error(permission_denied); + true -> + ?PAR([ ?CALLOUTS( + ?MATCH({MsgId, Msg, ok}, ?message(Handle, ?VAR, ?VAR, '_', [sys], '_')), + ?APPLY(feed_write_msg, [FeedId, Phone1, sys, MsgId, Msg]), + ?APPLY(remove_member, [RoomId, Member, Role]))] ++ + [ ?room(H1, Room, remove, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ] ++ + [ ?message(H1, '_', '_', '_', [sys], '_') || Phone <- Phones, H1 <- get_clients(S, Phone), H1 /= Handle ]) + end. remove_member_callouts(_S, [Room, Member, Role]) -> - ?APPLY(del_room_member, [Room, Member, Role]), - ?APPLY(feed_set_history_limit, [{muc, Room}, Member, [infinity]]). + ?APPLY(del_room_member, [Room, Member, Role]). %% --- promote_member --- promotable_members(S) -> + promotable_members(S, any). + +promotable_members(S, Type) -> [ {Handle, Room, Member} || #client{ handle = Handle, phone = Phone } <- S#state.clients, #room{ id = Room, admins = As, members = Ms } <- S#state.rooms, - lists:member(Phone, As), + lists:member(Phone, As ++ [ M || Type == any, M <- Ms ]), #user{ phone = P, phone_id = Member } <- S#state.users, P /= Phone, lists:member(P, Ms) ]. promote_member_pre(S) -> - [] /= promotable_members(S). + [] /= promotable_members(S, admin). promote_member_args(S) -> - ?LET({Handle, Room, Member}, elements(promotable_members(S)), - [Handle, Room, Member]). + ?LET(Type, gen_admin_type(), + ?LET({Handle, Room, Member}, elements(promotable_members(S, Type)), + [Handle, Room, Member])). promote_member_pre(S, [Handle, Room, Member]) -> lists:member({Handle, Room, Member}, promotable_members(S)). @@ -989,11 +1105,17 @@ promote_member_pre(S, [Handle, Room, Member]) -> promote_member(Handle, Room, Member) -> call_client(Handle, {promote_member, room_name(Room), Member}). -promote_member_callouts(S, [_Handle, Room, _Member]) -> - Phones = feed_members(S, {muc, Room}), - ?PAR([ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ]). +promote_member_callouts(S, [Handle, RoomId, Member]) -> + Phones = feed_members(S, {muc, RoomId}), + Room = room_name(S, RoomId), + case is_admin(S, {muc, RoomId}, client_phone(S, Handle)) of + false -> ?error(permission_denied); + true -> + ?PAR([ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ]), + ?APPLY(do_promote_member, [RoomId, Member]) + end. -promote_member_next(S, _V, [_Handle, Room, MemberId]) -> +do_promote_member_next(S, _V, [Room, MemberId]) -> Member = phone_id_phone(S, MemberId), Room0 = get_room(S, Room), Room1 = Room0#room{ members = Room0#room.members -- [Member], @@ -1002,19 +1124,21 @@ promote_member_next(S, _V, [_Handle, Room, MemberId]) -> %% --- demote_member --- -demotable_members(S) -> +demotable_members(S) -> demotable_members(S, any). +demotable_members(S, Type) -> [ {Handle, Room, Member} || #client{ handle = Handle, phone = Phone } <- S#state.clients, - #room{ id = Room, admins = As } <- S#state.rooms, - lists:member(Phone, As), + #room{ id = Room, admins = As, members = Ms } <- S#state.rooms, + lists:member(Phone, As ++ [ M || Type == any, M <- Ms ]), #user{ phone = P, phone_id = Member } <- S#state.users, P /= Phone, lists:member(P, As) ]. demote_member_pre(S) -> - [] /= demotable_members(S). + [] /= demotable_members(S, admin). demote_member_args(S) -> - ?LET({Handle, Room, Member}, elements(demotable_members(S)), - [Handle, Room, Member]). + ?LET(Type, gen_admin_type(), + ?LET({Handle, Room, Member}, elements(demotable_members(S, Type)), + [Handle, Room, Member])). demote_member_pre(S, [Handle, Room, Member]) -> lists:member({Handle, Room, Member}, demotable_members(S)). @@ -1022,11 +1146,17 @@ demote_member_pre(S, [Handle, Room, Member]) -> demote_member(Handle, Room, Member) -> call_client(Handle, {demote_member, room_name(Room), Member}). -demote_member_callouts(S, [_Handle, Room, _Member]) -> - Phones = feed_members(S, {muc, Room}), - ?PAR([ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ]). +demote_member_callouts(S, [Handle, RoomId, Member]) -> + Phones = feed_members(S, {muc, RoomId}), + Room = room_name(S, RoomId), + case is_admin(S, {muc, RoomId}, client_phone(S, Handle)) of + false -> ?error(permission_denied); + true -> + ?PAR([ ?room(H1, Room, add, '_', '_', '_') || Phone <- Phones, H1 <- get_clients(S, Phone) ]), + ?APPLY(do_demote_member, [RoomId, Member]) + end. -demote_member_next(S, _V, [_Handle, Room, MemberId]) -> +do_demote_member_next(S, _V, [Room, MemberId]) -> Member = phone_id_phone(S, MemberId), Room0 = get_room(S, Room), Room1 = Room0#room{ members = Room0#room.members ++ [Member], @@ -1034,6 +1164,38 @@ demote_member_next(S, _V, [_Handle, Room, MemberId]) -> S#state{ rooms = lists:keyreplace(Room, #room.id, S#state.rooms, Room1) }. +%% --- delete_room --- + +deleteable_rooms(S) -> + [ Room || #room{ id = Room } <- S#state.rooms ]. + +delete_room_pre(S) -> + [] /= deleteable_rooms(S). + +delete_room_args(S) -> + [elements(deleteable_rooms(S))]. + +delete_room_pre(S, [Room]) -> + lists:member(Room, deleteable_rooms(S)). + +delete_room(Room) -> + nynja:delete_room(room_name(Room)), timer:sleep(50), ok. + +delete_room_callouts(S, [RoomId]) -> + FeedId = {muc, RoomId}, + Phones = feed_members(S, FeedId), + Room = room_name(S, RoomId), + %% Can only delete if there are no messages + NoMsg = [] == [ x || #message{ from = F } <- feed_history(S, FeedId), F /= sys ], + ?PAR([?room(H, Room, delete, -1, [], '_') + || NoMsg, Phone <- Phones, H <- get_clients(S, Phone) ] ++ + [?APPLY(do_delete_room, [RoomId]) || NoMsg]). + +do_delete_room_next(S, _V, [RoomId]) -> + Room0 = get_room(S, RoomId), + Room1 = Room0#room{ members = [], admins = [] }, + S#state{ rooms = lists:keyreplace(RoomId, #room.id, S#state.rooms, Room1) }. + %% -- Callouts -------------------------------------------------------------- add_room_next(S, _V, [RoomId, Admins, Members]) -> @@ -1053,30 +1215,38 @@ feed_set_history_limit_next(S, _V, [FeedId, User, HL]) -> feed_edit_msg_next(S, _V, [FeedId, Phone, MsgId0, MsgId, Payload]) -> feed_see_msg(FeedId, Phone, MsgId, - feed_write_msg(FeedId, Phone, MsgId, - feed_edit_msg(FeedId, #message{id = MsgId0, payload = Payload, from = Phone}, S))). + feed_edit_msg(FeedId, #message{id = MsgId0, payload = Payload, from = Phone}, S)). feed_delete_msg_next(S, _V, [FeedId, Phone, MsgId0, MsgId]) -> feed_see_msg(FeedId, Phone, MsgId, - feed_write_msg(FeedId, Phone, MsgId, - feed_edit_msg(FeedId, #message{id = MsgId0, payload = deleted, from = Phone}, S))). + feed_edit_msg(FeedId, #message{id = MsgId0, payload = deleted, visible_to = [], from = Phone}, S)). -feed_delete_all_msgs_next(S, _V, [FeedId]) -> - feed_delete_all_msgs(FeedId, S). +feed_delete_all_msgs_next(S, _V, [FeedId, MsgId, Phone]) -> + feed_delete_all_msgs(FeedId, Phone, MsgId, S). + +set_history_limit_on_delete_next(S, _V, [FeedId, Phone, MsgId]) -> + Feed = #feed_info{ limit = Limit } = get_feed(S, FeedId), + Phones = + case FeedId of + {p2p, _, _} -> [Phone]; + {muc, _} -> feed_members(S, FeedId) + end, + Limit1 = maps:merge(Limit, maps:from_list([{P, MsgId} || P <- Phones])), + set_feed(FeedId, Feed#feed_info{ limit = Limit1 }, S). feed_write_msg_next(S, V, [FeedId, Phone, MsgId, Payload]) -> feed_write_msg_next(S, V, [FeedId, Phone, Phone, MsgId, Payload]); -feed_write_msg_next(S, _V, [FeedId, Phone, From, MsgId, Payload]) -> +feed_write_msg_next(S, V, [FeedId, Phone, From, MsgId, Payload]) -> + feed_write_msg_next(S, V, [FeedId, Phone, #message{id = MsgId, payload = Payload, from = From}]); +feed_write_msg_next(S, _V, [FeedId, Phone, Msg = #message{id = MsgId}]) -> feed_see_msg(FeedId, Phone, MsgId, - feed_add_msg(FeedId, #message{id = MsgId, payload = Payload, from = From}, - feed_write_msg(FeedId, Phone, MsgId, S))). + feed_write_msg(FeedId, Msg, S)). feed_add_msg_next(S, _V, [FeedId, Phone, MsgId, Payload]) -> - feed_add_msg(FeedId, #message{id = MsgId, payload = Payload, from = Phone}, S). - -feed_remove_user_next(S, _V, [FeedId, Phone]) -> - feed_remove_user(FeedId, Phone, S). + feed_write_msg(FeedId, #message{id = MsgId, payload = Payload, from = Phone}, S). +rename_room_next(S, _V, [OldRoom, NewRoom]) -> + rename_room(OldRoom, NewRoom, S). %% -- Common ----------------------------------------------------------------- @@ -1089,7 +1259,7 @@ weight(_, disconnect) -> 1; weight(_, get_profile) -> 1; weight(_, get_room_info) -> 1; weight(_, get_history) -> 2; -weight(_, delete_history) -> 0; %% TODO: not quite working yet +weight(_, delete_history) -> 1; weight(_, accept_friend_request) -> 10; weight(_, send_message) -> 7; weight(_, edit_message) -> 3; @@ -1097,6 +1267,8 @@ weight(_, delete_message) -> 3; weight(_, send_friend_request) -> 4; weight(_, read_message) -> 7; weight(_, create_room) -> 2; +weight(_, patch_room) -> 2; +weight(S, delete_room) -> case length(S#state.rooms) > 1 of true -> 1; false -> 0 end; weight(_, join_room) -> 4; weight(_, add_to_room) -> 5; weight(_, remove_from_room) -> 3; @@ -1143,11 +1315,18 @@ handle_connection(Parent, User) -> catch mock:message_ack(self(), Id, #{prev => Prev, feed => Feed}), handle_connection(Parent, User); {Pid, ?mqtt(#'Room'{name = Name0, status = Status, last_msg = LastMsg, admins = _As, members = _Ms})} -> - [Name, _] = binary:split(Name0, <<"-">>), + Name = + case Name0 of + [] -> <<"no name">>; + _ -> hd(binary:split(Name0, <<"-">>)) + end, catch mock:room(self(), Name, Status, to_msg_id(LastMsg), to_msg_payload(LastMsg), #{ }), handle_connection(Parent, User); {Pid, ?mqtt(#io{ code = {ok, _}, data = {ok, _} })} -> handle_connection(Parent, User); + {Pid, ?mqtt(#io{ code = {error, Err}})} -> + catch mock:error(Err), + handle_connection(Parent, User); {Pid, ?mqtt(Payload)} -> catch mock:unexpected(self(), Payload), handle_connection(Parent, User); @@ -1218,8 +1397,10 @@ handle_call(User, {read_msg, FeedId, MsgId}) -> async_send(User, ReadMsg); handle_call(User, {get_history, Feed, MsgId, Count}) -> - ?mqtt(#'History'{data = Messages}) = nynja:get_messages(User, Feed, MsgId, Count), - [ to_message(Msg) || Msg <- Messages ]; + case nynja:get_messages(User, Feed, MsgId, Count) of + ?mqtt(#'History'{data = Messages}) -> [ to_message(Msg) || Msg <- Messages ]; + ?mqtt(#io{code = Error}) -> Error + end; handle_call(User, {delete_history, FeedId}) -> async_send(User, nynja:delete_hist(FeedId)); @@ -1233,6 +1414,16 @@ handle_call(User, {create_room, Members, RoomName}) -> type = group, tos = <<"EQC FTW!">>, status = create }, async_send(User, Room); +handle_call(User, {patch_room, RoomName, Patch}) -> + RoomId = nynja:typed_uuid(<<"room">>, RoomName), + Data = case proplists:get_value(avatar, Patch, []) of + [] -> []; + Av -> [#'Desc'{ id = Av, mime = <<"image">>, payload = Av }] + end, + NewName = proplists:get_value(room, Patch, []), + Room = #'Room'{ id = RoomId, name = NewName, data = Data, status = patch }, + async_send(User, Room); + handle_call(User, {join_room, RoomName}) -> Room = singleton_room(RoomName, User, member, join), async_send(User, Room); @@ -1271,7 +1462,7 @@ prop_ok() -> with_parameter(default_process, worker, with_parameter(color, true, ?FORALL(Cmds, more_commands(5, commands(?MODULE)), - ?SOMETIMES(5, + ?SOMETIMES(2, begin next_prefix(), HSR={H, S, Res} = run_commands(?MODULE, Cmds), @@ -1299,24 +1490,11 @@ cleanup(#state{ clients = Clients }) -> %% same phone numbers. To avoid this problem we prefix phone numbers by a %% global prefix that should change between each test run. --define(prefix_table, server_eqc_prefix). - get_prefix() -> - case ets:info(?prefix_table) of - undefined -> - ets:new(?prefix_table, [named_table, public, set]), - set_prefix(0); - _ -> - [{prefix, Prefix}] = ets:lookup(?prefix_table, prefix), - Prefix - end. - -set_prefix(Prefix) -> - ets:insert(?prefix_table, {prefix, Prefix}), - Prefix. + load_test:get_prefix(). next_prefix() -> - set_prefix(get_prefix() + 1). + load_test:next_prefix(). %% -- Utilities -------------------------------------------------------------- @@ -1365,7 +1543,7 @@ to_message(#'Message'{id = Id, files = Files, from = From0, type = Type}) -> #message{ id = Id, payload = Msg, from = From }. to_room(#'Room'{ name = Name0, readers = Readers, unread = Unread, members = Members, admins = Admins }) -> - [Name, _] = binary:split(Name0, <<"-">>), + [Name | _] = binary:split(Name0, <<"-">>), #room_info{ name = Name, members = [to_member(M) || M <- Members], admins = [to_member(A) || A <- Admins], unread = Unread, readers = Readers }. @@ -1391,13 +1569,22 @@ async_send(User, Msg) -> nynja:ws_send_async(User, Packet), ok. +%% Result starts with Id and steps through in direction of N +%% So, slice(Id = 3, N = -2, [5, 4, 3, 2, 1]) == [3, 4] +%% and slice(Id = 3, N = 2, [5, 4, 3, 2, 1]) == [3, 2] slice(Id, Elem, N, List) when N < 0 -> - lists:reverse(slice(Id, Elem, -N, lists:reverse(List))); -slice(Id, Elem, N, [E | _] = List) when element(Elem, E) =< Id -> - lists:sublist(List, N); -slice(Id, Elem, N, [_ | List]) -> - slice(Id, Elem, N, List); -slice(_, _, _, []) -> + Stop = fun(E) -> element(Elem, E) >= Id end, + slice1(Id, Stop, -N, lists:reverse(List)); +slice(Id, Elem, N, List) when N > 0 -> + Stop = fun(E) -> element(Elem, E) =< Id end, + slice1(Id, Stop, N, List). + +slice1(Id, Stop, N, [E | Tail] = List) -> + case Stop(E) of + true -> lists:sublist(List, N); + false -> slice1(Id, Stop, N, Tail) + end; +slice1(_, _, _, []) -> []. %% -- API spec --------------------------------------------------------------- @@ -1415,6 +1602,7 @@ api_spec() -> , #api_fun{ name = message, arity = 6 } , #api_fun{ name = message_ack, arity = 3 } , #api_fun{ name = room, arity = 6 } + , #api_fun{ name = error, arity = 1 } ] } ] }. diff --git a/test/load_test.erl b/test/load_test.erl new file mode 100644 index 0000000000000000000000000000000000000000..3acbf76777e25248f686777525175b90e2765899 --- /dev/null +++ b/test/load_test.erl @@ -0,0 +1,736 @@ +%%% File : load_test.erl +%%% Author : Ulf Norell +%%% Description : +%%% Created : 27 Apr 2020 by Ulf Norell +-module(load_test). + +-compile([export_all, nowarn_export_all]). + +-include_lib("emqttc/include/emqttc_packet.hrl"). +-include_lib("roster/include/roster.hrl"). + +-define(mqtt(Payload), #mqtt_packet{payload = Payload}). + +-define(any(Rec), #Rec{_ = '_'}). +-define(any(Rec, Fld), (#Rec{_ = '_'})#Rec Fld). + +-record(client, {pid :: pid(), + phone :: binary()}). + +%% - Connections (web socket), one or more per User +%% - User +%% - Scenario, one or more Connections/Users +do_test(Cfg) -> + run(Cfg, in_parallel(Cfg#{fanout => 1}, [test2(Cfg), test_room(Cfg)])). + +test(Cfg) -> + N = maps:get(p2p, Cfg, 100), + Users = maps:get(users, Cfg, 200), + Fanout = Users div 2, + Times = maps:get(msgs, Cfg, 5), + Gap = maps:get(gap, Cfg, 1500), + P2P = in_sequence([make_friends(), + wait(3000), + repeat({Times, times, gap, Gap}, send_p2p())]), + %% P2P = in_sequence([make_friends(), send_p2p(), send_p2p()]), + in_parallel(Cfg#{fanout => Fanout}, lists:duplicate(N, P2P)). + +test2(Cfg) -> + N = maps:get(p2p, Cfg, 100), + Users = maps:get(users, Cfg, 200), + Fanout = Users div 2, + For = maps:get(for, Cfg, 10000), + Every = maps:get(every, Cfg, 500), + P2P = in_sequence([make_friends(), + wait(3000), + repeat({for, For, every, Every}, send_p2p()) + ]), + in_parallel(Cfg#{fanout => Fanout}, lists:duplicate(N, P2P)). + +test_room(Cfg) -> + N = maps:get(rooms, Cfg, 100), + MaxSize = maps:get(max_size, Cfg, 10), + Users = maps:get(users, Cfg, 200), + Fanout = Users div MaxSize, + For = maps:get(for, Cfg, 10000), + Every = maps:get(every, Cfg, 500), + + Sizes = [ rand:uniform(MaxSize) || _ <- lists:seq(1, N) ], + + Scenarios = + [ in_sequence([create_room(Size), + wait(5000), + repeat({for, For, every, Every}, send_room(Size))]) + || Size <- Sizes ], + + in_parallel(Cfg#{ fanout => Fanout }, Scenarios). + +connect_test(Cfg) -> + N = maps:get(users, Cfg, 200), + Fanout = N, + Connect = {#{users => 1}, fun(#{users := [Client]} = Env) -> + Tag = make_tag(connect), + DeviceId = rand:uniform(1000), + connect(Client, DeviceId, Tag), + timestamp(Client, Tag), + timer:sleep(1000), + disconnect(Client, DeviceId), + Env + end}, + in_parallel(Cfg#{fanout => Fanout}, lists:duplicate(N, Connect)). + +%% Simplest non-trivial scenario: +%% One connected user calls get_profile once. + +get_profile() -> + {#{users => 1}, + fun(#{users := [Client]} = Env) -> + OuterTag = make_tag(connect_and_get_profile), + InnerTag = make_tag(get_profile), + DeviceId = rand:uniform(4), + connect(Client, DeviceId, OuterTag), + send(Client, DeviceId, InnerTag, #'Profile'{ status = get }), + recv(Client, DeviceId, InnerTag, ?any('Profile')), + disconnect(Client, DeviceId, OuterTag), + Env + end}. + +make_friends() -> + {#{users => 2}, + fun(#{users := [Client1, Client2]} = Env) -> + Connect1 = make_tag(connect), + Connect2 = make_tag(connect), + PhoneId1 = connect_user(Client1, Connect1), + timestamp(Client1, Connect1), + PhoneId2 = connect_user(Client2, Connect2), + timestamp(Client2, Connect2), + with_lock([Client1, Client2], make_friends, fun() -> + CheckTag = make_tag(check_friends), + send(Client1, CheckTag, #'Profile'{ status = get }), + #'Profile'{rosters = [#'Roster'{userlist = Contacts}]} = recv(Client1, CheckTag, #'Profile'{_ = '_'}), + case lists:keymember(PhoneId2, #'Contact'.phone_id, Contacts) of + true -> ok; + false -> + MakeTag = make_tag(make_friends), + send(Client1, MakeTag, nynja:friend_req(PhoneId1, PhoneId2)), + recv(Client1, ?any('Contact', {phone_id = PhoneId2})), + recv(Client2, ?any('Contact', {phone_id = PhoneId1})), + send(Client2, nynja:friend_req_accept_msg(PhoneId2, PhoneId1)), + recv(Client1, ?any('Contact', {phone_id = PhoneId2})), + recv(Client2, MakeTag, ?any('Contact', {phone_id = PhoneId1})) + end + end), + Env + end}. + +%% Assume already friends +send_p2p() -> + {#{users => 2}, + fun(#{users := [Client1, Client2]} = Env) -> + Tag = make_tag(?FUNCTION_NAME), + PhoneId1 = connect_user(Client1), + PhoneId2 = connect_user(Client2), + Msg = rnd_string("msg"), + Ack = false, + Feed = nynja:p2p_feed(PhoneId1, PhoneId2), + Packet = nynja:make_message(PhoneId1, PhoneId1, PhoneId2, Feed, Msg, Ack), + send(Client1, Tag, Packet), + MsgPat = ?any('Message', {files = {member, ?any('Desc', {payload = Msg})}}), + async_recv(Client1, Tag, MsgPat), + #'Message'{ id = MsgId } = recv(Client2, Tag, MsgPat), + Env#{ message => {MsgId, Msg} } + end}. + +%% -- Rooms -- + +create_room(N) -> + {#{users => N}, + fun(#{users := Clients = [AdminClient | MemberClients]} = Env) -> + Name = rnd_string("room"), + [Admin | Members] = [ connect_user(C) || C <- Clients ], + [ disconnect(C) || C <- MemberClients ], + + RoomId = nynja:typed_uuid(<<"room">>, Name), + Ms = [ nynja:make_member(#{ phone_id => PId, room_id => RoomId, status => member }) + || PId <- Members ], + As = [nynja:make_member(#{ phone_id => Admin, room_id => RoomId, status => admin })], + Room = #'Room'{ id = RoomId, name = Name, members = Ms, admins = As, + type = group, tos = <<"aLL tHE LoAD">>, status = create }, + Tag = make_tag(create_room), + send(AdminClient, Tag, Room), + recv(AdminClient, Tag, ?any('Room', {id = RoomId})), + disconnect(AdminClient), + Env#{ room => RoomId } + end}. + +send_room(N) -> + {#{users => N}, + fun(#{users := Clients, room := RoomId} = Env) -> + Sender = lists:nth(rand:uniform(length(Clients)), Clients), + Receiver = lists:nth(rand:uniform(length(Clients)), Clients), + Device1 = 1, + Device2 = 2, + PhoneId = connect_user(Sender, Device1), + connect(Receiver, Device2), + Msg = rnd_string("msg"), + MsgPat = ?any('Message', {files = {member, ?any('Desc', {payload = Msg})}, to = RoomId}), + Packet = nynja:make_message(PhoneId, PhoneId, RoomId, {muc, RoomId}, Msg, false), + Tag = make_tag(send_room), + send(Sender, Device1, Tag, Packet), + recv(Sender, Device1, Tag, MsgPat), + recv(Receiver, Device2, Tag, MsgPat), + Env + end}. + + +%% -- Combinators ------------------------------------------------------------ + +wait(T) -> + {#{users => 0}, fun(Env) -> timer:sleep(T), Env end}. + +repeat({for, N, every, T}, {Cfg, Run}) -> + {Cfg, fun(Env) -> + Ref = make_ref(), + Root = self(), + Loop = fun Loop(Env0) -> + receive + stop -> Root ! {stopped, Ref, Env0}; + go -> + erlang:send_after(T + fudge_time(T), self(), go), + Env1 = try_run(Run, Env0), + Loop(Env1) + end end, + Pid = spawn(fun() -> Loop(Env) end), + timer:sleep(abs(fudge_time(T))), + Pid ! go, + erlang:send_after(N, Pid, stop), + receive + {stopped, Ref, Env1} -> Env1 + end end}; +repeat({N, times, gap, T}, {Cfg, Run}) -> + { Cfg, + fun(Env) -> + Loop = fun Loop(0, Env0) -> Env0; + Loop(I, Env0) -> + Env1 = try_run(Run, Env0), + timer:sleep(T + fudge_time(T)), + Loop(I - 1, Env1) + end, + timer:sleep(abs(fudge_time(T))), + Loop(N, Env) + end}. + + +in_parallel(Scenarios) -> + in_parallel(#{}, Scenarios). + +in_parallel(Cfg, Scenarios) -> + K = maps:get(fanout, Cfg, 2), + Stagger = maps:get(stagger, Cfg, 1000), + Is = [ I || {#{users := I}, _} <- Scenarios ], + N = round(K * lists:max(Is)), + Slices = [ lists:sort(rnd_without_duplicates(I, N)) || I <- Is ], + {#{users => N}, + fun(#{users := Users} = Env) -> + Root = self(), + Pids = [ spawn(fun() -> + timer:sleep(rand:uniform(Stagger)), + try_run(Run, Env#{users => slice(Slice, Users)}), + Root ! {self(), done} end) + || {Slice, {_, Run}} <- lists:zip(Slices, Scenarios) ], + [ receive {Pid, done} -> ok end || Pid <- Pids ], + Env + end}. + +in_sequence(Scenarios) -> + N = lists:max([ I || {#{users := I}, _} <- Scenarios ]), + {#{users => N}, + fun(Env0 = #{users := Users}) -> + lists:foldl(fun({#{users := I}, Run}, Env) -> + Run(Env#{ users := lists:sublist(Users, I) }) + end, Env0, Scenarios) + end}. + +%% -- Scenarios -------------------------------------------------------------- + +run(Scenario) -> run(#{}, Scenario). + +run(Cfg0, {Cfg, Run}) -> + next_prefix(), + #{users := NUsers} = Cfg, + SetupWorkers = maps:get(setup_workers, Cfg0, 4), + Monitor = start_monitor(), + T1 = os:timestamp(), + Users = setup_users(SetupWorkers, Monitor, NUsers), + io:format("--- SETUP DONE (in ~.2f ms)\n\n", [timer:now_diff(os:timestamp(), T1) / 1000]), + Env1 = try_run(Run, #{monitor => Monitor, users => Users}), + {UnMatched, LeftOvers0} = lists:unzip([ stop(Client) || Client <- Users ]), + LeftOvers = group(LeftOvers0), + Report = stop_monitor(Monitor), + pretty_report(Report, lists:sum(UnMatched)), + {LeftOvers, maps:without([users, monitor], Env1)}. + +try_run(Run, Env = #{monitor := Monitor}) -> + try Run(Env) + catch + _:{Reason, Tag, What} -> + report_error(Monitor, Reason, Tag, What), + Env; + _:Err:Stacktrace -> + io:format("Error: ~p\n ~p\n", [Err, Stacktrace]) + end. + +setup_users(NWorkers, Monitor, NUsers) -> + RegU = fun(Parent, A, B) -> + Sys = nynja:connect_sys(), + [ nynja:register_profile(Sys, make_phone(I), false) || I <- lists:seq(A, B) ], + nynja:ws_close(Sys), + Parent ! {self(), done} + end, + Root = self(), + Pids = [ spawn(fun() -> RegU(Root, A, B) end) + || {A, B} <- split_work(NWorkers, NUsers) ], + [ receive {Pid, done} -> ok end || Pid <- Pids ], + + [ begin + Phone = make_phone(I), + Handler = connection_handler(Monitor, Phone), + #client{ pid = Handler, phone = Phone } + end || I <- lists:seq(1, NUsers) + ]. + +split_work(I, N) -> + S = N div I, + split_work(1, S, N). + +split_work(N, S, X) when N + S >= X -> [{N, X}]; +split_work(N, S, X) -> [{N, N + S} | split_work(N + S + 1, S, X)]. + + +%% -- Connection handler ----------------------------------------------------- + +connection_handler(Monitor, Phone) -> + Parent = self(), + Env = #{ parent => Parent, monitor => Monitor, locks => #{}, + phone => Phone, devices => #{}, connections => #{} }, + Pid = spawn_link(fun() -> + process_flag(trap_exit, true), + Parent ! {self(), connected}, + handle_connection(Env) + end), + receive + {Pid, connected} -> Pid + end. + +-record(expect, {from :: pid() | noreply, + pattern :: term(), + tag :: term()}). + +-record(received, {timestamp :: os:timestamp(), + payload :: term()}). + +-record(device, {connections = 0, + handle, + expecting, + received}). + +handle_connection(Env = #{parent := Parent, + monitor := Monitor, + devices := Devices, + connections := Connections}) -> + receive + {'EXIT', Parent, _Reason} -> + [ nynja:ws_close(Conn) || #device{ handle = Conn } <- maps:values(Devices) ]; + + {call, From, _DeviceId, Tag, stop} -> + [ nynja:ws_close(Conn) || #device{ handle = Conn } <- maps:values(Devices) ], + LeftOvers = [ {Name, Pat} || #device{ expecting = Expecting } <- maps:values(Devices), + #expect{ from = noreply, pattern = Pat, + tag = {Name, _} } <- q_to_list(Expecting) ], + + UnMatched = lists:sum([ length(q_to_list(Rcvd)) || #device{ received = Rcvd } <- maps:values(Devices) ]), + reply(From, Tag, {UnMatched, LeftOvers}); + + {call, From, DeviceId, Tag, Msg} -> + Env1 = handle_call(Env, From, DeviceId, Tag, Msg), + handle_connection(Env1); + + {Pid, ?mqtt(Payload)} -> + case maps:get(Pid, Connections, undefined) of + undefined -> + %% Lagging message to disconnecting client. + %% io:format("Unknown connection sent ~p\n", [Payload]), + handle_connection(Env); + DeviceId -> + #device{ expecting = Expecting, received = Received } = D = maps:get(DeviceId, Devices), + %% io:format("~p <- ~p\n", [maps:get(phone, Env), Payload]), + T = os:timestamp(), + case find_matching_pattern(q_to_list(Expecting), Payload) of + false -> + Recv = #received{ timestamp = T, payload = Payload }, + D1 = D#device{ received = snoc(Received, Recv) }, + handle_connection(Env#{ devices := Devices#{ DeviceId := D1 } }); + {#expect{from = From, tag = Tag}, Expecting1} -> + report_time(Monitor, Tag, T), + reply(From, Tag, Payload), + D1 = D#device{ expecting = q_from_list(Expecting1) }, + handle_connection(Env#{ devices := Devices#{ DeviceId := D1 } }) + end + end + end. + +handle_call(Env = #{monitor := Monitor, + phone := Phone, + locks := Locks, + devices := Devices, + connections := Connections}, From, DeviceId, Tag, Msg) -> + case Msg of + + {lock, Lock} -> + case Locks of + #{ Lock := Callers } -> + Env#{ locks := Locks#{ Lock := Callers ++ [{From, Tag}] } }; + _ -> + reply(From, Tag, locked), + Env#{ locks := Locks#{ Lock => [] } } + end; + + {unlock, Lock} -> + reply(From, Tag, unlocked), + case maps:get(Lock, Locks) of + [] -> Env#{ locks := maps:remove(Lock, Locks) }; + [{Next, Tag1} | Callers1] -> + reply(Next, Tag1, locked), + Env#{ locks := Locks#{ Lock := Callers1 } } + end; + + connect -> + report_time(Monitor, Tag, os:timestamp()), + try + {Devices1, Connections1} = + case Devices of + #{DeviceId := D} -> + {Devices#{DeviceId := D#device{ connections = D#device.connections + 1 }}, + Connections}; + _ -> + Conn = nynja:connect_user_(Phone), + Pid = nynja:user_pid(Conn), + {Devices#{DeviceId => #device{ connections = 1, + handle = Conn, + expecting = empty_q(), + received = empty_q() }}, + Connections#{ Pid => DeviceId }} + end, + reply(From, Tag, ok), + Env#{ devices := Devices1, connections := Connections1 } + catch _:R:S -> + io:format("Crash: ~p\n ~p\n", [R, S]), + Env + end; + + disconnect -> + report_time(Monitor, Tag, os:timestamp()), + {Devices1, Connections1} = + case Devices of + #{DeviceId := #device{ connections = 1, handle = Conn }} -> + nynja:ws_close(Conn), + {maps:remove(DeviceId, Devices), maps:remove(nynja:user_pid(Conn), Connections)}; + #{DeviceId := D} -> + {Devices#{DeviceId := D#device{ connections = D#device.connections - 1 }}, + Connections} + end, + reply(From, Tag, ok), + Env#{ devices := Devices1, connections := Connections1 }; + + tag -> + report_time(Monitor, Tag, os:timestamp()), + Env; + + {send, Payload} -> + %% io:format("~p -> ~p\n", [Phone, Payload]), + #device{ handle = Conn } = maps:get(DeviceId, Devices), + Packet = nynja:mqtt_publish(nynja:user_client(Conn), Payload), + nynja:ws_send_async(Conn, Packet), + report_time(Monitor, Tag, os:timestamp()), + reply(From, Tag, ok), + Env; + + {recv, Pattern} -> + #device{ expecting = Expecting, received = Received } = D = maps:get(DeviceId, Devices), + case find_matching_value(Pattern, q_to_list(Received)) of + false -> + Expect = #expect{ from = From, pattern = Pattern, tag = Tag }, + D1 = D#device{ expecting = snoc(Expecting, Expect) }, + Env#{ devices := Devices#{ DeviceId := D1 } }; + {#received{ timestamp = T, payload = Packet }, Received1} -> + report_time(Monitor, Tag, T), + reply(From, Tag, Packet), + D1 = D#device{ received = q_from_list(Received1) }, + Env#{ devices := Devices#{ DeviceId := D1 } } + end + end. + +%% --- + +-define(TIMEOUT, 5000). + +stop(Client) -> + try call(Client, 0, notag, stop, 5000) + catch _:_ -> [] end. + +connect_user(Client) -> connect_user(Client, 1). +connect_user(Client, DeviceId) when is_integer(DeviceId) -> connect_user(Client, DeviceId, notag); +connect_user(Client, Tag) -> connect_user(Client, 1, Tag). +connect_user(Client, DeviceId, Tag) -> + connect(Client, DeviceId, Tag), + send(Client, DeviceId, Tag, #'Profile'{ status = get }), + #'Profile'{ phone = Phone, rosters = [#'Roster'{ id = RosterIx }] } = + recv(Client, DeviceId, Tag, ?any('Profile')), + nynja:make_phone_id(Phone, RosterIx). + +connect(Client) -> connect(Client, 1). +connect(Client, DeviceId) when is_integer(DeviceId) -> connect(Client, DeviceId, notag); +connect(Client, Tag) -> connect(Client, 1, Tag). +connect(Client, DeviceId, Tag) -> + call(Client, DeviceId, Tag, connect, 20000). + +disconnect(Client) -> disconnect(Client, 1). +disconnect(Client, DeviceId) when is_integer(DeviceId) -> disconnect(Client, DeviceId, notag); +disconnect(Client, Tag) -> disconnect(Client, 1, Tag). +disconnect(Client, DeviceId, Tag) -> + cast(Client, DeviceId, Tag, disconnect). + +send(Client, Packet) -> send(Client, 1, Packet). +send(Client, DeviceId, Packet) when is_integer(DeviceId) -> send(Client, DeviceId, notag, Packet); +send(Client, Tag, Packet) -> send(Client, 1, Tag, Packet). +send(Client, DeviceId, Tag, Packet) -> + cast(Client, DeviceId, Tag, {send, Packet}). + +async_recv(Client, Pattern) -> async_recv(Client, 1, Pattern). +async_recv(Client, DeviceId, Pattern) when is_integer(DeviceId) -> async_recv(Client, DeviceId, notag, Pattern); +async_recv(Client, Tag, Pattern) -> async_recv(Client, 1, Tag, Pattern). +async_recv(Client, DeviceId, Tag, Pattern) -> + cast(Client, DeviceId, Tag, {recv, Pattern}). + +recv(Client, Pattern) -> recv(Client, 1, Pattern). +recv(Client, DeviceId, Pattern) when is_integer(DeviceId) -> recv(Client, DeviceId, notag, Pattern); +recv(Client, Tag, Pattern) -> recv(Client, 1, Tag, Pattern). +recv(Client, DeviceId, Tag, Pattern) -> + call(Client, DeviceId, Tag, {recv, Pattern}). + +timestamp(Client, Tag) -> + cast(Client, 0, Tag, tag). + +lock(Clients, Lock) when is_list(Clients) -> + [ lock(Client, Lock) || Client <- lists:sort(Clients) ]; +lock(Client, Lock) -> + call(Client, 0, notag, {lock, Lock}, 20000). + +unlock(Clients, Lock) when is_list(Clients) -> + [ unlock(Client, Lock) || Client <- lists:reverse(lists:sort(Clients)) ]; +unlock(Client, Lock) -> + call(Client, 0, notag, {unlock, Lock}). + +with_lock(Clients, Lock, Fun) -> + lock(Clients, Lock), + try Fun() after unlock(Clients, Lock) end. + +cast(#client{ pid = Pid }, DeviceId, Tag, Msg) -> + Pid ! {call, noreply, DeviceId, Tag, Msg}, + ok. + +call(Client, DeviceId, Tag, Msg) -> + call(Client, DeviceId, Tag, Msg, ?TIMEOUT). + +call(#client{ pid = Pid, phone = Phone }, DeviceId, Tag, Msg, Timeout) -> + Ref = make_ref(), + Pid ! {call, {self(), Ref}, DeviceId, Tag, Msg}, + receive + {Ref, Tag, Res} -> Res + after Timeout -> + error({timeout, Tag, {Phone, Msg}}) + end. + +reply(noreply, _, _) -> ok; +reply({Pid, Ref}, Tag, Packet) -> Pid ! {Ref, Tag, Packet}. + +find(Fun, Xs) -> + case lists:splitwith(fun(X) -> not Fun(X) end, Xs) of + {Ys, [X | Zs]} -> {X, Ys ++ Zs}; + _ -> false + end. + +find_matching_value(Pattern, Received) -> + Res = find(fun(#received{payload = Payload}) -> match(Pattern, Payload) end, Received), + %% io:format("find_value\n Pat = ~p\n Vals = ~p\n Res = ~p\n", [Pattern, Received, Res]), + Res. + +find_matching_pattern(Expecting, Payload) -> + Res = find(fun(#expect{pattern = Pattern}) -> match(Pattern, Payload) end, Expecting), + %% io:format("find_pattern\n Pats = ~p\n Val = ~p\n Res = ~p\n", [Expecting, Payload, Res]), + Res. + +match('_', _) -> true; +match({member, P}, Xs) when is_list(Xs) -> + lists:any(fun(X) -> match(P, X) end, Xs); +match(X, X) -> true; +match(Pat, Val) when is_tuple(Pat), is_tuple(Val) -> + match(tuple_to_list(Pat), tuple_to_list(Val)); +match([P | Ps], [V | Vs]) -> + match(P, V) andalso match(Ps, Vs); +match(_, _) -> false. + +%% -- Monitor ---------------------------------------------------------------- + +start_monitor() -> + spawn(fun() -> monitor_loop(#{}, #{}) end). + +stop_monitor(Monitor) -> + Ref = make_ref(), + Monitor ! {self(), Ref, stop}, + receive + {Ref, Report} -> Report + after 1000 -> + {error, timeout} + end. + +report_time(_, notag, _) -> ok; +report_time(Monitor, Tag, Time) -> + Monitor ! {report, Tag, Time}. + +report_error(Monitor, Reason, Tag, What) -> + Monitor ! {error, Tag, {Reason, What}}. + +monitor_loop(Timestamps, Errors) -> + receive + {report, Tag, T} -> + Timestamps1 = maps:update_with(Tag, fun(Ts) -> [T | Ts] end, [T], Timestamps), + monitor_loop(Timestamps1, Errors); + {error, Tag0, Reason } -> + Tag = case Tag0 of {T, _} -> T; T -> T end, + Errors1 = maps:update_with(Tag, fun(Errs) -> [Reason | Errs] end, [Reason], Errors), + monitor_loop(Timestamps, Errors1); + {From, Ref, stop} -> + Report = + [ {Name, timer:now_diff(lists:max(Ts), lists:min(Ts))} + || {{Name, _Ref}, Ts} <- maps:to_list(maps:without(maps:keys(Errors), Timestamps)) ], + TotalReport = + maps:map(fun(_, {Min, Max}) -> timer:now_diff(Max, Min) end, + lists:foldl(fun({{Name, _Ref}, Ts}, Rep) -> + Min1 = lists:min(Ts), + Max1 = lists:max(Ts), + maps:update_with(Name, fun({Min, Max}) -> {min(Min, Min1), max(Max, Max1)} end, {Min1, Max1}, Rep) + end, #{}, maps:to_list(Timestamps))), + ErrorReport = + [ {Name, [ Err || {Name1, Errs} <- maps:to_list(Errors), + Name1 == Name, + Err <- Errs ]} + || Name <- lists:usort(maps:keys(Errors)) ], + From ! {Ref, {Report, TotalReport, ErrorReport}} + end. + +pretty_report({Report0, TotalReport, Errs}, UnMatched) -> + Report = group(Report0), + io:format("--- LOAD TEST REPORT\n"), + lists:foreach(fun(R) -> report_tag(TotalReport, R) end, Report), + io:format("\nTotal number of unmatched messages: ~p\n", [UnMatched]), + io:format("--- END ---\n\n"), + [ io:format("~s\n ~p\n", [Tag, Es]) || {Tag, Es} <- Errs ], + io:format("~p\n", [TotalReport]). + +report_tag(Totals, {Tag, Xs}) -> + Total = maps:get(Tag, Totals), + Num = length(Xs), + Throughput = Total / Num, + Sum = lists:sum(Xs), + Mean = Sum / Num, + M2 = lists:sum([ (X - Mean) * (X - Mean) || X <- Xs ]), + %% Min = lists:min(Xs), + Max = lists:max(Xs), + Var = if Num =< 1 -> 0; + true -> M2 / (Num - 1) end, + StdDev = math:sqrt(Var), + io:format("~30s : Total = ~.2f ms, Throughput = ~.2f ms, N = ~p, Avg = ~.2f ms, Max = ~.2f ms, StdDev: ~.2f ms\n", + [io_lib:format("~p", [Tag]), Total / 1000, Throughput / 1000, Num, Mean / 1000, Max / 1000, StdDev / 1000]). + + +%% -- Prefixes --------------------------------------------------------------- + +-define(prefix_table, load_test_prefix). + +ensure_prefix() -> + case ets:info(?prefix_table) of + undefined -> + ets:new(?prefix_table, [named_table, public, set]), + ets:insert(?prefix_table, {prefix, 0}); + _ -> ok + end. + +get_prefix() -> + ensure_prefix(), + [{prefix, Prefix}] = ets:lookup(?prefix_table, prefix), + Prefix. + +set_prefix(Prefix) -> + ensure_prefix(), + ets:insert(?prefix_table, {prefix, Prefix}), + Prefix. + +next_prefix() -> + fetch_prefix(). + +-define(prefix_user, <<"00">>). + +init_prefix() -> + nynja:ws_close(nynja:register_profile(?prefix_user)). + +fetch_prefix() -> + User = nynja:connect_user(?prefix_user), + #mqtt_packet{ payload = #'Message'{ id = MsgId } } = + nynja:send_message_(User, <<"Me has prefix?">>, nynja:user_roster(User), nynja:feed_id(User, User)), + nynja:ws_close(User), + set_prefix(MsgId). + +%% -- Utilities -------------------------------------------------------------- + +make_phone(N) -> + Prefix = get_prefix(), + iolist_to_binary(io_lib:format("~p~9.10.0b", [Prefix, N])). + +make_tag(Name) -> + {Name, make_ref()}. + +snoc({Front, End}, X) -> {Front, [X | End]}. + +empty_q() -> q_from_list([]). + +q_to_list({Front, End}) -> Front ++ lists:reverse(End). +q_from_list(Xs) -> {Xs, []}. + +slice(Ixs, Xs) -> slice(1, Ixs, Xs). + +slice(I, [I | Ixs], [X | Xs]) -> [X | slice(I + 1, Ixs, Xs)]; +slice(I, Ixs, [_ | Xs]) -> slice(I + 1, Ixs, Xs); +slice(_, [], _) -> []. + +rnd_without_duplicates(K, N) -> + Is = lists:seq(1, N), + rnd_draw(K, N, Is). + +rnd_draw(0, _, _) -> []; +rnd_draw(K, N, Is) -> + Ix = rand:uniform(N), %% vv could be optimized with a tree structure instead of list + {Is0, [I | Is1]} = lists:split(Ix - 1, Is), + [I | rnd_draw(K - 1, N - 1, Is0 ++ Is1)]. + +rnd_string(Str) -> + N = rand:uniform(1 bsl 40) - 1, + iolist_to_binary(io_lib:format("~s:~8.32.0b", [Str, N])). + +fudge_time(T) -> + round(T * 0.1 * (rand:uniform() - 0.5)). + +group(Xs) -> + Keys = lists:usort([ Key || {Key, _} <- Xs ]), + [ {Key, [ Val || {K, Val} <- Xs, K == Key ]} || Key <- Keys ]. + diff --git a/test/nynja.erl b/test/nynja.erl index 7d583043cdaa2723603595a5ae77673ac746bbb0..500a2ac02a833b51405f12df32fc0ce1c8e90629 100644 --- a/test/nynja.erl +++ b/test/nynja.erl @@ -2,7 +2,7 @@ -module(nynja). -include_lib("emqttc/include/emqttc_packet.hrl"). --include("apps/roster/include/roster.hrl"). +-include_lib("roster/include/roster.hrl"). -compile([export_all, nowarn_export_all]). @@ -74,10 +74,14 @@ register_profile(Sys, Info, Connect) -> connect_user(Phone) -> PhoneLink = phone_uuid(Phone), RosterUUID = roster_uuid(Phone), - connect_user(Phone, PhoneLink, RosterUUID). -connect_user(Phone, PhoneLink, RosterUUID) -> +connect_user_(Phone) -> + PhoneLink = phone_uuid(Phone), + RosterUUID = roster_uuid(Phone), + connect_user_(Phone, PhoneLink, RosterUUID). + +connect_user_(Phone, PhoneLink, RosterUUID) -> Token = jwt_token(RosterUUID), User0 = ws_connect([{<<"x-json-web-token">>, Token}]), ClientId = <<"emqttd_", (pick_client_id(Phone))/binary>>, @@ -86,8 +90,11 @@ connect_user(Phone, PhoneLink, RosterUUID) -> roster_uuid = RosterUUID, phone_uuid = PhoneLink }, connect_micro(User, ClientId), + User. - timer:sleep(25), %% avoid race between subscriptions and get_profile +connect_user(Phone, PhoneLink, RosterUUID) -> + User = connect_user_(Phone, PhoneLink, RosterUUID), + %% timer:sleep(25), %% avoid race between subscriptions and get_profile #mqtt_packet{ payload = #'Profile'{ rosters = [#'Roster'{ id = RosterIx }] } } = get_profile(User, 5000), @@ -139,7 +146,10 @@ room_feed(Room) -> #muc{ name = typed_uuid(<<"room">>, Room) }. feed_id(#user{roster_id = RId1}, #user{ roster_id = RId2 }) -> - list_to_tuple([p2p | lists:sort([RId1, RId2])]). + p2p_feed(RId1, RId2). + +p2p_feed(PhoneId1, PhoneId2) -> + list_to_tuple([p2p | lists:sort([PhoneId1, PhoneId2])]). send_message(#user{roster_id = RosterId1} = User1, User2, Msg, FeedId) -> @@ -199,6 +209,16 @@ remove_from_room(User = #user{ client = ClientId }, Phone, RoomName, Status) -> ws_send(User, mqtt_publish(ClientId, Room)) end. +delete_room(RoomName) -> + Sys = connect_sys(), + delete_room(Sys, RoomName), + ws_close(Sys). + +delete_room(Sys = #user{ client = SysId }, RoomName) -> + RoomId = typed_uuid(<<"room">>, RoomName), + Room = #'Room'{ id = RoomId, status = delete }, + ws_send(Sys, mqtt_publish(SysId, Room), 0, 100). %% no reply?! + get_member_phone_id(User, Alias, RoomName) -> #mqtt_packet{ payload = Room } = get_room(User, RoomName), Members = Room#'Room'.admins ++ Room#'Room'.members, @@ -230,7 +250,10 @@ make_room_message(User, RoomName, Msg, Ack) -> make_message(User, RoomId, #muc{ name = RoomId }, Msg, Ack). make_message(#user{ client = ClientId, roster_id = PhoneId }, To, Feed, Msg, Ack) -> - TS = integer_to_binary(now_usec()), + make_message(ClientId, PhoneId, To, Feed, Msg, Ack). + +make_message(ClientId, PhoneId, To, Feed, Msg, Ack) -> + TS = integer_to_binary(uniq()), ID = <>, PayloadDesc = case Msg of @@ -426,7 +449,7 @@ read_msg(#user{ roster_id = RId }, FeedId, MsgId) -> #'History'{ roster_id = RId, feed = FeedId, entity_id = MsgId, status = update }. delete_hist(FeedId) -> - #'History'{ feed = FeedId, status = delete }. + #'History'{ roster_id = <<>>, feed = FeedId, status = delete }. user_conn(#user{conn = Conn}) -> Conn. @@ -547,8 +570,8 @@ ws_connect_(Hdrs) -> {gun_error, ConnPid, _StreamRef, Reason} -> exit({ws_upgrade_error, Reason}) - after 1000 -> - exit(timeout) + after 10000 -> + exit(ws_connect_timeout) end. ws_close(#user{ client_pid = Pid }) when is_pid(Pid) -> @@ -629,6 +652,9 @@ decode_frame({binary, Bin}) -> Resp end. +uniq() -> + rand:uniform(1 bsl 64) - 1. + now_usec() -> {A, B, C} = os:timestamp(), C + 1000000 * (B + 1000000 * A).