Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ config :jellyfish,
divo: "docker-compose.yaml",
divo_wait: [dwell: 1_500, max_tries: 50]

config :ex_aws, :http_client, Jellyfish.Component.HLS.HTTPoison

import_config "#{config_env()}.exs"
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ config :logger, level: :warning

# Initialize plugs at runtime for faster test compilation
config :phoenix, :plug_init_mode, :runtime

config :ex_aws, :http_client, ExAws.Request.HttpMock
3 changes: 2 additions & 1 deletion lib/jellyfish/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ defmodule Jellyfish.Application do
{Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry},
# Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry)
JellyfishWeb.Telemetry,
{Task.Supervisor, name: Jellyfish.TaskSupervisor}
{Task.Supervisor, name: Jellyfish.TaskSupervisor},
{DynamicSupervisor, name: Jellyfish.HLS.ManagerSupervisor, strategy: :one_for_one}
] ++
if dist_config[:enabled] do
config_distribution(dist_config)
Expand Down
21 changes: 19 additions & 2 deletions lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ defmodule Jellyfish.Component.HLS do

@impl true
def config(options) do
with {:ok, valid_opts} <- OpenApiSpex.cast_value(options, Options.schema()) do
valid_opts = valid_opts |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end)
with {:ok, valid_opts} <- serialize_options(options) do
hls_config = create_hls_config(options.room_id, valid_opts)

metadata =
Expand Down Expand Up @@ -70,6 +69,19 @@ defmodule Jellyfish.Component.HLS do
Path.join([base_path, "temporary_hls", "#{room_id}"])
end

def serialize_options(options) do
with {:ok, valid_opts} <- OpenApiSpex.Cast.cast(Options.schema(), options) do
valid_opts =
valid_opts
|> Map.from_struct()
|> Map.new(fn {k, v} -> {underscore(k), serialize(v)} end)

{:ok, valid_opts}
else
{:error, _reason} = error -> error
end
end

