From 5a35bf74809b6eadbd3b068241604b65b13957d1 Mon Sep 17 00:00:00 2001 From: qomputer Date: Tue, 24 Oct 2017 17:12:43 +0300 Subject: [PATCH 1/7] Add bpe --- apps/roster/include/roster.hrl | 1 + apps/roster/src/protocol/roster_search.erl | 2 +- rebar.config | 1 + sys.config | 2 +- 4 files changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/roster/include/roster.hrl b/apps/roster/include/roster.hrl index 10a787795..4774144f5 100644 --- a/apps/roster/include/roster.hrl +++ b/apps/roster/include/roster.hrl @@ -2,6 +2,7 @@ -define(ROSTER_HRL, true). -include_lib("kvs/include/kvs.hrl"). +-include("api_bpe.hrl"). -record(chain, {?CONTAINER, aclver=[], unread={[],[]}}). diff --git a/apps/roster/src/protocol/roster_search.erl b/apps/roster/src/protocol/roster_search.erl index be37ff18c..654cc751f 100644 --- a/apps/roster/src/protocol/roster_search.erl +++ b/apps/roster/src/protocol/roster_search.erl @@ -60,7 +60,7 @@ index_next(R,K,Current) -> ok; Next -> [ #'Index'{id = {K,V}}] = mnesia:dirty_read({R, Next}), - catch ets:insert(K, [{binary_to_list(V)}]), + catch ets:insert(K, [{binary_to_list(V)}]), %% TODO index_next(R,K,Next), ok end. \ No newline at end of file diff --git a/rebar.config b/rebar.config index f57b57c8c..1dec3568e 100644 --- a/rebar.config +++ b/rebar.config @@ -24,5 +24,6 @@ {rest,".*", {git, "git://github.com/synrc/rest",[]}}, {gen_smtp,".*", {git, "git://github.com/ne-luboff/gen_smtp",[]}}, {migresia,".*", {git, "https://github.com/yoonka/migresia.git",[]}} + {bpe,".*", {git, "git://github.com/spawnproc/bpe.git",[]}} ]}. {erl_opts, [{parse_transform,lager_transform},debug_info]}. diff --git a/sys.config b/sys.config index 2a06c4760..4f947645e 100644 --- a/sys.config +++ b/sys.config @@ -115,7 +115,7 @@ {busy_port,false}, {busy_dist_port,true}]}]}, {kvs, [{dba,store_mnesia}, - {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, emqttd_kvs]} + {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, emqttd_kvs, bpe_metainfo]} %% ,{generation, {roster_test, limit}} %% ,{forbidding, {roster_test, forbid}} ]} -- GitLab From c65b87e72d1c0ac27eb5cf928f81896767415e31 Mon Sep 17 00:00:00 2001 From: qomputer Date: Tue, 24 Oct 2017 17:15:47 +0300 Subject: [PATCH 2/7] Fix rebar.config --- rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rebar.config b/rebar.config index 1dec3568e..768b6b483 100644 --- a/rebar.config +++ b/rebar.config @@ -23,7 +23,7 @@ {base64url,".*",{git,"https://github.com/dvv/base64url.git","v1.0"}}, {rest,".*", {git, "git://github.com/synrc/rest",[]}}, {gen_smtp,".*", {git, "git://github.com/ne-luboff/gen_smtp",[]}}, - {migresia,".*", {git, "https://github.com/yoonka/migresia.git",[]}} + {migresia,".*", {git, "https://github.com/yoonka/migresia.git",[]}}, {bpe,".*", {git, "git://github.com/spawnproc/bpe.git",[]}} ]}. {erl_opts, [{parse_transform,lager_transform},debug_info]}. -- GitLab From af1c5556651e53c592bea07c9225f5dc8a99b688 Mon Sep 17 00:00:00 2001 From: qomputer Date: Thu, 26 Oct 2017 11:13:23 +0300 Subject: [PATCH 3/7] Fix sys.config --- rebar.config | 2 +- sys.config | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/rebar.config b/rebar.config index 768b6b483..dfae3639c 100644 --- a/rebar.config +++ b/rebar.config @@ -16,7 +16,7 @@ {rest,".*", {git, "git://github.com/synrc/rest",[]}}, {jsone,".*", {git, "git://github.com/sile/jsone.git",[]}}, {review,".*", {git, "git://github.com/synrc/review",[]}}, - {spa,".*",{git,"git@github.com:NYNJA-MC/SPA.git",[]}}, +% {spa,".*",{git,"git@github.com:NYNJA-MC/SPA.git",[]}}, {mini_s3,".*",{git,"https://github.com/chef/mini_s3.git",[]}}, {jwt,".*",{git,"https://github.com/artemeff/jwt.git",{tag, "0.1.0"}}}, {jsx,".*",{git,"https://github.com/talentdeficit/jsx.git","v2.8.0"}}, diff --git a/sys.config b/sys.config index 4f947645e..8fe3b6b14 100644 --- a/sys.config +++ b/sys.config @@ -118,5 +118,6 @@ {schema, [kvs_user, kvs_acl, kvs_feed, kvs_subscription, roster, emqttd_kvs, bpe_metainfo]} %% ,{generation, {roster_test, limit}} %% ,{forbidding, {roster_test, forbid}} - ]} + ]}, + {bpe,[{process_worker,{job,worker}}]} ]. -- GitLab From 33f358903523d50ac498f3e762fbfe6876a03dba Mon Sep 17 00:00:00 2001 From: qomputer Date: Fri, 27 Oct 2017 12:44:22 +0300 Subject: [PATCH 4/7] Add api_bpe.hrl --- apps/roster/include/roster.hrl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/roster/include/roster.hrl b/apps/roster/include/roster.hrl index 4774144f5..737ace834 100644 --- a/apps/roster/include/roster.hrl +++ b/apps/roster/include/roster.hrl @@ -2,7 +2,7 @@ -define(ROSTER_HRL, true). -include_lib("kvs/include/kvs.hrl"). --include("api_bpe.hrl"). +%-include("api_bpe.hrl"). -record(chain, {?CONTAINER, aclver=[], unread={[],[]}}). -- GitLab From 8c2779e7ec10fe53e6d91d9930f7616ea3dde64d Mon Sep 17 00:00:00 2001 From: qomputer Date: Thu, 2 Nov 2017 00:52:59 +0200 Subject: [PATCH 5/7] Add the basis for realization of Job-protocol --- apps/roster/src/protocol/roster_message.erl | 2 +- apps/roster/src/roster.erl | 6 ++-- apps/roster/src/roster_proto.erl | 4 +++ apps/roster/src/roster_test.erl | 33 +++++++++++++++++++-- rebar.config | 2 +- 5 files changed, 40 insertions(+), 7 deletions(-) diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index 87dd1fad4..72f5fc139 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -1,7 +1,7 @@ -module(roster_message). -include("roster.hrl"). -include_lib("n2o/include/n2o.hrl"). -%-include_lib("kvs/include/kvs.hrl"). +-include_lib("kvs/include/kvs.hrl"). -compile(export_all). start() -> n2o_async:start(#handler{module=?MODULE,class=system,group=roster,name=?MODULE,state=[]}). diff --git a/apps/roster/src/roster.erl b/apps/roster/src/roster.erl index 094c81c55..a3ee7dc70 100644 --- a/apps/roster/src/roster.erl +++ b/apps/roster/src/roster.erl @@ -2,6 +2,7 @@ -behaviour(application). -compile(export_all). -export([start/2, stop/1, init/1]). +-include("api_bpe.hrl"). -include("roster.hrl"). -include_lib("n2o/include/n2o.hrl"). -include_lib("kvs/include/metainfo.hrl"). @@ -11,8 +12,8 @@ log_level() -> info. log_modules() -> [amazon_api, vox_api, telesign_api, roster_auth, roster_friend, roster_test, signup, n2o, dashboard, - roster_message, roster_profile, roster_presence, roster_roster, roster_room, email_api, roster_search, kvs_stream, - telesign_api, stream, n2o_auth, n2o_vnode, macbert_javascript]. + roster_message, roster_profile, roster_presence, roster_roster, roster_room, email_api, roster_search, + telesign_api, stream, n2o_auth, n2o_vnode, macbert_javascript, roster_bpe]. storage_init() -> [ ets:new(X,n2o:opt()) || X <- [ system ] ]. @@ -29,6 +30,7 @@ start(_, _) -> atoms(), catch load([]), roster_profile:start(), roster_search:start(), n2o_async:start(#handler{module=roster_auth,class=system,group=roster,name=roster_auth,state=[]}), + roster_bpe:start(), X. stop(_) -> unload(), emqttd_access_control:unregister_mod(auth, n2o_auth), emqttd_access_control:unregister_mod(auth, roster_auth),ok. diff --git a/apps/roster/src/roster_proto.erl b/apps/roster/src/roster_proto.erl index 16520f88a..310daa9ae 100644 --- a/apps/roster/src/roster_proto.erl +++ b/apps/roster/src/roster_proto.erl @@ -1,15 +1,18 @@ -module(roster_proto). -include("roster.hrl"). +-include("api_bpe.hrl"). -include("test.hrl"). -include_lib("n2o/include/n2o.hrl"). -compile({parse_transform, bert_javascript}). -compile({parse_transform, bert_swift}). -export([info/3]). +info(#'Job'{} = Job, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_bpe :info(Job, Req, State); info(#'Test'{} = Test, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_test :info(Test, Req, State); info(#'Typing'{} = Typing, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Typing, Req, State); info(#'Cursor'{} = Cursor, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Cursor, Req, State); info(#'Message'{} = Message, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(Message, Req, State); +info(#'Message'{} = Message, Req, #cx{params = <<"sys_",_/binary>>}=State) -> roster_message :info(Message, Req, State); info(#'History'{} = History, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_message :info(History, Req, State); info(#'Profile'{} = Profile, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_profile :info(Profile, Req, State); info(#'Roster'{} = Roster, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_roster :info(Roster, Req, State); @@ -19,6 +22,7 @@ info(#'Room'{} = Room, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> info(#'Member'{} = Member, Req, #cx{params = <<"emqttd_",_/binary>>}=State) -> roster_room :info(Member, Req, State); info(#'Auth'{} = Auth, Req, State) -> roster_auth :info(Auth, Req, State); + info(Message,Req,State) -> n2o:info(?MODULE, "UNKNOWN: ~p~n",[Message]), {unknown,Message,Req,State}. diff --git a/apps/roster/src/roster_test.erl b/apps/roster/src/roster_test.erl index 6ed2bceb6..1bc157ec8 100644 --- a/apps/roster/src/roster_test.erl +++ b/apps/roster/src/roster_test.erl @@ -2,6 +2,7 @@ -compile(export_all). -include_lib("n2o/include/n2o.hrl"). -include("roster.hrl"). +-include("api_bpe.hrl"). -include("test.hrl"). -define(LOC, "127.0.0.1"). -define(HOST, ?LOC). @@ -63,7 +64,7 @@ proc(Term, #handler{state=#state{mqttc=C}}=H) when is_record(Term, 'Auth'); is_record(Term, 'Message'); is_record(Term, 'History'); is_record(Term, 'Profile'); is_record(Term, 'Person'); is_record(Term, 'Friend'); is_record(Term, 'Cursor'); is_record(Term, 'Search'); is_record(Term, 'Roster'); - is_record(Term, 'Room'); is_record(Term,'Member') -> + is_record(Term, 'Room'); is_record(Term,'Member'); is_record(Term,'Job')-> roster:send_event(C, <<>>, <<>>, Term), {reply, [], H}; @@ -412,10 +413,11 @@ receive_test(ClientId, Term, Counter, Timeout) -> {#'Cursor'{}, #'Room'{}} -> receive_drop(), R; {#'Message'{status = client}, #'Message'{status = sent}} when Counter>0 -> receive_test(ClientId, Term, Counter-1); {#'Message'{status = client}, #'Message'{status = sent}} -> R; + {#'Job'{}, _} -> R; {#'Message'{status = client}, _} -> receive_test(ClientId, Term, Counter); {_, _} when Counter>0 -> n2o:info(?MODULE, "Received:~w~n", [R]), receive_test(ClientId, Term, Counter-1); - _ -> R end + _ -> n2o:info(?MODULE, "Received:~w~n", [R]), R end after Timeout -> stop_client(ClientId), throw({error, {timeout, Term}}) @@ -534,4 +536,29 @@ test_migrate() -> Arity = record_info(size, 'MigresiaTest'), Arity = tuple_size(element(2, kvs:get('MigresiaTest', 1))), ok; Res -> n2o:info(?MODULE, "~p = migresia:check(roster)", [Res]), - {error, test_migration} end. \ No newline at end of file + {error, test_migration} end. + + +bpe() -> + PhoneA = <<"773">>, + Phones = [<<"5333">>, <<"377">>], + [roster:purge_user(Phone) || Phone<-[PhoneA|Phones]], + [{B, BClientId, _}, {C, CClientId, _}, {A,AClientId, TA}] = + [ begin C=gen_name_reg(Phone), + {ClientId, Token} = reg_fake_user(Phone), + receive_drop(), stop_client(C), + start_cli_receive(ClientId, Token), + [{PhoneId, _,_}|_] = roster:list_rosters(Phone), {PhoneId, ClientId, Token} + end || Phone <- Phones ++ [PhoneA] ], + %{ClientId, Token} = reg_fake_user(Phone), + %start_client(ClientId, Token), + %receive_drop(), + timer:sleep(1000), + #'Job'{id=Id}=send_sync(AClientId, #'Job'{proc = <<"Duration">>, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,2,0}} , status=create }), + timer:sleep(2000), + #io{code=#complete{ },data=D} = send_sync(AClientId, #'Job'{id = Id, docs =[{'Proc','Stop'}], timeout ={0,{0,3,0}} , status=update}), + % send(AClientId, #'Job'{proc=Id, status=update }), + timer:sleep(20000), + %roster_test:send_sync(roster_test:gen_name(<<"773">>), #'Job'{id = Id, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,3,0}} , status=amend}), + [ stop_client(gen_name(Phone)) || Phone <- Phones ++[A] ]. + diff --git a/rebar.config b/rebar.config index dfae3639c..768b6b483 100644 --- a/rebar.config +++ b/rebar.config @@ -16,7 +16,7 @@ {rest,".*", {git, "git://github.com/synrc/rest",[]}}, {jsone,".*", {git, "git://github.com/sile/jsone.git",[]}}, {review,".*", {git, "git://github.com/synrc/review",[]}}, -% {spa,".*",{git,"git@github.com:NYNJA-MC/SPA.git",[]}}, + {spa,".*",{git,"git@github.com:NYNJA-MC/SPA.git",[]}}, {mini_s3,".*",{git,"https://github.com/chef/mini_s3.git",[]}}, {jwt,".*",{git,"https://github.com/artemeff/jwt.git",{tag, "0.1.0"}}}, {jsx,".*",{git,"https://github.com/talentdeficit/jsx.git","v2.8.0"}}, -- GitLab From fbabf929a57c8d79aa5e56611367ab67c903b8da Mon Sep 17 00:00:00 2001 From: qomputer Date: Thu, 2 Nov 2017 12:43:31 +0200 Subject: [PATCH 6/7] Fix roster_test:bpe --- apps/roster/src/roster_test.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/roster/src/roster_test.erl b/apps/roster/src/roster_test.erl index 1bc157ec8..a8cf6b724 100644 --- a/apps/roster/src/roster_test.erl +++ b/apps/roster/src/roster_test.erl @@ -558,7 +558,7 @@ bpe() -> timer:sleep(2000), #io{code=#complete{ },data=D} = send_sync(AClientId, #'Job'{id = Id, docs =[{'Proc','Stop'}], timeout ={0,{0,3,0}} , status=update}), % send(AClientId, #'Job'{proc=Id, status=update }), - timer:sleep(20000), - %roster_test:send_sync(roster_test:gen_name(<<"773">>), #'Job'{id = Id, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,3,0}} , status=amend}), + timer:sleep(2000), + send_sync(AClientId, #'Job'{id = Id, docs =[{'Proc',#'Message'{from =A, to=B, status = sent}}], timeout ={0,{0,1,0}} , status=restart}), [ stop_client(gen_name(Phone)) || Phone <- Phones ++[A] ]. -- GitLab From 556cb60ef187d0e05100cb59fa37cab8a7ba63cd Mon Sep 17 00:00:00 2001 From: qomputer Date: Thu, 2 Nov 2017 12:46:27 +0200 Subject: [PATCH 7/7] Add processes and roster_bpe --- apps/roster/include/api_bpe.hrl | 17 +++++ apps/roster/src/processes/job.erl | 66 +++++++++++++++++ apps/roster/src/processes/job_process.erl | 74 +++++++++++++++++++ apps/roster/src/protocol/roster_bpe.erl | 87 +++++++++++++++++++++++ 4 files changed, 244 insertions(+) create mode 100644 apps/roster/include/api_bpe.hrl create mode 100644 apps/roster/src/processes/job.erl create mode 100644 apps/roster/src/processes/job_process.erl create mode 100644 apps/roster/src/protocol/roster_bpe.erl diff --git a/apps/roster/include/api_bpe.hrl b/apps/roster/include/api_bpe.hrl new file mode 100644 index 000000000..049988c46 --- /dev/null +++ b/apps/roster/include/api_bpe.hrl @@ -0,0 +1,17 @@ + +-ifndef(API_BPE_HRL). +-define(API_BPE_HRL, true). +-include_lib("bpe/include/bpe.hrl"). +-include("roster.hrl"). + + +-record('Job', { id=[] :: [] | integer(), + proc=[] :: [] | #process{} | binary(), + docs=[] :: [] | list(#join_application{} | #'Message'{}), + timeout=[] :: {integer(),{integer(),integer(),integer()}}, + events=[] :: list(#messageEvent{} | #boundaryEvent{} | #timeoutEvent{}), + status=[] :: [] }). + +%%-record(tour_list, {users=[]::list(#join_application{})}). + +-endif. diff --git a/apps/roster/src/processes/job.erl b/apps/roster/src/processes/job.erl new file mode 100644 index 000000000..7a6a36c40 --- /dev/null +++ b/apps/roster/src/processes/job.erl @@ -0,0 +1,66 @@ +-module(job). +-include("api_bpe.hrl"). +-include_lib("kvs/include/user.hrl"). +-compile(export_all). + +def() -> job_process:definition(). +def(T) -> job_process:definition(T). + + +action({request,'Init'}, Proc) -> + io:format("Action Init~n"), + {reply,Proc}; + +%%action({request,'Action'}, Proc) -> +%% Sending = bpe:doc({'Proc'},Proc), +%% io:format("Action start~n"), +%% case is_tuple(Sending ) of +%% true -> {reply,'Process',Proc}; +%% false -> {reply,'Update',Proc} end; + +action({request,'Stop'}, Proc) -> + + {reply,'Action',Proc}; + +action({event,Name,Payload},Proc)-> + io:format("Event Process~n"), + {reply,Proc}; + +action({request,'Action'}, #process{id=Id, notifications=C}=Proc) -> + io:format("Action Process~n"), + %ProcId=bpe:find_pid(Id), + SendEvent= bpe:doc({'Proc'},Proc), + case SendEvent of + {'Proc','Stop'} ->{reply,'Stop',Proc}; + {'Proc',Msg} -> +% n2o_ring:send(Msg), +%% {ClientId, Token} =roster_test:reg_fake_user(<<"1">>), +%% roster_test:start_client(ClientId , Token), + %roster_test:send(<<"bpe">>, Msg), + %emqttc:publish(C, roster:event_topic(<<>>, <<>>), term_to_binary(Msg), [{qos, 2}]), + roster:send_event(C, <<>>, <<>>, Msg), + io:format("Timeout ~p ~p ~n",[Id,Msg]), + {reply,'Final',Proc}; + + _ -> {reply,'Stop',Proc} + % roster_test:stop_client(ClientId), + end; + +action({request,'Final'}, #process{id=Id}=Proc) -> + io:format(" Finale~n"), + bpe:complete(Id), + {reply,Proc}. + +worker(#process{id=Id}=P) -> + case bpe:history(Id) of + [H|_] -> worker_do(calendar:time_difference(H#history.time,calendar:local_time()),P); + __ -> skip end. + +worker_do({Days,Time},P) when Days >= 14 -> skip; +worker_do({Days,Time},P) when P#process.task =:= 'Action' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'Delay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'FirstDelay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) when P#process.task =:= 'SecondDelay' -> kvs:info(?MODULE,"BPE Start: ~p~n",[bpe:start(P,[])]); +worker_do({Days,Time},P) -> skip. + + diff --git a/apps/roster/src/processes/job_process.erl b/apps/roster/src/processes/job_process.erl new file mode 100644 index 000000000..98652feb5 --- /dev/null +++ b/apps/roster/src/processes/job_process.erl @@ -0,0 +1,74 @@ +-module(job_process). +-include("api_bpe.hrl"). +-compile(export_all). + +definition() -> + + #process { name = 'Action Delay', + + flows = [ + #sequenceFlow{source='Init', target='Action'}, + #sequenceFlow{source='Action', target='Update'}, + #sequenceFlow{source='Action', target='Process'}, + #sequenceFlow{source='Update', target='Action'}, + #sequenceFlow{source='Update', target='Process'}, + % #sequenceFlow{source='Delay', target='Process'}, + #sequenceFlow{source='Process', target='Final'} + ], + + tasks = [ + #userTask { name='Init', module = job }, + #serviceTask { name='Action', module = job }, + #userTask { name='Stop', module = job }, + %#serviceTask { name='Process', module = job }, + % #serviceTask { name='Delay', module = job }, + #endEvent { name='Final'} + ], + + beginEvent = 'Init', + endEvent = 'Final', + events = [ + #boundaryEvent{ name = '*', timeout={0,{0,30,0}} }, + #timeoutEvent{name='Process', timeout={0,{0,1,0}}, module=job} + ] + }. + +definition(#'Job'{timeout=T}) -> + + #process { name = 'Action Delay', + + flows = [ + #sequenceFlow{source='Init', target='Action'}, + #sequenceFlow{source='Action', target='Stop'}, + #sequenceFlow{source='Action', target='Final'}, + #sequenceFlow{source='Stop', target='Action'}, + #sequenceFlow{source='Stop', target='Final'} + % #sequenceFlow{source='Delay', target='Process'}, + % #sequenceFlow{source='Process', target='Final'} + ], + + tasks = [ + #userTask { name='Init', module = job }, + #serviceTask { name='Action', module = job }, + #userTask { name='Stop', module = job }, + %#serviceTask { name='Process', module = job }, + % #serviceTask { name='Delay', module = job }, + #endEvent { name='Final'} + ], + + beginEvent = 'Init', + endEvent = 'Final', + events = [ + #boundaryEvent{ name = '*', timeout={0,{0,30,0}} }, + #timeoutEvent{name='Action', timeout=T, module=job}, + #timeoutEvent{name='Final', timeout={0,{0,0,3}}, module=job} + + ] + }. + +update_event(Proc,Task,T)-> + %Proc=bpe:load(Id), + Events=lists:keystore(Task,#timeoutEvent.name,bpe:events(Proc),#timeoutEvent{name=Task, timeout=T}), + NewP=Proc#process{events = Events}, + kvs:put(NewP), NewP + . diff --git a/apps/roster/src/protocol/roster_bpe.erl b/apps/roster/src/protocol/roster_bpe.erl new file mode 100644 index 000000000..63cce5fd7 --- /dev/null +++ b/apps/roster/src/protocol/roster_bpe.erl @@ -0,0 +1,87 @@ +-module(roster_bpe). +-include("api_bpe.hrl"). +-include_lib("emqttd/include/emqttd.hrl"). +-include_lib("n2o/include/n2o.hrl"). +-compile({parse_transform, bert_javascript}). +-compile(export_all). + +start() -> n2o_async:start(#handler{module=?MODULE,class=system,group=roster,name=?MODULE,state=[]}). + +info(#'Job'{proc= <<"job">>,docs=D, timeout=T, status=create}=M,R,#cx{params = ClientId, client_pid = C} = S) -> + n2o:info(?MODULE, "create:~w",[M]), + {reply,{bert,{io,bpe:start(job:def(M),[{notification,C}]), <<>>}},R,S}; + +info(#'Job'{proc= <<"Duration">>,docs=D, timeout=T, status=create}=J,R,#cx{params = ClientId, client_pid = C} = S) -> + n2o:info(?MODULE, "create:~w",[J]), + %n2o_async:send(system, ?MODULE, {start, M}), + %Start={ok,Id}=bpe:start(job:def(J),[]), + n2o_async:pid(system,?MODULE) ! {start, J, R,S}, + %#'Job'{id=Id}}, + {reply,{bert, <<>>},R,S}; + +info(#'Job'{proc=Proc,docs=Docs, status=create}=M,R,S) -> + n2o:info(?MODULE, "create:~w",[M]), + {reply,{bert,{io,bpe:start(Proc,Docs), <<>>}},R,S}; + +info(#'Job'{id=Proc,docs=Docs, status=get}=M,R,S) -> + n2o:info(?MODULE, "GET:~w",[M]), + {reply,{bert,{io,Proc, <<>>}},R,S}; + + +info(#'Job'{id=Proc,docs=Docs, timeout=T, status=restart}=M,R,S) -> + n2o:info(?MODULE, "amend:~w",[M]), + bpe:find_pid(Proc) ! {'DOWN', <<>>, <<>>, <<>>, <<>>}, + P=bpe:load(Proc), NewP=job_process:update_event(P,'Action',T), bpe:start(NewP,[]), + {reply,{bert,{io,bpe:amend(Proc,Docs, noflow), <<>>}},R,S}; + +info(#'Job'{id=Proc, docs=Docs, status=update}=M,R,S) -> + %P=bpe:load(Proc), + n2o:info(?MODULE, "UPDATE:~w",[M]), + {reply,{bert,{io,bpe:amend(Proc,Docs,noflow), <<>>}},R,S}; + + +info(#'Job'{id=Proc, events=Events, status=event}=M,R,S) -> + n2o:info(?MODULE, "proc:~w",[M]), + {reply,{bert,{io,bpe:event(Proc, Events), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=history}=M,R,S) -> + n2o:info(?MODULE, "hist:~w",[M]), + {reply,{bert,{io,bpe:history(Proc), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=proc}=M,R,S) -> + n2o:info(?MODULE, "proc:~w",[M]), + {reply,{bert,{io,bpe:process(Proc), <<>>}},R,S}; + +info(#'Job'{id=Proc, status=complete}=M,R,S) -> + n2o:info(?MODULE, "complete:~w",[M]), + {reply,{bert,{io,bpe:complete(Proc), <<>>}},R,S}; + +info(M,R,S) -> n2o:info(?MODULE, "UNKNOWN:~w",[M]), {unknown,M,R,S}. + +proc(init,#handler{name=roster_bpe} = Async) -> + {ok, C} = emqttc:start_link([{client_id, <<"sys_bpe">>},{logger, {console, error}},{reconnect, 5}]), + n2o:info(?MODULE, "ASYNC BPE started: ~w",[C]), + {ok,Async#handler{state=C,seq=0}}; + +proc({start,#'Job'{ docs=Docs}=J, R, #cx{params = ClientId}=State}, #handler{state=C,seq=S}=H) -> + n2o:info(?MODULE, "BPE PROC started",[]), +%% Proc=bpe:load(Id), +%% bpe_proc:init(NewP=Proc#process{options =[{notification, C}], notifications = C}), +%% %bpe:option(Proc, notification, C), +%% %bpe:cache({process,Id}, undefined), +%% kvs:put(NewP), + %bpe:cache({process,Id},NewP), + Start={ok,Id}=bpe:start(job:def(J),[{notification, C}]), + %info(J#'Job'{id=Id, docs=Docs, status=get},R,State) , + roster:send_action(C, ClientId,J#'Job'{id=Id, status=get}), + %#complete{id=Task}=bpe:complete(Id), + bpe:amend(Id,Docs), + {reply,[], H}; + + +proc({mqttc, C, connected}, State=#handler{state=C,seq=S}) -> {ok, State#handler{seq = S+1}}; +proc({mqttc, _C, disconnected}, State) -> {ok, State}. + + + + -- GitLab