diff --git a/lib/event_stream.ex b/lib/event_stream.ex index 8e6102e..f08e206 100644 --- a/lib/event_stream.ex +++ b/lib/event_stream.ex @@ -1,4 +1,18 @@ defmodule EventStream do + @type( + header_value_type :: :boolean, + :byte, + :short, + :integer, + :long, + :byte_array, + :string, + :timestamp, + :uuid + ) + @type header :: {binary(), any()} | {binary(), any(), header_value_type()} + @type headers :: [header] + @moduledoc """ Module for encoding to and decoding from the AWS EventStream Encoding format. @@ -6,7 +20,8 @@ defmodule EventStream do """ @doc """ - Encode a payload and headers to Event Stream format. + Encode a payload and headers to Event Stream format. Allows to specify + the header value type. ## Examples @@ -21,14 +36,9 @@ defmodule EventStream do 105, 111, 110, 47, 106, 115, 111, 110, 123, 34, 102, 111, 111, 34, 58, 32, 34, 98, 97, 114, 34, 125, 111, 162, 191, 59>> """ + @spec encode!(binary(), headers()) :: binary() def encode!(payload, headers \\ []) do - headers_binary = - Enum.reduce(headers, <<>>, fn {key, value}, acc -> - key = to_string(key) - - acc <> <> <> key <> encode_header_value(value) - end) - + headers_binary = encode_headers!(headers) header_length = byte_size(headers_binary) total_length = 4 + 4 + 4 + header_length + byte_size(payload) + 4 @@ -41,12 +51,62 @@ defmodule EventStream do message_without_crc <> <> end - defp encode_header_value(value) when is_number(value) do - <<4::size(8), value::size(32)>> + def encode_headers!(headers) do + Enum.reduce(headers, <<>>, fn header, acc -> + {key, value, type} = inject_header_value_type(header) + key = to_string(key) + acc <> <> <> key <> encode_header_value(value, type) + end) + end + + defp inject_header_value_type(header = {_, _, _}), do: header + defp inject_header_value_type({key, value}), do: {key, value, :string} + + defp encode_header_value(true, :boolean) do + <<0::integer-size(8)>> + end + + defp encode_header_value(false, :boolean) do + <<1::integer-size(8)>> + end + + defp encode_header_value(value, :byte) do + <<2::integer-size(8), value::big-signed-integer-size(8)>> + end + + defp encode_header_value(value, :short) do + <<3::integer-size(8), value::big-signed-integer-size(16)>> + end + + defp encode_header_value(value, :integer) do + <<4::integer-size(8), value::big-signed-integer-size(32)>> + end + + defp encode_header_value(value, :long) do + <<5::integer-size(8), value::big-signed-integer-size(64)>> + end + + defp encode_header_value(value, :byte_array) do + <<6::integer-size(8), byte_size(value)::big-signed-size(16), value::binary>> + end + + defp encode_header_value(value, :string) do + <<7::integer-size(8), byte_size(value)::size(16), value::binary>> + end + + defp encode_header_value({{year, month, day}, {hour, minute, second}}, :timestamp) do + Date.new!(year, month, day) + |> DateTime.new!(Time.new!(hour, minute, second)) + |> encode_header_value(:timestamp) + end + + defp encode_header_value(value, :timestamp) do + value = DateTime.to_unix(value, :millisecond) + <<8::integer-size(8), value::big-signed-integer-size(64)>> end - defp encode_header_value(value) when is_binary(value) do - <<7::size(8), byte_size(value)::size(16)>> <> value + defp encode_header_value(value, :uuid) do + <<9::integer-size(8), value::size(16)>> end @doc """ @@ -65,7 +125,7 @@ defmodule EventStream do ...> 105, 111, 110, 47, 106, 115, 111, 110, 123, 34, 102, 111, 111, 34, 58, 32, 34, ...> 98, 97, 114, 34, 125, 111, 162, 191, 59>> ...> EventStream.decode!(data) - {:ok, [{"content-type", "application/json"}], ~s/{"foo": "bar"}/} + {:ok, [{"content-type", "application/json", :string}], ~s/{"foo": "bar"}/} """ def decode!(event_stream_message) do <>, acc), do: Enum.reverse(acc) - defp decode_headers( - <>, - acc + defp decode_headers(<>, acc) do + {value, type, rest} = decode_header_value(rest) + decode_headers(rest, [{name, value, type} | acc]) + end + + defp decode_header_value(<<0::size(8), rest::binary>>) do + {true, :boolean, rest} + end + + defp decode_header_value(<<1::size(8), rest::binary>>) do + {false, :boolean, rest} + end + + defp decode_header_value(<<2::size(8), value::big-signed-integer-size(8), rest::binary>>) do + {value, :byte, rest} + end + + defp decode_header_value(<<3::size(8), value::big-signed-integer-size(16), rest::binary>>) do + {value, :short, rest} + end + + defp decode_header_value(<<4::size(8), value::big-signed-integer-size(32), rest::binary>>) do + {value, :integer, rest} + end + + defp decode_header_value(<<5::size(8), value::big-signed-integer-size(64), rest::binary>>) do + {value, :long, rest} + end + + defp decode_header_value( + <<6::size(8), value_length::size(16), value::binary-size(value_length), rest::binary>> ) do - decode_headers(rest, [{name, value} | acc]) + {value, :byte_array, rest} end - defp decode_headers( - <>, - acc + defp decode_header_value( + <<7::size(8), value_length::size(16), value::binary-size(value_length), rest::binary>> ) do - decode_headers(rest, [{name, value} | acc]) + {value, :string, rest} + end + + defp decode_header_value(<<8::size(8), value::big-signed-integer-size(64), rest::binary>>) do + {DateTime.from_unix!(value, :millisecond), :timestamp, rest} + end + + defp decode_header_value(<<9::size(8), value::binary-size(16), rest::binary>>) do + {value, :uuid, rest} end end diff --git a/mix.lock b/mix.lock index a7bf9b1..da4bf11 100644 --- a/mix.lock +++ b/mix.lock @@ -1,7 +1,7 @@ %{ - "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm"}, - "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm"}, - "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm"}, + "earmark": {:hex, :earmark, "1.4.3", "364ca2e9710f6bff494117dbbd53880d84bebb692dafc3a78eb50aa3183f2bfd", [:mix], [], "hexpm", "8cf8a291ebf1c7b9539e3cddb19e9cef066c2441b1640f13c34c1d3cfc825fec"}, + "ex_doc": {:hex, :ex_doc, "0.21.3", "857ec876b35a587c5d9148a2512e952e24c24345552259464b98bfbb883c7b42", [:mix], [{:earmark, "~> 1.4", [hex: :earmark, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "0db1ee8d1547ab4877c5b5dffc6604ef9454e189928d5ba8967d4a58a801f161"}, + "makeup": {:hex, :makeup, "1.0.0", "671df94cf5a594b739ce03b0d0316aa64312cee2574b6a44becb83cd90fb05dc", [:mix], [{:nimble_parsec, "~> 0.5.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "a10c6eb62cca416019663129699769f0c2ccf39428b3bb3c0cb38c718a0c186d"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "d4b316c7222a85bbaa2fd7c6e90e37e953257ad196dc229505137c5e505e9eff"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.3", "def21c10a9ed70ce22754fdeea0810dafd53c2db3219a0cd54cf5526377af1c6", [:mix], [], "hexpm", "589b5af56f4afca65217a1f3eb3fee7e79b09c40c742fddc1c312b3ac0b3399f"}, } diff --git a/test/event_stream_test.exs b/test/event_stream_test.exs index afc23a7..c06d1b6 100644 --- a/test/event_stream_test.exs +++ b/test/event_stream_test.exs @@ -1,5 +1,6 @@ # Tests taken from # https://github.com/awslabs/aws-c-event-stream/blob/3bc33662f9ccff4f4cbcf9509cc78c26e022fde0/tests/message_serializer_test.c +# https://github.com/mildsunrise/a4s/blob/master/test/events.test.ts defmodule EventStreamTest do alias EventStream use ExUnit.Case @@ -59,6 +60,28 @@ defmodule EventStreamTest do assert encoded == expected end + test "encodes an event with binary and uint64" do + expected = + <<0x0, 0x0, 0x0, 0x53, 0x0, 0x0, 0x0, 0x43, 0xF5, 0x44, 0x7A, 0x58, 0x5, 0x3A, 0x64, 0x61, + 0x74, 0x65, 0x8, 0x0, 0x0, 0x1, 0x68, 0x97, 0x52, 0x43, 0xB, 0x10, 0x3A, 0x63, 0x68, 0x75, + 0x6E, 0x6B, 0x2D, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x74, 0x75, 0x72, 0x65, 0x6, 0x0, 0x20, + 0xAD, 0xE9, 0x9C, 0xBE, 0xBB, 0x29, 0xB0, 0x10, 0xAD, 0x92, 0xAC, 0xBA, 0x7F, 0xCD, 0x50, + 0x48, 0xE5, 0xE1, 0xA7, 0xF3, 0x78, 0xDD, 0x7, 0x0, 0x9A, 0x45, 0x40, 0x49, 0x3, 0xE7, + 0x9A, 0xD, 0xA7, 0xEA, 0x8E, 0xA2>> + + data = <<>> + + headers = [ + {":date", DateTime.from_unix!(1_548_726_977_291, :millisecond), :timestamp}, + {":chunk-signature", + Base.decode16!("ade99cbebb29b010ad92acba7fcd5048e5e1a7f378dd07009a45404903e79a0d", + case: :lower + ), :byte_array} + ] + + assert EventStream.encode!(data, headers) == expected + end + test "encodes integer headers" do expected = <<0x00, 0x00, 0x00, 0x2B, 0x00, 0x00, 0x00, 0x0E, 0x34, 0x8B, 0xEC, 0x7B, 0x08, ?e, ?v, ?e, @@ -66,7 +89,7 @@ defmodule EventStreamTest do 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0xD3, 0x89, 0x02, 0x85>> data = "{'foo':'bar'}" - headers = ["event-id": 40972] + headers = [{"event-id", 40972, :integer}] encoded = EventStream.encode!(data, headers) assert encoded == expected @@ -89,7 +112,7 @@ defmodule EventStreamTest do ?t, ?i, ?o, ?n, ?/, ?j, ?s, ?o, ?n, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27, 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0x8D, 0x9C, 0x08, 0xB1>> - headers = [{"content-type", "application/json"}] + headers = [{"content-type", "application/json", :string}] assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"} end @@ -102,7 +125,7 @@ defmodule EventStreamTest do ?n, ?t, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27, 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0xDA, 0x48, 0xBD, 0xAD>> - headers = [{"event-type", "AudioEvent"}, {"message-type", "event"}] + headers = [{"event-type", "AudioEvent", :string}, {"message-type", "event", :string}] assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"} end @@ -112,7 +135,7 @@ defmodule EventStreamTest do ?e, ?n, ?t, ?-, ?i, ?d, 0x04, 0x00, 0x00, 0xA0, 0x0C, 0x7B, 0x27, 0x66, 0x6F, 0x6F, 0x27, 0x3A, 0x27, 0x62, 0x61, 0x72, 0x27, 0x7D, 0xD3, 0x89, 0x02, 0x85>> - headers = [{"event-id", 40972}] + headers = [{"event-id", 40972, :integer}] assert EventStream.decode!(data) == {:ok, headers, "{'foo':'bar'}"} end end