defp create_hls_config(
room_id,
%{
Expand Down Expand Up @@ -101,4 +113,9 @@ defmodule Jellyfish.Component.HLS do
end

defp underscore(k), do: k |> Atom.to_string() |> Macro.underscore() |> String.to_atom()

defp serialize(v) when is_struct(v),
do: v |> Map.from_struct() |> Map.new(fn {k, v} -> {underscore(k), v} end)

defp serialize(v), do: v
end
19 changes: 19 additions & 0 deletions lib/jellyfish/component/hls/httpoison.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
defmodule Jellyfish.Component.HLS.HTTPoison do
@moduledoc false

@behaviour ExAws.Request.HttpClient

@impl true
def request(method, url, body \\ "", headers \\ [], http_opts \\ []) do
case HTTPoison.request(method, url, body, headers, http_opts) do
{:ok, %HTTPoison.Response{status_code: status, headers: headers}} ->
{:ok, %{status_code: status, headers: headers}}

{:ok, %HTTPoison.Response{status_code: status, headers: headers, body: body}} ->
{:ok, %{status_code: status, headers: headers, body: body}}

{:error, %HTTPoison.Error{reason: reason}} ->
{:error, reason}
end
end
end
121 changes: 121 additions & 0 deletions lib/jellyfish/component/hls/manager.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
defmodule Jellyfish.Component.HLS.Manager do
Comment thread
mickel8 marked this conversation as resolved.
@moduledoc """
Module responsible for HLS processing.
Comment thread
mickel8 marked this conversation as resolved.
It:
* uploads HLS playlist to S3
* removes HLS playlist from a disk
"""

use GenServer, restart: :temporary
use Bunch

require Logger

alias Jellyfish.Room

@hls_extensions [".m4s", ".m3u8", ".mp4"]
@playlist_content_type "application/vnd.apple.mpegurl"

@type options :: %{
room_id: Room.id(),
engine_pid: pid(),
hls_dir: String.t(),
hls_options: map()
}

@spec start(Room.id(), pid(), String.t(), map()) :: {:ok, pid()} | {:error, term()}
def start(room_id, engine_pid, hls_dir, hls_options) do
DynamicSupervisor.start_child(
Jellyfish.HLS.ManagerSupervisor,
{__MODULE__,
%{room_id: room_id, engine_pid: engine_pid, hls_dir: hls_dir, hls_options: hls_options}}
)
end

@spec start_link(options()) :: GenServer.on_start()
def start_link(opts) do
Comment thread
mickel8 marked this conversation as resolved.
GenServer.start_link(__MODULE__, opts)
end

@impl true
def init(%{engine_pid: engine_pid, room_id: room_id} = state) do
Process.monitor(engine_pid)
Logger.info("Initialize hls manager, room: #{room_id}")

{:ok, state}
end

@impl true
def handle_info(
{:DOWN, _ref, :process, engine_pid, _reason},
%{engine_pid: engine_pid, hls_options: hls_options, hls_dir: hls_dir, room_id: room_id} =
state
) do
unless is_nil(hls_options.s3), do: upload_to_s3(hls_dir, room_id, hls_options.s3)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't that work?

Suggested change
unless is_nil(hls_options.s3), do: upload_to_s3(hls_dir, room_id, hls_options.s3)
if hls_options.s3, do: upload_to_s3(hls_dir, room_id, hls_options.s3)

unless hls_options.persistent, do: remove_hls(hls_dir, room_id)

{:stop, :normal, state}
end

defp upload_to_s3(hls_dir, room_id, credentials) do
Logger.info("Start uploading to s3, room: #{room_id}")

config = create_aws_config(credentials)

result =
hls_dir
|> get_hls_files()
|> Bunch.Enum.try_each(fn file ->
Comment thread
Karolk99 marked this conversation as resolved.
content = get_content(hls_dir, file)
s3_path = get_s3_path(room_id, file)
opts = get_options(file)

upload_file_to_s3(content, s3_path, opts, config, credentials)
end)

Logger.info("Finished uploading to s3 with result: #{result}, room: #{room_id}")
end

defp upload_file_to_s3(content, s3_path, opts, config, credentials) do
result =
credentials.bucket
|> ExAws.S3.put_object(s3_path, content, opts)
|> ExAws.request(config)

case result do
{:ok, _value} -> :ok
error -> error
end
end

defp get_hls_files(hls_dir) do
hls_dir
|> File.ls!()
|> Enum.filter(fn file -> String.ends_with?(file, @hls_extensions) end)
end

defp create_aws_config(credentials) do
credentials
|> Enum.reject(fn {key, _value} -> key == :bucket end)
|> then(&ExAws.Config.new(:s3, &1))
|> Map.to_list()
end

defp get_content(hls_dir, file) do
{:ok, content} = hls_dir |> Path.join(file) |> File.read()
content
end

defp get_options(file) do
if String.ends_with?(file, ".m3u8"),
do: [content_type: @playlist_content_type],
else: []
end

defp get_s3_path(room_id, file), do: Path.join(room_id, file)

defp remove_hls(hls_dir, room_id) do
File.rm_rf!(hls_dir)
Logger.info("Remove hls from a disk, room: #{room_id}")
end
end
26 changes: 18 additions & 8 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ defmodule Jellyfish.Room do
alias Jellyfish.Component.{HLS, RTSP}
alias Jellyfish.Event
alias Jellyfish.Peer

alias Membrane.ICE.TURNManager
alias Membrane.RTC.Engine
alias Membrane.RTC.Engine.Message
Expand Down Expand Up @@ -235,10 +236,17 @@ defmodule Jellyfish.Room do
options
)

component_options = Map.delete(options, "s3")

with :ok <- check_component_allowed(component_type, state),
{:ok, component} <- Component.new(component_type, options) do
{:ok, component} <- Component.new(component_type, component_options) do
state = put_in(state, [:components, component.id], component)
if component_type == HLS, do: on_hls_startup(state.id, component.metadata)

if component_type == HLS do
on_hls_startup(state.id, component.metadata)
spawn_hls_manager(options)
end

:ok = Engine.add_endpoint(state.engine_pid, component.engine_endpoint, id: component.id)

Logger.info("Added component #{inspect(component.id)}")
Expand Down Expand Up @@ -436,12 +444,7 @@ defmodule Jellyfish.Room do
defp spawn_request_handler(room_id),
do: HLS.RequestHandler.start(room_id)

defp on_hls_removal(room_id, %{low_latency: low_latency, persistent: persistent}) do
unless persistent do
{:ok, path} = HLS.EtsHelper.get_hls_folder_path(room_id)
File.rm_rf!(path)
end

defp on_hls_removal(room_id, %{low_latency: low_latency}) do
HLS.EtsHelper.delete_hls_folder_path(room_id)

if low_latency, do: remove_request_handler(room_id)
Expand Down Expand Up @@ -478,4 +481,11 @@ defmodule Jellyfish.Room do

defp hls_component_already_present?(components),
do: components |> Map.values() |> Enum.any?(&(&1.type == HLS))

defp spawn_hls_manager(%{engine_pid: engine_pid, room_id: room_id} = options) do
{:ok, hls_dir} = HLS.EtsHelper.get_hls_folder_path(room_id)
{:ok, valid_opts} = HLS.serialize_options(options)

{:ok, _pid} = HLS.Manager.start(room_id, engine_pid, hls_dir, valid_opts)
end
end
38 changes: 38 additions & 0 deletions lib/jellyfish_web/api_spec/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,38 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do
})
end

