Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task<HttpResponseMessage> SendRequestAsync(HttpRequestMessage reque
// we will leave scope of this method
// we need to pass the ownership of the request and this wrapper to the response (via response content stream)
// unless we know that we are not streaming anymore
incomingStream = new WasiInputStream(this, incomingResponse.Consume());// passing self ownership, passing body ownership
incomingStream = new WasiInputStream(this, incomingResponse.Consume(), response);// passing self ownership, passing body ownership
response.Content = new StreamContent(incomingStream); // passing incomingStream ownership to SendAsync() caller
WasiHttpInterop.ConvertResponseHeaders(incomingResponse, response);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,16 @@ public static void ConvertResponseHeaders(IncomingResponse incomingResponse, Htt
}
}

public static HttpResponseHeaders ConvertTrailingResponseHeaders(Fields headers)
{
var result = new HttpResponseHeaders();
foreach ((var key, var value) in headers.Entries())
{
result.Add(key, Encoding.UTF8.GetString(value));
}
return result;
}

private static bool IsContentHeader(string headerName)
{
return HeaderDescriptor.TryGet(headerName, out HeaderDescriptor descriptor) && (descriptor.HeaderType & HttpHeaderType.Content) != 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ internal sealed class WasiInputStream : Stream
private WasiRequestWrapper wrapper; // owned by this instance
private IncomingBody body; // owned by this instance
private InputStream stream; // owned by this instance
private HttpResponseMessage response;

private int offset;
private byte[]? buffer;
Expand All @@ -28,11 +29,16 @@ internal sealed class WasiInputStream : Stream
public override bool CanWrite => false;
public override bool CanSeek => false;

public WasiInputStream(WasiRequestWrapper wrapper, IncomingBody body)
public WasiInputStream(
WasiRequestWrapper wrapper,
IncomingBody body,
HttpResponseMessage response
)
{
this.wrapper = wrapper;
this.body = body;
this.stream = body.Stream();
this.response = response;
}

~WasiInputStream()
Expand Down Expand Up @@ -111,6 +117,7 @@ CancellationToken cancellationToken
if (((StreamError)e.Value).Tag == StreamError.CLOSED)
{
otherSideClosed = true;
await ReadTrailingHeaders(cancellationToken).ConfigureAwait(false);
return 0;
}
else
Expand Down Expand Up @@ -158,6 +165,46 @@ public override async ValueTask<int> ReadAsync(
return result;
}

private async Task ReadTrailingHeaders(CancellationToken cancellationToken)
{
isClosed = true;
stream.Dispose();
using var futureTrailers = IncomingBody.Finish(body);
while (true)
{
var trailers = futureTrailers.Get();
if (trailers is null)
{
cancellationToken.ThrowIfCancellationRequested();
await WasiHttpInterop
.RegisterWasiPollable(futureTrailers.Subscribe(), cancellationToken)
.ConfigureAwait(false);
}
else
{
var inner = ((Result<Result<Fields?, ErrorCode>, None>)trailers!).AsOk;
if (inner.IsOk)
{
using var headers = inner.AsOk;
if (headers is not null)
{
response.StoreReceivedTrailingHeaders(
WasiHttpInterop.ConvertTrailingResponseHeaders(headers)
);
}

break;
}
else
{
throw new HttpRequestException(
WasiHttpInterop.ErrorCodeToString(inner.AsErr)
);
}
}
}
}

#region PlatformNotSupported

public override void Flush()
Expand Down