diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java index 6ab974c2f1ea9..06e60b73b35d2 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeAutoDropIT.java @@ -24,10 +24,12 @@ import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStaticMeta; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq; +import org.apache.iotdb.consensus.ConsensusFactory; import org.apache.iotdb.db.it.utils.TestUtils; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.MultiClusterIT2DualTableManualEnhanced; +import org.apache.iotdb.itbase.env.BaseEnv; import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils; import org.apache.iotdb.pipe.it.dual.tablemodel.manual.AbstractPipeTableModelDualManualIT; import org.apache.iotdb.rpc.TSStatusCode; @@ -40,6 +42,7 @@ import java.sql.Connection; import java.sql.ResultSet; +import java.sql.SQLException; import java.sql.Statement; import java.util.Collections; import java.util.HashMap; @@ -49,6 +52,7 @@ import static org.apache.iotdb.util.MagicUtils.makeItCloseQuietly; import static org.awaitility.Awaitility.await; +import static org.junit.Assert.fail; @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2DualTableManualEnhanced.class}) @@ -60,6 +64,34 @@ public void setUp() { super.setUp(); } + protected void setupConfig() { + // Enable auto split + senderEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setEnforceStrongPassword(false) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); + receiverEnv + .getConfig() + .getCommonConfig() + .setAutoCreateSchemaEnabled(true) + .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) + .setEnforceStrongPassword(false) + .setPipeMemoryManagementEnabled(false) + .setIsPipeEnableMemoryCheck(false); + + // 10 min, assert that the operations will not time out + senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + receiverEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000); + + senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH"); + } + @Test public void testAutoDropInHistoricalTransfer() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -70,48 +102,30 @@ public void testAutoDropInHistoricalTransfer() throws Exception { TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); }; - final String receiverIp = receiverDataNode.getIp(); - final int receiverPort = receiverDataNode.getPort(); - - try (final SyncConfigNodeIServiceClient client = - (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { - - TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test"); - TableModelUtils.insertData("test", "test", 0, 100, senderEnv); - - final Map extractorAttributes = new HashMap<>(); - final Map processorAttributes = new HashMap<>(); - final Map connectorAttributes = new HashMap<>(); - - extractorAttributes.put("mode.snapshot", "true"); - extractorAttributes.put("capture.table", "true"); - extractorAttributes.put("user", "root"); - - connectorAttributes.put("connector", "iotdb-thrift-connector"); - connectorAttributes.put("connector.batch.enable", "false"); - connectorAttributes.put("connector.ip", receiverIp); - connectorAttributes.put("connector.port", Integer.toString(receiverPort)); - - final TSStatus status = - client.createPipe( - new TCreatePipeReq("p1", connectorAttributes) - .setExtractorAttributes(extractorAttributes) - .setProcessorAttributes(processorAttributes)); + // Create an ordinary full sync pipe + // The database & table name will be converted to lower case + final String sql = + String.format("create pipe a2b ('node-urls'='%s')", receiverDataNode.getIpAndPortString()); + try (final Connection connection = senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT); + final Statement statement = connection.createStatement()) { + statement.execute(sql); + } catch (SQLException e) { + fail(e.getMessage()); + } - Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); - Assert.assertEquals( - TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("p1").getCode()); + TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test"); + TableModelUtils.insertData("test", "test", 0, 100, senderEnv); - TestUtils.assertDataEventuallyOnEnv( - receiverEnv, - TableModelUtils.getQueryCountSql("test"), - "_col0,", - Collections.singleton("100,"), - "test", - handleFailure); - } + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + TableModelUtils.getQueryCountSql("test"), + "_col0,", + Collections.singleton("100,"), + "test", + handleFailure); - try (final Connection connection = makeItCloseQuietly(senderEnv.getConnection()); + try (final Connection connection = + makeItCloseQuietly(senderEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT)); final Statement statement = makeItCloseQuietly(connection.createStatement()); ) { ResultSet result = statement.executeQuery("show pipes"); await() @@ -124,9 +138,9 @@ public void testAutoDropInHistoricalTransfer() throws Exception { try { int pipeNum = 0; while (result.next()) { - if (!result - .getString(ColumnHeaderConstant.ID) - .contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)) { + final String pipeName = result.getString(ColumnHeaderConstant.ID); + if (!pipeName.contains(PipeStaticMeta.CONSENSUS_PIPE_PREFIX) + && pipeName.endsWith("_history")) { pipeNum++; } } @@ -157,25 +171,25 @@ public void testAutoDropInHistoricalTransferWithTimeRange() throws Exception { TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test"); TableModelUtils.insertData("test", "test", 0, 100, senderEnv); - final Map extractorAttributes = new HashMap<>(); + final Map sourceAttributes = new HashMap<>(); final Map processorAttributes = new HashMap<>(); - final Map connectorAttributes = new HashMap<>(); + final Map sinkAttributes = new HashMap<>(); - extractorAttributes.put("mode.snapshot", "true"); - extractorAttributes.put("capture.table", "true"); - extractorAttributes.put("start-time", "0"); - extractorAttributes.put("end-time", "49"); - extractorAttributes.put("user", "root"); + sourceAttributes.put("mode.snapshot", "true"); + sourceAttributes.put("capture.table", "true"); + sourceAttributes.put("start-time", "0"); + sourceAttributes.put("end-time", "49"); + sourceAttributes.put("user", "root"); - connectorAttributes.put("connector", "iotdb-thrift-connector"); - connectorAttributes.put("connector.batch.enable", "false"); - connectorAttributes.put("connector.ip", receiverIp); - connectorAttributes.put("connector.port", Integer.toString(receiverPort)); + sinkAttributes.put("sink", "iotdb-thrift-sink"); + sinkAttributes.put("sink.batch.enable", "false"); + sinkAttributes.put("sink.ip", receiverIp); + sinkAttributes.put("sink.port", Integer.toString(receiverPort)); final TSStatus status = client.createPipe( - new TCreatePipeReq("p1", connectorAttributes) - .setExtractorAttributes(extractorAttributes) + new TCreatePipeReq("p1", sinkAttributes) + .setExtractorAttributes(sourceAttributes) .setProcessorAttributes(processorAttributes)); Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java index d28edddb2f9b7..219c060892b4d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java @@ -2253,6 +2253,8 @@ public SettableFuture createPipe( Boolean.toString(false), PipeSourceConstant.EXTRACTOR_HISTORY_ENABLE_KEY, Boolean.toString(true), + PipeSourceConstant.EXTRACTOR_MODE_KEY, + PipeSourceConstant.EXTRACTOR_MODE_SNAPSHOT_VALUE, // We force the historical pipe to transfer data only // Thus we can transfer schema only once // And may drop the historical pipe on successfully transferred