From 5b614fbe5f2a0b0e2416aca1f2ee3956aa85f378 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Fri, 14 May 2021 10:57:30 +0200 Subject: [PATCH 1/8] streaming server --- .../stream/DirectoryServerDestination.java | 18 +++ .../stream/DirectoryServerSource.java | 31 ++++ .../stream/DirstreamClientHandler.java | 107 ++++++++++++++ .../stream/DirstreamServerHandler.java | 107 ++++++++++++++ .../container/stream/StreamingClient.java | 54 +++++++ .../stream/StreamingDestination.java | 9 ++ .../container/stream/StreamingServer.java | 94 ++++++++++++ .../container/stream/StreamingSource.java | 36 +++++ .../container/stream/TestStreamingServer.java | 7 + .../org/apache/hadoop/ozone/freon/Freon.java | 2 +- .../ozone/freon/StreamingGenerator.java | 136 ++++++++++++++++++ 11 files changed, 600 insertions(+), 1 deletion(-) create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java new file mode 100644 index 000000000000..9733c71d25f2 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java @@ -0,0 +1,18 @@ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; +import java.nio.file.Paths; + +public class DirectoryServerDestination implements StreamingDestination { + + private Path root; + + public DirectoryServerDestination(Path path) { + root = path; + } + + @Override + public Path mapToDestination(String name) { + return root.resolve(Paths.get(name)); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java new file mode 100644 index 000000000000..acd947f934c6 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java @@ -0,0 +1,31 @@ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import 
java.util.HashMap; +import java.util.Map; + +public class DirectoryServerSource implements StreamingSource { + + private Path root; + + public DirectoryServerSource(Path root) { + this.root = root; + } + + @Override + public Map getFilesToStream(String id) { + Map files = new HashMap<>(); + try { + Files.walk(root.resolve(id)) + .filter(Files::isRegularFile) + .forEach(path -> { + files.put(root.relativize(path).toString(), path); + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + return files; + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java new file mode 100644 index 000000000000..62384b606b33 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java @@ -0,0 +1,107 @@ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.channels.FileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ByteProcessor; +import io.netty.util.ReferenceCountUtil; + +public class DirstreamClientHandler extends ChannelInboundHandlerAdapter { + + private final StreamingDestination destination; + private boolean headerMode = true; + private StringBuilder currentFileName = new StringBuilder(); + private RandomAccessFile destFile; + + private FileChannel destFileChannel; + + private long remaining; + + public DirstreamClientHandler(StreamingDestination streamingDestination) { + this.destination = streamingDestination; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws IOException { + try { + ByteBuf buffer = (ByteBuf) msg; + doRead(ctx, buffer); + } finally { + ReferenceCountUtil.release(msg); + } + } + + public void doRead(ChannelHandlerContext ctx, ByteBuf buffer) + throws IOException { + if (headerMode) { + int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer + .readerIndex(); + if (eolPosition > 0) { + headerMode = false; + final ByteBuf name = buffer.readBytes(eolPosition); + currentFileName.append(name + .toString(StandardCharsets.UTF_8)); + name.release(); + buffer.skipBytes(1); + String[] parts = currentFileName.toString().split(" ", 2); + remaining = Long.parseLong(parts[0]); + Path destFilePath = destination.mapToDestination(parts[1]); + Files.createDirectories(destFilePath.getParent()); + this.destFile = + new RandomAccessFile(destFilePath.toFile(), "rw"); + destFileChannel = this.destFile.getChannel(); + } else { + currentFileName + .append(buffer.toString(StandardCharsets.UTF_8)); + } + } + if (!headerMode) { + final int readableBytes = buffer.readableBytes(); + if (remaining >= readableBytes) { + remaining -= + buffer.readBytes(destFileChannel, readableBytes); + } else { + remaining -= buffer.readBytes(destFileChannel, (int) remaining); + currentFileName = new StringBuilder(); + headerMode = true; + destFile.close(); + if (readableBytes > 0) { + doRead(ctx, buffer); + } + } + } + } + + @Override + public void channelUnregistered(ChannelHandlerContext ctx) { + try { + if (destFile != null) { + destFile.close(); + } + + } catch (IOException e) { + e.printStackTrace(); 
+ } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + try { + destFileChannel.close(); + destFile.close(); + } catch (IOException e) { + e.printStackTrace(); + } + ctx.close(); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java new file mode 100644 index 000000000000..04723962c085 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java @@ -0,0 +1,107 @@ +package org.apache.hadoop.ozone.container.stream; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.Map.Entry; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.DefaultFileRegion; +import io.netty.util.ByteProcessor; + +public class DirstreamServerHandler extends ChannelInboundHandlerAdapter { + + private final StringBuilder id = new StringBuilder(); + private StreamingSource source; + private int writtenIndex = 0; + private boolean headerProcessed = false; + + public DirstreamServerHandler(StreamingSource source) { + this.source = source; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + if (!headerProcessed) { + ByteBuf buffer = (ByteBuf) msg; + int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer + .readerIndex(); + if (eolPosition > 0) { + headerProcessed = true; + id.append(buffer.toString(Charset.defaultCharset())); + } else { + id.append(buffer.toString(0,eolPosition, Charset.defaultCharset())); + } + buffer.release(); + } + + if (headerProcessed) { + ChannelFuture lastFuture = null; + final List> entriesToWrite = new ArrayList<>( + source.getFilesToStream(id.toString().trim()).entrySet()); + + writeOneElement(ctx, entriesToWrite, 0); + + } + } + + public void writeOneElement( + ChannelHandlerContext ctx, + List> entriesToWrite, + int i + ) + throws IOException { + final Entry entryToWrite = entriesToWrite.get(i); + Path file = entryToWrite.getValue(); + String name = entryToWrite.getKey(); + long fileSize = Files.size(file); + String identifier = fileSize + " " + name + "\n"; + ByteBuf identifierBuf = + Unpooled.wrappedBuffer(identifier.getBytes( + StandardCharsets.UTF_8)); + + final int currentIndex = i; + + ChannelFuture lastFuture = ctx.writeAndFlush(identifierBuf); + lastFuture.addListener(f -> { + ChannelFuture nextFuture = ctx.writeAndFlush( + new DefaultFileRegion(file.toFile(), 0, fileSize)); + if (currentIndex == entriesToWrite.size() - 1) { + nextFuture.addListener(a -> ctx.channel().close()); + } else { + nextFuture.addListener( + a -> writeOneElement(ctx, entriesToWrite, + currentIndex + 1)); + } + }); + + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) { + ctx.flush(); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) + throws Exception { + cause.printStackTrace(); + if (ctx.channel().isActive()) { + ctx.writeAndFlush("ERR: 
" + + cause.getClass().getSimpleName() + ": " + + cause.getMessage() + '\n').addListener( + ChannelFutureListener.CLOSE); + } + ctx.close(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java new file mode 100644 index 000000000000..f2984aaeec15 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java @@ -0,0 +1,54 @@ +package org.apache.hadoop.ozone.container.stream; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.*; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.string.StringEncoder; +import io.netty.util.CharsetUtil; + +public class StreamingClient implements AutoCloseable { + + private static EventLoopGroup group; + private final Bootstrap b; + private ChannelFuture f; + private int port; + private String host; + + public StreamingClient( + String host, + int port, + StreamingDestination streamingDestination + ) throws InterruptedException { + this.port = port; + this.host = host; + + group = new NioEventLoopGroup(100); + + b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_RCVBUF, 1024 * 1024) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new StringEncoder(CharsetUtil.UTF_8), + new DirstreamClientHandler(streamingDestination)); + } + }); + + } + + public Channel connect() throws InterruptedException { + f = b.connect(host, port).sync(); + return f.channel(); + } + + public void close() throws InterruptedException { + f.channel().closeFuture().sync(); + group.shutdownGracefully(); + } + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java new file mode 100644 index 000000000000..63c3ea89913c --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java @@ -0,0 +1,9 @@ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; + +public interface StreamingDestination { + + Path mapToDestination(String name); + +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java new file mode 100644 index 000000000000..4ceb1569bec0 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingServer.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.net.InetSocketAddress; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.stream.ChunkedWriteHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Netty based streaming server to replicate files from a directory. + */ +public class StreamingServer implements AutoCloseable { + + private static final Logger LOG = + LoggerFactory.getLogger(StreamingServer.class); + + private int port; + + private StreamingSource source; + + private EventLoopGroup bossGroup; + + private EventLoopGroup workerGroup; + + public StreamingServer( + StreamingSource source, int port + ) { + this.port = port; + this.source = source; + } + + public void start() throws InterruptedException { + ServerBootstrap b = new ServerBootstrap(); + bossGroup = new NioEventLoopGroup(100); + workerGroup = new NioEventLoopGroup(100); + + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .childHandler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast( + new ChunkedWriteHandler(), + new DirstreamServerHandler(source)); + } + }); + + ChannelFuture f = b.bind(port).sync(); + final InetSocketAddress socketAddress = + (InetSocketAddress) f.channel().localAddress(); + port = socketAddress.getPort(); + LOG.info("Started streaming server on " + port); + } + + public void stop() { + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + + public int getPort() { + return port; + } + + @Override + public void close() { + stop(); + } +} diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java new file mode 100644 index 000000000000..b93432eaf4ce --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.stream; + +import java.nio.file.Path; +import java.util.Map; + +/** + * Interface to define which files should be replicated for a given id. + */ +public interface StreamingSource { + + /** + * + * @param id: custom identifier + * + * @return map of files which should be copied (logical name -> real path) + */ + Map getFilesToStream(String id); + +} diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java new file mode 100644 index 000000000000..901a1f6c50f5 --- /dev/null +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java @@ -0,0 +1,7 @@ +package org.apache.hadoop.ozone.container.stream; + +import static org.junit.Assert.*; + +public class TestStreamingServer { + +} \ No newline at end of file diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index 21714c9602c9..b9178c27894b 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -59,7 +59,7 @@ GeneratorOm.class, GeneratorScm.class, GeneratorDatanode.class, - ClosedContainerReplicator.class}, + StreamingGenerator.class}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true) public class Freon extends GenericCli { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java new file mode 100644 index 000000000000..f5f72407e000 --- /dev/null +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.ozone.freon; + +import com.codahale.metrics.Timer; +import io.netty.channel.Channel; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.hdds.cli.HddsVersionProvider; +import org.apache.hadoop.ozone.container.stream.DirectoryServerDestination; +import org.apache.hadoop.ozone.container.stream.DirectoryServerSource; +import org.apache.hadoop.ozone.container.stream.StreamingClient; +import org.apache.hadoop.ozone.container.stream.StreamingServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import picocli.CommandLine; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.concurrent.Callable; + +@CommandLine.Command(name = "strmg", + aliases = "streaming-generator", + description = "Create directory structure and stream them multiple times.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) +public class StreamingGenerator extends BaseFreonGenerator + implements Callable { + + private static final Logger LOG = LoggerFactory.getLogger(StreamingGenerator.class); + + @CommandLine.Option(names = {"--root-dir"}, + description = "Directory where the working directories are created", + defaultValue = "/tmp/ozone-streaming") + Path testRoot; + + @CommandLine.Option(names = {"--files"}, + description = "Number of the files in the test directory to be generated.", + defaultValue = "50") + private int numberOfFiles; + + @CommandLine.Option(names = {"--size"}, + description = "Size of the generated files.", + defaultValue = "104857600") + private int fileSize; + + + private int port = 1234; + + private String subdir = "dir1"; + + + @Override + public Void call() throws Exception { + init(); + + generateBaseData(); + + Timer timer = getMetrics().timer("streaming"); + setThreadNo(1); + runTests(this::copyDir); + + + return null; + } + + private void generateBaseData() throws IOException { + Path sourceDir = testRoot.resolve("streaming-0"); + if (Files.exists(sourceDir)) { + deleteDirRecursive(sourceDir); + } + Path subDir = sourceDir.resolve(subdir); + Files.createDirectories(subDir); + ContentGenerator contentGenerator = new ContentGenerator(fileSize, 1024); + + for (int i = 0; i < numberOfFiles; i++) { + try (FileOutputStream out = new FileOutputStream(subDir.resolve("file-" + i).toFile())) { + contentGenerator.write(out); + } + } + } + + private void copyDir(long l) { + Path sourceDir = testRoot.resolve("streaming-" + l); + Path destinationDir = testRoot.resolve("streaming-" + (l + 1)); + + try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir), 1234)) { + try { + server.start(); + LOG.info("Starting streaming server on port {} to publish dir {}", port, sourceDir); + + try (StreamingClient client = + new StreamingClient("localhost", port, + new DirectoryServerDestination( + destinationDir))) { + final Channel connect = client.connect(); + connect.writeAndFlush(subdir + "\n").await(); + connect.closeFuture().sync().await(); + } + LOG.info("Replication has been finished to {}", sourceDir); + + } catch (InterruptedException e) { + throw new 
RuntimeException(e);
+      }
+
+
+      deleteDirRecursive(sourceDir);
+    }
+  }
+
+  private void deleteDirRecursive(Path destinationDir) {
+    try {
+      FileUtils.forceDelete(destinationDir.toFile());
+
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

From 510a434ff41cb8d4df4126458ace58c67e034cc3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Elek=20M=C3=A1rton?=
Date: Fri, 14 May 2021 11:30:12 +0200
Subject: [PATCH 2/8] non-secure streaming

---
 .../stream/DirectoryServerDestination.java    |  20 +++
 .../stream/DirectoryServerSource.java         |  22 +++-
 .../stream/DirstreamClientHandler.java        |  46 +++++--
 .../stream/DirstreamServerHandler.java        |  67 +++++++---
 .../container/stream/StreamingClient.java     |  66 +++++++---
 .../stream/StreamingDestination.java          |  23 ++++
 .../container/stream/StreamingSource.java     |   2 +-
 .../container/stream/TestStreamingServer.java | 118 +++++++++++++++++-
 .../org/apache/hadoop/ozone/freon/Freon.java  |   2 +-
 .../ozone/freon/StreamingGenerator.java       |   4 +-
 10 files changed, 322 insertions(+), 48 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java
index 9733c71d25f2..69af0ae40ff5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerDestination.java
@@ -1,8 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.ozone.container.stream;
 
 import java.nio.file.Path;
 import java.nio.file.Paths;
 
+/**
+ * Streaming binaries to single directory.
+ */
 public class DirectoryServerDestination implements StreamingDestination {
 
   private Path root;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java
index acd947f934c6..995d22450a7d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.ozone.container.stream;
 
 import java.io.IOException;
@@ -6,6 +23,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Streaming files from single directory.
+ */
 public class DirectoryServerSource implements StreamingSource {
 
   private Path root;
@@ -15,7 +35,7 @@ public DirectoryServerSource(Path root) {
   }
 
   @Override
-  public Map getFilesToStream(String id) {
+  public Map getFilesToStream(String id) throws InterruptedException {
     Map files = new HashMap<>();
     try {
       Files.walk(root.resolve(id))
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
index 62384b606b33..f62f7f92d4a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.ozone.container.stream; import java.io.IOException; @@ -13,6 +30,17 @@ import io.netty.util.ByteProcessor; import io.netty.util.ReferenceCountUtil; +/** + * Protocol definition from streaming binary files. + * + * Format of the protocol (TCP/IP): + * + * LOGICAL_NAME SIZE + * ... (binary content) + * LOGICAL_NAME SIZE + * ... (binary content) + * END 0 + */ public class DirstreamClientHandler extends ChannelInboundHandlerAdapter { private final StreamingDestination destination; @@ -30,7 +58,7 @@ public DirstreamClientHandler(StreamingDestination streamingDestination) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) - throws IOException { + throws IOException { try { ByteBuf buffer = (ByteBuf) msg; doRead(ctx, buffer); @@ -40,15 +68,15 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } public void doRead(ChannelHandlerContext ctx, ByteBuf buffer) - throws IOException { + throws IOException { if (headerMode) { int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer - .readerIndex(); + .readerIndex(); if (eolPosition > 0) { headerMode = false; final ByteBuf name = buffer.readBytes(eolPosition); currentFileName.append(name - .toString(StandardCharsets.UTF_8)); + .toString(StandardCharsets.UTF_8)); name.release(); buffer.skipBytes(1); String[] parts = currentFileName.toString().split(" ", 2); @@ -56,18 +84,18 @@ public void doRead(ChannelHandlerContext ctx, ByteBuf buffer) Path destFilePath = destination.mapToDestination(parts[1]); Files.createDirectories(destFilePath.getParent()); this.destFile = - new RandomAccessFile(destFilePath.toFile(), "rw"); + new RandomAccessFile(destFilePath.toFile(), "rw"); destFileChannel = this.destFile.getChannel(); } else { currentFileName - .append(buffer.toString(StandardCharsets.UTF_8)); + .append(buffer.toString(StandardCharsets.UTF_8)); } } if (!headerMode) { final int readableBytes = buffer.readableBytes(); if (remaining >= readableBytes) { remaining -= - buffer.readBytes(destFileChannel, readableBytes); + buffer.readBytes(destFileChannel, readableBytes); } else { remaining -= buffer.readBytes(destFileChannel, (int) remaining); currentFileName = new StringBuilder(); @@ -86,7 +114,6 @@ public void channelUnregistered(ChannelHandlerContext ctx) { if (destFile != null) { destFile.close(); } - } catch (IOException e) { e.printStackTrace(); } @@ -104,4 +131,7 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } + public String getCurrentFileName() { + return currentFileName.toString(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java index 04723962c085..9320ceb268bc 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.ozone.container.stream; import java.io.IOException; @@ -18,11 +35,20 @@ import io.netty.channel.DefaultFileRegion; import io.netty.util.ByteProcessor; +/** + * Protocol definition of the streaming. + */ public class DirstreamServerHandler extends ChannelInboundHandlerAdapter { + public static final String END_MARKER = "0 END"; + + public static final ByteBuf END_MARKER_BUF = + Unpooled.wrappedBuffer(END_MARKER.getBytes(StandardCharsets.UTF_8)); + private final StringBuilder id = new StringBuilder(); + private StreamingSource source; - private int writtenIndex = 0; + private boolean headerProcessed = false; public DirstreamServerHandler(StreamingSource source) { @@ -31,16 +57,16 @@ public DirstreamServerHandler(StreamingSource source) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { + throws Exception { if (!headerProcessed) { ByteBuf buffer = (ByteBuf) msg; int eolPosition = buffer.forEachByte(ByteProcessor.FIND_LF) - buffer - .readerIndex(); + .readerIndex(); if (eolPosition > 0) { headerProcessed = true; id.append(buffer.toString(Charset.defaultCharset())); } else { - id.append(buffer.toString(0,eolPosition, Charset.defaultCharset())); + id.append(buffer.toString(0, eolPosition, Charset.defaultCharset())); } buffer.release(); } @@ -48,7 +74,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) if (headerProcessed) { ChannelFuture lastFuture = null; final List> entriesToWrite = new ArrayList<>( - source.getFilesToStream(id.toString().trim()).entrySet()); + source.getFilesToStream(id.toString().trim()).entrySet()); writeOneElement(ctx, entriesToWrite, 0); @@ -56,32 +82,35 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) } public void writeOneElement( - ChannelHandlerContext ctx, - List> entriesToWrite, - int i + ChannelHandlerContext ctx, + List> entriesToWrite, + int i ) - throws IOException { + throws IOException { final Entry entryToWrite = entriesToWrite.get(i); Path file = entryToWrite.getValue(); String name = entryToWrite.getKey(); long fileSize = Files.size(file); String identifier = fileSize + " " + name + "\n"; ByteBuf identifierBuf = - Unpooled.wrappedBuffer(identifier.getBytes( - StandardCharsets.UTF_8)); + Unpooled.wrappedBuffer(identifier.getBytes( + StandardCharsets.UTF_8)); final int currentIndex = i; ChannelFuture lastFuture = ctx.writeAndFlush(identifierBuf); lastFuture.addListener(f -> { ChannelFuture nextFuture = ctx.writeAndFlush( - new DefaultFileRegion(file.toFile(), 0, fileSize)); + new DefaultFileRegion(file.toFile(), 0, fileSize)); if (currentIndex == entriesToWrite.size() - 1) { - nextFuture.addListener(a -> ctx.channel().close()); + nextFuture.addListener(a -> + ctx.writeAndFlush(END_MARKER_BUF).addListener(b -> { + ctx.channel().close(); + })); } else { nextFuture.addListener( - a -> writeOneElement(ctx, entriesToWrite, - currentIndex + 1)); + a -> writeOneElement(ctx, entriesToWrite, + currentIndex + 1)); } }); @@ -94,13 +123,13 @@ public void channelReadComplete(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) - throws Exception { + throws Exception { 
cause.printStackTrace(); if (ctx.channel().isActive()) { ctx.writeAndFlush("ERR: " + - cause.getClass().getSimpleName() + ": " + - cause.getMessage() + '\n').addListener( - ChannelFutureListener.CLOSE); + cause.getClass().getSimpleName() + ": " + + cause.getMessage() + '\n').addListener( + ChannelFutureListener.CLOSE); } ctx.close(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java index f2984aaeec15..4927bcda081f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.ozone.container.stream; import io.netty.bootstrap.Bootstrap; @@ -8,13 +25,17 @@ import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.ozone.container.stream.DirstreamServerHandler.END_MARKER; + public class StreamingClient implements AutoCloseable { private static EventLoopGroup group; - private final Bootstrap b; - private ChannelFuture f; + private final Bootstrap bootstrap; private int port; private String host; + private final DirstreamClientHandler dirstreamClientHandler; public StreamingClient( String host, @@ -25,30 +46,47 @@ public StreamingClient( this.host = host; group = new NioEventLoopGroup(100); - - b = new Bootstrap(); - b.group(group) + dirstreamClientHandler = new DirstreamClientHandler(streamingDestination); + bootstrap = new Bootstrap(); + bootstrap.group(group) .channel(NioSocketChannel.class) - .option(ChannelOption.SO_RCVBUF, 1024 * 1024) - .handler(new ChannelInitializer() { + .option(ChannelOption.SO_RCVBUF, 1024 * 1024) + .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast(new StringEncoder(CharsetUtil.UTF_8), - new DirstreamClientHandler(streamingDestination)); + dirstreamClientHandler + ); } }); } - public Channel connect() throws InterruptedException { - f = b.connect(host, port).sync(); - return f.channel(); + + public void stream(String id) { + stream(id, 200L, TimeUnit.SECONDS); + } + + public void stream(String id, long timeout, TimeUnit unit) { + try { + Channel channel = bootstrap.connect(host, port).sync().channel(); + channel.writeAndFlush(id + "\n") + .await(timeout, unit); + channel.closeFuture().await(timeout, unit); + if (!dirstreamClientHandler.getCurrentFileName().equals(END_MARKER)) { + throw new RuntimeException("Streaming is failed. Not all files " + + "are streamed. Please check the log of the server. Last (partial?) " + + "streamed file: " + dirstreamClientHandler.getCurrentFileName()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - public void close() throws InterruptedException { - f.channel().closeFuture().sync(); + + @Override + public void close() { group.shutdownGracefully(); } - } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java index 63c3ea89913c..f91d99dce2dd 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingDestination.java @@ -1,9 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.ozone.container.stream; import java.nio.file.Path; +/** + * Interface defines the mapping to the destination. + */ public interface StreamingDestination { + /** + * Returns destination path to each logical name. + */ Path mapToDestination(String name); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java index b93432eaf4ce..5fdfc931b99c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingSource.java @@ -31,6 +31,6 @@ public interface StreamingSource { * * @return map of files which should be copied (logical name -> real path) */ - Map getFilesToStream(String id); + Map getFilesToStream(String id) throws InterruptedException; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java index 901a1f6c50f5..6f6b6c6ce478 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java @@ -1,7 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.ozone.container.stream; -import static org.junit.Assert.*; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Test; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +/** + * Testing stream server. + */ public class TestStreamingServer { + private static final String SUBDIR = "test1"; + + private static final byte[] CONTENT = "Stream it if you can" + .getBytes(StandardCharsets.UTF_8); + + @Test + public void simpleStream() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + streamDir(sourceDir, destDir, SUBDIR); + + //THEN: compare the files + final byte[] targetContent = Files + .readAllBytes(destDir.resolve(SUBDIR).resolve("file1")); + Assert.assertArrayEquals(CONTENT, targetContent); + + } + + + @Test(expected = RuntimeException.class) + public void failedStream() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + streamDir(sourceDir, destDir, "NO_SUCH_ID"); + + //THEN: compare the files + //exception is expected + + } + + @Test(expected = RuntimeException.class) + public void timeout() throws Exception { + Path sourceDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Path destDir = GenericTestUtils.getRandomizedTestDir().toPath(); + Files.createDirectories(sourceDir.resolve(SUBDIR)); + Files.createDirectories(destDir.resolve(SUBDIR)); + + //GIVEN: generate file + Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); + + //WHEN: stream subdir + try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir) { + @Override + public Map getFilesToStream(String id) throws InterruptedException { + Thread.sleep(3000L); + return super.getFilesToStream(id); + } + }, 0)) { + server.start(); + try (StreamingClient client = + new StreamingClient("localhost", server.getPort(), + new DirectoryServerDestination( + destDir))) { + client.stream(SUBDIR, 1L, TimeUnit.SECONDS); + } + } + + //THEN: compare the files + //exception is expected + + } + + private void streamDir(Path sourceDir, Path destDir, String subdir) throws InterruptedException { + try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir), 0)) { + server.start(); + try (StreamingClient client = + new StreamingClient("localhost", server.getPort(), + new DirectoryServerDestination( + destDir))) { + client.stream(subdir); + } + } + } } \ No newline at end of file diff --git 
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index b9178c27894b..8be1344328af 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -59,7 +59,7 @@ GeneratorOm.class, GeneratorScm.class, GeneratorDatanode.class, - StreamingGenerator.class}, + StreamingGenerator.class}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true) public class Freon extends GenericCli { diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java index f5f72407e000..594fef0b3937 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java @@ -109,9 +109,7 @@ private void copyDir(long l) { new StreamingClient("localhost", port, new DirectoryServerDestination( destinationDir))) { - final Channel connect = client.connect(); - connect.writeAndFlush(subdir + "\n").await(); - connect.closeFuture().sync().await(); + client.stream(subdir); } LOG.info("Replication has been finished to {}", sourceDir); From 134e640f55d08669689c7abec71aad24aa38e791 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 17 May 2021 11:36:08 +0200 Subject: [PATCH 3/8] retrigger build From ec01030ada319bcd29d328e72bb018afc7f453a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 17 May 2021 12:33:13 +0200 Subject: [PATCH 4/8] fix netty dependencies --- hadoop-hdds/container-service/pom.xml | 12 ++++++++++++ pom.xml | 15 +++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml index ed2f8f4dcd61..0c64c8ffede9 100644 --- a/hadoop-hdds/container-service/pom.xml +++ b/hadoop-hdds/container-service/pom.xml @@ -115,6 +115,18 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd"> org.glassfish.jaxb jaxb-runtime + + io.netty + netty-transport + + + io.netty + netty-codec + + + io.netty + netty-handler + diff --git a/pom.xml b/pom.xml index f74df1798a23..f1ba32f6fa14 100644 --- a/pom.xml +++ b/pom.xml @@ -1031,6 +1031,21 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs netty-all ${netty.version} + + io.netty + netty-transport + ${netty.version} + + + io.netty + netty-codec + ${netty.version} + + + io.netty + netty-handler + ${netty.version} + commons-io From 5345c6d9443649325e00b7bcf7f16824ed6b7fd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 17 May 2021 14:01:31 +0200 Subject: [PATCH 5/8] findbugs fixes --- .../ozone/container/stream/DirstreamClientHandler.java | 8 +++++++- .../hadoop/ozone/container/stream/StreamingClient.java | 2 +- .../apache/hadoop/ozone/freon/StreamingGenerator.java | 9 +++++---- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java index f62f7f92d4a9..c0e530abbe3b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java +++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamClientHandler.java @@ -82,10 +82,16 @@ public void doRead(ChannelHandlerContext ctx, ByteBuf buffer) String[] parts = currentFileName.toString().split(" ", 2); remaining = Long.parseLong(parts[0]); Path destFilePath = destination.mapToDestination(parts[1]); - Files.createDirectories(destFilePath.getParent()); + final Path destfileParent = destFilePath.getParent(); + if (destfileParent == null) { + throw new IllegalArgumentException("Streaming destination " + + "provider return with invalid path: " + destFilePath); + } + Files.createDirectories(destfileParent); this.destFile = new RandomAccessFile(destFilePath.toFile(), "rw"); destFileChannel = this.destFile.getChannel(); + } else { currentFileName .append(buffer.toString(StandardCharsets.UTF_8)); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java index 4927bcda081f..e9d3c081270c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java @@ -31,7 +31,7 @@ public class StreamingClient implements AutoCloseable { - private static EventLoopGroup group; + private EventLoopGroup group; private final Bootstrap bootstrap; private int port; private String host; diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java index 594fef0b3937..b7fe8b596e46 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java @@ -17,7 +17,6 @@ package org.apache.hadoop.ozone.freon; import com.codahale.metrics.Timer; -import io.netty.channel.Channel; import org.apache.commons.io.FileUtils; import org.apache.hadoop.hdds.cli.HddsVersionProvider; import org.apache.hadoop.ozone.container.stream.DirectoryServerDestination; @@ -64,6 +63,7 @@ public class StreamingGenerator extends BaseFreonGenerator private int port = 1234; private String subdir = "dir1"; + private Timer timer; @Override @@ -72,11 +72,10 @@ public Void call() throws Exception { generateBaseData(); - Timer timer = getMetrics().timer("streaming"); + timer = getMetrics().timer("streaming"); setThreadNo(1); runTests(this::copyDir); - return null; } @@ -109,7 +108,9 @@ private void copyDir(long l) { new StreamingClient("localhost", port, new DirectoryServerDestination( destinationDir))) { - client.stream(subdir); + + timer.time(() -> client.stream(subdir)); + } LOG.info("Replication has been finished to {}", sourceDir); From 1b26784edeea05372594dcb7d9b6e7ea0b3aaf86 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Mon, 17 May 2021 14:06:02 +0200 Subject: [PATCH 6/8] checksyle fixes --- .../stream/DirectoryServerSource.java | 3 +- .../container/stream/StreamingClient.java | 99 ++++++++++--------- .../ozone/container/stream/package-info.java | 22 +++++ .../container/stream/TestStreamingServer.java | 22 +++-- .../ozone/freon/StreamingGenerator.java | 41 +++++--- 5 files changed, 112 insertions(+), 75 deletions(-) create mode 100644 
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java index 995d22450a7d..0c4d696876a3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java @@ -35,7 +35,8 @@ public DirectoryServerSource(Path root) { } @Override - public Map getFilesToStream(String id) throws InterruptedException { + public Map getFilesToStream(String id) + throws InterruptedException { Map files = new HashMap<>(); try { Files.walk(root.resolve(id)) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java index e9d3c081270c..4e373e9fc7ab 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java @@ -31,62 +31,63 @@ public class StreamingClient implements AutoCloseable { - private EventLoopGroup group; - private final Bootstrap bootstrap; - private int port; - private String host; - private final DirstreamClientHandler dirstreamClientHandler; + private final Bootstrap bootstrap; + private final DirstreamClientHandler dirstreamClientHandler; + private EventLoopGroup group; + private int port; + private String host; - public StreamingClient( - String host, - int port, - StreamingDestination streamingDestination - ) throws InterruptedException { - this.port = port; - this.host = host; + public StreamingClient( + String host, + int port, + StreamingDestination streamingDestination + ) throws InterruptedException { + this.port = port; + this.host = host; - group = new NioEventLoopGroup(100); - dirstreamClientHandler = new DirstreamClientHandler(streamingDestination); - bootstrap = new Bootstrap(); - bootstrap.group(group) - .channel(NioSocketChannel.class) - .option(ChannelOption.SO_RCVBUF, 1024 * 1024) - .handler(new ChannelInitializer() { - @Override - public void initChannel(SocketChannel ch) throws Exception { - ChannelPipeline p = ch.pipeline(); - p.addLast(new StringEncoder(CharsetUtil.UTF_8), - dirstreamClientHandler - ); - } - }); + group = new NioEventLoopGroup(100); + dirstreamClientHandler = new DirstreamClientHandler(streamingDestination); + bootstrap = new Bootstrap(); + bootstrap.group(group) + .channel(NioSocketChannel.class) + .option(ChannelOption.SO_RCVBUF, 1024 * 1024) + .handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline p = ch.pipeline(); + p.addLast(new StringEncoder(CharsetUtil.UTF_8), + dirstreamClientHandler + ); + } + }); - } + } - public void stream(String id) { - stream(id, 200L, TimeUnit.SECONDS); - } + public void stream(String id) { + stream(id, 200L, TimeUnit.SECONDS); + } - public void stream(String id, long timeout, TimeUnit unit) { - try { - Channel channel = bootstrap.connect(host, port).sync().channel(); - channel.writeAndFlush(id + "\n") - .await(timeout, unit); - channel.closeFuture().await(timeout, unit); - if 
(!dirstreamClientHandler.getCurrentFileName().equals(END_MARKER)) { - throw new RuntimeException("Streaming is failed. Not all files " + - "are streamed. Please check the log of the server. Last (partial?) " + - "streamed file: " + dirstreamClientHandler.getCurrentFileName()); - } - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + public void stream(String id, long timeout, TimeUnit unit) { + try { + Channel channel = bootstrap.connect(host, port).sync().channel(); + channel.writeAndFlush(id + "\n") + .await(timeout, unit); + channel.closeFuture().await(timeout, unit); + if (!dirstreamClientHandler.getCurrentFileName().equals(END_MARKER)) { + throw new RuntimeException("Streaming is failed. Not all files " + + "are streamed. Please check the log of the server." + + " Last (partial?) streamed file: " + + dirstreamClientHandler.getCurrentFileName()); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); } + } - @Override - public void close() { - group.shutdownGracefully(); - } + @Override + public void close() { + group.shutdownGracefully(); + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java new file mode 100644 index 000000000000..8c8ed4e02319 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Streaming API: client and server to move raw binary data. 
+ */ +package org.apache.hadoop.ozone.container.stream; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java index 6f6b6c6ce478..6453477fa9ac 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/stream/TestStreamingServer.java @@ -88,13 +88,15 @@ public void timeout() throws Exception { Files.write(sourceDir.resolve(SUBDIR).resolve("file1"), CONTENT); //WHEN: stream subdir - try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir) { - @Override - public Map getFilesToStream(String id) throws InterruptedException { - Thread.sleep(3000L); - return super.getFilesToStream(id); - } - }, 0)) { + try (StreamingServer server = + new StreamingServer(new DirectoryServerSource(sourceDir) { + @Override + public Map getFilesToStream(String id) + throws InterruptedException { + Thread.sleep(3000L); + return super.getFilesToStream(id); + } + }, 0)) { server.start(); try (StreamingClient client = new StreamingClient("localhost", server.getPort(), @@ -109,8 +111,10 @@ public Map getFilesToStream(String id) throws InterruptedException } - private void streamDir(Path sourceDir, Path destDir, String subdir) throws InterruptedException { - try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir), 0)) { + private void streamDir(Path sourceDir, Path destDir, String subdir) + throws InterruptedException { + try (StreamingServer server = new StreamingServer( + new DirectoryServerSource(sourceDir), 0)) { server.start(); try (StreamingClient client = new StreamingClient("localhost", server.getPort(), diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java index b7fe8b596e46..2cf251510aa5 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/StreamingGenerator.java @@ -34,24 +34,27 @@ import java.util.concurrent.Callable; @CommandLine.Command(name = "strmg", - aliases = "streaming-generator", - description = "Create directory structure and stream them multiple times.", - versionProvider = HddsVersionProvider.class, - mixinStandardHelpOptions = true, - showDefaultValues = true) + aliases = "streaming-generator", + description = + "Create directory structure and stream them multiple times.", + versionProvider = HddsVersionProvider.class, + mixinStandardHelpOptions = true, + showDefaultValues = true) public class StreamingGenerator extends BaseFreonGenerator - implements Callable { + implements Callable { - private static final Logger LOG = LoggerFactory.getLogger(StreamingGenerator.class); + private static final Logger LOG = + LoggerFactory.getLogger(StreamingGenerator.class); @CommandLine.Option(names = {"--root-dir"}, - description = "Directory where the working directories are created", - defaultValue = "/tmp/ozone-streaming") - Path testRoot; + description = "Directory where the working directories are created", + defaultValue = "/tmp/ozone-streaming") + private Path testRoot; @CommandLine.Option(names = {"--files"}, - description = "Number of the files in the test directory to be 
generated.", - defaultValue = "50") + description = "Number of the files in the test directory " + + "to be generated.", + defaultValue = "50") private int numberOfFiles; @CommandLine.Option(names = {"--size"}, @@ -86,10 +89,13 @@ private void generateBaseData() throws IOException { } Path subDir = sourceDir.resolve(subdir); Files.createDirectories(subDir); - ContentGenerator contentGenerator = new ContentGenerator(fileSize, 1024); + ContentGenerator contentGenerator = new ContentGenerator(fileSize, + 1024); for (int i = 0; i < numberOfFiles; i++) { - try (FileOutputStream out = new FileOutputStream(subDir.resolve("file-" + i).toFile())) { + try (FileOutputStream out = new FileOutputStream( + subDir.resolve("file-" + i).toFile()) + ) { contentGenerator.write(out); } } @@ -99,10 +105,13 @@ private void copyDir(long l) { Path sourceDir = testRoot.resolve("streaming-" + l); Path destinationDir = testRoot.resolve("streaming-" + (l + 1)); - try (StreamingServer server = new StreamingServer(new DirectoryServerSource(sourceDir), 1234)) { + try (StreamingServer server = + new StreamingServer(new DirectoryServerSource(sourceDir), + 1234)) { try { server.start(); - LOG.info("Starting streaming server on port {} to publish dir {}", port, sourceDir); + LOG.info("Starting streaming server on port {} to publish dir {}", + port, sourceDir); try (StreamingClient client = new StreamingClient("localhost", port, From 219ecdf710d8d474febfeef32a7efdc961928b19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Wed, 19 May 2021 12:20:37 +0200 Subject: [PATCH 7/8] adress review comments --- .../ozone/container/stream/DirectoryServerSource.java | 5 +++++ .../ozone/container/stream/DirstreamServerHandler.java | 1 - .../hadoop/ozone/container/stream/StreamingClient.java | 10 +++++++++- 3 files changed, 14 insertions(+), 2 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java index 0c4d696876a3..8759d09f7f3d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirectoryServerSource.java @@ -35,6 +35,11 @@ public DirectoryServerSource(Path root) { } @Override + /** + * Return logicalNames and real file path to replicate. + * + * @param id name of the subdirectory to replitace relative to root. 
+   */
   public Map getFilesToStream(String id)
       throws InterruptedException {
     Map files = new HashMap<>();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
index 9320ceb268bc..4ee6fd8af726 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java
@@ -72,7 +72,6 @@ public void channelRead(ChannelHandlerContext ctx, Object msg)
     }
 
     if (headerProcessed) {
-      ChannelFuture lastFuture = null;
       final List> entriesToWrite = new ArrayList<>(
           source.getFilesToStream(id.toString().trim()).entrySet());
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
index 4e373e9fc7ab..8669fd870782 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/StreamingClient.java
@@ -18,7 +18,11 @@
 package org.apache.hadoop.ozone.container.stream;
 
 import io.netty.bootstrap.Bootstrap;
-import io.netty.channel.*;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
@@ -29,6 +33,9 @@
 import static org.apache.hadoop.ozone.container.stream.DirstreamServerHandler.END_MARKER;
 
+/**
+ * Client to stream huge binaries from a streaming server.
+ */ public class StreamingClient implements AutoCloseable { private final Bootstrap bootstrap; @@ -51,6 +58,7 @@ public StreamingClient( bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_RCVBUF, 1024 * 1024) + .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { From cb21fe5c354e2fc1d361d8081d9ab9d60ceba6f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Elek=20M=C3=A1rton?= Date: Fri, 21 May 2021 11:56:17 +0200 Subject: [PATCH 8/8] Fix end marker buffering --- .../container/stream/DirstreamServerHandler.java | 12 ++++++------ .../java/org/apache/hadoop/ozone/freon/Freon.java | 1 + 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java index 4ee6fd8af726..c609c2c85d62 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.java @@ -42,9 +42,6 @@ public class DirstreamServerHandler extends ChannelInboundHandlerAdapter { public static final String END_MARKER = "0 END"; - public static final ByteBuf END_MARKER_BUF = - Unpooled.wrappedBuffer(END_MARKER.getBytes(StandardCharsets.UTF_8)); - private final StringBuilder id = new StringBuilder(); private StreamingSource source; @@ -103,9 +100,12 @@ public void writeOneElement( new DefaultFileRegion(file.toFile(), 0, fileSize)); if (currentIndex == entriesToWrite.size() - 1) { nextFuture.addListener(a -> - ctx.writeAndFlush(END_MARKER_BUF).addListener(b -> { - ctx.channel().close(); - })); + ctx.writeAndFlush( + Unpooled.wrappedBuffer( + END_MARKER.getBytes(StandardCharsets.UTF_8))) + .addListener(b -> { + ctx.channel().close(); + })); } else { nextFuture.addListener( a -> writeOneElement(ctx, entriesToWrite, diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java index 8be1344328af..d09937caa547 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/Freon.java @@ -59,6 +59,7 @@ GeneratorOm.class, GeneratorScm.class, GeneratorDatanode.class, + ClosedContainerReplicator.class, StreamingGenerator.class}, versionProvider = HddsVersionProvider.class, mixinStandardHelpOptions = true)