diff --git a/lib/tesla/adapter/mint.ex b/lib/tesla/adapter/mint.ex index 40e29b9f..d9644eab 100644 --- a/lib/tesla/adapter/mint.ex +++ b/lib/tesla/adapter/mint.ex @@ -53,6 +53,7 @@ if Code.ensure_loaded?(Mint.HTTP) do alias Mint.HTTP @default timeout: 2_000, body_as: :plain, close_conn: true + @http2_request_chunk_size 16_384 @tags [:tcp_error, :ssl_error, :tcp_closed, :ssl_closed, :tcp, :ssl] @@ -121,8 +122,8 @@ if Code.ensure_loaded?(Mint.HTTP) do path <- prepare_path(uri.path, uri.query), opts <- check_original(uri, opts), {:ok, conn, opts} <- open_conn(uri, opts), - {:ok, conn, ref} <- make_request(conn, method, path, headers, body) do - format_response(conn, ref, opts) + {:ok, conn, ref, response} <- make_request(conn, method, path, headers, body, opts) do + format_response(conn, ref, opts, response) end end @@ -170,7 +171,7 @@ if Code.ensure_loaded?(Mint.HTTP) do end end - defp make_request(conn, method, path, headers, body) when is_function(body) do + defp make_request(conn, method, path, headers, body, opts) when is_function(body) do with {:ok, conn, ref} <- HTTP.request( conn, @@ -179,46 +180,68 @@ if Code.ensure_loaded?(Mint.HTTP) do headers, :stream ), - {:ok, conn} <- stream_request(conn, ref, body) do - {:ok, conn, ref} + {:ok, conn, response} <- stream_request(conn, ref, body, opts) do + {:ok, conn, ref, response} end end - defp make_request(conn, method, path, headers, body) do + defp make_request(conn, method, path, headers, body, opts) + when is_binary(body) or is_list(body) do + body_length = IO.iodata_length(body) + + if HTTP.protocol(conn) == :http2 and body_length > 0 do + headers = put_default_content_length_header(headers, body_length) + body = IO.iodata_to_binary(body) + make_request(conn, method, path, headers, stream_to_fun([body]), opts) + else + case HTTP.request(conn, method, path, headers, body) do + {:ok, conn, ref} -> + {:ok, conn, ref, %{}} + + {:error, _conn, error} -> + {:error, error} + end + end + end + + defp make_request(conn, method, path, headers, body, _opts) do case HTTP.request(conn, method, path, headers, body) do {:ok, conn, ref} -> - {:ok, conn, ref} + {:ok, conn, ref, %{}} {:error, _conn, error} -> {:error, error} end end - defp stream_request(conn, ref, fun) do - case next_chunk(fun) do - {:ok, item, fun} when is_list(item) -> - chunk = List.to_string(item) - {:ok, conn} = HTTP.stream_request_body(conn, ref, chunk) - stream_request(conn, ref, fun) + defp stream_request(conn, ref, fun, opts, acc \\ %{}) + + defp stream_request(conn, _ref, _fun, _opts, %{done: true} = acc), do: {:ok, conn, acc} + defp stream_request(conn, ref, fun, opts, acc) do + case next_chunk(fun) do {:ok, item, fun} -> - {:ok, conn} = HTTP.stream_request_body(conn, ref, item) - stream_request(conn, ref, fun) + with {:ok, conn, acc} <- stream_request_body(conn, ref, item, opts, acc) do + stream_request(conn, ref, fun, opts, acc) + end :eof -> - HTTP.stream_request_body(conn, ref, :eof) + case HTTP.stream_request_body(conn, ref, :eof) do + {:ok, conn} -> {:ok, conn, acc} + {:error, _conn, error} -> {:error, error} + end end end - defp format_response(conn, ref, %{body_as: :plain} = opts) do - with {:ok, response} <- receive_responses(conn, ref, opts) do + defp format_response(conn, ref, %{body_as: :plain} = opts, response) do + with {:ok, response} <- receive_responses(conn, ref, opts, response) do {:ok, response[:status], response[:headers], response[:data]} end end - defp format_response(conn, ref, %{body_as: :chunks} = opts) do + defp format_response(conn, ref, %{body_as: :chunks} = opts, response) do with {:ok, conn, %{status: status, headers: headers} = acc} <- - receive_headers_and_status(conn, ref, opts), + receive_headers_and_status(conn, ref, opts, response), {state, data} <- response_state(acc) do {:ok, conn} = @@ -232,10 +255,10 @@ if Code.ensure_loaded?(Mint.HTTP) do end end - defp format_response(conn, ref, %{body_as: :stream} = opts) do + defp format_response(conn, ref, %{body_as: :stream} = opts, response) do # there can be some data already with {:ok, conn, %{status: status, headers: headers} = acc} <- - receive_headers_and_status(conn, ref, opts) do + receive_headers_and_status(conn, ref, opts, response) do body_as_stream = Stream.resource( fn -> %{conn: conn, data: acc[:data], done: acc[:done]} end, @@ -274,14 +297,15 @@ if Code.ensure_loaded?(Mint.HTTP) do end end - defp receive_responses(conn, ref, opts, acc \\ %{}) do - with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc), - :ok <- check_data_size(acc, conn, opts) do + defp receive_responses(conn, ref, opts, acc) do + with :ok <- check_data_size(acc, conn, opts) do if acc[:done] do if opts[:close_conn], do: {:ok, _conn} = close(conn) {:ok, acc} else - receive_responses(conn, ref, opts, acc) + with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do + receive_responses(conn, ref, opts, acc) + end end end end @@ -298,13 +322,15 @@ if Code.ensure_loaded?(Mint.HTTP) do defp check_data_size(_, _, _), do: :ok - defp receive_headers_and_status(conn, ref, opts, acc \\ %{}) do - with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do - case acc do - %{status: _status, headers: _headers} -> {:ok, conn, acc} - # if we don't have status or headers we try to get them in next packet - _ -> receive_headers_and_status(conn, ref, opts, acc) - end + defp receive_headers_and_status(conn, ref, opts, acc) do + case acc do + %{status: _status, headers: _headers} -> + {:ok, conn, acc} + + _ -> + with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do + receive_headers_and_status(conn, ref, opts, acc) + end end end @@ -353,6 +379,110 @@ if Code.ensure_loaded?(Mint.HTTP) do defp raise_stream_error(error) when is_binary(error), do: raise(RuntimeError, message: error) defp raise_stream_error(error), do: raise(RuntimeError, message: inspect(error)) + defp put_default_content_length_header(headers, body_length) do + if has_header?(headers, "content-length") do + headers + else + [{"content-length", Integer.to_string(body_length)} | headers] + end + end + + defp has_header?(headers, expected_name) do + Enum.any?(headers, fn {name, _value} -> + String.downcase(name) == expected_name + end) + end + + defp stream_request_body(conn, ref, chunk, opts, acc) when is_binary(chunk) do + stream_request_body_chunk(conn, ref, chunk, opts, acc) + end + + defp stream_request_body(conn, ref, chunk, opts, acc) + when is_integer(chunk) and chunk >= 0 and chunk <= 255 do + stream_request_body_chunk(conn, ref, <>, opts, acc) + end + + defp stream_request_body(conn, ref, chunk, opts, acc) when is_list(chunk) do + chunk + |> IO.iodata_to_binary() + |> then(&stream_request_body_chunk(conn, ref, &1, opts, acc)) + end + + defp stream_request_body(conn, ref, chunk, opts, acc) do + case HTTP.protocol(conn) do + :http2 -> + chunk + |> IO.iodata_to_binary() + |> then(&stream_request_body_chunk(conn, ref, &1, opts, acc)) + + _ -> + case HTTP.stream_request_body(conn, ref, chunk) do + {:ok, conn} -> {:ok, conn, acc} + {:error, _conn, error} -> {:error, error} + end + end + end + + defp stream_request_body_chunk(conn, _ref, "", _opts, acc), do: {:ok, conn, acc} + + defp stream_request_body_chunk(conn, ref, chunk, opts, acc) do + case HTTP.protocol(conn) do + :http2 -> + stream_http2_body_chunk( + conn, + ref, + chunk, + opts, + acc, + min(byte_size(chunk), @http2_request_chunk_size) + ) + + _ -> + case HTTP.stream_request_body(conn, ref, chunk) do + {:ok, conn} -> {:ok, conn, acc} + {:error, _conn, error} -> {:error, error} + end + end + end + + defp stream_http2_body_chunk(conn, _ref, "", _opts, acc, _chunk_size), do: {:ok, conn, acc} + + defp stream_http2_body_chunk(conn, ref, chunk, opts, acc, chunk_size) do + chunk_size = min(byte_size(chunk), chunk_size) + <> = chunk + + case HTTP.stream_request_body(conn, ref, body_chunk) do + {:ok, conn} -> + stream_http2_body_chunk( + conn, + ref, + rest, + opts, + acc, + min(byte_size(rest), @http2_request_chunk_size) + ) + + {:error, conn, %Mint.HTTPError{reason: {:exceeds_window_size, _, 0}}} -> + await_request_window(conn, ref, chunk, opts, acc, chunk_size) + + {:error, conn, %Mint.HTTPError{reason: {:exceeds_window_size, _, window_size}}} -> + stream_http2_body_chunk(conn, ref, chunk, opts, acc, window_size) + + {:error, _conn, error} -> + {:error, error} + end + end + + defp await_request_window(conn, ref, chunk, opts, acc, chunk_size) do + with {:ok, conn, acc} <- receive_packet(conn, ref, opts, acc) do + if acc[:done] do + {:ok, conn, acc} + else + stream_http2_body_chunk(conn, ref, chunk, opts, acc, chunk_size) + end + end + end + defp reduce_responses(responses, ref, acc) do case Enum.reduce_while(responses, acc, &reduce_response(&1, ref, &2)) do {:error, _} = error -> error diff --git a/test/support/mint_early_response_handler.ex b/test/support/mint_early_response_handler.ex new file mode 100644 index 00000000..5d6df02f --- /dev/null +++ b/test/support/mint_early_response_handler.ex @@ -0,0 +1,6 @@ +defmodule Tesla.TestSupport.MintEarlyResponseHandler do + def init(req, state) do + req = :cowboy_req.reply(200, %{"content-type" => "text/plain"}, "early response", req) + {:ok, req, state} + end +end diff --git a/test/tesla/adapter/mint_test.exs b/test/tesla/adapter/mint_test.exs index e002637f..333792ec 100644 --- a/test/tesla/adapter/mint_test.exs +++ b/test/tesla/adapter/mint_test.exs @@ -10,6 +10,8 @@ defmodule Tesla.Adapter.MintTest do use Tesla.AdapterCase.Multipart use Tesla.AdapterCase.StreamRequestBody + @large_http2_request_size 70_000 + use Tesla.AdapterCase.SSL, transport_opts: [ cacertfile: Path.join([to_string(:code.priv_dir(:httparrot)), "/ssl/server-ca.crt"]) @@ -251,6 +253,165 @@ defmodule Tesla.Adapter.MintTest do end end + describe "issue #394 - handle HTTP/2 request flow control" do + test "preserves automatic content-length for non-empty HTTP/2 request bodies" do + body = "hello" + + request = %Env{ + method: :post, + url: "#{@https}/post", + headers: [{"content-type", "text/plain"}], + body: body + } + + assert {:ok, %Env{} = response} = + call(request, + protocols: [:http2], + transport_opts: [cacertfile: httparrot_cacertfile()] + ) + + assert response.status == 200 + assert posted_data(response.body) == body + assert posted_headers(response.body)["content-length"] == Integer.to_string(byte_size(body)) + end + + test "handles request bodies larger than the flow control window" do + body = String.duplicate("a", @large_http2_request_size) + + request = %Env{ + method: :post, + url: "#{@https}/post", + headers: [{"content-type", "text/plain"}], + body: body + } + + assert {:ok, %Env{} = response} = + call(request, + protocols: [:http2], + transport_opts: [cacertfile: httparrot_cacertfile()] + ) + + assert response.status == 200 + assert posted_data(response.body) == body + end + + test "handles streamed request bodies larger than the flow control window" do + body = large_streamed_http2_body() + expected = String.duplicate("a", @large_http2_request_size) + + request = %Env{ + method: :post, + url: "#{@https}/post", + headers: [{"content-type", "text/plain"}], + body: body + } + + assert {:ok, %Env{} = response} = + call(request, + protocols: [:http2], + transport_opts: [cacertfile: httparrot_cacertfile()] + ) + + assert response.status == 200 + assert posted_data(response.body) == expected + end + end + + describe "issue #394 - handle early HTTP/2 responses during upload" do + setup do + listener_ref = :"mint-early-response-#{System.unique_integer([:positive])}" + dispatch = early_response_dispatch() + priv_dir = :code.priv_dir(:httparrot) + + {:ok, _pid} = + :cowboy.start_tls( + listener_ref, + [ + port: 0, + certfile: priv_dir ++ ~c"/ssl/server.crt", + keyfile: priv_dir ++ ~c"/ssl/server.key" + ], + %{env: %{dispatch: dispatch}} + ) + + on_exit(fn -> :cowboy.stop_listener(listener_ref) end) + + {_, port} = :ranch.get_addr(listener_ref) + + {:ok, + early_response_url: "https://localhost:#{port}", + early_response_cacertfile: Path.join([to_string(priv_dir), "ssl/server-ca.crt"])} + end + + test "returns the response body without waiting for another packet", %{ + early_response_url: early_response_url, + early_response_cacertfile: early_response_cacertfile + } do + request = %Env{ + method: :post, + url: "#{early_response_url}/early-response", + headers: [{"content-type", "text/plain"}], + body: String.duplicate("a", @large_http2_request_size) + } + + assert {:ok, %Env{} = response} = + call(request, + protocols: [:http2], + timeout: 200, + transport_opts: [cacertfile: early_response_cacertfile] + ) + + assert response.status == 200 + assert response.body == "early response" + end + + test "returns chunked responses that already finished during upload", %{ + early_response_url: early_response_url, + early_response_cacertfile: early_response_cacertfile + } do + request = %Env{ + method: :post, + url: "#{early_response_url}/early-response", + headers: [{"content-type", "text/plain"}], + body: String.duplicate("a", @large_http2_request_size) + } + + assert {:ok, %Env{} = response} = + call(request, + body_as: :chunks, + protocols: [:http2], + timeout: 200, + transport_opts: [cacertfile: early_response_cacertfile] + ) + + assert response.status == 200 + assert %{body: {:fin, "early response"}} = response.body + end + + test "returns streamed responses that already finished during upload", %{ + early_response_url: early_response_url, + early_response_cacertfile: early_response_cacertfile + } do + request = %Env{ + method: :post, + url: "#{early_response_url}/early-response", + headers: [{"content-type", "text/plain"}], + body: String.duplicate("a", @large_http2_request_size) + } + + assert {:ok, %Env{} = response} = + call(request, + body_as: :stream, + protocols: [:http2], + timeout: 200, + transport_opts: [cacertfile: early_response_cacertfile] + ) + + assert response.status == 200 + assert Enum.join(response.body) == "early response" + end + end + def read_body(conn, _ref, _opts, {:fin, body}), do: {:ok, conn, body} def read_body(conn, ref, opts, {:nofin, acc}), @@ -516,6 +677,39 @@ defmodule Tesla.Adapter.MintTest do end end + defp large_streamed_http2_body do + chunks = + List.duplicate(String.duplicate("a", 8_192), div(@large_http2_request_size, 8_192)) + + chunks = + case rem(@large_http2_request_size, 8_192) do + 0 -> chunks + remainder -> chunks ++ [String.duplicate("a", remainder)] + end + + Stream.map(chunks, & &1) + end + + defp posted_data(body) do + body + |> posted_response() + |> Map.fetch!("data") + end + + defp posted_headers(body) do + body + |> posted_response() + |> Map.fetch!("headers") + end + + defp posted_response(body) do + Jason.decode!(body) + end + + defp httparrot_cacertfile do + Path.join([to_string(:code.priv_dir(:httparrot)), "ssl/server-ca.crt"]) + end + describe "issue #450 - handle missing Mint response types" do setup do listener_ref = @push_promise_listener_ref @@ -671,4 +865,10 @@ defmodule Tesla.Adapter.MintTest do recv_until_response(conn, match?, timeout, attempts - 1, responses) end end + + defp early_response_dispatch do + :cowboy_router.compile([ + {:_, [{"/early-response", Tesla.TestSupport.MintEarlyResponseHandler, []}]} + ]) + end end