defmodule S3 do
@moduledoc false

require OpenApiSpex

OpenApiSpex.schema(%{
title: "S3Credentials",
description:
"An AWS S3 credential that will be used to send HLS stream. The stream will only be uploaded if credentials are provided",
type: :object,
properties: %{
accessKeyId: %Schema{
type: :string,
description: "An AWS access key identifier, linked to your AWS account."
},
secretAccessKey: %Schema{
type: :string,
description: "The secret key that is linked to the Access Key ID."
},
region: %Schema{
type: :string,
description: "The AWS region where your bucket is located."
},
bucket: %Schema{
type: :string,
description: "The name of the S3 bucket where your data will be stored."
}
},
required: [:accessKeyId, :secretAccessKey, :region, :bucket]
})
end

defmodule Options do
@moduledoc false

Expand All @@ -61,6 +93,12 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do
type: :boolean,
description: "Whether the video is stored after end of stream",
default: false
},
s3: %Schema{
type: :object,
description: "Credentials to AWS S3 bucket.",
oneOf: [S3],
nullable: true
}
},
required: []
Expand Down
7 changes: 7 additions & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,12 @@ defmodule Jellyfish.MixProject do
{:cors_plug, "~> 3.0"},
{:open_api_spex, "~> 3.16"},
{:ymlr, "~> 3.0"},
{:bunch, "~> 1.6"},

# aws deps
{:ex_aws, "~> 2.1"},
{:ex_aws_s3, "~> 2.0"},
{:sweet_xml, "~> 0.6"},

# protobuf deps
{:protobuf, "~> 0.12.0"},
Expand Down Expand Up @@ -85,6 +91,7 @@ defmodule Jellyfish.MixProject do
# Test deps
{:websockex, "~> 0.4.3", only: [:test, :ci], runtime: false},
{:excoveralls, "~> 0.15.0", only: :test, runtime: false},
{:mox, "~> 1.0", only: [:test, :ci]},

