diff --git a/README.md b/README.md index 91d82cb..c9bb17b 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ Elixir server SDK for [Jellyfish](https://github.com/jellyfish-dev/jellyfish). Currently it allows for: - making API calls to Jellyfish server (QoL wrapper for HTTP requests) -- listening to Jellyfish server notifications via WebSocket +- listening to Jellyfish server events via WebSocket ## Installation @@ -26,12 +26,12 @@ end ## Usage Make API calls to Jellyfish (authentication required, for more information see [Jellyfish docs](https://jellyfish-dev.github.io/jellyfish-docs/getting_started/authentication)) -and receive server notifications: +and receive server events: ```elixir -# start process responsible for receiving notifications +# start process responsible for receiving events {:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_key: "your-jellyfish-token") -{:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :all) +{:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all) # create HTTP client instance client = Jellyfish.Client.new(server_address: "localhost:5002", server_api_key: "your-jellyfish-token") @@ -54,7 +54,7 @@ end :ok = Jellyfish.Room.delete_peer(client, room_id, peer_id) ``` -List of structs representing server notifications can be found in [generated Protobuf file](lib/protos/jellyfish/server_notifications.pb.ex). +List of structs representing events can be found in the [docs](https://hexdocs.pm/jellyfish_server_sdk). ## Testing diff --git a/examples/server_socket.exs b/examples/server_socket.exs index 15e6b26..e3e4b55 100644 --- a/examples/server_socket.exs +++ b/examples/server_socket.exs @@ -1,16 +1,23 @@ -Mix.install([ - {:jellyfish_server_sdk, path: __DIR__ |> Path.join("..") |> Path.expand()} -], force: true) +Mix.install( + [ + {:jellyfish_server_sdk, path: __DIR__ |> Path.join("..") |> Path.expand()} + ], + force: true +) +server_address = "localhost:5002" server_api_token = "development" -{:ok, _pid} = - Jellyfish.Notifier.start(server_address: "localhost:4000", server_api_token: server_api_token) +{:ok, notifier} = + Jellyfish.Notifier.start(server_address: server_address, server_api_token: server_api_token) + +{:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all) +:ok = Jellyfish.Notifier.subscribe_metrics(notifier) receive_notification = fn receive_notification -> receive do - {:jellyfish, server_notification} -> - IO.inspect(server_notification, label: :server_notification) + {:jellyfish, event} -> + IO.inspect(event, label: :event) after 150_000 -> IO.inspect(:timeout) diff --git a/lib/jellyfish/component.ex b/lib/jellyfish/component.ex index 178a69e..83ce22c 100644 --- a/lib/jellyfish/component.ex +++ b/lib/jellyfish/component.ex @@ -8,7 +8,7 @@ defmodule Jellyfish.Component do alias Jellyfish.Component.{HLS, RTSP} alias Jellyfish.Exception.StructureError - alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState + alias Jellyfish.ServerMessage.SubscribeResponse.RoomState @enforce_keys [ :id, diff --git a/lib/jellyfish/metrics_report.ex b/lib/jellyfish/metrics_report.ex new file mode 100644 index 0000000..fa52a7a --- /dev/null +++ b/lib/jellyfish/metrics_report.ex @@ -0,0 +1,47 @@ +defmodule Jellyfish.MetricsReport do + @moduledoc nil + + @enforce_keys [:metrics] + defstruct @enforce_keys + + @typedoc """ + Describes a WebRTC metrics report, which is periodically sent once the process subscribes for metrics events. + + The report is a map, with each entry being a `metric_name => value` pair, where value can be a boolean, number, string or a map with the same structure. + + Here is a sample report: + ``` + %Jellyfish.MetricsReport{ + metrics: %{ + "inbound-rtp.frames" => 406, + "inbound-rtp.keyframes" => 9, + "room_id=32b1e952-9efa-4c29-88bc-36d7a536f95a" => %{ + "endpoint_id=4354f193-e787-4f07-b445-7e246b702ba6" => %{ + "ice.protocol" => "udp", + "track_id=4354f193-e787-4f07-b445-7e246b702ba6:cd2013a8-ea9f-4612-aa99-05149172e6a5:" => %{ + "inbound-rtp.bytes_received" => 379567, + "inbound-rtp.bytes_received-per-second" => 68355.64435564436, + "inbound-rtp.encoding" => "VP8", + "rtx_stream" => %{ + "inbound-rtp.bytes_received" => 7680, + "inbound-rtp.bytes_received-per-second" => 0.0 + }, + "track.metadata" => %{ + "active" => true, + "type" => "camera" + } + }, + "track_id=5ead4135-6ab2-4872-8c04-daca02f5116d:63543a41-b3ff-4a80-91fd-91cfdea13cbe" => %{ + "outbound-rtp.bytes" => 354075, + "outbound-rtp.bytes-per-second" => 63083.91608391608, + "outbound-rtp.variant" => "high" + } + } + } + ``` + """ + + @type t :: %__MODULE__{ + metrics: map() + } +end diff --git a/lib/jellyfish/notifier.ex b/lib/jellyfish/notifier.ex index 807f442..b95c67e 100644 --- a/lib/jellyfish/notifier.ex +++ b/lib/jellyfish/notifier.ex @@ -1,7 +1,7 @@ defmodule Jellyfish.Notifier do @moduledoc """ Module defining a process responsible for establishing - WebSocket connection and receiving notifications from Jellyfish server. + WebSocket connection and receiving events from Jellyfish server. Define the connection configuration in the mix config ``` config.exs @@ -21,8 +21,8 @@ defmodule Jellyfish.Notifier do ``` ``` - # Subscribe current process to all server notifications. - iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :server_notifications, :all) + # Subscribe current process to server notifications from all rooms. + iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all) # here add a room and a peer using functions from `Jellyfish.Room` module # you should receive a notification after the peer established connection @@ -44,27 +44,23 @@ defmodule Jellyfish.Notifier do alias Jellyfish.ServerMessage.{ Authenticated, AuthRequest, + MetricsReport, SubscribeRequest, - SubscriptionResponse + SubscribeResponse } - alias Jellyfish.ServerMessage.SubscribeRequest.ServerNotification - alias Jellyfish.ServerMessage.SubscriptionResponse.{RoomNotFound, RoomsState, RoomState} + alias Jellyfish.ServerMessage.SubscribeRequest.{Metrics, ServerNotification} + + alias Jellyfish.ServerMessage.SubscribeResponse.{RoomNotFound, RoomsState, RoomState} @auth_timeout 2000 @subscribe_timeout 5000 - @valid_events [:server_notification] @typedoc """ The reference to the `Notifier` process. """ @type notifier() :: GenServer.server() - @typedoc """ - A type of event, for which a process can subscribe using `subscribe/3`. - """ - @type event_type() :: :server_notification - @doc """ Starts the Notifier process and connects to Jellyfish. @@ -90,22 +86,20 @@ defmodule Jellyfish.Notifier do end @doc """ - Subscribes the process to receive events of `event_type` from room with `room_id` and returns + Subscribes the process to receive server notifications from room with `room_id` and returns current state of the room. - Currently supported event is `:server_notification`. - If `:all` is passed in place of `room_id`, notifications about all of the rooms will be sent and list of all of the room's states is returned. Notifications are sent to the process in a form of `{:jellyfish, msg}`, - where `msg` is one of structs defined under "Notifications" section in the docs, + where `msg` is one of structs defined under "Jellyfish.Notification" section in the docs, for example `{:jellyfish, %Jellyfish.Notification.RoomCrashed{room_id: "some_id"}}`. """ - @spec subscribe(notifier(), event_type(), Room.id() | :all) :: + @spec subscribe_server_notifications(notifier(), Room.id() | :all) :: {:ok, Room.t() | [Room.t()]} | {:error, atom()} - def subscribe(notifier, event_type, room_id) when event_type in @valid_events do - WebSockex.cast(notifier, {:subscribe, self(), event_type, room_id}) + def subscribe_server_notifications(notifier, room_id) do + WebSockex.cast(notifier, {:subscribe_server_notifications, self(), room_id}) receive do {:jellyfish, {:subscribe_answer, answer}} -> answer @@ -115,12 +109,26 @@ defmodule Jellyfish.Notifier do end end - def subscribe(_notifier, _event_type, _room_id) do - {:error, :invalid_event_type} + @doc """ + Subscribes the process to the WebRTC metrics from all the rooms. + + Metrics are periodically sent to the process in a form of `{:jellyfish, metrics_report}`, + where `metrics_report` is the `Jellyfish.MetricsReport` struct. + """ + + @spec subscribe_metrics(notifier()) :: :ok | {:error, :timeout} + def subscribe_metrics(notifier) do + WebSockex.cast(notifier, {:subscribe_metrics, self()}) + + receive do + {:jellyfish, {:subscribe_answer, :ok}} -> :ok + after + @subscribe_timeout -> {:error, :timeout} + end end @impl true - def handle_cast({:subscribe, pid, :server_notification, room_id}, state) do + def handle_cast({:subscribe_server_notifications, pid, room_id}, state) do proto_room_id = case room_id do :all -> {:option, :OPTION_ALL} @@ -144,7 +152,23 @@ defmodule Jellyfish.Notifier do } |> ServerMessage.encode() - state = put_in(state.pending_subscriptions[request_id], pid) + state = put_in(state.pending_subscriptions[request_id], {:server_notification, pid}) + + {:reply, {:binary, request}, state} + end + + def handle_cast({:subscribe_metrics, pid}, state) do + request_id = UUID.uuid4() + + request = + %ServerMessage{ + content: + {:subscribe_request, + %SubscribeRequest{id: request_id, event_type: {:metrics, %Metrics{}}}} + } + |> ServerMessage.encode() + + state = put_in(state.pending_subscriptions[request_id], {:metrics, pid}) {:reply, {:binary, request}, state} end @@ -160,9 +184,10 @@ defmodule Jellyfish.Notifier do @impl true def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do state = - Map.update!(state, :subscriptions, fn subs -> + update_in(state.subscriptions.server_notification, fn subs -> Map.new(subs, fn {id, pids} -> {id, MapSet.delete(pids, pid)} end) end) + |> update_in([:subscriptions, :metrics], &MapSet.delete(&1, pid)) {:ok, state} end @@ -183,7 +208,10 @@ defmodule Jellyfish.Notifier do state = %{ caller_pid: self(), - subscriptions: %{all: MapSet.new()}, + subscriptions: %{ + server_notification: %{all: MapSet.new()}, + metrics: MapSet.new() + }, pending_subscriptions: %{} } @@ -216,14 +244,14 @@ defmodule Jellyfish.Notifier do state end - defp handle_notification(%SubscriptionResponse{id: id, content: {_type, response}}, state) do - {pid, state} = pop_in(state.pending_subscriptions[id]) + defp handle_notification(%SubscribeResponse{id: id, content: content}, state) do + {{event_type, pid}, state} = pop_in(state.pending_subscriptions[id]) - handle_subscription_response(pid, response, state) + handle_subscription_response(event_type, pid, content, state) end defp handle_notification(%{room_id: room_id} = message, state) do - state.subscriptions + state.subscriptions.server_notification |> Map.take([:all, room_id]) |> Map.values() |> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end) @@ -232,12 +260,23 @@ defmodule Jellyfish.Notifier do state end - defp handle_subscription_response(pid, %RoomNotFound{}, state) do + defp handle_notification(%MetricsReport{metrics: metrics}, state) do + notification = %Jellyfish.MetricsReport{metrics: Jason.decode!(metrics)} + + state.subscriptions.metrics + |> Enum.each(fn pid -> + send(pid, {:jellyfish, notification}) + end) + + state + end + + defp handle_subscription_response(:server_notification, pid, {_type, %RoomNotFound{}}, state) do send(pid, {:jellyfish, {:subscribe_answer, {:error, :room_not_found}}}) state end - defp handle_subscription_response(pid, %mod{} = room, state) + defp handle_subscription_response(:server_notification, pid, {_type, %mod{} = room}, state) when mod in [RoomState, RoomsState] do {room_id, room} = case mod do @@ -248,7 +287,7 @@ defmodule Jellyfish.Notifier do Process.monitor(pid) state = - update_in(state.subscriptions[room_id], fn + update_in(state.subscriptions.server_notification[room_id], fn nil -> MapSet.new([pid]) set -> MapSet.put(set, pid) end) @@ -256,4 +295,13 @@ defmodule Jellyfish.Notifier do send(pid, {:jellyfish, {:subscribe_answer, {:ok, room}}}) state end + + defp handle_subscription_response(:metrics, pid, nil, state) do + Process.monitor(pid) + + state = update_in(state.subscriptions.metrics, &MapSet.put(&1, pid)) + + send(pid, {:jellyfish, {:subscribe_answer, :ok}}) + state + end end diff --git a/lib/jellyfish/peer.ex b/lib/jellyfish/peer.ex index fae2162..66eeebb 100644 --- a/lib/jellyfish/peer.ex +++ b/lib/jellyfish/peer.ex @@ -9,7 +9,7 @@ defmodule Jellyfish.Peer do alias Jellyfish.Exception.StructureError alias Jellyfish.Peer.WebRTC - alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState + alias Jellyfish.ServerMessage.SubscribeResponse.RoomState @enforce_keys [ :id, diff --git a/lib/jellyfish/room.ex b/lib/jellyfish/room.ex index 099a3e5..4e49137 100644 --- a/lib/jellyfish/room.ex +++ b/lib/jellyfish/room.ex @@ -29,7 +29,7 @@ defmodule Jellyfish.Room do alias Tesla.Env alias Jellyfish.{Client, Component, Peer} alias Jellyfish.Exception.StructureError - alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState + alias Jellyfish.ServerMessage.SubscribeResponse.RoomState @enforce_keys [ :id, diff --git a/lib/protos/jellyfish/server_notifications.pb.ex b/lib/protos/jellyfish/server_notifications.pb.ex index 995616a..9d7361f 100644 --- a/lib/protos/jellyfish/server_notifications.pb.ex +++ b/lib/protos/jellyfish/server_notifications.pb.ex @@ -7,7 +7,17 @@ defmodule Jellyfish.ServerMessage.SubscribeRequest.ServerNotification.Option do field :OPTION_ALL, 1 end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Config.Encoding do + @moduledoc false + + use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 + + field :ENCODING_UNSPECIFIED, 0 + field :ENCODING_H264, 1 + field :ENCODING_VP8, 2 +end + +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Type do @moduledoc false use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -16,7 +26,7 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type do field :TYPE_WEBRTC, 1 end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Status do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Status do @moduledoc false use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -26,7 +36,7 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Status do field :STATUS_DISCONNECTED, 2 end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component.Type do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type do @moduledoc false use Protobuf, enum: true, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -109,6 +119,12 @@ defmodule Jellyfish.ServerMessage.SubscribeRequest.ServerNotification do oneof: 0 end +defmodule Jellyfish.ServerMessage.SubscribeRequest.Metrics do + @moduledoc false + + use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 +end + defmodule Jellyfish.ServerMessage.SubscribeRequest do @moduledoc false @@ -122,33 +138,37 @@ defmodule Jellyfish.ServerMessage.SubscribeRequest do type: Jellyfish.ServerMessage.SubscribeRequest.ServerNotification, json_name: "serverNotification", oneof: 0 + + field :metrics, 3, type: Jellyfish.ServerMessage.SubscribeRequest.Metrics, oneof: 0 end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Config do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 field :max_peers, 1, type: :uint32, json_name: "maxPeers" + + field :enforce_encoding, 2, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Config.Encoding, + json_name: "enforceEncoding", + enum: true end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 field :id, 1, type: :string - - field :type, 2, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Type, - enum: true + field :type, 2, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Type, enum: true field :status, 3, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer.Status, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer.Status, enum: true end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -156,36 +176,33 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component do field :id, 1, type: :string field :type, 2, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component.Type, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component.Type, enum: true end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomState do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomState do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 field :id, 1, type: :string - field :config, 2, type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Config - - field :peers, 3, - repeated: true, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Peer + field :config, 2, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Config + field :peers, 3, repeated: true, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Peer field :components, 4, repeated: true, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState.Component + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState.Component end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomsState do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomsState do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 - field :rooms, 1, repeated: true, type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState + field :rooms, 1, repeated: true, type: Jellyfish.ServerMessage.SubscribeResponse.RoomState end -defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomNotFound do +defmodule Jellyfish.ServerMessage.SubscribeResponse.RoomNotFound do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -193,7 +210,7 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse.RoomNotFound do field :id, 1, type: :string end -defmodule Jellyfish.ServerMessage.SubscriptionResponse do +defmodule Jellyfish.ServerMessage.SubscribeResponse do @moduledoc false use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 @@ -203,17 +220,17 @@ defmodule Jellyfish.ServerMessage.SubscriptionResponse do field :id, 1, type: :string field :room_state, 2, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomState, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomState, json_name: "roomState", oneof: 0 field :rooms_state, 3, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomsState, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomsState, json_name: "roomsState", oneof: 0 field :room_not_found, 4, - type: Jellyfish.ServerMessage.SubscriptionResponse.RoomNotFound, + type: Jellyfish.ServerMessage.SubscribeResponse.RoomNotFound, json_name: "roomNotFound", oneof: 0 end @@ -234,6 +251,14 @@ defmodule Jellyfish.ServerMessage.RoomDeleted do field :room_id, 1, type: :string, json_name: "roomId" end +defmodule Jellyfish.ServerMessage.MetricsReport do + @moduledoc false + + use Protobuf, protoc_gen_elixir_version: "0.12.0", syntax: :proto3 + + field :metrics, 1, type: :string +end + defmodule Jellyfish.ServerMessage do @moduledoc false @@ -278,9 +303,9 @@ defmodule Jellyfish.ServerMessage do json_name: "subscribeRequest", oneof: 0 - field :subscription_response, 9, - type: Jellyfish.ServerMessage.SubscriptionResponse, - json_name: "subscriptionResponse", + field :subscribe_response, 9, + type: Jellyfish.ServerMessage.SubscribeResponse, + json_name: "subscribeResponse", oneof: 0 field :room_created, 10, @@ -292,4 +317,9 @@ defmodule Jellyfish.ServerMessage do type: Jellyfish.ServerMessage.RoomDeleted, json_name: "roomDeleted", oneof: 0 + + field :metrics_report, 12, + type: Jellyfish.ServerMessage.MetricsReport, + json_name: "metricsReport", + oneof: 0 end diff --git a/mix.exs b/mix.exs index 146d330..f6b56ad 100644 --- a/mix.exs +++ b/mix.exs @@ -98,8 +98,14 @@ defmodule Membrane.Template.Mixfile do extras: ["README.md", "LICENSE"], formatters: ["html"], source_ref: "v#{@version}", - nest_modules_by_prefix: [Jellyfish, Jellyfish.Exception, Jellyfish.Notification], - groups_for_modules: [Notifications: ~r/^Jellyfish\.Notification\.[a-zA-Z]*$/] + nest_modules_by_prefix: [ + Jellyfish, + Jellyfish.Exception, + Jellyfish.Notification + ], + groups_for_modules: [ + Events: ~r/^Jellyfish\.((\bNotification\.[a-zA-Z]*$)|(\bMetricsReport))/ + ] ] end diff --git a/protos b/protos index 0895a98..a6d29c1 160000 --- a/protos +++ b/protos @@ -1 +1 @@ -Subproject commit 0895a982b0f23714952d38ca8a6ba78a9b0278e6 +Subproject commit a6d29c1575a61e28ebf3732f5fcd446fb2bbd91d diff --git a/test/jellyfish/notifier_test.exs b/test/jellyfish/notifier_test.exs index 839c1e4..49196ee 100644 --- a/test/jellyfish/notifier_test.exs +++ b/test/jellyfish/notifier_test.exs @@ -1,7 +1,6 @@ defmodule Jellyfish.NotifierTest do use ExUnit.Case - alias Jellyfish.Notification.RoomDeleted alias Jellyfish.{Client, Notifier, Peer, Room} alias Jellyfish.PeerMessage @@ -14,6 +13,8 @@ defmodule Jellyfish.NotifierTest do RoomDeleted } + alias Jellyfish.MetricsReport + alias Jellyfish.WS @peer_opts %Peer.WebRTC{} @@ -45,7 +46,7 @@ defmodule Jellyfish.NotifierTest do test "returns error if room does not exist", %{notifier: notifier} do assert {:error, :room_not_found} = - Notifier.subscribe(notifier, :server_notification, "fake_room_id") + Notifier.subscribe_server_notifications(notifier, "fake_room_id") end test "returns initial state of the room", %{client: client, notifier: notifier} do @@ -53,7 +54,7 @@ defmodule Jellyfish.NotifierTest do {:ok, %Jellyfish.Peer{id: peer_id}, _token} = Room.add_peer(client, room_id, Peer.WebRTC) assert {:ok, %Room{id: ^room_id, peers: [%Peer{id: ^peer_id}]}} = - Notifier.subscribe(notifier, :server_notification, room_id) + Notifier.subscribe_server_notifications(notifier, room_id) end test "for all notifications", %{client: client, notifier: notifier} do @@ -62,7 +63,7 @@ defmodule Jellyfish.NotifierTest do trigger_notification(client, room_id) refute_receive {:jellyfish, _msg}, 100 - assert {:ok, _rooms} = Notifier.subscribe(notifier, :server_notification, :all) + assert {:ok, _rooms} = Notifier.subscribe_server_notifications(notifier, :all) trigger_notification(client, room_id) assert_receive {:jellyfish, %PeerConnected{room_id: ^room_id}} @@ -76,7 +77,7 @@ defmodule Jellyfish.NotifierTest do test "for specific room notifications only", %{client: client, notifier: notifier} do {:ok, %Jellyfish.Room{id: room_id}} = Room.create(client) - assert {:ok, _room} = Notifier.subscribe(notifier, :server_notification, room_id) + assert {:ok, _room} = Notifier.subscribe_server_notifications(notifier, room_id) trigger_notification(client, room_id) assert_receive {:jellyfish, %PeerConnected{room_id: ^room_id}} @@ -89,7 +90,7 @@ defmodule Jellyfish.NotifierTest do describe "receiving notifications" do setup do {:ok, notifier} = Notifier.start_link() - {:ok, _rooms} = Notifier.subscribe(notifier, :server_notification, :all) + {:ok, _rooms} = Notifier.subscribe_server_notifications(notifier, :all) %{client: Client.new()} end @@ -125,6 +126,31 @@ defmodule Jellyfish.NotifierTest do end end + describe "receiving metrics" do + setup do + {:ok, notifier} = Notifier.start_link() + {:ok, _rooms} = Notifier.subscribe_server_notifications(notifier, :all) + :ok = Notifier.subscribe_metrics(notifier) + + %{client: Client.new()} + end + + test "with one peer", %{client: client} do + {:ok, %Jellyfish.Room{id: room_id}} = Room.create(client, max_peers: @max_peers) + {:ok, %Jellyfish.Peer{id: peer_id}, peer_token} = Room.add_peer(client, room_id, @peer_opts) + + url = Application.fetch_env!(:jellyfish_server_sdk, :server_address) + {:ok, peer_ws} = WS.start_link("ws://#{url}/socket/peer/websocket") + + auth_request = %PeerMessage{content: {:auth_request, %AuthRequest{token: peer_token}}} + :ok = WS.send_frame(peer_ws, auth_request) + + assert_receive {:jellyfish, %PeerConnected{peer_id: ^peer_id, room_id: ^room_id}} + + assert_receive {:jellyfish, %MetricsReport{metrics: metrics}} when metrics != %{}, 1500 + end + end + defp trigger_notification(client, room_id) do {:ok, %Jellyfish.Peer{}, peer_token} = Room.add_peer(client, room_id, @peer_opts)