diff --git a/.redocly.lint-ignore.yaml b/.redocly.lint-ignore.yaml index 782ee7a1..bbe2ba26 100644 --- a/.redocly.lint-ignore.yaml +++ b/.redocly.lint-ignore.yaml @@ -3,3 +3,8 @@ openapi.yaml: no-empty-servers: - '#/servers' + operation-4xx-response: + - '#/paths/~1health/get/responses' + spec: + - '#/components/schemas/Track/properties/metadata/nullable' + - '#/components/schemas/PeerMetadata/nullable' diff --git a/config/dev.exs b/config/dev.exs index 8846bb1c..550fd318 100644 --- a/config/dev.exs +++ b/config/dev.exs @@ -26,3 +26,5 @@ config :phoenix, :stacktrace_depth, 20 # Initialize plugs at runtime for faster development compilation config :phoenix, :plug_init_mode, :runtime + +config :logger, level: :info diff --git a/lib/jellyfish/component.ex b/lib/jellyfish/component.ex index ef4496ba..705ab67b 100644 --- a/lib/jellyfish/component.ex +++ b/lib/jellyfish/component.ex @@ -11,6 +11,7 @@ defmodule Jellyfish.Component do use Bunch.Access alias Jellyfish.Component.{File, HLS, RTSP} + alias Jellyfish.Track @enforce_keys [ :id, @@ -18,7 +19,7 @@ defmodule Jellyfish.Component do :engine_endpoint, :properties ] - defstruct @enforce_keys + defstruct @enforce_keys ++ [tracks: %{}] @type id :: String.t() @type component :: HLS | RTSP | File @@ -35,7 +36,8 @@ defmodule Jellyfish.Component do id: id(), type: component(), engine_endpoint: Membrane.ChildrenSpec.child_definition(), - properties: properties() + properties: properties(), + tracks: %{Track.id() => Track.t()} } @spec parse_type(String.t()) :: {:ok, component()} | {:error, :invalid_type} diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index f4c3886e..f8a73f36 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -10,11 +10,18 @@ defmodule Jellyfish.Event do PeerConnected, PeerCrashed, PeerDisconnected, + PeerMetadataUpdated, RoomCrashed, RoomCreated, - RoomDeleted + RoomDeleted, + Track, + TrackAdded, + TrackMetadataUpdated, + TrackRemoved } + alias Membrane.RTC.Engine.Message + @pubsub Jellyfish.PubSub @valid_topics [:server_notification, :metrics] @@ -59,6 +66,38 @@ defmodule Jellyfish.Event do defp to_proto_server_notification({:component_crashed, room_id, component_id}), do: {:component_crashed, %ComponentCrashed{room_id: room_id, component_id: component_id}} + defp to_proto_server_notification({:peer_metadata_updated, room_id, peer_id, metadata}), + do: + {:peer_metadata_updated, + %PeerMetadataUpdated{room_id: room_id, peer_id: peer_id, metadata: Jason.encode!(metadata)}} + + defp to_proto_server_notification({:track_added, room_id, endpoint_info, track_info}) do + {:track_added, + %TrackAdded{ + room_id: room_id, + endpoint_info: endpoint_info, + track: to_proto_track(track_info) + }} + end + + defp to_proto_server_notification({:track_removed, room_id, endpoint_info, track_info}) do + {:track_removed, + %TrackRemoved{ + room_id: room_id, + endpoint_info: endpoint_info, + track: to_proto_track(track_info) + }} + end + + defp to_proto_server_notification({:track_metadata_updated, room_id, endpoint_info, track_id}) do + {:track_metadata_updated, + %TrackMetadataUpdated{ + room_id: room_id, + endpoint_info: endpoint_info, + track: to_proto_track(track_id) + }} + end + defp to_proto_server_notification({:hls_playable, room_id, component_id}), do: {:hls_playable, %HlsPlayable{room_id: room_id, component_id: component_id}} @@ -67,4 +106,24 @@ defmodule Jellyfish.Event do defp to_proto_server_notification({:hls_upload_crashed, room_id}), do: {:hls_upload_crashed, %HlsUploadCrashed{room_id: room_id}} + + defp to_proto_track(%Jellyfish.Track{} = track) do + %Track{ + id: track.id, + type: to_proto_track_type(track.type), + metadata: Jason.encode!(track.metadata) + } + end + + defp to_proto_track(%Message.TrackAdded{} = track) do + %Track{ + id: track.track_id, + type: to_proto_track_type(track.track_type), + metadata: Jason.encode!(track.track_metadata) + } + end + + defp to_proto_track_type(:video), do: :TRACK_TYPE_VIDEO + defp to_proto_track_type(:audio), do: :TRACK_TYPE_AUDIO + defp to_proto_track_type(_type), do: :TRACK_TYPE_UNSPECIFIED end diff --git a/lib/jellyfish/peer.ex b/lib/jellyfish/peer.ex index 5fd2ea1e..621fee58 100644 --- a/lib/jellyfish/peer.ex +++ b/lib/jellyfish/peer.ex @@ -3,15 +3,17 @@ defmodule Jellyfish.Peer do Peer is an entity that connects to the server to publish, subscribe or publish and subscribe to tracks published by producers or other peers. Peer process is spawned after peer connects to the server. """ + use Bunch.Access alias Jellyfish.Peer.WebRTC + alias Jellyfish.Track @enforce_keys [ :id, :type, :engine_endpoint ] - defstruct @enforce_keys ++ [status: :disconnected, socket_pid: nil] + defstruct @enforce_keys ++ [status: :disconnected, socket_pid: nil, tracks: %{}, metadata: nil] @type id :: String.t() @type peer :: WebRTC @@ -28,7 +30,9 @@ defmodule Jellyfish.Peer do type: peer(), status: status(), socket_pid: pid() | nil, - engine_endpoint: Membrane.ChildrenSpec.child_definition() + engine_endpoint: Membrane.ChildrenSpec.child_definition(), + tracks: %{Track.id() => Track.t()}, + metadata: any() } @spec parse_type(String.t()) :: {:ok, peer()} | {:error, :invalid_type} diff --git a/lib/jellyfish/peer/webrtc.ex b/lib/jellyfish/peer/webrtc.ex index 2f294695..20c72def 100644 --- a/lib/jellyfish/peer/webrtc.ex +++ b/lib/jellyfish/peer/webrtc.ex @@ -53,7 +53,6 @@ defmodule Jellyfish.Peer.WebRTC do handshake_opts: handshake_options, filter_codecs: filter_codecs, log_metadata: [peer_id: options.peer_id], - trace_context: nil, extensions: %{opus: Membrane.RTP.VAD}, webrtc_extensions: webrtc_extensions, simulcast_config: %SimulcastConfig{ diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 3092ada3..a30e9f1c 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -13,6 +13,7 @@ defmodule Jellyfish.Room do alias Jellyfish.Event alias Jellyfish.Peer alias Jellyfish.Room.Config + alias Jellyfish.Track alias Membrane.ICE.TURNManager alias Membrane.RTC.Engine @@ -21,8 +22,10 @@ defmodule Jellyfish.Room do EndpointAdded, EndpointCrashed, EndpointMessage, + EndpointMetadataUpdated, EndpointRemoved, TrackAdded, + TrackMetadataUpdated, TrackRemoved } @@ -224,16 +227,7 @@ defmodule Jellyfish.Room do def handle_call({:remove_peer, peer_id}, _from, state) do {reply, state} = if Map.has_key?(state.peers, peer_id) do - {peer, state} = pop_in(state, [:peers, peer_id]) - :ok = Engine.remove_endpoint(state.engine_pid, peer_id) - - if is_pid(peer.socket_pid), - do: send(peer.socket_pid, {:stop_connection, :peer_removed}) - - Logger.info("Removed peer #{inspect(peer_id)} from room #{inspect(state.id)}") - - if peer.status == :connected, - do: Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) + state = handle_remove_peer(peer_id, state, :peer_removed) {:ok, state} else @@ -315,13 +309,7 @@ defmodule Jellyfish.Room do def handle_call({:remove_component, component_id}, _from, state) do {reply, state} = if Map.has_key?(state.components, component_id) do - {component, state} = pop_in(state, [:components, component_id]) - :ok = Engine.remove_endpoint(state.engine_pid, component_id) - - Logger.info("Removed component #{inspect(component_id)}") - - if component.type == HLS, do: on_hls_removal(state.id, component.properties) - + state = handle_remove_component(component_id, state) {:ok, state} else {{:error, :component_not_found}, state} @@ -332,7 +320,7 @@ defmodule Jellyfish.Room do @impl true def handle_call({:hls_subscribe, origins}, _from, state) do - hls_component = hls_component(state) + hls_component = get_hls_component(state) reply = case validate_hls_subscription(hls_component) do @@ -450,24 +438,109 @@ defmodule Jellyfish.Room do {:noreply, state} end + @impl true + def handle_info(%EndpointAdded{endpoint_id: endpoint_id}, state) + when endpoint_exists?(state, endpoint_id) do + {:noreply, state} + end + @impl true def handle_info( - %EndpointAdded{endpoint_id: endpoint_id}, + %EndpointMetadataUpdated{endpoint_id: endpoint_id, endpoint_metadata: metadata}, state ) + when is_map_key(state.peers, endpoint_id) do + Logger.info("Peer #{endpoint_id} metadata updated: #{inspect(metadata)}") + Event.broadcast_server_notification({:peer_metadata_updated, state.id, endpoint_id, metadata}) + + state = put_in(state, [:peers, endpoint_id, :metadata], metadata) + {:noreply, state} + end + + @impl true + def handle_info(%EndpointMetadataUpdated{}, state) do + {:noreply, state} + end + + @impl true + def handle_info(%TrackAdded{endpoint_id: endpoint_id} = track_info, state) when endpoint_exists?(state, endpoint_id) do + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + + Logger.info("Track #{track_info.track_id} added, #{endpoint_id_type}: #{endpoint_id}") + + Event.broadcast_server_notification( + {:track_added, state.id, {endpoint_id_type, endpoint_id}, track_info} + ) + + endpoint_group = get_endpoint_group(state, track_info.endpoint_id) + access_path = [endpoint_group, track_info.endpoint_id, :tracks, track_info.track_id] + + track = Track.from_track_message(track_info) + state = put_in(state, access_path, track) + {:noreply, state} end @impl true - def handle_info(%TrackAdded{} = track_info, state) do - Logger.info("Endpoint #{track_info.endpoint_id} added track #{inspect(track_info)}") + def handle_info(%TrackAdded{endpoint_id: endpoint_id} = track_info, state) do + Logger.error("Unknown endpoint #{endpoint_id} added track #{inspect(track_info)}") {:noreply, state} end @impl true - def handle_info(%TrackRemoved{} = track_info, state) do - Logger.info("Endpoint #{track_info.endpoint_id} removed track #{inspect(track_info)}") + def handle_info(%TrackMetadataUpdated{endpoint_id: endpoint_id} = track_info, state) + when endpoint_exists?(state, endpoint_id) do + endpoint_group = get_endpoint_group(state, endpoint_id) + access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] + + state = + update_in(state, access_path, fn + %Track{} = track -> + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + updated_track = %Track{track | metadata: track_info.track_metadata} + + Logger.info( + "Track #{updated_track.id}, #{endpoint_id_type}: #{endpoint_id} - metadata updated: #{inspect(updated_track.metadata)}" + ) + + Event.broadcast_server_notification( + {:track_metadata_updated, state.id, {endpoint_id_type, endpoint_id}, updated_track} + ) + + updated_track + end) + + {:noreply, state} + end + + @impl true + def handle_info(%TrackMetadataUpdated{endpoint_id: endpoint_id} = track_info, state) do + Logger.error("Unknown endpoint #{endpoint_id} updated track #{inspect(track_info)}") + {:noreply, state} + end + + @impl true + def handle_info(%TrackRemoved{endpoint_id: endpoint_id} = track_info, state) + when endpoint_exists?(state, endpoint_id) do + endpoint_group = get_endpoint_group(state, endpoint_id) + access_path = [endpoint_group, endpoint_id, :tracks, track_info.track_id] + + {track, state} = pop_in(state, access_path) + + endpoint_id_type = get_endpoint_id_type(state, endpoint_id) + Logger.info("Track removed: #{track.id}, #{endpoint_id_type}: #{endpoint_id}") + + Event.broadcast_server_notification( + {:track_removed, state.id, {endpoint_id_type, endpoint_id}, track} + ) + + {:noreply, state} + end + + @impl true + def handle_info(%TrackRemoved{endpoint_id: endpoint_id} = track_info, state) do + Logger.error("Unknown endpoint #{endpoint_id} removed track #{inspect(track_info)}") {:noreply, state} end @@ -481,9 +554,17 @@ defmodule Jellyfish.Room do def terminate(_reason, %{engine_pid: engine_pid} = state) do Engine.terminate(engine_pid, asynchronous?: true, timeout: 10_000) - hls_component = hls_component(state) + hls_component = get_hls_component(state) unless is_nil(hls_component), do: on_hls_removal(state.id, hls_component.properties) + state.peers + |> Map.values() + |> Enum.each(&handle_remove_peer(&1.id, state, :room_stopped)) + + state.components + |> Map.values() + |> Enum.each(&handle_remove_component(&1.id, state)) + :ok end @@ -525,7 +606,47 @@ defmodule Jellyfish.Room do } end - defp hls_component(%{components: components}), + defp handle_remove_component(component_id, state) do + {component, state} = pop_in(state, [:components, component_id]) + :ok = Engine.remove_endpoint(state.engine_pid, component_id) + + component.tracks + |> Map.values() + |> Enum.each( + &Event.broadcast_server_notification( + {:track_removed, state.id, {:component_id, component_id}, &1} + ) + ) + + Logger.info("Removed component #{inspect(component_id)}") + + if component.type == HLS, do: on_hls_removal(state.id, component.properties) + + state + end + + defp handle_remove_peer(peer_id, state, reason) do + {peer, state} = pop_in(state, [:peers, peer_id]) + :ok = Engine.remove_endpoint(state.engine_pid, peer_id) + + if is_pid(peer.socket_pid), + do: send(peer.socket_pid, {:stop_connection, reason}) + + peer.tracks + |> Map.values() + |> Enum.each( + &Event.broadcast_server_notification({:track_removed, state.id, {:peer_id, peer_id}, &1}) + ) + + Logger.info("Removed peer #{inspect(peer_id)} from room #{inspect(state.id)}") + + if peer.status == :connected and reason == :peer_removed, + do: Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) + + state + end + + defp get_hls_component(%{components: components}), do: Enum.find_value(components, fn {_id, component} -> if component.type == HLS, do: component @@ -593,4 +714,17 @@ defmodule Jellyfish.Room do do: {:error, :invalid_subscribe_mode} defp validate_hls_subscription(%{properties: %{subscribe_mode: :manual}}), do: :ok + + defp get_endpoint_group(state, endpoint_id) when is_map_key(state.components, endpoint_id), + do: :components + + defp get_endpoint_group(state, endpoint_id) when is_map_key(state.peers, endpoint_id), + do: :peers + + defp get_endpoint_id_type(state, endpoint_id) do + case get_endpoint_group(state, endpoint_id) do + :peers -> :peer_id + :components -> :component_id + end + end end diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 21b2352b..688ef513 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -203,8 +203,6 @@ defmodule Jellyfish.RoomService do Logger.debug("Room #{room_id} is down with reason: normal") - Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_stopped) - {:noreply, state} end diff --git a/lib/jellyfish/track.ex b/lib/jellyfish/track.ex new file mode 100644 index 00000000..418d0382 --- /dev/null +++ b/lib/jellyfish/track.ex @@ -0,0 +1,30 @@ +defmodule Jellyfish.Track do + @moduledoc """ + Represents a media track send from Component or Peer. + """ + + use Bunch.Access + + alias Membrane.RTC.Engine.Message.{TrackAdded, TrackMetadataUpdated} + + @enforce_keys [:id, :type] + defstruct @enforce_keys ++ [:metadata] + + @type id() :: String.t() + + @type t() :: %__MODULE__{ + id: id(), + type: :audio | :video, + metadata: nil | any() + } + + @spec from_track_message(TrackAdded.t() | TrackMetadataUpdated.t()) :: t() + def from_track_message(%type{} = message) + when type in [TrackAdded, TrackMetadataUpdated] do + %__MODULE__{ + id: message.track_id, + type: message.track_type, + metadata: message.track_metadata + } + end +end diff --git a/lib/jellyfish/webhook_notifier.ex b/lib/jellyfish/webhook_notifier.ex index 10e7f549..2ac0cdf7 100644 --- a/lib/jellyfish/webhook_notifier.ex +++ b/lib/jellyfish/webhook_notifier.ex @@ -50,8 +50,8 @@ defmodule Jellyfish.WebhookNotifier do defp send_webhook_notification(notification, webhook_url) when not is_nil(webhook_url) do case HTTPoison.post( webhook_url, - Jason.encode!(%{notification: notification}), - [{"Content-Type", "application/json"}] + notification, + [{"Content-Type", "application/x-protobuf"}] ) do {:ok, result} when result.status_code >= 200 and result.status_code < 300 -> nil diff --git a/lib/jellyfish_web/api_spec/component/file.ex b/lib/jellyfish_web/api_spec/component/file.ex index c0df820d..059915e9 100644 --- a/lib/jellyfish_web/api_spec/component/file.ex +++ b/lib/jellyfish_web/api_spec/component/file.ex @@ -65,8 +65,13 @@ defmodule JellyfishWeb.ApiSpec.Component.File do id: %Schema{type: :string, description: "Assigned component ID", example: "component-1"}, # FIXME: due to cyclic imports, we can't use ApiSpec.Component.Type here type: %Schema{type: :string, description: "Component type", example: "file"}, - properties: Properties + properties: Properties, + tracks: %Schema{ + type: :array, + items: JellyfishWeb.ApiSpec.Track, + description: "List of all component's tracks" + } }, - required: [:id, :type] + required: [:id, :type, :tracks] }) end diff --git a/lib/jellyfish_web/api_spec/component/hls.ex b/lib/jellyfish_web/api_spec/component/hls.ex index 8d74f9aa..9eca9cb5 100644 --- a/lib/jellyfish_web/api_spec/component/hls.ex +++ b/lib/jellyfish_web/api_spec/component/hls.ex @@ -126,8 +126,13 @@ defmodule JellyfishWeb.ApiSpec.Component.HLS do id: %Schema{type: :string, description: "Assigned component ID", example: "component-1"}, # FIXME: due to cyclic imports, we can't use ApiSpec.Component.Type here type: %Schema{type: :string, description: "Component type", example: "hls"}, - properties: Properties + properties: Properties, + tracks: %Schema{ + type: :array, + items: JellyfishWeb.ApiSpec.Track, + description: "List of all component's tracks" + } }, - required: [:id, :type, :properties] + required: [:id, :type, :properties, :tracks] }) end diff --git a/lib/jellyfish_web/api_spec/component/rtsp.ex b/lib/jellyfish_web/api_spec/component/rtsp.ex index 336a1f60..1ee98028 100644 --- a/lib/jellyfish_web/api_spec/component/rtsp.ex +++ b/lib/jellyfish_web/api_spec/component/rtsp.ex @@ -96,8 +96,13 @@ defmodule JellyfishWeb.ApiSpec.Component.RTSP do id: %Schema{type: :string, description: "Assigned component ID", example: "component-1"}, # FIXME: due to cyclic imports, we can't use ApiSpec.Component.Type here type: %Schema{type: :string, description: "Component type", example: "hls"}, - properties: Properties + properties: Properties, + tracks: %Schema{ + type: :array, + items: JellyfishWeb.ApiSpec.Track, + description: "List of all component's tracks" + } }, - required: [:id, :type, :properties] + required: [:id, :type, :properties, :tracks] }) end diff --git a/lib/jellyfish_web/api_spec/peer.ex b/lib/jellyfish_web/api_spec/peer.ex index 54c22d49..a44a63fd 100644 --- a/lib/jellyfish_web/api_spec/peer.ex +++ b/lib/jellyfish_web/api_spec/peer.ex @@ -61,6 +61,19 @@ defmodule JellyfishWeb.ApiSpec.Peer do }) end + defmodule PeerMetadata do + @moduledoc false + + require OpenApiSpex + + OpenApiSpex.schema(%{ + title: "PeerMetadata", + description: "Custom metadata set by the peer", + example: %{name: "JellyfishUser"}, + nullable: true + }) + end + OpenApiSpex.schema(%{ title: "Peer", description: "Describes peer status", @@ -68,8 +81,14 @@ defmodule JellyfishWeb.ApiSpec.Peer do properties: %{ id: %Schema{type: :string, description: "Assigned peer id", example: "peer-1"}, type: Type, - status: Status + status: Status, + tracks: %Schema{ + type: :array, + items: JellyfishWeb.ApiSpec.Track, + description: "List of all peer's tracks" + }, + metadata: PeerMetadata }, - required: [:id, :type, :status] + required: [:id, :type, :status, :tracks, :metadata] }) end diff --git a/lib/jellyfish_web/api_spec/track.ex b/lib/jellyfish_web/api_spec/track.ex new file mode 100644 index 00000000..a12c4358 --- /dev/null +++ b/lib/jellyfish_web/api_spec/track.ex @@ -0,0 +1,24 @@ +defmodule JellyfishWeb.ApiSpec.Track do + @moduledoc false + require OpenApiSpex + + alias OpenApiSpex.Schema + + OpenApiSpex.schema(%{ + title: "Track", + description: "Describes media track of a Peer or Component", + type: :object, + properties: %{ + id: %Schema{ + type: :string + }, + type: %Schema{ + type: :string, + enum: ["audio", "video"] + }, + metadata: %Schema{ + nullable: true + } + } + }) +end diff --git a/lib/jellyfish_web/controllers/component_json.ex b/lib/jellyfish_web/controllers/component_json.ex index 7d28be3f..15bade2f 100644 --- a/lib/jellyfish_web/controllers/component_json.ex +++ b/lib/jellyfish_web/controllers/component_json.ex @@ -19,7 +19,8 @@ defmodule JellyfishWeb.ComponentJSON do %{ id: component.id, type: type, - properties: component.properties |> ParserJSON.camel_case_keys() + properties: component.properties |> ParserJSON.camel_case_keys(), + tracks: component.tracks |> Map.values() |> Enum.map(&Map.from_struct/1) } end end diff --git a/lib/jellyfish_web/controllers/peer_json.ex b/lib/jellyfish_web/controllers/peer_json.ex index b12bb02a..f4f87397 100644 --- a/lib/jellyfish_web/controllers/peer_json.ex +++ b/lib/jellyfish_web/controllers/peer_json.ex @@ -19,7 +19,9 @@ defmodule JellyfishWeb.PeerJSON do %{ id: peer.id, type: type, - status: "#{peer.status}" + status: "#{peer.status}", + tracks: peer.tracks |> Map.values() |> Enum.map(&Map.from_struct/1), + metadata: peer.metadata } end end diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index b7b1e62e..ec2a61f9 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -119,18 +119,18 @@ defmodule JellyfishWeb.PeerSocket do end @impl true - def handle_info({:stop_connection, _reason}, state) do - {:stop, :closed, {1011, "Internal server error"}, state} + def handle_info({:stop_connection, :room_stopped}, state) do + {:stop, :closed, {1000, "Room stopped"}, state} end @impl true - def handle_info(:room_crashed, state) do + def handle_info({:stop_connection, _reason}, state) do {:stop, :closed, {1011, "Internal server error"}, state} end @impl true - def handle_info(:room_stopped, state) do - {:stop, :closed, {1000, "Room stopped"}, state} + def handle_info(:room_crashed, state) do + {:stop, :closed, {1011, "Internal server error"}, state} end @impl true diff --git a/lib/protos/jellyfish/server_notifications.pb.ex b/lib/protos/jellyfish/server_notifications.pb.ex index 2dc99ad9..5348b6c5 100644 --- a/lib/protos/jellyfish/server_notifications.pb.ex +++ b/lib/protos/jellyfish/server_notifications.pb.ex @@ -8,6 +8,16 @@ defmodule Jellyfish.ServerMessage.EventType do field :EVENT_TYPE_METRICS, 2 end +defmodule Jellyfish.ServerMessage.TrackType do + @moduledoc false + + use Protobuf, enum: true, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + field :TRACK_TYPE_UNSPECIFIED, 0 + field :TRACK_TYPE_VIDEO, 1 + field :TRACK_TYPE_AUDIO, 2 +end + defmodule Jellyfish.ServerMessage.RoomCrashed do @moduledoc false @@ -137,6 +147,65 @@ defmodule Jellyfish.ServerMessage.HlsUploadCrashed do field :room_id, 1, type: :string, json_name: "roomId" end +defmodule Jellyfish.ServerMessage.PeerMetadataUpdated do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId" + field :metadata, 3, type: :string +end + +defmodule Jellyfish.ServerMessage.Track do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + field :id, 1, type: :string + field :type, 2, type: Jellyfish.ServerMessage.TrackType, enum: true + field :metadata, 3, type: :string +end + +defmodule Jellyfish.ServerMessage.TrackAdded do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + oneof :endpoint_info, 0 + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId", oneof: 0 + field :component_id, 3, type: :string, json_name: "componentId", oneof: 0 + field :track, 4, type: Jellyfish.ServerMessage.Track +end + +defmodule Jellyfish.ServerMessage.TrackRemoved do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + oneof :endpoint_info, 0 + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId", oneof: 0 + field :component_id, 3, type: :string, json_name: "componentId", oneof: 0 + field :track, 4, type: Jellyfish.ServerMessage.Track +end + +defmodule Jellyfish.ServerMessage.TrackMetadataUpdated do + @moduledoc false + + use Protobuf, syntax: :proto3, protoc_gen_elixir_version: "0.12.0" + + oneof :endpoint_info, 0 + + field :room_id, 1, type: :string, json_name: "roomId" + field :peer_id, 2, type: :string, json_name: "peerId", oneof: 0 + field :component_id, 3, type: :string, json_name: "componentId", oneof: 0 + field :track, 4, type: Jellyfish.ServerMessage.Track +end + defmodule Jellyfish.ServerMessage do @moduledoc false @@ -215,4 +284,24 @@ defmodule Jellyfish.ServerMessage do type: Jellyfish.ServerMessage.HlsUploadCrashed, json_name: "hlsUploadCrashed", oneof: 0 + + field :peer_metadata_updated, 16, + type: Jellyfish.ServerMessage.PeerMetadataUpdated, + json_name: "peerMetadataUpdated", + oneof: 0 + + field :track_added, 17, + type: Jellyfish.ServerMessage.TrackAdded, + json_name: "trackAdded", + oneof: 0 + + field :track_removed, 18, + type: Jellyfish.ServerMessage.TrackRemoved, + json_name: "trackRemoved", + oneof: 0 + + field :track_metadata_updated, 19, + type: Jellyfish.ServerMessage.TrackMetadataUpdated, + json_name: "trackMetadataUpdated", + oneof: 0 end diff --git a/mix.exs b/mix.exs index e53db4fb..825ee345 100644 --- a/mix.exs +++ b/mix.exs @@ -68,12 +68,16 @@ defmodule Jellyfish.MixProject do {:protobuf, "~> 0.12.0"}, # Membrane deps - {:membrane_rtc_engine, "~> 0.19.0"}, - {:membrane_rtc_engine_webrtc, "~> 0.5.0"}, - {:membrane_rtc_engine_hls, "~> 0.4.0"}, + {:membrane_rtc_engine, + github: "jellyfish-dev/membrane_rtc_engine", sparse: "engine", override: true}, + {:membrane_rtc_engine_webrtc, + github: "jellyfish-dev/membrane_rtc_engine", sparse: "webrtc", override: true}, + {:membrane_rtc_engine_hls, + github: "jellyfish-dev/membrane_rtc_engine", sparse: "hls", override: true}, {:membrane_rtc_engine_rtsp, "~> 0.4.0"}, - {:membrane_rtc_engine_file, "~> 0.2.0"}, - {:membrane_ice_plugin, "~> 0.17.0"}, + {:membrane_rtc_engine_file, + github: "jellyfish-dev/membrane_rtc_engine", sparse: "file", override: true}, + {:membrane_ice_plugin, "~> 0.18.0"}, {:membrane_telemetry_metrics, "~> 0.1.0"}, # HLS endpoints deps diff --git a/mix.lock b/mix.lock index ef529bd9..740bf1df 100644 --- a/mix.lock +++ b/mix.lock @@ -51,22 +51,21 @@ "membrane_h264_plugin": {:hex, :membrane_h264_plugin, "0.9.1", "ea140ab1ca21c528563675fdd7e14c80607e120e320dc930cac3dcfb4db3fc2b", [:mix], [{:bunch, "~> 1.4", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}], "hexpm", "8f10db817e691fc1234ed85fe674b3f8718d3a410e4582736dcdd53664cae725"}, "membrane_h265_format": {:hex, :membrane_h265_format, "0.2.0", "1903c072cf7b0980c4d0c117ab61a2cd33e88782b696290de29570a7fab34819", [:mix], [], "hexpm", "6df418bdf242c0d9f7dbf2e5aea4c2d182e34ac9ad5a8b8cef2610c290002e83"}, "membrane_http_adaptive_stream_plugin": {:hex, :membrane_http_adaptive_stream_plugin, "0.18.2", "420519e956540d00bfe97594bcda893f0616c4251297500c855290fccc5f899a", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_plugin, "~> 0.18.0", [hex: :membrane_aac_plugin, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_mp4_plugin, "~> 0.31.0", [hex: :membrane_mp4_plugin, repo: "hexpm", optional: false]}, {:membrane_tee_plugin, "~> 0.12.0", [hex: :membrane_tee_plugin, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "540cf54a85410aa2f4dd40c420a4a7da7493c0c14c5d935e84857c02ff1096fb"}, - "membrane_ice_plugin": {:hex, :membrane_ice_plugin, "0.17.0", "1299689e2ee36f4083e84e7ddd27f1333d3b92662fef9f47cdcb8158903be6e2", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.12.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:fake_turn, "~> 0.4.0", [hex: :fake_turn, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "837ee99f53fe2097acea0d908c3afd572524e93bfb3e2895fe23ed3c906a5382"}, + "membrane_ice_plugin": {:hex, :membrane_ice_plugin, "0.18.0", "beecb741b641b0c8b4efea0569fa68a3564051294e3ed10a10a1e29028e1d474", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_dtls, "~> 0.12.0", [hex: :ex_dtls, repo: "hexpm", optional: false]}, {:fake_turn, "~> 0.4.0", [hex: :fake_turn, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "fff74d447d42902bb014bc8cdc78d6da3f797b59ed8fd622bfc7c6854323a287"}, "membrane_mp4_format": {:hex, :membrane_mp4_format, "0.8.0", "8c6e7d68829228117d333b4fbb030e7be829aab49dd8cb047fdc664db1812e6a", [:mix], [], "hexpm", "148dea678a1f82ccfd44dbde6f936d2f21255f496cb45a22cc6eec427f025522"}, "membrane_mp4_plugin": {:hex, :membrane_mp4_plugin, "0.31.0", "1932c86e2f4a24aca1b99ee531a131fd0da1128db8975ba8f8738e3b1bbcfabd", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_aac_format, "~> 0.8.0", [hex: :membrane_aac_format, repo: "hexpm", optional: false]}, {:membrane_cmaf_format, "~> 0.7.0", [hex: :membrane_cmaf_format, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.16.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.1", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_h265_format, "~> 0.2.0", [hex: :membrane_h265_format, repo: "hexpm", optional: false]}, {:membrane_mp4_format, "~> 0.8.0", [hex: :membrane_mp4_format, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "9968e56e02085228974bf6a59c8858f3c0d9800a4e767c1b3b2f2890050c72f4"}, "membrane_ogg_plugin": {:hex, :membrane_ogg_plugin, "0.3.0", "6e98b8932a2b88174dc3922989a475e02ee327589222b1c8422ff4fa630325c3", [:mix], [{:crc, "~> 0.10", [hex: :crc, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}], "hexpm", "82557299b3d72aab0fafa07a05dcc3a5a842eeaa3de3f2f3959677517b66b713"}, - "membrane_opentelemetry": {:hex, :membrane_opentelemetry, "0.1.0", "af774bc5b9bad3a822e9a26d8530819b0291b569a282c65a7dd51cc498e6e9cd", [:mix], [{:opentelemetry_api, "~> 1.0.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "2e4072a5f14eb95514701e242a7667a7178dfc6afd313d4643ec0726391c243b"}, "membrane_opus_format": {:hex, :membrane_opus_format, "0.3.0", "3804d9916058b7cfa2baa0131a644d8186198d64f52d592ae09e0942513cb4c2", [:mix], [], "hexpm", "8fc89c97be50de23ded15f2050fe603dcce732566fe6fdd15a2de01cb6b81afe"}, "membrane_opus_plugin": {:hex, :membrane_opus_plugin, "0.19.3", "af398a10c84d27e49b9a68ec78a54f123f2637441dd380857a3da4bb492eca5c", [:mix], [{:bunch, "~> 1.3", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.2", [hex: :bundlex, repo: "hexpm", optional: false]}, {:membrane_common_c, "~> 0.16.0", [hex: :membrane_common_c, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_precompiled_dependency_provider, "~> 0.1.0", [hex: :membrane_precompiled_dependency_provider, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "172d5637233e4e7cb2c464be34ea85b487188887381ef5ff98d5c110fdf44f5b"}, "membrane_precompiled_dependency_provider": {:hex, :membrane_precompiled_dependency_provider, "0.1.1", "a0d5b7942f8be452c30744207f78284f6a0e0c84c968aba7d76e206fbf75bc5d", [:mix], [{:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "87ad44752e2cf0fa3b31c5aac15b863343c2f6e0f0fd201f5ec4c0bcda8c6fa3"}, "membrane_raw_audio_format": {:hex, :membrane_raw_audio_format, "0.12.0", "b574cd90f69ce2a8b6201b0ccf0826ca28b0fbc8245b8078d9f11cef65f7d5d5", [:mix], [{:bimap, "~> 1.1", [hex: :bimap, repo: "hexpm", optional: false]}, {:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "6e6c98e3622a2b9df19eab50ba65d7eb45949b1ba306fa8423df6cdb12fd0b44"}, "membrane_raw_video_format": {:hex, :membrane_raw_video_format, "0.3.0", "ba10f475e0814a6fe79602a74536b796047577c7ef5b0e33def27cd344229699", [:mix], [], "hexpm", "2f08760061c8a5386ecf04273480f10e48d25a1a40aa99476302b0bcd34ccb1c"}, "membrane_realtimer_plugin": {:hex, :membrane_realtimer_plugin, "0.9.0", "27210d5e32a5e8bfd101c41e4d8c1876e873a52cc129ebfbee4d0ccbea1cbd21", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "b2e96d62135ee57ef9a5fdea94b3a9ab1198e5ea8ee248391b89c671125d1b51"}, - "membrane_rtc_engine": {:hex, :membrane_rtc_engine, "0.19.0", "6ca3f1e1e8cd9129ac45790bec619c4b11f4abb1968dbe7f9d2884c9f3f66072", [:mix], [{:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.13.1", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.24.1", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.0", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:statistics, "~> 0.6.0", [hex: :statistics, repo: "hexpm", optional: false]}], "hexpm", "daa9da426aa1e6468bab65a0ce1c0c06187c5ec90138c8fd125f23ea2e692d85"}, - "membrane_rtc_engine_file": {:hex, :membrane_rtc_engine_file, "0.2.0", "c330a965df4ea4fa8185526b50f2b2ed4825a80780d64ae2cfb61f3af3ee7bca", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.16.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_ogg_plugin, "~> 0.3.0", [hex: :membrane_ogg_plugin, repo: "hexpm", optional: false]}, {:membrane_opus_plugin, "~> 0.19.0", [hex: :membrane_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_realtimer_plugin, "~> 0.9.0", [hex: :membrane_realtimer_plugin, repo: "hexpm", optional: false]}, {:membrane_rtc_engine, "~> 0.19.0", [hex: :membrane_rtc_engine, repo: "hexpm", optional: false]}, {:membrane_rtc_engine_webrtc, "~> 0.5.0", [hex: :membrane_rtc_engine_webrtc, repo: "hexpm", optional: false]}], "hexpm", "82263a434af6c181e7045b8c7b74946947b6c29fe738f6a37398093b19fc7344"}, - "membrane_rtc_engine_hls": {:hex, :membrane_rtc_engine_hls, "0.4.0", "7a73fb67a5d3000ab5fdacf4b2d48a604f6ee57de7aaf823d3a756d0f2a4bce3", [:mix], [{:membrane_aac_fdk_plugin, "~> 0.18.1", [hex: :membrane_aac_fdk_plugin, repo: "hexpm", optional: false]}, {:membrane_aac_plugin, "~> 0.18.0", [hex: :membrane_aac_plugin, repo: "hexpm", optional: false]}, {:membrane_audio_mix_plugin, "~> 0.16.0", [hex: :membrane_audio_mix_plugin, repo: "hexpm", optional: true]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_ffmpeg_plugin, "~> 0.31.0", [hex: :membrane_h264_ffmpeg_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_http_adaptive_stream_plugin, "~> 0.18.0", [hex: :membrane_http_adaptive_stream_plugin, repo: "hexpm", optional: false]}, {:membrane_opus_plugin, "~> 0.19.0", [hex: :membrane_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_raw_audio_format, "~> 0.12.0", [hex: :membrane_raw_audio_format, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:membrane_rtc_engine, "~> 0.19.0", [hex: :membrane_rtc_engine, repo: "hexpm", optional: false]}, {:membrane_rtc_engine_webrtc, "~> 0.5.0", [hex: :membrane_rtc_engine_webrtc, repo: "hexpm", optional: false]}, {:membrane_video_compositor_plugin, "~> 0.7.0", [hex: :membrane_video_compositor_plugin, repo: "hexpm", optional: true]}], "hexpm", "e4cf04b42c255e627f59ce73482deda0bb56a81ddfd32fbc50d0a215ccf42a0a"}, + "membrane_rtc_engine": {:git, "https://github.com/jellyfish-dev/membrane_rtc_engine.git", "164b5dc3fa2f56a5da696e5173794f37acce7959", [sparse: "engine"]}, + "membrane_rtc_engine_file": {:git, "https://github.com/jellyfish-dev/membrane_rtc_engine.git", "765a7a7403de672ed3f389690ef6c3974160d3ba", [sparse: "file"]}, + "membrane_rtc_engine_hls": {:git, "https://github.com/jellyfish-dev/membrane_rtc_engine.git", "765a7a7403de672ed3f389690ef6c3974160d3ba", [sparse: "hls"]}, "membrane_rtc_engine_rtsp": {:hex, :membrane_rtc_engine_rtsp, "0.4.0", "a4ed1d3aca0b8795c745d2e787e60caaa820f985fdf7ee74b144fdef4e273d4a", [:mix], [{:connection, "~> 1.1", [hex: :connection, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.13.1", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtc_engine, "~> 0.19.0", [hex: :membrane_rtc_engine, repo: "hexpm", optional: false]}, {:membrane_rtc_engine_webrtc, "~> 0.5.0", [hex: :membrane_rtc_engine_webrtc, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.24.1", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtsp, "~> 0.5.1", [hex: :membrane_rtsp, repo: "hexpm", optional: false]}, {:membrane_udp_plugin, "~> 0.12.0", [hex: :membrane_udp_plugin, repo: "hexpm", optional: false]}], "hexpm", "5a74afeeeae126a704a6ae1299f7cdab79f72573ab229c66a7a1050c4e3aaf56"}, - "membrane_rtc_engine_webrtc": {:hex, :membrane_rtc_engine_webrtc, "0.5.0", "10a939c9d666daaa115ddc1112e0b4cc3380326b9aebd81ac3476273d6f12810", [:mix], [{:ex_sdp, "~> 0.13.1", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_file_plugin, "~> 0.16.0", [hex: :membrane_file_plugin, repo: "hexpm", optional: false]}, {:membrane_ice_plugin, "~> 0.17.0", [hex: :membrane_ice_plugin, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtc_engine, "~> 0.19.0", [hex: :membrane_rtc_engine, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.24.1", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.0", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:membrane_telemetry_metrics, "~> 0.1.0", [hex: :membrane_telemetry_metrics, repo: "hexpm", optional: false]}, {:membrane_webrtc_plugin, "~> 0.17.0", [hex: :membrane_webrtc_plugin, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "11d59b4a45f90a418f72856cb2cd09b618c18b4519b6a561cdba830547da62ad"}, + "membrane_rtc_engine_webrtc": {:git, "https://github.com/jellyfish-dev/membrane_rtc_engine.git", "765a7a7403de672ed3f389690ef6c3974160d3ba", [sparse: "webrtc"]}, "membrane_rtp_format": {:hex, :membrane_rtp_format, "0.8.0", "828924bbd27efcf85b2015ae781e824c4a9928f0a7dc132abc66817b2c6edfc4", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}], "hexpm", "bc75d2a649dfaef6df563212fbb9f9f62eebc871393692f9dae8d289bd4f94bb"}, "membrane_rtp_h264_plugin": {:hex, :membrane_rtp_h264_plugin, "0.19.0", "112bfedc14fb83bdb549ef1a03da23908feedeb165fd3e4512a549f1af532ae7", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_h264_format, "~> 0.6.0", [hex: :membrane_h264_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "76fd159e7406cadbef15124cba30eca3fffcf71a7420964f26e23d4cffd9b29d"}, "membrane_rtp_opus_plugin": {:hex, :membrane_rtp_opus_plugin, "0.9.0", "ae76421faa04697a4af76a55b6c5e675dea61b611d29d8201098783d42863af7", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_opus_format, "~> 0.3.0", [hex: :membrane_opus_format, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}], "hexpm", "58f095d2978daf999d87c1c016007cb7d99434208486331ab5045e77f5be9dcc"}, @@ -78,7 +77,7 @@ "membrane_udp_plugin": {:hex, :membrane_udp_plugin, "0.12.0", "f3930a592f975f5aef924ff70b1072e55451de16ec5dce7dd264ecf9d034b9ad", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:mockery, "~> 2.3.0", [hex: :mockery, repo: "hexpm", optional: false]}], "hexpm", "65e846f7523e443215b6954136971d4f00a8e8f375ef015153daa535ce607769"}, "membrane_video_compositor_plugin": {:hex, :membrane_video_compositor_plugin, "0.7.0", "2273743fd0a47660880276e0bbb8a9f8848e09b05b425101d1cc0c5d245ff8ea", [:mix], [{:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_framerate_converter_plugin, "~> 0.8.0", [hex: :membrane_framerate_converter_plugin, repo: "hexpm", optional: false]}, {:membrane_raw_video_format, "~> 0.3.0", [hex: :membrane_raw_video_format, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.1", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:rustler, "~> 0.26.0", [hex: :rustler, repo: "hexpm", optional: false]}, {:unifex, "~> 1.0", [hex: :unifex, repo: "hexpm", optional: false]}], "hexpm", "68ea6a5770cf053464d6fa009a20f85ca559267f5a454370506c6d40965985de"}, "membrane_vp8_format": {:hex, :membrane_vp8_format, "0.4.0", "6c29ec67479edfbab27b11266dc92f18f3baf4421262c5c31af348c33e5b92c7", [:mix], [], "hexpm", "8bb005ede61db8fcb3535a883f32168b251c2dfd1109197c8c3b39ce28ed08e2"}, - "membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.17.0", "631c01283a4534210ff8d29384260db350fe48d68276fb4db984d4f85738f4a1", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, ">= 0.0.0", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.13.1", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_ice_plugin, "~> 0.17.0", [hex: :membrane_ice_plugin, repo: "hexpm", optional: false]}, {:membrane_opentelemetry, "~> 0.1.0", [hex: :membrane_opentelemetry, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, ">= 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.24.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.0", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:opentelemetry, "~> 1.0.4", [hex: :opentelemetry, repo: "hexpm", optional: false]}, {:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.0", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "ec32d6318f2a6ad857b3056a8e60ac18894977a79f1b09fc31970dff067ff6a7"}, + "membrane_webrtc_plugin": {:hex, :membrane_webrtc_plugin, "0.18.1", "af5988cdfddc95174f365ce18b16694d862ab1d95bd2671297a8ab5fe65837fb", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:ex_libsrtp, ">= 0.0.0", [hex: :ex_libsrtp, repo: "hexpm", optional: false]}, {:ex_sdp, "~> 0.13.1", [hex: :ex_sdp, repo: "hexpm", optional: false]}, {:membrane_core, "~> 1.0", [hex: :membrane_core, repo: "hexpm", optional: false]}, {:membrane_funnel_plugin, "~> 0.9.0", [hex: :membrane_funnel_plugin, repo: "hexpm", optional: false]}, {:membrane_h264_plugin, "~> 0.9.0", [hex: :membrane_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_ice_plugin, "~> 0.18.0", [hex: :membrane_ice_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_format, "~> 0.8.0", [hex: :membrane_rtp_format, repo: "hexpm", optional: false]}, {:membrane_rtp_h264_plugin, "~> 0.19.0", [hex: :membrane_rtp_h264_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_opus_plugin, ">= 0.9.0", [hex: :membrane_rtp_opus_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_plugin, "~> 0.24.0", [hex: :membrane_rtp_plugin, repo: "hexpm", optional: false]}, {:membrane_rtp_vp8_plugin, "~> 0.9.0", [hex: :membrane_rtp_vp8_plugin, repo: "hexpm", optional: false]}, {:qex, "~> 0.5.0", [hex: :qex, repo: "hexpm", optional: false]}], "hexpm", "08de22bdd3a6c9e7d79bad45b1a0eb87f335e33804a9b5afd2c84ee36428b683"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm", "69b09adddc4f74a40716ae54d140f93beb0fb8978d8636eaded0c31b6f099f16"}, "mime": {:hex, :mime, "2.0.5", "dc34c8efd439abe6ae0343edbb8556f4d63f178594894720607772a041b04b02", [:mix], [], "hexpm", "da0d64a365c45bc9935cc5c8a7fc5e49a0e0f9932a761c55d6c52b142780a05c"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm", "f278585650aa581986264638ebf698f8bb19df297f66ad91b18910dfc6e19323"}, @@ -89,8 +88,6 @@ "nimble_pool": {:hex, :nimble_pool, "1.0.0", "5eb82705d138f4dd4423f69ceb19ac667b3b492ae570c9f5c900bb3d2f50a847", [:mix], [], "hexpm", "80be3b882d2d351882256087078e1b1952a28bf98d0a287be87e4a24a710b67a"}, "numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"}, "open_api_spex": {:hex, :open_api_spex, "3.18.2", "8c855e83bfe8bf81603d919d6e892541eafece3720f34d1700b58024dadde247", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:poison, "~> 3.0 or ~> 4.0 or ~> 5.0", [hex: :poison, repo: "hexpm", optional: true]}, {:ymlr, "~> 2.0 or ~> 3.0 or ~> 4.0", [hex: :ymlr, repo: "hexpm", optional: true]}], "hexpm", "aa3e6dcfc0ad6a02596b2172662da21c9dd848dac145ea9e603f54e3d81b8d2b"}, - "opentelemetry": {:hex, :opentelemetry, "1.0.5", "f0cd36ac8b30b68e8d70cec5bb88801ed7f3fe79aac67597054ed5490542e810", [:rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "3b17f8933a58e1246f42a0c215840fd8218aebbcabdb0aac62b0c766fe85542e"}, - "opentelemetry_api": {:hex, :opentelemetry_api, "1.0.3", "77f9644c42340cd8b18c728cde4822ed55ae136f0d07761b78e8c54da46af93a", [:mix, :rebar3], [], "hexpm", "4293e06bd369bc004e6fad5edbb56456d891f14bd3f9f1772b18f1923e0678ea"}, "p1_utils": {:hex, :p1_utils, "1.0.23", "7f94466ada69bd982ea7bb80fbca18e7053e7d0b82c9d9e37621fa508587069b", [:rebar3], [], "hexpm", "47f21618694eeee5006af1c88731ad86b757161e7823c29b6f73921b571c8502"}, "parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"}, "patiently": {:hex, :patiently, "0.2.0", "67eb139591e10c4b363ae0198e832552f191c58894731efd3bf124ec4722267a", [:mix], [], "hexpm", "c08cc5edc27def565647a9b55a0bea8025a5f81a4472e57692f28f2292c44c94"}, @@ -120,7 +117,7 @@ "telemetry_poller": {:hex, :telemetry_poller, "1.0.0", "db91bb424e07f2bb6e73926fcafbfcbcb295f0193e0a00e825e589a0a47e8453", [:rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "b3a24eafd66c3f42da30fc3ca7dda1e9d546c12250a2d60d7b81d264fbec4f6e"}, "toml": {:hex, :toml, "0.7.0", "fbcd773caa937d0c7a02c301a1feea25612720ac3fa1ccb8bfd9d30d822911de", [:mix], [], "hexpm", "0690246a2478c1defd100b0c9b89b4ea280a22be9a7b313a8a058a2408a2fa70"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, - "unifex": {:hex, :unifex, "1.1.0", "26b1bcb6c3b3454e1ea15f85b2e570aaa5b5c609566aa9f5c2e0a8b213379d6b", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "d8f47e9e3240301f5b20eec5792d1d4341e1a3a268d94f7204703b48da4aaa06"}, + "unifex": {:hex, :unifex, "1.1.1", "e8445ff780ea07c10657428051e4cf84359f2770e27d24e9d8636430662691ff", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:bundlex, "~> 1.4", [hex: :bundlex, repo: "hexpm", optional: false]}, {:shmex, "~> 0.5.0", [hex: :shmex, repo: "hexpm", optional: false]}], "hexpm", "3e2867237a5582a40cb7c88d9ed0955071ebb1c4d525345513544adc0abd3b4b"}, "websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"}, "websock_adapter": {:hex, :websock_adapter, "0.5.5", "9dfeee8269b27e958a65b3e235b7e447769f66b5b5925385f5a569269164a210", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "4b977ba4a01918acbf77045ff88de7f6972c2a009213c515a445c48f224ffce9"}, "websockex": {:hex, :websockex, "0.4.3", "92b7905769c79c6480c02daacaca2ddd49de936d912976a4d3c923723b647bf0", [:mix], [], "hexpm", "95f2e7072b85a3a4cc385602d42115b73ce0b74a9121d0d6dbbf557645ac53e4"}, diff --git a/openapi.yaml b/openapi.yaml index 8d27b495..03434e72 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -167,6 +167,11 @@ components: type: string properties: $ref: '#/components/schemas/ComponentPropertiesHLS' + tracks: + description: List of all component's tracks + items: + $ref: '#/components/schemas/Track' + type: array type: description: Component type example: hls @@ -175,6 +180,7 @@ components: - id - type - properties + - tracks title: ComponentHLS type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Component.HLS @@ -185,6 +191,13 @@ components: title: HlsSkip type: string x-struct: Elixir.JellyfishWeb.ApiSpec.HLS.Params.HlsSkip + PeerMetadata: + description: Custom metadata set by the peer + example: + name: JellyfishUser + nullable: true + title: PeerMetadata + x-struct: Elixir.JellyfishWeb.ApiSpec.Peer.PeerMetadata ComponentPropertiesFile: description: Properties specific to the File component properties: @@ -211,6 +224,11 @@ components: type: string properties: $ref: '#/components/schemas/ComponentPropertiesFile' + tracks: + description: List of all component's tracks + items: + $ref: '#/components/schemas/Track' + type: array type: description: Component type example: file @@ -218,6 +236,7 @@ components: required: - id - type + - tracks title: ComponentFile type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Component.File @@ -249,6 +268,11 @@ components: type: string properties: $ref: '#/components/schemas/ComponentPropertiesRTSP' + tracks: + description: List of all component's tracks + items: + $ref: '#/components/schemas/Track' + type: array type: description: Component type example: hls @@ -257,6 +281,7 @@ components: - id - type - properties + - tracks title: ComponentRTSP type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Component.RTSP @@ -353,6 +378,21 @@ components: title: ComponentOptions type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Component.Options + Track: + description: Describes media track of a Peer or Component + properties: + id: + type: string + metadata: + nullable: true + type: + enum: + - audio + - video + type: string + title: Track + type: object + x-struct: Elixir.JellyfishWeb.ApiSpec.Track HealthReportDistribution: description: Informs about the status of Jellyfish distribution properties: @@ -428,14 +468,23 @@ components: description: Assigned peer id example: peer-1 type: string + metadata: + $ref: '#/components/schemas/PeerMetadata' status: $ref: '#/components/schemas/PeerStatus' + tracks: + description: List of all peer's tracks + items: + $ref: '#/components/schemas/Track' + type: array type: $ref: '#/components/schemas/PeerType' required: - id - type - status + - tracks + - metadata title: Peer type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Peer diff --git a/protos b/protos index 37818482..cb67f49c 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit 3781848239f67a88866f861fa798a8c18384e666 +Subproject commit cb67f49c47250daf9a97f1296ef7be8965ef4acf diff --git a/test/jellyfish_web/controllers/component/file_component_test.exs b/test/jellyfish_web/controllers/component/file_component_test.exs index ef322c27..67a4e6d8 100644 --- a/test/jellyfish_web/controllers/component/file_component_test.exs +++ b/test/jellyfish_web/controllers/component/file_component_test.exs @@ -2,10 +2,23 @@ defmodule JellyfishWeb.Component.FileComponentTest do use JellyfishWeb.ConnCase use JellyfishWeb.ComponentCase + alias JellyfishWeb.WS + + alias Jellyfish.ServerMessage.{ + Authenticated, + Track, + TrackAdded, + TrackRemoved + } + @file_component_directory "file_component_sources" + @fixtures_directory "test/fixtures" @video_source "video.h264" @audio_source "audio.ogg" + @ws_url "ws://127.0.0.1:4002/socket/server/websocket" + @auth_response %Authenticated{} + setup_all _tags do media_sources_directory = Application.fetch_env!(:jellyfish, :media_files_path) @@ -14,13 +27,7 @@ defmodule JellyfishWeb.Component.FileComponentTest do File.mkdir_p!(media_sources_directory) - media_sources_directory - |> Path.join(@video_source) - |> File.touch!() - - media_sources_directory - |> Path.join(@audio_source) - |> File.touch!() + File.cp_r!(@fixtures_directory, media_sources_directory) on_exit(fn -> :file.del_dir_r(media_sources_directory) end) @@ -29,6 +36,8 @@ defmodule JellyfishWeb.Component.FileComponentTest do describe "Create File Component" do test "renders component with video as source", %{conn: conn, room_id: room_id} do + start_notifier() + conn = post(conn, ~p"/room/#{room_id}/component", type: "file", @@ -47,6 +56,21 @@ defmodule JellyfishWeb.Component.FileComponentTest do model_response(conn, :created, "ComponentDetailsResponse") assert_component_created(conn, room_id, id, "file") + + assert_receive %TrackAdded{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: %Track{type: :TRACK_TYPE_VIDEO, metadata: "null"} = track + } + + conn = delete(conn, ~p"/room/#{room_id}/component/#{id}") + assert response(conn, :no_content) + + assert_receive %TrackRemoved{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: ^track + } end test "renders component with video as source with framerate set", %{ @@ -74,7 +98,9 @@ defmodule JellyfishWeb.Component.FileComponentTest do assert_component_created(conn, room_id, id, "file") end - test "renders component wiht audio as source", %{conn: conn, room_id: room_id} do + test "renders component with audio as source", %{conn: conn, room_id: room_id} do + start_notifier() + conn = post(conn, ~p"/room/#{room_id}/component", type: "file", @@ -93,6 +119,25 @@ defmodule JellyfishWeb.Component.FileComponentTest do model_response(conn, :created, "ComponentDetailsResponse") assert_component_created(conn, room_id, id, "file") + + assert_receive %TrackAdded{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: + %{ + type: :TRACK_TYPE_AUDIO, + metadata: "null" + } = track + } + + conn = delete(conn, ~p"/room/#{room_id}/component/#{id}") + assert response(conn, :no_content) + + assert_receive %TrackRemoved{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: ^track + } end test "file in subdirectory", %{ @@ -234,4 +279,15 @@ defmodule JellyfishWeb.Component.FileComponentTest do "Unsupported file type" end end + + defp start_notifier() do + token = Application.fetch_env!(:jellyfish, :server_api_token) + + {:ok, ws} = WS.start_link(@ws_url, :server) + WS.send_auth_request(ws, token) + assert_receive @auth_response, 1000 + WS.subscribe(ws, :server_notification) + + ws + end end diff --git a/test/jellyfish_web/integration/peer_socket_test.exs b/test/jellyfish_web/integration/peer_socket_test.exs index 7cedc381..bcd2fa19 100644 --- a/test/jellyfish_web/integration/peer_socket_test.exs +++ b/test/jellyfish_web/integration/peer_socket_test.exs @@ -3,7 +3,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do alias __MODULE__.Endpoint alias Jellyfish.PeerMessage - alias Jellyfish.PeerMessage.{Authenticated, AuthRequest, MediaEvent} + alias Jellyfish.PeerMessage.{Authenticated, MediaEvent} alias Jellyfish.RoomService alias JellyfishWeb.{PeerSocket, WS} @@ -62,8 +62,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do test "invalid token", %{token: token} do {:ok, ws} = WS.start_link(@path, :peer) - auth_request = auth_request("invalid" <> token) - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, "invalid" <> token) assert_receive {:disconnected, {:remote, 1000, "invalid token"}}, 1000 end @@ -76,9 +75,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do {:ok, ws} = WS.start_link(@path, :peer) unadded_peer_token = JellyfishWeb.PeerToken.generate(%{peer_id: "peer_id", room_id: room_id}) - auth_request = auth_request(unadded_peer_token) - - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, unadded_peer_token) assert_receive {:disconnected, {:remote, 1000, "peer not found"}}, 1000 end @@ -87,8 +84,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do _conn = delete(conn, ~p"/room/#{room_id}") {:ok, ws} = WS.start_link(@path, :peer) - auth_request = auth_request(token) - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, token) assert_receive {:disconnected, {:remote, 1000, "room not found"}}, 1000 end @@ -96,8 +92,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do test "authRequest when already connected", %{token: token} do ws = create_and_authenticate(token) - auth_request = auth_request(token) - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, token) refute_receive @auth_response, 1000 refute_receive {:disconnected, {:remote, 1000, _msg}} end @@ -106,8 +101,7 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do create_and_authenticate(token) {:ok, ws2} = WS.start_link(@path, :peer) - auth_request = auth_request(token) - :ok = WS.send_binary_frame(ws2, auth_request) + WS.send_auth_request(ws2, token) assert_receive {:disconnected, {:remote, 1000, "peer already connected"}}, 1000 end @@ -172,16 +166,10 @@ defmodule JellyfishWeb.Integration.PeerSocketTest do end def create_and_authenticate(token) do - auth_request = auth_request(token) - {:ok, ws} = WS.start_link(@path, :peer) - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, token) assert_receive @auth_response, 1000 ws end - - defp auth_request(token) do - PeerMessage.encode(%PeerMessage{content: {:auth_request, %AuthRequest{token: token}}}) - end end diff --git a/test/jellyfish_web/integration/server_notification_test.exs b/test/jellyfish_web/integration/server_notification_test.exs index b5fd04bf..1037afd9 100644 --- a/test/jellyfish_web/integration/server_notification_test.exs +++ b/test/jellyfish_web/integration/server_notification_test.exs @@ -3,6 +3,8 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do import Mox + import JellyfishWeb.WS, only: [subscribe: 2] + alias __MODULE__.Endpoint alias Jellyfish.Component.HLS @@ -13,18 +15,19 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do alias Jellyfish.ServerMessage.{ Authenticated, - AuthRequest, HlsPlayable, HlsUploadCrashed, HlsUploaded, MetricsReport, PeerConnected, PeerDisconnected, + PeerMetadataUpdated, RoomCrashed, RoomCreated, RoomDeleted, - SubscribeRequest, - SubscribeResponse + Track, + TrackAdded, + TrackRemoved } alias JellyfishWeb.{PeerSocket, ServerSocket, WS} @@ -37,6 +40,10 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do @auth_response %Authenticated{} @pubsub Jellyfish.PubSub + @file_component_directory "file_component_sources" + @fixtures_directory "test/fixtures" + @video_source "video.h264" + @max_peers 1 @source_uri "rtsp://placeholder-19inrifjbsjb.it:12345/afwefae" @@ -107,9 +114,8 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do test "invalid token" do {:ok, ws} = WS.start_link(@path, :server) server_api_token = "invalid" <> Application.fetch_env!(:jellyfish, :server_api_token) - auth_request = auth_request(server_api_token) + WS.send_auth_request(ws, server_api_token) - :ok = WS.send_binary_frame(ws, auth_request) assert_receive {:disconnected, {:remote, 1000, "invalid token"}}, 1000 end @@ -194,7 +200,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do end test "sends a message when peer connects and room is deleted", %{conn: conn} do - {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, peer_id, conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) _conn = delete(conn, ~p"/room/#{room_id}") assert_receive %RoomDeleted{room_id: ^room_id} @@ -209,7 +215,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do end test "sends a message when peer connects and peer is removed", %{conn: conn} do - {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, peer_id, conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") assert response(conn, :no_content) @@ -228,7 +234,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do end test "sends a message when peer connects and room crashes", %{conn: conn} do - {room_id, peer_id, _conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, peer_id, _conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) Process.exit(room_pid, :kill) @@ -245,7 +251,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do end test "sends a message when peer connects and it crashes", %{conn: conn} do - {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, peer_id, conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) @@ -264,12 +270,77 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do delete(conn, ~p"/room/#{room_id}") end + test "sends message when tracks are added or removed", %{conn: conn} do + media_sources_directory = + Application.fetch_env!(:jellyfish, :media_files_path) + |> Path.join(@file_component_directory) + |> Path.expand() + + File.mkdir_p!(media_sources_directory) + File.cp_r!(@fixtures_directory, media_sources_directory) + + {room_id, _peer_id, conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) + {conn, id} = add_file_component(conn, room_id) + + assert_receive %TrackAdded{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: + %Track{ + id: _track_id, + type: :TRACK_TYPE_VIDEO, + metadata: "null" + } = + track_info + } = track_added, + 500 + + assert_receive {:webhook_notification, ^track_added}, 1_000 + + _conn = delete(conn, ~p"/room/#{room_id}/component/#{id}") + + assert_receive %TrackRemoved{ + room_id: ^room_id, + endpoint_info: {:component_id, ^id}, + track: ^track_info + } = track_removed + + assert_receive {:webhook_notification, ^track_removed}, 1_000 + + :file.del_dir_r(media_sources_directory) + end + + test "sends message when peer metadata is updated", %{conn: conn} do + {room_id, peer_id, _conn, peer_ws} = subscribe_on_notifications_and_connect_peer(conn) + + metadata = %{name: "Jellyuser"} + metadata_encoded = Jason.encode!(metadata) + + media_event = %PeerMessage.MediaEvent{ + data: %{"type" => "connect", "data" => %{"metadata" => metadata}} |> Jason.encode!() + } + + :ok = + WS.send_binary_frame( + peer_ws, + PeerMessage.encode(%PeerMessage{content: {:media_event, media_event}}) + ) + + assert_receive %PeerMetadataUpdated{ + room_id: ^room_id, + peer_id: ^peer_id, + metadata: ^metadata_encoded + } = peer_metadata_updated + + assert_receive {:webhook_notification, ^peer_metadata_updated}, 1_000 + end + describe "hls upload" do setup :verify_on_exit! setup :set_mox_from_context test "sends a message when hls was uploaded", %{conn: conn} do - {room_id, _peer_id, _conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, _peer_id, _conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) test_hls_manager(room_id, request_no: 4, status_code: 200) assert_receive %HlsUploaded{room_id: ^room_id} @@ -277,7 +348,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do end test "sends a message when hls upload crashed", %{conn: conn} do - {room_id, _peer_id, _conn} = subscribe_on_notifications_and_connect_peer(conn) + {room_id, _peer_id, _conn, _ws} = subscribe_on_notifications_and_connect_peer(conn) test_hls_manager(room_id, request_no: 1, status_code: 400) assert_receive %HlsUploadCrashed{room_id: ^room_id} @@ -294,8 +365,7 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do {room_id, peer_id, peer_token, _conn} = add_room_and_peer(conn, server_api_token) {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) - auth_request = peer_auth_request(peer_token) - :ok = WS.send_binary_frame(peer_ws, auth_request) + WS.send_auth_request(peer_ws, peer_token) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} @@ -319,45 +389,26 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do add_room_and_peer(conn, server_api_token) {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) - auth_request = peer_auth_request(peer_token) - :ok = WS.send_binary_frame(peer_ws, auth_request) + WS.send_auth_request(peer_ws, peer_token) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} assert_receive {:webhook_notification, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, 1_000 - {room_id, peer_id, conn} + {room_id, peer_id, conn, peer_ws} end def create_and_authenticate() do token = Application.fetch_env!(:jellyfish, :server_api_token) - auth_request = auth_request(token) {:ok, ws} = WS.start_link(@path, :server) - :ok = WS.send_binary_frame(ws, auth_request) + WS.send_auth_request(ws, token) assert_receive @auth_response, 1000 ws end - def subscribe(ws, event_type) do - proto_event_type = to_proto_event_type(event_type) - - msg = %ServerMessage{ - content: - {:subscribe_request, - %SubscribeRequest{ - event_type: proto_event_type - }} - } - - :ok = WS.send_binary_frame(ws, ServerMessage.encode(msg)) - - assert_receive %SubscribeResponse{event_type: ^proto_event_type} = response - response - end - defp add_room_and_peer(conn, server_api_token) do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) @@ -397,14 +448,17 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do {conn, id} end - defp auth_request(token) do - ServerMessage.encode(%ServerMessage{content: {:auth_request, %AuthRequest{token: token}}}) - end + defp add_file_component(conn, room_id) do + conn = + post(conn, ~p"/room/#{room_id}/component", + type: "file", + options: %{filePath: @video_source} + ) - defp peer_auth_request(token) do - PeerMessage.encode(%PeerMessage{ - content: {:auth_request, %PeerMessage.AuthRequest{token: token}} - }) + assert %{"id" => id, "properties" => %{"filePath" => @video_source}, "type" => "file"} = + json_response(conn, :created)["data"] + + {conn, id} end defp trigger_notification(conn) do @@ -412,13 +466,9 @@ defmodule JellyfishWeb.Integration.ServerNotificationTest do {_room_id, _peer_id, peer_token, _conn} = add_room_and_peer(conn, server_api_token) {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) - auth_request = peer_auth_request(peer_token) - :ok = WS.send_binary_frame(peer_ws, auth_request) + WS.send_auth_request(peer_ws, peer_token) end - defp to_proto_event_type(:server_notification), do: :EVENT_TYPE_SERVER_NOTIFICATION - defp to_proto_event_type(:metrics), do: :EVENT_TYPE_METRICS - defp test_hls_manager(room_id, request_no: request_no, status_code: status_code) do hls_dir = HLS.output_dir(room_id, persistent: false) options = %{s3: @s3_credentials, persistent: false} diff --git a/test/support/webhook_plug.ex b/test/support/webhook_plug.ex index 502dad40..04e30fc9 100644 --- a/test/support/webhook_plug.ex +++ b/test/support/webhook_plug.ex @@ -14,10 +14,9 @@ defmodule WebHookPlug do def call(conn, _opts) do {:ok, body, conn} = Plug.Conn.read_body(conn, []) - notification = Jason.decode!(body) notification = - notification |> Map.get("notification") |> ServerMessage.decode() |> Map.get(:content) + body |> ServerMessage.decode() |> Map.get(:content) {_notification_type, notification} = notification diff --git a/test/support/ws.ex b/test/support/ws.ex index 09787e9e..cd004922 100644 --- a/test/support/ws.ex +++ b/test/support/ws.ex @@ -6,16 +6,41 @@ defmodule JellyfishWeb.WS do alias Jellyfish.PeerMessage alias Jellyfish.ServerMessage + @spec start(String.t(), :server | :peer) :: {:ok, pid()} | {:error, term()} def start(url, type) do state = %{caller: self(), type: type} WebSockex.start(url, __MODULE__, state) end + @spec start_link(String.t(), :server | :peer) :: {:ok, pid()} | {:error, term()} def start_link(url, type) do state = %{caller: self(), type: type} WebSockex.start_link(url, __MODULE__, state) end + def send_auth_request(ws, token) do + send(ws, {:authenticate, token}) + end + + def subscribe(ws, event_type) do + proto_event_type = to_proto_event_type(event_type) + + msg = %ServerMessage{ + content: + {:subscribe_request, + %ServerMessage.SubscribeRequest{ + event_type: proto_event_type + }} + } + + :ok = send_binary_frame(ws, ServerMessage.encode(msg)) + + import ExUnit.Assertions + + assert_receive %ServerMessage.SubscribeResponse{event_type: ^proto_event_type} = response + response + end + def send_frame(ws, msg) do WebSockex.send_frame(ws, {:text, Jason.encode!(msg)}) end @@ -41,6 +66,12 @@ defmodule JellyfishWeb.WS do {:ok, state} end + @impl true + def handle_info({:authenticate, token}, state) do + request = auth_request(state.type, token) + {:reply, {:binary, request}, state} + end + @impl true def handle_disconnect(conn_status, state) do send(state.caller, {:disconnected, conn_status.reason}) @@ -56,4 +87,19 @@ defmodule JellyfishWeb.WS do %ServerMessage{content: {_atom, content}} = ServerMessage.decode(msg) content end + + defp auth_request(:peer, token) do + PeerMessage.encode(%PeerMessage{ + content: {:auth_request, %PeerMessage.AuthRequest{token: token}} + }) + end + + defp auth_request(:server, token) do + ServerMessage.encode(%ServerMessage{ + content: {:auth_request, %ServerMessage.AuthRequest{token: token}} + }) + end + + defp to_proto_event_type(:server_notification), do: :EVENT_TYPE_SERVER_NOTIFICATION + defp to_proto_event_type(:metrics), do: :EVENT_TYPE_METRICS end