diff --git a/src/h2_connection.erl b/src/h2_connection.erl index effbb57..7df98c6 100644 --- a/src/h2_connection.erl +++ b/src/h2_connection.erl @@ -50,6 +50,10 @@ %% second; exceeding the limit triggers GOAWAY(ENHANCE_YOUR_CALM). -define(PING_FLOOD_LIMIT_PER_SEC, 20). -define(RST_FLOOD_LIMIT_PER_SEC, 100). +%% RFC 9113 §10.5 / §6.5.2: cap the raw bytes of a HEADERS+CONTINUATION +%% block before HPACK decode, so a CONTINUATION flood cannot OOM the node +%% before max_header_list_size (which acts on decoded headers) can fire. +-define(MAX_HEADER_BLOCK_BYTES, 256 * 1024). %% Stream states per RFC 7540 Section 5.1 -record(stream, { @@ -60,6 +64,7 @@ send_buffer = <<>> :: binary(), pending_end_stream = false :: boolean(), header_buffer = [] :: iodata(), + header_buffer_size = 0 :: non_neg_integer(), request_headers = [] :: [{binary(), binary()}], response_headers = [] :: [{binary(), binary()}], %% Optional pid to receive body data for this stream (set_stream_handler). @@ -97,6 +102,11 @@ socket :: gen_tcp:socket() | ssl:sslsocket(), transport :: gen_tcp | ssl, owner :: pid(), + %% Monitor on the current owner, installed by controlling_process/2. + %% The initial owner is reached via the start_link bidirectional link; + %% subsequent owners are tracked via monitor so a dead owner is + %% detected even when no link exists. + owner_monitor = undefined :: undefined | reference(), buffer = <<>> :: binary(), %% Settings @@ -775,8 +785,11 @@ handle_frame(_StateName, {ping, Data}, State) -> case bump_flood_counter(ping, State) of {ok, State1} -> Frame = h2_frame:ping_ack(Data), - send_frame(Frame, State1), - {ok, connected, State1}; + case send_frame(Frame, State1) of + ok -> {ok, connected, State1}; + {error, Reason} -> + {stop, {shutdown, {send_failed, Reason}}, State1} + end; flood -> {error, enhance_your_calm, State} end; @@ -905,12 +918,16 @@ handle_settings(Settings, #state{mode = Mode, peer_settings = OldSettings} = Sta MergedSettings = h2_settings:merge(OldSettings, NewSettings), case apply_peer_settings(MergedSettings, State) of {ok, State1} -> - send_frame(h2_frame:settings_ack(), State1), - NewStateName = case State1#state.settings_acked of - true -> connected; - false -> settings - end, - {ok, NewStateName, State1}; + case send_frame(h2_frame:settings_ack(), State1) of + ok -> + NewStateName = case State1#state.settings_acked of + true -> connected; + false -> settings + end, + {ok, NewStateName, State1}; + {error, Reason} -> + {stop, {shutdown, {send_failed, Reason}}, State1} + end; {error, ErrCode} -> {error, ErrCode, State} end; @@ -993,7 +1010,8 @@ handle_headers(StreamId, HeaderBlock, EndStream, EndHeaders, _Priority, #state{m true -> decode_and_process_headers(StreamId, HeaderBlock, EndStream, State); false -> - Stream1 = Stream#stream{header_buffer = HeaderBlock}, + Stream1 = Stream#stream{header_buffer = HeaderBlock, + header_buffer_size = byte_size(HeaderBlock)}, State1 = put_stream(StreamId, Stream1, State), State2 = State1#state{expecting_continuation = {StreamId, EndStream}}, {ok, connected, State2} @@ -1044,7 +1062,8 @@ handle_headers_new(StreamId, HeaderBlock, EndStream, EndHeaders, Mode, State) -> decode_and_process_headers(StreamId, HeaderBlock, EndStream, State); false -> Stream = get_or_create_stream(StreamId, State), - Stream1 = Stream#stream{header_buffer = HeaderBlock}, + Stream1 = Stream#stream{header_buffer = HeaderBlock, + header_buffer_size = byte_size(HeaderBlock)}, State1 = put_stream(StreamId, Stream1, State), State2 = State1#state{expecting_continuation = {StreamId, EndStream}}, {ok, connected, State2} @@ -1094,21 +1113,30 @@ handle_continuation(StreamId, HeaderBlock, EndHeaders, case maps:find(StreamId, Streams) of {ok, #stream{state = closed}} -> {error, protocol_error, State}; - {ok, #stream{header_buffer = Buffer} = Stream} -> - NewBuffer = [Buffer, HeaderBlock], - case EndHeaders of + {ok, #stream{header_buffer = Buffer, + header_buffer_size = OldSize} = Stream} -> + NewSize = OldSize + byte_size(HeaderBlock), + case NewSize > ?MAX_HEADER_BLOCK_BYTES of true -> - Stream1 = Stream#stream{header_buffer = []}, - State1 = put_stream(StreamId, Stream1, State), - %% Restore the original END_STREAM flag captured when - %% HEADERS arrived; clear expecting_continuation. - {_, EndStream} = Expecting, - State2 = State1#state{expecting_continuation = undefined}, - decode_and_process_headers(StreamId, iolist_to_binary(NewBuffer), EndStream, State2); + {error, enhance_your_calm, State}; false -> - Stream1 = Stream#stream{header_buffer = NewBuffer}, - State1 = put_stream(StreamId, Stream1, State), - {ok, connected, State1} + NewBuffer = [Buffer, HeaderBlock], + case EndHeaders of + true -> + Stream1 = Stream#stream{header_buffer = [], + header_buffer_size = 0}, + State1 = put_stream(StreamId, Stream1, State), + %% Restore the original END_STREAM flag captured when + %% HEADERS arrived; clear expecting_continuation. + {_, EndStream} = Expecting, + State2 = State1#state{expecting_continuation = undefined}, + decode_and_process_headers(StreamId, iolist_to_binary(NewBuffer), EndStream, State2); + false -> + Stream1 = Stream#stream{header_buffer = NewBuffer, + header_buffer_size = NewSize}, + State1 = put_stream(StreamId, Stream1, State), + {ok, connected, State1} + end end; error -> {error, protocol_error, State} @@ -1881,7 +1909,9 @@ maybe_send_conn_window_update(_DataSize, #state{recv_conn_window_size = ConnWind case ConnWindow < Threshold of true -> ConnIncrement = Target - ConnWindow, - send_frame(h2_frame:window_update(0, ConnIncrement), State), + %% Best-effort: a dead socket will surface via {tcp_closed,_} / + %% {ssl_closed,_} momentarily and terminate the connection. + _ = send_frame(h2_frame:window_update(0, ConnIncrement), State), State#state{recv_conn_window_size = ConnWindow + ConnIncrement}; false -> State @@ -1894,7 +1924,8 @@ maybe_send_window_update(StreamId, _DataSize, #state{local_settings = Settings, case maps:find(StreamId, Streams) of {ok, #stream{recv_window_size = StreamWindow} = Stream} when StreamWindow < Threshold -> StreamIncrement = InitialWindow - StreamWindow, - send_frame(h2_frame:window_update(StreamId, StreamIncrement), State), + %% Best-effort: dead socket surfaces via {tcp_closed,_} / {ssl_closed,_}. + _ = send_frame(h2_frame:window_update(StreamId, StreamIncrement), State), Stream1 = Stream#stream{recv_window_size = StreamWindow + StreamIncrement}, State#state{streams = maps:put(StreamId, Stream1, Streams)}; _ -> @@ -1966,15 +1997,20 @@ handle_send_request(From, Method, Path, Headers, EndStream, #state{mode = client request_method = Method }, - send_header_block(StreamId, HeaderBlock, EndStream, State), - - State1 = State#state{ - encode_context = EncCtx1, - streams = maps:put(StreamId, Stream, State#state.streams), - next_stream_id = StreamId + 2 - }, - - {keep_state, State1, [{reply, From, {ok, StreamId}}]} + case send_header_block(StreamId, HeaderBlock, EndStream, State) of + ok -> + State1 = State#state{ + encode_context = EncCtx1, + streams = maps:put(StreamId, Stream, State#state.streams), + next_stream_id = StreamId + 2 + }, + {keep_state, State1, [{reply, From, {ok, StreamId}}]}; + {error, SendReason} -> + {stop_and_reply, + {shutdown, {send_failed, SendReason}}, + [{reply, From, {error, SendReason}}], + State} + end end end end @@ -2033,13 +2069,20 @@ handle_send_request_headers(From, Headers, EndStream, #state{mode = client, next protocol = Protocol, request_method = Method }, - send_header_block(StreamId, HeaderBlock, EndStream, State), - State1 = State#state{ - encode_context = EncCtx1, - streams = maps:put(StreamId, Stream, State#state.streams), - next_stream_id = StreamId + 2 - }, - {keep_state, State1, [{reply, From, {ok, StreamId}}]} + case send_header_block(StreamId, HeaderBlock, EndStream, State) of + ok -> + State1 = State#state{ + encode_context = EncCtx1, + streams = maps:put(StreamId, Stream, State#state.streams), + next_stream_id = StreamId + 2 + }, + {keep_state, State1, [{reply, From, {ok, StreamId}}]}; + {error, SendReason} -> + {stop_and_reply, + {shutdown, {send_failed, SendReason}}, + [{reply, From, {error, SendReason}}], + State} + end end end end @@ -2083,30 +2126,37 @@ handle_send_response(From, StreamId, Status, Headers, #state{mode = server, stre %% on the wire and the peer's stream state matches ours. BodyForbidden = ReqMethod =:= <<"HEAD">> orelse Status =:= 204 orelse Status =:= 304, - send_header_block(StreamId, HeaderBlock, BodyForbidden, State), - NewTunnel = IsConnectRequest andalso IsSuccess, - NewState = case BodyForbidden of - true -> - case StreamState of - open -> half_closed_local; - half_closed_remote -> closed - end; - false -> - StreamState - end, - Stream1 = Stream#stream{response_headers = AllHeaders, - tunnel = NewTunnel, - body_forbidden = BodyForbidden, - state = NewState}, - State1 = State#state{ - encode_context = EncCtx1, - streams = maps:put(StreamId, Stream1, Streams) - }, - State2 = case NewState of - closed -> close_stream(StreamId, end_stream, State1); - _ -> State1 - end, - {keep_state, State2, [{reply, From, ok}]} + case send_header_block(StreamId, HeaderBlock, BodyForbidden, State) of + ok -> + NewTunnel = IsConnectRequest andalso IsSuccess, + NewState = case BodyForbidden of + true -> + case StreamState of + open -> half_closed_local; + half_closed_remote -> closed + end; + false -> + StreamState + end, + Stream1 = Stream#stream{response_headers = AllHeaders, + tunnel = NewTunnel, + body_forbidden = BodyForbidden, + state = NewState}, + State1 = State#state{ + encode_context = EncCtx1, + streams = maps:put(StreamId, Stream1, Streams) + }, + State2 = case NewState of + closed -> close_stream(StreamId, end_stream, State1); + _ -> State1 + end, + {keep_state, State2, [{reply, From, ok}]}; + {error, SendReason} -> + {stop_and_reply, + {shutdown, {send_failed, SendReason}}, + [{reply, From, {error, SendReason}}], + State} + end end end end; @@ -2123,22 +2173,27 @@ handle_send_response(From, _StreamId, _Status, _Headers, State) -> %% SETTINGS_MAX_FRAME_SIZE advertised by the peer. If our encoded block is %% larger, split it across one HEADERS frame + one or more CONTINUATION %% frames. No other frames on this stream may interleave. +%% Returns ok | {error, Reason}. send_header_block(StreamId, HeaderBlock, EndStream, #state{peer_max_frame_size = MaxFrameSize} = State) -> case byte_size(HeaderBlock) =< MaxFrameSize of true -> send_frame(h2_frame:headers(StreamId, HeaderBlock, EndStream), State); false -> <> = HeaderBlock, - send_frame(h2_frame:headers(StreamId, First, EndStream, false), State), - send_continuations(StreamId, Rest, MaxFrameSize, State) + case send_frame(h2_frame:headers(StreamId, First, EndStream, false), State) of + ok -> send_continuations(StreamId, Rest, MaxFrameSize, State); + {error, _} = Err -> Err + end end. send_continuations(StreamId, Rest, MaxFrameSize, State) when byte_size(Rest) =< MaxFrameSize -> send_frame(h2_frame:continuation(StreamId, Rest, true), State); send_continuations(StreamId, Rest, MaxFrameSize, State) -> <> = Rest, - send_frame(h2_frame:continuation(StreamId, Chunk, false), State), - send_continuations(StreamId, More, MaxFrameSize, State). + case send_frame(h2_frame:continuation(StreamId, Chunk, false), State) of + ok -> send_continuations(StreamId, More, MaxFrameSize, State); + {error, _} = Err -> Err + end. %% RFC 7540 §8.3: forbidden on a 2xx CONNECT response. has_banned_tunnel_header(Headers) -> @@ -2170,47 +2225,53 @@ handle_send_data(From, StreamId, Data, EndStream, #state{streams = Streams, conn <> = Data, IsEnd = EndStream andalso Remaining == <<>> andalso Buffer == <<>>, Frame = h2_frame:data(StreamId, SendData, IsEnd), - send_frame(Frame, State), - - %% Update windows - NewConnWindow = ConnWindow - ToSend1, - NewStreamWindow = StreamWindow - ToSend1, + case send_frame(Frame, State) of + {error, SendReason} -> + {stop_and_reply, + {shutdown, {send_failed, SendReason}}, + [{reply, From, {error, SendReason}}], + State}; + ok -> + %% Update windows + NewConnWindow = ConnWindow - ToSend1, + NewStreamWindow = StreamWindow - ToSend1, - %% Update stream state; Remaining is handled via recursion below, - %% so it must not be appended to send_buffer (would cause duplication). - Stream1 = Stream#stream{ - window_size = NewStreamWindow, - send_buffer = Buffer, - pending_end_stream = EndStream andalso Remaining =/= <<>>, - state = case IsEnd of - true -> - case StreamState of - open -> half_closed_local; - half_closed_remote -> closed - end; - false -> - StreamState - end - }, + %% Update stream state; Remaining is handled via recursion below, + %% so it must not be appended to send_buffer (would cause duplication). + Stream1 = Stream#stream{ + window_size = NewStreamWindow, + send_buffer = Buffer, + pending_end_stream = EndStream andalso Remaining =/= <<>>, + state = case IsEnd of + true -> + case StreamState of + open -> half_closed_local; + half_closed_remote -> closed + end; + false -> + StreamState + end + }, - State1 = State#state{ - conn_window_size = NewConnWindow, - streams = maps:put(StreamId, Stream1, Streams) - }, - %% If we just fully closed the stream, record its close - %% reason so later frames on this id get classified - %% correctly per RFC 9113 §5.1. - State2 = case Stream1#stream.state =:= closed - andalso StreamState =:= half_closed_remote of - true -> close_stream(StreamId, end_stream, State1); - false -> State1 - end, - %% If there's more data, try to send it - case Remaining of - <<>> -> - {keep_state, State2, [{reply, From, ok}]}; - _ -> - handle_send_data(From, StreamId, Remaining, EndStream, State2) + State1 = State#state{ + conn_window_size = NewConnWindow, + streams = maps:put(StreamId, Stream1, Streams) + }, + %% If we just fully closed the stream, record its close + %% reason so later frames on this id get classified + %% correctly per RFC 9113 §5.1. + State2 = case Stream1#stream.state =:= closed + andalso StreamState =:= half_closed_remote of + true -> close_stream(StreamId, end_stream, State1); + false -> State1 + end, + %% If there's more data, try to send it + case Remaining of + <<>> -> + {keep_state, State2, [{reply, From, ok}]}; + _ -> + handle_send_data(From, StreamId, Remaining, EndStream, State2) + end end end; {ok, _} -> @@ -2237,27 +2298,33 @@ handle_send_trailers(From, StreamId, Trailers, #state{streams = Streams, encode_ {HeaderBlock, EncCtx1} = h2_hpack:encode(Trailers, EncCtx), %% Send HEADERS frame with END_STREAM - send_header_block(StreamId, HeaderBlock, true, State), - - %% Update stream state - Stream1 = Stream#stream{ - state = case StreamState of - open -> half_closed_local; - half_closed_remote -> closed - end - }, + case send_header_block(StreamId, HeaderBlock, true, State) of + {error, SendReason} -> + {stop_and_reply, + {shutdown, {send_failed, SendReason}}, + [{reply, From, {error, SendReason}}], + State}; + ok -> + %% Update stream state + Stream1 = Stream#stream{ + state = case StreamState of + open -> half_closed_local; + half_closed_remote -> closed + end + }, - State1 = State#state{ - encode_context = EncCtx1, - streams = maps:put(StreamId, Stream1, Streams) - }, + State1 = State#state{ + encode_context = EncCtx1, + streams = maps:put(StreamId, Stream1, Streams) + }, - case Stream1#stream.state of - closed -> - State2 = close_stream(StreamId, end_stream, State1), - {keep_state, State2, [{reply, From, ok}]}; - _ -> - {keep_state, State1, [{reply, From, ok}]} + case Stream1#stream.state of + closed -> + State2 = close_stream(StreamId, end_stream, State1), + {keep_state, State2, [{reply, From, ok}]}; + _ -> + {keep_state, State1, [{reply, From, ok}]} + end end end end; @@ -2351,7 +2418,8 @@ handle_call_early(From, Request, StateName, #state{waiters = Waiters} = State) - get_peer_settings -> {keep_state, State, [{reply, From, State#state.peer_settings}]}; {controlling_process, NewOwner} -> - {keep_state, State#state{owner = NewOwner}, [{reply, From, ok}]}; + {keep_state, swap_owner_monitor(NewOwner, State), + [{reply, From, ok}]}; _ -> {keep_state, State, [{reply, From, {error, {not_ready, StateName}}}]} end. @@ -2363,13 +2431,17 @@ handle_call_common(From, Request, _StateName, State) -> get_peer_settings -> {keep_state, State, [{reply, From, State#state.peer_settings}]}; {controlling_process, NewOwner} -> - {keep_state, State#state{owner = NewOwner}, [{reply, From, ok}]}; + {keep_state, swap_owner_monitor(NewOwner, State), + [{reply, From, ok}]}; _ -> {keep_state, State, [{reply, From, {error, unknown_request}}]} end. handle_common(info, {'EXIT', Owner, Reason}, _StateName, #state{owner = Owner} = State) -> {stop, {shutdown, {owner_exit, Reason}}, State}; +handle_common(info, {'DOWN', Ref, process, _Pid, Reason}, _StateName, + #state{owner_monitor = Ref} = State) when Ref =/= undefined -> + {stop, {shutdown, {owner_exit, Reason}}, State}; handle_common(info, {tcp_error, Socket, Reason}, _StateName, #state{socket = Socket} = State) -> {stop, {shutdown, {tcp_error, Reason}}, State}; handle_common(info, {ssl_error, Socket, Reason}, _StateName, #state{socket = Socket} = State) -> @@ -2489,7 +2561,10 @@ flush_stream_buffer(StreamId, #state{streams = Streams, conn_window_size = ConnW <> = Buffer, IsEnd = PendingEnd andalso Remaining == <<>>, Frame = h2_frame:data(StreamId, SendData, IsEnd), - send_frame(Frame, State), + %% Best-effort: this flush is triggered by a peer WINDOW_UPDATE, + %% there is no in-flight call to reply to. A dead socket surfaces + %% via {tcp_closed,_} / {ssl_closed,_} and terminates the connection. + _ = send_frame(Frame, State), NewStreamState = case IsEnd of true -> @@ -2531,7 +2606,10 @@ send_preface(#state{socket = Socket, transport = Transport} = State) -> send_settings_frame(#state{local_settings = Settings, pending_settings = Pending} = State) -> Frame = h2_frame:settings(settings_to_list(Settings)), - send_frame(Frame, State), + %% Best-effort like the preface above: if the peer has already closed, + %% the imminent {tcp_closed,_} / {ssl_closed,_} will tear us down and + %% reply {error, _} to any wait_connected callers. + _ = send_frame(Frame, State), State#state{pending_settings = Pending ++ [Settings]}. settings_to_list(Settings) -> @@ -2554,12 +2632,12 @@ setting_id(_) -> undefined. encode_setting_value(unlimited) -> 16#ffffffff; encode_setting_value(V) -> V. +%% Returns ok | {error, Reason}. Callers must handle errors so that +%% an in-flight gen_statem:call cannot reply ok to the user after the +%% socket has died. send_frame(Frame, #state{socket = Socket, transport = Transport}) -> Bin = h2_frame:encode(Frame), - case Transport:send(Socket, Bin) of - ok -> ok; - {error, _Reason} -> ok %% Peer closed; let subsequent handling clean up - end. + Transport:send(Socket, Bin). send_goaway_frame(LastStreamId, ErrorCode, #state{socket = Socket, transport = Transport} = State) -> Frame = h2_frame:goaway(LastStreamId, ErrorCode, <<>>), @@ -2611,6 +2689,18 @@ notify_connected(#state{waiters = Waiters, settings_timer = Timer} = State) -> peel_reason({shutdown, R}) -> R; peel_reason(R) -> R. +%% Switch owner liveness tracking to NewOwner: demonitor the previous +%% monitor (if any), install a fresh one. The initial owner is reached via +%% start_link's bidirectional link, so owner_monitor is undefined until the +%% first controlling_process call. +swap_owner_monitor(NewOwner, #state{owner_monitor = OldRef} = State) -> + case OldRef of + undefined -> ok; + _ -> _ = erlang:demonitor(OldRef, [flush]), ok + end, + NewRef = erlang:monitor(process, NewOwner), + State#state{owner = NewOwner, owner_monitor = NewRef}. + is_ssl_socket(Socket) when is_tuple(Socket) -> element(1, Socket) =:= sslsocket; is_ssl_socket(_) -> diff --git a/test/h2_compliance_SUITE.erl b/test/h2_compliance_SUITE.erl index 2cf9eb7..df17558 100644 --- a/test/h2_compliance_SUITE.erl +++ b/test/h2_compliance_SUITE.erl @@ -123,6 +123,9 @@ server_advertises_enable_push_zero_test/1, push_promise_gets_stream_reset_test/1, ping_flood_triggers_enhance_your_calm_test/1, + continuation_flood_triggers_enhance_your_calm_test/1, + controlling_process_monitors_new_owner_test/1, + send_returns_error_on_closed_socket_test/1, tls_transport_tag_detected_test/1 ]). @@ -257,6 +260,9 @@ groups() -> server_advertises_enable_push_zero_test, push_promise_gets_stream_reset_test, ping_flood_triggers_enhance_your_calm_test, + continuation_flood_triggers_enhance_your_calm_test, + controlling_process_monitors_new_owner_test, + send_returns_error_on_closed_socket_test, tls_transport_tag_detected_test ]} ]. @@ -2431,6 +2437,102 @@ ping_flood_triggers_enhance_your_calm_test(Config) -> ssl:close(Sock), ok. +%% RFC 9113 §6.5.2 / §10.5: a HEADERS+CONTINUATION block whose raw bytes +%% exceed our pre-decode cap (?MAX_HEADER_BLOCK_BYTES = 256 KB) must trigger +%% GOAWAY(ENHANCE_YOUR_CALM) before HPACK decode runs, so a peer cannot +%% OOM the node by streaming CONTINUATIONs. +continuation_flood_triggers_enhance_your_calm_test(Config) -> + Port = ?config(port, Config), + {ok, Sock} = raw_h2_client(Port), + %% Initial HEADERS with END_HEADERS=0, END_STREAM=0 — small valid payload. + Headers = [ + {<<":method">>, <<"GET">>}, + {<<":scheme">>, <<"https">>}, + {<<":path">>, <<"/">>}, + {<<":authority">>, <<"localhost">>} + ], + {Block, _} = h2_hpack:encode(Headers, h2_hpack:new_context()), + HeadersLen = byte_size(Block), + HeadersFrame = <>, + ok = ssl:send(Sock, HeadersFrame), + %% Stream junk CONTINUATION frames totalling > 256 KB. Use 16 KB chunks of + %% raw bytes — they will fail HPACK decode if processed, but the cap must + %% kick in first. + Chunk = binary:copy(<<0>>, 16 * 1024), + ChunkLen = byte_size(Chunk), + ContFrame = <>, + %% 20 chunks = 320 KB, comfortably over the cap. + _ = [ssl:send(Sock, ContFrame) || _ <- lists:seq(1, 20)], + case wait_for_goaway(Sock, 5000) of + {ok, ErrorCode} -> ?assertEqual(11, ErrorCode); %% ENHANCE_YOUR_CALM + timeout -> ct:fail(no_goaway) + end, + ssl:close(Sock), + ok. + +%% controlling_process/2 must install a monitor on the new owner so that the +%% connection is torn down when the new owner exits. Before the fix the +%% connection became an orphan: the original start_link link still pointed +%% at the spawning caller, and the new owner had no liveness signal. +controlling_process_monitors_new_owner_test(Config) -> + Port = ?config(port, Config), + {ok, Conn} = h2:connect("localhost", Port, #{ + ssl_opts => [{verify, verify_none}] + }), + Parent = self(), + NewOwner = spawn(fun() -> + Parent ! {ready, self()}, + receive die -> exit(boom) end + end), + receive {ready, NewOwner} -> ok + after 1000 -> ct:fail(new_owner_not_ready) end, + ok = h2:controlling_process(Conn, NewOwner), + ConnRef = erlang:monitor(process, Conn), + NewOwner ! die, + receive + {'DOWN', ConnRef, process, Conn, _Reason} -> ok + after 3000 -> + ct:fail(connection_did_not_terminate_with_new_owner) + end, + drain_exits(), + ok. + +%% send_frame/2 errors must propagate to API callers instead of silently +%% returning ok. Closing the underlying socket out from under the connection +%% then issuing a send_data must yield {error, _}, not ok. +send_returns_error_on_closed_socket_test(Config) -> + %% Use a never-completing handler so the server-side stream stays open + %% long enough for us to corrupt the socket. + case ?config(server_ref, Config) of + undefined -> ok; + OldRef -> h2:stop_server(OldRef) + end, + Handler = fun(_Conn, _Sid, _, _, _) -> timer:sleep(60000) end, + {ok, Ref} = h2:start_server(0, #{ + cert => ?config(cert_file, Config), + key => ?config(key_file, Config), + handler => Handler + }), + ServerPort = h2:server_port(Ref), + {ok, Conn} = h2:connect("localhost", ServerPort, #{ + ssl_opts => [{verify, verify_none}] + }), + {ok, Sid} = h2:request(Conn, <<"POST">>, <<"/">>, + [{<<"host">>, <<"localhost">>}]), + %% Reach into the gen_statem state to grab the socket and slam it shut. + %% This mimics a peer that disappears mid-flight. + {_StateName, State} = sys:get_state(Conn), + Socket = element(3, State), + Transport = element(4, State), + Transport:close(Socket), + %% Now any send must surface {error, _}; pre-fix this returned ok. + Result = h2:send_data(Conn, Sid, <<"hello">>, false), + ?assertMatch({error, _}, Result), + catch h2:close(Conn), + h2:stop_server(Ref), + drain_exits(), + ok. + read_headers_block(S) -> {ok, <>} = ssl:recv(S, 9, 5000), {ok, Body} = case Len of 0 -> {ok, <<>>}; _ -> ssl:recv(S, Len, 5000) end,