Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 116 additions & 23 deletions lib/event_stream.ex
Original file line number Diff line number Diff line change
@@ -1,12 +1,27 @@
defmodule EventStream do
@type(
header_value_type :: :boolean,
:byte,
:short,
:integer,
:long,
:byte_array,
:string,
:timestamp,
:uuid
)
@type header :: {binary(), any()} | {binary(), any(), header_value_type()}
@type headers :: [header]

@moduledoc """
Module for encoding to and decoding from the AWS EventStream Encoding format.

https://docs.aws.amazon.com/transcribe/latest/dg/event-stream.html
"""

@doc """
Encode a payload and headers to Event Stream format.
Encode a payload and headers to Event Stream format. Allows to specify
the header value type.

## Examples

Expand All @@ -21,14 +36,9 @@ defmodule EventStream do
105, 111, 110, 47, 106, 115, 111, 110, 123, 34, 102, 111, 111, 34, 58, 32, 34,
98, 97, 114, 34, 125, 111, 162, 191, 59>>
"""
@spec encode!(binary(), headers()) :: binary()
def encode!(payload, headers \\ []) do
headers_binary =
Enum.reduce(headers, <<>>, fn {key, value}, acc ->
key = to_string(key)

acc <> <<String.length(key)::size(8)>> <> key <> encode_header_value(value)
end)

headers_binary = encode_headers!(headers)
header_length = byte_size(headers_binary)

total_length = 4 + 4 + 4 + header_length + byte_size(payload) + 4
Expand All @@ -41,12 +51,62 @@ defmodule EventStream do
message_without_crc <> <<message_crc::size(32)>>
end

defp encode_header_value(value) when is_number(value) do
<<4::size(8), value::size(32)>>
def encode_headers!(headers) do
Enum.reduce(headers, <<>>, fn header, acc ->
{key, value, type} = inject_header_value_type(header)
key = to_string(key)
acc <> <<String.length(key)::size(8)>> <> key <> encode_header_value(value, type)
end)
end

defp inject_header_value_type(header = {_, _, _}), do: header
defp inject_header_value_type({key, value}), do: {key, value, :string}

defp encode_header_value(true, :boolean) do
<<0::integer-size(8)>>
end

defp encode_header_value(false, :boolean) do
<<1::integer-size(8)>>
end

defp encode_header_value(value, :byte) do
<<2::integer-size(8), value::big-signed-integer-size(8)>>
end

defp encode_header_value(value, :short) do
<<3::integer-size(8), value::big-signed-integer-size(16)>>
end

defp encode_header_value(value, :integer) do
<<4::integer-size(8), value::big-signed-integer-size(32)>>
end

defp encode_header_value(value, :long) do
<<5::integer-size(8), value::big-signed-integer-size(64)>>
end

defp encode_header_value(value, :byte_array) do
<<6::integer-size(8), byte_size(value)::big-signed-size(16), value::binary>>
end

defp encode_header_value(value, :string) do
<<7::integer-size(8), byte_size(value)::size(16), value::binary>>
end

defp encode_header_value({{year, month, day}, {hour, minute, second}}, :timestamp) do
Date.new!(year, month, day)
|> DateTime.new!(Time.new!(hour, minute, second))
|> encode_header_value(:timestamp)
end

defp encode_header_value(value, :timestamp) do
value = DateTime.to_unix(value, :millisecond)
<<8::integer-size(8), value::big-signed-integer-size(64)>>
end

defp encode_header_value(value) when is_binary(value) do
<<7::size(8), byte_size(value)::size(16)>> <> value
defp encode_header_value(value, :uuid) do
<<9::integer-size(8), value::size(16)>>
end

@doc """
Expand All @@ -65,7 +125,7 @@ defmodule EventStream do
...> 105, 111, 110, 47, 106, 115, 111, 110, 123, 34, 102, 111, 111, 34, 58, 32, 34,
...> 98, 97, 114, 34, 125, 111, 162, 191, 59>>
...> EventStream.decode!(data)
{:ok, [{"content-type", "application/json"}], ~s/{"foo": "bar"}/}
{:ok, [{"content-type", "application/json", :string}], ~s/{"foo": "bar"}/}
"""
def decode!(event_stream_message) do
<<total_length::size(32), header_length::size(32), _prelude_crc::size(32),
Expand All @@ -81,19 +141,52 @@ defmodule EventStream do

defp decode_headers(<<>>, acc), do: Enum.reverse(acc)

