Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 163 additions & 33 deletions lib/tesla/adapter/mint.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ if Code.ensure_loaded?(Mint.HTTP) do
alias Mint.HTTP

@default timeout: 2_000, body_as: :plain, close_conn: true
@http2_request_chunk_size 16_384

@tags [:tcp_error, :ssl_error, :tcp_closed, :ssl_closed, :tcp, :ssl]

Expand Down Expand Up @@ -121,8 +122,8 @@ if Code.ensure_loaded?(Mint.HTTP) do
path <- prepare_path(uri.path, uri.query),
opts <- check_original(uri, opts),
{:ok, conn, opts} <- open_conn(uri, opts),
{:ok, conn, ref} <- make_request(conn, method, path, headers, body) do
format_response(conn, ref, opts)
{:ok, conn, ref, response} <- make_request(conn, method, path, headers, body, opts) do
format_response(conn, ref, opts, response)
end
end

Expand Down Expand Up @@ -170,7 +171,7 @@ if Code.ensure_loaded?(Mint.HTTP) do
end
end

defp make_request(conn, method, path, headers, body) when is_function(body) do
defp make_request(conn, method, path, headers, body, opts) when is_function(body) do
with {:ok, conn, ref} <-
HTTP.request(
conn,
Expand All @@ -179,46 +180,68 @@ if Code.ensure_loaded?(Mint.HTTP) do
headers,
:stream
),
{:ok, conn} <- stream_request(conn, ref, body) do
{:ok, conn, ref}
{:ok, conn, response} <- stream_request(conn, ref, body, opts) do
{:ok, conn, ref, response}
end
end

defp make_request(conn, method, path, headers, body) do
defp make_request(conn, method, path, headers, body, opts)
when is_binary(body) or is_list(body) do
body_length = IO.iodata_length(body)

if HTTP.protocol(conn) == :http2 and body_length > 0 do
headers = put_default_content_length_header(headers, body_length)
body = IO.iodata_to_binary(body)
make_request(conn, method, path, headers, stream_to_fun([body]), opts)
else
case HTTP.request(conn, method, path, headers, body) do
{:ok, conn, ref} ->
{:ok, conn, ref, %{}}

{:error, _conn, error} ->
{:error, error}
end
end
end
Comment thread
yordis marked this conversation as resolved.

defp make_request(conn, method, path, headers, body, _opts) do
case HTTP.request(conn, method, path, headers, body) do
{:ok, conn, ref} ->
{:ok, conn, ref}
{:ok, conn, ref, %{}}

{:error, _conn, error} ->
{:error, error}
end
end

defp stream_request(conn, ref, fun) do
Comment thread
cursor[bot] marked this conversation as resolved.
case next_chunk(fun) do
{:ok, item, fun} when is_list(item) ->
chunk = List.to_string(item)
{:ok, conn} = HTTP.stream_request_body(conn, ref, chunk)
stream_request(conn, ref, fun)
defp stream_request(conn, ref, fun, opts, acc \\ %{})

defp stream_request(conn, _ref, _fun, _opts, %{done: true} = acc), do: {:ok, conn, acc}

defp stream_request(conn, ref, fun, opts, acc) do
case next_chunk(fun) do
{:ok, item, fun} ->
{:ok, conn} = HTTP.stream_request_body(conn, ref, item)
stream_request(conn, ref, fun)
with {:ok, conn, acc} <- stream_request_body(conn, ref, item, opts, acc) do
stream_request(conn, ref, fun, opts, acc)
end

:eof ->
HTTP.stream_request_body(conn, ref, :eof)
case HTTP.stream_request_body(conn, ref, :eof) do
{:ok, conn} -> {:ok, conn, acc}
{:error, _conn, error} -> {:error, error}
end
end
end

defp format_response(conn, ref, %{body_as: :plain} = opts) do
with {:ok, response} <- receive_responses(conn, ref, opts) do
defp format_response(conn, ref, %{body_as: :plain} = opts, response) do
with {:ok, response} <- receive_responses(conn, ref, opts, response) do
{:ok, response[:status], response[:headers], response[:data]}
end
end

defp format_response(conn, ref, %{body_as: :chunks} = opts) do
defp format_response(conn, ref, %{body_as: :chunks} = opts, response) do
with {:ok, conn, %{status: status, headers: headers} = acc} <-
receive_headers_and_status(conn, ref, opts),
receive_headers_and_status(conn, ref, opts, response),
{state, data} <-
response_state(acc) do
{:ok, conn} =
Expand All @@ -232,10 +255,10 @@ if Code.ensure_loaded?(Mint.HTTP) do
end
end

defp format_response(conn, ref, %{body_as: :stream} = opts) do
defp format_response(conn, ref, %{body_as: :stream} = opts, response) do
# there can be some data already
with {:ok, conn, %{status: status, headers: headers} = acc} <-
receive_headers_and_status(conn, ref, opts) do
receive_headers_and_status(conn, ref, opts, response) do
body_as_stream =
Stream.resource(
fn -> %{conn: conn, data: acc[:data], done: acc[:done]} end,
Expand Down Expand Up @@ -274,14 +297,15 @@ if Code.ensure_loaded?(Mint.HTTP) do
end
end

defp receive_responses(conn, ref, opts, acc \\ %{}) do
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc),
:ok <- check_data_size(acc, conn, opts) do
defp receive_responses(conn, ref, opts, acc) do
with :ok <- check_data_size(acc, conn, opts) do
if acc[:done] do
if opts[:close_conn], do: {:ok, _conn} = close(conn)
{:ok, acc}
else
receive_responses(conn, ref, opts, acc)
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
receive_responses(conn, ref, opts, acc)
end
end
end
end
Expand All @@ -298,13 +322,15 @@ if Code.ensure_loaded?(Mint.HTTP) do