# Load balancing tests
{:divo, "~> 1.3.1", only: [:test, :ci]}
Expand Down
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
"divo": {:hex, :divo, "1.3.2", "3a5ce880a1fe930ea804361d1b57b5144129e79e1c856623d923a6fab6d539a1", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: false]}, {:patiently, "~> 0.2", [hex: :patiently, repo: "hexpm", optional: false]}], "hexpm", "4bd035510838959709db2cacd28edd2eda7948d0e7f1b0dfa810a134c913a88a"},
"elixir_make": {:hex, :elixir_make, "0.7.7", "7128c60c2476019ed978210c245badf08b03dbec4f24d05790ef791da11aa17c", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}], "hexpm", "5bc19fff950fad52bbe5f211b12db9ec82c6b34a9647da0c2224b8b8464c7e6c"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_aws": {:hex, :ex_aws, "2.5.0", "1785e69350b16514c1049330537c7da10039b1a53e1d253bbd703b135174aec3", [:mix], [{:configparser_ex, "~> 4.0", [hex: :configparser_ex, repo: "hexpm", optional: true]}, {:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:jsx, "~> 2.8 or ~> 3.0", [hex: :jsx, repo: "hexpm", optional: true]}, {:mime, "~> 1.2 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:sweet_xml, "~> 0.7", [hex: :sweet_xml, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "971b86e5495fc0ae1c318e35e23f389e74cf322f2c02d34037c6fc6d405006f1"},
"ex_aws_s3": {:hex, :ex_aws_s3, "2.5.2", "cee302b8e9ee198cc0d89f1de2a7d6a8921e1a556574476cf5590d2156590fe3", [:mix], [{:ex_aws, "~> 2.0", [hex: :ex_aws, repo: "hexpm", optional: false]}, {:sweet_xml, ">= 0.0.0", [hex: :sweet_xml, repo: "hexpm", optional: true]}], "hexpm", "cc5bd945a22a99eece4721d734ae2452d3717e81c357a781c8574663254df4a1"},
"ex_dtls": {:hex, :ex_dtls, "0.12.0", "648522f53340b42301eae57627bb8276555be508ec1010561e606b1621d9d2e9", [:mix], [{:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "0bc2d0de146e7cf9d85eb8d2c0a6a518479a66a2ded8a79c0960eced23fe73a9"},
"ex_libsrtp": {:hex, :ex_libsrtp, "0.6.0", "d96cd7fc1780157614f0bf47d31587e5eab953b43067f4885849f8177ec452a9", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "e9ce8a507a658f7e2df72fae82a4b3ba0a056c175f0bc490e79ab03058e094d5"},
"ex_sdp": {:hex, :ex_sdp, "0.11.0", "19e3af1d70b945381752db3139dfc22a19da1e9394036721449b7fb8c49fe039", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:uuid, "~> 1.1", [hex: :uuid, repo: "hexpm", optional: false]}], "hexpm", "7a3fe42f4ec0c18de09b10464829c27482d81d9c50c21bdebdbcfe17d2046408"},
Expand Down Expand Up @@ -76,6 +78,7 @@
"mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"},
"mint": {:hex, :mint, "1.5.1", "8db5239e56738552d85af398798c80648db0e90f343c8469f6c6d8898944fb6f", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "4a63e1e76a7c3956abd2c72f370a0d0aecddc3976dea5c27eccbecfa5e7d5b1e"},
"mockery": {:hex, :mockery, "2.3.1", "a02fd60b10ac9ed37a7a2ecf6786c1f1dd5c75d2b079a60594b089fba32dc087", [:mix], [], "hexpm", "1d0971d88ebf084e962da3f2cfee16f0ea8e04ff73a7710428500d4500b947fa"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_options": {:hex, :nimble_options, "1.0.2", "92098a74df0072ff37d0c12ace58574d26880e522c22801437151a159392270e", [:mix], [], "hexpm", "fd12a8db2021036ce12a309f26f564ec367373265b53e25403f0ee697380f1b8"},
"nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
Expand Down Expand Up @@ -104,6 +107,7 @@
"shmex": {:hex, :shmex, "0.5.0", "7dc4fb1a8bd851085a652605d690bdd070628717864b442f53d3447326bcd3e8", [:mix], [{:bunch_native, "~> 0.5.0", [hex: :bunch_native, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "b67bb1e22734758397c84458dbb746519e28eac210423c267c7248e59fc97bdc"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"statistics": {:hex, :statistics, "0.6.2", "213dcedc2b3ae7fb775b5510ea9630c66d3c0019ea2f86d5096559853623a60d", [:mix], [], "hexpm", "329f1008dc4ad24430d94c04b52ff09d5fb435ab11f34360831f11eb0c391c17"},
"sweet_xml": {:hex, :sweet_xml, "0.7.4", "a8b7e1ce7ecd775c7e8a65d501bc2cd933bff3a9c41ab763f5105688ef485d08", [:mix], [], "hexpm", "e7c4b0bdbf460c928234951def54fe87edf1a170f6896675443279e2dbeba167"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"telemetry_metrics": {:hex, :telemetry_metrics, "0.6.1", "315d9163a1d4660aedc3fee73f33f1d355dcc76c5c3ab3d59e76e3edf80eef1f", [:mix], [{:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7be9e0871c41732c233be71e4be11b96e56177bf15dde64a8ac9ce72ac9834c6"},
"telemetry_metrics_prometheus": {:hex, :telemetry_metrics_prometheus, "1.1.0", "1cc23e932c1ef9aa3b91db257ead31ea58d53229d407e059b29bb962c1505a13", [:mix], [{:plug_cowboy, "~> 2.1", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:telemetry_metrics_prometheus_core, "~> 1.0", [hex: :telemetry_metrics_prometheus_core, repo: "hexpm", optional: false]}], "hexpm", "d43b3659b3244da44fe0275b717701542365d4519b79d9ce895b9719c1ce4d26"},
Expand Down
29 changes: 29 additions & 0 deletions openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ components:
default: false
description: Whether the video is stored after end of stream
type: boolean
s3:
description: Credentials to AWS S3 bucket.
nullable: true
oneOf:
- $ref: '#/components/schemas/S3Credentials'
type: object
targetWindowDuration:
description: Duration of stream available for viewer
nullable: true
Expand Down Expand Up @@ -381,6 +387,29 @@ components:
title: RoomsListingResponse
type: object
x-struct: Elixir.JellyfishWeb.ApiSpec.RoomsListingResponse
S3Credentials:
description: An AWS S3 credential that will be used to send HLS stream. The stream will only be uploaded if credentials are provided
properties:
accessKeyId:
description: An AWS access key identifier, linked to your AWS account.
type: string
bucket:
description: The name of the S3 bucket where your data will be stored.
type: string
region:
description: The AWS region where your bucket is located.
type: string
secretAccessKey:
description: The secret key that is linked to the Access Key ID.
type: string
required:
- accessKeyId
- secretAccessKey
- region
- bucket
title: S3Credentials
type: object
x-struct: Elixir.JellyfishWeb.ApiSpec.Component.HLS.S3
securitySchemes:
authorization:
scheme: bearer
Expand Down
Loading