diff --git a/apps/roster/src/api/google_api.erl b/apps/roster/src/api/google_api.erl index 1fa6cc6b5df4d432008866142bed3c9e9eb3004e..562a10bb6ce6020aadd0f84d956055338761ba00 100644 --- a/apps/roster/src/api/google_api.erl +++ b/apps/roster/src/api/google_api.erl @@ -43,7 +43,7 @@ detect_language(Text) -> ok. translate(Text, Target) -> - ?LOG_INFO("Debug.Target:~p.Text: ~p", [Target, Text]), + ?LOG_DEBUG("Debug.Target:~p.Text: ~p", [Target, Text]), QueryParams = iolist_to_binary(["key=", ?GOOGLE_API_KEY]), PostVariables = jsx:encode([{<<"q">>, Text}, {<<"target">>, Target}]), ?LOG_INFO("PostVariables: ~p", [PostVariables]), @@ -71,35 +71,39 @@ start() -> _ -> ?LOG_ERROR("google credential file ~p not found", PathFile) end, - case ?GOOGLE_API_KEY of - [] -> get_access_token(); - <<"AIza", _/binary>> -> ok - end. + get_access_token(). del_gs_object(<<"gs://", Path/binary>>) -> - Bucket = ?GS_BUCKET, - case binary:split(Path, [<<"/">>]) of - [Bucket|PathObject] -> - Object = http_uri:encode(roster:binary_join(PathObject, <<"/">>)), - case del_gs_object(Bucket, Object) of - ok -> ?LOG_INFO("object ~p is deleted successfully", [Object]), ok; - Err -> ?LOG_ERROR("cannot delete object ~p", [Err]), Err - end; - [_|_] -> ok - end. + Bucket = ?GS_BUCKET, + case binary:split(Path, [<<"/">>]) of + [Bucket|PathObject] -> + Object = http_uri:encode(roster:binary_join(PathObject, <<"/">>)), + case del_gs_object(Bucket, Object) of + ok -> + ?LOG_INFO("object ~p is deleted successfully", [Object]), + ok; + Err -> + ?LOG_ERROR("cannot delete object ~p: ~p", [Object, Err]), + Err + end; + [_|_] -> + ok + end. del_gs_object(Bucket, Object) -> - case application:get_env(roster, google_creds) of - {ok, Creds} -> - enenra:delete_object(Bucket, Object, Creds); - undefined -> ?LOG_ERROR("creds not found to delete GS object", []), - {error, creds_not_found} - end. + case application:get_env(roster, google_creds) of + {ok, Creds} -> + enenra:delete_object(Bucket, Object, Creds); + undefined -> + ?LOG_ERROR("creds not found to delete GS object", []), + {error, creds_not_found} + end. + uri_to_gs(<<"https://www.googleapis.com/storage/", _/binary>> = Uri) -> - try - {ok, {https, _, _, _, <<"/storage/v1/b/", BucketObject/binary>>, _}} = http_uri:parse(Uri), - [Bucket, Object] = binary:split(BucketObject, <<"/o/">>), - <<"gs://", Bucket/binary, "/", (http_uri:decode(Object))/binary>> - catch _:_ -> Uri end; + try + {ok, {https, _, _, _, <<"/storage/v1/b/", BucketObject/binary>>, _}} = http_uri:parse(Uri), + [Bucket, Object] = binary:split(BucketObject, <<"/o/">>), + <<"gs://", Bucket/binary, "/", (http_uri:decode(Object))/binary>> + catch _:_ -> Uri end; uri_to_gs(Uri) -> Uri. access_url(Url, <<"AIza", _/binary>> = AccessKey) -> @@ -120,23 +124,25 @@ token_expiration(Token) -> end. get_access_token() -> - case ?GOOGLE_API_KEY of - [] -> - Now = round(roster:now_msec()/1000), - case application:get_env(roster, access_token, {[], 0}) of - {T, ExpTime} when Now > ExpTime; T == error -> - case lists:droplast(os:cmd(?SHELL_ACCESS_TOKEN_CMD)) of - "ya29"++_ = Token-> - application:set_env(roster, access_token, token_expiration(Token)), Token; - ErrInfo -> - ?LOG_ERROR("invalid acess token: ~p", [ErrInfo]), - application:set_env(roster, access_token, {error, invalid_access_token}), - {error, invalid_access_token} - end; - {Token,_ExpTime} -> Token - end; - <<"AIza", _/binary>> = Key -> Key - end. + case ?GOOGLE_API_KEY of + [] -> + Now = round(roster:now_msec()/1000), + case application:get_env(roster, access_token, {[], 0}) of + {T, ExpTime} when Now > ExpTime; T == error -> + case lists:droplast(os:cmd(?SHELL_ACCESS_TOKEN_CMD)) of + "ya29"++_ = Token-> + application:set_env(roster, access_token, token_expiration(Token)), + Token; + ErrInfo -> + ?LOG_ERROR("invalid acess token: ~p", [ErrInfo]), + application:set_env(roster, access_token, {error, invalid_access_token}), + {error, invalid_access_token} + end; + {Token, _ExpTime} -> Token + end; + <<"AIza", _/binary>> = Key -> Key + end. + transcribe_config(Content, Lang, Encoding) -> jsx:encode(#{config => #{encoding => Encoding, @@ -145,183 +151,138 @@ transcribe_config(Content, Lang, Encoding) -> languageCode => Lang}, audio => Content}). -transcribe(Type, Data, Lang) -> transcribe(Type, Data, Lang, <<"ENCODING_UNSPECIFIED">>, fun(Text) -> Text end). -transcribe(Type, Data, Lang, Encoding, ReturnFun) when Type == short; Type == long -> - D = {Url, Content} = - case {Type, uri_to_gs(Data)} of - {long, <<"gs://", _/binary>> = GS} -> {?TRANSCRIBE_LONG_URL, #{uri => GS}}; - {short, <<"gs://", _/binary>> = GS} -> {?TRANSCRIBE_URL, #{uri => GS}}; - {short, _} -> {?TRANSCRIBE_URL, #{content => Data}}; - {long, _} -> ?LOG_ERROR("invalid url for transcribe of long audio: ~p", [Data]), - {error, invalid_data} - end, - case D of {error, _} -> ReturnFun(D); - _ -> Config = transcribe_config(Content, Lang, Encoding), - case get_access_token() of - {error, _} = Err -> ReturnFun(Err); - AccessToken -> - AuthHeaders = access_headers(AccessToken), - Request = {access_url(Url, AccessToken), AuthHeaders, "application/json", Config}, - case roster_rest:send_request(post, Request, [], []) of - {ok, Result} -> - case jsx:decode(list_to_binary(Result), [return_maps]) of - #{<<"results">> := Alternatives} when Type == short -> -%% ?LOG_INFO("alternative transcribes: ~p", [Alternatives]), - ReturnFun( - case merge_transcribe(Alternatives) of - <<>> -> {error, invalid_transcribe}; - Text -> Text end); - #{<<"name">> := OperationName} -> - LongRequest = {access_url(?TRANSCRIBE_OPERATIONS_URL ++ OperationName, AccessToken), AuthHeaders}, - send_operation(LongRequest, OperationName, ?LONG_TRANSCRIBE_COUNTER, ?LONG_TRANSCRIBE_TIMEOUT, ReturnFun); - #{} -> ?LOG_ERROR("no text for transcribe in audio file", []), - ReturnFun({error, invalid_transcribe}) - end; - {error, _} = Err -> ReturnFun(Err) - end - end - end; -transcribe(Type, Data, Lang, Encoding, Fun) when is_binary(Type) -> - transcribe(binary_to_atom(Type, utf8), Data, Lang, Encoding, Fun). +transcribe(Type, Data, Lang) -> transcribe(Type, Data, Lang, <<"ENCODING_UNSPECIFIED">>). +transcribe(Type, Data, Lang, Encoding) when Type == short; Type == long -> + {Url, Content} = + case {Type, uri_to_gs(Data)} of + {long, <<"gs://", _/binary>> = GS} -> {?TRANSCRIBE_LONG_URL, #{uri => GS}}; + {short, <<"gs://", _/binary>> = GS} -> {?TRANSCRIBE_URL, #{uri => GS}}; + {short, _} -> {?TRANSCRIBE_URL, #{content => Data}}; + {long, _} -> + ?LOG_ERROR("invalid url for transcribe of long audio: ~p", [Data]), + {error, invalid_data} + end, + case {Url, Content} of + {error, _} = Error -> Error; + _ -> + Config = transcribe_config(Content, Lang, Encoding), + case get_access_token() of + {error, _} = Err -> + Err; + AccessToken -> + AuthHeaders = access_headers(AccessToken), + Request = {access_url(Url, AccessToken), AuthHeaders, "application/json", Config}, + case roster_rest:send_request(post, Request, [], []) of + {ok, Result} -> + case jsx:decode(list_to_binary(Result), [return_maps]) of + #{<<"results">> := Alternatives} when Type == short -> + %% ?LOG_INFO("alternative transcribes: ~p", [Alternatives]), + case merge_transcribe(Alternatives) of + <<>> -> {error, invalid_transcribe}; + Text -> Text + end; + #{<<"name">> := OperationName} -> + LongRequest = {access_url(?TRANSCRIBE_OPERATIONS_URL ++ binary_to_list(OperationName), AccessToken), AuthHeaders}, + send_operation(LongRequest, OperationName, ?LONG_TRANSCRIBE_COUNTER, ?LONG_TRANSCRIBE_TIMEOUT); + #{} -> + ?LOG_ERROR("no text for transcribe in audio file", []), + {error, invalid_transcribe} + end; + {error, _} = Err -> + Err + end + end + end; +transcribe(Type, Data, Lang, Encoding) when is_binary(Type) -> + transcribe(binary_to_atom(Type, utf8), Data, Lang, Encoding). merge_transcribe(Alternatives) -> iolist_to_binary(lists:flatten([T|| #{<<"alternatives">> := Ts} <- Alternatives, #{<<"transcript">> := T} <- Ts])). -send_operation(_LongRequest, _OperationName, 0, _Timeout, Fun) -> - Fun({error, timeout}); -send_operation(LongRequest, OperationName, Counter, Timeout, Fun) -> -%% ?LOG_INFO("transcribe counter = ~p", [Counter]), - case roster_rest:send_request(get, LongRequest, [], []) of - {ok, R} -> - case jsx:decode(list_to_binary(R), [return_maps]) of - #{<<"done">> := true, - <<"name">> := OperationName, - <<"response">> := - #{<<"results">> := Alternatives}} -> - Fun(case merge_transcribe(Alternatives) of - <<>> -> {error, invalid_transcribe}; Text -> Text end); - #{<<"name">> := OperationName} -> - timer:apply_after(Timeout, ?MODULE, send_operation, [LongRequest, OperationName, Counter - 1, Timeout, Fun]); - #{} = R -> ?LOG_ERROR("invalid long operation ~p", [R]), - Fun({error, invalid_transcribe}) - end; - {error, _} = Err -> Fun(Err) - end. +send_operation(_LongRequest, _OperationName, 0, _Timeout) -> + {error, timeout}; +send_operation(LongRequest, OperationName, Counter, Timeout) -> + case roster_rest:send_request(get, LongRequest, [], []) of + {ok, R} -> + case jsx:decode(list_to_binary(R), [return_maps]) of + #{<<"done">> := true, + <<"name">> := OperationName, + <<"response">> := #{<<"results">> := Alternatives}} -> + case merge_transcribe(Alternatives) of + <<>> -> + {error, invalid_transcribe}; + Text -> + Text + end; + #{<<"name">> := OperationName} -> + timer:sleep(Timeout), + send_operation(LongRequest, OperationName, Counter - 1, Timeout); + #{} = R -> + ?LOG_ERROR("invalid long operation ~p", [R]), + {error, invalid_transcribe} + end; + {error, _} = Err -> + Err + end. gen_tmp_filename() -> filename:join(?DOWNLOAD_DIR, "ibrowse_tmp_file_"++ uuid:to_string(uuid:uuid4())). download(Uri) -> download(Uri, ?DOWNLOAD_TIMEOUT). download(Uri, Timeout) -> - case ibrowse:send_req(Uri, [], get, [], [{save_response_to_file, gen_tmp_filename()}], Timeout) of - {ok, "200", _, File} -> File; - {ok, _, _, ErrString} = Res-> - ?LOG_ERROR("~p", [Res]), - {error, ErrString}; - Error -> Error - end. + case ibrowse:send_req(Uri, [], get, [], [{save_response_to_file, gen_tmp_filename()}], Timeout) of + {ok, "200", _, File} -> File; + {ok, _, _, ErrString} = Res -> + ?LOG_ERROR("~p", [Res]), + {error, ErrString}; + Error -> Error + end. convert_ffmpeg("https://"++_ = Uri) -> - case download(Uri) of - {file, File} -> - convert_ffmpeg(File); - {error, _} = Err-> Err - end; + ?LOG_DEBUG("download uri ~s", [Uri]), + case download(Uri) of + {file, File} -> + convert_ffmpeg(File); + {error, _} = Err -> + ?LOG_ERROR("Failed download ~s: ~p", [Uri, Err]), + Err + end; convert_ffmpeg(FileIn) -> - io_lib:format(?CONVERT_CMD, [FileIn, FileIn]), - Result = os:cmd(io_lib:format(?CONVERT_CMD, [FileIn, FileIn])), - case filelib:is_file(FileOut = FileIn++".wav") of - true -> {file, FileOut, FileIn}; - _ -> ?LOG_ERROR("ffmpeg error:~n~p", [Result]), - {error, file_not_created} - end. + Result = os:cmd(io_lib:format(?CONVERT_CMD, [FileIn, FileIn])), + case filelib:is_file(FileOut = FileIn++".wav") of + true -> + {file, FileOut, FileIn}; + _ -> + ?LOG_ERROR("ffmpeg error:~n~p", [Result]), + {error, file_not_created} + end. %% upload file to google cloud and returns gs uri -gs_upload(FileIn) -> gs_upload(FileIn, ?GS_BUCKET, <<"audio/wav">>). +gs_upload(FileIn) -> + gs_upload(FileIn, ?GS_BUCKET, <<"audio/wav">>). + gs_upload(<<"gs//", _/binary>> = GsUri, _Bucket, _ContentType) -> {gs, GsUri}; gs_upload(FileIn, Bucket, ContentType) -> - case convert_ffmpeg(FileIn) of - {file, FileOut, FileIn2} -> - case application:get_env(roster, google_creds) of - {ok, Creds} -> - Name = list_to_binary(filename:basename(FileOut)), %%TODO maybe changed path to object in future - Size = filelib:file_size(FileOut), - {ok, Md5} = enenra:compute_md5(FileOut), - case enenra:upload_file(FileOut, - #object{name = Name, bucket = Bucket, contentType = ContentType, md5Hash = Md5, size = Size}, Creds) of - {ok, #object{}} -> - GsUri = iolist_to_binary(["gs://",Bucket,"/",Name]), - ?LOG_INFO("object ~p is uploaded successfully", [GsUri]), - [file:delete(filename:absname(File)) || File<-[FileIn2, FileOut]], - {gs, GsUri}; - Err -> - ?LOG_ERROR("upload file ~p to Google Storage is failed: ~p", [FileOut, Err]), - Err - end; - undefined -> - ?LOG_ERROR("google creds not found", []), - {error, creds_not_found} - end; - Err -> Err - end. - -%% ------------------------------------------------------------------ -%% Tests -%% ------------------------------------------------------------------ - -test_detect_language_ru() -> - Text = <<"Русский Текст">>, - detect_language(Text). - -test_detect_language_en() -> - Text = <<"English text">>, - detect_language(Text). - -test_translate_ru_en() -> - Text = <<"Русский Текст">>, - Target = <<"en">>, - translate(Text, Target). - -test_translate_en_ru() -> - Text = <<"English text">>, - Target = <<"ru">>, - translate(Text, Target). - -test_transcribe() -> - {ok, Binary} = file:read_file("apps/roster/priv/transcribe_test.flac"), - <<_:8, _/binary>> = google_api:transcribe(short, base64:encode(Binary), <<"en-US">>). -test_long_transcribe() -> - ?LOG_INFO("wait about 30 sec", []), - spawn( - fun() -> - Pid = self(), - google_api:transcribe(long, <<"gs://gcs-test-data/vr.flac">>, <<"en-US">>, <<"ENCODING_UNSPECIFIED">>, - fun(Text) -> Pid ! Text end), - receive - <<_:8, _/binary>> = Text -> ?LOG_INFO("~p", [Text]), Text - after - 50000 -> ?LOG_INFO("subscribe timeout", []) - end - end). - -test_convert_transcribe() -> - {file, File, _} = convert_ffmpeg("apps/roster/priv/transcribe_test.flac"), - {ok, Binary} = file:read_file(File), - file:delete(filename:absname(File)), - <<_:8, _/binary>> = google_api:transcribe(short, base64:encode(Binary), <<"en-US">>). - -test_convert_transcribe_long() -> - test_convert_transcribe_long("https://gcs-test-data.storage.googleapis.com/vr.flac"). -test_convert_transcribe_long(Url) -> - {gs, GsUri} = gs_upload(Url), - ?LOG_INFO("\tbegin long transcribe~n\t\twait about 30 sec", []), - spawn( - fun() -> - Pid = self(), - google_api:transcribe(long, GsUri, <<"en-US">>, <<"ENCODING_UNSPECIFIED">>, - fun(Text) -> del_gs_object(GsUri), Pid ! Text end), - receive - <<_:8, _/binary>> = Text -> ?LOG_INFO("~p", [Text]), Text - after - 50000 -> ?LOG_INFO("transcribe timeout", []) - end - end). + case convert_ffmpeg(FileIn) of + {file, FileOut, FileIn2} -> + case application:get_env(roster, google_creds) of + {ok, Creds} -> + Name = list_to_binary(filename:basename(FileOut)), %%TODO maybe changed path to object in future + Size = filelib:file_size(FileOut), + {ok, Md5} = enenra:compute_md5(FileOut), + case enenra:upload_file(FileOut, + #object{name = Name, bucket = Bucket, contentType = ContentType, md5Hash = Md5, size = Size}, Creds) of + {ok, #object{}} -> + GsUri = iolist_to_binary(["gs://",Bucket,"/",Name]), + ?LOG_INFO("object ~p is uploaded successfully", [GsUri]), + [ file:delete(filename:absname(File)) || File <- [FileIn2, FileOut]], + {gs, GsUri}; + Err -> + ?LOG_ERROR("upload file ~p to Google Storage failed: ~p", [FileOut, Err]), + Err + end; + undefined -> + ?LOG_ERROR("google creds not found", []), + {error, creds_not_found} + end; + Err -> Err + end. diff --git a/apps/roster/src/protocol/roster_message.erl b/apps/roster/src/protocol/roster_message.erl index cc92943eb03970ba15d2d5dd83199d7b4a907854..7df9b0534d973ac71d720708abd49c0ac2c04a40 100644 --- a/apps/roster/src/protocol/roster_message.erl +++ b/apps/roster/src/protocol/roster_message.erl @@ -348,45 +348,61 @@ info(#'Message'{status = update, id = Id, files = [#'Desc'{mime = <<"transcribe" Pid = n2o_async:pid(system, roster_message), case kvs:get('Message', Id) of {ok, #'Message'{feed_id = Feed, from = From, to = To, files = [#'Desc'{mime = <<"audio">>, payload = Uri} | _]}} -> - ?LOG_INFO("enter ~p transcribe process with ~p, (Lang=~p)", [Type, Uri, Lang]), + ?LOG_DEBUG("enter ~p transcribe process with ~p, (Lang=~p)", [Type, Uri, Lang]), ErrMsg = #'Message'{id = Id, feed_id = Feed, from = From, to = To}, case Type of short -> spawn(fun() -> - Res = - case google_api:convert_ffmpeg(binary_to_list(Uri)) of - {file, FileOut, FileIn} -> - {ok, Binary} = file:read_file(FileOut), - R = case google_api:transcribe(Type, base64:encode(Binary), Lang) of - {error, _} = E -> #io{code = E, data = ErrMsg}; - Text -> Data2 = lists:keydelete(<<"TYPE">>, #'Feature'.key, Data), - Msg#'Message'{files = [Desc#'Desc'{payload = Text, data = Data2}]} - end, - [file:delete(filename:absname(File)) || File <- [FileIn, FileOut]], R; - {error, ErrInfo} -> - ?LOG_ERROR("invalid url for transcribe: ~p", [ErrInfo]), - #io{code = {error, invaid_data}, data = ErrMsg} - end, Pid ! {Res, ClientId} end), + Res = + case google_api:convert_ffmpeg(binary_to_list(Uri)) of + {file, FileOut, FileIn} -> + {ok, Binary} = file:read_file(FileOut), + R = case google_api:transcribe(Type, base64:encode(Binary), Lang) of + {error, _} = E -> + #io{code = E, data = ErrMsg}; + Text -> + Data2 = lists:keydelete(<<"TYPE">>, #'Feature'.key, Data), + Msg#'Message'{files = [Desc#'Desc'{payload = Text, data = Data2}]} + end, + [file:delete(filename:absname(File)) || File <- [FileIn, FileOut]], + R; + {error, ErrInfo} -> + ?LOG_ERROR("invalid url for transcribe: ~p ~p", [Uri, ErrInfo]), + #io{code = {error, invalid_data}, data = ErrMsg} + end, + Pid ! {Res, ClientId} + end), {reply, {bert, #io{code = #ok{code = transcribe}, data = Id}}, Req, State}; long -> spawn(fun() -> - {gs, GsUri} = google_api:gs_upload(binary_to_list(Uri)), - Fun = - fun(#error{} = Err) -> - Pid ! {#io{code = Err, data = ErrMsg}, ClientId}; - (Text) -> - Data2 = lists:keydelete(<<"TYPE">>, #'Feature'.key, Data), - google_api:del_gs_object(GsUri), - Pid ! {Msg#'Message'{files = [Desc#'Desc'{payload = Text, data = Data2}]}, ClientId} - end, google_api:transcribe(Type, GsUri, Lang, <<"ENCODING_UNSPECIFIED">>, Fun) end), + Res = + case google_api:gs_upload(binary_to_list(Uri)) of + {gs, GsUri} -> + R = case google_api:transcribe(Type, GsUri, Lang) of + {error, _} = E -> + #io{code = E, data = ErrMsg}; + Text -> + Data2 = lists:keydelete(<<"TYPE">>, #'Feature'.key, Data), + Msg#'Message'{files = [Desc#'Desc'{payload = Text, data = Data2}]} + end, + google_api:del_gs_object(GsUri), + R; + {error, ErrInfo} -> + ?LOG_ERROR("invalid url for transcribe: ~p ~p", [Uri, ErrInfo]), + #io{code = {error, invalid_data}, data = ErrMsg} + end, + Pid ! {Res, ClientId} + end), {reply, {bert, #io{code = #ok{code = transcribe}, data = Id}}, Req, State}; - _ -> ?LOG_ERROR("invalid transcribe type ~p", [Type]), + _ -> + ?LOG_ERROR("invalid transcribe type ~p", [Type]), {reply, {bert, #io{code = #error{code = invalid_data}, data = ErrMsg}}, Req, State} end; {ok, InvalidMsg} -> ?LOG_INFO("invalid audio transcribe for ~p", [InvalidMsg]), {reply, {bert, #io{code = invalid_data, data = InvalidMsg}}, Req, State}; - {error, _} = E -> ?LOG_ERROR("message ~p not found for transcribe", [Id]), + {error, _} = E -> + ?LOG_ERROR("message ~p not found for transcribe", [Id]), {reply, {bert, #io{code = E}, #'Message'{id = Id}}, Req, State} end; @@ -470,7 +486,8 @@ proc({#io{} = IO, ClientId}, #handler{state = C} = H) -> {reply, [], H}; proc({#'Message'{status = update} = Msg, ClientId}, #handler{state = C} = H) -> ?LOG_INFO("UPDATE TRANSCRIBE: ~p", [Msg]), - try {reply, {bert, IO}, _, _} = info(Msg, {[], handled}, #cx{params = ClientId, client_pid = C, state = ack}), + %% As a dirty hack, we let sys_bpe post the messaage update. The client itself may have disconnected + try {reply, {bert, IO}, _, _} = info(Msg, {[], handled}, #cx{params = <<"sys_bpe">>, client_pid = C, state = ack}), roster:send_action(C, ClientId, IO) catch Err:Rea:Sta -> ?LOG_ERROR("~p:~p:~p", [Err, Rea, Sta]) diff --git a/doc/release-notes/next/NY8653-transcribe b/doc/release-notes/next/NY8653-transcribe new file mode 100644 index 0000000000000000000000000000000000000000..b8251c7271906febe1473a5f572e9d1bac6da1f3 --- /dev/null +++ b/doc/release-notes/next/NY8653-transcribe @@ -0,0 +1,11 @@ +# Release 1.3! + +### Highlights + +* Fixed bugs in transcribing long messages +* -- + +### List of changes + +* Added tests for transcribe +* fixed enenra package to not fail on token refresh diff --git a/rebar.config b/rebar.config index 1d43b491afef294674628b6a78bd72f79738c492..a22caab3b5575af9665cfdadedbd09d4ce579b89 100644 --- a/rebar.config +++ b/rebar.config @@ -33,7 +33,7 @@ {cowboy, {git, "git://github.com/ninenines/cowboy", {tag,"2.7.0"}}}, {jose, {git, "https://github.com/NYNJA-MC/jose-erlang.git", {ref, "7094018"}}}, {uuid, {git, "https://github.com/avtobiff/erlang-uuid.git",{branch,"master"}}}, - {enenra, {git, "https://github.com/nlfiedler/enenra", {branch, "master"}}}, + {enenra, {git, "git://github.com/NYNJA-MC/enenra", {branch, "master"}}}, {'qdate', {git, "https://github.com/enterprizing/qdate.git", {ref,"fba988fc54214bb37a3ce11d5c5a3cc68752c3ce"}}}, {redbug, {git, "https://github.com/massemanet/redbug.git", {tag, "2.0.1"}}} ]}. diff --git a/rebar.lock b/rebar.lock index a1f6539e025e39b50da147c33edfb9501f0e9734..31235aeb1c596699627d95e68a2619cbc237bf7b 100644 --- a/rebar.lock +++ b/rebar.lock @@ -46,8 +46,8 @@ {ref,"1010714c50515ee31277f45ee4b274573a0b0cd7"}}, 0}, {<<"enenra">>, - {git,"https://github.com/nlfiedler/enenra", - {ref,"fc253037cb5913700bb83cddafdd835bdbd6992c"}}, + {git,"git://github.com/NYNJA-MC/enenra", + {ref,"ff2f61ea8356e74b954264dbf88175ff532bbe5c"}}, 0}, {<<"envy">>, {git,"https://github.com/markan/envy.git", diff --git a/test/integration_test.ts b/test/integration_test.ts index cd6072f3539badddbf946c973980e4ad38c190f5..7fe758b9d1bc8e33c41261175c5438c92529b9ee 100644 --- a/test/integration_test.ts +++ b/test/integration_test.ts @@ -1,3 +1,4 @@ {node, mq, 'mq@127.0.0.1'}. -{suites, ".", [server_SUITE]}. \ No newline at end of file +{suites, ".", [server_SUITE]}. +%{cases, ".", server_SUITE, [group_csv]}. diff --git a/test/nynja.erl b/test/nynja.erl index b65821cefd2ab7e890dbdb47808cce2a8cf8787a..e49817458c97ada74c49a7c5823bf34ccbd9d61a 100644 --- a/test/nynja.erl +++ b/test/nynja.erl @@ -96,7 +96,7 @@ connect_user(Phone, PhoneLink, RosterUUID) -> User = connect_user_(Phone, PhoneLink, RosterUUID), %% timer:sleep(25), %% avoid race between subscriptions and get_profile - #mqtt_packet{ payload = #'Profile'{ rosters = [#'Roster'{ id = RosterIx }] } } = get_profile(User, 5000), + #mqtt_packet{ payload = #'Profile'{ rosters = [#'Roster'{ id = RosterIx }] } } = get_profile(User, 10000), RosterId = make_phone_id(PhoneLink, RosterIx), @@ -243,6 +243,30 @@ msg_room(User = #user{ client = ClientId }, Room, Msg, Ack) -> N = if Ack -> 2; true -> 1 end, ws_send(User, mqtt_publish(ClientId, MsgS), N, 5000). +audio_msg_room(User, Room, Url) -> + audio_msg_room(User, Room, Url, false). + +audio_msg_room(User = #user{ client = ClientId, roster_id = PhoneId}, RoomName, Url, Ack) -> + RoomId = typed_uuid(<<"room">>, RoomName), + MsgS = make_audio_message(ClientId, PhoneId, RoomId, #muc{ name = RoomId }, Url, Ack), + N = if Ack -> 2; true -> 1 end, + ws_send(User, mqtt_publish(ClientId, MsgS), N, 10000). + +audio_msg_chat(User1 = #user{ client = ClientId, roster_id = RosterId1}, #user{roster_id = RosterId2}, Url, FeedId) -> + Msg = make_audio_message(ClientId, RosterId1, RosterId2, FeedId, Url, false), + ws_send(User1, mqtt_publish(ClientId, Msg), 1, 10000). + +transcribe_msg_room(User = #user{ client = ClientId, roster_id = PhoneId}, RoomName, IdMessage, Type, Ack) -> + RoomId = typed_uuid(<<"room">>, RoomName), + MsgS = make_transcribe_message(ClientId, PhoneId, IdMessage, #muc{ name = RoomId }, Type, Ack), + N = if Ack -> 2; true -> 1 end, + ws_send(User, mqtt_publish(ClientId, MsgS), N, 30000). + +transcribe_msg_chat(User = #user{ client = ClientId, roster_id = PhoneId}, FeedId, IdMessage, Type, Ack) -> + MsgS = make_transcribe_message(ClientId, PhoneId, IdMessage, FeedId, Type, true), + N = if Ack -> 2; true -> 1 end, + ws_send(User, mqtt_publish(ClientId, MsgS), N, 30000). + make_room_message(User, RoomName, Msg) -> make_room_message(User, RoomName, Msg, false). @@ -268,6 +292,65 @@ make_message(ClientId, PhoneId, To, Feed, Msg, Ack) -> #'Message'{ feed_id = Feed, msg_id = typed_uuid(PhoneId, TS), from = PhoneId, to = To, files = PayloadDesc ++ AckDesc }. +make_audio_message(ClientId, PhoneId, To, Feed, Url, Ack) -> + TS = integer_to_binary(uniq()), + ID = <>, + Data = + [ #'Feature'{ id = <>, + key = <<"INFO">>, + value = <<"bla">>, + group = <<"FILE_DATA">> }, + #'Feature'{id = <>, + key = <<"DURATION">>, + value = <<"4.649795918367347">>, + group = <<"FILE_DATA">>}, + #'Feature'{id = <>, + key = <<"FILENAME">>, + value = <<"some_file.mp3">>, + group = <<"FILE_DATA">>}, + #'Feature'{id = <>, + key = <<"SIZE">>, + value = <<"93518">>, + group = <<"FILE_DATA">>}], + PayloadDesc = [ #'Desc'{ id = ID, mime = <<"audio">>, + payload = Url, + data = Data } ], + AckDesc = [ #'Desc'{ id = <<"ack", TS/binary>>, mime = <<"ack">> } || Ack ], + #'Message'{ container = [], + feed_id = Feed, + msg_id = typed_uuid(PhoneId, TS), + from = PhoneId, + to = To, + files = PayloadDesc ++ AckDesc }. + +make_transcribe_message(ClientId, PhoneId, IdMessage, Feed, Type, Ack) -> + %% Type is <<"short">> or <<"long">> + TS = integer_to_binary(uniq()), + ID = <>, + Data = + [#'Feature'{id = <>, + key = <<"LANGUAGE">>,value = <<"en">>, + group = <<"FILE_DATA">>}, + #'Feature'{id = <>, + key = <<"USERS">>, + value = PhoneId, + group = <<"FILE_DATA">>}, + #'Feature'{id = <>, + key = <<"TYPE">>, + value = Type, + group = <<"FILE_DATA">>}], + PayloadDesc = [ #'Desc'{ id = ID, mime = <<"transcribe">>, + data = Data } ], + AckDesc = [ #'Desc'{ id = <<"ack", TS/binary>>, mime = <<"ack">> } || Ack ], %% no ack in client + #'Message'{ id = IdMessage, + feed_id = Feed, %% not really needed + container = chain, %% not really needed + from = PhoneId, + to = [], + files = PayloadDesc ++ AckDesc, + status = update }. + + make_member(#{ phone_id := PhoneId } = Args) -> Settings = case Args of @@ -749,4 +832,3 @@ fetch_prefix() -> nynja:send_message_(User, <<"Me has prefix?">>, nynja:user_roster(User), nynja:feed_id(User, User)), nynja:ws_close(User), set_prefix(MsgId). - diff --git a/test/server_SUITE.erl b/test/server_SUITE.erl index 288cd111241e7dc6c942b9922b1a24c553fa6968..20201d72df98a05c7e8630393fd1febaa48f6ccb 100644 --- a/test/server_SUITE.erl +++ b/test/server_SUITE.erl @@ -395,12 +395,10 @@ group_csv(Cfg) -> MemberPhone = <<"001002001">>, MemberPhone2 = <<"001002002">>, - register_user([{phone, Phone}]), %% may fail if already registered register_user([{phone, MemberPhone}, {fname, <<"Kalle">>}]), register_user([{phone, MemberPhone2}, {fname, <<"Lisa">>}]), - AdminUser = nynja:connect_user(Phone), AdminClientId = nynja:user_client(AdminUser), Token = nynja:jwt_token(nynja:user_roster_uuid(AdminUser)), @@ -418,8 +416,7 @@ group_csv(Cfg) -> RoomId = nynja:typed_uuid(<<"room">>, RoomName), ct:log("Room info ~p", [nynja:get_room(AdminUser, RoomName)]), - Res = nynja:msg_room(AdminUser, RoomName, <<"This is a nynja room">>), - %% if we add argument 'true' for acknowledgement this fails Issue 362 + Res = nynja:msg_room(AdminUser, RoomName, <<"This is a nynja room">>, true), ct:log("Res = ~p", [Res]), ct:log("Room info 1 message ~p", [nynja:get_room(AdminUser, RoomName)]), @@ -442,12 +439,22 @@ group_csv(Cfg) -> nynja:msg_room(AdminUser, RoomName, <<"Secret">>), nynja:msg_room(AdminUser, RoomName, <<"Public">>), - nynja:add_to_room(AdminUser, User, RoomName, -1), - nynja:add_to_room(AdminUser, User2, RoomName, -2), + nynja:add_to_room(AdminUser, User, RoomName, 2), + _ = nynja:ws_receive(User), + nynja:add_to_room(AdminUser, User2, RoomName, 4), + _ = nynja:ws_receive(User2), + NewUserInRoom = nynja:ws_receive(User), + ct:log("New User message ~p", [NewUserInRoom]), ct:log("Room info new members ~p", [nynja:get_room(AdminUser, RoomName)]), - nynja:msg_room(AdminUser, RoomName, <<"Welcome new members">>), - ct:log("Room info 2 messages ~p", [nynja:get_room(AdminUser, RoomName)]), + #mqtt_packet{payload = #'Message'{id = WelcomeId}} = nynja:msg_room(AdminUser, RoomName, <<"Welcome new members">>), + _ = nynja:ws_receive(User), + _ = nynja:ws_receive(User2), + + ct:log("WelcomeId = ~p", [WelcomeId]), + + Read2 = nynja:get_messages(User, nynja:room_feed(RoomName), WelcomeId), + ct:log("Compare User read messages with CSV output ~p", [Read2]), {ok, 200, _MultipleCSV} = request('GroupCSV', @@ -492,6 +499,10 @@ chat_csv(Cfg) -> nynja:send_message(Friend, User, <<"Response 1">>, FeedId), nynja:send_message(Friend, User, <<"Msg 2">>, FeedId), + %% cleanup all unreceived messages + nynja:ws_receive(Friend, any, 10), + nynja:ws_receive(User, any, 10), + ct:log("Messages sent"), {ok, 200, _} = diff --git a/test/transcribe_SUITE.erl b/test/transcribe_SUITE.erl new file mode 100644 index 0000000000000000000000000000000000000000..40317adf3112971f10cbd937f1d39d7b0068c5d7 --- /dev/null +++ b/test/transcribe_SUITE.erl @@ -0,0 +1,236 @@ +%% Test transcription and translation of messages +%% Which uses google API + +-module(transcribe_SUITE). +-include_lib("stdlib/include/assert.hrl"). +-include_lib("emqttc/include/emqttc_packet.hrl"). +-include_lib("roster/include/roster.hrl"). + +%% common_test exports +-export([ all/0 + , groups/0 + , init_per_group/2 + , init_per_suite/1 + , end_per_suite/1 + , end_per_group/2 + , init_per_testcase/2 + , end_per_testcase/2 + ]). + +%% Tests +-export([ register_user/1 + , short_transcribe/1 + , long_transcribe/1 + , chat_transcribe/1 + ]). + +all() -> + [ %% {group, translate}, + {group, transcribe} + ]. + +groups() -> + [ {translate, [register_user, translate]}, + {transcribe, [register_user, short_transcribe, long_transcribe, chat_transcribe]} + ]. + +init_per_suite(Cfg) -> + case os:getenv("NYNJA_HOST") of + false -> + ct:pal("NYNJA_HOST undefined. Suite requires running nynja server!"), + {skip, {undefined, "NYNJA_HOST"}}; + Host -> + %% Use a random phone number for testing. Skip tests if number has already been used + Phone = rand:uniform(1000000), + + %% Rebar config has ct_opts point to sys.config such that we + %% have access to config parameters + %% In case one tests against a differently configured node, + %% provide ct_opts with that configuration! + + {ok, Port} = application:get_env(rest, port), + BaseUrl = iolist_to_binary(["http://", Host, ":", integer_to_list(Port), "/"]), + {ok, 200, Response} = request('GetHealth', [], [{base_url, BaseUrl}, basic_auth()]), + ct:log("Pinged server resulted in ~p", [Response]), + %% this does not mean that we "Action Process" is passed + [{base_url, BaseUrl}, {phone, iolist_to_binary(integer_to_list(Phone))} | + Cfg] + end. + +end_per_suite(Cfg) -> + Cfg. + +init_per_group(_Grp, Cfg) -> + Cfg. + +end_per_group(_Grp, Cfg) -> + Cfg. + +init_per_testcase(_TC, Cfg) -> + Cfg. + +end_per_testcase(_TC, Cfg) -> + Cfg. + +%%% ------------- tests ----------------------- + +register_user(Cfg) -> + Phone = proplists:get_value(phone, Cfg, <<"000000123">>), + FirstName = proplists:get_value(fname, Cfg, <<"Tes">>), + LastName = proplists:get_value(lname, Cfg, <<"Tester">>), + ct:log("Phone = ~p", [Phone]), + try User = nynja:register_profile(#{ phone => Phone, fname => FirstName, + lname => LastName, nick => Phone }), + nynja:ws_close(User), + ok + catch + error:{badmatch, {mqtt_packet,_,_, {io,{error,already_exist},_}}} -> + ct:log("Registering failed: user already exists"), + {skip, user_already_registered} + end. + +short_transcribe(Cfg) -> + Phone = proplists:get_value(phone, Cfg, <<"000000036">>), + MemberPhone = <<"001002001">>, + + register_user([{phone, Phone}]), %% may fail if already registered + register_user([{phone, MemberPhone}, {fname, <<"Kalle">>}]), + + AdminUser = nynja:connect_user(Phone), + User = nynja:connect_user(MemberPhone), + RoomName = iolist_to_binary(["Transcribe-", integer_to_list(os:system_time())]), + + ct:log("Creating room ~p", [RoomName]), + nynja:create_room(AdminUser, #{name => RoomName}), + nynja:add_to_room(AdminUser, User, RoomName, 1), + _ = nynja:ws_receive(User), %% get the Room User is added to + #mqtt_packet{ payload = #'Message'{} } = + nynja:msg_room(AdminUser, RoomName, <<"This is a nynja room">>), + UserNotify = nynja:ws_receive(User), %% User gets this msg + ct:log("User notified ~p", [UserNotify]), + + %% Admin sends audio + [#mqtt_packet{ payload = #'Message'{ id = IdMessage } } | _Ack ] = + nynja:audio_msg_room(AdminUser, RoomName, + <<"https://nynja-defaults.s3.us-west-2.amazonaws.com/58f1b135-2c43-48b6-b83a-129254e567f2.mpeg">>, true), + ct:log("Id for 1st audio msg ~p", [IdMessage]), + _ = nynja:ws_receive(User), %% User gets this msg + + %% Admin transcribes its own audio + [#mqtt_packet{ payload = {io, {ok, transcribe}, IdMessage} }, + #mqtt_packet{ payload = #'Message'{id = IdMessage, status = update, files = Data}}] = + nynja:transcribe_msg_room(AdminUser, RoomName, IdMessage, <<"short">>, true), + ct:log("Transcribe result = ~p", [Data]), + [#mqtt_packet{ payload = #'Message'{id = IdMessage, status = update, files = UserSeesTranscribe}}] = nynja:ws_receive(User), %% User gets this msg + ct:log("UserTranscribe = ~p", [UserSeesTranscribe]), + #'Desc'{mime = <<"transcribe">>, payload = <<"1 2 3 4 5">>} = lists:last(UserSeesTranscribe), + + _ = nynja:ws_receive(User, any, 10), + %% User sends audio + [#mqtt_packet{ payload = #'Message'{ id = IdMessage2 } } | _ ] = + nynja:audio_msg_room(User, RoomName, + <<"https://nynja-defaults.s3.us-west-2.amazonaws.com/5b3b3a41-0439-43ab-9bdf-c847e9c72780.mpeg">>, true), + ct:log("Id for 2nd audio msg ~p", [IdMessage2]), + _ = nynja:ws_receive(AdminUser), %% AdminUser gets this msg + + %% AdminUser transcribes User audio + [#mqtt_packet{ payload = {io, {ok, transcribe}, IdMessage2} }, + #mqtt_packet{ payload = #'Message'{id = IdMessage2, status = update, files = Data2}}] = + nynja:transcribe_msg_room(AdminUser, RoomName, IdMessage2, <<"short">>, true), + ct:log("Transcribe result = ~p", [Data2]), + [#mqtt_packet{ payload = #'Message'{id = IdMessage2, status = update, files = UserSeesTranscribe2}}] = nynja:ws_receive(User), %% User gets this msg + ct:log("UserTranscribe2 = ~p", [UserSeesTranscribe2]), + #'Desc'{mime = <<"transcribe">>, payload = <<"this is a test">>} = lists:last(UserSeesTranscribe2), + ok. + +long_transcribe(Cfg) -> + Phone = proplists:get_value(phone, Cfg, <<"000000037">>), + MemberPhone = <<"001002002">>, + + register_user([{phone, Phone}]), %% may fail if already registered + register_user([{phone, MemberPhone}, {fname, <<"Kalle">>}]), + + AdminUser = nynja:connect_user(Phone), + User = nynja:connect_user(MemberPhone), + RoomName = iolist_to_binary(["Transcribe-", integer_to_list(os:system_time())]), + + ct:log("Creating room ~p", [RoomName]), + nynja:create_room(AdminUser, #{name => RoomName}), + nynja:add_to_room(AdminUser, User, RoomName, 1), + _ = nynja:ws_receive(User), %% get the Room User is added to + #mqtt_packet{ payload = #'Message'{} } = + nynja:msg_room(AdminUser, RoomName, <<"This is a nynja room">>), + UserNotify = nynja:ws_receive(User), %% User gets this msg + ct:log("User notified ~p", [UserNotify]), + + %% Admin sends audio + [#mqtt_packet{ payload = #'Message'{ id = IdMessage } } | _Ack ] = + nynja:audio_msg_room(AdminUser, RoomName, + <<"https://nynja-defaults.s3.us-west-2.amazonaws.com/58f1b135-2c43-48b6-b83a-129254e567f2.mpeg">>, true), + ct:log("Id for 1st audio msg ~p", [IdMessage]), + _ = nynja:ws_receive(User), %% User gets this msg + + %% Admin transcribes its own audio + + [#mqtt_packet{ payload = {io, {ok, transcribe}, IdMessage} }, + #mqtt_packet{ payload = #'Message'{id = IdMessage, status = update, files = Data}}] = + nynja:transcribe_msg_room(AdminUser, RoomName, IdMessage, <<"long">>, true), + + ct:log("Transcribe result = ~p", [Data]), + [#mqtt_packet{ payload = #'Message'{id = IdMessage, status = update, files = UserSeesTranscribe}}] = nynja:ws_receive(User), %% User gets this msg + ct:log("UserTranscribe = ~p", [UserSeesTranscribe]), + #'Desc'{mime = <<"transcribe">>, payload = <<"1 2 3 4 5">>} = lists:last(UserSeesTranscribe), + ok. + + +chat_transcribe(Cfg) -> + Phone = proplists:get_value(phone, Cfg, <<"000000037">>), + FriendPhone = <<"000000033">>, + + register_user([{phone, Phone}, {fname, <<"Anna">>}]), %% may fail if already registered + register_user([{phone, FriendPhone}]), + + User = nynja:connect_user(Phone), + Friend = nynja:connect_user(FriendPhone), + + FeedId = nynja:become_friends(User, Friend), + ct:log("Friends: ~p and ~p -> ~p\n", [Phone, FriendPhone, FeedId]), + + Resp = nynja:send_message(Friend, User, <<"Hi there">>, FeedId), + ct:log("Response to User ~p", [Resp]), + + #mqtt_packet{ payload = #'Message'{ id = IdMessage } } = Audio = + nynja:audio_msg_chat(User, Friend, + <<"https://nynja-defaults.s3.us-west-2.amazonaws.com/58f1b135-2c43-48b6-b83a-129254e567f2.mpeg">>, FeedId), + ct:log("Audio Msg ~p: ~p", [IdMessage, Audio]), + + [#mqtt_packet{ payload = {io, {ok, transcribe}, IdMessage} }, + #mqtt_packet{ payload = #'Message'{id = IdMessage, status = update, files = Data}}] = + nynja:transcribe_msg_chat(User, FeedId, IdMessage, <<"short">>, true), + ct:log("Transcribe result = ~p", [Data]), + #'Desc'{mime = <<"transcribe">>, payload = <<"1 2 3 4 5">>} = lists:last(Data), + ok. + +%%% ------------- helpers --------------------- + + +request(Id, Params, Config) -> + Request = api_nynja:request(Id, Params, Config), + ct:log("Outgoing Request ~p", [Request]), + + {ok, Status, Headers, Body} = + api_nynja:http_request(Request, [{timeout, 10000}], [], + [{logfun, fun(F, As) -> + ct:log(F, As) + end}]), + Response = api_nynja:validate_response(Id, Request, Status, Headers, Body), + [ ct:log("Validated ~p\n", [Response]) || true], + Response. + +basic_auth() -> + %% Get the User and Password from sys.config + {ok, BasicAuth} = application:get_env(rest, basic_auth), + User = proplists:get_value(username, BasicAuth), + Password = proplists:get_value(password, BasicAuth), + {security, + {authorization_basic, [#{username => User, password => Password}]}}.