Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ Elixir server SDK for [Jellyfish](https://github.com/jellyfish-dev/jellyfish).
Currently it allows for:

- making API calls to Jellyfish server (QoL wrapper for HTTP requests)
- listening to Jellyfish server notifications via WebSocket
- listening to Jellyfish server events via WebSocket

## Installation

Expand All @@ -26,12 +26,12 @@ end
## Usage

Make API calls to Jellyfish (authentication required, for more information see [Jellyfish docs](https://jellyfish-dev.github.io/jellyfish-docs/getting_started/authentication))
and receive server notifications:
and receive server events:

```elixir
# start process responsible for receiving notifications
# start process responsible for receiving events
{:ok, notifier} = Jellyfish.Notifier.start(server_address: "localhost:5002", server_api_key: "your-jellyfish-token")
{:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :all)
{:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all)

# create HTTP client instance
client = Jellyfish.Client.new(server_address: "localhost:5002", server_api_key: "your-jellyfish-token")
Expand All @@ -54,7 +54,7 @@ end
:ok = Jellyfish.Room.delete_peer(client, room_id, peer_id)
```

List of structs representing server notifications can be found in [generated Protobuf file](lib/protos/jellyfish/server_notifications.pb.ex).
List of structs representing events can be found in the [docs](https://hexdocs.pm/jellyfish_server_sdk).

## Testing

Expand Down
21 changes: 14 additions & 7 deletions examples/server_socket.exs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
Mix.install([
{:jellyfish_server_sdk, path: __DIR__ |> Path.join("..") |> Path.expand()}
], force: true)
Mix.install(
[
{:jellyfish_server_sdk, path: __DIR__ |> Path.join("..") |> Path.expand()}
],
force: true
)

server_address = "localhost:5002"
server_api_token = "development"

{:ok, _pid} =
Jellyfish.Notifier.start(server_address: "localhost:4000", server_api_token: server_api_token)
{:ok, notifier} =
Jellyfish.Notifier.start(server_address: server_address, server_api_token: server_api_token)

{:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all)
:ok = Jellyfish.Notifier.subscribe_metrics(notifier)

receive_notification = fn receive_notification ->
receive do
{:jellyfish, server_notification} ->
IO.inspect(server_notification, label: :server_notification)
{:jellyfish, event} ->
IO.inspect(event, label: :event)
after
150_000 ->
IO.inspect(:timeout)
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/component.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule Jellyfish.Component do

alias Jellyfish.Component.{HLS, RTSP}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState
alias Jellyfish.ServerMessage.SubscribeResponse.RoomState

@enforce_keys [
:id,
Expand Down
47 changes: 47 additions & 0 deletions lib/jellyfish/metrics_report.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule Jellyfish.MetricsReport do
@moduledoc nil

@enforce_keys [:metrics]
defstruct @enforce_keys

@typedoc """
Describes a WebRTC metrics report, which is periodically sent once the process subscribes for metrics events.

The report is a map, with each entry being a `metric_name => value` pair, where value can be a boolean, number, string or a map with the same structure.

Here is a sample report:
```
%Jellyfish.MetricsReport{
metrics: %{
"inbound-rtp.frames" => 406,
"inbound-rtp.keyframes" => 9,
"room_id=32b1e952-9efa-4c29-88bc-36d7a536f95a" => %{
"endpoint_id=4354f193-e787-4f07-b445-7e246b702ba6" => %{
"ice.protocol" => "udp",
"track_id=4354f193-e787-4f07-b445-7e246b702ba6:cd2013a8-ea9f-4612-aa99-05149172e6a5:" => %{
"inbound-rtp.bytes_received" => 379567,
"inbound-rtp.bytes_received-per-second" => 68355.64435564436,
"inbound-rtp.encoding" => "VP8",
"rtx_stream" => %{
"inbound-rtp.bytes_received" => 7680,
"inbound-rtp.bytes_received-per-second" => 0.0
},
"track.metadata" => %{
"active" => true,
"type" => "camera"
}
},
"track_id=5ead4135-6ab2-4872-8c04-daca02f5116d:63543a41-b3ff-4a80-91fd-91cfdea13cbe" => %{
"outbound-rtp.bytes" => 354075,
"outbound-rtp.bytes-per-second" => 63083.91608391608,
"outbound-rtp.variant" => "high"
}
}
}
```
"""

@type t :: %__MODULE__{
metrics: map()
}
end
112 changes: 80 additions & 32 deletions lib/jellyfish/notifier.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Jellyfish.Notifier do
@moduledoc """
Module defining a process responsible for establishing
WebSocket connection and receiving notifications from Jellyfish server.
WebSocket connection and receiving events from Jellyfish server.

Define the connection configuration in the mix config
``` config.exs
Expand All @@ -21,8 +21,8 @@ defmodule Jellyfish.Notifier do
```

```
# Subscribe current process to all server notifications.
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe(notifier, :server_notifications, :all)
# Subscribe current process to server notifications from all rooms.
iex> {:ok, _rooms} = Jellyfish.Notifier.subscribe_server_notifications(notifier, :all)

# here add a room and a peer using functions from `Jellyfish.Room` module
# you should receive a notification after the peer established connection
Expand All @@ -44,27 +44,23 @@ defmodule Jellyfish.Notifier do
alias Jellyfish.ServerMessage.{
Authenticated,
AuthRequest,
MetricsReport,
SubscribeRequest,
SubscriptionResponse
SubscribeResponse
}

alias Jellyfish.ServerMessage.SubscribeRequest.ServerNotification
alias Jellyfish.ServerMessage.SubscriptionResponse.{RoomNotFound, RoomsState, RoomState}
alias Jellyfish.ServerMessage.SubscribeRequest.{Metrics, ServerNotification}

alias Jellyfish.ServerMessage.SubscribeResponse.{RoomNotFound, RoomsState, RoomState}

@auth_timeout 2000
@subscribe_timeout 5000
@valid_events [:server_notification]

@typedoc """
The reference to the `Notifier` process.
"""
@type notifier() :: GenServer.server()

@typedoc """
A type of event, for which a process can subscribe using `subscribe/3`.
"""
@type event_type() :: :server_notification

@doc """
Starts the Notifier process and connects to Jellyfish.

Expand All @@ -90,22 +86,20 @@ defmodule Jellyfish.Notifier do
end

@doc """
Subscribes the process to receive events of `event_type` from room with `room_id` and returns
Subscribes the process to receive server notifications from room with `room_id` and returns
current state of the room.

Currently supported event is `:server_notification`.

If `:all` is passed in place of `room_id`, notifications about all of the rooms will be sent and
list of all of the room's states is returned.

Notifications are sent to the process in a form of `{:jellyfish, msg}`,
where `msg` is one of structs defined under "Notifications" section in the docs,
where `msg` is one of structs defined under "Jellyfish.Notification" section in the docs,
for example `{:jellyfish, %Jellyfish.Notification.RoomCrashed{room_id: "some_id"}}`.
"""
@spec subscribe(notifier(), event_type(), Room.id() | :all) ::
@spec subscribe_server_notifications(notifier(), Room.id() | :all) ::
{:ok, Room.t() | [Room.t()]} | {:error, atom()}
def subscribe(notifier, event_type, room_id) when event_type in @valid_events do
WebSockex.cast(notifier, {:subscribe, self(), event_type, room_id})
def subscribe_server_notifications(notifier, room_id) do
WebSockex.cast(notifier, {:subscribe_server_notifications, self(), room_id})

receive do
{:jellyfish, {:subscribe_answer, answer}} -> answer
Expand All @@ -115,12 +109,26 @@ defmodule Jellyfish.Notifier do
end
end

def subscribe(_notifier, _event_type, _room_id) do
{:error, :invalid_event_type}
@doc """
Subscribes the process to the WebRTC metrics from all the rooms.

Metrics are periodically sent to the process in a form of `{:jellyfish, metrics_report}`,
where `metrics_report` is the `Jellyfish.MetricsReport` struct.
"""

@spec subscribe_metrics(notifier()) :: :ok | {:error, :timeout}
def subscribe_metrics(notifier) do
WebSockex.cast(notifier, {:subscribe_metrics, self()})

receive do
{:jellyfish, {:subscribe_answer, :ok}} -> :ok
after
@subscribe_timeout -> {:error, :timeout}
end
end

@impl true
def handle_cast({:subscribe, pid, :server_notification, room_id}, state) do
def handle_cast({:subscribe_server_notifications, pid, room_id}, state) do
proto_room_id =
case room_id do
:all -> {:option, :OPTION_ALL}
Expand All @@ -144,7 +152,23 @@ defmodule Jellyfish.Notifier do
}
|> ServerMessage.encode()

state = put_in(state.pending_subscriptions[request_id], pid)
state = put_in(state.pending_subscriptions[request_id], {:server_notification, pid})

{:reply, {:binary, request}, state}
end

def handle_cast({:subscribe_metrics, pid}, state) do
request_id = UUID.uuid4()

request =
%ServerMessage{
content:
{:subscribe_request,
%SubscribeRequest{id: request_id, event_type: {:metrics, %Metrics{}}}}
}
|> ServerMessage.encode()

state = put_in(state.pending_subscriptions[request_id], {:metrics, pid})

{:reply, {:binary, request}, state}
end
Expand All @@ -160,9 +184,10 @@ defmodule Jellyfish.Notifier do
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
state =
Map.update!(state, :subscriptions, fn subs ->
update_in(state.subscriptions.server_notification, fn subs ->
Map.new(subs, fn {id, pids} -> {id, MapSet.delete(pids, pid)} end)
end)
|> update_in([:subscriptions, :metrics], &MapSet.delete(&1, pid))

{:ok, state}
end
Expand All @@ -183,7 +208,10 @@ defmodule Jellyfish.Notifier do

state = %{
caller_pid: self(),
subscriptions: %{all: MapSet.new()},
subscriptions: %{
server_notification: %{all: MapSet.new()},
metrics: MapSet.new()
},
pending_subscriptions: %{}
}

Expand Down Expand Up @@ -216,14 +244,14 @@ defmodule Jellyfish.Notifier do
state
end

defp handle_notification(%SubscriptionResponse{id: id, content: {_type, response}}, state) do
{pid, state} = pop_in(state.pending_subscriptions[id])
defp handle_notification(%SubscribeResponse{id: id, content: content}, state) do
{{event_type, pid}, state} = pop_in(state.pending_subscriptions[id])

handle_subscription_response(pid, response, state)
handle_subscription_response(event_type, pid, content, state)
end

defp handle_notification(%{room_id: room_id} = message, state) do
state.subscriptions
state.subscriptions.server_notification
|> Map.take([:all, room_id])
|> Map.values()
|> Enum.reduce(fn pids, acc -> MapSet.union(pids, acc) end)
Expand All @@ -232,12 +260,23 @@ defmodule Jellyfish.Notifier do
state
end

defp handle_subscription_response(pid, %RoomNotFound{}, state) do
defp handle_notification(%MetricsReport{metrics: metrics}, state) do
notification = %Jellyfish.MetricsReport{metrics: Jason.decode!(metrics)}

state.subscriptions.metrics
|> Enum.each(fn pid ->
send(pid, {:jellyfish, notification})
end)

state
end

defp handle_subscription_response(:server_notification, pid, {_type, %RoomNotFound{}}, state) do
send(pid, {:jellyfish, {:subscribe_answer, {:error, :room_not_found}}})
state
end

defp handle_subscription_response(pid, %mod{} = room, state)
defp handle_subscription_response(:server_notification, pid, {_type, %mod{} = room}, state)
when mod in [RoomState, RoomsState] do
{room_id, room} =
case mod do
Expand All @@ -248,12 +287,21 @@ defmodule Jellyfish.Notifier do
Process.monitor(pid)

state =
update_in(state.subscriptions[room_id], fn
update_in(state.subscriptions.server_notification[room_id], fn
nil -> MapSet.new([pid])
set -> MapSet.put(set, pid)
end)

send(pid, {:jellyfish, {:subscribe_answer, {:ok, room}}})
state
end

defp handle_subscription_response(:metrics, pid, nil, state) do
Process.monitor(pid)

state = update_in(state.subscriptions.metrics, &MapSet.put(&1, pid))

send(pid, {:jellyfish, {:subscribe_answer, :ok}})
state
end
end
2 changes: 1 addition & 1 deletion lib/jellyfish/peer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Jellyfish.Peer do

alias Jellyfish.Exception.StructureError
alias Jellyfish.Peer.WebRTC
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState
alias Jellyfish.ServerMessage.SubscribeResponse.RoomState

@enforce_keys [
:id,
Expand Down
2 changes: 1 addition & 1 deletion lib/jellyfish/room.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ defmodule Jellyfish.Room do
alias Tesla.Env
alias Jellyfish.{Client, Component, Peer}
alias Jellyfish.Exception.StructureError
alias Jellyfish.ServerMessage.SubscriptionResponse.RoomState
alias Jellyfish.ServerMessage.SubscribeResponse.RoomState

@enforce_keys [
:id,
Expand Down
Loading