Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/jellyfish/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ defmodule Jellyfish.Component.HLS do
}
}
},
hls_config: hls_config
hls_config: hls_config,
subscribe_mode: valid_opts.subscribe_mode
},
metadata: metadata
}}
Expand All @@ -75,6 +76,7 @@ defmodule Jellyfish.Component.HLS do
valid_opts
|> Map.from_struct()
|> Map.new(fn {k, v} -> {underscore(k), serialize(v)} end)
|> Map.update!(:subscribe_mode, &String.to_atom/1)

{:ok, valid_opts}
else
Expand Down
30 changes: 30 additions & 0 deletions lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ defmodule Jellyfish.Room do

alias Membrane.ICE.TURNManager
alias Membrane.RTC.Engine
alias Membrane.RTC.Engine.Track

alias Membrane.RTC.Engine.Message.{
EndpointAdded,
Expand Down Expand Up @@ -129,6 +130,12 @@ defmodule Jellyfish.Room do
GenServer.call(registry_id(room_id), {:remove_component, component_id})
end

@spec hls_subscribe(id(), [Track.id()]) ::
:ok | {:error, term()}
def hls_subscribe(room_id, tracks) do
GenServer.call(registry_id(room_id), {:hls_subscribe, tracks})
end

@spec receive_media_event(id(), Peer.id(), String.t()) :: :ok
def receive_media_event(room_id, peer_id, event) do
GenServer.cast(registry_id(room_id), {:media_event, peer_id, event})
Expand Down Expand Up @@ -297,6 +304,22 @@ defmodule Jellyfish.Room do
{:reply, reply, state}
end

@impl true
def handle_call({:hls_subscribe, tracks}, _from, state) do
hls_component = hls_component(state)

reply =
case validate_hls_subscription(hls_component) do
:ok ->
Engine.message_endpoint(state.engine_pid, hls_component.id, {:subscribe, tracks})

{:error, _reason} = error ->
error
end

{:reply, reply, state}
end

@impl true
def handle_call(:get_num_forwarded_tracks, _from, state) do
forwarded_tracks = Engine.get_num_forwarded_tracks(state.engine_pid)
Expand Down Expand Up @@ -535,4 +558,11 @@ defmodule Jellyfish.Room do

{:ok, _pid} = HLS.Manager.start(room_id, engine_pid, hls_dir, valid_opts)
end

defp validate_hls_subscription(nil), do: {:error, :hls_component_not_exists}

defp validate_hls_subscription(%{metadata: %{subscribe_mode: :auto}}),
do: {:error, :invalid_subscribe_mode}

defp validate_hls_subscription(%{metadata: %{subscribe_mode: :manual}}), do: :ok
end
15 changes: 14 additions & 1 deletion lib/jellyfish_web/api_spec/component/hls.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,15 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do
persistent: %Schema{
type: :boolean,
description: "Whether the video is stored after end of stream"
},
subscribeMode: %Schema{
type: :string,
description:
"Whether the HLS component should subscribe to tracks automatically or manually",
enum: ["auto", "manual"]
}
},
required: [:playable, :lowLatency, :persistent, :targetWindowDuration]
required: [:playable, :lowLatency, :persistent, :targetWindowDuration, :subscribeMode]
})
end

Expand Down Expand Up @@ -99,6 +105,13 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do
description: "Credentials to AWS S3 bucket.",
oneOf: [S3],
nullable: true
},
subscribeMode: %Schema{
type: :string,
description:
"Whether the HLS component should subscribe to tracks automatically or manually.",
enum: ["auto", "manual"],
default: "auto"
}
},
required: []
Expand Down
37 changes: 37 additions & 0 deletions lib/jellyfish_web/api_spec/subscription.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule JellyfishWeb.ApiSpec.Subscription do
require OpenApiSpex

alias OpenApiSpex.Schema

defmodule Track do
@moduledoc false

require OpenApiSpex

OpenApiSpex.schema(%{
title: "Track",
description: "Track id",
type: :string,
example: "track-1"
})
end

defmodule Tracks do
@moduledoc false

require OpenApiSpex

OpenApiSpex.schema(%{
title: "SubscriptionConfig",
description: "Subscription config",
type: :object,
properties: %{
tracks: %Schema{
type: :array,
description: "List of tracks that hls endpoint will subscribe for",
items: Track
}
}
})
end
end
44 changes: 44 additions & 0 deletions lib/jellyfish_web/controllers/subscription_controller.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
defmodule JellyfishWeb.SubscriptionController do
use JellyfishWeb, :controller
use OpenApiSpex.ControllerSpecs

alias Jellyfish.Room
alias Jellyfish.RoomService
alias JellyfishWeb.ApiSpec
alias OpenApiSpex.Response

action_fallback JellyfishWeb.FallbackController

tags [:hls]

operation :create,
operation_id: "subscribe_tracks",
summary: "Subscribe hls component for tracks",
Comment thread
sgfn marked this conversation as resolved.
parameters: [room_id: [in: :path, description: "Room ID", type: :string]],
request_body: {"Subscribe configuration", "application/json", ApiSpec.Subscription.Tracks},
responses: [
created: %Response{description: "Tracks succesfully added."},
bad_request: ApiSpec.error("Invalid request structure"),
not_found: ApiSpec.error("Room doesn't exist")
]

def create(conn, %{"room_id" => room_id} = params) do
with tracks <- Map.get(params, "tracks", %{}),
{:ok, _room_pid} <- RoomService.find_room(room_id),
:ok <- Room.hls_subscribe(room_id, tracks) do
send_resp(conn, :created, "Successfully subscribed for tracks")
else
:error ->
{:error, :bad_request, "Invalid request body structure"}

{:error, :room_not_found} ->
{:error, :not_found, "Room #{room_id} does not exist"}

{:error, :hls_component_not_exists} ->
{:error, :bad_request, "HLS component does not exist"}

{:error, :invalid_subscribe_mode} ->
{:error, :bad_request, "HLS component option `subscribe_mode` is set to :auto"}
end
end
end
1 change: 1 addition & 0 deletions lib/jellyfish_web/router.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ defmodule JellyfishWeb.Router do

scope "/hls", JellyfishWeb do
get "/:room_id/:filename", HLSController, :index
post "/:room_id/subscribe", SubscriptionController, :create
end

scope "/recording", JellyfishWeb do
Expand Down
11 changes: 9 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,17 @@ defmodule Jellyfish.MixProject do
{:protobuf, "~> 0.12.0"},

# Membrane deps
{:membrane_rtc_engine, "~> 0.17.1", override: true},
{:membrane_rtc_engine,
github: "jellyfish-dev/membrane_rtc_engine",
branch: "manual-track-addition",
sparse: "engine",
override: true},
{:membrane_rtc_engine_webrtc, "~> 0.3.0", override: true},
{:membrane_rtc_engine_hls,
github: "jellyfish-dev/membrane_rtc_engine", sparse: "hls", override: true},
github: "jellyfish-dev/membrane_rtc_engine",
branch: "manual-track-addition",
sparse: "hls",
override: true},
{:membrane_rtc_engine_rtsp, "~> 0.2.1"},
{:membrane_ice_plugin, "~> 0.16.0"},
{:membrane_telemetry_metrics, "~> 0.1.0"},
Expand Down
Loading