defp check_data_size(_, _, _), do: :ok

defp receive_headers_and_status(conn, ref, opts, acc \\ %{}) do
Comment thread
yordis marked this conversation as resolved.
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
case acc do
%{status: _status, headers: _headers} -> {:ok, conn, acc}
# if we don't have status or headers we try to get them in next packet
_ -> receive_headers_and_status(conn, ref, opts, acc)
end
defp receive_headers_and_status(conn, ref, opts, acc) do
case acc do
%{status: _status, headers: _headers} ->
{:ok, conn, acc}

_ ->
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
receive_headers_and_status(conn, ref, opts, acc)
end
end
end

Expand Down Expand Up @@ -353,6 +379,110 @@ if Code.ensure_loaded?(Mint.HTTP) do
defp raise_stream_error(error) when is_binary(error), do: raise(RuntimeError, message: error)
defp raise_stream_error(error), do: raise(RuntimeError, message: inspect(error))

defp put_default_content_length_header(headers, body_length) do
if has_header?(headers, "content-length") do
headers
else
[{"content-length", Integer.to_string(body_length)} | headers]
end
end

defp has_header?(headers, expected_name) do
Enum.any?(headers, fn {name, _value} ->
String.downcase(name) == expected_name
end)
end

defp stream_request_body(conn, ref, chunk, opts, acc) when is_binary(chunk) do
stream_request_body_chunk(conn, ref, chunk, opts, acc)
end

defp stream_request_body(conn, ref, chunk, opts, acc)
when is_integer(chunk) and chunk >= 0 and chunk <= 255 do
stream_request_body_chunk(conn, ref, <<chunk>>, opts, acc)
end

defp stream_request_body(conn, ref, chunk, opts, acc) when is_list(chunk) do
chunk
|> IO.iodata_to_binary()
|> then(&stream_request_body_chunk(conn, ref, &1, opts, acc))
end

defp stream_request_body(conn, ref, chunk, opts, acc) do
case HTTP.protocol(conn) do
:http2 ->
chunk
|> IO.iodata_to_binary()
|> then(&stream_request_body_chunk(conn, ref, &1, opts, acc))

_ ->
case HTTP.stream_request_body(conn, ref, chunk) do
{:ok, conn} -> {:ok, conn, acc}
{:error, _conn, error} -> {:error, error}
end
end
end

defp stream_request_body_chunk(conn, _ref, "", _opts, acc), do: {:ok, conn, acc}

defp stream_request_body_chunk(conn, ref, chunk, opts, acc) do
case HTTP.protocol(conn) do
:http2 ->
stream_http2_body_chunk(
conn,
ref,
chunk,
opts,
acc,
min(byte_size(chunk), @http2_request_chunk_size)
)

_ ->
case HTTP.stream_request_body(conn, ref, chunk) do
{:ok, conn} -> {:ok, conn, acc}
{:error, _conn, error} -> {:error, error}
end
end
end

defp stream_http2_body_chunk(conn, _ref, "", _opts, acc, _chunk_size), do: {:ok, conn, acc}

defp stream_http2_body_chunk(conn, ref, chunk, opts, acc, chunk_size) do
chunk_size = min(byte_size(chunk), chunk_size)
<<body_chunk::binary-size(chunk_size), rest::binary>> = chunk

case HTTP.stream_request_body(conn, ref, body_chunk) do
{:ok, conn} ->
stream_http2_body_chunk(
conn,
ref,
rest,
opts,
acc,
min(byte_size(rest), @http2_request_chunk_size)
)

{:error, conn, %Mint.HTTPError{reason: {:exceeds_window_size, _, 0}}} ->
await_request_window(conn, ref, chunk, opts, acc, chunk_size)

{:error, conn, %Mint.HTTPError{reason: {:exceeds_window_size, _, window_size}}} ->
stream_http2_body_chunk(conn, ref, chunk, opts, acc, window_size)

{:error, _conn, error} ->
{:error, error}
end
end

defp await_request_window(conn, ref, chunk, opts, acc, chunk_size) do
with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do
if acc[:done] do
{:ok, conn, acc}
else
stream_http2_body_chunk(conn, ref, chunk, opts, acc, chunk_size)
end
end
end

defp reduce_responses(responses, ref, acc) do
case Enum.reduce_while(responses, acc, &reduce_response(&1, ref, &2)) do
{:error, _} = error -> error
Expand Down
6 changes: 6 additions & 0 deletions test/support/mint_early_response_handler.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
defmodule Tesla.TestSupport.MintEarlyResponseHandler do
def init(req, state) do
req = :cowboy_req.reply(200, %{"content-type" => "text/plain"}, "early response", req)
{:ok, req, state}
end
end
Loading
Loading