diff --git a/apps/roster/include/api_bpe.hrl b/apps/roster/include/api_bpe.hrl new file mode 100644 index 0000000000000000000000000000000000000000..049988c462a829bd2c7a8c90bd79076d63f22cd6 --- /dev/null +++ b/apps/roster/include/api_bpe.hrl @@ -0,0 +1,17 @@ + +-ifndef(API_BPE_HRL). +-define(API_BPE_HRL, true). +-include_lib("bpe/include/bpe.hrl"). +-include("roster.hrl"). + + +-record('Job', { id=[] :: [] | integer(), + proc=[] :: [] | #process{} | binary(), + docs=[] :: [] | list(#join_application{} | #'Message'{}), + timeout=[] :: {integer(),{integer(),integer(),integer()}}, + events=[] :: list(#messageEvent{} | #boundaryEvent{} | #timeoutEvent{}), + status=[] :: [] }). + +%%-record(tour_list, {users=[]::list(#join_application{})}). + +-endif. diff --git a/apps/roster/include/roster.hrl b/apps/roster/include/roster.hrl index 10a787795001cde3501d548aacd1596ab1e92114..737ace8345a47ba3a654eb8febbe489c43623464 100644 --- a/apps/roster/include/roster.hrl +++ b/apps/roster/include/roster.hrl @@ -2,6 +2,7 @@ -define(ROSTER_HRL, true). -include_lib("kvs/include/kvs.hrl"). +%-include("api_bpe.hrl"). -record(chain, {?CONTAINER, aclver=[], unread={[],[]}}). diff --git a/apps/roster/src/processes/job.erl b/apps/roster/src/processes/job.erl new file mode 100644 index 0000000000000000000000000000000000000000..7a6a36c40063621350389d261d4b9edf60b1d971 --- /dev/null +++ b/apps/roster/src/processes/job.erl @@ -0,0 +1,66 @@ +-module(job). +-include("api_bpe.hrl"). +-include_lib("kvs/include/user.hrl"). +-compile(export_all). + +def() -> job_process:definition(). +def(T) -> job_process:definition(T). + + +action({request,'Init'}, Proc) -> + io:format("Action Init~n"), + {reply,Proc}; + +%%action({request,'Action'}, Proc) -> +%% Sending = bpe:doc({'Proc'},Proc), +%% io:format("Action start~n"), +%% case is_tuple(Sending ) of +%% true -> {reply,'Process',Proc}; +%% false -> {reply,'Update',Proc} end; + +action({request,'Stop'}, Proc) -> + + {reply,'Action',Proc}; + +action({event,Name,Payload},Proc)-> + io:format("Event Process~n"), + {reply,Proc}; + +action({request,'Action'}, #process{id=Id, notifications=C}=Proc) -> + io:format("Action Process~n"), + %ProcId=bpe:find_pid(Id), + SendEvent= bpe:doc({'Proc'},Proc), + case SendEvent of + {'Proc','Stop'} ->{reply,'Stop',Proc}; + {'Proc',Msg} -> +% n2o_ring:send(Msg), +%% {ClientId, Token} =roster_test:reg_fake_user(<<"1">>), +%% roster_test:start_client(ClientId , Token), + %roster_test:send(<<"bpe">>, Msg), + %emqttc:publish(C, roster:event_topic(<<>>, <<>>), term_to_binary(Msg), [{qos, 2}]), + roster:send_event(C, <<>>, <<>>, Msg), + io:format("Timeout ~p ~p ~n",[Id,Msg]), + {reply,'Final',Proc}; + + _ -> {reply,'Stop',Proc} + % roster_test:stop_client(ClientId), + end; + +action({request,'Final'}, #process{id=Id}=Proc) -> + io:format(" Finale~n"), + bpe:complete(Id), + {reply,Proc}. + +worker(#process{id=Id}=P) -> + case bpe:history(Id) of + [H|_] -> worker_do(calendar:time_difference(H#history.time,calendar:local_time()),P); + __ -> skip end. + +worker_do({Days,Time},P) when Days >= 14 -> skip; +worker_do({Days,Time},P) when P#process.task =:= 'Action' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'Delay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'FirstDelay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'SecondDelay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) -> skip. + + diff --git a/apps/roster/src/processes/job_process.erl b/apps/roster/src/processes/job_process.erl new file mode 100644 index 0000000000000000000000000000000000000000..98652feb561d6d049f55479faa4f0b766efd7080 --- /dev/null +++ b/apps/roster/src/processes/job_process.erl @@ -0,0 +1,74 @@ +-module(job_process). +-include("api_bpe.hrl"). +-compile(export_all). + +definition() -> + + #process { name = 'Action Delay', + + flows = [ + #sequenceFlow{source='Init', target='Action'}, + #sequenceFlow{source='Action', target='Update'}, + #sequenceFlow{source='Action', target='Process'}, + #sequenceFlow{source='Update', target='Action'}, + #sequenceFlow{source='Update', target='Process'}, + % #sequenceFlow{source='Delay', target='Process'}, + #sequenceFlow{source='Process', target='Final'} + ], + + tasks = [ + #userTask { name='Init', module = job }, + #serviceTask { name='Action', module = job }, + #userTask { name='Stop', module = job }, + %#serviceTask { name='Process', module = job }, + % #serviceTask { name='Delay', module = job }, + #endEvent { name='Final'} + ], + + beginEvent = 'Init', + endEvent = 'Final', + events = [ + #boundaryEvent{ name = '*', timeout={0,{0,30,0}} }, + #timeoutEvent{name='Process', timeout={0,{0,1,0}}, module=job} + ] + }. + +definition(#'Job'{timeout=T}) -> + + #process { name = 'Action Delay', + + flows = [ + #sequenceFlow{source='Init', target='Action'}, + #sequenceFlow{source='Action', target='Stop'}, + #sequenceFlow{source='Action', target='Final'}, + #sequenceFlow{source='Stop', target='Action'}, + #sequenceFlow{source='Stop', target='Final'} + % #sequenceFlow{source='Delay', target='Process'}, + % #sequenceFlow{source='Process', target='Final'} + ], + + tasks = [ + #userTask { name='Init', module = job }, + #serviceTask { name='Action', module = job }, + #userTask { name='Stop', module = job }, + %#serviceTask { name='Process', module = job }, + % #serviceTask { name='Delay', module = job }, + #endEvent { name='Final'} + ], + + beginEvent = 'Init', + endEvent = 'Final', + events = [ + #boundaryEvent{ name = '*', timeout={0,{0,30,0}} }, + #timeoutEvent{name='Action', timeout=T, module=job}, + #timeoutEvent{name='Final', timeout={0,{0,0,3}}, module=job} + + ] + }. + +update_event(Proc,Task,T)-> + %Proc=bpe:load(Id), + Events=lists:keystore(Task,#timeoutEvent.name,bpe:events(Proc),#timeoutEvent{name=Task, timeout=T}), + NewP=Proc#process{events = Events}, + kvs:put(NewP), NewP + . diff --git a/apps/roster/src/protocol/roster_bpe.erl b/apps/roster/src/protocol/roster_bpe.erl new file mode 100644 index 0000000000000000000000000000000000000000..63cce5fd7759ff743d1ef4cc04bd0d2dc3da1209 --- /dev/null +++ b/apps/roster/src/protocol/roster_bpe.erl @@ -0,0 +1,87 @@ +-module(roster_bpe). +-include("api_bpe.hrl"). +-include_lib("emqttd/include/emqttd.hrl"). +-include_lib("n2o/include/n2o.hrl"). +-compile({parse_transform, bert_javascript}). +-compile(export_all). + +start() -> n2o_async:start(#handler{module=?MODULE,class=system,group=roster,name=?MODULE,state=[]}). + +info(#'Job'{proc= <<"job">>,docs=D, timeout=T, status=create}=M,R,#cx{params = ClientId, client_pid = C} = S) -> + n2o:info(?MODULE, "create:~w",[M]), + {reply,{bert,{io,bpe:start(job:def(M),[{notification,C}]), <<>>}},R,S}; + +info(#'Job'{proc= <<"Duration">>,docs=D, timeout=T, status=create}=J,R,#cx{params = ClientId, client_pid = C} = S) -> + n2o:info(?MODULE, "create:~w",[J]), + %n2o_async:send(system, ?MODULE, {start, M}), + %Start={ok,Id}=bpe:start(job:def(J),[]), + n2o_async:pid(system,?MODULE) ! {start, J, R,S}, + %#'Job'{id=Id}}, + {reply,{bert, <<>>},R,S}; + +info(#'Job'{proc=Proc,docs=Docs, status=create}=M,R,S) -> + n2o:info(?MODULE, "create:~w",[M]), + {reply,{bert,{io,bpe:start(Proc,Docs), <<>>}},R,S}; + +info(#'Job'{id=Proc,docs=Docs, status=get}=M,R,S) -> + n2o:info(?MODULE, "GET:~w",[M]), + {reply,{bert,{io,Proc, <<>>}},R,S}; + + +info(#'Job'{id=Proc,docs=Docs, timeout=T, status=restart}=M,R,S) -> + n2o:info(?MODULE, "amend:~w",[M]), + bpe:find_pid(Proc) ! {'DOWN', <<>>, <<>>, <<>>, <<>>}, + P=bpe:load(Proc), NewP=job_process:update_event(P,'Action',T), bpe:start(NewP,[]), + {reply,{bert,{io,bpe:amend(Proc,Docs, noflow), <<>>}},R,S}; + +info(#'Job'{id=Proc, docs=Docs, status=update}=M,R,S) -> + %P=bpe:load(Proc), + n2o:info(?MODULE, "UPDATE:~w",[M]), + {reply,{bert,{io,bpe:amend(Proc,Docs,noflow), <<>>}},R,S}; + + +info(#'Job'{id=Proc, events=Events, status=event}=M,R,S) -> + n2o:info(?MODULE, "proc:~w",[M]), + {reply,{bert,{io,bpe:event(Proc, Events), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=history}=M,R,S) -> + n2o:info(?MODULE, "hist:~w",[M]), + {reply,{bert,{io,bpe:history(Proc), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=proc}=M,R,S) -> + n2o:info(?MODULE, "proc:~w",[M]), + {reply,{bert,{io,bpe:process(Proc), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=complete}=M,R,S) -> + n2o:info(?MODULE, "complete:~w",[M]), + {reply,{bert,{io,bpe:complete(Proc), <<>>}},R,S}; + +info(M,R,S) -> n2o:info(?MODULE, "UNKNOWN:~w",[M]), {unknown,M,R,S}. + +proc(init,#handler{name=roster_bpe} = Async) -> + {ok, C} = emqttc:start_link([{client_id, <<"sys_bpe">>},{logger, {console, error}},{reconnect, 5}]), + n2o:info(?MODULE, "ASYNC BPE started: ~w",[C]), + {ok,Async#handler{state=C,seq=0}}; + +proc({start,#'Job'{ docs=Docs}=J, R, #cx{params = ClientId}=State}, #handler{state=C,seq=S}=H) -> + n2o:info(?MODULE, "BPE PROC started",[]), +%% Proc=bpe:load(Id), +%% bpe_proc:init(NewP=Proc#process{options =[{notification, C}], notifications = C}), +%% %bpe:option(Proc, notification, C), +%% %bpe:cache({process,Id}, undefined), +%% kvs:put(NewP), + %bpe:cache({process,Id},NewP), + Start={ok,Id}=bpe:start(job:def(J),[{notification, C}]), + %info(J#'Job'{id=Id, docs=Docs, status=get},R,State) , + roster:send_action(C, ClientId,J#'Job'{id=Id, status=get}), + %#complete{id=Task}=bpe:complete(Id), + bpe:amend(Id,Docs), + {reply,[], H}; + + +proc({mqttc, C, connected}, State=#handler{state=C,seq=S}) -> {ok, State#handler{seq = S+1}}; +proc({mqttc, _C, disconnected}, State) -> {ok, State}. + + + + diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 87dd1fad4330d4d0d853c68372108ecf66462369..72f5fc1392020c7496e0706c59a91d423073a6f9 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -1,7 +1,7 @@ -module(roster_message). -include("roster.hrl"). -include_lib("n2o/include/n2o.hrl"). -%-include_lib("kvs/include/kvs.hrl"). +-include_lib("kvs/include/kvs.hrl"). -compile(export_all). start() -> n2o_async:start(#handler{module=?MODULE,class=system,group=roster,name=?MODULE,state=[]}). diff --git a/apps/roster/src/protocol/roster_search.erl b/apps/roster/src/protocol/roster_search.erl index be37ff18cc586b09970ea1ffc6a5c00fe66aed7d..654cc751f3e744a64403fdc828cac66515d0eb69 100644 --- a/apps/roster/src/protocol/roster_search.erl +++ b/apps/roster/src/protocol/roster_search.erl @@ -60,7 +60,7 @@ index_next(R,K,Current) -> ok; Next -> [ #'Index'{id = {K,V}}] = mnesia:dirty_read({R, Next}), - catch ets:insert(K, [{binary_to_list(V)}]), + catch ets:insert(K, [{binary_to_list(V)}]), %% TODO index_next(R,K,Next), ok end. \ No newline at end of file diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 094c81c558c43ba4a4345c46976504d11cfb3118..a3ee7dc7072fded30d3bc75923d1abceb7007664 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -2,6 +2,7 @@ -behaviour(application). -compile(export_all). -export([start/2, stop/1, init/1]). +-include("api_bpe.hrl"). -include("roster.hrl"). -include_lib("n2o/include/n2o.hrl"). -include_lib("kvs/include/metainfo.hrl"). @@ -11,8 +12,8 @@ log_level() -> info. log_modules() -> [amazon_api, vox_api, telesign_api, roster_auth, roster_friend, roster_test, signup, n2o, dashboard, - roster_message, roster_profile, roster_presence, roster_roster, roster_room, email_api, roster_search, kvs_stream, - telesign_api, stream, n2o_auth, n2o_vnode, macbert_javascript]. + roster_message, roster_profile, roster_presence, roster_roster, roster_room, email_api, roster_search, + telesign_api, stream, n2o_auth, n2o_vnode, macbert_javascript, roster_bpe]. storage_init() -> [ ets:new(X,n2o:opt()) || X <- [ system ] ]. @@ -29,6 +30,7 @@ start(_, _) -> atoms(), catch load([]), roster_profile:start(), roster_search:start(), n2o_async:start(#handler{module=roster_auth,class=system,group=roster,name=roster_auth,state=[]}), + roster_bpe:start(), X. stop(_) -> unload(), emqttd_access_control:unregister_mod(auth, n2o_auth), emqttd_access_control:unregister_mod(auth, roster_auth),ok. diff --git a/apps/roster/src/roster_proto.erl b/apps/roster/src/roster_proto.erl index 16520f88a1684d4ffdc8b8dac14f0ba10b9d62c1..310daa9ae4091867287d982c3b1f3553ac380e31 100644 --- a/apps/roster/src/roster_proto.erl +++ b/apps/roster/src/roster_proto.erl @@ -1,15 +1,18 @@ -module(roster_proto). -include("roster.hrl"). +-include("api_bpe.hrl"). -include("test.hrl"). -include_lib("n2o/include/n2o.hrl"). -compile({parse_transform, bert_javascript}). -compile({parse_transform, bert_swift}). -export([info/3]). +info(#'Job'{} = Job, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_bpe :info(Job, Req, State); info(#'Test'{} = Test, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_test :info(Test, Req, State); info(#'Typing'{} = Typing, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Typing, Req, State); info(#'Cursor'{} = Cursor, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Cursor, Req, State); info(#'Message'{} = Message, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Message, Req, State); +info(#'Message'{} = Message, Req, #cx{params = <<"sys_",_/binary>>}=State) -> roster_message :info(Message, Req, State); info(#'History'{} = History, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(History, Req, State); info(#'Profile'{} = Profile, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_profile :info(Profile, Req, State); info(#'Roster'{} = Roster, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_roster :info(Roster, Req, State); @@ -19,6 +22,7 @@ info(#'Room'{} = Room, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> info(#'Member'{} = Member, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_room :info(Member, Req, State); info(#'Auth'{} = Auth, Req, State) -> roster_auth :info(Auth, Req, State); + info(Message,Req,State) -> n2o:info(?MODULE, "UNKNOWN: ~p~n",[Message]), {unknown,Message,Req,State}. diff --git a/apps/roster/src/roster_test.erl b/apps/roster/src/roster_test.erl index 6ed2bceb6e9c9a9c2920ea0875b73ff2ba7cbe6e..a8cf6b724926d254498ebac0bfbd0e7ce768b265 100644 --- a/apps/roster/src/roster_test.erl +++ b/apps/roster/src/roster_test.erl @@ -2,6 +2,7 @@ -compile(export_all). -include_lib("n2o/include/n2o.hrl"). -include("roster.hrl"). +-include("api_bpe.hrl"). -include("test.hrl"). -define(LOC, "127.0.0.1"). -define(HOST, ?LOC). @@ -63,7 +64,7 @@ proc(Term, #handler{state=#state{mqttc=C}}=H) when is_record(Term, 'Auth'); is_record(Term, 'Message'); is_record(Term, 'History'); is_record(Term, 'Profile'); is_record(Term, 'Person'); is_record(Term, 'Friend'); is_record(Term, 'Cursor'); is_record(Term, 'Search'); is_record(Term, 'Roster'); - is_record(Term, 'Room'); is_record(Term,'Member') -> + is_record(Term, 'Room'); is_record(Term,'Member'); is_record(Term,'Job')-> roster:send_event(C, <<>>, <<>>, Term), {reply, [], H}; @@ -412,10 +413,11 @@ receive_test(ClientId, Term, Counter, Timeout) -> {#'Cursor'{}, #'Room'{}} -> receive_drop(), R; {#'Message'{status = client}, #'Message'{status = sent}} when Counter>0 -> receive_test(ClientId, Term, Counter-1); {#'Message'{status = client}, #'Message'{status = sent}} -> R; + {#'Job'{}, _} -> R; {#'Message'{status = client}, _} -> receive_test(ClientId, Term, Counter); {_, _} when Counter>0 -> n2o:info(?MODULE, "Received:~w~n", [R]), receive_test(ClientId, Term, Counter-1); - _ -> R end + _ -> n2o:info(?MODULE, "Received:~w~n", [R]), R end after Timeout -> stop_client(ClientId), throw({error, {timeout, Term}}) @@ -534,4 +536,29 @@ test_migrate() -> Arity = record_info(size, 'MigresiaTest'), Arity = tuple_size(element(2, kvs:get('MigresiaTest', 1))), ok; Res -> n2o:info(?MODULE, "~p = migresia:check(roster)", [Res]), - {error, test_migration} end. \ No newline at end of file + {error, test_migration} end. + + +bpe() -> + PhoneA = <<"773">>, + Phones = [<<"5333">>, <<"377">>], + [roster:purge_user(Phone) || Phone<-[PhoneA|Phones]], + [{B, BClientId, _}, {C, CClientId, _}, {A,AClientId, TA}] = + [ begin C=gen_name_reg(Phone), + {ClientId, Token} = reg_fake_user(Phone), + receive_drop(), stop_client(C), + start_cli_receive(ClientId, Token), + [{PhoneId, _,_}|_] = roster:list_rosters(Phone), {PhoneId, ClientId, Token} + end || Phone <- Phones ++ [PhoneA] ], + %{ClientId, Token} = reg_fake_user(Phone), + %start_client(ClientId, Token), + %receive_drop(), + timer:sleep(1000), + #'Job'{id=Id}=send_sync(AClientId, #'Job'{proc = <<"Duration">>, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,2,0}} , status=create }), + timer:sleep(2000), + #io{code=#complete{ },data=D} = send_sync(AClientId, #'Job'{id = Id, docs =[{'Proc','Stop'}], timeout ={0,{0,3,0}} , status=update}), + % send(AClientId, #'Job'{proc=Id, status=update }), + timer:sleep(2000), + send_sync(AClientId, #'Job'{id = Id, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,1,0}} , status=restart}), + [ stop_client(gen_name(Phone)) || Phone <- Phones ++[A] ]. + diff --git a/rebar.config b/rebar.config index f57b57c8c1b15e46eda4167b34c404a72f89c4f9..768b6b483f960c93052aa2717b1fe2895e98416a 100644 --- a/rebar.config +++ b/rebar.config @@ -23,6 +23,7 @@ {base64url,".*",{git,"https://github.com/dvv/base64url.git","v1.0"}}, {rest,".*", {git, "git://github.com/synrc/rest",[]}}, {gen_smtp,".*", {git, "git://github.com/ne-luboff/gen_smtp",[]}}, - {migresia,".*", {git, "https://github.com/yoonka/migresia.git",[]}} + {migresia,".*", {git, "https://github.com/yoonka/migresia.git",[]}}, + {bpe,".*", {git, "git://github.com/spawnproc/bpe.git",[]}} ]}. {erl_opts, [{parse_transform,lager_transform},debug_info]}. diff --git a/sys.config b/sys.config index 2a06c476075aebf7dcb4d76ce886fbf55108af50..8fe3b6b1454307fc08b6f16246e98f65c8a0b62c 100644 --- a/sys.config +++ b/sys.config @@ -115,8 +115,9 @@ {busy_port,false}, {busy_dist_port,true}]}]}, {kvs, [{dba,store_mnesia}, - {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, emqttd_kvs]} + {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, emqttd_kvs, bpe_metainfo]} %% ,{generation, {roster_test, limit}} %% ,{forbidding, {roster_test, forbid}} - ]} + ]}, + {bpe,[{process_worker,{job,worker}}]} ].