diff --git a/lib/jellyfish/application.ex b/lib/jellyfish/application.ex index 362bf95d..b181e70e 100644 --- a/lib/jellyfish/application.ex +++ b/lib/jellyfish/application.ex @@ -27,6 +27,7 @@ defmodule Jellyfish.Application do JellyfishWeb.Endpoint, # Start the RoomService Jellyfish.RoomService, + Jellyfish.WebhookNotifier, {Registry, keys: :unique, name: Jellyfish.RoomRegistry}, {Registry, keys: :unique, name: Jellyfish.RequestHandlerRegistry}, # Start the Telemetry supervisor (must be started after Jellyfish.RoomRegistry) @@ -98,7 +99,7 @@ defmodule Jellyfish.Application do Couldn't start epmd daemon. Epmd is required to run Jellyfish in a distributed mode. You can try to start it manually with: - + epmd -daemon and run Jellyfish again. diff --git a/lib/jellyfish/event.ex b/lib/jellyfish/event.ex index 6c982f0c..77505419 100644 --- a/lib/jellyfish/event.ex +++ b/lib/jellyfish/event.ex @@ -16,7 +16,11 @@ defmodule Jellyfish.Event do @pubsub Jellyfish.PubSub @valid_topics [:server_notification, :metrics] - def broadcast(topic, message) when topic in @valid_topics do + def broadcast_metrics(message), do: broadcast(:metrics, message) + + def broadcast_server_notification(message), do: broadcast(:server_notification, message) + + defp broadcast(topic, message) do Phoenix.PubSub.broadcast(@pubsub, Atom.to_string(topic), {topic, message}) end diff --git a/lib/jellyfish/metrics_scraper.ex b/lib/jellyfish/metrics_scraper.ex index c9f8f49e..75eaa7a0 100644 --- a/lib/jellyfish/metrics_scraper.ex +++ b/lib/jellyfish/metrics_scraper.ex @@ -45,7 +45,7 @@ defmodule Jellyfish.MetricsScraper do report |> prepare_report(state) - |> then(&Event.broadcast(:metrics, &1)) + |> then(&Event.broadcast_metrics(&1)) Process.send_after(self(), :scrape, state.scrape_interval) diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 223ba769..58e99552 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -48,6 +48,8 @@ defmodule Jellyfish.Room do network_options: map() } + def registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + @spec start(max_peers(), video_codec()) :: {:ok, pid(), id()} def start(max_peers, video_codec) do id = UUID.uuid4() @@ -311,7 +313,7 @@ defmodule Jellyfish.Room do Logger.error("RTC Engine endpoint #{inspect(endpoint_id)} crashed") if Map.has_key?(state.peers, endpoint_id) do - Event.broadcast(:server_notification, {:peer_crashed, state.id, endpoint_id}) + Event.broadcast_server_notification({:peer_crashed, state.id, endpoint_id}) peer = Map.fetch!(state.peers, endpoint_id) @@ -319,7 +321,7 @@ defmodule Jellyfish.Room do send(peer.socket_pid, {:stop_connection, :endpoint_crashed}) end else - Event.broadcast(:server_notification, {:component_crashed, state.id, endpoint_id}) + Event.broadcast_server_notification({:component_crashed, state.id, endpoint_id}) component = Map.get(state.components, endpoint_id) if component.type == Component.HLS, do: remove_hls_processes(state.id, component.metadata) @@ -337,6 +339,7 @@ defmodule Jellyfish.Room do {peer_id, peer} -> :ok = Engine.remove_endpoint(state.engine_pid, peer_id) + Event.broadcast_server_notification({:peer_disconnected, state.id, peer_id}) peer = %{peer | status: :disconnected, socket_pid: nil} put_in(state, [:peers, peer_id], peer) end @@ -354,7 +357,7 @@ defmodule Jellyfish.Room do if type == Component.HLS, do: id end) - Event.broadcast(:server_notification, {:hls_playable, state.id, endpoint_id}) + Event.broadcast_server_notification({:hls_playable, state.id, endpoint_id}) state = update_in(state, [:components, endpoint_id, :metadata], &Map.put(&1, :playable, true)) {:noreply, state} @@ -430,8 +433,6 @@ defmodule Jellyfish.Room do defp remove_hls_processes(_room_id, _metadata), do: nil - defp registry_id(room_id), do: {:via, Registry, {Jellyfish.RoomRegistry, room_id}} - defp check_component_allowed(Component.HLS, %{ config: %{video_codec: video_codec}, components: components diff --git a/lib/jellyfish/room_service.ex b/lib/jellyfish/room_service.ex index 935a9c7c..c56f6a45 100644 --- a/lib/jellyfish/room_service.ex +++ b/lib/jellyfish/room_service.ex @@ -7,8 +7,7 @@ defmodule Jellyfish.RoomService do require Logger - alias Jellyfish.Event - alias Jellyfish.Room + alias Jellyfish.{Event, Room, WebhookNotifier} def start_link(args) do GenServer.start_link(__MODULE__, args, name: __MODULE__) @@ -17,7 +16,7 @@ defmodule Jellyfish.RoomService do @spec find_room(Room.id()) :: {:ok, pid()} | {:error, :room_not_found} def find_room(room_id) do case Registry.lookup(Jellyfish.RoomRegistry, room_id) do - [{room_pid, nil}] -> + [{room_pid, _value}] -> {:ok, room_pid} _not_found -> @@ -54,9 +53,9 @@ defmodule Jellyfish.RoomService do |> Enum.reject(&(&1 == nil)) end - @spec create_room(Room.max_peers(), String.t()) :: + @spec create_room(Room.max_peers(), String.t(), String.t()) :: {:ok, Room.t(), String.t()} | {:error, :invalid_max_peers | :invalid_video_codec} - def create_room(max_peers, video_codec) do + def create_room(max_peers, video_codec, webhook_url) do {node_resources, failed_nodes} = :rpc.multicall(Jellyfish.RoomService, :get_resource_usage, []) @@ -80,9 +79,9 @@ defmodule Jellyfish.RoomService do if Enum.count(node_resources) > 1 do Logger.info("Node with least used resources is #{inspect(min_node)}") - GenServer.call({__MODULE__, min_node}, {:create_room, max_peers, video_codec}) + GenServer.call({__MODULE__, min_node}, {:create_room, max_peers, video_codec, webhook_url}) else - GenServer.call(__MODULE__, {:create_room, max_peers, video_codec}) + GenServer.call(__MODULE__, {:create_room, max_peers, video_codec, webhook_url}) end end @@ -131,9 +130,10 @@ defmodule Jellyfish.RoomService do end @impl true - def handle_call({:create_room, max_peers, video_codec}, _from, state) do + def handle_call({:create_room, max_peers, video_codec, webhook_url}, _from, state) do with :ok <- validate_max_peers(max_peers), - {:ok, video_codec} <- codec_to_atom(video_codec) do + {:ok, video_codec} <- codec_to_atom(video_codec), + :ok <- validate_webhook_url(webhook_url) do {:ok, room_pid, room_id} = Room.start(max_peers, video_codec) room = Room.get_state(room_id) @@ -141,9 +141,11 @@ defmodule Jellyfish.RoomService do state = put_in(state, [:rooms, room_pid], room_id) + WebhookNotifier.add_webhook(room_id, webhook_url) + Logger.info("Created room #{inspect(room.id)}") - Event.broadcast(:server_notification, {:room_created, room_id}) + Event.broadcast_server_notification({:room_created, room_id}) {:reply, {:ok, room, Application.fetch_env!(:jellyfish, :address)}, state} else @@ -152,6 +154,9 @@ defmodule Jellyfish.RoomService do {:error, :video_codec} -> {:reply, {:error, :invalid_video_codec}, state} + + {:error, :invalid_webhook_url} -> + {:reply, {:error, :invalid_webhook_url}, state} end end @@ -188,7 +193,7 @@ defmodule Jellyfish.RoomService do Logger.warning("Process #{room_id} is down with reason: #{reason}") Phoenix.PubSub.broadcast(Jellyfish.PubSub, room_id, :room_crashed) - Event.broadcast(:server_notification, {:room_crashed, room_id}) + Event.broadcast_server_notification({:room_crashed, room_id}) {:noreply, state} end @@ -217,13 +222,13 @@ defmodule Jellyfish.RoomService do end defp remove_room(room_id) do - room = {:via, Registry, {Jellyfish.RoomRegistry, room_id}} + room = Room.registry_id(room_id) try do :ok = GenServer.stop(room, :normal) Logger.info("Deleted room #{inspect(room_id)}") - Event.broadcast(:server_notification, {:room_deleted, room_id}) + Event.broadcast_server_notification({:room_deleted, room_id}) catch :exit, {:noproc, {GenServer, :stop, [^room, :normal, :infinity]}} -> Logger.warning("Room process with id #{inspect(room_id)} doesn't exist") @@ -234,6 +239,20 @@ defmodule Jellyfish.RoomService do defp validate_max_peers(max_peers) when is_integer(max_peers) and max_peers >= 0, do: :ok defp validate_max_peers(_max_peers), do: {:error, :max_peers} + defp validate_webhook_url(nil), do: :ok + + defp validate_webhook_url(uri) do + uri + |> URI.parse() + |> Map.take([:host, :path, :scheme]) + |> Enum.all?(fn {_key, value} -> not is_nil(value) end) + |> if do + :ok + else + {:error, :invalid_webhook_url} + end + end + defp codec_to_atom("h264"), do: {:ok, :h264} defp codec_to_atom("vp8"), do: {:ok, :vp8} defp codec_to_atom(nil), do: {:ok, nil} diff --git a/lib/jellyfish/webhook_notifier.ex b/lib/jellyfish/webhook_notifier.ex new file mode 100644 index 00000000..710f3e6f --- /dev/null +++ b/lib/jellyfish/webhook_notifier.ex @@ -0,0 +1,63 @@ +defmodule Jellyfish.WebhookNotifier do + @moduledoc """ + Module responsible for sending notifications through webhooks. + """ + + use GenServer + + require Logger + + alias Jellyfish.Event + alias Jellyfish.ServerMessage + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + def add_webhook(room_id, webhook_url) do + GenServer.cast(__MODULE__, {:add_room_webhook, room_id, webhook_url}) + end + + @impl true + def init(_opts) do + :ok = Event.subscribe(:server_notification) + {:ok, %{}} + end + + @impl true + def handle_cast({:add_room_webhook, room_id, webhook_url}, state) do + {:noreply, Map.put(state, room_id, webhook_url)} + end + + @impl true + def handle_info(msg, state) do + {event_type, %{room_id: room_id}} = content = Event.to_proto(msg) + notification = %ServerMessage{content: content} |> ServerMessage.encode() + + webhook_url = Map.get(state, room_id) + send_webhook_notification(notification, webhook_url) + + state = + if event_type in [:room_crashed, :room_deleted] do + Map.delete(state, room_id) + else + state + end + + {:noreply, state} + end + + defp send_webhook_notification(notification, webhook_url) when not is_nil(webhook_url) do + case HTTPoison.post(webhook_url, Jason.encode!(%{notification: notification})) do + {:ok, _result} -> + nil + + {:error, error} -> + Logger.warning( + "Couldn't send notification through webhook: #{webhook_url}, reason: #{inspect(error)}" + ) + end + end + + defp send_webhook_notification(_notification, _webhook_url), do: :ok +end diff --git a/lib/jellyfish_web/api_spec/room.ex b/lib/jellyfish_web/api_spec/room.ex index d1261f49..94ae147d 100644 --- a/lib/jellyfish_web/api_spec/room.ex +++ b/lib/jellyfish_web/api_spec/room.ex @@ -26,6 +26,12 @@ defmodule JellyfishWeb.ApiSpec.Room do type: :string, enum: ["h264", "vp8"], nullable: true + }, + webhookUrl: %Schema{ + description: "URL where Jellyfish notifications will be sent", + type: :string, + example: "https://backend.address.com/jellyfish-notifications-endpoint", + nullable: true } } }) diff --git a/lib/jellyfish_web/controllers/room_controller.ex b/lib/jellyfish_web/controllers/room_controller.ex index 6992e6cc..78ec984e 100644 --- a/lib/jellyfish_web/controllers/room_controller.ex +++ b/lib/jellyfish_web/controllers/room_controller.ex @@ -73,17 +73,27 @@ defmodule JellyfishWeb.RoomController do def create(conn, params) do with max_peers <- Map.get(params, "maxPeers"), video_codec <- Map.get(params, "videoCodec"), - {:ok, room, jellyfish_address} <- RoomService.create_room(max_peers, video_codec) do + webhook_url <- Map.get(params, "webhookUrl"), + {:ok, room, jellyfish_address} <- + RoomService.create_room(max_peers, video_codec, webhook_url) do conn |> put_resp_content_type("application/json") |> put_status(:created) |> render("show.json", room: room, jellyfish_address: jellyfish_address) else {:error, :invalid_max_peers} -> - {:error, :bad_request, "maxPeers must be a number"} + max_peers = Map.get(params, "maxPeers") + + {:error, :bad_request, "Expected maxPeers to be a number, got: #{max_peers}"} {:error, :invalid_video_codec} -> - {:error, :bad_request, "videoCodec must be 'h264' or 'vp8'"} + video_codec = Map.get(params, "videoCodec") + + {:error, :bad_request, "Expected videoCodec to be 'h264' or 'vp8', got: #{video_codec}"} + + {:error, :invalid_webhook_url} -> + webhook_url = Map.get(params, "webhookUrl") + {:error, :bad_request, "Expected webhookUrl to be valid URL, got: #{webhook_url}"} end end diff --git a/lib/jellyfish_web/peer_socket.ex b/lib/jellyfish_web/peer_socket.ex index 7dd5e6cb..4bb45eac 100644 --- a/lib/jellyfish_web/peer_socket.ex +++ b/lib/jellyfish_web/peer_socket.ex @@ -48,7 +48,7 @@ defmodule JellyfishWeb.PeerSocket do room_pid: room_pid }) - Event.broadcast(:server_notification, {:peer_connected, room_id, peer_id}) + Event.broadcast_server_notification({:peer_connected, room_id, peer_id}) {:reply, :ok, {:binary, encoded_message}, state} else @@ -128,16 +128,7 @@ defmodule JellyfishWeb.PeerSocket do end @impl true - def terminate(_reason, state) do - Logger.info(""" - WebSocket associated with peer #{inspect(Map.get(state, :peer_id, ""))} stopped, \ - room: #{inspect(Map.get(state, :room_id, ""))} - """) - - if Map.has_key?(state, :peer_id) do - Event.broadcast(:server_notification, {:peer_disconnected, state.room_id, state.peer_id}) - end - + def terminate(_reason, _state) do :ok end diff --git a/openapi.yaml b/openapi.yaml index f509c065..a8946da8 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -319,6 +319,11 @@ components: - vp8 nullable: true type: string + webhookUrl: + description: URL where Jellyfish notifications will be sent + example: https://backend.address.com/jellyfish-notifications-endpoint + nullable: true + type: string title: RoomConfig type: object x-struct: Elixir.JellyfishWeb.ApiSpec.Room.Config diff --git a/test/jellyfish_web/controllers/room_controller_test.exs b/test/jellyfish_web/controllers/room_controller_test.exs index 18983ba2..f8373c51 100644 --- a/test/jellyfish_web/controllers/room_controller_test.exs +++ b/test/jellyfish_web/controllers/room_controller_test.exs @@ -99,12 +99,17 @@ defmodule JellyfishWeb.RoomControllerTest do conn = post(conn, ~p"/room", maxPeers: "nan") assert json_response(conn, :bad_request)["errors"] == - "maxPeers must be a number" + "Expected maxPeers to be a number, got: nan" conn = post(conn, ~p"/room", videoCodec: "nan") assert json_response(conn, :bad_request)["errors"] == - "videoCodec must be 'h264' or 'vp8'" + "Expected videoCodec to be 'h264' or 'vp8', got: nan" + + conn = post(conn, ~p"/room", webhookUrl: "nan") + + assert json_response(conn, :bad_request)["errors"] == + "Expected webhookUrl to be valid URL, got: nan" end end diff --git a/test/jellyfish_web/integration/server_socket_test.exs b/test/jellyfish_web/integration/server_notification_test.exs similarity index 72% rename from test/jellyfish_web/integration/server_socket_test.exs rename to test/jellyfish_web/integration/server_notification_test.exs index 4d165298..8117d230 100644 --- a/test/jellyfish_web/integration/server_socket_test.exs +++ b/test/jellyfish_web/integration/server_notification_test.exs @@ -1,4 +1,4 @@ -defmodule JellyfishWeb.Integration.ServerSocketTest do +defmodule JellyfishWeb.Integration.ServerNotificationTest do use JellyfishWeb.ConnCase alias __MODULE__.Endpoint @@ -22,10 +22,14 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do } alias JellyfishWeb.{PeerSocket, ServerSocket, WS} + alias Phoenix.PubSub @port 5907 + @webhook_port 2929 + @webhook_url "http://127.0.0.1:#{@webhook_port}/" @path "ws://127.0.0.1:#{@port}/socket/server/websocket" @auth_response %Authenticated{} + @pubsub Jellyfish.PubSub @max_peers 1 @@ -57,10 +61,18 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do setup_all do assert {:ok, _pid} = Endpoint.start_link() + + webserver = + {Plug.Cowboy, plug: WebHookPlug, scheme: :http, options: [port: @webhook_port]} + + {:ok, _pid} = Supervisor.start_link([webserver], strategy: :one_for_one) + :ok end setup(%{conn: conn}) do + :ok = PubSub.subscribe(@pubsub, "webhook") + on_exit(fn -> server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) @@ -73,6 +85,8 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do conn = delete(conn, ~p"/room/#{id}") assert response(conn, 204) end) + + :ok = PubSub.unsubscribe(@pubsub, "webhook") end) end @@ -126,7 +140,11 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do {:ok, room_pid} = RoomService.find_room(room_id) send(room_pid, {:playlist_playable, :video, "hls_output/#{room_id}"}) - assert_receive %HlsPlayable{room_id: room_id, component_id: ^hls_id} + assert_receive %HlsPlayable{room_id: ^room_id, component_id: ^hls_id} + + assert_receive {:webhook_notification, + %HlsPlayable{room_id: ^room_id, component_id: ^hls_id}}, + 1_000 conn = delete(conn, ~p"/room/#{room_id}/") assert response(conn, :no_content) @@ -148,63 +166,71 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - conn = post(conn, ~p"/room", maxPeers: 1) + conn = post(conn, ~p"/room", maxPeers: 1, webhookUrl: @webhook_url) assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] assert_receive %RoomCreated{room_id: ^room_id} + assert_receive {:webhook_notification, %RoomCreated{room_id: ^room_id}}, 1_000 conn = delete(conn, ~p"/room/#{room_id}") assert response(conn, :no_content) assert_receive %RoomDeleted{room_id: ^room_id} + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, 1_000 end - test "sends a message when room crashes", %{conn: conn} do - server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) - ws = create_and_authenticate() + test "sends a message when peer connects", %{conn: conn} do + {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) - subscribe(ws, :server_notification) + conn = delete(conn, ~p"/room/#{room_id}") + assert response(conn, :no_content) - conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) + assert_receive %RoomDeleted{room_id: ^room_id} - conn = post(conn, ~p"/room", maxPeers: 1) - assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] + assert_receive {:webhook_notification, %RoomDeleted{room_id: ^room_id}}, + 1_000 + + refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} + + refute_received {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + end + + test "sends a message when peer connects and room crashes", %{conn: conn} do + {room_id, peer_id, _conn} = subscribe_on_notifications_and_connect_peer(conn) {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) Process.exit(room_pid, :kill) assert_receive %RoomCrashed{room_id: ^room_id} - end - test "sends a message when peer connects", %{conn: conn} do - server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) - ws = create_and_authenticate() + assert_receive {:webhook_notification, %RoomCrashed{room_id: ^room_id}}, + 1_000 - subscribe(ws, :server_notification) + refute_received %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} - {room_id, peer_id, peer_token, conn} = add_room_and_peer(conn, server_api_token) + refute_received {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}} + end - {:ok, peer_ws} = WS.start_link("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) - auth_request = peer_auth_request(peer_token) - :ok = WS.send_binary_frame(peer_ws, auth_request) + test "sends a message when peer connects and it crashes", %{conn: conn} do + {room_id, peer_id, conn} = subscribe_on_notifications_and_connect_peer(conn) - assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} + {:ok, room_pid} = Jellyfish.RoomService.find_room(room_id) - conn = delete(conn, ~p"/room/#{room_id}/") - assert response(conn, :no_content) + state = :sys.get_state(room_pid) - assert_receive %PeerDisconnected{peer_id: ^peer_id, room_id: ^room_id} - end + peer_socket_pid = state.peers[peer_id].socket_pid - def create_and_authenticate() do - token = Application.fetch_env!(:jellyfish, :server_api_token) - auth_request = auth_request(token) + Process.exit(peer_socket_pid, :kill) - {:ok, ws} = WS.start_link(@path, :server) - :ok = WS.send_binary_frame(ws, auth_request) - assert_receive @auth_response, 1000 + assert_receive %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id} - ws + assert_receive {:webhook_notification, + %PeerDisconnected{room_id: ^room_id, peer_id: ^peer_id}}, + 2_000 + + delete(conn, ~p"/room/#{room_id}") end test "sends metrics", %{conn: conn} do @@ -231,6 +257,38 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do assert String.contains?(endpoint_id, "endpoint_id") end + def subscribe_on_notifications_and_connect_peer(conn) do + server_api_token = Application.fetch_env!(:jellyfish, :server_api_token) + ws = create_and_authenticate() + + subscribe(ws, :server_notification) + + {room_id, peer_id, peer_token, conn} = + add_room_and_peer(conn, server_api_token) + + {:ok, peer_ws} = WS.start("ws://127.0.0.1:#{@port}/socket/peer/websocket", :peer) + auth_request = peer_auth_request(peer_token) + :ok = WS.send_binary_frame(peer_ws, auth_request) + + assert_receive %PeerConnected{peer_id: ^peer_id, room_id: ^room_id} + + assert_receive {:webhook_notification, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}}, + 1_000 + + {room_id, peer_id, conn} + end + + def create_and_authenticate() do + token = Application.fetch_env!(:jellyfish, :server_api_token) + auth_request = auth_request(token) + + {:ok, ws} = WS.start_link(@path, :server) + :ok = WS.send_binary_frame(ws, auth_request) + assert_receive @auth_response, 1000 + + ws + end + def subscribe(ws, event_type) do proto_event_type = to_proto_event_type(event_type) @@ -251,7 +309,13 @@ defmodule JellyfishWeb.Integration.ServerSocketTest do defp add_room_and_peer(conn, server_api_token) do conn = put_req_header(conn, "authorization", "Bearer " <> server_api_token) - conn = post(conn, ~p"/room", maxPeers: @max_peers, videoCodec: "h264") + conn = + post(conn, ~p"/room", + maxPeers: @max_peers, + videoCodec: "h264", + webhookUrl: @webhook_url + ) + assert %{"id" => room_id} = json_response(conn, :created)["data"]["room"] conn = post(conn, ~p"/room/#{room_id}/peer", type: "webrtc") diff --git a/test/support/webhook_plug.ex b/test/support/webhook_plug.ex new file mode 100644 index 00000000..502dad40 --- /dev/null +++ b/test/support/webhook_plug.ex @@ -0,0 +1,30 @@ +defmodule WebHookPlug do + @moduledoc false + import Plug.Conn + alias Jellyfish.ServerMessage + alias Phoenix.PubSub + + @pubsub Jellyfish.PubSub + + def init(opts) do + # initialize options + + opts + end + + def call(conn, _opts) do + {:ok, body, conn} = Plug.Conn.read_body(conn, []) + notification = Jason.decode!(body) + + notification = + notification |> Map.get("notification") |> ServerMessage.decode() |> Map.get(:content) + + {_notification_type, notification} = notification + + :ok = PubSub.broadcast(@pubsub, "webhook", {:webhook_notification, notification}) + + conn + |> put_resp_content_type("text/plain") + |> send_resp(200, "OK") + end +end diff --git a/test/support/ws.ex b/test/support/ws.ex index e2c7e6f5..09787e9e 100644 --- a/test/support/ws.ex +++ b/test/support/ws.ex @@ -6,6 +6,11 @@ defmodule JellyfishWeb.WS do alias Jellyfish.PeerMessage alias Jellyfish.ServerMessage + def start(url, type) do + state = %{caller: self(), type: type} + WebSockex.start(url, __MODULE__, state) + end + def start_link(url, type) do state = %{caller: self(), type: type} WebSockex.start_link(url, __MODULE__, state)