defp decode_headers(
<<name_length::size(8), name::binary-size(name_length), 7::size(8),
value_length::size(16), value::binary-size(value_length), rest::binary>>,
acc
defp decode_headers(<<name_length::size(8), name::binary-size(name_length), rest::binary>>, acc) do
{value, type, rest} = decode_header_value(rest)
decode_headers(rest, [{name, value, type} | acc])
end

defp decode_header_value(<<0::size(8), rest::binary>>) do
{true, :boolean, rest}
end

defp decode_header_value(<<1::size(8), rest::binary>>) do
{false, :boolean, rest}
end

defp decode_header_value(<<2::size(8), value::big-signed-integer-size(8), rest::binary>>) do
{value, :byte, rest}
end

defp decode_header_value(<<3::size(8), value::big-signed-integer-size(16), rest::binary>>) do
{value, :short, rest}
end

defp decode_header_value(<<4::size(8), value::big-signed-integer-size(32), rest::binary>>) do
{value, :integer, rest}
end

defp decode_header_value(<<5::size(8), value::big-signed-integer-size(64), rest::binary>>) do
{value, :long, rest}
end

defp decode_header_value(
<<6::size(8), value_length::size(16), value::binary-size(value_length), rest::binary>>
) do
decode_headers(rest, [{name, value} | acc])
{value, :byte_array, rest}
end

defp decode_headers(
<<name_length::size(8), name::binary-size(name_length), 4::size(8), value::size(32),
rest::binary>>,
acc
defp decode_header_value(
<<7::size(8), value_length::size(16), value::binary-size(value_length), rest::binary>>
) do
decode_headers(rest, [{name, value} | acc])
{value, :string, rest}
end

defp decode_header_value(<<8::size(8), value::big-signed-integer-size(64), rest::binary>>) do
{DateTime.from_unix!(value, :millisecond), :timestamp, rest}
end

defp decode_header_value(<<9::size(8), value::binary-size(16), rest::binary>>) do
{value, :uuid, rest}
end
end
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%{
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm"},
"earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"},
"ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"},
"makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"},
}
31 changes: 27 additions & 4 deletions test/event_stream_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Tests taken from
# https://github.com/awslabs/aws-c-event-stream/blob/3bc33662f9ccff4f4cbcf9509cc78c26e022fde0/tests/message_serializer_test.c
# https://github.com/mildsunrise/a4s/blob/master/test/events.test.ts
defmodule EventStreamTest do
alias EventStream
use ExUnit.Case
Expand Down Expand Up @@ -59,14 +60,36 @@ defmodule EventStreamTest do
assert encoded == expected
end

test "encodes an event with binary and uint64" do
expected =
<<0x0, 0x0, 0x0, 0x53, 0x0, 0x0, 0x0, 0x43, 0xF5, 0x44, 0x7A, 0x58, 0x5, 0x3A, 0x64, 0x61,
0x74, 0x65, 0x8, 0x0, 0x0, 0x1, 0x68, 0x97, 0x52, 0x43, 0xB, 0x10, 0x3A, 0x63, 0x68, 0x75,
0x6E, 0x6B, 0x2D, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x74, 0x75, 0x72, 0x65, 0x6, 0x0, 0x20,
0xAD, 0xE9, 0x9C, 0xBE, 0xBB, 0x29, 0xB0, 0x10, 0xAD, 0x92, 0xAC, 0xBA, 0x7F, 0xCD, 0x50,
0x48, 0xE5, 0xE1, 0xA7, 0xF3, 0x78, 0xDD, 0x7, 0x0, 0x9A, 0x45, 0x40, 0x49, 0x3, 0xE7,
0x9A, 0xD, 0xA7, 0xEA, 0x8E, 0xA2>>

data = <<>>

headers = [
{":date", DateTime.from_unix!(1_548_726_977_291, :millisecond), :timestamp},
{":chunk-signature",
Base.decode16!("ade99cbebb29b010ad92acba7fcd5048e5e1a7f378dd07009a45404903e79a0d",
case: :lower
), :byte_array}
]

assert EventStream.encode!(data, headers) == expected
end

test "encodes integer headers" do
expected =
<<0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x00, 0x0E, 0x34, 0x8B, 0xEC, 0x7B, 0x08, ?e, ?v, ?e,
?n, ?t, ?-, ?i, ?d, 0x04, 0x00, 0x00, 0xA0, 0x0C, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27,
0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0xD3, 0x89, 0x02, 0x85>>

data = "{'foo':'bar'}"
headers = ["event-id": 40972]
headers = [{"event-id", 40972, :integer}]

encoded = EventStream.encode!(data, headers)
assert encoded == expected
Expand All @@ -89,7 +112,7 @@ defmodule EventStreamTest do
?t, ?i, ?o, ?n, ?/, ?j, ?s, ?o, ?n, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27, 0x3A, 0x27,
0x62, 0x61, 0x72, 0x27, 0x7D, 0x8D, 0x9C, 0x08, 0xB1>>

headers = [{"content-type", "application/json"}]
headers = [{"content-type", "application/json", :string}]

assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"}
end
Expand All @@ -102,7 +125,7 @@ defmodule EventStreamTest do
?n, ?t, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27, 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D,
0xDA, 0x48, 0xBD, 0xAD>>

headers = [{"event-type", "AudioEvent"}, {"message-type", "event"}]
headers = [{"event-type", "AudioEvent", :string}, {"message-type", "event", :string}]
assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"}
end

Expand All @@ -112,7 +135,7 @@ defmodule EventStreamTest do
?e, ?n, ?t, ?-, ?i, ?d, 0x04, 0x00, 0x00, 0xA0, 0x0C, 0x7B, 0x27, 0x66, 0x6F, 0x6F,
0x27, 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0xD3, 0x89, 0x02, 0x85>>

headers = [{"event-id", 40972}]
headers = [{"event-id", 40972, :integer}]
assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"}
end
end
Expand Down