From 9a74ca6c1d674c1337012d77231fbcae6cdc45e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 11 Oct 2023 11:04:12 +0200 Subject: [PATCH 01/13] Add webhooks for PeerConnected and PeerDisconnected events --- lib/jellyfish/event.ex | 16 ++++++ lib/jellyfish/room.ex | 53 +++++++++++++++---- lib/jellyfish/room_service.ex | 34 +++++++++--- lib/jellyfish_web/api_spec/room.ex | 6 +++ .../controllers/room_controller.ex | 8 ++- lib/jellyfish_web/peer_socket.ex | 8 ++- openapi.yaml | 5 ++ .../integration/server_socket_test.exs | 48 +++++++++++++++-- test/support/webhook_plug.ex | 24 +++++++++ 9 files changed, 177 insertions(+), 25 deletions(-) create mode 100644 test/support/webhook_plug.ex diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index 6c982f0c..2854b887 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -1,6 +1,8 @@ defmodule Jellyfish.Event do @moduledoc false + alias Jellyfish.Room + alias Jellyfish.ServerMessage.{ ComponentCrashed, HlsPlayable, @@ -20,6 +22,20 @@ defmodule Jellyfish.Event do Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) end + def broadcast_room(topic, message, room_id) when topic in @valid_topics do + Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) + + {atom, notification} = to_proto_server_notification(message) + + notification = + notification + |> Map.from_struct() + |> Map.put(:type, Atom.to_string(atom)) + |> Map.delete(:__unknown_fields__) + + Room.receive_room_notification(room_id, notification) + end + def subscribe(topic) when topic in @valid_topics do Phoenix.PubSub.subscribe(@pubsub, Atom.to_string(topic)) end diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 223ba769..8bcea49a 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -19,11 +19,13 @@ defmodule Jellyfish.Room do :id, :config, :engine_pid, - :network_options + :network_options, + :webhook_url ] defstruct @enforce_keys ++ [components: %{}, peers: %{}] @type id :: String.t() + @type webhook_url :: non_neg_integer() | nil @type max_peers :: non_neg_integer() | nil @type video_codec :: :h264 | :vp8 | nil @@ -34,6 +36,7 @@ defmodule Jellyfish.Room do * `components` - map of components * `peers` - map of peers * `engine` - pid of engine + * `webhook_url` - url on which notifcations from this room should be send """ @type t :: %__MODULE__{ id: id(), @@ -45,14 +48,18 @@ defmodule Jellyfish.Room do components: %{Component.id() => Component.t()}, peers: %{Peer.id() => Peer.t()}, engine_pid: pid(), - network_options: map() + network_options: map(), + webhook_url: String.t() } - @spec start(max_peers(), video_codec()) :: {:ok, pid(), id()} - def start(max_peers, video_codec) do + @spec start(max_peers(), video_codec(), String.t()) :: {:ok, pid(), id()} + def start(max_peers, video_codec, webhook_url) do id = UUID.uuid4() - {:ok, pid} = GenServer.start(__MODULE__, [id, max_peers, video_codec], name: registry_id(id)) + {:ok, pid} = + GenServer.start(__MODULE__, [id, max_peers, video_codec, webhook_url], + name: registry_id(id) + ) {:ok, pid, id} end @@ -119,9 +126,14 @@ defmodule Jellyfish.Room do GenServer.cast(registry_id(room_id), {:media_event, peer_id, event}) end + @spec receive_room_notification(id(), any()) :: :ok + def receive_room_notification(room_id, notification) do + GenServer.cast(registry_id(room_id), {:room_notification, notification}) + end + @impl true - def init([id, max_peers, video_codec]) do - state = new(id, max_peers, video_codec) + def init([id, max_peers, video_codec, webhook_url]) do + state = new(id, max_peers, video_codec, webhook_url) Logger.metadata(room_id: id) Logger.info("Initialize room") @@ -286,6 +298,24 @@ defmodule Jellyfish.Room do {:noreply, state} end + @impl true + def handle_cast({:room_notification, _notification}, state) when is_nil(state.webhook_url) do + {:noreply, state} + end + + @impl true + def handle_cast({:room_notification, notification}, state) do + case HTTPoison.post(state.webhook_url, Jason.encode!(notification)) do + {:ok, _result} -> + nil + + {:error, error} -> + Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") + end + + {:noreply, state} + end + @impl true def handle_info(%Message.EndpointMessage{endpoint_id: to, message: {:media_event, data}}, state) do with {:ok, peer} <- Map.fetch(state.peers, to), @@ -376,7 +406,9 @@ defmodule Jellyfish.Room do :ok end - defp new(id, max_peers, video_codec) do + def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + + defp new(id, max_peers, video_codec, webhook_url) do rtc_engine_options = [ id: id ] @@ -410,7 +442,8 @@ defmodule Jellyfish.Room do id: id, config: %{max_peers: max_peers, video_codec: video_codec}, engine_pid: pid, - network_options: [turn_options: turn_options] + network_options: [turn_options: turn_options], + webhook_url: webhook_url } end @@ -430,8 +463,6 @@ defmodule Jellyfish.Room do defp remove_hls_processes(_room_id, _metadata), do: nil - defp registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} - defp check_component_allowed(Component.HLS, %{ config: %{video_codec: video_codec}, components: components diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 935a9c7c..6b73a490 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -54,9 +54,9 @@ defmodule Jellyfish.RoomService do |> Enum.reject(&(&1 == nil)) end - @spec create_room(Room.max_peers(), String.t()) :: + @spec create_room(Room.max_peers(), String.t(), String.t()) :: {:ok, Room.t(), String.t()} | {:error, :invalid_max_peers | :invalid_video_codec} - def create_room(max_peers, video_codec) do + def create_room(max_peers, video_codec, webhook_url) do {node_resources, failed_nodes} = :rpc.multicall(Jellyfish.RoomService, :get_resource_usage, []) @@ -80,9 +80,9 @@ defmodule Jellyfish.RoomService do if Enum.count(node_resources) > 1 do Logger.info("Node with least used resources is #{inspect(min_node)}") - GenServer.call({__MODULE__, min_node}, {:create_room, max_peers, video_codec}) + GenServer.call({__MODULE__, min_node}, {:create_room, max_peers, video_codec, webhook_url}) else - GenServer.call(__MODULE__, {:create_room, max_peers, video_codec}) + GenServer.call(__MODULE__, {:create_room, max_peers, video_codec, webhook_url}) end end @@ -131,10 +131,11 @@ defmodule Jellyfish.RoomService do end @impl true - def handle_call({:create_room, max_peers, video_codec}, _from, state) do + def handle_call({:create_room, max_peers, video_codec, webhook_url}, _from, state) do with :ok <- validate_max_peers(max_peers), - {:ok, video_codec} <- codec_to_atom(video_codec) do - {:ok, room_pid, room_id} = Room.start(max_peers, video_codec) + {:ok, video_codec} <- codec_to_atom(video_codec), + :ok <- validate_webhook_url(webhook_url) do + {:ok, room_pid, room_id} = Room.start(max_peers, video_codec, webhook_url) room = Room.get_state(room_id) Process.monitor(room_pid) @@ -152,6 +153,9 @@ defmodule Jellyfish.RoomService do {:error, :video_codec} -> {:reply, {:error, :invalid_video_codec}, state} + + {:error, :not_valid_url} -> + {:reply, {:error, :invalid_webhook_url}, state} end end @@ -217,7 +221,7 @@ defmodule Jellyfish.RoomService do end defp remove_room(room_id) do - room = {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + room = Room.registry_id(room_id) try do :ok = GenServer.stop(room, :normal) @@ -234,6 +238,20 @@ defmodule Jellyfish.RoomService do defp validate_max_peers(max_peers) when is_integer(max_peers) and max_peers >= 0, do: :ok defp validate_max_peers(_max_peers), do: {:error, :max_peers} + defp validate_webhook_url(nil), do: :ok + + defp validate_webhook_url(uri) do + uri + |> URI.parse() + |> Map.take([:host, :path, :scheme]) + |> Enum.all?(fn {_key, value} -> not is_nil(value) end) + |> if do + :ok + else + {:error, :not_valid_url} + end + end + defp codec_to_atom("h264"), do: {:ok, :h264} defp codec_to_atom("vp8"), do: {:ok, :vp8} defp codec_to_atom(nil), do: {:ok, nil} diff --git a/lib/jellyfish_web/api_spec/room.ex b/lib/jellyfish_web/api_spec/room.ex index d1261f49..de323722 100644 --- a/lib/jellyfish_web/api_spec/room.ex +++ b/lib/jellyfish_web/api_spec/room.ex @@ -26,6 +26,12 @@ defmodule JellyfishWeb.ApiSpec.Room do type: :string, enum: ["h264", "vp8"], nullable: true + }, + webhookUrl: %Schema{ + description: "Address on which HTTP requests will be send with jellyfish notifications", + type: :string, + example: "https://backend.address.com/jellyfish-notifications-endpoint", + nullable: true } } }) diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 6992e6cc..4e51663e 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -73,7 +73,9 @@ defmodule JellyfishWeb.RoomController do def create(conn, params) do with max_peers <- Map.get(params, "maxPeers"), video_codec <- Map.get(params, "videoCodec"), - {:ok, room, jellyfish_address} <- RoomService.create_room(max_peers, video_codec) do + webhook_url <- Map.get(params, "webhookUrl"), + {:ok, room, jellyfish_address} <- + RoomService.create_room(max_peers, video_codec, webhook_url) do conn |> put_resp_content_type("application/json") |> put_status(:created) @@ -84,6 +86,10 @@ defmodule JellyfishWeb.RoomController do {:error, :invalid_video_codec} -> {:error, :bad_request, "videoCodec must be 'h264' or 'vp8'"} + + {:error, :invalid_webhook_url} -> + webhook_url = Map.get(params, "webhookUrl") + {:error, :bad_request, "webhookUrl must be valid URI, received url was: #{webhook_url}"} end end diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index 7dd5e6cb..a71bcf70 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -48,7 +48,7 @@ defmodule JellyfishWeb.PeerSocket do room_pid: room_pid }) - Event.broadcast(:server_notification, {:peer_connected, room_id, peer_id}) + Event.broadcast_room(:server_notification, {:peer_connected, room_id, peer_id}, room_id) {:reply, :ok, {:binary, encoded_message}, state} else @@ -135,7 +135,11 @@ defmodule JellyfishWeb.PeerSocket do """) if Map.has_key?(state, :peer_id) do - Event.broadcast(:server_notification, {:peer_disconnected, state.room_id, state.peer_id}) + Event.broadcast_room( + :server_notification, + {:peer_disconnected, state.room_id, state.peer_id}, + state.room_id + ) end :ok diff --git a/openapi.yaml b/openapi.yaml index f509c065..0ceba3f0 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -319,6 +319,11 @@ components: - vp8 nullable: true type: string + webhookUrl: + description: Address on which HTTP requests will be send with jellyfish notifications + example: https://backend.address.com/jellyfish-notifications-endpoint + nullable: true + type: string title: RoomConfig type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Room.Config diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_socket_test.exs index 4d165298..67a9b1f2 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_socket_test.exs @@ -22,10 +22,13 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do } alias JellyfishWeb.{PeerSocket, ServerSocket, WS} + alias Phoenix.PubSub @port 5907 + @webhook_port 2137 @path "ws://127.0.0.1:#{@port}/socket/server/websocket" @auth_response %Authenticated{} + @pubsub Jellyfish.PubSub @max_peers 1 @@ -57,6 +60,12 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do setup_all do assert {:ok, _pid} = Endpoint.start_link() + + webserver = + {Plug.Cowboy, plug: WebHookPlug, scheme: :http, options: [port: @webhook_port]} + + {:ok, _pid} = Supervisor.start_link([webserver], strategy: :one_for_one) + :ok end @@ -176,7 +185,7 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do assert_receive %RoomCrashed{room_id: ^room_id} end - test "sends a message when peer connects", %{conn: conn} do + test "sends a message when peer connects without webhook", %{conn: conn} do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) ws = create_and_authenticate() @@ -196,6 +205,37 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} end + test "sends a message when peer connects with webhook", %{conn: conn} do + :ok = PubSub.subscribe(@pubsub, "webhook") + server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) + ws = create_and_authenticate() + + subscribe(ws, :server_notification) + + {room_id, peer_id, peer_token, conn} = + add_room_and_peer(conn, server_api_token, "http://127.0.0.1:#{@webhook_port}/") + + {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + auth_request = peer_auth_request(peer_token) + :ok = WS.send_binary_frame(peer_ws, auth_request) + + assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} + + assert_receive %{"type" => "peer_connected", "peer_id" => ^peer_id, "room_id" => ^room_id}, + 1_000 + + conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") + assert response(conn, :no_content) + + assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} + + assert_receive %{"type" => "peer_disconnected", "peer_id" => ^peer_id, "room_id" => ^room_id}, + 1_000 + + conn = delete(conn, ~p"/room/#{room_id}") + assert response(conn, :no_content) + end + def create_and_authenticate() do token = Application.fetch_env!(:jellyfish, :server_api_token) auth_request = auth_request(token) @@ -248,10 +288,12 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do response end - defp add_room_and_peer(conn, server_api_token) do + defp add_room_and_peer(conn, server_api_token, webhook_url \\ nil) do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - conn = post(conn, ~p"/room", maxPeers: @max_peers, videoCodec: "h264") + conn = + post(conn, ~p"/room", maxPeers: @max_peers, videoCodec: "h264", webhookUrl: webhook_url) + assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc") diff --git a/test/support/webhook_plug.ex b/test/support/webhook_plug.ex new file mode 100644 index 00000000..09538f98 --- /dev/null +++ b/test/support/webhook_plug.ex @@ -0,0 +1,24 @@ +defmodule WebHookPlug do + @moduledoc false + import Plug.Conn + alias Phoenix.PubSub + + @pubsub Jellyfish.PubSub + + def init(opts) do + # initialize options + + opts + end + + def call(conn, _opts) do + {:ok, body, conn} = Plug.Conn.read_body(conn, []) + notification = Jason.decode!(body) + + :ok = PubSub.broadcast(@pubsub, "webhook", notification) + + conn + |> put_resp_content_type("text/plain") + |> send_resp(200, "OK") + end +end From 20bb9830f380f5d74d522e79dc3723a4b37724a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 11 Oct 2023 13:51:08 +0200 Subject: [PATCH 02/13] Add support for other notifications --- lib/jellyfish/event.ex | 28 +++++++--- lib/jellyfish/metrics_scraper.ex | 2 +- lib/jellyfish/room.ex | 26 ++++++---- lib/jellyfish/room_service.ex | 15 +++--- lib/jellyfish_web/peer_socket.ex | 5 +- .../integration/server_socket_test.exs | 52 +++++++------------ test/support/webhook_plug.ex | 4 ++ 7 files changed, 72 insertions(+), 60 deletions(-) diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index 2854b887..8e69a010 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -3,6 +3,8 @@ defmodule Jellyfish.Event do alias Jellyfish.Room + alias Jellyfish.ServerMessage + alias Jellyfish.ServerMessage.{ ComponentCrashed, HlsPlayable, @@ -18,20 +20,30 @@ defmodule Jellyfish.Event do @pubsub Jellyfish.PubSub @valid_topics [:server_notification, :metrics] - def broadcast(topic, message) when topic in @valid_topics do + def broadcast_metrics(message) do + topic = :metrics + Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) + end + + def broadcast_server_notification({type, _content} = message, webhook_url) + when type in [:room_deleted, :room_crashed] do + topic = :server_notification Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) + + content = to_proto({topic, message}) + + notification = %ServerMessage{content: content} |> ServerMessage.encode() + + Room.send_webhook_notification(notification, webhook_url) end - def broadcast_room(topic, message, room_id) when topic in @valid_topics do + def broadcast_server_notification(message, room_id) do + topic = :server_notification Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) - {atom, notification} = to_proto_server_notification(message) + content = to_proto({topic, message}) - notification = - notification - |> Map.from_struct() - |> Map.put(:type, Atom.to_string(atom)) - |> Map.delete(:__unknown_fields__) + notification = %ServerMessage{content: content} |> ServerMessage.encode() Room.receive_room_notification(room_id, notification) end diff --git a/lib/jellyfish/metrics_scraper.ex b/lib/jellyfish/metrics_scraper.ex index c9f8f49e..75eaa7a0 100644 --- a/lib/jellyfish/metrics_scraper.ex +++ b/lib/jellyfish/metrics_scraper.ex @@ -45,7 +45,7 @@ defmodule Jellyfish.MetricsScraper do report |> prepare_report(state) - |> then(&Event.broadcast(:metrics, &1)) + |> then(&Event.broadcast_metrics(&1)) Process.send_after(self(), :scrape, state.scrape_interval) diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 8bcea49a..49f384b2 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -305,13 +305,7 @@ defmodule Jellyfish.Room do @impl true def handle_cast({:room_notification, notification}, state) do - case HTTPoison.post(state.webhook_url, Jason.encode!(notification)) do - {:ok, _result} -> - nil - - {:error, error} -> - Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") - end + send_webhook_notification(notification, state.webhook_url) {:noreply, state} end @@ -341,7 +335,7 @@ defmodule Jellyfish.Room do Logger.error("RTC Engine endpoint #{inspect(endpoint_id)} crashed") if Map.has_key?(state.peers, endpoint_id) do - Event.broadcast(:server_notification, {:peer_crashed, state.id, endpoint_id}) + Event.broadcast_server_notification({:peer_crashed, state.id, endpoint_id}, state.id) peer = Map.fetch!(state.peers, endpoint_id) @@ -349,7 +343,7 @@ defmodule Jellyfish.Room do send(peer.socket_pid, {:stop_connection, :endpoint_crashed}) end else - Event.broadcast(:server_notification, {:component_crashed, state.id, endpoint_id}) + Event.broadcast_server_notification({:component_crashed, state.id, endpoint_id}, state.id) component = Map.get(state.components, endpoint_id) if component.type == Component.HLS, do: remove_hls_processes(state.id, component.metadata) @@ -384,7 +378,7 @@ defmodule Jellyfish.Room do if type == Component.HLS, do: id end) - Event.broadcast(:server_notification, {:hls_playable, state.id, endpoint_id}) + Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}, state.id) state = update_in(state, [:components, endpoint_id, :metadata], &Map.put(&1, :playable, true)) {:noreply, state} @@ -408,6 +402,18 @@ defmodule Jellyfish.Room do def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + def send_webhook_notification(notification, webhook_url) when not is_nil(webhook_url) do + case HTTPoison.post(webhook_url, Jason.encode!(%{notification: notification})) do + {:ok, _result} -> + nil + + {:error, error} -> + Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") + end + end + + def send_webhook_notification(_notification, _state), do: :ok + defp new(id, max_peers, video_codec, webhook_url) do rtc_engine_options = [ id: id diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 6b73a490..2ef7256c 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -121,7 +121,7 @@ defmodule Jellyfish.RoomService do @impl true def init(_opts) do - {:ok, %{rooms: %{}}, {:continue, nil}} + {:ok, %{rooms: %{}, rooms_webhook: %{}}, {:continue, nil}} end @impl true @@ -141,10 +141,11 @@ defmodule Jellyfish.RoomService do Process.monitor(room_pid) state = put_in(state, [:rooms, room_pid], room_id) + state = put_in(state, [:rooms_webhook, room_id], webhook_url) Logger.info("Created room #{inspect(room.id)}") - Event.broadcast(:server_notification, {:room_created, room_id}) + Event.broadcast_server_notification({:room_created, room_id}, room_id) {:reply, {:ok, room, Application.fetch_env!(:jellyfish, :address)}, state} else @@ -164,7 +165,7 @@ defmodule Jellyfish.RoomService do response = case find_room(room_id) do {:ok, _room_pid} -> - remove_room(room_id) + remove_room(room_id, state) :ok {:error, _} -> @@ -188,11 +189,12 @@ defmodule Jellyfish.RoomService do @impl true def handle_info({:DOWN, _ref, :process, pid, reason}, state) do {room_id, state} = pop_in(state, [:rooms, pid]) + {webhook_url, state} = pop_in(state, [:rooms_webhook, room_id]) Logger.warning("Process #{room_id} is down with reason: #{reason}") Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_crashed) - Event.broadcast(:server_notification, {:room_crashed, room_id}) + Event.broadcast_server_notification({:room_crashed, room_id}, webhook_url) {:noreply, state} end @@ -220,14 +222,15 @@ defmodule Jellyfish.RoomService do |> Registry.select([{{:"$1", :_, :_}, [], [:"$1"]}]) end - defp remove_room(room_id) do + defp remove_room(room_id, state) do room = Room.registry_id(room_id) try do :ok = GenServer.stop(room, :normal) Logger.info("Deleted room #{inspect(room_id)}") + webhook_url = get_in(state, [:rooms_webhook, room_id]) - Event.broadcast(:server_notification, {:room_deleted, room_id}) + Event.broadcast_server_notification({:room_deleted, room_id}, webhook_url) catch :exit, {:noproc, {GenServer, :stop, [^room, :normal, :infinity]}} -> Logger.warning("Room process with id #{inspect(room_id)} doesn't exist") diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index a71bcf70..b423908f 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -48,7 +48,7 @@ defmodule JellyfishWeb.PeerSocket do room_pid: room_pid }) - Event.broadcast_room(:server_notification, {:peer_connected, room_id, peer_id}, room_id) + Event.broadcast_server_notification({:peer_connected, room_id, peer_id}, room_id) {:reply, :ok, {:binary, encoded_message}, state} else @@ -135,8 +135,7 @@ defmodule JellyfishWeb.PeerSocket do """) if Map.has_key?(state, :peer_id) do - Event.broadcast_room( - :server_notification, + Event.broadcast_server_notification( {:peer_disconnected, state.room_id, state.peer_id}, state.room_id ) diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_socket_test.exs index 67a9b1f2..fb486b90 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_socket_test.exs @@ -26,6 +26,7 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do @port 5907 @webhook_port 2137 + @webhook_url "http://127.0.0.1:#{@webhook_port}/" @path "ws://127.0.0.1:#{@port}/socket/server/websocket" @auth_response %Authenticated{} @pubsub Jellyfish.PubSub @@ -70,6 +71,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do end setup(%{conn: conn}) do + :ok = PubSub.subscribe(@pubsub, "webhook") + on_exit(fn -> server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) @@ -135,7 +138,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do {:ok, room_pid} = RoomService.find_room(room_id) send(room_pid, {:playlist_playable, :video, "hls_output/#{room_id}"}) - assert_receive %HlsPlayable{room_id: room_id, component_id: ^hls_id} + assert_receive %HlsPlayable{room_id: ^room_id, component_id: ^hls_id} + assert_receive {:hls_playable, %HlsPlayable{room_id: ^room_id, component_id: ^hls_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}/") assert response(conn, :no_content) @@ -157,15 +161,17 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - conn = post(conn, ~p"/room", maxPeers: 1) + conn = post(conn, ~p"/room", maxPeers: 1, webhookUrl: @webhook_url) assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] assert_receive %RoomCreated{room_id: ^room_id} + assert_receive {:room_created, %RoomCreated{room_id: ^room_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}") assert response(conn, :no_content) assert_receive %RoomDeleted{room_id: ^room_id} + assert_receive {:room_deleted, %RoomDeleted{room_id: ^room_id}}, 1_000 end test "sends a message when room crashes", %{conn: conn} do @@ -176,60 +182,38 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - conn = post(conn, ~p"/room", maxPeers: 1) + conn = post(conn, ~p"/room", maxPeers: 1, webhookUrl: @webhook_url) assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) Process.exit(room_pid, :kill) assert_receive %RoomCrashed{room_id: ^room_id} + assert_receive {:room_crashed, %RoomCrashed{room_id: ^room_id}}, 2_000 end - test "sends a message when peer connects without webhook", %{conn: conn} do - server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) - ws = create_and_authenticate() - - subscribe(ws, :server_notification) - - {room_id, peer_id, peer_token, conn} = add_room_and_peer(conn, server_api_token) - - {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) - auth_request = peer_auth_request(peer_token) - :ok = WS.send_binary_frame(peer_ws, auth_request) - - assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} - - conn = delete(conn, ~p"/room/#{room_id}/") - assert response(conn, :no_content) - - assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} - end - - test "sends a message when peer connects with webhook", %{conn: conn} do - :ok = PubSub.subscribe(@pubsub, "webhook") + test "sends a message when peer connects", %{conn: conn} do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) ws = create_and_authenticate() subscribe(ws, :server_notification) {room_id, peer_id, peer_token, conn} = - add_room_and_peer(conn, server_api_token, "http://127.0.0.1:#{@webhook_port}/") + add_room_and_peer(conn, server_api_token) {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) auth_request = peer_auth_request(peer_token) :ok = WS.send_binary_frame(peer_ws, auth_request) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} - - assert_receive %{"type" => "peer_connected", "peer_id" => ^peer_id, "room_id" => ^room_id}, - 1_000 + assert_receive {:peer_connected, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") assert response(conn, :no_content) assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} - assert_receive %{"type" => "peer_disconnected", "peer_id" => ^peer_id, "room_id" => ^room_id}, + assert_receive {:peer_disconnected, %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}") @@ -288,11 +272,15 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do response end - defp add_room_and_peer(conn, server_api_token, webhook_url \\ nil) do + defp add_room_and_peer(conn, server_api_token) do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) conn = - post(conn, ~p"/room", maxPeers: @max_peers, videoCodec: "h264", webhookUrl: webhook_url) + post(conn, ~p"/room", + maxPeers: @max_peers, + videoCodec: "h264", + webhookUrl: @webhook_url + ) assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] diff --git a/test/support/webhook_plug.ex b/test/support/webhook_plug.ex index 09538f98..97593300 100644 --- a/test/support/webhook_plug.ex +++ b/test/support/webhook_plug.ex @@ -1,6 +1,7 @@ defmodule WebHookPlug do @moduledoc false import Plug.Conn + alias Jellyfish.ServerMessage alias Phoenix.PubSub @pubsub Jellyfish.PubSub @@ -15,6 +16,9 @@ defmodule WebHookPlug do {:ok, body, conn} = Plug.Conn.read_body(conn, []) notification = Jason.decode!(body) + notification = + notification |> Map.get("notification") |> ServerMessage.decode() |> Map.get(:content) + :ok = PubSub.broadcast(@pubsub, "webhook", notification) conn From 3aa5b251b541a7a6a1d6f99be6fa579a13a27647 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:08:51 +0200 Subject: [PATCH 03/13] Update lib/jellyfish/room.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Śledź --- lib/jellyfish/room.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 49f384b2..25df4adc 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -36,7 +36,7 @@ defmodule Jellyfish.Room do * `components` - map of components * `peers` - map of peers * `engine` - pid of engine - * `webhook_url` - url on which notifcations from this room should be send + * `webhook_url` - url where notifcations from this room should be sent """ @type t :: %__MODULE__{ id: id(), From 78641fd27848a75da324a1bf426d1e5c13e15635 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:09:00 +0200 Subject: [PATCH 04/13] Update lib/jellyfish_web/api_spec/room.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Śledź --- lib/jellyfish_web/api_spec/room.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/jellyfish_web/api_spec/room.ex b/lib/jellyfish_web/api_spec/room.ex index de323722..83670d67 100644 --- a/lib/jellyfish_web/api_spec/room.ex +++ b/lib/jellyfish_web/api_spec/room.ex @@ -28,7 +28,7 @@ defmodule JellyfishWeb.ApiSpec.Room do nullable: true }, webhookUrl: %Schema{ - description: "Address on which HTTP requests will be send with jellyfish notifications", +description: "URL where Jellyfish notifications will be sent", type: :string, example: "https://backend.address.com/jellyfish-notifications-endpoint", nullable: true From 50a130f02c6ac045180d4124efd47047e4c1944d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= <56085570+Rados13@users.noreply.github.com> Date: Thu, 12 Oct 2023 11:09:50 +0200 Subject: [PATCH 05/13] Update lib/jellyfish_web/controllers/room_controller.ex MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Michał Śledź --- lib/jellyfish_web/controllers/room_controller.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 4e51663e..71e764dd 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -89,7 +89,7 @@ defmodule JellyfishWeb.RoomController do {:error, :invalid_webhook_url} -> webhook_url = Map.get(params, "webhookUrl") - {:error, :bad_request, "webhookUrl must be valid URI, received url was: #{webhook_url}"} +{:error, :bad_request, "webhookUrl must be valid URL, received URL was: #{webhook_url}"} end end From ffe7592d59095226922a3a4fe8b64e2f39dcbfab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 12 Oct 2023 12:16:02 +0200 Subject: [PATCH 06/13] Add WebhookNotifier --- lib/jellyfish/application.ex | 3 +- lib/jellyfish/event.ex | 30 +-------- lib/jellyfish/room.ex | 58 ++++------------ lib/jellyfish/room_service.ex | 24 ++++--- lib/jellyfish/webhook_notifier.ex | 66 +++++++++++++++++++ lib/jellyfish_web/api_spec/room.ex | 2 +- .../controllers/room_controller.ex | 2 +- lib/jellyfish_web/peer_socket.ex | 7 +- 8 files changed, 100 insertions(+), 92 deletions(-) create mode 100644 lib/jellyfish/webhook_notifier.ex diff --git a/lib/jellyfish/application.ex b/lib/jellyfish/application.ex index 362bf95d..b181e70e 100644 --- a/lib/jellyfish/application.ex +++ b/lib/jellyfish/application.ex @@ -27,6 +27,7 @@ defmodule Jellyfish.Application do JellyfishWeb.Endpoint, # Start the RoomService Jellyfish.RoomService, + Jellyfish.WebhookNotifier, {Registry, keys: :unique, name: Jellyfish.RoomRegistry}, {Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry}, # Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry) @@ -98,7 +99,7 @@ defmodule Jellyfish.Application do Couldn't start epmd daemon. Epmd is required to run Jellyfish in a distributed mode. You can try to start it manually with: - + epmd -daemon and run Jellyfish again. diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index 8e69a010..77505419 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -1,10 +1,6 @@ defmodule Jellyfish.Event do @moduledoc false - alias Jellyfish.Room - - alias Jellyfish.ServerMessage - alias Jellyfish.ServerMessage.{ ComponentCrashed, HlsPlayable, @@ -20,32 +16,12 @@ defmodule Jellyfish.Event do @pubsub Jellyfish.PubSub @valid_topics [:server_notification, :metrics] - def broadcast_metrics(message) do - topic = :metrics - Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) - end - - def broadcast_server_notification({type, _content} = message, webhook_url) - when type in [:room_deleted, :room_crashed] do - topic = :server_notification - Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) - - content = to_proto({topic, message}) - - notification = %ServerMessage{content: content} |> ServerMessage.encode() + def broadcast_metrics(message), do: broadcast(:metrics, message) - Room.send_webhook_notification(notification, webhook_url) - end + def broadcast_server_notification(message), do: broadcast(:server_notification, message) - def broadcast_server_notification(message, room_id) do - topic = :server_notification + defp broadcast(topic, message) do Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) - - content = to_proto({topic, message}) - - notification = %ServerMessage{content: content} |> ServerMessage.encode() - - Room.receive_room_notification(room_id, notification) end def subscribe(topic) when topic in @valid_topics do diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 25df4adc..dc6fb1ea 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -19,13 +19,11 @@ defmodule Jellyfish.Room do :id, :config, :engine_pid, - :network_options, - :webhook_url + :network_options ] defstruct @enforce_keys ++ [components: %{}, peers: %{}] @type id :: String.t() - @type webhook_url :: non_neg_integer() | nil @type max_peers :: non_neg_integer() | nil @type video_codec :: :h264 | :vp8 | nil @@ -36,7 +34,6 @@ defmodule Jellyfish.Room do * `components` - map of components * `peers` - map of peers * `engine` - pid of engine - * `webhook_url` - url where notifcations from this room should be sent """ @type t :: %__MODULE__{ id: id(), @@ -48,18 +45,15 @@ defmodule Jellyfish.Room do components: %{Component.id() => Component.t()}, peers: %{Peer.id() => Peer.t()}, engine_pid: pid(), - network_options: map(), - webhook_url: String.t() + network_options: map() } - @spec start(max_peers(), video_codec(), String.t()) :: {:ok, pid(), id()} - def start(max_peers, video_codec, webhook_url) do + @spec start(max_peers(), video_codec()) :: {:ok, pid(), id()} + def start(max_peers, video_codec) do id = UUID.uuid4() {:ok, pid} = - GenServer.start(__MODULE__, [id, max_peers, video_codec, webhook_url], - name: registry_id(id) - ) + GenServer.start(__MODULE__, [id, max_peers, video_codec], name: registry_id(id)) {:ok, pid, id} end @@ -132,8 +126,8 @@ defmodule Jellyfish.Room do end @impl true - def init([id, max_peers, video_codec, webhook_url]) do - state = new(id, max_peers, video_codec, webhook_url) + def init([id, max_peers, video_codec]) do + state = new(id, max_peers, video_codec) Logger.metadata(room_id: id) Logger.info("Initialize room") @@ -298,18 +292,6 @@ defmodule Jellyfish.Room do {:noreply, state} end - @impl true - def handle_cast({:room_notification, _notification}, state) when is_nil(state.webhook_url) do - {:noreply, state} - end - - @impl true - def handle_cast({:room_notification, notification}, state) do - send_webhook_notification(notification, state.webhook_url) - - {:noreply, state} - end - @impl true def handle_info(%Message.EndpointMessage{endpoint_id: to, message: {:media_event, data}}, state) do with {:ok, peer} <- Map.fetch(state.peers, to), @@ -335,7 +317,7 @@ defmodule Jellyfish.Room do Logger.error("RTC Engine endpoint #{inspect(endpoint_id)} crashed") if Map.has_key?(state.peers, endpoint_id) do - Event.broadcast_server_notification({:peer_crashed, state.id, endpoint_id}, state.id) + Event.broadcast_server_notification({:peer_crashed, state.id, endpoint_id}) peer = Map.fetch!(state.peers, endpoint_id) @@ -343,7 +325,7 @@ defmodule Jellyfish.Room do send(peer.socket_pid, {:stop_connection, :endpoint_crashed}) end else - Event.broadcast_server_notification({:component_crashed, state.id, endpoint_id}, state.id) + Event.broadcast_server_notification({:component_crashed, state.id, endpoint_id}) component = Map.get(state.components, endpoint_id) if component.type == Component.HLS, do: remove_hls_processes(state.id, component.metadata) @@ -378,7 +360,7 @@ defmodule Jellyfish.Room do if type == Component.HLS, do: id end) - Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}, state.id) + Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}) state = update_in(state, [:components, endpoint_id, :metadata], &Map.put(&1, :playable, true)) {:noreply, state} @@ -400,21 +382,10 @@ defmodule Jellyfish.Room do :ok end - def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} - - def send_webhook_notification(notification, webhook_url) when not is_nil(webhook_url) do - case HTTPoison.post(webhook_url, Jason.encode!(%{notification: notification})) do - {:ok, _result} -> - nil - - {:error, error} -> - Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") - end - end - - def send_webhook_notification(_notification, _state), do: :ok + def registry_id(room_id), + do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} - defp new(id, max_peers, video_codec, webhook_url) do + defp new(id, max_peers, video_codec) do rtc_engine_options = [ id: id ] @@ -448,8 +419,7 @@ defmodule Jellyfish.Room do id: id, config: %{max_peers: max_peers, video_codec: video_codec}, engine_pid: pid, - network_options: [turn_options: turn_options], - webhook_url: webhook_url + network_options: [turn_options: turn_options] } end diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 2ef7256c..3d6f037a 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -7,8 +7,7 @@ defmodule Jellyfish.RoomService do require Logger - alias Jellyfish.Event - alias Jellyfish.Room + alias Jellyfish.{Event, Room, WebhookNotifier} def start_link(args) do GenServer.start_link(__MODULE__, args, name: __MODULE__) @@ -17,7 +16,7 @@ defmodule Jellyfish.RoomService do @spec find_room(Room.id()) :: {:ok, pid()} | {:error, :room_not_found} def find_room(room_id) do case Registry.lookup(Jellyfish.RoomRegistry, room_id) do - [{room_pid, nil}] -> + [{room_pid, _webhook_url}] -> {:ok, room_pid} _not_found -> @@ -121,7 +120,7 @@ defmodule Jellyfish.RoomService do @impl true def init(_opts) do - {:ok, %{rooms: %{}, rooms_webhook: %{}}, {:continue, nil}} + {:ok, %{rooms: %{}}, {:continue, nil}} end @impl true @@ -135,17 +134,18 @@ defmodule Jellyfish.RoomService do with :ok <- validate_max_peers(max_peers), {:ok, video_codec} <- codec_to_atom(video_codec), :ok <- validate_webhook_url(webhook_url) do - {:ok, room_pid, room_id} = Room.start(max_peers, video_codec, webhook_url) + {:ok, room_pid, room_id} = Room.start(max_peers, video_codec) room = Room.get_state(room_id) Process.monitor(room_pid) state = put_in(state, [:rooms, room_pid], room_id) - state = put_in(state, [:rooms_webhook, room_id], webhook_url) + + WebhookNotifier.add_webhook(room_id, webhook_url) Logger.info("Created room #{inspect(room.id)}") - Event.broadcast_server_notification({:room_created, room_id}, room_id) + Event.broadcast_server_notification({:room_created, room_id}) {:reply, {:ok, room, Application.fetch_env!(:jellyfish, :address)}, state} else @@ -165,7 +165,7 @@ defmodule Jellyfish.RoomService do response = case find_room(room_id) do {:ok, _room_pid} -> - remove_room(room_id, state) + remove_room(room_id) :ok {:error, _} -> @@ -189,12 +189,11 @@ defmodule Jellyfish.RoomService do @impl true def handle_info({:DOWN, _ref, :process, pid, reason}, state) do {room_id, state} = pop_in(state, [:rooms, pid]) - {webhook_url, state} = pop_in(state, [:rooms_webhook, room_id]) Logger.warning("Process #{room_id} is down with reason: #{reason}") Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_crashed) - Event.broadcast_server_notification({:room_crashed, room_id}, webhook_url) + Event.broadcast_server_notification({:room_crashed, room_id}) {:noreply, state} end @@ -222,15 +221,14 @@ defmodule Jellyfish.RoomService do |> Registry.select([{{:"$1", :_, :_}, [], [:"$1"]}]) end - defp remove_room(room_id, state) do + defp remove_room(room_id) do room = Room.registry_id(room_id) try do :ok = GenServer.stop(room, :normal) Logger.info("Deleted room #{inspect(room_id)}") - webhook_url = get_in(state, [:rooms_webhook, room_id]) - Event.broadcast_server_notification({:room_deleted, room_id}, webhook_url) + Event.broadcast_server_notification({:room_deleted, room_id}) catch :exit, {:noproc, {GenServer, :stop, [^room, :normal, :infinity]}} -> Logger.warning("Room process with id #{inspect(room_id)} doesn't exist") diff --git a/lib/jellyfish/webhook_notifier.ex b/lib/jellyfish/webhook_notifier.ex new file mode 100644 index 00000000..09a9ad56 --- /dev/null +++ b/lib/jellyfish/webhook_notifier.ex @@ -0,0 +1,66 @@ +defmodule Jellyfish.WebhookNotifier do + @moduledoc """ + Module responsible for sending notifications to webhooks. + """ + + use GenServer + + require Logger + + alias Jellyfish.Event + alias Jellyfish.ServerMessage + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + def add_webhook(room_id, webhook_url) do + GenServer.cast(__MODULE__, {:add_room_webhook, room_id, webhook_url}) + end + + @impl true + def init(_opts) do + {:ok, %{}, {:continue, nil}} + end + + @impl true + def handle_continue(_continue_arg, state) do + :ok = Event.subscribe(:server_notification) + {:noreply, state} + end + + @impl true + def handle_cast({:add_room_webhook, room_id, webhook_url}, state) do + {:noreply, Map.put(state, room_id, webhook_url)} + end + + @impl true + def handle_info(msg, state) do + {atom, %{room_id: room_id}} = content = Event.to_proto(msg) + notification = %ServerMessage{content: content} |> ServerMessage.encode() + + webhook_url = Map.get(state, room_id) + send_webhook_notification(notification, webhook_url) + + state = + if atom in [:room_crashed, :room_deleted] do + Map.delete(state, room_id) + else + state + end + + {:noreply, state} + end + + defp send_webhook_notification(notification, webhook_url) when not is_nil(webhook_url) do + case HTTPoison.post(webhook_url, Jason.encode!(%{notification: notification})) do + {:ok, _result} -> + nil + + {:error, error} -> + Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") + end + end + + defp send_webhook_notification(_notification, _webhook_url), do: :ok +end diff --git a/lib/jellyfish_web/api_spec/room.ex b/lib/jellyfish_web/api_spec/room.ex index 83670d67..94ae147d 100644 --- a/lib/jellyfish_web/api_spec/room.ex +++ b/lib/jellyfish_web/api_spec/room.ex @@ -28,7 +28,7 @@ defmodule JellyfishWeb.ApiSpec.Room do nullable: true }, webhookUrl: %Schema{ -description: "URL where Jellyfish notifications will be sent", + description: "URL where Jellyfish notifications will be sent", type: :string, example: "https://backend.address.com/jellyfish-notifications-endpoint", nullable: true diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 71e764dd..32142d98 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -89,7 +89,7 @@ defmodule JellyfishWeb.RoomController do {:error, :invalid_webhook_url} -> webhook_url = Map.get(params, "webhookUrl") -{:error, :bad_request, "webhookUrl must be valid URL, received URL was: #{webhook_url}"} + {:error, :bad_request, "webhookUrl must be valid URL, received URL was: #{webhook_url}"} end end diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index b423908f..1885a7ec 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -48,7 +48,7 @@ defmodule JellyfishWeb.PeerSocket do room_pid: room_pid }) - Event.broadcast_server_notification({:peer_connected, room_id, peer_id}, room_id) + Event.broadcast_server_notification({:peer_connected, room_id, peer_id}) {:reply, :ok, {:binary, encoded_message}, state} else @@ -135,10 +135,7 @@ defmodule JellyfishWeb.PeerSocket do """) if Map.has_key?(state, :peer_id) do - Event.broadcast_server_notification( - {:peer_disconnected, state.room_id, state.peer_id}, - state.room_id - ) + Event.broadcast_server_notification({:peer_disconnected, state.room_id, state.peer_id}) end :ok From 5ef36c0796c985bc38a3864714062c523e80f74c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 12 Oct 2023 12:25:06 +0200 Subject: [PATCH 07/13] New api.spec --- openapi.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openapi.yaml b/openapi.yaml index 0ceba3f0..a8946da8 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -320,7 +320,7 @@ components: nullable: true type: string webhookUrl: - description: Address on which HTTP requests will be send with jellyfish notifications + description: URL where Jellyfish notifications will be sent example: https://backend.address.com/jellyfish-notifications-endpoint nullable: true type: string From 41a2c815b52d6a459f9a29ed272b02fd6c4ec212 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Mon, 16 Oct 2023 08:40:10 +0200 Subject: [PATCH 08/13] Add unsubscribe in setup --- lib/jellyfish/webhook_notifier.ex | 4 +++- test/jellyfish_web/integration/server_socket_test.exs | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/jellyfish/webhook_notifier.ex b/lib/jellyfish/webhook_notifier.ex index 09a9ad56..4eff92ab 100644 --- a/lib/jellyfish/webhook_notifier.ex +++ b/lib/jellyfish/webhook_notifier.ex @@ -58,7 +58,9 @@ defmodule Jellyfish.WebhookNotifier do nil {:error, error} -> - Logger.warning("Sending notification through webhook fails with error: #{inspect(error)}") + Logger.warning( + "Sending notification through webhook fails with error: #{inspect(error)} on address #{webhook_url}" + ) end end diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_socket_test.exs index fb486b90..b6c6166c 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_socket_test.exs @@ -85,6 +85,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do conn = delete(conn, ~p"/room/#{id}") assert response(conn, 204) end) + + :ok = PubSub.unsubscribe(@pubsub, "webhook") end) end From f9563f7b16a1494e972149388fd8d1c4578964fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Tue, 17 Oct 2023 12:14:05 +0200 Subject: [PATCH 09/13] Move broadcasting PeerDisconnected to Room --- lib/jellyfish/room.ex | 9 +-- lib/jellyfish/room_service.ex | 4 +- lib/jellyfish/webhook_notifier.ex | 15 ++--- .../controllers/room_controller.ex | 8 ++- lib/jellyfish_web/peer_socket.ex | 8 +-- .../controllers/room_controller_test.exs | 4 +- .../integration/server_socket_test.exs | 62 +++++++++++++++---- test/support/ws.ex | 5 ++ 8 files changed, 75 insertions(+), 40 deletions(-) diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index dc6fb1ea..367eb41e 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -52,8 +52,7 @@ defmodule Jellyfish.Room do def start(max_peers, video_codec) do id = UUID.uuid4() - {:ok, pid} = - GenServer.start(__MODULE__, [id, max_peers, video_codec], name: registry_id(id)) + {:ok, pid} = GenServer.start(__MODULE__, [id, max_peers, video_codec], name: registry_id(id)) {:ok, pid, id} end @@ -120,11 +119,6 @@ defmodule Jellyfish.Room do GenServer.cast(registry_id(room_id), {:media_event, peer_id, event}) end - @spec receive_room_notification(id(), any()) :: :ok - def receive_room_notification(room_id, notification) do - GenServer.cast(registry_id(room_id), {:room_notification, notification}) - end - @impl true def init([id, max_peers, video_codec]) do state = new(id, max_peers, video_codec) @@ -343,6 +337,7 @@ defmodule Jellyfish.Room do {peer_id, peer} -> :ok = Engine.remove_endpoint(state.engine_pid, peer_id) + Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) peer = %{peer | status: :disconnected, socket_pid: nil} put_in(state, [:peers, peer_id], peer) end diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 3d6f037a..9998d6e1 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -155,7 +155,7 @@ defmodule Jellyfish.RoomService do {:error, :video_codec} -> {:reply, {:error, :invalid_video_codec}, state} - {:error, :not_valid_url} -> + {:error, :invalid_webhook_url} -> {:reply, {:error, :invalid_webhook_url}, state} end end @@ -249,7 +249,7 @@ defmodule Jellyfish.RoomService do |> if do :ok else - {:error, :not_valid_url} + {:error, :invalid_webhook_url} end end diff --git a/lib/jellyfish/webhook_notifier.ex b/lib/jellyfish/webhook_notifier.ex index 4eff92ab..710f3e6f 100644 --- a/lib/jellyfish/webhook_notifier.ex +++ b/lib/jellyfish/webhook_notifier.ex @@ -1,6 +1,6 @@ defmodule Jellyfish.WebhookNotifier do @moduledoc """ - Module responsible for sending notifications to webhooks. + Module responsible for sending notifications through webhooks. """ use GenServer @@ -20,13 +20,8 @@ defmodule Jellyfish.WebhookNotifier do @impl true def init(_opts) do - {:ok, %{}, {:continue, nil}} - end - - @impl true - def handle_continue(_continue_arg, state) do :ok = Event.subscribe(:server_notification) - {:noreply, state} + {:ok, %{}} end @impl true @@ -36,14 +31,14 @@ defmodule Jellyfish.WebhookNotifier do @impl true def handle_info(msg, state) do - {atom, %{room_id: room_id}} = content = Event.to_proto(msg) + {event_type, %{room_id: room_id}} = content = Event.to_proto(msg) notification = %ServerMessage{content: content} |> ServerMessage.encode() webhook_url = Map.get(state, room_id) send_webhook_notification(notification, webhook_url) state = - if atom in [:room_crashed, :room_deleted] do + if event_type in [:room_crashed, :room_deleted] do Map.delete(state, room_id) else state @@ -59,7 +54,7 @@ defmodule Jellyfish.WebhookNotifier do {:error, error} -> Logger.warning( - "Sending notification through webhook fails with error: #{inspect(error)} on address #{webhook_url}" + "Couldn't send notification through webhook: #{webhook_url}, reason: #{inspect(error)}" ) end end diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 32142d98..3802fba7 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -82,10 +82,14 @@ defmodule JellyfishWeb.RoomController do |> render("show.json", room: room, jellyfish_address: jellyfish_address) else {:error, :invalid_max_peers} -> - {:error, :bad_request, "maxPeers must be a number"} + max_peers = Map.get(params, "maxPeers") + {:error, :bad_request, "maxPeers must be a number received maxPeers was: #{max_peers}"} {:error, :invalid_video_codec} -> - {:error, :bad_request, "videoCodec must be 'h264' or 'vp8'"} + video_codec = Map.get(params, "videoCodec") + + {:error, :bad_request, + "videoCodec must be 'h264' or 'vp8' received codec was: #{video_codec}"} {:error, :invalid_webhook_url} -> webhook_url = Map.get(params, "webhookUrl") diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index 1885a7ec..75844a2c 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -130,14 +130,10 @@ defmodule JellyfishWeb.PeerSocket do @impl true def terminate(_reason, state) do Logger.info(""" - WebSocket associated with peer #{inspect(Map.get(state, :peer_id, ""))} stopped, \ - room: #{inspect(Map.get(state, :room_id, ""))} + WebSocket associated with peer #{inspect(state.peer_id)} stopped, \ + room: #{inspect(state.room_id)} """) - if Map.has_key?(state, :peer_id) do - Event.broadcast_server_notification({:peer_disconnected, state.room_id, state.peer_id}) - end - :ok end diff --git a/test/jellyfish_web/controllers/room_controller_test.exs b/test/jellyfish_web/controllers/room_controller_test.exs index 18983ba2..07bb0d9d 100644 --- a/test/jellyfish_web/controllers/room_controller_test.exs +++ b/test/jellyfish_web/controllers/room_controller_test.exs @@ -99,12 +99,12 @@ defmodule JellyfishWeb.RoomControllerTest do conn = post(conn, ~p"/room", maxPeers: "nan") assert json_response(conn, :bad_request)["errors"] == - "maxPeers must be a number" + "maxPeers must be a number received maxPeers was: nan" conn = post(conn, ~p"/room", videoCodec: "nan") assert json_response(conn, :bad_request)["errors"] == - "videoCodec must be 'h264' or 'vp8'" + "videoCodec must be 'h264' or 'vp8' received codec was: nan" end end diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_socket_test.exs index b6c6166c..fad9a432 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_socket_test.exs @@ -195,6 +195,55 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do end test "sends a message when peer connects", %{conn: conn} do + {room_id, peer_id, conn} = prepare_for_server_notification_test(conn) + + conn = delete(conn, ~p"/room/#{room_id}") + assert response(conn, :no_content) + + assert_receive %RoomDeleted{room_id: ^room_id} + + assert_receive {:room_deleted, %RoomDeleted{room_id: ^room_id}}, + 1_000 + + refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} + refute_received {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + end + + test "sends a message when peer connects and room crashes", %{conn: conn} do + {room_id, peer_id, _conn} = prepare_for_server_notification_test(conn) + {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) + + Process.exit(room_pid, :kill) + + assert_receive %RoomCrashed{room_id: ^room_id} + + assert_receive {:room_crashed, %RoomCrashed{room_id: ^room_id}}, + 1_000 + + refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} + refute_received {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + end + + test "sends a message when peer connects and it crashes", %{conn: conn} do + {room_id, peer_id, conn} = prepare_for_server_notification_test(conn) + + {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) + + state = :sys.get_state(room_pid) + + peer_socket_pid = state.peers[peer_id].socket_pid + + Process.exit(peer_socket_pid, :kill) + + assert_receive %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} + + assert_receive {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, + 2_000 + + delete(conn, ~p"/room/#{room_id}") + end + + def prepare_for_server_notification_test(conn) do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) ws = create_and_authenticate() @@ -203,23 +252,14 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do {room_id, peer_id, peer_token, conn} = add_room_and_peer(conn, server_api_token) - {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) auth_request = peer_auth_request(peer_token) :ok = WS.send_binary_frame(peer_ws, auth_request) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} assert_receive {:peer_connected, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, 1_000 - conn = delete(conn, ~p"/room/#{room_id}/peer/#{peer_id}") - assert response(conn, :no_content) - - assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} - - assert_receive {:peer_disconnected, %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id}}, - 1_000 - - conn = delete(conn, ~p"/room/#{room_id}") - assert response(conn, :no_content) + {room_id, peer_id, conn} end def create_and_authenticate() do diff --git a/test/support/ws.ex b/test/support/ws.ex index e2c7e6f5..03c96e4d 100644 --- a/test/support/ws.ex +++ b/test/support/ws.ex @@ -11,6 +11,11 @@ defmodule JellyfishWeb.WS do WebSockex.start_link(url, __MODULE__, state) end + def start(url, type) do + state = %{caller: self(), type: type} + WebSockex.start(url, __MODULE__, state) + end + def send_frame(ws, msg) do WebSockex.send_frame(ws, {:text, Jason.encode!(msg)}) end From 68881c01887be87dadb0bc3b56c718639dc7ab06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 18 Oct 2023 10:19:17 +0200 Subject: [PATCH 10/13] Rename server_socket_test to server_notification_test --- .../controllers/room_controller.ex | 8 +- .../controllers/room_controller_test.exs | 9 +- ..._test.exs => server_notification_test.exs} | 98 +++++++++---------- test/support/webhook_plug.ex | 4 +- test/support/ws.ex | 8 +- 5 files changed, 63 insertions(+), 64 deletions(-) rename test/jellyfish_web/integration/{server_socket_test.exs => server_notification_test.exs} (84%) diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 3802fba7..78ec984e 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -83,17 +83,17 @@ defmodule JellyfishWeb.RoomController do else {:error, :invalid_max_peers} -> max_peers = Map.get(params, "maxPeers") - {:error, :bad_request, "maxPeers must be a number received maxPeers was: #{max_peers}"} + + {:error, :bad_request, "Expected maxPeers to be a number, got: #{max_peers}"} {:error, :invalid_video_codec} -> video_codec = Map.get(params, "videoCodec") - {:error, :bad_request, - "videoCodec must be 'h264' or 'vp8' received codec was: #{video_codec}"} + {:error, :bad_request, "Expected videoCodec to be 'h264' or 'vp8', got: #{video_codec}"} {:error, :invalid_webhook_url} -> webhook_url = Map.get(params, "webhookUrl") - {:error, :bad_request, "webhookUrl must be valid URL, received URL was: #{webhook_url}"} + {:error, :bad_request, "Expected webhookUrl to be valid URL, got: #{webhook_url}"} end end diff --git a/test/jellyfish_web/controllers/room_controller_test.exs b/test/jellyfish_web/controllers/room_controller_test.exs index 07bb0d9d..f8373c51 100644 --- a/test/jellyfish_web/controllers/room_controller_test.exs +++ b/test/jellyfish_web/controllers/room_controller_test.exs @@ -99,12 +99,17 @@ defmodule JellyfishWeb.RoomControllerTest do conn = post(conn, ~p"/room", maxPeers: "nan") assert json_response(conn, :bad_request)["errors"] == - "maxPeers must be a number received maxPeers was: nan" + "Expected maxPeers to be a number, got: nan" conn = post(conn, ~p"/room", videoCodec: "nan") assert json_response(conn, :bad_request)["errors"] == - "videoCodec must be 'h264' or 'vp8' received codec was: nan" + "Expected videoCodec to be 'h264' or 'vp8', got: nan" + + conn = post(conn, ~p"/room", webhookUrl: "nan") + + assert json_response(conn, :bad_request)["errors"] == + "Expected webhookUrl to be valid URL, got: nan" end end diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_notification_test.exs similarity index 84% rename from test/jellyfish_web/integration/server_socket_test.exs rename to test/jellyfish_web/integration/server_notification_test.exs index fad9a432..8117d230 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_notification_test.exs @@ -1,4 +1,4 @@ -defmodule JellyfishWeb.Integration.ServerSocketTest do +defmodule JellyfishWeb.Integration.ServerNotificationTest do use JellyfishWeb.ConnCase alias __MODULE__.Endpoint @@ -25,7 +25,7 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do alias Phoenix.PubSub @port 5907 - @webhook_port 2137 + @webhook_port 2929 @webhook_url "http://127.0.0.1:#{@webhook_port}/" @path "ws://127.0.0.1:#{@port}/socket/server/websocket" @auth_response %Authenticated{} @@ -141,7 +141,10 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do send(room_pid, {:playlist_playable, :video, "hls_output/#{room_id}"}) assert_receive %HlsPlayable{room_id: ^room_id, component_id: ^hls_id} - assert_receive {:hls_playable, %HlsPlayable{room_id: ^room_id, component_id: ^hls_id}}, 1_000 + + assert_receive {:webhook_notification, + %HlsPlayable{room_id: ^room_id, component_id: ^hls_id}}, + 1_000 conn = delete(conn, ~p"/room/#{room_id}/") assert response(conn, :no_content) @@ -167,65 +170,51 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] assert_receive %RoomCreated{room_id: ^room_id} - assert_receive {:room_created, %RoomCreated{room_id: ^room_id}}, 1_000 + assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}") assert response(conn, :no_content) assert_receive %RoomDeleted{room_id: ^room_id} - assert_receive {:room_deleted, %RoomDeleted{room_id: ^room_id}}, 1_000 - end - - test "sends a message when room crashes", %{conn: conn} do - server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) - ws = create_and_authenticate() - - subscribe(ws, :server_notification) - - conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - - conn = post(conn, ~p"/room", maxPeers: 1, webhookUrl: @webhook_url) - assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] - {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) - - Process.exit(room_pid, :kill) - - assert_receive %RoomCrashed{room_id: ^room_id} - assert_receive {:room_crashed, %RoomCrashed{room_id: ^room_id}}, 2_000 + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 end test "sends a message when peer connects", %{conn: conn} do - {room_id, peer_id, conn} = prepare_for_server_notification_test(conn) + {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) conn = delete(conn, ~p"/room/#{room_id}") assert response(conn, :no_content) assert_receive %RoomDeleted{room_id: ^room_id} - assert_receive {:room_deleted, %RoomDeleted{room_id: ^room_id}}, + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} - refute_received {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + + refute_received {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} end test "sends a message when peer connects and room crashes", %{conn: conn} do - {room_id, peer_id, _conn} = prepare_for_server_notification_test(conn) + {room_id, peer_id, _conn} = subscribe_on_notifications_and_connect_peer(conn) {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) Process.exit(room_pid, :kill) assert_receive %RoomCrashed{room_id: ^room_id} - assert_receive {:room_crashed, %RoomCrashed{room_id: ^room_id}}, + assert_receive {:webhook_notification, %RoomCrashed{room_id: ^room_id}}, 1_000 refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} - refute_received {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + + refute_received {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} end test "sends a message when peer connects and it crashes", %{conn: conn} do - {room_id, peer_id, conn} = prepare_for_server_notification_test(conn) + {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) @@ -237,64 +226,67 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do assert_receive %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} - assert_receive {:peer_disconnected, %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, + assert_receive {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, 2_000 delete(conn, ~p"/room/#{room_id}") end - def prepare_for_server_notification_test(conn) do + test "sends metrics", %{conn: conn} do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) ws = create_and_authenticate() subscribe(ws, :server_notification) - {room_id, peer_id, peer_token, conn} = - add_room_and_peer(conn, server_api_token) + {room_id, peer_id, peer_token, _conn} = add_room_and_peer(conn, server_api_token) - {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) auth_request = peer_auth_request(peer_token) :ok = WS.send_binary_frame(peer_ws, auth_request) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} - assert_receive {:peer_connected, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, 1_000 - {room_id, peer_id, conn} - end + subscribe(ws, :metrics) + assert_receive %MetricsReport{metrics: metrics} when metrics != "{}", 500 - def create_and_authenticate() do - token = Application.fetch_env!(:jellyfish, :server_api_token) - auth_request = auth_request(token) + metrics = Jason.decode!(metrics) - {:ok, ws} = WS.start_link(@path, :server) - :ok = WS.send_binary_frame(ws, auth_request) - assert_receive @auth_response, 1000 + [endpoint_id | _rest] = metrics["room_id=#{room_id}"] |> Map.keys() - ws + assert String.contains?(endpoint_id, "endpoint_id") end - test "sends metrics", %{conn: conn} do + def subscribe_on_notifications_and_connect_peer(conn) do server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) ws = create_and_authenticate() subscribe(ws, :server_notification) - {room_id, peer_id, peer_token, _conn} = add_room_and_peer(conn, server_api_token) + {room_id, peer_id, peer_token, conn} = + add_room_and_peer(conn, server_api_token) - {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) auth_request = peer_auth_request(peer_token) :ok = WS.send_binary_frame(peer_ws, auth_request) assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} - subscribe(ws, :metrics) - assert_receive %MetricsReport{metrics: metrics} when metrics != "{}", 500 + assert_receive {:webhook_notification, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, + 1_000 - metrics = Jason.decode!(metrics) + {room_id, peer_id, conn} + end - [endpoint_id | _rest] = metrics["room_id=#{room_id}"] |> Map.keys() + def create_and_authenticate() do + token = Application.fetch_env!(:jellyfish, :server_api_token) + auth_request = auth_request(token) - assert String.contains?(endpoint_id, "endpoint_id") + {:ok, ws} = WS.start_link(@path, :server) + :ok = WS.send_binary_frame(ws, auth_request) + assert_receive @auth_response, 1000 + + ws end def subscribe(ws, event_type) do diff --git a/test/support/webhook_plug.ex b/test/support/webhook_plug.ex index 97593300..502dad40 100644 --- a/test/support/webhook_plug.ex +++ b/test/support/webhook_plug.ex @@ -19,7 +19,9 @@ defmodule WebHookPlug do notification = notification |> Map.get("notification") |> ServerMessage.decode() |> Map.get(:content) - :ok = PubSub.broadcast(@pubsub, "webhook", notification) + {_notification_type, notification} = notification + + :ok = PubSub.broadcast(@pubsub, "webhook", {:webhook_notification, notification}) conn |> put_resp_content_type("text/plain") diff --git a/test/support/ws.ex b/test/support/ws.ex index 03c96e4d..09787e9e 100644 --- a/test/support/ws.ex +++ b/test/support/ws.ex @@ -6,14 +6,14 @@ defmodule JellyfishWeb.WS do alias Jellyfish.PeerMessage alias Jellyfish.ServerMessage - def start_link(url, type) do + def start(url, type) do state = %{caller: self(), type: type} - WebSockex.start_link(url, __MODULE__, state) + WebSockex.start(url, __MODULE__, state) end - def start(url, type) do + def start_link(url, type) do state = %{caller: self(), type: type} - WebSockex.start(url, __MODULE__, state) + WebSockex.start_link(url, __MODULE__, state) end def send_frame(ws, msg) do From 8b6b4e174019189d684252212e8ec218bd4d9dba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Wed, 18 Oct 2023 12:43:32 +0200 Subject: [PATCH 11/13] Rename variable in Registry.lookup --- lib/jellyfish/room_service.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 9998d6e1..c56f6a45 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -16,7 +16,7 @@ defmodule Jellyfish.RoomService do @spec find_room(Room.id()) :: {:ok, pid()} | {:error, :room_not_found} def find_room(room_id) do case Registry.lookup(Jellyfish.RoomRegistry, room_id) do - [{room_pid, _webhook_url}] -> + [{room_pid, _value}] -> {:ok, room_pid} _not_found -> From 45399bb14f17c328c4cd91633483aaaf64f74b43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 19 Oct 2023 09:34:31 +0200 Subject: [PATCH 12/13] Remove log from terminate function in PeerSocket --- lib/jellyfish_web/peer_socket.ex | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index 75844a2c..4bb45eac 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -128,12 +128,7 @@ defmodule JellyfishWeb.PeerSocket do end @impl true - def terminate(_reason, state) do - Logger.info(""" - WebSocket associated with peer #{inspect(state.peer_id)} stopped, \ - room: #{inspect(state.room_id)} - """) - + def terminate(_reason, _state) do :ok end From 6165296eef765eb3b2c51f66606f765899697d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rados=C5=82aw=20Szuma?= Date: Thu, 19 Oct 2023 09:54:16 +0200 Subject: [PATCH 13/13] Move registry_id before start in Room module --- lib/jellyfish/room.ex | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 367eb41e..58e99552 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -48,6 +48,8 @@ defmodule Jellyfish.Room do network_options: map() } + def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + @spec start(max_peers(), video_codec()) :: {:ok, pid(), id()} def start(max_peers, video_codec) do id = UUID.uuid4() @@ -377,9 +379,6 @@ defmodule Jellyfish.Room do :ok end - def registry_id(room_id), - do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} - defp new(id, max_peers, video_codec) do rtc_engine_options = [ id: id