diff --git a/apps/roster/include/roster.hrl b/apps/roster/include/roster.hrl index 851dd76a08c63112598fc73beb1f4ca61293613f..337faa5e3e1099f1691da18eba6ff76c1a5c98d0 100644 --- a/apps/roster/include/roster.hrl +++ b/apps/roster/include/roster.hrl @@ -2,10 +2,9 @@ -define(ROSTER_HRL, true). -include_lib("kvs/include/feed.hrl"). -include_lib("kvs/include/kvs.hrl"). --include_lib("bpe/include/bpe.hrl"). --define(SYS_BPE_CLIENT, <<"sys_bpe">>). --define(SYS_REST_CLIENT, <<"sys_rest">>). +-define(SYS_SCHED_CLIENT, <<"sys_sched">>). +-define(SYS_REST_CLIENT, <<"sys_rest">>). -define(HIST_LIMIT, 1000). -define(MAX_UNREAD, 100). @@ -265,10 +264,10 @@ prev = [] :: [] | integer(), next = [] :: [] | integer(), context = [] :: [] | integer() | binary(), - proc = [] :: [] | integer() | #process{}, + proc = [] :: [] | integer() | term(), time = [] :: [] | integer(), data = [] :: [] | binary() | list(term() | #'Message'{}), - events = [] :: [] | list(#messageEvent{}), + events = [] :: [] | list(term()), settings = [] :: [] | list(#'Feature'{}), status = [] :: [] | init | update | delete | pending | stop | complete | restart}). diff --git a/apps/roster/src/processes/job.erl b/apps/roster/src/processes/job.erl deleted file mode 100644 index aa83e085ede4a7f9ed965675deaa37a4068de6e6..0000000000000000000000000000000000000000 --- a/apps/roster/src/processes/job.erl +++ /dev/null @@ -1,137 +0,0 @@ --module(job). -%-include_lib("kvs/include/feed.hrl"). --include("roster.hrl"). --include_lib("kvs/include/user.hrl"). --include_lib("kernel/include/logger.hrl"). --compile(export_all). - --define(QUANT, 500). - -def() -> job_process:definition(). -def(J) -> job_process:definition(J). 
- - -action({request,'Init'}, Proc) -> - ?LOG_INFO("Action Init"), - io:format("Action Init~n"), - {reply,Proc}; - -%%action({request,'Action'}, Proc) -> -%% Sending = bpe:doc({'Proc'},Proc), -%% io:format("Action start~n"), -%% case is_tuple(Sending ) of -%% true -> {reply,'Process',Proc}; -%% false -> {reply,'Update',Proc} end; - -action({request,'Stop'}, Proc) -> - ?LOG_INFO("Stop Process"), - io:format("Stop Process~n"), - {reply,'Action',Proc}; - -action({request,'Timeout'}, #process{id=_Id}=Proc) -> - ?LOG_INFO("Timeout Process"), - io:format("Timeout Process~n"), - %#process{options=Opts}=bpe:load(Id), - case bpe:doc(#'Schedule'{},Proc) of - #'Schedule'{state=TRef} -> timer:cancel(TRef); - _ -> skip end, - % TRef= proplists:get_value(timer,Opts,undefined), - %, - {reply,'Action',Proc}; - - -%%action({event,Name,Payload},#process{id=Id, options=Opts}=Proc)-> -%% io:format("Event Process~n"), -%% {reply,Proc}; - -action({request,'Action'}, #process{}=Proc) -> - ?LOG_INFO("Action Process"), - io:format("Action Process ~n"), - %C= proplists:get_value(send_pid,Opts,undefined), - CTimeout= case lists:keytake(timeoutEvent,1,bpe:events(Proc)) of - {value,Event,_} -> roster:daystime_to_ms(element(#timeoutEvent.timeout,Event)); - false -> 1000 end, - %job_process:get_proc(Id,pos), - case bpe:doc(#'Schedule'{},Proc) of - #'Schedule'{data = <<"stop">>} -> {reply,'Stop',Proc}; - #'Schedule'{id=Time, data = []} -> catch n2o_async:pid(system,roster_bpe) ! {next, Proc, Time}, - {reply,'Stop',Proc}; - #'Schedule'{id=Time}=Sh when Time/=[] -> Timeout=Time-roster:now_msec(), - case Timeout-CTimeout of - T when T =<3*?QUANT -> %io:format("Pub ~p, ~p ~n", [Id,Timeout]), - catch n2o_async:pid(system,roster_bpe) ! {publish, Proc}, - {reply,'Stop',Proc}; - _T -> catch n2o_async:pid(system,roster_bpe) ! 
{timeout, Sh, Timeout}, - {reply,'Timeout',Proc} end; - _ -> {reply,'Stop',Proc} - end; - -action({request,'Final'}, #process{}=Proc) -> - io:format(" Final~n"), - %roster:send_event(C, <<>>, <<>>, #'Job'{id=Id, status=delete}), - %n2o_async:pid(system,roster_bpe) ! #'Job'{id=Id, status=delete}, - n2o_async:pid(system,roster_bpe) ! {clean, Proc#process{}}, - {reply,Proc}. - - -worker(#process{id=Id}=P) -> - case kvs:get(feed,process) of - {ok,Feed} when Feed#feed.top =:= Id -> - case bpe:hist(Id) of - [H|_] ->%kvs:info(?MODULE,"Worker Start: ~p~n",[Id]), - worker_do(calendar:time_difference(H#hist.time,calendar:local_time()),P); - __ -> skip end; - __ -> skip end - . - -worker_do({Days,_Time},_P) when Days >= 14 -> skip; -worker_do({_Days,_Time},#process{id=Id}=P) when P#process.task =:= 'Action' -> -% catch n2o_async:pid(system,roster_bpe) ! {restart, P}, - bpe:start(P, []), - %bpe:complite(Id), - kvs:info(?MODULE,"BPE Start: ~p~n",[Id]); -worker_do({_Days,_Time},#process{id=Id}=P) when P#process.task =:= 'Stop' -> - bpe:start(P, []), - % bpe:complite(Id), - io:format("BPE Start: ~p~n",[Id]); -worker_do({_Days,_Time},#process{id=Id}=P) when P#process.task =:= 'Init' -> - bpe:start(P, []), -% bpe:complite(Id), - kvs:info(?MODULE,"BPE Start: ~p~n",[Id]); -%%worker_do({Days,Time},P) when P#process.task =:= 'Final' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); -%%worker_do({Days,Time},P) when P#process.task =:= 'FirstDelay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); -worker_do({_Days,_Time},#process{id=Id}=P) when P#process.task =:= 'Timeout' -> - bpe:start(P, []), - kvs:info(?MODULE,"BPE Start: ~p~n",[Id]); -% bpe:complite(Id); - -worker_do({_Days,_Time},P) ->kvs:info(?MODULE,"BPE Start: ~p~n",[P]). %skip. - - -next(Fun,R,Current,Acc) -> - case mnesia:dirty_next(R, Current) of - '$end_of_table' -> - Acc; - Next -> - [S] = mnesia:dirty_read({R, Next}), - case Fun(S) of - true->next(Fun,R,Next,[S|Acc]); - false->Acc - end - end. 
- -next(R,Current) -> - case mnesia:dirty_next(R, Current) of - '$end_of_table' -> - []; - Next -> - [S] = mnesia:dirty_read({R, Next}), - S - end. - -clear_schedule()-> [kvs:delete('Schedule', Time) || #'Schedule'{id=Time} <- kvs:all('Schedule'), Time < roster:now_msec()]. - -clear_hist(#process{id=Id})-> case bpe:hist(Id) of - [_|T] -> [kvs:remove(hist,I) || #'hist'{id=I} <- T]; - __ -> skip end. - diff --git a/apps/roster/src/processes/job_process.erl b/apps/roster/src/processes/job_process.erl deleted file mode 100644 index 653afbf4708aaee8a459244db628cfa417b18e17..0000000000000000000000000000000000000000 --- a/apps/roster/src/processes/job_process.erl +++ /dev/null @@ -1,90 +0,0 @@ --module(job_process). --include("roster.hrl"). --compile(export_all). - -definition() -> - - #process { name = 'Schedule', - - flows = [ - #sequenceFlow{source='Init', target='Action'}, - #sequenceFlow{source='Action', target='Action'}, - #sequenceFlow{source='Action', target='Stop'}, - #sequenceFlow{source='Stop', target='Action'}, - #sequenceFlow{source='Action', target='End'}, - % #sequenceFlow{source='Delay', target='Process'}, - #sequenceFlow{source='End' } - ], - - tasks = [ - #userTask { name='Init', module = job }, - #serviceTask { name='Action', module = job }, - #userTask { name='Stop', module = job }, - %#serviceTask { name='Process', module = job }, - % #serviceTask { name='Delay', module = job }, - #endEvent { name='End'} - ], - - beginEvent = 'Init', - endEvent = 'End', - events = [ - #boundaryEvent{ name = '*', timeout={0,{0,30,0}} }, - #timeoutEvent{name='Action', timeout={0,{0,0,1}}, module=job} - ] - }. 
- -definition(#'Schedule'{id=T, proc= <<"publish">>}) -> - - #process { name = 'Schedule', - - flows = [ - #sequenceFlow{source='Init', target='Action'}, - #sequenceFlow{source='Action', target='Timeout'}, - #sequenceFlow{source='Action', target='Stop'}, - #sequenceFlow{source='Action', target='Final'}, - #sequenceFlow{source='Timeout', target='Action'}, - #sequenceFlow{source='Timeout', target='Stop'}, - #sequenceFlow{source='Stop', target='Action'}, - #sequenceFlow{source='Stop', target='Final'}, - #sequenceFlow{source='Final', target='End'} - ], - - tasks = [ - #userTask { name='Init', module = job }, - #serviceTask { name='Action', module = job }, - #userTask { name='Stop', module = job }, - #userTask { name='Timeout', module = job }, - #serviceTask { name='Final', module = job }, - #endEvent { name='End' } - ], - - beginEvent = 'Init', - endEvent = 'End', - events = [ - #boundaryEvent{ name = '*', timeout={100,{0,0,0}} }, - #timeoutEvent{name='Action', timeout=T, module=job}, -% #timeoutEvent{name='Stop', timeout={0,{0,1,0}}, module=job}, - #timeoutEvent{name='Final', timeout={0,{0,1,0}}, module=job} - - ] - }. - - -update_event(Proc,Task,T)-> - %Proc=bpe:load(Id), - Events=lists:keystore(Task,#timeoutEvent.name,bpe:events(Proc),#timeoutEvent{name=Task, timeout=T, module=job }), - NewP=Proc#process{events = Events}, - kvs:put(NewP), NewP -. - -update_opt(#process{options = Opts}=P,Opt)-> - %Proc=bpe:load(Id), - Os=lists:keystore(element(1,Opt),1,Opts,Opt), - NewP=P#process{options = Os}, - kvs:put(NewP), NewP -. - -get_proc(Proc,Key) -> - #process{options = Opts}=bpe:load(Proc), - case proplists:get_value(Key,Opts,undefined) of T when is_integer(T) -> T; _ -> 0 end - . 
diff --git a/apps/roster/src/protocol/micro_auth.erl b/apps/roster/src/protocol/micro_auth.erl index a0bb2b30dc4dc269cd3aaecbfa07d3a618ac64bb..e0d41956cde05f1d5d469cd04899f2ede9191697 100644 --- a/apps/roster/src/protocol/micro_auth.erl +++ b/apps/roster/src/protocol/micro_auth.erl @@ -3,7 +3,6 @@ -include("roster.hrl"). -include("micro.hrl"). -include("static_auth.hrl"). -%-include_lib("bpe/include/bpe.hrl"). -include_lib("emqttd/include/emqttd.hrl"). -include_lib("n2o/include/n2o.hrl"). -behaviour(emqttd_auth_mod). diff --git a/apps/roster/src/protocol/roster_auth.erl b/apps/roster/src/protocol/roster_auth.erl index 1b66fbd249009d8d846d4d245b8ba404258a0116..054496ac64f0b5bf874506a453e0066781774a50 100644 --- a/apps/roster/src/protocol/roster_auth.erl +++ b/apps/roster/src/protocol/roster_auth.erl @@ -2,7 +2,6 @@ -include_lib("kernel/include/logger.hrl"). -include("roster.hrl"). -include("static_auth.hrl"). -%-include_lib("bpe/include/bpe.hrl"). -include_lib("emqttd/include/emqttd.hrl"). -include_lib("n2o/include/n2o.hrl"). -behaviour(emqttd_auth_mod). diff --git a/apps/roster/src/protocol/roster_bpe.erl b/apps/roster/src/protocol/roster_bpe.erl deleted file mode 100644 index 23052e1d5f5252b67fcfad19a53dc880f4b203a9..0000000000000000000000000000000000000000 --- a/apps/roster/src/protocol/roster_bpe.erl +++ /dev/null @@ -1,408 +0,0 @@ --module(roster_bpe). --include_lib("kernel/include/logger.hrl"). --include_lib("roster.hrl"). -%%-include_lib("kvs/include/feed.hrl"). -%%-include_lib("kvs/include/kvs.hrl"). --include_lib("emqttd/include/emqttd.hrl"). --include_lib("n2o/include/n2o.hrl"). --compile({parse_transform, bert_javascript}). --compile(export_all). - --define(QUANT, 500). --define(JOBS_LIMIT, 300). --define(JOBS_MS_LIMIT, 500). - - -start() -> n2o_async:start(#handler{module = ?MODULE, class = system, group = roster, name = ?MODULE, state = []}). 
- -info(#'History'{roster_id = Roster, feed = Feed, size = N, entity_id = MId, status = get} = Data, Req, - #cx{params = ClientId} = State) -> - ?ROSTER_LOG_REQ('History', get, ClientId, "Feed=~p, N=~p, MId=~p", [Feed, N, MId]), - PhoneId = roster:phone_id(ClientId), - %FId = roster:roster_id(Roster), - %N0=case N of Size when N < -8 -> -7; _ when MId=:=0 -> -7; Size when N > 8 -> 7; Size -> Size end, - {R, Writer} = case Feed of - {act, <<"publish">>, PhoneId} -> - case kvs:get(writer, Feed) of - {ok, #writer{first = #'Job'{context = RID}} = W} -> {RID, W}; - _ -> {0, 0} end; - _ -> ?LOG_INFO("~p:Feed/error:~p", [Roster, Feed]), {0, -1} - end, - D = case kvs:get(reader, R) of - {ok, #reader{cache = []} = Rdr} when MId == 0 -> - #writer{count = Count, cache = #'Job'{}} = Writer, - case Count of - 0 -> - ?LOG_ERROR("There are not messages: ~p:~n", [Writer]), - <<>>; - _ -> - #reader{cache = {_, RId}} = kvs_stream:save(kvs_stream:top((Rdr#reader{dir = 0}))), - %{ok,#'Job'{prev = ID }}=kvs:get('Job',RId), - Js = roster:exclude({'Job', RId}, #'Job'.status, -?JOBS_LIMIT, 0, []), - Data#'History'{data = Js} %[ case S of complete-> J#'Job'{data=[]}; _ -> J end || J=#'Job'{status=S}<-Js]} - end; - {ok, #reader{pos =_Pos} =_Reader} when N == [], MId == 0 -> - #writer{count =_Top, cache = #'Job'{id = Id}} = Writer, -%% #reader{cache={_,RId} }=kvs_stream:drop(Reader#reader{args = Pos, dir=1}), -%% #reader{cache={_,RId} }=RR=kvs_stream:bot(Reader), - % {ok,#'Job'{prev = ID }}=kvs:get('Job',RId), - Js = roster:exclude({'Job', Id}, #'Job'.status, -?JOBS_LIMIT, 0, []), - Data#'History'{data = Js}; - {ok, #reader{}} when N == [], MId =/= 0 -> - #writer{} = Writer, - %{ok,#'Job'{prev = ID }}=kvs:get('Job',MId), - Js = roster:exclude({'Job', MId}, #'Job'.status, ?JOBS_LIMIT, 0, []), - NewP = length(Js), - Data#'History'{data = Js, size = NewP}; - {ok, #reader{}} when N < 0, MId =/= 0 -> - {ok, #'Job'{next = ID}} = kvs:get('Job', MId), - Js = roster:exclude({'Job', ID}, 
#'Job'.status, N, 0, []), - NewP = length(Js), - Data#'History'{data = Js, size = NewP}; - {ok, #reader{}} when MId =/= 0 -> - {ok, #'Job'{prev = ID}} = kvs:get('Job', MId), - Js = roster:exclude({'Job', ID}, #'Job'.status, N, 0, []), - NewP = length(Js), - Data#'History'{data = Js, size = NewP}; - _ when Writer == 0 -> Data#'History'{data = []}; - _ -> roster_channel_helper:error_response_400(Data) -%% #io{code = #error{code = invalid_data}} - end, - {reply, {bert, D}, Req, State}; - -info(#'Job'{id = [], feed_id = {act, <<"publish">>, PhoneId} = Feed, time = [], data = [#'Message'{} | _] = Msgs, status = init} = RequestData, R, - #cx{params = ClientId} = S) -> - ?ROSTER_LOG_REQ('Job', init, ClientId, "Feed=~p", [Feed]), - %J = J0#'Job'{id = Id = kvs:next_id('Job', 1)}, - Replay = case roster:phone_id(ClientId) of - PhoneId -> %roster:restart_module(Pid,?MODULE), - n2o_async:pid(system, ?MODULE) ! {send, Msgs}, <<>>; - _ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end, - {reply, {bert, Replay}, R, S}; - -info(#'Job'{id = [], feed_id = {act, <<"roster">>,PhoneId} = Feed, time = [], data = Actions, status = init} = RequestData, R, - #cx{params = ClientId} = S) -> - ?ROSTER_LOG_REQ('Job', init, ClientId, "Feed=~p", [Feed]), - %J = J0#'Job'{id = Id = kvs:next_id('Job', 1)}, - Replay = case roster:phone_id(ClientId) of - PhoneId ->[ action(A, R, S) || A <-Actions]; - _ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end, - {reply, {bert, Replay}, R, S}; - - -info(#'Job'{id = [], feed_id = {act, <<"publish">>, PhoneId} = Feed, time = Time, data = [#'Message'{} | _] = D0, status = init} = J0, R, - #cx{params = ClientId} = S) -> - %J = J0#'Job'{id = Id = kvs:next_id('Job', 1)}, - Current = roster:now_msec(), - ?ROSTER_LOG_REQ('Job', init, ClientId, "Feed=~p, Time=~p", [Feed, roster:msToUT(Time)]), -% Size=length(D0), - %{D,T}=lists:split( case Size > 
?JOBS_MS_LIMIT of true -> ?JOBS_MS_LIMIT; false -> Size end, D0), - D = D0, - JobDelay = application:get_env(roster, job_delay, 1), - Replay = case roster:phone_id(ClientId) of - PhoneId when Time > Current + JobDelay * 1000 -> %% minimum Timeout - {#'Job'{} = J, Context} = - case kvs_stream:load_writer(Feed) of - {error, not_found} -> kvs_stream:save(#writer{id = Feed}), - NewR = (kvs_stream:save(kvs_stream:reader(Feed)))#reader.id, - {J0#'Job'{time = round(Time / ?QUANT) * ?QUANT, context = NewR}, {0, 0}}; - _ -> - {J0#'Job'{time = round(Time / ?QUANT) * ?QUANT}, {0, 0}} end, - catch n2o_async:pid(system, ?MODULE) ! {update, J#'Job'{container = chain, data = D, status = pending}, Context, S}, - <<>>; - _ -> roster_channel_helper:error_response_400(J0) -%% #io{code = #error{code = invalid_data}} - end, - {reply, {bert, Replay}, R, S}; - -info(#'Job'{id = Id, time = T0, feed_id = {act, <<"publish">>, PhoneId} = Feed, data = D, settings = Stt, status = update} = RequestData, R, - #cx{params = ClientId, client_pid = C} = S) -> - ?ROSTER_LOG_REQ('Job', update, ClientId, "Feed=~p, Time=~p", [Feed, roster:msToUT(T0)]), - Current = roster:now_msec(), - Replay = case roster:phone_id(ClientId) of - PhoneId when T0 > Current -> - %{NewJ=#'Job'{id=Id},Context}= - Time = round(T0 / ?QUANT) * ?QUANT, - case kvs:get('Job', Id) of - {ok, #'Job'{time = OldTime, data = Acts, settings = Stt0, status = pending} = J} -> - NewJ = J#'Job'{time = Time, data = NData = case D of [] -> - Acts; [#'Message'{} | _] = NewD -> NewD; _ -> 0 end, - settings = case Stt of [] -> Stt0; _ -> Stt end}, - case kvs:get('Schedule', OldTime) of - {ok, #'Schedule'{id = SID, data = Data} = Sh} when abs(Time - OldTime) > 2 * ?QUANT, NData /= 0 -> - case Data--[{'Job', Id}] of [] -> kvs:delete('Schedule', SID); ND -> - kvs:put(Sh#'Schedule'{data = ND}) end, - n2o_async:pid(system, ?MODULE) ! 
{update, NewJ, {OldTime, 0}, S}, <<>>; - {ok, #'Schedule'{}} when NData /= 0 -> - kvs:put(NewJ), - roster:send_ses(C, roster:phone(PhoneId), NewJ#'Job'{status = update}), <<>>; - _ when NData /= 0 -> n2o_async:pid(system, ?MODULE) ! {update, NewJ, {OldTime, 0}, S}, - <<>>; - __________ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end; - _ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end; - _ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end, - {reply, {bert, Replay}, R, S}; - - -info(#'Job'{proc = Proc, data = <<"stop">>, status = update}, R, #cx{params = ClientId} = S) -> - ?ROSTER_LOG_REQ('Job', update, ClientId, "stop"), - Docs = #'act'{data = <<"stop">>}, - {reply, {bert, {io, bpe:amend(Proc, Docs, noflow), <<>>}}, R, S}; - -info(#'Job'{proc = Proc, data = D, time = Time, events = [], status = restart}, R, #cx{params = ClientId} = S) -> - ?ROSTER_LOG_REQ('Job', restart, ClientId, "Data=~p, Time=~p", [D, roster:msToUT(Time)]), - P = bpe:load(Proc), -% bpe:find_pid(Proc) ! 
{'DOWN', <<>>, <<>>, <<>>, <<>>}, - T = calendar:time_difference(erlang:universaltime(), roster:msToUT(Time)), - Docs = #act{data = D}, - NewP = job_process:update_event(P, 'act', T), bpe:start(NewP, []), - {reply, {bert, {io, bpe:amend(Proc, Docs, noflow), <<>>}}, R, S}; - - -info(#'Job'{id = Id, feed_id = {act, <<"publish">>, UID} = Feed, status = delete} = RequestData, R, #cx{params = ClientId, client_pid = C} = S) -> - ?ROSTER_LOG_REQ('Job', delete, ClientId, "Feed=~p", [Feed]), - PhoneId = roster:phone_id(ClientId), - D = case kvs:get('Job', Id) of - {ok, #'Job'{feed_id = {act, <<"publish">>, PhoneId}} = J} when UID == PhoneId -> - Job = J#'Job'{status = delete}, - case kvs_stream:load_writer(Feed) of - #writer{cache = #'Job'{id = JID}} = W when JID == Id -> - kvs_stream:save(W#writer{cache = Job}); - _ -> skip end, - kvs:put(Job), - roster:send_ses(C, roster:phone(PhoneId), Job), <<>>; - _ -> roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - end, - %timer:apply_after(Time, bpe, cleanup, [Id]), - {reply, {bert, D}, R, S}; - -info(#'Job'{feed_id = Feed, status = Status} = RequestData, R, #cx{params = ClientId} = S) -> - ?ROSTER_LOG_REQ('Job', Status, ClientId, "Unknown request, Feed=~p", [Feed]), - {reply, {bert, roster_channel_helper:error_response_400(RequestData) -%% #io{code = #error{code = invalid_data}} - }, R, S}; - -info(M, R, S) -> ?LOG_INFO("UNKNOWN:~w", [M]), {unknown, M, R, S}. 
- -proc(init, #handler{name = roster_bpe} = Async) -> - {ok, C} = emqttc:start_link([{client_id, <<"sys_bpe">>}, - {logger, {error_logger, error}}, - {reconnect, 5}]), - Table = process, - Proc = case kvs:get(feed, Table) of - {ok, #feed{top = Top} =_Feed} -> -%% ProcIDs=kvs:fold(fun(#process{name=Name}=A,Acc) -> -%% case Name of 'Shedule' -> [element(2,A)|Acc]; _ -> Acc end end,[], -%% Table, Feed#feed.top,undefined, #iterator.prev,#kvs{mod=store_mnesia}), -%% [H|T]=lists:reverse(ProcIDs), -% -% P1=job_process:update_event(P, 'Action', {0, {0, 0, 1}}), - try case bpe:find_pid(Top) of - Pid when is_pid(Pid) -> - case process_info(Pid) of - undefined -> restart_bpe(bpe:load(Top), C); - _ -> Top - end, - timer:sleep(300), - bpe:complete(Top); - _ -> skip - end - catch - _:_ -> skip end, - Top; - _ -> - {ok, Id0} = bpe:start(job:def(#'Schedule'{id = {0, {0, 0, 1}}, proc = <<"publish">>}), []), - bpe:complete(Id0), Id0 end, - % bpe:amend(Proc,{send_pid, C},noflow), - ?LOG_INFO("ASYNC BPE started: ~p; ~p", [C, Proc]), - % Proc is transfered through state - {ok, Async#handler{state = {C, Proc}, seq = 0}}; - -proc({update, #'Job'{id = Id0, time = Time, feed_id = {act, <<"publish">>, PhoneId} = Feed} = J, {OldTime, Limit}, - #cx{} = CX}, #handler{state = {C, Proc}} = H) when Limit < 8 -> - roster:restart_module(C,roster_bpe), - P = bpe:load(Proc), - {NewJ0, Id} = case Id0 of [] -> NId = kvs:next_id('Job', 1), {J#'Job'{id = NId, proc = Proc}, NId}; _ -> - {J#'Job'{proc = Proc}, Id0} end, - {T, NewD, Par} = case bpe:doc(#'Schedule'{}, P) of - #'Schedule'{id = Current, data = Jobs} =_Sh when abs(Time - Current) =< 2 * ?QUANT -> - %bpe:amend(Proc, NewSh = Sh#'Schedule'{data = lists:usort(Jobs++[{'Job', Id}])},noflow), - {Current, Jobs ++ [{'Job', Id}], noflow}; - #'Schedule'{id = Current, data = Jobs} when Time - Current < -2 * ?QUANT orelse Jobs == <<"stop">> -> - %bpe:amend(Proc, NewSh = #'Schedule'{id = Time, proc =Proc, data = [{'Job', Id}]}), - {Time, [{'Job', Id}], []}; 
-%% #'Schedule'{id = Current, data = Jobs} = Sh when Time - Current < -2*?QUANT -> -%% %bpe:amend(Proc, NewSh = #'Schedule'{id = Time, proc =Proc, data = [{'Job', Id}]}), -%% {Time,[{'Job', Id}],[]}; - #'Schedule'{id = Current, data = Jobs} =_Sh when abs(OldTime - Current) =< 2 * ?QUANT -> - %bpe:amend(Proc, NewSh = Sh#'Schedule'{data =Jobs-- [{'Job', Id}]}), - {Time, Jobs-- [{'Job', Id}], []}; - [] -> - %bpe:amend(Proc, NewSh = #'Schedule'{id = Time, proc =Proc, data = [{'Job', Id}]}), - {Time, [{'Job', Id}], []}; - _ -> - {Time, [], []} - end, - NewJ = NewJ0#'Job'{time = T, proc = Proc}, - ?LOG_INFO("Job started: ~p;~p", [Id, Limit]), - case kvs_stream:load_writer(Feed) of - #writer{} = W when OldTime == 0 -> - #writer{first = #'Job'{}} = kvs_stream:save(kvs_stream:add(W#writer{args = NewJ})); - #writer{cache = #'Job'{id = JID}} = W when JID == Id -> kvs_stream:save(W#writer{cache = NewJ}), - kvs:put(NewJ); - _ -> kvs:put(NewJ) - end, -%% case kvs:get('Schedule', T) of -%% {ok, #'Schedule'{id = T, data = Data} = NSh} when abs(Time - T) =< ?QUANT-> -%% %bpe:amend(PID, NewSh = #'Shedule'{id = T, proc =PID, data = [{'Job', Id}]++Data}), -%% kvs:put(NSh#'Schedule'{proc =Proc, data = lists:usort(Data++[{'Job', Id}])}); -%% _ -> kvs:put(#'Schedule'{id = Time, proc =Proc, data = [{'Job', Id}]}) -%% end, - set_schedule(kvs:get('Schedule', T), Time, NewJ), - case NewD of - [] -> - roster:send_ses(C, roster:phone(PhoneId), case OldTime of 0 -> NewJ; _ -> NewJ#'Job'{status = update} end), - {reply, [], H#handler{state = {C, Proc}}}; - _ -> try amend(Proc, #'Schedule'{id = T, proc = Proc, data = NewD}, Par), - roster:send_ses(C, roster:phone(PhoneId), case OldTime of 0 -> NewJ; _ -> NewJ#'Job'{status = update} end), - {reply, [], H#handler{state = {C, Proc}}} - catch Err:Rea -> ?LOG_ERROR("Catch:~p~n", [n2o:stack_trace(Err, Rea)]), - restart_bpe(P, C), - timer:sleep(300), - bpe:complete(Proc), - proc({update, NewJ0, {OldTime, Limit + 1}, CX}, #handler{state = {C, Proc}} = H) 
- end - end; - -proc({update, #'Job'{feed_id = {act, <<"publish">>,_PhoneId}}, {_OldTime,_Limit}, - #cx{} = _S}, #handler{state = {C, Proc}} = H) -> - ?LOG_INFO("BPE update limit ~p; ~p", [C, Proc]), - {reply, [], H}; - -proc({publish, #process{id = Id} = P}, #handler{state = {C, Proc}, seq = S} = H) -> - ?LOG_INFO("BPE PROC publish: ~p", [Id]), - roster:restart_module(C,roster_bpe), - %#act{data=Jobs}=bpe:doc(#act{},Proc), - %{ok, #'Schedule'{id=Tsh, data=Jobs}}=kvs:get('Schedule',Current), - Curr= case bpe:doc(#'Schedule'{}, P) of - #'Schedule'{id = Current, data = [{'Job',_I}|_]=Jobs} -> - Msgs = lists:flatten([begin case kvs:get('Job', I) of - {ok, #'Job'{feed_id = {act, <<"publish">>, PhoneId} = Feed, status = pending, data = Ms} = J} -> - NewJ = J#'Job'{data = [], status = complete}, - case kvs_stream:load_writer(Feed) of - #writer{cache = #'Job'{id = JID}} = W when JID == I -> - kvs_stream:save(W#writer{cache = NewJ}); - _ -> skip end, - kvs:put(NewJ), - roster:send_ses(C, roster:phone(PhoneId), NewJ), Ms; - _ -> [] end - end || {'Job', I} <- Jobs]), - [begin roster:send_event(C, <<"sys_bpe">>, <<>>, Msg, roster:get_vnode(<<"sys_bpe">>,Msg)), timer:sleep(1) end || #'Message'{}=Msg <- Msgs], - Current; - #'Schedule'{id = Current} -> Current; - _ -> roster:now_msec() end, - job:clear_hist(P), - proc({next, P, Curr}, #handler{state = {C, Proc}, seq = S} = H); - -proc({next, #process{id = Id}, Current}, #handler{state = {_C, _Proc}} = H) -> - %?LOG_INFO("BPE PROC next: ~p", [P]), - FirstS = try mnesia:dirty_first('Schedule') of '$end_of_table' -> Current; Key -> Key catch _ -> Current end, - Stop = #'Schedule'{id = roster:now_msec(), data = <<"stop">>}, - Docs = case job:next('Schedule', Current) of - [] -> [Stop]; - % kvs:put(Stop); - Doc when FirstS =:= Current -> [Doc]; - _ -> case kvs:get('Schedule', FirstS) of - {ok, #'Schedule'{} = Doc} -> [Doc]; - _ -> [Stop] - end - end, - try bpe:amend(Id, Docs) catch Err:Rea -> ?LOG_ERROR("Catch:~p~n", 
[n2o:stack_trace(Err, Rea)]) end, - kvs:delete('Schedule', Current), - {reply, [], H}; - - -proc({restart, #'process'{} = P}, #handler{state = {C, Proc}} = H) -> - ?LOG_INFO("BPE PROC restarted", []), -% P1=job_process:update_event(P, 'Action', {0, {0, 0, 1}}), - Proc = restart_bpe(P, C), - bpe:complete(Proc), - {reply, [], #handler{state = {C, Proc}} = H}; - -proc({timeout, #'Schedule'{id =_Current} = Sh, T}, #handler{state = {_C, Proc}} = H) -> - ?LOG_INFO("BPE PROC Timeout: ~p", [T]), - %{value,#boundaryEvent{timeout = BT},_}=lists:keytake('*',#boundaryEvent.name,bpe:events(P)), - %{ok, TRef} = timer:apply_after(T, bpe, complete, [Proc]), - {ok, TRef} = timer:apply_after(T, roster_bpe, publish, [Proc]), - try bpe:amend(Proc, Sh#'Schedule'{state = TRef}, noflow) catch Err:Rea -> - ?LOG_ERROR("Catch:~p~n", [n2o:stack_trace(Err, Rea)]) end, kvs:put(Sh#'Schedule'{state = TRef}), - %job_process:update_opt(bpe:load(Proc),{timer,TRef}), - {reply, [], H}; - - -proc({clean, #'process'{id = Id} = P}, #handler{state = {_C,_Proc}} = H) -> - ?LOG_INFO("BPE PROC clean", []), - bpe:complete(Id), - {value, #boundaryEvent{timeout = BT}, _} = lists:keytake('*', #boundaryEvent.name, bpe:events(P)), - %bpe:find_pid(Id) ! 
{'DOWN', <<>>, <<>>, <<>>, <<>>}, - timer:apply_after(roster:daystimeToms(BT) + 5000, bpe, cleanup, [Id]), - {reply, [], H}; - -proc({send, Msgs}, #handler{state = {C, Proc}} = H) -> - ?LOG_INFO("BPE PROC send", []), - roster:restart_module(C,roster_bpe), -% [case kvs:get(mqtt_session, roster:get_vnode(Msg)) of {ok,#mqtt_session{sess_pid = Pid}} -> n2o_async:send(Pid,{publish, , Msg}); _ -> skip end || Msg = #'Message'{} <- Msgs], - [begin roster:send_event(C, <<"sys_bpe">>, <<>>, Msg, roster:get_vnode(<<"sys_bpe">>,Msg)),timer:sleep(5) end || Msg = #'Message'{} <- Msgs], - {reply, [], H#handler{state = {C, Proc}}}; - - -proc({mqttc, C, connected}, State = #handler{state = {C,_Proc}, seq = S}) -> {ok, State#handler{seq = S + 1}}; -proc({mqttc, _C, disconnected}, State) -> {ok, State}. - - -publish(Proc) -> n2o_async:pid(system, ?MODULE) ! {publish, bpe:load(Proc)}. - -restart_bpe(#process{id = Id} = P, _C) -> - try case bpe:find_pid(Id) of - Pid when is_pid(Pid) -> case process_info(Pid) of - undefined ->%Pid ! {'DOWN', <<>>, <<>>, <<>>, <<>>}, - [supervisor:F(bpe_sup, Id) || F <- [terminate_child, delete_child]], - bpe:cache({process, Id}, undefined), - {ok, Pi} = bpe:start(P, []), Pi; - _ -> Id % job_process:update_opt(P,{send_pid, C}) - end; - _ -> {ok, Pi} = bpe:start(P, []), Pi - end - catch - _Err:_Rea -> {ok, ID} = bpe:start(P, []), ID - end. - -amend(ProcId, Form, []) -> bpe:amend(ProcId, Form); -amend(ProcId, Form, Par) -> bpe:amend(ProcId, Form, Par). 
- -set_schedule({ok, #'Schedule'{id = T, data = Data} = NSh}, Time, #'Job'{id = Id, proc = Proc}) when abs(Time - T) =< 2 * ?QUANT -> - kvs:put(NSh#'Schedule'{proc = Proc, data = lists:usort(Data ++ [{'Job', Id}])}); - -set_schedule(_, Time, #'Job'{id = Id, proc = Proc, time = T0} = J) -> - case kvs:get('Schedule', Time - ?QUANT) of - {ok, #'Schedule'{data = Data} = NSh} -> - kvs:put(NSh#'Schedule'{proc = Proc, data = lists:usort(Data ++ [{'Job', Id}])}); - _ when Time - ?QUANT == T0 + ?QUANT -> kvs:put(#'Schedule'{id = T0, proc = Proc, data = [{'Job', Id}]}); - _ -> set_schedule(kvs:get('Schedule', Time + ?QUANT), Time + 2 * ?QUANT, J#'Job'{}) - % - end. - - -action(#'History'{status=update}=Term,R,S)-> - roster_history:info(Term, R, S). diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 7df9b0534d973ac71d720708abd49c0ac2c04a40..99119a529f893d498d40b867e306f11b23300db5 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -211,7 +211,6 @@ info(#'Message'{status = edit, id = Id, msg_id = ClMID, feed_id = Feed, from = F info(#'Message'{id = Id, msg_id = ClMID, feed_id = Feed, from = From0, seenby = Seen, status = delete}, Req, #cx{params = ClientId, client_pid = C, state=ack} = State) when is_integer(Id) -> ?ROSTER_LOG_REQ('Message', delete, ClientId, "ClientMsgId=~s, MsgId=~p, Feed=~p", [ClMID, Id, Feed]), - %%TODO for security From= PhoneId = roster:phone_id(ClientId, From0), D = case kvs:get('Message', Id) of {ok, #'Message'{type=[sys|_]}} -> @@ -310,7 +309,7 @@ info(#'Message'{id = Id, msg_id = ClMID, feed_id = Feed, from = From0, seenby = info(#'Message'{status = update, type = [draft], feed_id = Feed0, from = From, to = To, files = File}=M, Req, #cx{params = ClientId, client_pid = C, state=ack} = State) -> - PhoneId = case ClientId of <<"sys_bpe">> -> From; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId) end, + PhoneId = case ClientId of 
?SYS_SCHED_CLIENT -> From; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId) end, ?ROSTER_LOG_REQ('Message', update, ClientId, "Type=draft, Feed=~p, Files=~p", [Feed0, [PL || #'Desc'{payload = PL} <- File]]), {Feed,M1,D}=case File of @@ -410,7 +409,7 @@ info(#'Message'{status = update, id = Id, files = [#'Desc'{mime = <<"transcribe" info(#'Message'{status = update, id = Id, feed_id = Feed, from = From, files = [#'Desc'{id = ID, payload = Payload, data = Data, mime = DMime} = ND | _]}, Req, #cx{params = ClientId, client_pid = C, state=ack} = State) when is_integer(Id) -> - PhoneId = case ClientId of <<"sys_bpe">> -> From; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId) end, + PhoneId = case ClientId of ?SYS_SCHED_CLIENT -> From; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId) end, ?ROSTER_LOG_REQ('Message', update, ClientId, "Id=~p", [Id]), Lang = roster:get_data_val(?LANG_KEY, Data), IO = case kvs:get('Message', Id) of diff --git a/apps/roster/src/protocol/roster_profile.erl b/apps/roster/src/protocol/roster_profile.erl index f97a2e286b7c4dedf670c206a63522584e45390a..907a01d34c29a72bd9f049cd6bd0d477b41e8051 100644 --- a/apps/roster/src/protocol/roster_profile.erl +++ b/apps/roster/src/protocol/roster_profile.erl @@ -108,7 +108,7 @@ proc(init,#handler{name=?MODULE} = Async) -> {ok,Async}; proc({restart, M}, #handler{state = {_C, _Proc}} = H) -> - ?LOG_INFO("BPE PROC restarted", []), + ?LOG_INFO("PROC restarting: ~p", [M]), roster:restart_module(M), {reply, [], H}. diff --git a/apps/roster/src/protocol/roster_schedule.erl b/apps/roster/src/protocol/roster_schedule.erl new file mode 100644 index 0000000000000000000000000000000000000000..9ac200be836ec8df7768c945af8fa19e33a04022 --- /dev/null +++ b/apps/roster/src/protocol/roster_schedule.erl @@ -0,0 +1,232 @@ +%%% Description : A simple scheduling engine +-module(roster_schedule). + +-export([start/0, info/3, proc/2]). +-include_lib("kernel/include/logger.hrl"). +-include_lib("roster.hrl"). 
+-include_lib("n2o/include/n2o.hrl"). + +-record(worker, { pid + , job_id + , monitor_ref }). + +start() -> + n2o_async:start(#handler{ module = ?MODULE, class = system, group = roster, + name = ?MODULE, state = #{} }). + +%% roster_schedule handles the following requests: +%% +%% - #'History'{ status = get, feed = {act, <<"publish">>, PhoneId} } +%% Gets the job history for a user. +%% +%% - #'Job'{ status = init, feed_id = {act, <<"publish">>, PhoneId}, +%% data = list(#'Message'{}) } +%% Schedule messages to be sent at a later time. +%% +%% - #'Job'{ status = delete, id = JobId } +%% Cancel a scheduled job. + +info(#'History'{feed = {act, <<"publish">>, PhoneId} = Feed, size = N0, + entity_id = MId0, status = get} = History, Req, #cx{params = ClientId} = State) -> + ?ROSTER_LOG_REQ('History', get, ClientId, "Feed=~p, N=~p, MId=~p", [Feed, N0, MId0]), + Reply = + case PhoneId == roster:phone_id(ClientId) of + true -> job_history(History); + false -> #io{code = #error{code = permission_denied}} + end, + {reply, {bert, Reply}, Req, State}; + +info(#'Job'{id = [], feed_id = {act, <<"publish">>, PhoneId} = Feed, time = Time, + data = [#'Message'{} | _], status = init} = J0, Req, #cx{params = ClientId, client_pid = C} = S) -> + ?ROSTER_LOG_REQ('Job', init, ClientId, "Feed=~p, Time=~p", [Feed, Time]), + JId = kvs:next_id('Job', 1), + J1 = J0#'Job'{id = JId, status = pending}, + Writer = case kvs_stream:load_writer(Feed) of + {error, not_found} -> kvs_stream:save(#writer{id = Feed}); + W = #writer{} -> W + end, + %% kvs_stream:add will kvs:put(J1')... 
+ kvs_stream:save(kvs_stream:add(Writer#writer{args = J1})), + roster:send_ses(C, roster:phone(PhoneId), J1), + kvs:put(#'Schedule'{id = {Time, JId}}), + {reply, {bert, <<>>}, Req, S}; + +info(#'Job'{id = JId, status = delete}, Req, #cx{params = ClientId, client_pid = C} = S) -> + PhoneId = roster:phone_id(ClientId), + R = mnesia:transaction(fun() -> + case mnesia:read('Job', JId, write) of + [Job = #'Job'{feed_id = {act, <<"publish">>, PhoneId}, time = TS, status = pending}] -> + Job1 = Job#'Job'{data = [{delete, os:system_time(millisecond)}], status = delete}, + mnesia:delete('Schedule', {TS, JId}, write), + update_job(Job1), + Job1; + [#'Job'{feed_id = {act, <<"publish">>, _PhoneId}, status = _Other}] -> + #io{code = #error{code = job_not_pending}}; + [#'Job'{}] -> + #io{code = #error{code = permission_denied}}; + _ -> + #io{code = #error{code = invalid_data}} + end end), + case R of + {atomic, J = #'Job'{feed_id = #act{data = PhoneId}}} -> + roster:send_ses(C, roster:phone(PhoneId), J), + {reply, {bert, <<>>}, Req, S}; + {atomic, Err} -> {reply, {bert, Err}, Req, S}; + {aborted, _} -> {reply, {bert, #io{code = #error{code = request_failed}}}, Req, S} + end; + +info(Msg, Req, #cx{params = ClientId} = State) -> + ?ROSTER_LOG_REQ('SEng', unknown, ClientId, "Msg=~p", [Msg]), + {reply, {bert, roster_channel_helper:error_response_400(Msg)}, Req, State}. 
+ + +proc(init, #handler{name = ?MODULE} = Handler) -> + {ok, C} = emqttc:start_link([{client_id, ?SYS_SCHED_CLIENT}, + {logger, {error_logger, error}}, + {reconnect, 5}]), + schedule_tick(5000), + convert_schedule_table(), %% Can be removed after the first startup + {ok, Handler#handler{state = #{mqtt_client => C, workers => []}}}; + +proc(tick, #handler{state = State} = Handler) -> + {TickT, State1} = schedule_work(State), + schedule_tick(TickT), + {reply, ok, Handler#handler{state = State1}}; + +proc({'DOWN', Ref, process, Pid, normal}, #handler{state = State} = Handler) -> + {reply, ok, Handler#handler{state = worker_done(Pid, Ref, State)}}; + +proc({'DOWN', Ref, process, Pid, Reason}, #handler{state = State} = Handler) -> + ?LOG_INFO("Worker(~p) failed, reason: ~p", [Pid, Reason]), + {reply, ok, Handler#handler{state = worker_done(Pid, Ref, State)}}; + +proc(Msg, Handler) -> + ?LOG_INFO("Unknown request ~p", [Msg]), + {reply, ok, Handler}. + +schedule_work(State) -> + schedule_work(mnesia:dirty_first('Schedule'), os:system_time(millisecond), State). + +-define(TICK_TIME, 1000). +-define(SCHED_RESOLUTION, 100). + +schedule_work('$end_of_table', _Now, State) -> {?TICK_TIME, State}; +schedule_work({TimeStamp, _JobId}, Now, State) when TimeStamp > Now + ?SCHED_RESOLUTION -> + {min(?TICK_TIME, TimeStamp - Now), State}; +schedule_work({_TimeStamp, JobId} = SchedKey, Now, State) -> + State1 = case is_scheduled(JobId, State) of + true -> State; + false -> start_worker(SchedKey, State) + end, + schedule_work(mnesia:dirty_next('Schedule', SchedKey), Now, State1). + +is_scheduled(JobId, #{workers := Workers}) -> + lists:keymember(JobId, #worker.job_id, Workers). + +start_worker(SchedKey = {_TS, JobId}, #{workers := Workers, mqtt_client := MC} = State) -> + {Pid, Monitor} = spawn_monitor(fun() -> do_scheduled_work(SchedKey, MC) end), + State#{workers := [#worker{pid = Pid, job_id = JobId, monitor_ref = Monitor} | Workers]}. 
+ +do_scheduled_work({_TimeStamp, JobId} = SchedKey, MqttClient) -> + ?LOG_INFO("Schedule work ~p in ~p", [JobId, self()]), + R = mnesia:transaction(fun() -> + case mnesia:read('Schedule', SchedKey, write) of %% Lock this entry + [] -> work_not_found; + [_] -> + [Job] = mnesia:read('Job', JobId, write), + case do_work(MqttClient, Job) of + ok -> + complete_job(MqttClient, Job), + mnesia:delete('Schedule', SchedKey, write); + Err -> + ?LOG_INFO("Scheduled work(~p) failed (Reason: ~p)", [JobId, Err]) + end + end end), + case R of + {atomic, _} -> ok; + {aborted, Reason} -> ?LOG_INFO("Worker for ~p aborted, reason: ~p", [JobId, Reason]) + end. + +do_work(MqttClient, #'Job'{feed_id = #act{data = _PhoneId}, data = [#'Message'{ } = Msg]}) -> + send_msg(MqttClient, Msg). + +complete_job(MqttClient, Job = #'Job'{feed_id = #act{data = PhoneId}}) -> + Job1 = Job#'Job'{status = complete, data = [{done, os:system_time(millisecond)}]}, + update_job(Job1), + roster:send_ses(MqttClient, roster:phone(PhoneId), Job1). + +schedule_tick(TickT) -> + erlang:send_after(TickT, self(), tick). + +send_msg(MqttClient, Msg) -> + roster:send_event(MqttClient, ?SYS_SCHED_CLIENT, <<>>, Msg, roster:get_vnode(<<>>, Msg)). + +worker_done(_Pid, Ref, State = #{workers := Workers}) -> + State#{workers := lists:keydelete(Ref, #worker.monitor_ref, Workers)}. + +update_job(Job = #'Job'{id = JId, feed_id = Feed}) -> + case kvs_stream:load_writer(Feed) of + #writer{cache = #'Job'{id = JId}} = W -> + kvs_stream:save(W#writer{cache = Job}); + _ -> + skip + end, + mnesia:write(Job). + +convert_schedule_table() -> + mnesia:transaction(fun() -> + convert_schedule_table(mnesia:first('Schedule'), []) + end). + +convert_schedule_table({_, _}, []) -> ok; +convert_schedule_table('$end_of_table', OldEntries) -> + [convert_schedule_entry(E) || E <- OldEntries]; +convert_schedule_table(TimeStamp, Acc) -> + convert_schedule_table(mnesia:next('Schedule', TimeStamp), [TimeStamp | Acc]). 
+ +convert_schedule_entry(TimeStamp) -> + [#'Schedule'{ data = Jobs }] = mnesia:read('Schedule', TimeStamp), + mnesia:write([#'Schedule'{id = {TimeStamp, JobId}} || {'Job', JobId} <- Jobs]), + mnesia:delete('Schedule', TimeStamp). + +-define(JOBS_LIMIT, 300). + +job_history(#'History'{feed = Feed, size = N0, entity_id = JId0} = History) -> + N = case N0 of + [] when JId0 == 0 -> -?JOBS_LIMIT; + [] -> ?JOBS_LIMIT; + _ -> N0 + end, + JId = case JId0 of + 0 -> + case kvs:get(writer, Feed) of + {ok, #writer{first = #'Job'{id = JId1}}} when N > 0 -> JId1; + {ok, #writer{cache = #'Job'{id = JId1}}} when N =< 0 -> JId1; + {ok, Writer} -> + ?LOG_ERROR("There are no jobs (~p, ~p): ~p", [JId0, N, Writer]), + JId0; + _ -> + JId0 + end; + _ -> JId0 + end, + ?LOG_INFO("get_job_history ~p ~p", [JId, N]), + Jobs = get_job_history(JId, N), + History#'History'{data = Jobs, size = length(Jobs)}. + +get_job_history(JId, N) -> + {Dir, Limit} = if N < 0 -> {bwd, -N}; true -> {fwd, N} end, + FoldFun = + fun(_, {N0, Jobs}) when N0 >= Limit -> {stop, Jobs}; + (#'Job'{ status = delete }, Acc) -> {cont, Acc}; + (J = #'Job'{}, {N0, Jobs}) -> {cont, {N0 + 1, [J | Jobs]}} + end, + + Jobs0 = case roster:fold_stream('Job', FoldFun, JId, {0, []}, Dir) of + {ok, Jobs} -> Jobs; + {eos, {_, Jobs}} -> Jobs + end, + if Dir == bwd -> lists:reverse(Jobs0); + true -> Jobs0 + end. 
+ diff --git a/apps/roster/src/protocol/roster_search.erl b/apps/roster/src/protocol/roster_search.erl index c80b0990684614c93f6d03851fd98760c30b6d99..ab26b183087cc421222c09bed60ee2c26dde9637 100644 --- a/apps/roster/src/protocol/roster_search.erl +++ b/apps/roster/src/protocol/roster_search.erl @@ -27,7 +27,6 @@ info(#'Search'{id = From, ref = Ref, field = <<"nick">>, type = '==', value = Q, #cx{params = <<"emqttd_", _/binary>> = ClientId} = State) when Ref /=[] -> ?ROSTER_LOG_REQ('Search', contact, ClientId, "Field=nick, Value=~p", [Q]), PhoneId = case ClientId of -% <<"sys_bpe">> -> From0; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId); _ -> [] end, @@ -46,7 +45,6 @@ info(#'Search'{id = From, ref = Ref, field = <<"phone">>, type = '==', value = P #cx{params = <<"emqttd_", _/binary>> = ClientId} = State) when is_list(Phones), Ref /=[] -> ?ROSTER_LOG_REQ('Search', contact, ClientId, "Field=phone, Value=~p", [Phones]), PhoneId = case ClientId of -% <<"sys_bpe">> -> From0; <<"emqttd_", _/binary>> -> roster:phone_id(ClientId); _ -> [] end, diff --git a/apps/roster/src/roster.app.src b/apps/roster/src/roster.app.src index bf95b63841c3ebf24a9bb1576438485b590d7208..a4306105c9eecf3895d6d758dd7847fd9a9b6c2a 100644 --- a/apps/roster/src/roster.app.src +++ b/apps/roster/src/roster.app.src @@ -4,7 +4,7 @@ {registered, []}, {applications, [kernel,stdlib, mnesia, crypto, inets, ssl, ibrowse, cowboy, mochiweb, gen_smtp, - kvs, nitro, n2o, emqttc, emqttd, bpe, + kvs, nitro, n2o, emqttc, emqttd, jose, jsx, uuid, erlydtl, jwt, apns, mini_s3, qdate, rest, enenra, locus, prometheus, libphonenumber_erlang]}, diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index b40dafc87da70cfe9a715ee919936a6bdeb0f454..4ff5b7a8d7d3baf944a85d7662084cf83db58fc2 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -14,8 +14,6 @@ -include_lib("roster/include/static/rest_text.hrl"). -compile({parse_transform, bert_validator}). 
--define(BPE_SYS_CLIENT, <<"sys_bpe">>). - -define(EMAIL_RE, <<"[a-zA-Z0-9\+\.\_\%\-\+]{1,256}\@[a-zA-Z0-9][a-zA-Z0-9\-]{0,64}(\.[a-zA-Z0-9][a-zA-Z0-9\-]{0,25})+">>). -define(PHONE_RE,<<"(?:\\+?(\\d{1})?-?\\(?(\\d{3})\\)?[\\s-\\.]?)?(\\d{3})[\\s-\\.]?(\\d{4})[\\s-\\.]?">>). -define(URL_RE, <<"(?:(?:https?|ftp|file|smtp):\/\/|www\.|ftp\.)(?:\([-A-Z0-9+&@#\/%=~_|$?!:,.]*\)" @@ -25,8 +23,8 @@ log_level() -> info. log_modules() -> [ %% roster api roster_auth, roster_friend, roster_test, roster_client, roster_message, roster_profile, roster_presence, roster_roster, roster_room, - roster_search, roster_proto, roster_bpe, roster_favorite, roster_history, roster, roster_acl, roster_push, roster_link, - roster_db, roster_data, + roster_search, roster_proto, roster_favorite, roster_history, roster, roster_acl, roster_push, roster_link, + roster_db, roster_data, roster_schedule, %% micro service api micro_test, micro_auth, micro_roster, micro_profile, %% 3rd part api @@ -79,7 +77,7 @@ start(_, _) -> roster_auth:start(), micro_auth:start(), [ M:start() || M <- application:get_env(?MODULE, start_modules, [])], - roster_bpe:start(), + roster_schedule:start(), rest_cowboy_handler:start(), roster_room:start(), roster_history:start(), @@ -2146,49 +2144,6 @@ mover(#reader{pos = Pos, cache = {_, Id}, dir = D} = R, {'Message', Id2} = T) -> end end. -exclude({N, Id}, El2, 0, UID, L) -> exclude({N, Id, []}, 1, El2, 0, UID, L); -exclude({N, Id}, El2, C, UID, List) -> exclude({N, Id, []}, 1, El2, C, UID, List). 
-exclude({_, []}, _, _, _, _, List) -> List; -exclude({N, Id}, El1, El2, 0, UID, L) -> - exclude({N, Id, []}, El1, El2, 0, UID, L); -exclude({N, Id, _Limit}, El1, El2, 0, UID, _) -> - {ok, T} = kvs:get(N, Id),%Fid=#'Message'.seenby, - case element(El1, T) of - E when is_atom(E), element(El2, T) == delete -> []; - _Any when element(El2, T) == delete; element(El2, T) == edit -> [T]; - [-1] -> []; - [_ | _] = Ids -> case lists:member(UID, Ids) of - true -> []; - false -> NewT = erlang:setelement(El1, T, []), [NewT] end; - E when is_atom(E) -> [T]; - _ -> NewT = erlang:setelement(El1, T, []), [NewT] end; -exclude({N, Id}, El1, El2, C, UID, List) -> exclude({N, Id, []}, El1, El2, C, UID, List); - -exclude({_, Id, Limit}, _, _, _, _, List) when Id > Limit -> List; -exclude({N, Id, Limit}, El1, El2, C, UID, List) -> - Fun = fun(A, Acc) -> case element(2, A) > Limit of true -> Acc; false -> [A | Acc] end end, - case entries(Fun, Id, N, C) of - [H | _] = L0 -> - % ElN=tuple_size(T)-1, - L = lists:foldr(fun(A, Acc) -> - case element(El1, A) of - E when is_atom(E), element(El2, A) == delete -> Acc; - _Any when element(El2, A) == delete; element(El2, A) == edit -> [A | Acc]; - [-1] -> Acc; - [_ | _] = Ids -> case lists:member(UID, Ids) of - true -> Acc; - false -> A1 = erlang:setelement(El1, A, []), [A1 | Acc] end; - E when is_atom(E) -> [A | Acc]; - _ -> A1 = erlang:setelement(El1, A, []), [A1 | Acc] end - end, [], L0), - case abs(C) - length(L) of - 0 when C > 0 -> L ++ List; - C1 when C > 0 -> exclude({N, element(#iterator.prev, H), Limit}, El1, El2, C1, UID, L ++ List); - 0 -> List ++ lists:reverse(L); - C2 -> exclude({N, element(#iterator.next, H), Limit}, El1, El2, -C2, UID, List ++ lists:reverse(L)) - end; - _ -> List - end. 
%% entries(Function,Id,Table,Number) %% entries(kvs:get(writer,Feed),Table,10) diff --git a/apps/roster/src/roster_db.erl b/apps/roster/src/roster_db.erl index 2c62074a01daf5ebcf7bd4003d0db95ef046aa32..0f7d8e08112fd8da842db0223379b463c9a8e236 100644 --- a/apps/roster/src/roster_db.erl +++ b/apps/roster/src/roster_db.erl @@ -616,15 +616,16 @@ backup(Schema, Tables, Node, File, Opts) when is_list(Tables) -> restore(File) -> restore(File, node(), []). restore(File, Node, SkipTbs) -> - application:stop(bpe), case mnesia:restore(File, [{skip_tables, SkipTbs}]) of - {aborted, {no_exists, Tab}} -> restore(File, Node, [Tab | SkipTbs]); - {aborted, Reason} -> ?LOG_INFO("aborted backup restore from ~s by reason ~p", [File, Reason]), - {error, Reason}; - {atomic, Tabs} -> application:start(bpe), - roster:restart_module(roster_bpe), - {ok, Tabs}; - E -> E end. + {aborted, {no_exists, Tab}} -> + restore(File, Node, [Tab | SkipTbs]); + {aborted, Reason} -> + ?LOG_INFO("aborted backup restore from ~s by reason ~p", [File, Reason]), + {error, Reason}; + {atomic, Tabs} -> + {ok, Tabs}; + E -> E + end. remigrate_default_contact_settings() -> kvs:delete(schema_migrations, 20181008144317), diff --git a/apps/roster/src/roster_oam.erl b/apps/roster/src/roster_oam.erl index 8af1aebe21f3ba67d711001f477db618b10c5b88..cfc366ff9a8d57ccb0ddf559deea1d771f5eb098 100644 --- a/apps/roster/src/roster_oam.erl +++ b/apps/roster/src/roster_oam.erl @@ -75,9 +75,9 @@ applist() -> crypto,mnesia] ++ stoplist(). 
stoplist() -> - [libphonenumber_erlang,syn,cowlib,kvs,jiffy,idna,parse_trans,goldrush,esockd,public_key, - bpe,lager,ssl,ranch,mochiweb,ssl_verify_fun,locus,emqttd,hackney,roster,service,active,cowboy, - emq_dashboard,emqttc,enenra,envy,uuid,erlydtl,forms,gen_smtp,json_rec,jwt,mad,migresia,mini_s3,nitro, + [libphonenumber_erlang,cowlib,kvs,jiffy,idna,parse_trans,goldrush,esockd,public_key, + lager,ssl,ranch,mochiweb,ssl_verify_fun,locus,emqttd,hackney,roster,service,active,cowboy, + emq_dashboard,emqttc,enenra,envy,uuid,erlydtl,gen_smtp,json_rec,jwt,mad,migresia,mini_s3, opencensus,qdate,rest,rfc3339,sh,stacktrace_compat]. %% This is supposed to be run as diff --git a/apps/roster/src/roster_proto.erl b/apps/roster/src/roster_proto.erl index 6533775c0878b47118069324668104155c891e19..d0a1ee8e07ee429f6ce1adfa8f5863cc095769c4 100644 --- a/apps/roster/src/roster_proto.erl +++ b/apps/roster/src/roster_proto.erl @@ -24,12 +24,12 @@ info(BusinessObject, Req, State) -> prometheus_api:observe(RecordName, RequestName, ExecutionTimeInMicroSeconds), Response. 
-do_info(#'Job'{} = Job, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_bpe :info(Job, Req, State); +do_info(#'Job'{} = Job, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_schedule :info(Job, Req, State); do_info(#'Typing'{} = Typing, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_message :info(Typing, Req, State); do_info(#'Message'{} = Message, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_message :info(Message, Req, State); do_info(#'Message'{} = Message, Req, #cx{params = <<"sys_" ,_/binary>>} = State) -> roster_message :info(Message, Req, State); %do_info(#'History'{feed = #mqi{}} = History, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_channel_history :info(History, Req, State); -do_info(#'History'{feed = #act{}} = History, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_bpe :info(History, Req, State); +do_info(#'History'{feed = #act{}} = History, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_schedule :info(History, Req, State); do_info(#'History'{} = History, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_history :info(History, Req, State); do_info(#'Profile'{} = Profile, Req, #cx{params = <<"sys_" ,_/binary>>} = State) -> micro_profile :info(Profile, Req, State); do_info(#'Profile'{} = Profile, Req, #cx{params = <<"emqttd_",_/binary>>} = State) -> roster_profile :info(Profile, Req, State); diff --git a/apps/roster/src/roster_validator.erl b/apps/roster/src/roster_validator.erl index a8ab412c74693c8fe1e2eca5e4923ecd13c16dd9..538e4749cf18b7fee95767485ad91e5559e7fb79 100644 --- a/apps/roster/src/roster_validator.erl +++ b/apps/roster/src/roster_validator.erl @@ -21,7 +21,7 @@ %% roster_validator.erl in "priv/src". %% This file is ignored however, since ?MODULE in apps/roster/src is used instead. -%% Generate at 2020-06-04T15:09:25+02:00 +%% Generate at 2020-06-04T15:23:24+02:00 -compile(export_all). 
-define(COND_FUN(Cond), fun(Rec) when Cond -> true; (_) -> false end). validate(Obj) -> @@ -132,305 +132,6 @@ validate(D = #'iterator'{}, Acc) -> ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); Err -> Err end; -validate(D = #'task'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {roles, Roles} when is_binary(Roles) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'task'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'userTask'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {roles, Roles} when is_binary(Roles) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'userTask'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'serviceTask'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {roles, Roles} when is_binary(Roles) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'serviceTask'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - 
FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'receiveTask'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {roles, Roles} when is_binary(Roles) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'receiveTask'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'messageEvent'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {payload, Payload} when is_binary(Payload) -> []; - {timeout, Timeout} when is_tuple(Timeout) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'messageEvent'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'boundaryEvent'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {payload, Payload} when is_binary(Payload) -> []; - {timeout, Timeout} when is_tuple(Timeout) -> []; - {timeDate, TimeDate} when is_binary(TimeDate) -> 
[]; - {timeDuration, TimeDuration} when is_binary(TimeDuration) -> []; - {timeCycle, TimeCycle} when is_binary(TimeCycle) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'boundaryEvent'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'timeoutEvent'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - {payload, Payload} when (is_binary(Payload) orelse Payload == []) -> []; - {timeout, Timeout} when (is_tuple(Timeout) orelse Timeout == []) -> []; - {timeDate, TimeDate} when (is_binary(TimeDate) orelse TimeDate == []) -> []; - {timeDuration, TimeDuration} when (is_binary(TimeDuration) orelse TimeDuration == []) -> []; - {timeCycle, TimeCycle} when (is_binary(TimeCycle) orelse TimeCycle == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'timeoutEvent'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'beginEvent'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'beginEvent'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, 
D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'endEvent'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {name, Name} when (is_atom(Name) orelse Name == []) -> []; - {module, Module} when (is_atom(Module) orelse Module == []) -> []; - {prompt, Prompt} when is_list(Prompt) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'endEvent'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'sequenceFlow'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {source, Source} when (is_atom(Source) orelse Source == []) -> []; - {target, Target} when (is_list(Target) orelse is_atom(Target) orelse Target == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'sequenceFlow'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'hist'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - {container, Container} when is_atom(Container) -> []; - {feed_id,_} -> []; - {prev, Prev} when (is_integer(Prev) orelse Prev == []) -> []; - {next, Next} when (is_integer(Next) orelse Next == []) -> []; - {feeds, Feeds} when is_list(Feeds) -> []; - {name, Name} when (is_binary(Name) orelse Name == []) -> []; - {task, Task} when is_atom(Task) -> []; - {docs, Docs} when is_list(Docs) -> []; - {time,_} -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'hist'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields 
= [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'process'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - {container, Container} when is_atom(Container) -> []; - {feed_id,_} -> []; - {prev, Prev} when (is_integer(Prev) orelse Prev == []) -> []; - {next, Next} when (is_integer(Next) orelse Next == []) -> []; - {feeds, Feeds} when is_list(Feeds) -> []; - {name, Name} when (is_binary(Name) orelse Name == []) -> []; - {roles, Roles} when is_list(Roles) -> []; - {tasks, Tasks} when is_list(Tasks) -> []; - {events, Events} when is_list(Events) -> []; - {flows, Flows} when is_list(Flows) -> []; - {docs, Docs} when is_list(Docs) -> []; - {options,_} -> []; - {task, Task} when (is_atom(Task) orelse Task == []) -> []; - {timer, Timer} when (is_binary(Timer) orelse Timer == []) -> []; - {notifications, Notifications} when (Notifications == []) -> []; - {result, Result} when (is_binary(Result) orelse Result == []) -> []; - {started, Started} when (is_tuple(Started) orelse Started == []) -> []; - {beginEvent, BeginEvent} when (is_atom(BeginEvent) orelse BeginEvent == []) -> []; - {endEvent, EndEvent} when (is_atom(EndEvent) orelse EndEvent == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'process'), tl(tuple_to_list(D)))]), - CondFuns = [?COND_FUN(is_record(Rec, 'sequenceFlow'))], - Fields = [D#'process'.flows], - FieldNames = [flows], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'complete'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - _ -> RecField - end || {RecField, F} <- 
lists:zip(record_info(fields, 'complete'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'proc'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'proc'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'load'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'load'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'histo'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'histo'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'create'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {proc, Proc} when (is_binary(Proc) orelse is_record(Proc, 'process') orelse Proc == []) -> []; - {docs, Docs} when (is_list(Docs) orelse Docs == []) -> []; - _ -> RecField - 
end || {RecField, F} <- lists:zip(record_info(fields, 'create'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; -validate(D = #'amend'{}, Acc) -> - ErrFields = lists:flatten( - [case {RecField, F} of - {id, Id} when (is_integer(Id) orelse Id == []) -> []; - {docs, Docs} when (is_list(Docs) orelse Docs == []) -> []; - _ -> RecField - end || {RecField, F} <- lists:zip(record_info(fields, 'amend'), tl(tuple_to_list(D)))]), - CondFuns = [], - Fields = [], - FieldNames = [], - case validate(lists:zip3(CondFuns, FieldNames, Fields), [{ErrFields, D} | Acc]) of - ok -> validate(lists:zip(FieldNames, Fields), [{ErrFields, D} | Acc]); - Err -> Err - end; validate(D = #'push'{}, Acc) -> ErrFields = lists:flatten( [case {RecField, F} of @@ -899,7 +600,7 @@ validate(D = #'Job'{}, Acc) -> {prev, Prev} when (is_integer(Prev) orelse Prev == []) -> []; {next, Next} when (is_integer(Next) orelse Next == []) -> []; {context, Context} when (is_binary(Context) orelse is_integer(Context) orelse Context == []) -> []; - {proc, Proc} when (is_record(Proc, 'process') orelse is_integer(Proc) orelse Proc == []) -> []; + {proc, Proc} when (is_integer(Proc) orelse Proc == []) -> []; {time, Time} when (is_integer(Time) orelse Time == []) -> []; {data, Data} when (is_list(Data) orelse is_binary(Data) orelse Data == []) -> []; {events, Events} when (is_list(Events) orelse Events == []) -> []; diff --git a/apps/roster/test/job_tests.erl b/apps/roster/test/job_tests.erl index 75cefea7569f9c72e01118fee2839a850d76b8ce..9da3dd7c7dc51d8c0b89b723ba3e42097243e749 100644 --- a/apps/roster/test/job_tests.erl +++ b/apps/roster/test/job_tests.erl @@ -28,8 +28,6 @@ delete() -> [roster_client:send_receive(roster_client:gen_name(roster:phone(To)), 3, #'Friend'{phone_id = To, friend_id = A, status = confirm}) 
|| To <- [B, C]], roster_client:set_filer([AClientId, BClientId], filter), timer:sleep(1000), - JobDelay = application:get_env(roster, job_delay, 1), - application:set_env(roster, job_delay, 1), #'History'{} = roster_client:send_receive(AClientId, #'History'{roster_id = A, feed = {act, <<"publish">>, A}, size = [], status = get}), %[_, ARId]=roster:parts_phone_id(A), [_, BRId]=roster:parts_phone_id(B) #'Job'{id=DeleteId, proc = Proc} = roster_client:send_receive(AClientId, #'Job'{time = roster:now_msec() + 70000, feed_id = {act, <<"publish">>, A}, data = [ @@ -65,7 +63,6 @@ delete() -> #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, files = [#'Desc'{id = <<"3">>, payload = <<"TestA!">>}], status = []}] , status = init}), #'History'{data=[#'Job'{status=pending}]} = roster_client:send_receive(AClientId, #'History'{roster_id = A, feed = {act, <<"publish">>, A}, size = [], status = get}), - application:set_env(roster, job_delay, JobDelay), [roster_client:stop_client(roster_client:gen_name(Phone)) || Phone <- Phones ++ [A]], ok. 
diff --git a/apps/roster/test/micro_test.erl b/apps/roster/test/micro_test.erl index e0bc62a418bd9ea6d9d8a9eb362f32955207ba33..4952ab7ef4020a56af2139043152f2d2abad1141 100644 --- a/apps/roster/test/micro_test.erl +++ b/apps/roster/test/micro_test.erl @@ -374,8 +374,6 @@ del_rosters() -> timer:sleep(1000), #'Profile'{settings = Settings, status=update}= roster_client:send_receive(BridgeClient,3, #'Profile'{status =update, phone = Auuid, settings = Settings = [#'Feature'{id = <<"dummy_id">>, key = <<"dummy_key">>}]}), - %% JobDelay = application:get_env(roster, job_delay, 1), - application:set_env(roster, job_delay, 1), #'History'{} = roster_client:send_receive(AClientId, 3,#'History'{roster_id = A, feed = {act, <<"publish">>, A}, size = [], status = get}), %[_, ARId]=roster:parts_phone_id(A), [_, BRId]=roster:parts_phone_id(B) #'Job'{} = roster_client:send_receive(BClientId, #'Job'{time = roster:now_msec() + 70000, feed_id = {act, <<"publish">>, B}, data = [ @@ -434,7 +432,6 @@ del_rosters() -> %% <<"Reverse Message in ", Room/binary, " from ", AClientId/binary>>}], status = []}) %% || Room <- lists:reverse(RoomNames)], roster_client:send(BridgeClient, #'Profile'{status = remove, phone = Auuid, rosters=[micro:norm_uuid(Auuid1Link),Auuid2]}), -% application:set_env(roster, job_delay, JobDelay), %% roster_client:send_receive(AClientId, #'Job'{proc= 3, data =[#'Message'{from =A, to=B, status = []}], time=roster:now_msec()+10000 , status=restart}), timer:sleep(4000), [roster_client:stop_client(roster_client:gen_name(micro:norm_uuid(micro:to_uuid(phone, Phone)))) || {Phone,_} <- Phones],%--[APhone]], diff --git a/apps/roster/test/roster_test.erl b/apps/roster/test/roster_test.erl index 8b234912ae1657f3ef1399587c6bd723128da16e..9abe1e6d8ead8177dca5ee436362732a767d8cd0 100644 --- a/apps/roster/test/roster_test.erl +++ b/apps/roster/test/roster_test.erl @@ -1670,76 +1670,6 @@ forbid('Message') -> 20. 
purge_fake_users() -> length([roster:purge_user(Phone) || #'Auth'{phone = Phone} <- kvs:all('Auth'), byte_size(Phone) < 12]). -bpe() -> - PhoneA = <<"773">>, - Phones = [<<"5333">>, <<"377">>], - [kvs:put(#'Whitelist'{phone = Phone, created = roster:now_msec()}) || Phone <- Phones ++ [PhoneA]], - [ begin try roster:purge_user(Phone) catch _:_ -> skip end end || Phone <- [PhoneA | Phones]], - [{B, BClientId, _}, {C, _, _}, {A, AClientId, _}] = - [begin - roster_client:gen_name_reg(Phone), - {ClientId, Token} = roster_client:reg_fake_user(Phone), - roster_client:receive_drop(), %roster_client:stop_client(C), - roster_client:start_cli_receive(ClientId, Token), - [{PhoneId, _, _} | _] = roster:list_rosters(Phone), {PhoneId, ClientId, Token} - end || Phone <- Phones ++ [PhoneA]], - roster_client:set_filer([AClientId, BClientId], filter_friend), - [roster_client:send_receive(AClientId, 1, #'Friend'{phone_id = A, friend_id = To, status = request}) || To <- [B, C]], - [roster_client:send_receive(roster_client:gen_name(roster:phone(To)), 3, #'Friend'{phone_id = To, friend_id = A, status = confirm}) || To <- [B, C]], - roster_client:set_filer([AClientId, BClientId], filter), - timer:sleep(1000), - JobDelay = application:get_env(roster, job_delay, 1), - application:set_env(roster, job_delay, 1), - #'History'{} = roster_client:send_receive(AClientId, #'History'{roster_id = A, feed = {act, <<"publish">>, A}, size = [], status = get}), - %[_, ARId]=roster:parts_phone_id(A), [_, BRId]=roster:parts_phone_id(B) - #'Job'{proc = Proc} = roster_client:send_receive(AClientId, #'Job'{time = roster:now_msec() + 70000, container = chain, feed_id = #act{name= <<"publish">>, data=A}, data = [ - #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, files = [#'Desc'{id = <<"3">>, payload = <<"TestA!">>}], status = []}] - , status = init}), - timer:sleep(1000), - #'Job'{proc = Proc} = roster_client:send_receive(BClientId, #'Job'{time = roster:now_msec() + 7000, feed_id = {act, 
<<"publish">>, B}, data = [ - #'Message'{feed_id = roster:feed_key(p2p, A, B), from = B, to = A, files = [#'Desc'{id = <<"7">>, payload = <<"TestB!">>}], status = []}] - , status = init}), - #'Message'{id = EId} = roster_client:send_receive(AClientId, #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, - files = [#'Desc'{id = <<"1">>, payload = <<"Opa 77777777777777">>}, - #'Desc'{id = <<"3">>, payload = <<"E TestA!">>}], status = []}), - timer:sleep(1000), - %#'Job'{ proc=Proc1}= - #'Message'{} = roster_client:send_receive(AClientId, #'Job'{time = [], feed_id = #act{name= <<"publish">>, data=A}, data = [ - #'Message'{id = EId, feed_id = roster:feed_key(p2p, A, B), from = A, to = B, %type=forward, - files = - [#'Desc'{id = <<"7">>, payload = list_to_binary([<<"777">> | [integer_to_binary(rand:uniform(99999999)) || _ <- lists:seq(1, 7)]])}], - %<<"Next TestA!">> - status = edit}, #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, %type=forward, - files = - [#'Desc'{id = <<"7">>, payload = list_to_binary([<<"Opa">> | [integer_to_binary(rand:uniform(99999999)) || _ <- lists:seq(1, 7)]])}], - %<<"Next TestA!">> - status = []}] - , - status = init}), - roster_client:receive_test(AClientId, #'Message'{}, 2), - roster_client:receive_drop(), - #'History'{} = roster_client:send_receive(AClientId, #'History'{roster_id = A, feed = {act, <<"publish">>, A}, size = [], status = get}), - %#io{code=#complete{ },data=D} = roster_client:send_receive(AClientId, #'Job'{proc = [Id], data= <<"stop">>, status=update}), - %roster_client:send(AClientId, #'Job'{proc=Proc, status=update }), - #'Job'{id = UpId, proc = Proc} = roster_client:send_receive(AClientId, #'Job'{time = roster:now_msec() + 30000, feed_id = {act, <<"publish">>, A}, data = [ - #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, files = [#'Desc'{id = <<"7">>, payload = <<"Next 2 TestA!">>}], status = []}] - , status = init}), - timer:sleep(2000), - #'Job'{proc = Proc} = 
roster_client:send_receive(AClientId, #'Job'{id = UpId, time = roster:now_msec() + 70000, feed_id = {act, <<"publish">>, A}, settings = [], data = [ - #'Message'{feed_id = roster:feed_key(p2p, A, B), from = A, to = B, files = [#'Desc'{id = <<"7">>, payload = <<"Next 3 TestA!">>}], status = []}] - , status = update}), - [roster_client:send(AClientId, #'Job'{time = roster:now_msec() + 2000 + DT * 300, feed_id = {act, <<"publish">>, A}, data = - [#'Message'{feed_id = roster:feed_key(p2p, A, A), from = A, to = A, %type=forward, - files = - [#'Desc'{id = <<"7">>, payload = list_to_binary([<<"777">> | [integer_to_binary(rand:uniform(99999999)) || _ <- lists:seq(1, 70)]])}], - %<<"Next TestA!">> - status = []}], - status = init}) || DT <- lists:seq(1, 10)], - application:set_env(roster, job_delay, JobDelay), -%% roster_client:send_receive(AClientId, #'Job'{proc= 3, data =[#'Message'{from =A, to=B, status = []}], time=roster:now_msec()+10000 , status=restart}), - [roster_client:stop_client(roster_client:gen_name(Phone)) || Phone <- Phones ++ [A]]. - - data_integrity() -> [W || #writer{id = #act{}, cache = #'Job'{}, first = #'Job'{context = Cnt}} = W <- kvs:all(writer), (not is_integer(Cnt))], [P || #'Profile'{rosters = Rosters} = P <- kvs:all('Profile'), Rosters == []]. diff --git a/doc/release-notes/next/roster_schedule.md b/doc/release-notes/next/roster_schedule.md new file mode 100644 index 0000000000000000000000000000000000000000..774afb79f30781cb44a1a27f85cbef902a70b7ae --- /dev/null +++ b/doc/release-notes/next/roster_schedule.md @@ -0,0 +1 @@ +* Replace roster_bpe (and remove dependency on bpe) with roster_schedule diff --git a/eqc/jobs_eqc.erl b/eqc/jobs_eqc.erl new file mode 100644 index 0000000000000000000000000000000000000000..5a93b1aa3f387e7c444211d6598a87bddc951b3f --- /dev/null +++ b/eqc/jobs_eqc.erl @@ -0,0 +1,171 @@ +%%% File : jobs_eqc.erl +%%% Author : Hans Svensson +%%% Description : +%%% Created : 2 Jun 2020 by Hans Svensson +-module(jobs_eqc). 
-compile([export_all, nowarn_export_all]).

-include_lib("roster/include/roster.hrl").
-include_lib("emqttc/include/emqttc_packet.hrl").
-include_lib("eqc/include/eqc.hrl").

%% Model of one scheduled job as seen by the property:
%%   schedule_at  - ms after test start at which the schedule request is sent
%%   run_after    - ms after scheduling at which the server should execute it
%%   cancel_after - 'no', or ms after scheduling at which we send a delete
%%   task         - only send_msg is modelled for now
-record(job, { schedule_at  :: non_neg_integer()
             , run_after    :: non_neg_integer()
             , cancel_after :: no | non_neg_integer()
             , task = send_msg :: send_msg }).

-define(USER_PHONE, <<"jobs_eqc">>).

-define(mqtt(Payload), #mqtt_packet{payload = Payload}).

%% Timing slack (ms) allowed between model time and observed trace time.
-define(SCHEDULE_TOLERANCE, 200).
-define(EXECUTION_TOLERANCE, 1000).

%% Generator for a point in time, 100ms..5s.
gen_t() -> choose(100, 5000).

%% Generate a job; a cancellation too close to the execution time is a
%% race we cannot decide, so such jobs are filtered out up front.
gen_job() ->
    ?SUCHTHAT(J, #job{ schedule_at  = gen_t(),
                       run_after    = gen_t(),
                       cancel_after = weighted_default({3, no}, {1, gen_t()}),
                       task         = elements([send_msg]) },
              J#job.cancel_after == no
                  orelse abs(J#job.cancel_after - J#job.run_after) > ?EXECUTION_TOLERANCE).

%% Main property: run a random batch of jobs against a live connection and
%% check both the observed event trace and the final job history.
prop_schedule() ->
    ?SETUP(
       fun() -> nynja:ensure_user(?USER_PHONE), fun() -> ok end end,
       ?FORALL(Jobs, non_empty(?SIZED(Sz, resize(Sz * 5, list(gen_job())))),
               begin
                   {JobHist, Trace} = run_jobs(lists:usort(Jobs)),
                   %% io:format("Trace:\n~p\n", [Trace]),
                   conjunction([{trace,   equals(check_trace(Trace, [], []), [])},
                                {history, equals(check_history(JobHist, Trace), [])}])
               end)).
%% check_trace/3 — fold over the observed event trace, matching it against the
%% model jobs. Returns the (time-sorted) list of anomalies; [] means the trace
%% is consistent with the model.
%% Fix: the base clause previously did lists:keysort(1, lists:keysort(1, ...)) —
%% keysort/2 is stable and idempotent, so the inner sort was redundant.
check_trace([], Jobs, Acc) ->
    %% Leftover jobs were scheduled but never observed running — report them,
    %% together with accumulated anomalies, sorted by timestamp.
    lists:keysort(1, [ {ST, {JId, Job}}
                       || {JId, Job = #job{ schedule_at = ST }} <- Jobs ] ++ Acc);
check_trace([{_T, {schedule, Job = #job{}}} | Trace], Jobs, Acc) ->
    %% A schedule request was sent; server id not known yet — track as undefined.
    check_trace(Trace, [{undefined, Job} | Jobs], Acc);
check_trace([{_T, {cancel, Job = #job{cancel_after = C, run_after = R}}} | Trace], Jobs, Acc) when C < R ->
    %% Cancellation before the run time: the job should never execute.
    check_trace(Trace, lists:keydelete(Job, 2, Jobs), Acc);
check_trace([X = {T, #'Job'{ status = pending, id = JId,
                             data = [#'Message'{ files = [#'Desc'{ payload = Msg }] }]}} | Trace],
            Jobs, Acc) ->
    %% Server acknowledged the job as pending; bind its server id and check
    %% the acknowledgement arrived close to the model's schedule time.
    Job = #job{ schedule_at = ST } = find_job(Msg, Jobs),
    Acc1 = [ X || abs(ST - T) > ?SCHEDULE_TOLERANCE ] ++ Acc,
    check_trace(Trace, [{JId, Job} | Jobs -- [{undefined, Job}]], Acc1);
check_trace([X = {T, #'Job'{ status = complete, id = JId }} | Trace], Jobs, Acc) ->
    %% Completion must match a tracked job and occur near schedule_at + run_after.
    Acc1 =
        case lists:keyfind(JId, 1, Jobs) of
            {_, #job{ schedule_at = ST, run_after = RT }} ->
                [ X || abs(ST + RT - T) > ?EXECUTION_TOLERANCE ] ++ Acc;
            false ->
                %% Completion for an unknown job id is always an anomaly.
                [X | Acc]
        end,
    check_trace(Trace, lists:keydelete(JId, 1, Jobs), Acc1);
check_trace([_ | Trace], Jobs, Acc) ->
    %% Unrelated trace events are ignored.
    check_trace(Trace, Jobs, Acc).

%% Look up the (unique) not-yet-acknowledged model job whose generated
%% message payload equals Msg. Crashes (badmatch) if not exactly one — intended.
find_job(Msg, Jobs) ->
    [Job] = [ Job || {undefined, Job} <- Jobs, make_msg(Job) == Msg ],
    Job.

%% check_history/2 — compare the server-side job history (final #'History'
%% packet) with the jobs the trace says completed or were deleted.
%% Returns [] when consistent.
check_history(?mqtt(#'History'{data = Jobs0}), History) ->
    Jobs  = [ {JId, St} || #'Job'{id = JId, status = St} <- Jobs0 ],
    HJobs = [ {JId, St} || {_, #'Job'{id = JId, status = St}} <- History,
                           lists:member(St, [complete, delete]) ],

    %% Jobs the trace saw complete must appear (as complete) in the history...
    MissingJobs    = [ X || X = {_, complete} <- HJobs ] -- Jobs,
    %% ...and the history must not contain non-complete jobs.
    UnexpectedJobs = [ X || X = {_, St} <- Jobs, St /= complete ],

    MissingJobs ++ UnexpectedJobs.
%% Drive one test run: spin up a connection handler, schedule every job (and
%% its cancellation, when modelled) via timers, wait for everything to finish,
%% then collect {JobHistory, Trace} from the handler.
run_jobs(Jobs) ->
    Handler = connection_handler(?USER_PHONE),
    call_client(Handler, start),
    [ erlang:send_after(T, Handler, {schedule_job, J})
      || J = #job{ schedule_at = T } <- Jobs ],
    [ erlang:send_after(T + C, Handler, {cancel_job, J})
      || J = #job{ schedule_at = T, cancel_after = C } <- Jobs, is_integer(C) ],
    %% The run is over once the last job's execution time has passed (plus slack).
    TestLength = lists:max([ S + T || #job{ schedule_at = S, run_after = T } <- Jobs ]),
    timer:sleep(TestLength + ?EXECUTION_TOLERANCE + 1000),
    call_client(Handler, stop).

%% Synchronous call into the handler process, correlated by a fresh ref.
call_client(H, Msg) ->
    Ref = make_ref(),
    H ! {call, Ref, self(), Msg},
    receive
        {reply, Ref, Res} -> Res
    end.

%% Spawn a linked process that owns the websocket connection for Phone and
%% records every event it sees. Blocks until the connection is up.
connection_handler(Phone) ->
    Parent = self(),
    Pid = spawn_link(
            fun() ->
                process_flag(trap_exit, true),
                User = nynja:connect_user(Phone),
                Parent ! {self(), connected},
                handle_connection(Parent, User, os:timestamp(), [])
            end),
    receive
        {Pid, connected} -> Pid
    end.

%% Handler main loop: executes schedule/cancel commands, timestamps every
%% incoming MQTT payload into History (newest first), and on 'stop' fetches
%% the job history and hands back {JobHist, chronological Trace}.
handle_connection(Parent, User, StartT, History) ->
    Pid = nynja:user_pid(User),
    receive
        {'EXIT', Parent, _Reason} ->
            nynja:ws_close(User);
        {call, Ref, From, start} ->
            %% Reset the trace clock at test start.
            From ! {reply, Ref, ok},
            handle_connection(Parent, User, os:timestamp(), History);
        {call, Ref, From, stop} ->
            NJobs = length([ x || {_, {schedule, _}} <- History ]),
            JobHist = get_job_history(User, NJobs),
            nynja:ws_close(User),
            From ! {reply, Ref, {JobHist, lists:reverse(History)}};
        {schedule_job, Job} ->
            schedule_job(User, Job),
            handle_connection(Parent, User, StartT,
                              [event(StartT, {schedule, Job}) | History]);
        {cancel_job, Job} ->
            cancel_job(User, Job, History),
            handle_connection(Parent, User, StartT,
                              [event(StartT, {cancel, Job}) | History]);
        {Pid, ?mqtt(Payload)} ->
            handle_connection(Parent, User, StartT,
                              [event(StartT, Payload) | History]);
        Packet ->
            io:format("Unexpected packet ~p\n", [Packet]),
            handle_connection(Parent, User, StartT, History)
    end.

%% Tag an event with milliseconds elapsed since StartT.
event(StartT, E) ->
    {timer:now_diff(os:timestamp(), StartT) div 1000, E}.
%% Send a schedule request for job J: a self-addressed message to be
%% published Time ms from now.
schedule_job(User, #job{ run_after = Time, task = send_msg } = J) ->
    PhoneId = nynja:user_roster(User),
    Msg = nynja:make_message(User, PhoneId,
                             #p2p{ from = PhoneId, to = PhoneId },
                             make_msg(J), false),
    NowMs = nynja:now_msec(),
    Job = #'Job'{ feed_id = #act{ data = PhoneId },
                  time = NowMs + Time, data = [Msg], status = init },
    async_send(User, Job).

%% Cancel a previously scheduled job by locating its server-assigned id in
%% the recorded History (matched via the unique generated payload). If the
%% ack has not arrived yet, do nothing.
cancel_job(User, Job, History) ->
    Msg = make_msg(Job),
    Ids = [ JId || {_, #'Job'{ id = JId,
                               data = [#'Message'{ files = [#'Desc'{ payload = Msg0 }] }]}} <- History,
                   Msg == Msg0 ],
    case Ids of
        [JId] ->
            async_send(User, #'Job'{ id = JId, status = delete,
                                     feed_id = #act{ data = <<>> } });
        _ ->
            ok
    end.

%% Request the last NJobs entries of the user's job history.
get_job_history(User, NJobs) ->
    History = #'History'{ feed = #act{ data = nynja:user_roster(User) },
                          size = -NJobs, entity_id = 0,
                          status = get, roster_id = <<>> },
    nynja:ws_send(User, nynja:mqtt_publish(nynja:user_client(User), History)).

%% Fire-and-forget publish of Msg over the user's websocket.
async_send(User, Msg) ->
    Packet = nynja:mqtt_publish(nynja:user_client(User), Msg),
    nynja:ws_send_async(User, Packet),
    ok.

%% Deterministic, (with high probability) unique payload for a model job,
%% used to correlate server acks back to the model.
make_msg(#job{ schedule_at = X, run_after = Y, cancel_after = Z }) ->
    iolist_to_binary(io_lib:format("~p : ~p : ~p", [X, Y, Z])).

%% Sleep until the next wall-clock second boundary.
align_time() ->
    {_, _, Micros} = os:timestamp(),
    timer:sleep(1000 - (Micros div 1000)).
+ diff --git a/rebar.config b/rebar.config index 3019fb524495a6e026884aa33c467ef214adb9f1..1c213461f4ddbec770fcb4fd66f0524a6dd64403 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,8 @@ {active, {git, "git://github.com/synrc/active", {branch,"master"}}}, {bert, {git, "git://github.com/NYNJA-MC/bert.git", {branch, master}}}, {esockd, {git, "https://github.com/voxoz/esockd", {branch, "master"}}}, - {bpe, {git, "git://github.com/NYNJA-MC/bpe", {branch, "nynja"}}}, + {kvs, {git, "git://github.com/NYNJA-MC/kvs", {branch, "nynja"}}}, + {nitro, {git, "git://github.com/synrc/nitro", {tag, "4.4"}}}, {emqttd, {git, "git://github.com/NYNJA-MC/emqttd", {branch,"master"}}}, {n2o, {git, "git://github.com/NYNJA-MC/n2o", {branch,"v6.4-otp21"}}}, {emqttc, {git, "git://github.com/NYNJA-MC/emqttc", {branch,"master"}}}, @@ -78,15 +79,15 @@ fs, gproc, gen_logger, compiler, mnesia, kvs, esockd, - prometheus,bert,n2o, + prometheus,bert,n2o,nitro, metrics,mimerl,unicode_util_compat,base64url,jsx,tools, certifi,ibrowse,asn1,xmerl,counters,ctx, wts,syntax_tools,qdate_localtime, - libphonenumber_erlang,syn,cowlib,jiffy,idna,parse_trans, - goldrush, public_key,bpe,{lager,load},ssl,ranch,gun,apns, + libphonenumber_erlang,cowlib,jiffy,idna,parse_trans, + goldrush, public_key,{lager,load},ssl,ranch,gun,apns, ssl_verify_fun,locus,emqttd,hackney,roster,service,active, - cowboy,emq_dashboard,emqttc,enenra,envy,uuid,erlydtl,forms, - gen_smtp, jwt, mini_s3, nitro, opencensus, + cowboy,emq_dashboard,emqttc,enenra,envy,uuid,erlydtl, + gen_smtp, jwt, mini_s3, opencensus, qdate,rest,rfc3339,sh,stacktrace_compat, redbug]}, {sys_config, "./sys.config"}, {vm_args, "./vm.args"}, diff --git a/rebar.lock b/rebar.lock index d8809d78e4946f455ce63058c8b36d3e9afbeb65..5341cbdc68fa3a4741f1ef5405acdfc48ddbc02b 100644 --- a/rebar.lock +++ b/rebar.lock @@ -15,10 +15,6 @@ {git,"git://github.com/NYNJA-MC/bert.git", {ref,"a31041122d4e02b70f4603ae5c92f7be4164e84e"}}, 0}, - {<<"bpe">>, - 
{git,"git://github.com/NYNJA-MC/bpe", - {ref,"e1e877acc4784143a2d8f86cc99bda9118288350"}}, - 0}, {<<"certifi">>,{pkg,<<"certifi">>,<<"2.4.2">>},1}, {<<"cf">>,{pkg,<<"cf">>,<<"0.3.1">>},2}, {<<"counters">>, @@ -66,10 +62,6 @@ {git,"https://github.com/voxoz/esockd", {ref,"817a4f059698a349aac9037fc0600b3928036e3d"}}, 0}, - {<<"forms">>, - {git,"git://github.com/synrc/forms", - {ref,"845feb45a46dfc2e0e9a156da9c01c218d8fd6cc"}}, - 1}, {<<"fs">>, {git,"git://github.com/synrc/fs", {ref,"45ca2003b208f461ef4acd56c5fdecd2e98e1f33"}}, @@ -117,9 +109,9 @@ {ref,"6fd754fec1cd2e4577a5d7121233371900aa44e7"}}, 0}, {<<"kvs">>, - {git,"git://github.com/synrc/kvs", + {git,"git://github.com/NYNJA-MC/kvs", {ref,"c0f9bd4766c2751e8f7b1752f8eb949d17f01b87"}}, - 1}, + 0}, {<<"lager">>, {git,"git://github.com/voxoz/lager", {ref,"9d657ab5acc9e82354f945ef819dab3760fe63cb"}}, @@ -153,7 +145,7 @@ {<<"nitro">>, {git,"git://github.com/synrc/nitro", {ref,"1aeb421c332f94b135563f8e855b139c1b067f59"}}, - 1}, + 0}, {<<"opencensus">>, {git,"https://github.com/census-instrumentation/opencensus-erlang", {ref,"7fb276ff73d677c00458922c9180df634f45e018"}}, @@ -190,10 +182,6 @@ 2}, {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.4">>},1}, {<<"stacktrace_compat">>,{pkg,<<"stacktrace_compat">>,<<"1.0.2">>},1}, - {<<"syn">>, - {git,"git://github.com/ostinelli/syn", - {ref,"9964eb8969b6e1e712249d3aed4f3dfafd3aaac3"}}, - 1}, {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.4.1">>},3}, {<<"uuid">>, {git,"https://github.com/avtobiff/erlang-uuid.git", diff --git a/sys.config b/sys.config index 5a628324391def40ebde38ead7c586edbc319d12..6a0bfb786acb760be84f8a6a849ed74ccc7375b5 100644 --- a/sys.config +++ b/sys.config @@ -100,7 +100,6 @@ {apns_cert_dir, "apns_certificates"}, {apns_binary_port, 2195}, {apns_http_port, 443}]}, - {job_delay, 60}, %% 1 mins {auth_ttl, 900}, %% 15 mins {auth_check_ip, false}, {get_sessions_api, true}, @@ -223,12 +222,9 @@ {busy_dist_port,true}]}]}, {esockd, [{logger, 
{error_logger, info}}]}, {kvs, [{dba,store_mnesia}, - {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, micro, emqttd_kvs, bpe_metainfo]} + {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, micro, emqttd_kvs]} %% ,{generation, {roster_test, limit}} %% ,{forbidding, {roster_test, forbid}} ]}, - {bpe,[{process_worker,{job,worker}} - ,{ttl, 8640000} - ]}, {locus, [{url, "http://geolite.maxmind.com/download/geoip/database/GeoLite2-City.tar.gz"}]} ]. diff --git a/test/nynja.erl b/test/nynja.erl index e49817458c97ada74c49a7c5823bf34ccbd9d61a..6b5289235c6e7f2a1a98128229865b55a82f150d 100644 --- a/test/nynja.erl +++ b/test/nynja.erl @@ -816,9 +816,12 @@ next_prefix() -> fetch_prefix(). init_prefix() -> + ensure_user(?prefix_user). + +ensure_user(Phone) -> Sys = nynja:connect_sys(), try - nynja:register_profile(Sys, ?prefix_user, false), + nynja:register_profile(Sys, Phone, false), {ok, fresh} catch error:{badmatch, #mqtt_packet{ payload = {io, {error, already_exist}, _} }} ->