From 79dd1dcb33f9eb4e774e9963ea11addc7fa4b80e Mon Sep 17 00:00:00 2001
From: Junyu Chen <10862251+junyuc25@users.noreply.github.com>
Date: Mon, 22 Aug 2022 10:41:36 +0000
Subject: [PATCH 1/4] Shutdown CloudWatch reporter when query completes

Add delta streamer tests.
Only shut down the reporter in non-continuous mode.
---
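(Reviewer note, placed in the region "git am" ignores between "---" and the diffstat.)

Before this series, metrics were flushed and closed only from a JVM shutdown hook, so a short-lived query using a reporter such as CloudWatch kept the reporter alive until the JVM exited. The patch points the hook at the static Metrics.shutdown() and, in the hunks below, also calls it explicitly once a non-continuous query completes. The sketch that follows illustrates the lifecycle pattern this relies on; it is a hypothetical class written under the assumption that Hudi's Metrics keeps a static "initialized" flag (the new isInitialized() accessor reads it), not the actual implementation:

    public final class ReporterLifecycle {
      private static boolean initialized = false;
      private static ReporterLifecycle instance;

      private ReporterLifecycle() {
        // start the underlying reporter, then register the hook
        Runtime.getRuntime().addShutdownHook(new Thread(ReporterLifecycle::shutdown));
      }

      public static synchronized void init() {
        if (!initialized) {
          instance = new ReporterLifecycle();
          initialized = true;
        }
      }

      // Idempotent: safe to call from the hook and again at query completion.
      public static synchronized void shutdown() {
        if (initialized) {
          instance.reportAndClose();
          instance = null;
          initialized = false;  // isInitialized() now reports false
        }
      }

      public static synchronized boolean isInitialized() {
        return initialized;
      }

      private void reportAndClose() {
        // flush final metric values, then close the reporter
      }
    }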
 .../java/org/apache/hudi/metrics/Metrics.java |  7 +++++-
 .../scala/org/apache/hudi/DefaultSource.scala |  1 +
 .../apache/hudi/HoodieSparkSqlWriter.scala    |  5 ++++
 .../hudi/functional/TestCOWDataSource.scala   |  2 ++
 .../deltastreamer/HoodieDeltaStreamer.java    |  2 ++
 .../functional/TestHoodieDeltaStreamer.java   | 25 +++++++++++++------
 6 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
index b570f512f371d..d9f22bca01ed4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/Metrics.java
@@ -53,7 +53,7 @@ private Metrics(HoodieWriteConfig metricConfig) {
     }
     reporter.start();
 
-    Runtime.getRuntime().addShutdownHook(new Thread(this::reportAndCloseReporter));
+    Runtime.getRuntime().addShutdownHook(new Thread(Metrics::shutdown));
   }
 
   private void reportAndCloseReporter() {
@@ -61,6 +61,7 @@ private void reportAndCloseReporter() {
       registerHoodieCommonMetrics();
       reporter.report();
       if (getReporter() != null) {
+        LOG.info("Closing metrics reporter...");
         getReporter().close();
       }
     } catch (Exception e) {
@@ -139,4 +140,8 @@ public MetricRegistry getRegistry() {
   public Closeable getReporter() {
     return reporter.getReporter();
   }
+
+  public static boolean isInitialized() {
+    return initialized;
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index a141c60f3dd81..bc851d8ed6a3b 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -181,6 +181,7 @@ class DefaultSource extends RelationProvider
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
     } else {
       HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
+      HoodieSparkSqlWriter.cleanup()
     }
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fed3353552155..028de00f7ff47 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,6 +44,7 @@ import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.utils.{AvroSchemaEvolutionUtils, SerDeHelper}
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.keygen.{TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.sync.common.HoodieSyncConfig
 import org.apache.hudi.sync.common.util.SyncUtilHelpers
 import org.apache.hudi.table.BulkInsertPartitioner
@@ -594,6 +595,10 @@ object HoodieSparkSqlWriter {
     (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
   }
 
+  def cleanup(): Unit = {
+    Metrics.shutdown()
+  }
+
   private def handleSaveModes(spark: SparkSession, mode: SaveMode, tablePath: Path, tableConfig: HoodieTableConfig, tableName: String,
                               operation: WriteOperationType, fs: FileSystem): Unit = {
     if (mode == SaveMode.Append && tableExists) {
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 6697ec1514cdd..cdfc2d4b5831e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -31,6 +31,7 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
+import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
 import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
@@ -738,6 +739,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, commitInstantTime1)
       .load(basePath)
     assertEquals(N + 1, hoodieIncViewDF1.count())
+    assertEquals(false, Metrics.isInitialized)
   }
 
   @Test def testSchemaEvolution(): Unit = {
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index 0f403cd266028..367a121e9f783 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -53,6 +53,7 @@
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.hive.HiveSyncTool;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.HiveIncrementalPuller;
 import org.apache.hudi.utilities.IdentitySplitter;
 import org.apache.hudi.utilities.UtilHelpers;
@@ -208,6 +209,7 @@ public void sync() throws Exception {
       throw ex;
     } finally {
       deltaSyncService.ifPresent(DeltaSyncService::close);
+      Metrics.shutdown();
       LOG.info("Shut down delta streamer");
     }
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 69d6dd7d3b298..dc91605f35b4d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -56,6 +56,7 @@
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveSyncClient;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
+import org.apache.hudi.metrics.Metrics;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
 import org.apache.hudi.utilities.HoodieIndexer;
@@ -739,30 +740,36 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
   @Test
   public void testUpsertsCOWContinuousMode() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
+  }
+
+  @Test
+  public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
   }
 
   @Test
   public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
+    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
   }
 
   @Test
   public void testUpsertsMORContinuousMode() throws Exception {
-    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
+    testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
-    testUpsertsContinuousMode(tableType, tempDir, false);
+  @Test
+  public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
+    testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
   }
 
-  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
+  private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
     String tableBasePath = dfsBasePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
-    cfg.continuousMode = true;
+    cfg.continuousMode = continuousMode;
     if (testShutdownGracefully) {
       cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
     }
@@ -782,6 +789,10 @@ private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
         if (testShutdownGracefully) {
           TestDataSource.returnEmptyBatch = true;
         }
+
+        if (!cfg.continuousMode) {
+          assertFalse(Metrics.isInitialized());
+        }
         return true;
       });
   }

From 1085dd46bc650232b46e25b8d53c1ef540beb8f1 Mon Sep 17 00:00:00 2001
From: Junyu Chen <10862251+junyuc25@users.noreply.github.com>
Date: Wed, 24 Aug 2022 12:47:47 +0000
Subject: [PATCH 2/4] Add first test

---
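(Reviewer note, in the git-am-ignored region.)

The new test performs a plain datasource write with the metrics reporter turned on, then asserts Metrics.isInitialized is false, i.e. the reporter was shut down when the write completed. Input data comes from Hudi's QuickstartUtils helpers. The same generation path, sketched from Java (the wrapper class is hypothetical; the QuickstartUtils calls are the ones the test itself uses):

    import org.apache.hudi.QuickstartUtils;

    import java.io.IOException;
    import java.util.List;

    class GenerateSampleInserts {
      // Synthetic trip records as JSON strings, ready for spark.read.json(...)
      static List<String> sampleInserts() throws IOException {
        QuickstartUtils.DataGenerator dataGen = new QuickstartUtils.DataGenerator();
        return QuickstartUtils.convertToStringList(dataGen.generateInserts(10));
      }
    }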
 .../hudi/functional/TestCOWDataSource.scala   | 27 ++++++++++++++++++-
 1 file changed, 26 insertions(+), 1 deletion(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index cdfc2d4b5831e..51f0d89cc11fb 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -19,6 +19,7 @@ package org.apache.hudi.functional
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.QuickstartUtils.{convertToStringList, getQuickstartWriteConfigs}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.timeline.HoodieInstant
@@ -28,13 +29,14 @@ import org.apache.hudi.common.testutils.RawTripTestPayload.{deleteRecordsToStrings
 import org.apache.hudi.common.util
 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH
 import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.config.metrics.HoodieMetricsConfig
 import org.apache.hudi.exception.{HoodieException, HoodieUpsertException}
 import org.apache.hudi.keygen._
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions.Config
 import org.apache.hudi.metrics.Metrics
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, HoodieSparkUtils}
+import org.apache.hudi.{AvroConversionUtils, DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers, QuickstartUtils}
 import org.apache.spark.sql._
 import org.apache.spark.sql.functions.{col, concat, lit, udf}
 import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
@@ -1026,4 +1028,27 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .saveAsTable("hoodie_test")
     assertEquals(spark.read.format("hudi").load(basePath).count(), 9)
   }
+
+  @Test
+  def testInsertMetricsReporterEnabled(): Unit = {
+    val dataGenerator = new QuickstartUtils.DataGenerator()
+    val records = convertToStringList(dataGenerator.generateInserts( 10))
+// println("Printing Records: " + records)
+    val recordsRDD = spark.sparkContext.parallelize(records, 2)
+    val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
+    inputDF.write.format("hudi")
+      .options(getQuickstartWriteConfigs)
+      .option(DataSourceWriteOptions.RECORDKEY_FIELD.key, "uuid")
+      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key, "partitionpath")
+      .option(DataSourceWriteOptions.PRECOMBINE_FIELD.key, "ts")
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
+      .option(HoodieMetricsConfig.TURN_METRICS_ON.key(), "true")
+      .option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "JMX")
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    assertTrue(HoodieDataSourceHelpers.hasNewCommits(fs, basePath, "000"))
+    assertEquals(false, Metrics.isInitialized, "Metrics should be shutdown")
+  }
 }

From c73c1f9df232e6e2cb8aade97ef6d264cf467c6f Mon Sep 17 00:00:00 2001
From: Junyu Chen <10862251+junyuc25@users.noreply.github.com>
Date: Thu, 25 Aug 2022 15:02:02 +0000
Subject: [PATCH 3/4] Add tests for metrics reporter

---
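(Reviewer note, in the git-am-ignored region.)

This revision drops the boolean-parameter variant of the upsert test and gives the non-continuous cases dedicated tests: each runs a single delta streamer sync round with metrics enabled and asserts the reporter is shut down afterwards, exercising the new Metrics.shutdown() call in the finally block of HoodieDeltaStreamer#sync. Hoodie settings travel as key=value strings on cfg.configs. A trimmed sketch of that pattern (hypothetical base path and table name, assuming the standard Hudi keys hoodie.metrics.on and hoodie.metrics.reporter.type; a runnable config additionally needs a source class and schema provider, which TestHelpers.makeConfig supplies in the tests):

    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;

    import org.apache.spark.api.java.JavaSparkContext;

    class NonContinuousMetricsRun {
      static void runOnce(JavaSparkContext jsc) throws Exception {
        HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
        cfg.targetBasePath = "file:///tmp/non_continuous_cow";  // hypothetical path
        cfg.targetTableName = "non_continuous_cow";             // hypothetical name
        cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
        cfg.continuousMode = false;                  // one sync round, then exit
        cfg.configs.add("hoodie.metrics.on=true");
        cfg.configs.add("hoodie.metrics.reporter.type=CONSOLE");
        new HoodieDeltaStreamer(cfg, jsc).sync();    // sync() shuts metrics down in finally
      }
    }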
 .../hudi/functional/TestCOWDataSource.scala   |  1 -
 .../functional/TestHoodieDeltaStreamer.java   | 42 ++++++++++++++-----
 2 files changed, 31 insertions(+), 12 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index 51f0d89cc11fb..b964b9cd84e10 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1033,7 +1033,6 @@ class TestCOWDataSource extends HoodieClientTestBase {
   def testInsertMetricsReporterEnabled(): Unit = {
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts( 10))
-// println("Printing Records: " + records)
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
     val inputDF = spark.read.json(sparkSession.createDataset(recordsRDD)(Encoders.STRING))
     inputDF.write.format("hudi")
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index dc91605f35b4d..2f6bd9c15d931 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -135,6 +135,8 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE;
+import static org.apache.hudi.config.metrics.HoodieMetricsConfig.TURN_METRICS_ON;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_PARTITION_FIELDS;
 import static org.apache.hudi.sync.common.HoodieSyncConfig.META_SYNC_TABLE_NAME;
 import static org.apache.hudi.utilities.UtilHelpers.EXECUTE;
@@ -740,36 +742,58 @@ public void testSchemaEvolution(String tableType, boolean useUserProvidedSchema,
   @Test
   public void testUpsertsCOWContinuousMode() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow", false, true);
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
   }
 
   @Test
   public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "non_continuous_cow", false, false);
+    String tableBasePath = dfsBasePath + "/non_continuous_cow";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "JMX"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
   }
 
   @Test
   public void testUpsertsCOWContinuousModeShutdownGracefully() throws Exception {
-    testUpserts(HoodieTableType.COPY_ON_WRITE, "continuous_cow_shutdown_gracefully", true, true);
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow", true);
   }
 
   @Test
   public void testUpsertsMORContinuousMode() throws Exception {
-    testUpserts(HoodieTableType.MERGE_ON_READ, "continuous_mor", false, true);
+    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
   }
 
   @Test
   public void testUpsertsMOR_ContinuousModeDisabled() throws Exception {
-    testUpserts(HoodieTableType.MERGE_ON_READ, "non_continuous_mor", false, false);
+    String tableBasePath = dfsBasePath + "/non_continuous_mor";
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
+    cfg.tableType = HoodieTableType.MERGE_ON_READ.name();
+    cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
+    cfg.continuousMode = false;
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    TestHelpers.assertRecordCount(SQL_SOURCE_NUM_RECORDS, tableBasePath, sqlContext);
+    assertFalse(Metrics.isInitialized(), "Metrics should be shutdown");
+  }
+
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
+    testUpsertsContinuousMode(tableType, tempDir, false);
   }
 
-  private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully) throws Exception {
     String tableBasePath = dfsBasePath + "/" + tempDir;
     // Keep it higher than batch-size to test continuous mode
     int totalRecords = 3000;
 
     // Initial bulk insert
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
-    cfg.continuousMode = continuousMode;
+    cfg.continuousMode = true;
     if (testShutdownGracefully) {
       cfg.postWriteTerminationStrategyClass = NoNewDataTerminationStrategy.class.getName();
     }
@@ -789,10 +813,6 @@ private void testUpserts(HoodieTableType tableType, String tempDir, boolean testShutdownGracefully, boolean continuousMode) throws Exception {
         if (testShutdownGracefully) {
           TestDataSource.returnEmptyBatch = true;
         }
-
-        if (!cfg.continuousMode) {
-          assertFalse(Metrics.isInitialized());
-        }
         return true;
       });
   }

From 2e4b3c86879105d680c419842078dbbff893434b Mon Sep 17 00:00:00 2001
From: Junyu Chen <10862251+junyuc25@users.noreply.github.com>
Date: Mon, 5 Sep 2022 05:42:48 +0000
Subject: [PATCH 4/4] Invoke cleanup for bootstrap use case

---
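(Reviewer note, in the git-am-ignored region.)

Two follow-ups land here. First, the cleanup call moves out of the else-branch in DefaultSource so the bootstrap path also releases the metrics reporter. Second, the remaining JMX reporters in tests become CONSOLE reporters; a JMX reporter binds a port, which is a classic source of flakiness when suites run in parallel on CI. The reshaped control flow as a placeholder sketch (the real code lives in Scala and delegates to HoodieSparkSqlWriter):

    abstract class WritePath {
      abstract void bootstrap();
      abstract void write();
      abstract void cleanup();  // in Hudi this ends up calling Metrics.shutdown()

      void run(boolean isBootstrap) {
        if (isBootstrap) {
          bootstrap();
        } else {
          write();
        }
        // Runs for both branches now; a try/finally would additionally cover
        // the case where the write itself throws.
        cleanup();
      }
    }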
 .../src/main/scala/org/apache/hudi/DefaultSource.scala       | 3 ++-
 .../scala/org/apache/hudi/functional/TestCOWDataSource.scala | 4 ++--
 .../hudi/utilities/functional/TestHoodieDeltaStreamer.java   | 2 +-
 3 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index bc851d8ed6a3b..348a6056c8883 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -181,8 +181,9 @@ class DefaultSource extends RelationProvider
       HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
     } else {
       HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
-      HoodieSparkSqlWriter.cleanup()
     }
+
+    HoodieSparkSqlWriter.cleanup()
     new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index b964b9cd84e10..241223be7e333 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -1030,7 +1030,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
   }
 
   @Test
-  def testInsertMetricsReporterEnabled(): Unit = {
+  def testMetricsReporterViaDataSource(): Unit = {
     val dataGenerator = new QuickstartUtils.DataGenerator()
     val records = convertToStringList(dataGenerator.generateInserts( 10))
     val recordsRDD = spark.sparkContext.parallelize(records, 2)
@@ -1043,7 +1043,7 @@ class TestCOWDataSource extends HoodieClientTestBase {
       .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
       .option(HoodieWriteConfig.TBL_NAME.key, "hoodie_test")
       .option(HoodieMetricsConfig.TURN_METRICS_ON.key(), "true")
-      .option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "JMX")
+      .option(HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE")
       .mode(SaveMode.Overwrite)
       .save(basePath)
 
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 2f6bd9c15d931..741ff8010a9ef 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -751,7 +751,7 @@ public void testUpsertsCOW_ContinuousModeDisabled() throws Exception {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT);
     cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
     cfg.configs.add(String.format("%s=%s", TURN_METRICS_ON.key(), "true"));
-    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "JMX"));
+    cfg.configs.add(String.format("%s=%s", METRICS_REPORTER_TYPE_VALUE.key(), "CONSOLE"));
    cfg.continuousMode = false;
    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
    ds.sync();
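(Closing reviewer note.)

With the series applied, Metrics.isInitialized() gives callers a cheap probe of the reporter lifecycle, and Metrics.shutdown() is reachable both from the JVM shutdown hook and from explicit query-completion calls, which implies it must tolerate repeat invocations. A minimal usage sketch built on that assumption:

    import org.apache.hudi.metrics.Metrics;

    class MetricsGuard {
      // Assumes shutdown() is idempotent: the patch registers it as a JVM
      // shutdown hook and also calls it at query completion.
      static void ensureShutdown() {
        if (Metrics.isInitialized()) {
          Metrics.shutdown();
        }
      }
    }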