Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ CompletableFuture<Long> write(ByteBuf buf, Iterable<WriteOption> options,
n -> streamFuture.thenCompose(stream -> writeToAsync(buf, options, stream, executor)
.whenComplete((l, e) -> metrics.stop(context, e == null))));
}

void cleanUp() {
streamFuture.thenAccept(DataStream::cleanUp);
}
}

static class RemoteStream {
Expand Down Expand Up @@ -374,31 +378,33 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t);
} finally {
request.release();
}
}

void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
try {
readImpl(request, ctx, buf, getStreams);
readImpl(request, ctx, getStreams);
} catch (Throwable t) {
replyDataStreamException(t, request, ctx);
buf.release();
}
}

private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf,
private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(
() -> newStreamInfo(request.slice(), getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
streams.remove(key);
final StreamInfo removed = streams.remove(key);
removed.getLocal().cleanUp();
throw new IllegalStateException("Failed to create a new stream for " + request
+ " since a stream already exists Key: " + key + " StreamInfo:" + info);
}
Expand All @@ -408,10 +414,7 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
() -> new IllegalStateException("Failed to remove StreamInfo for " + request));
} else {
info = Optional.ofNullable(streams.get(key)).orElseThrow(
() -> {
streams.remove(key);
return new IllegalStateException("Failed to get StreamInfo for " + request);
});
() -> new IllegalStateException("Failed to get StreamInfo for " + request));
}

final CompletableFuture<Long> localWrite;
Expand All @@ -420,7 +423,7 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
localWrite = info.getLocal().write(buf, request.getWriteOptionList(), writeExecutor);
localWrite = info.getLocal().write(request.slice(), request.getWriteOptionList(), writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
Expand All @@ -439,11 +442,12 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
streams.remove(key);
final StreamInfo removed = streams.remove(key);
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
removed.getLocal().cleanUp();
}
} finally {
buf.release();
request.release();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,20 +30,22 @@

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/**
* Implements {@link DataStreamRequest} with {@link ByteBuf}.
* <p>
* This class is immutable.
*/
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
private final ByteBuf buf;
private final AtomicReference<ByteBuf> buf;
private final List<WriteOption> options;

public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset,
Iterable<WriteOption> options, ByteBuf buf) {
super(clientId, type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
this.buf = new AtomicReference<>(buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER);
this.options = Collections.unmodifiableList(Lists.newArrayList(options));
}

Expand All @@ -52,13 +54,25 @@ public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
header.getWriteOptionList(), buf);
}

ByteBuf getBuf() {
return Optional.ofNullable(buf.get()).orElseThrow(
() -> new IllegalStateException("buf is already released in " + this));
}

@Override
public long getDataLength() {
return buf.readableBytes();
return getBuf().readableBytes();
}

public ByteBuf slice() {
return buf.slice();
return getBuf().slice();
}

public void release() {
final ByteBuf got = buf.getAndSet(null);
if (got != null) {
got.release();
}
}

@Override
Expand Down