diff --git a/apps/roster/src/protocol/micro_auth.erl b/apps/roster/src/protocol/micro_auth.erl index 44cf751d9941b9527883180593b04582a40f4ff9..5508c1acb1edb76865310ca8da795215121cf1d4 100644 --- a/apps/roster/src/protocol/micro_auth.erl +++ b/apps/roster/src/protocol/micro_auth.erl @@ -45,6 +45,13 @@ check(#mqtt_client{client_id = <<"emqttd_", _/binary>> = ClientId, AuthPid = n2o_async:pid(system, ?MODULE), Ver = binary_to_list(BVer), case kvs:get('Auth', ClientId) of + {ok, #'Auth'{user_id = PhoneId, type = logout}} -> + FreshAuth = #'Auth'{user_id = PhoneId, phone = roster:phone(PhoneId), + last_online = roster:now_msec(), type = verified}, + kvs:put(FreshAuth), + roster:info(?MODULE, "~p:Auth:auth(micro)/check:session created, post logout ", [ClientId]), + AuthPid ! roster_auth:control_ver(FreshAuth, Ver), + ok; {ok, #'Auth'{user_id = PhoneId} = Auth} -> AuthPid ! roster_auth:control_ver(Auth#'Auth'{type = []}, Ver), ok; diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 4c341efca888e0eea0765b42a84ff7fbb96d7a46..afdbe65b5040a0f79b97de6b6927e09a33836c3a 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -47,7 +47,7 @@ info(#'Message'{status = [], id = [], feed_id = F, from=From0, to = To, <<"sys">> -> From0; <<"emqttd">> -> roster:phone_id(ClientId) end, - Created=roster:now_msec(), + FId = roster:roster_id(From), FeedData = {R, UID} = case Feed = roster:feed_key(F) of @@ -268,9 +268,12 @@ info(#'Message'{id = Id, msg_id = ClMID, feed_id = Feed, from = From0, seenby = _ -> skip end; _ -> skip end, n2o_vnode:send(C, Topic, term_to_binary(Internal)), %% NOTE! send push about deleted msg only if acted user is in the seenby - case Seen of [-1] -> - n2o_async:pid(system, ?MODULE) ! {send_push, From1, To1, Internal, ?MSG_DELETE_ACTION}; _ -> - skip end, + case Seen of + [-1] -> + n2o_async:pid(system, ?MODULE) ! {send_push, From1, To1, Internal, ?MSG_DELETE_ACTION}; + _ -> + skip + end, case {Type, Link} of {[reply], Link} when is_integer(Link) andalso NewSeen == [-1] -> case kvs:get('Message', Link) of @@ -431,7 +434,13 @@ proc({send_push, From, To, #'Message'{type = TypeList} = Msg, Action}, #handler{ %% Check should server proceed push %% TODO add push for message/delete case From of - To -> case lists:member(cursor, TypeList) of true -> notify(From, To, Msg, Action); _ -> roster:info(?MODULE, "ExcessivePush:~p", [From]) end; + To -> + case lists:member(cursor, TypeList) of + true -> + notify(From, To, Msg, Action); + _ -> + roster:info(?MODULE, "ExcessivePush:~p", [From]) + end; _ -> notify(From, To, Msg, Action) end, {reply, [], H}; diff --git a/apps/roster/src/protocol/roster_presence.erl b/apps/roster/src/protocol/roster_presence.erl index 68ca6c17385ae92a4587a53bd5d94b7ce5fc973c..57e26ec43430b313225e121a5ab9f1c139357dc2 100644 --- a/apps/roster/src/protocol/roster_presence.erl +++ b/apps/roster/src/protocol/roster_presence.erl @@ -36,11 +36,20 @@ on_disconnect(#'Auth'{type = logout, phone = Phone, client_id = ClientId, user_i roster:info(?MODULE, "~p:~p:DISCONNECT:LOGOUT", [Phone, ClientId]), send_presence(offline, Phone, C, ClientId), roster:unsubscribe_p2p(ClientId, roster:roster_id(PhoneId)), - roster:unsubscribe_room(ClientId), kvs:delete('Auth', ClientId), + roster:unsubscribe_room(ClientId), + kvs:delete('Auth', ClientId), roster:final_disconnect(ClientId); -on_disconnect(#'Auth'{phone = Phone, client_id = ClientId}, C) -> +on_disconnect(#'Auth'{type = disconnect, phone = Phone, client_id = ClientId, user_id = PhoneId}, C) -> roster:info(?MODULE, "~p:~p:DISCONNECT", [Phone, ClientId]), + send_presence(offline, Phone, C, ClientId), + roster:unsubscribe_p2p(ClientId, roster:roster_id(PhoneId)), + roster:unsubscribe_room(ClientId), + %% Warning: Do not delete Auth record, it holds push notification tokens + roster:final_disconnect(ClientId); + +on_disconnect(#'Auth'{phone = Phone, client_id = ClientId, type=Type}, C) -> + roster:info(?MODULE, "~p:~p:DISCONNECT:~p", [Phone, ClientId, Type]), send_presence(offline, Phone, C, ClientId). on_verify(ClientId, PhoneId) -> roster:sub_client(subscribe, ClientId, PhoneId). diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 094ff5327e58a5572e2a587025812a70506f4f36..15cd26b1e2efc5b1db4010b857cb8efe7da0c10b 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -321,7 +321,8 @@ parts_phone_id(PhoneId) -> [Phone] -> list_rosters(Phone, first_roster); _E -> [<<>>, 0] end. roster_id(<<"emqttd_", _/binary>> = ClientId) -> - {ok, #'Auth'{user_id = PhoneId}} = kvs:get('Auth', ClientId), roster_id(PhoneId); + {ok, #'Auth'{user_id = PhoneId}} = kvs:get('Auth', ClientId), + roster_id(PhoneId); roster_id(PhoneId) -> [_, Id] = parts_phone_id(PhoneId), Id. roster_index(Index,V)-> [Id || #'Roster'{id=Id} <-kvs:index('Roster',Index,V)]. @@ -568,18 +569,26 @@ unload() -> emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4), emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3). -on_client_disconnected(_Reason, #mqtt_client{client_id = <<"emqttd_", _/bytes>> = ClientId}, _Env) -> +on_client_disconnected(_Reason, #mqtt_client{client_id = <<"emqttd_", _/bytes>> = ClientId} = Client, _Env) -> + info(roster_auth, "~p:MQTT_CLIENT DISCONNECT ~p",[ClientId, Client]), % TODO: mqttc:publish(C, lists:concat(["ses/",Phone])) on_disconnect(ClientId); -on_client_disconnected(_Reason, #mqtt_client{client_id = <<"reg_", _/bytes>> = ClientId}, _) -> + +on_client_disconnected(_Reason, #mqtt_client{client_id = <<"reg_", _/bytes>> = ClientId} = Client, _) -> + info(roster_auth, "~p:MQTT_CLIENT DISCONNECT ~p",[ClientId, Client]), final_disconnect(ClientId); -on_client_disconnected(_Reason, _Client = #mqtt_client{}, _Env) -> ok. +on_client_disconnected(_Reason, #mqtt_client{client_id = ClientId} = Client, _Env) -> + info(roster_auth, "~p:MQTT_CLIENT DISCONNECT ~p",[ClientId, Client]), + ok. + on_disconnect(<<"emqttd_", _/binary>> = ClientId) -> info(roster_auth, "~p:CLIENT DISCONNECT",[ClientId]), case kvs:get('Auth', ClientId) of - {ok, Auth} -> n2o_async:pid(system, roster_auth) ! {disconnect, Auth}, - kvs:put(Auth#'Auth'{last_online = now_msec()}); - _ -> skip end, + {ok, Auth} -> + n2o_async:pid(system, roster_auth) ! {disconnect, Auth#'Auth'{type = disconnect}}, + kvs:put(Auth#'Auth'{last_online = now_msec(), type = logout}); + _ -> skip + end, ok. final_disconnect(ClientId) -> % info(roster_auth, "~p:ANY DISCONNECT",[RegClientId]), @@ -1095,14 +1104,20 @@ sort_readers(Room) -> readmsgs(#'Contact'{phone_id = R}=C, LocR) -> C#'Contact'{reader = p2p_readmsgs(LocR, R)}; readmsgs(M, _) -> M. -p2p_readmsgs(R, R) -> [contact_readmsg(R,R), contact_readmsg(R,R)]; +p2p_readmsgs(LocR, C = #'Contact'{ phone_id = PhoneId }) -> + [contact_readmsg(LocR, PhoneId), contact_readmsg(C)]; p2p_readmsgs(LocR, R) -> [contact_readmsg(LocR, R), contact_readmsg(R, LocR)]. + +contact_readmsg(#'Contact'{ reader = ReaderId }) when ReaderId /= [] -> + reader_cache(ReaderId); +contact_readmsg(_) -> + 0. + contact_readmsg(#'Roster'{id = LocalId, phone = Phone}, #'Roster'{} = R) -> contact_readmsg(roster:phone_id(LocalId, Phone), R); contact_readmsg(LocalPhoneId, #'Roster'{userlist = Contacts}) -> - case lists:keyfind(LocalPhoneId, #'Contact'.phone_id, Contacts) of - #'Contact'{reader = ReaderId} when ReaderId /= []-> reader_cache(ReaderId); _ -> 0 end; + contact_readmsg(lists:keyfind(LocalPhoneId, #'Contact'.phone_id, Contacts)); contact_readmsg(LocalPhoneId, PhoneId) -> case kvs:get('Roster', roster_id(PhoneId)) of {ok, #'Roster'{} = R}-> contact_readmsg(LocalPhoneId, R); @@ -1274,29 +1289,19 @@ objlist(Index, Id, LastSync, N, Fun) when Index == #'Roster'.userlist; Index == #'Roster'.roomlist; Index == #'Roster'.favorite -> case kvs:get('Roster', Id) of - {ok, R} -> + {ok, R} -> objlist(Index, R, LastSync, N, Fun); _ -> {error, roster_not_found} end. -build_object(Object, - [Chunk | _] = Acc, - Roster, - LastSync, - N, - Fun) when length(Chunk) >= N -> +build_object(Object, [Chunk | _] = Acc, Roster, LastSync, N, Fun) + when N /= [] andalso length(Chunk) >= N -> Fun(Chunk), build_object(Object, [[] | Acc], Roster, LastSync, N, Fun); -build_object(Object, - Acc, - #'Roster'{ - id = Id, - phone = Phone - } = Roster, - LastSync, - _N, - _Fun) -> +build_object(Object, Acc, + #'Roster'{ id = Id, phone = Phone } = Roster, + LastSync, _N, _Fun) -> PhoneId = phone_id(Phone, Id), Feed = feed(Object, PhoneId), LastUpd = last_upd(Object), @@ -1325,48 +1330,40 @@ set_explicit_removed([], RoomId, RoomName) -> set_explicit_removed(Room, _, _) -> Room. - userlist(R) -> hd(split_objlist(#'Roster'.userlist, R)). user(Roster, C) -> user(Roster, C, 0). user(RosterId, UserId, LastSync) -> user(RosterId, UserId, [], LastSync). -user(#'Roster'{ - id = Id, - phone = Phone - }, - #'Contact'{ - phone_id = PhoneId, - reader = Reader - } = C, - W, - _LastSync) +user(#'Roster'{ id = Id, phone = Phone } = Roster, + #'Contact'{ phone_id = PhoneId, reader = Reader } = C, + W, _LastSync) when is_record(W, writer); is_record(W, p2p); W == [] -> - Writer = case W of + LocalPhoneId = phone_id(Phone, Id), + Writer = case W of [] -> - kvs_stream:load_writer(feed(C, phone_id(Phone, Id))); + kvs_stream:load_writer(feed(C, LocalPhoneId)); _ -> - W + W end, - Local = phone_id(Phone, Id), - {Unread, LastMsg, _} = unread_msg(Writer, Reader, Local), + {Unread, LastMsg, _} = unread_msg(Writer, Reader, LocalPhoneId), {Presence, Update} = is_online2(PhoneId), - Msgs = case catch p2p_readmsgs(Local, PhoneId) of + Msgs = case catch p2p_readmsgs(Roster, C) of {'EXIT', {Err, _}} -> n2o:error(?MODULE,"Catch:~p~n",[n2o:stack_trace(error, Err)]), - throw({error, {user, Local, PhoneId}}); - Ms -> Ms + throw({error, {user, LocalPhoneId, PhoneId}}); + Ms -> Ms end, C#'Contact'{ presence = Presence, - update = Update, + update = Update, services = get_services(phone(PhoneId)), - unread = Unread, + unread = Unread, last_msg = LastMsg, - reader = Msgs + reader = Msgs }; user(#'Roster'{id = RosterId, userlist = Users, phone = Phone}=R, UserId, W, LastSync) -> case lists:keyfind(UserId, #'Contact'.phone_id, Users) of @@ -2056,8 +2053,9 @@ daystime_to_ms({D, Time}) -> (D * 86400 + calendar:time_to_seconds(Time)) * 1000 timestamp_to_datetime(TimeStamp) -> UnixEpochGS = calendar:datetime_to_gregorian_seconds({{1970, 1, 1}, {0, 0, 0}}), GregorianSeconds = (TimeStamp div 1000) + UnixEpochGS, + Ms = TimeStamp rem 1000, {{Year, Month, Day}, {Hour, Minute, Second}} = calendar:gregorian_seconds_to_datetime(GregorianSeconds), - lists:flatten(io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w", [Year, Month, Day, Hour, Minute, Second])). + lists:flatten(io_lib:format("~4..0w-~2..0w-~2..0wT~2..0w:~2..0w:~2..0w.~3..0w", [Year, Month, Day, Hour, Minute, Second, Ms])). mk2(I) when I < 10 -> [$0 | integer_to_list(I)]; mk2(I) -> integer_to_list(I). diff --git a/apps/roster/src/roster_db.erl b/apps/roster/src/roster_db.erl index 76d26189ceb95b352a5d5de571d48b8ec23f661d..6c4e33557df54c858d904027627e795004083ee8 100644 --- a/apps/roster/src/roster_db.erl +++ b/apps/roster/src/roster_db.erl @@ -1010,8 +1010,14 @@ add_linkage([{PhoneMiss, PhoneHas} | Rest], Acc1, Acc2) -> end. read_link(FileName) -> - {ok, Text} = file:read_file("/home/nynja/depot/server/" ++ FileName), - read_link(string:split(Text, ";", all), []). + case file:read_file(FileName) of + {ok, Text} -> + read_link(string:split(Text, ";", all), []); + {error, _} = Reason -> + roster:info(?MODULE, "read_link file not read ~p", [Reason]), + [] + end. + read_link([], Acc) -> Acc; read_link([Link | Rest], Acc) -> diff --git a/rebar.config b/rebar.config index 71df83a2c3648303c5f5123106344f20e0562994..dfc1848f47a406f57b8f584a14f7a04811477925 100644 --- a/rebar.config +++ b/rebar.config @@ -6,9 +6,9 @@ {active, ".*", {git, "git://github.com/synrc/active",{tag,"master"}}}, {esockd, ".*", {git, "https://github.com/voxoz/esockd",{ref, "a80634b961c315ffe5f020d73236473b53ae5dc9"}}}, {bpe, ".*", {git, "git://github.com/synrc/bpe", {tag,"4.4"}}}, - {emqttd, ".*", {git, "git://github.com/synrc/emqttd",{tag,"master"}}}, + {emqttd, ".*", {git, "git://github.com/NYNJA-MC/emqttd",{tag,"master"}}}, {n2o, ".*", {git, "git://github.com/synrc/n2o", {tag,"6.4"}}}, - {emqttc, ".*", {git, "git://github.com/voxoz/emqttc",{tag,"master"}}}, + {emqttc, ".*", {git, "git://github.com/NYNJA-MC/emqttc",{tag,"master"}}}, {rest, ".*", {git, "git://github.com/synrc/rest",{tag,"5.10"}}}, {gen_smtp, ".*", {git, "git://github.com/voxoz/gen_smtp",{tag,"master"}}}, {emq_dashboard, ".*", {git, "https://github.com/synrc/emq_dashboard",{tag,"master"}}},