diff --git a/config/config.exs b/config/config.exs index 8b57a08e..edb4cc7f 100644 --- a/config/config.exs +++ b/config/config.exs @@ -24,4 +24,6 @@ config :jellyfish, divo: "docker-compose.yaml", divo_wait: [dwell: 1_500, max_tries: 50] +config :ex_aws, :http_client, Jellyfish.Component.HLS.HTTPoison + import_config "#{config_env()}.exs" diff --git a/config/test.exs b/config/test.exs index 6a97c612..70687bb4 100644 --- a/config/test.exs +++ b/config/test.exs @@ -15,3 +15,5 @@ config :logger, level: :warning # Initialize plugs at runtime for faster test compilation config :phoenix, :plug_init_mode, :runtime + +config :ex_aws, :http_client, ExAws.Request.HttpMock diff --git a/lib/jellyfish/application.ex b/lib/jellyfish/application.ex index b3bf7fba..b5842d99 100644 --- a/lib/jellyfish/application.ex +++ b/lib/jellyfish/application.ex @@ -32,7 +32,8 @@ defmodule Jellyfish.Application do {Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry}, # Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry) JellyfishWeb.Telemetry, - {Task.Supervisor, name: Jellyfish.TaskSupervisor} + {Task.Supervisor, name: Jellyfish.TaskSupervisor}, + {DynamicSupervisor, name: Jellyfish.HLS.ManagerSupervisor, strategy: :one_for_one} ] ++ if dist_config[:enabled] do config_distribution(dist_config) diff --git a/lib/jellyfish/component/hls.ex b/lib/jellyfish/component/hls.ex index 092b38c0..72670cf4 100644 --- a/lib/jellyfish/component/hls.ex +++ b/lib/jellyfish/component/hls.ex @@ -25,8 +25,7 @@ defmodule Jellyfish.Component.HLS do @impl true def config(options) do - with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, Options.schema()) do - valid_opts = valid_opts |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end) + with {:ok, valid_opts} <- serialize_options(options) do hls_config = create_hls_config(options.room_id, valid_opts) metadata = @@ -70,6 +69,19 @@ defmodule Jellyfish.Component.HLS do Path.join([base_path, "temporary_hls", "#{room_id}"]) end + def serialize_options(options) do + with {:ok, valid_opts} <- OpenApiSpex.Cast.cast(Options.schema(), options) do + valid_opts = + valid_opts + |> Map.from_struct() + |> Map.new(fn {k, v} -> {underscore(k), serialize(v)} end) + + {:ok, valid_opts} + else + {:error, _reason} = error -> error + end + end + defp create_hls_config( room_id, %{ @@ -101,4 +113,9 @@ defmodule Jellyfish.Component.HLS do end defp underscore(k), do: k |> Atom.to_string() |> Macro.underscore() |> String.to_atom() + + defp serialize(v) when is_struct(v), + do: v |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end) + + defp serialize(v), do: v end diff --git a/lib/jellyfish/component/hls/httpoison.ex b/lib/jellyfish/component/hls/httpoison.ex new file mode 100644 index 00000000..352ff817 --- /dev/null +++ b/lib/jellyfish/component/hls/httpoison.ex @@ -0,0 +1,19 @@ +defmodule Jellyfish.Component.HLS.HTTPoison do + @moduledoc false + + @behaviour ExAws.Request.HttpClient + + @impl true + def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do + case HTTPoison.request(method, url, body, headers, http_opts) do + {:ok, %HTTPoison.Response{status_code: status, headers: headers}} -> + {:ok, %{status_code: status, headers: headers}} + + {:ok, %HTTPoison.Response{status_code: status, headers: headers, body: body}} -> + {:ok, %{status_code: status, headers: headers, body: body}} + + {:error, %HTTPoison.Error{reason: reason}} -> + {:error, reason} + end + end +end diff --git a/lib/jellyfish/component/hls/manager.ex b/lib/jellyfish/component/hls/manager.ex new file mode 100644 index 00000000..cfda5e14 --- /dev/null +++ b/lib/jellyfish/component/hls/manager.ex @@ -0,0 +1,121 @@ +defmodule Jellyfish.Component.HLS.Manager do + @moduledoc """ + Module responsible for HLS processing. + It: + * uploads HLS playlist to S3 + * removes HLS playlist from a disk + """ + + use GenServer, restart: :temporary + use Bunch + + require Logger + + alias Jellyfish.Room + + @hls_extensions [".m4s", ".m3u8", ".mp4"] + @playlist_content_type "application/vnd.apple.mpegurl" + + @type options :: %{ + room_id: Room.id(), + engine_pid: pid(), + hls_dir: String.t(), + hls_options: map() + } + + @spec start(Room.id(), pid(), String.t(), map()) :: {:ok, pid()} | {:error, term()} + def start(room_id, engine_pid, hls_dir, hls_options) do + DynamicSupervisor.start_child( + Jellyfish.HLS.ManagerSupervisor, + {__MODULE__, + %{room_id: room_id, engine_pid: engine_pid, hls_dir: hls_dir, hls_options: hls_options}} + ) + end + + @spec start_link(options()) :: GenServer.on_start() + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @impl true + def init(%{engine_pid: engine_pid, room_id: room_id} = state) do + Process.monitor(engine_pid) + Logger.info("Initialize hls manager, room: #{room_id}") + + {:ok, state} + end + + @impl true + def handle_info( + {:DOWN, _ref, :process, engine_pid, _reason}, + %{engine_pid: engine_pid, hls_options: hls_options, hls_dir: hls_dir, room_id: room_id} = + state + ) do + unless is_nil(hls_options.s3), do: upload_to_s3(hls_dir, room_id, hls_options.s3) + unless hls_options.persistent, do: remove_hls(hls_dir, room_id) + + {:stop, :normal, state} + end + + defp upload_to_s3(hls_dir, room_id, credentials) do + Logger.info("Start uploading to s3, room: #{room_id}") + + config = create_aws_config(credentials) + + result = + hls_dir + |> get_hls_files() + |> Bunch.Enum.try_each(fn file -> + content = get_content(hls_dir, file) + s3_path = get_s3_path(room_id, file) + opts = get_options(file) + + upload_file_to_s3(content, s3_path, opts, config, credentials) + end) + + Logger.info("Finished uploading to s3 with result: #{result}, room: #{room_id}") + end + + defp upload_file_to_s3(content, s3_path, opts, config, credentials) do + result = + credentials.bucket + |> ExAws.S3.put_object(s3_path, content, opts) + |> ExAws.request(config) + + case result do + {:ok, _value} -> :ok + error -> error + end + end + + defp get_hls_files(hls_dir) do + hls_dir + |> File.ls!() + |> Enum.filter(fn file -> String.ends_with?(file, @hls_extensions) end) + end + + defp create_aws_config(credentials) do + credentials + |> Enum.reject(fn {key, _value} -> key == :bucket end) + |> then(&ExAws.Config.new(:s3, &1)) + |> Map.to_list() + end + + defp get_content(hls_dir, file) do + {:ok, content} = hls_dir |> Path.join(file) |> File.read() + content + end + + defp get_options(file) do + if String.ends_with?(file, ".m3u8"), + do: [content_type: @playlist_content_type], + else: [] + end + + defp get_s3_path(room_id, file), do: Path.join(room_id, file) + + defp remove_hls(hls_dir, room_id) do + File.rm_rf!(hls_dir) + Logger.info("Remove hls from a disk, room: #{room_id}") + end +end diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 54633b2e..a3cbc418 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -12,6 +12,7 @@ defmodule Jellyfish.Room do alias Jellyfish.Component.{HLS, RTSP} alias Jellyfish.Event alias Jellyfish.Peer + alias Membrane.ICE.TURNManager alias Membrane.RTC.Engine alias Membrane.RTC.Engine.Message @@ -235,10 +236,17 @@ defmodule Jellyfish.Room do options ) + component_options = Map.delete(options, "s3") + with :ok <- check_component_allowed(component_type, state), - {:ok, component} <- Component.new(component_type, options) do + {:ok, component} <- Component.new(component_type, component_options) do state = put_in(state, [:components, component.id], component) - if component_type == HLS, do: on_hls_startup(state.id, component.metadata) + + if component_type == HLS do + on_hls_startup(state.id, component.metadata) + spawn_hls_manager(options) + end + :ok = Engine.add_endpoint(state.engine_pid, component.engine_endpoint, id: component.id) Logger.info("Added component #{inspect(component.id)}") @@ -436,12 +444,7 @@ defmodule Jellyfish.Room do defp spawn_request_handler(room_id), do: HLS.RequestHandler.start(room_id) - defp on_hls_removal(room_id, %{low_latency: low_latency, persistent: persistent}) do - unless persistent do - {:ok, path} = HLS.EtsHelper.get_hls_folder_path(room_id) - File.rm_rf!(path) - end - + defp on_hls_removal(room_id, %{low_latency: low_latency}) do HLS.EtsHelper.delete_hls_folder_path(room_id) if low_latency, do: remove_request_handler(room_id) @@ -478,4 +481,11 @@ defmodule Jellyfish.Room do defp hls_component_already_present?(components), do: components |> Map.values() |> Enum.any?(&(&1.type == HLS)) + + defp spawn_hls_manager(%{engine_pid: engine_pid, room_id: room_id} = options) do + {:ok, hls_dir} = HLS.EtsHelper.get_hls_folder_path(room_id) + {:ok, valid_opts} = HLS.serialize_options(options) + + {:ok, _pid} = HLS.Manager.start(room_id, engine_pid, hls_dir, valid_opts) + end end diff --git a/lib/jellyfish_web/api_spec/component/hls.ex b/lib/jellyfish_web/api_spec/component/hls.ex index b87bfd00..d4a6764a 100644 --- a/lib/jellyfish_web/api_spec/component/hls.ex +++ b/lib/jellyfish_web/api_spec/component/hls.ex @@ -37,6 +37,38 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do }) end + defmodule S3 do + @moduledoc false + + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "S3Credentials", + description: + "An AWS S3 credential that will be used to send HLS stream. The stream will only be uploaded if credentials are provided", + type: :object, + properties: %{ + accessKeyId: %Schema{ + type: :string, + description: "An AWS access key identifier, linked to your AWS account." + }, + secretAccessKey: %Schema{ + type: :string, + description: "The secret key that is linked to the Access Key ID." + }, + region: %Schema{ + type: :string, + description: "The AWS region where your bucket is located." + }, + bucket: %Schema{ + type: :string, + description: "The name of the S3 bucket where your data will be stored." + } + }, + required: [:accessKeyId, :secretAccessKey, :region, :bucket] + }) + end + defmodule Options do @moduledoc false @@ -61,6 +93,12 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do type: :boolean, description: "Whether the video is stored after end of stream", default: false + }, + s3: %Schema{ + type: :object, + description: "Credentials to AWS S3 bucket.", + oneOf: [S3], + nullable: true } }, required: [] diff --git a/mix.exs b/mix.exs index b4dd6fd4..32b15481 100644 --- a/mix.exs +++ b/mix.exs @@ -57,6 +57,12 @@ defmodule Jellyfish.MixProject do {:cors_plug, "~> 3.0"}, {:open_api_spex, "~> 3.16"}, {:ymlr, "~> 3.0"}, + {:bunch, "~> 1.6"}, + + # aws deps + {:ex_aws, "~> 2.1"}, + {:ex_aws_s3, "~> 2.0"}, + {:sweet_xml, "~> 0.6"}, # protobuf deps {:protobuf, "~> 0.12.0"}, @@ -85,6 +91,7 @@ defmodule Jellyfish.MixProject do # Test deps {:websockex, "~> 0.4.3", only: [:test, :ci], runtime: false}, {:excoveralls, "~> 0.15.0", only: :test, runtime: false}, + {:mox, "~> 1.0", only: [:test, :ci]}, # Load balancing tests {:divo, "~> 1.3.1", only: [:test, :ci]} diff --git a/mix.lock b/mix.lock index 8d4354fc..eece57b5 100644 --- a/mix.lock +++ b/mix.lock @@ -18,6 +18,8 @@ "divo": {:hex, :divo, "1.3.2", "3a5ce880a1fe930ea804361d1b57b5144129e79e1c856623d923a6fab6d539a1", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "4bd035510838959709db2cacd28edd2eda7948d0e7f1b0dfa810a134c913a88a"}, "elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"}, "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "ex_aws": {:hex, :ex_aws, "2.5.0", "1785e69350b16514c1049330537c7da10039b1a53e1d253bbd703b135174aec3", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "971b86e5495fc0ae1c318e35e23f389e74cf322f2c02d34037c6fc6d405006f1"}, + "ex_aws_s3": {:hex, :ex_aws_s3, "2.5.2", "cee302b8e9ee198cc0d89f1de2a7d6a8921e1a556574476cf5590d2156590fe3", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "cc5bd945a22a99eece4721d734ae2452d3717e81c357a781c8574663254df4a1"}, "ex_dtls": {:hex, :ex_dtls, "0.12.0", "648522f53340b42301eae57627bb8276555be508ec1010561e606b1621d9d2e9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "0bc2d0de146e7cf9d85eb8d2c0a6a518479a66a2ded8a79c0960eced23fe73a9"}, "ex_libsrtp": {:hex, :ex_libsrtp, "0.6.0", "d96cd7fc1780157614f0bf47d31587e5eab953b43067f4885849f8177ec452a9", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "e9ce8a507a658f7e2df72fae82a4b3ba0a056c175f0bc490e79ab03058e094d5"}, "ex_sdp": {:hex, :ex_sdp, "0.11.0", "19e3af1d70b945381752db3139dfc22a19da1e9394036721449b7fb8c49fe039", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "7a3fe42f4ec0c18de09b10464829c27482d81d9c50c21bdebdbcfe17d2046408"}, @@ -76,6 +78,7 @@ "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, "mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"}, "mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"}, + "mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"}, "nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"}, "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, @@ -104,6 +107,7 @@ "shmex": {:hex, :shmex, "0.5.0", "7dc4fb1a8bd851085a652605d690bdd070628717864b442f53d3447326bcd3e8", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "b67bb1e22734758397c84458dbb746519e28eac210423c267c7248e59fc97bdc"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "statistics": {:hex, :statistics, "0.6.2", "213dcedc2b3ae7fb775b5510ea9630c66d3c0019ea2f86d5096559853623a60d", [:mix], [], "hexpm", "329f1008dc4ad24430d94c04b52ff09d5fb435ab11f34360831f11eb0c391c17"}, + "sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"}, "telemetry_metrics_prometheus": {:hex, :telemetry_metrics_prometheus, "1.1.0", "1cc23e932c1ef9aa3b91db257ead31ea58d53229d407e059b29bb962c1505a13", [:mix], [{:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}], "hexpm", "d43b3659b3244da44fe0275b717701542365d4519b79d9ce895b9719c1ce4d26"}, diff --git a/openapi.yaml b/openapi.yaml index 5da7ec56..f252be50 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -103,6 +103,12 @@ components: default: false description: Whether the video is stored after end of stream type: boolean + s3: + description: Credentials to AWS S3 bucket. + nullable: true + oneOf: + - $ref: '#/components/schemas/S3Credentials' + type: object targetWindowDuration: description: Duration of stream available for viewer nullable: true @@ -381,6 +387,29 @@ components: title: RoomsListingResponse type: object x-struct: Elixir.JellyfishWeb.ApiSpec.RoomsListingResponse + S3Credentials: + description: An AWS S3 credential that will be used to send HLS stream. The stream will only be uploaded if credentials are provided + properties: + accessKeyId: + description: An AWS access key identifier, linked to your AWS account. + type: string + bucket: + description: The name of the S3 bucket where your data will be stored. + type: string + region: + description: The AWS region where your bucket is located. + type: string + secretAccessKey: + description: The secret key that is linked to the Access Key ID. + type: string + required: + - accessKeyId + - secretAccessKey + - region + - bucket + title: S3Credentials + type: object + x-struct: Elixir.JellyfishWeb.ApiSpec.Component.HLS.S3 securitySchemes: authorization: scheme: bearer diff --git a/test/jellyfish/component/hls/manager_test.exs b/test/jellyfish/component/hls/manager_test.exs new file mode 100644 index 00000000..8dc41279 --- /dev/null +++ b/test/jellyfish/component/hls/manager_test.exs @@ -0,0 +1,102 @@ +defmodule Jellyfish.Component.HLS.ManagerTest do + @moduledoc false + + use ExUnit.Case + + import Mox + + alias Jellyfish.Component.HLS + alias Jellyfish.Component.HLS.Manager + + @files ["manifest.m3u8", "header.mp4", "segment_1.m3u8", "segment_2.m3u8"] + @body <<1, 2, 3, 4>> + @s3_credentials %{ + access_key_id: "access_key_id", + secret_access_key: "secret_access_key", + region: "region", + bucket: "bucket" + } + + setup do + room_id = UUID.uuid4() + hls_dir = HLS.output_dir(room_id, persistent: false) + options = %{s3: nil, persistent: true} + + File.mkdir_p!(hls_dir) + for filename <- @files, do: :ok = hls_dir |> Path.join(filename) |> File.write(@body) + + on_exit(fn -> File.rm_rf!(hls_dir) end) + + {:ok, %{room_id: room_id, hls_dir: hls_dir, options: options}} + end + + setup :verify_on_exit! + setup :set_mox_from_context + + test "Spawn manager without credentials", %{ + room_id: room_id, + hls_dir: hls_dir, + options: options + } do + http_mock_expect(0) + pid = start_mock_engine() + + {:ok, manager} = Manager.start(room_id, pid, hls_dir, options) + ref = Process.monitor(manager) + + kill_mock_engine(pid) + + assert_receive {:DOWN, ^ref, :process, ^manager, :normal} + assert length(File.ls!(hls_dir)) == 4 + end + + test "Spawn manager with credentials", %{room_id: room_id, hls_dir: hls_dir, options: options} do + http_mock_expect(4) + pid = start_mock_engine() + + {:ok, manager} = Manager.start(room_id, pid, hls_dir, %{options | s3: @s3_credentials}) + ref = Process.monitor(manager) + + kill_mock_engine(pid) + + assert_receive {:DOWN, ^ref, :process, ^manager, :normal} + assert length(File.ls!(hls_dir)) == 4 + end + + test "Spawn manager with persistent false", %{ + room_id: room_id, + hls_dir: hls_dir, + options: options + } do + http_mock_expect(0) + pid = start_mock_engine() + + {:ok, manager} = Manager.start(room_id, pid, hls_dir, %{options | persistent: false}) + ref = Process.monitor(manager) + + kill_mock_engine(pid) + + assert_receive {:DOWN, ^ref, :process, ^manager, :normal} + assert {:error, _} = File.ls(hls_dir) + end + + defp http_mock_expect(n) do + expect(ExAws.Request.HttpMock, :request, n, fn _method, + _url, + _req_body, + _headers, + _http_opts -> + {:ok, %{status_code: 200, headers: %{}}} + end) + end + + defp start_mock_engine(), + do: + spawn(fn -> + receive do + :stop -> nil + end + end) + + defp kill_mock_engine(pid), do: send(pid, :stop) +end diff --git a/test/jellyfish_web/controllers/component_controller_test.exs b/test/jellyfish_web/controllers/component_controller_test.exs index 0d7b0c24..46f7ee94 100644 --- a/test/jellyfish_web/controllers/component_controller_test.exs +++ b/test/jellyfish_web/controllers/component_controller_test.exs @@ -2,11 +2,14 @@ defmodule JellyfishWeb.ComponentControllerTest do use JellyfishWeb.ConnCase import OpenApiSpex.TestAssertions + import Mox alias Jellyfish.Component.HLS @schema JellyfishWeb.ApiSpec.spec() @source_uri "rtsp://placeholder-19inrifjbsjb.it:12345/afwefae" + @files ["manifest.m3u8", "header.mp4", "segment_1.m3u8", "segment_2.m3u8"] + @body <<1, 2, 3, 4>> setup %{conn: conn} do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) @@ -111,6 +114,74 @@ defmodule JellyfishWeb.ComponentControllerTest do assert {:ok, _removed_files} = room_id |> HLS.Recording.directory() |> File.rm_rf() end + setup :set_mox_from_context + setup :verify_on_exit! + + test "renders component with s3 credentials", %{conn: conn} do + conn = post(conn, ~p"/room", videoCodec: "h264") + assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] + + bucket = "bucket" + + conn = + post(conn, ~p"/room/#{room_id}/component", + type: "hls", + options: %{ + persistent: false, + s3: %{ + accessKeyId: "access_key_id", + secretAccessKey: "secret_access_key", + region: "region", + bucket: bucket + } + } + ) + + assert response = + %{ + "data" => %{ + "type" => "hls", + "metadata" => %{ + "playable" => false, + "lowLatency" => false, + "persistent" => false, + "targetWindowDuration" => nil + } + } + } = + json_response(conn, :created) + + parent = self() + ref = make_ref() + + expect(ExAws.Request.HttpMock, :request, 4, fn _method, + url, + req_body, + _headers, + _http_opts -> + assert req_body == @body + assert String.contains?(url, bucket) + assert String.ends_with?(url, @files) + + send(parent, {ref, :request}) + {:ok, %{status_code: 200, headers: %{}}} + end) + + assert_response_schema(response, "ComponentDetailsResponse", @schema) + assert_hls_path(room_id, persistent: false) + + # waits for directory to be created + # then adds 4 files to it + add_files_for_s3_upload(room_id) + + conn = delete(conn, ~p"/room/#{room_id}") + assert response(conn, :no_content) + + # above we created 4 files + # so there should be axactly 4 requests + for _ <- 1..4, do: assert_receive({^ref, :request}, 10_000) + end + test "renders component with targetWindowDuration set", %{conn: conn} do conn = post(conn, ~p"/room", videoCodec: "h264") assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] @@ -297,4 +368,22 @@ defmodule JellyfishWeb.ComponentControllerTest do defp assert_no_hls_path(room_id) do assert {:error, :room_not_found} = HLS.EtsHelper.get_hls_folder_path(room_id) end + + defp add_files_for_s3_upload(room_id) do + {:ok, hls_dir} = HLS.EtsHelper.get_hls_folder_path(room_id) + assert :ok = wait_for_folder(hls_dir, 1000) + + for filename <- @files, do: :ok = hls_dir |> Path.join(filename) |> File.write(@body) + end + + defp wait_for_folder(_hls_dir, milliseconds) when milliseconds < 0, do: {:error, :timeout} + + defp wait_for_folder(hls_dir, milliseconds) do + if File.exists?(hls_dir) do + :ok + else + Process.sleep(100) + wait_for_folder(hls_dir, milliseconds - 100) + end + end end diff --git a/test/test_helper.exs b/test/test_helper.exs index 23780d53..c614993d 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -1,3 +1,5 @@ Application.put_env(:jellyfish, :output_base_path, "tmp/hls_output/") +Mox.defmock(ExAws.Request.HttpMock, for: ExAws.Request.HttpClient) + ExUnit.start(capture_log: true)