From 8105a1e072db7fa20f7c491bee581acfbb3bbd97 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sat, 23 Aug 2025 23:05:05 -0400 Subject: [PATCH 1/7] feat(mongodb): upgrade MongoDB Java driver to version 5.5.0 Update MongoDB Java driver from 3.12.11 to 5.5.0 and refactor code to use new API Add mongo-bson dependency required by new driver version Replace deprecated MongoClient with MongoClients and update GridFS implementation --- .../beam/gradle/BeamModulePlugin.groovy | 3 +- it/mongodb/build.gradle | 1 + sdks/java/io/mongodb/build.gradle | 1 + .../apache/beam/sdk/io/mongodb/FindQuery.java | 5 +- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 158 ++++++++++-------- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 120 +++++++------ .../beam/sdk/io/mongodb/FindQueryTest.java | 5 +- .../sdk/io/mongodb/MongoDBGridFSIOTest.java | 114 ++++++------- .../beam/sdk/io/mongodb/MongoDbIOTest.java | 5 +- 9 files changed, 221 insertions(+), 191 deletions(-) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4f40ed6fd04a..febc73c12cb9 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -840,7 +840,8 @@ class BeamModulePlugin implements Plugin { log4j2_log4j12_api : "org.apache.logging.log4j:log4j-1.2-api:$log4j2_version", mockito_core : "org.mockito:mockito-core:4.11.0", mockito_inline : "org.mockito:mockito-inline:4.11.0", - mongo_java_driver : "org.mongodb:mongo-java-driver:3.12.11", + mongo_java_driver : "org.mongodb:mongodb-driver-sync:5.5.0", + mongo_bson : "org.mongodb:bson:5.5.0", nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version", netty_all : "io.netty:netty-all:$netty_version", netty_handler : "io.netty:netty-handler:$netty_version", diff --git a/it/mongodb/build.gradle b/it/mongodb/build.gradle index 6be9b91f5b34..960e15af8394 100644 --- a/it/mongodb/build.gradle +++ b/it/mongodb/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation library.java.testcontainers_mongodb implementation library.java.google_code_gson implementation library.java.mongo_java_driver + implementation library.java.mongo_bson implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.mockito_core diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index b9e90082f0dc..e609040000e0 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -28,6 +28,7 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.joda_time implementation library.java.mongo_java_driver + implementation library.java.mongo_bson implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java index 2131656d458a..d89db9dea54b 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java @@ -21,7 +21,7 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Projections; @@ -79,7 +79,8 @@ private FindQuery withFilters(BsonDocument filters) { /** Convert the Bson filters into a BsonDocument via default encoding. */ static BsonDocument bson2BsonDocument(Bson filters) { - return filters.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry()); + return filters.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry()); } /** Sets the filters to find. */ diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 07cc238c7e6b..ebc436635f82 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -21,15 +21,18 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; -import com.mongodb.DB; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.gridfs.GridFS; -import com.mongodb.gridfs.GridFSDBFile; -import com.mongodb.gridfs.GridFSInputFile; -import com.mongodb.util.JSON; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.GridFSDownloadStream; +import com.mongodb.client.gridfs.GridFSUploadStream; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -53,6 +56,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.bson.Document; import org.bson.types.ObjectId; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -117,16 +121,18 @@ public class MongoDbGridFSIO { /** Callback for the parser to use to submit data. */ public interface ParserCallback extends Serializable { - /** Output the object. The default timestamp will be the GridFSDBFile creation timestamp. */ + /** Output the object. The default timestamp will be the GridFSFile creation timestamp. */ void output(T output); /** Output the object using the specified timestamp. */ void output(T output, Instant timestamp); } - /** Interface for the parser that is used to parse the GridFSDBFile into the appropriate types. */ + /** Interface for the parser that is used to parse the GridFSFile into the appropriate types. */ public interface Parser extends Serializable { - void parse(GridFSDBFile input, ParserCallback callback) throws IOException; + void parse( + GridFSFile gridFSFile, GridFSDownloadStream downloadStream, ParserCallback callback) + throws IOException; } /** @@ -134,11 +140,10 @@ public interface Parser extends Serializable { * file into Strings. It uses the timestamp of the file for the event timestamp. */ private static final Parser TEXT_PARSER = - (input, callback) -> { - final Instant time = new Instant(input.getUploadDate().getTime()); + (gridFSFile, downloadStream, callback) -> { + final Instant time = new Instant(gridFSFile.getUploadDate().getTime()); try (BufferedReader reader = - new BufferedReader( - new InputStreamReader(input.getInputStream(), StandardCharsets.UTF_8))) { + new BufferedReader(new InputStreamReader(downloadStream, StandardCharsets.UTF_8))) { for (String line = reader.readLine(); line != null; line = reader.readLine()) { callback.output(line, time); } @@ -197,12 +202,25 @@ static ConnectionConfiguration create( } MongoClient setupMongo() { - return uri() == null ? new MongoClient() : new MongoClient(new MongoClientURI(uri())); + if (uri() == null) { + return MongoClients.create(); + } else { + String uriString = uri(); + if (uriString == null) { + return MongoClients.create(); + } + MongoClientSettings settings = + MongoClientSettings.builder() + .applyConnectionString(new ConnectionString(uriString)) + .build(); + return MongoClients.create(settings); + } } - GridFS setupGridFS(MongoClient mongo) { - DB db = database() == null ? mongo.getDB("gridfs") : mongo.getDB(database()); - return bucket() == null ? new GridFS(db) : new GridFS(db, bucket()); + GridFSBucket setupGridFS(MongoClient mongo) { + MongoDatabase db = + database() == null ? mongo.getDatabase("gridfs") : mongo.getDatabase(database()); + return bucket() == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket()); } } @@ -313,12 +331,12 @@ public PCollection expand(PBegin input) { ParDo.of( new DoFn() { @Nullable MongoClient mongo; - @Nullable GridFS gridfs; + @Nullable GridFSBucket gridFSBucket; @Setup public void setup() { mongo = source.spec.connectionConfiguration().setupMongo(); - gridfs = source.spec.connectionConfiguration().setupGridFS(mongo); + gridFSBucket = source.spec.connectionConfiguration().setupGridFS(mongo); } @Teardown @@ -331,12 +349,18 @@ public void teardown() { @ProcessElement public void processElement(final ProcessContext c) throws IOException { - Preconditions.checkStateNotNull(gridfs); + GridFSBucket bucket = Preconditions.checkStateNotNull(gridFSBucket); ObjectId oid = c.element(); - GridFSDBFile file = gridfs.find(oid); + GridFSDownloadStream downloadStream = bucket.openDownloadStream(oid); + GridFSFile gridFSFile = + bucket.find(com.mongodb.client.model.Filters.eq("_id", oid)).first(); + if (gridFSFile == null) { + return; // Skip if file not found + } Parser parser = Preconditions.checkStateNotNull(parser()); parser.parse( - file, + gridFSFile, + downloadStream, new ParserCallback() { @Override public void output(T output, Instant timestamp) { @@ -378,12 +402,12 @@ protected static class BoundedGridFSSource extends BoundedSource { this.objectIds = objectIds; } - private DBCursor createCursor(GridFS gridfs) { + private MongoCursor createCursor(GridFSBucket gridFSBucket) { if (spec.filter() != null) { - DBObject query = (DBObject) JSON.parse(spec.filter()); - return gridfs.getFileList(query); + Document query = Document.parse(spec.filter()); + return gridFSBucket.find(query).iterator(); } - return gridfs.getFileList(); + return gridFSBucket.find().iterator(); } @Override @@ -391,20 +415,20 @@ public List> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { MongoClient mongo = spec.connectionConfiguration().setupMongo(); try { - GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo); - DBCursor cursor = createCursor(gridfs); + GridFSBucket gridFSBucket = spec.connectionConfiguration().setupGridFS(mongo); + MongoCursor cursor = createCursor(gridFSBucket); long size = 0; List list = new ArrayList<>(); List objects = new ArrayList<>(); while (cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); + GridFSFile file = cursor.next(); long len = file.getLength(); if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) { list.add(new BoundedGridFSSource(spec, objects)); size = 0; objects = new ArrayList<>(); } - objects.add((ObjectId) file.getId()); + objects.add(file.getObjectId()); size += len; } if (!objects.isEmpty() || list.isEmpty()) { @@ -419,10 +443,11 @@ public List> split( @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { try (MongoClient mongo = spec.connectionConfiguration().setupMongo(); - DBCursor cursor = createCursor(spec.connectionConfiguration().setupGridFS(mongo))) { + MongoCursor cursor = + createCursor(spec.connectionConfiguration().setupGridFS(mongo))) { long size = 0; while (cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); + GridFSFile file = cursor.next(); size += file.getLength(); } return size; @@ -456,7 +481,7 @@ static class GridFSReader extends BoundedSource.BoundedReader { final @Nullable List objects; @Nullable MongoClient mongo; - @Nullable DBCursor cursor; + @Nullable MongoCursor cursor; @Nullable Iterator iterator; @Nullable ObjectId current; @@ -474,8 +499,8 @@ public BoundedSource getCurrentSource() { public boolean start() throws IOException { if (objects == null) { mongo = source.spec.connectionConfiguration().setupMongo(); - GridFS gridfs = source.spec.connectionConfiguration().setupGridFS(mongo); - cursor = source.createCursor(gridfs); + GridFSBucket gridFSBucket = source.spec.connectionConfiguration().setupGridFS(mongo); + cursor = source.createCursor(gridFSBucket); } else { iterator = objects.iterator(); } @@ -488,8 +513,8 @@ public boolean advance() throws IOException { current = iterator.next(); return true; } else if (cursor != null && cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); - current = (ObjectId) file.getId(); + GridFSFile file = cursor.next(); + current = file.getObjectId(); return true; } current = null; @@ -628,9 +653,9 @@ private static class GridFsWriteFn extends DoFn { private final Write spec; private transient @Nullable MongoClient mongo; - private transient @Nullable GridFS gridfs; + private transient @Nullable GridFSBucket gridFSBucket; - private transient @Nullable GridFSInputFile gridFsFile; + private transient @Nullable GridFSUploadStream gridFsUploadStream; private transient @Nullable OutputStream outputStream; public GridFsWriteFn(Write spec) { @@ -640,20 +665,22 @@ public GridFsWriteFn(Write spec) { @Setup public void setup() throws Exception { mongo = spec.connectionConfiguration().setupMongo(); - gridfs = spec.connectionConfiguration().setupGridFS(mongo); + gridFSBucket = spec.connectionConfiguration().setupGridFS(mongo); } @StartBundle public void startBundle() { - GridFS gridfs = Preconditions.checkStateNotNull(this.gridfs); + GridFSBucket gridFSBucket = Preconditions.checkStateNotNull(this.gridFSBucket); String filename = Preconditions.checkStateNotNull(spec.filename()); - GridFSInputFile gridFsFile = gridfs.createFile(filename); + if (spec.chunkSize() != null) { - gridFsFile.setChunkSize(spec.chunkSize()); + gridFsUploadStream = + gridFSBucket.openUploadStream( + filename, new GridFSUploadOptions().chunkSizeBytes(spec.chunkSize().intValue())); + } else { + gridFsUploadStream = gridFSBucket.openUploadStream(filename); } - outputStream = gridFsFile.getOutputStream(); - - this.gridFsFile = gridFsFile; + outputStream = gridFsUploadStream; } @ProcessElement @@ -665,35 +692,20 @@ public void processElement(ProcessContext context) throws Exception { @FinishBundle public void finishBundle() throws Exception { - if (outputStream != null) { - OutputStream outputStream = this.outputStream; - outputStream.flush(); - outputStream.close(); - this.outputStream = null; - } - if (gridFsFile != null) { - gridFsFile = null; + GridFSUploadStream uploadStream = gridFsUploadStream; + if (uploadStream != null) { + uploadStream.flush(); + uploadStream.close(); + gridFsUploadStream = null; + outputStream = null; } } @Teardown public void teardown() throws Exception { - try { - if (outputStream != null) { - OutputStream outputStream = this.outputStream; - outputStream.flush(); - outputStream.close(); - this.outputStream = null; - } - if (gridFsFile != null) { - gridFsFile = null; - } - } finally { - if (mongo != null) { - mongo.close(); - mongo = null; - gridfs = null; - } + if (mongo != null) { + mongo.close(); + mongo = null; } } } diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 905c7418e26c..4bb26fe55921 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -22,12 +22,14 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; +import com.mongodb.ConnectionString; import com.mongodb.MongoBulkWriteException; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoClientURI; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoClientSettings.Builder; import com.mongodb.MongoCommandException; import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; @@ -46,6 +48,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import org.apache.beam.sdk.coders.Coder; @@ -362,22 +365,25 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - private static MongoClientOptions.Builder getOptions( + private static MongoClientSettings.Builder getOptions( int maxConnectionIdleTime, boolean sslEnabled, boolean sslInvalidHostNameAllowed, boolean ignoreSSLCertificate) { - MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder(); - optionsBuilder.maxConnectionIdleTime(maxConnectionIdleTime); + MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); + settingsBuilder.applyToConnectionPoolSettings( + builder -> builder.maxConnectionIdleTime(maxConnectionIdleTime, TimeUnit.MILLISECONDS)); if (sslEnabled) { - optionsBuilder.sslEnabled(sslEnabled).sslInvalidHostNameAllowed(sslInvalidHostNameAllowed); - if (ignoreSSLCertificate) { - SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); - optionsBuilder.sslContext(sslContext); - optionsBuilder.socketFactory(sslContext.getSocketFactory()); - } + settingsBuilder.applyToSslSettings( + builder -> { + builder.enabled(sslEnabled).invalidHostNameAllowed(sslInvalidHostNameAllowed); + if (ignoreSSLCertificate) { + SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); + builder.context(sslContext); + } + }); } - return optionsBuilder; + return settingsBuilder; } /** A MongoDB {@link BoundedSource} reading {@link Document} from a given instance. */ @@ -414,15 +420,15 @@ long getDocumentCount() { String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { return getDocumentCount(mongoClient, database, collection); } catch (Exception e) { return -1; @@ -446,15 +452,15 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { try { return getEstimatedSizeBytes(mongoClient, database, collection); } catch (MongoCommandException exception) { @@ -483,15 +489,15 @@ public List> split( String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { MongoDatabase mongoDatabase = mongoClient.getDatabase(database); List splitKeys; @@ -690,7 +696,10 @@ static List splitKeysToMatch(List splitKeys) { lowestBound = splitKey; } return aggregates.stream() - .map(s -> s.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry())) + .map( + s -> + s.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry())) .collect(Collectors.toList()); } @@ -786,14 +795,15 @@ public void close() { private MongoClient createClient(Read spec) { String uri = Preconditions.checkStateNotNull(spec.uri(), "withUri() is required"); - return new MongoClient( - new MongoClientURI( - uri, - getOptions( + MongoClientSettings settings = + getOptions( spec.maxConnectionIdleTime(), spec.sslEnabled(), spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate()))); + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + return MongoClients.create(settings); } } @@ -985,15 +995,15 @@ static class WriteFn extends DoFn { @Setup public void createMongoClient() { String uri = Preconditions.checkStateNotNull(spec.uri()); - client = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate()))); + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + client = MongoClients.create(settings); } @StartBundle diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java index df66179f3904..da90f92dc190 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java @@ -21,7 +21,7 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Projections; @@ -79,7 +79,8 @@ private FindQueryTest withFilters(BsonDocument filters) { /** Convert the Bson filters into a BsonDocument via default encoding. */ static BsonDocument bson2BsonDocument(Bson filters) { - return filters.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry()); + return filters.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry()); } /** Sets the filters to find. */ diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 09343606f228..d13185a08fb6 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -20,11 +20,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.mongodb.DB; -import com.mongodb.MongoClient; -import com.mongodb.gridfs.GridFS; -import com.mongodb.gridfs.GridFSDBFile; -import com.mongodb.gridfs.GridFSInputFile; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.GridFSUploadStream; +import com.mongodb.client.gridfs.model.GridFSFile; import de.flapdoodle.embed.mongo.MongodExecutable; import de.flapdoodle.embed.mongo.MongodProcess; import de.flapdoodle.embed.mongo.MongodStarter; @@ -35,12 +37,10 @@ import de.flapdoodle.embed.mongo.distribution.Version; import de.flapdoodle.embed.process.runtime.Network; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -117,9 +117,9 @@ public static void start() throws Exception { LOG.info("Insert test data"); - MongoClient client = new MongoClient("localhost", port); - DB database = client.getDB(DATABASE); - GridFS gridfs = new GridFS(database); + MongoClient client = MongoClients.create("mongodb://localhost:" + port); + MongoDatabase database = client.getDatabase(DATABASE); + GridFSBucket gridfs = GridFSBuckets.create(database); ByteArrayOutputStream out = new ByteArrayOutputStream(); for (int x = 0; x < 100; x++) { @@ -129,10 +129,12 @@ public static void start() throws Exception { .getBytes(StandardCharsets.UTF_8)); } for (int x = 0; x < 5; x++) { - gridfs.createFile(new ByteArrayInputStream(out.toByteArray()), "file" + x).save(); + try (GridFSUploadStream uploadStream = gridfs.openUploadStream("file" + x)) { + uploadStream.write(out.toByteArray()); + } } - gridfs = new GridFS(database, "mapBucket"); + GridFSBucket mapBucketGridfs = GridFSBuckets.create(database, "mapBucket"); long now = System.currentTimeMillis(); Random random = new Random(); String[] scientists = { @@ -148,26 +150,25 @@ public static void start() throws Exception { "Maxwell" }; for (int x = 0; x < 10; x++) { - GridFSInputFile file = gridfs.createFile("file_" + x); - OutputStream outf = file.getOutputStream(); - OutputStreamWriter writer = new OutputStreamWriter(outf, StandardCharsets.UTF_8); - for (int y = 0; y < 5000; y++) { - long time = now - random.nextInt(3600000); - String name = scientists[y % scientists.length]; - writer.write(time + "\t"); - writer.write(name + "\t"); - writer.write(Integer.toString(random.nextInt(100))); - writer.write("\n"); - } - for (int y = 0; y < scientists.length; y++) { - String name = scientists[y % scientists.length]; - writer.write(now + "\t"); - writer.write(name + "\t"); - writer.write("101"); - writer.write("\n"); + try (GridFSUploadStream uploadStream = mapBucketGridfs.openUploadStream("file_" + x)) { + OutputStreamWriter writer = new OutputStreamWriter(uploadStream, StandardCharsets.UTF_8); + for (int y = 0; y < 5000; y++) { + long time = now - random.nextInt(3600000); + String name = scientists[y % scientists.length]; + writer.write(time + "\t"); + writer.write(name + "\t"); + writer.write(Integer.toString(random.nextInt(100))); + writer.write("\n"); + } + for (int y = 0; y < scientists.length; y++) { + String name = scientists[y % scientists.length]; + writer.write(now + "\t"); + writer.write(name + "\t"); + writer.write("101"); + writer.write("\n"); + } + writer.flush(); } - writer.flush(); - writer.close(); } client.close(); } @@ -208,11 +209,10 @@ public void testReadWithParser() { .withDatabase(DATABASE) .withBucket("mapBucket") .>withParser( - (input, callback) -> { + (gridFSFile, downloadStream, callback) -> { try (final BufferedReader reader = new BufferedReader( - new InputStreamReader( - input.getInputStream(), StandardCharsets.UTF_8))) { + new InputStreamReader(downloadStream, StandardCharsets.UTF_8))) { String line = reader.readLine(); while (line != null) { try (Scanner scanner = new Scanner(line.trim())) { @@ -311,19 +311,20 @@ public void testWriteMessage() throws Exception { MongoClient client = null; try { StringBuilder results = new StringBuilder(); - client = new MongoClient("localhost", port); - DB database = client.getDB(DATABASE); - GridFS gridfs = new GridFS(database, "WriteTest"); - List files = gridfs.find("WriteTestData"); - assertTrue(files.size() > 0); - for (GridFSDBFile file : files) { - assertEquals(100, file.getChunkSize()); - int l = (int) file.getLength(); - try (InputStream ins = file.getInputStream()) { - DataInputStream dis = new DataInputStream(ins); - byte[] b = new byte[l]; - dis.readFully(b); - results.append(new String(b, StandardCharsets.UTF_8)); + client = MongoClients.create("mongodb://localhost:" + port); + MongoDatabase database = client.getDatabase(DATABASE); + GridFSBucket gridfs = GridFSBuckets.create(database, "WriteTest"); + + for (GridFSFile file : gridfs.find()) { + if (file.getFilename().equals("WriteTestData")) { + assertEquals(100, file.getChunkSize()); + int l = (int) file.getLength(); + try (InputStream ins = gridfs.openDownloadStream(file.getObjectId())) { + DataInputStream dis = new DataInputStream(ins); + byte[] b = new byte[l]; + dis.readFully(b); + results.append(new String(b, StandardCharsets.UTF_8)); + } } } String dataString = results.toString(); @@ -331,16 +332,17 @@ public void testWriteMessage() throws Exception { assertTrue(dataString.contains("Message " + x)); } - files = gridfs.find("WriteTestIntData"); boolean[] intResults = new boolean[100]; - for (GridFSDBFile file : files) { - int l = (int) file.getLength(); - try (InputStream ins = file.getInputStream()) { - DataInputStream dis = new DataInputStream(ins); - byte[] b = new byte[l]; - dis.readFully(b); - for (byte aB : b) { - intResults[aB] = true; + for (GridFSFile file : gridfs.find()) { + if (file.getFilename().equals("WriteTestIntData")) { + int l = (int) file.getLength(); + try (InputStream ins = gridfs.openDownloadStream(file.getObjectId())) { + DataInputStream dis = new DataInputStream(ins); + byte[] b = new byte[l]; + dis.readFully(b); + for (byte aB : b) { + intResults[aB] = true; + } } } } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 4dda988e355c..cc85db937975 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -21,7 +21,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import com.mongodb.MongoClient; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; @@ -107,7 +108,7 @@ public static void beforeClass() throws Exception { .build(); mongodExecutable = mongodStarter.prepare(mongodConfig); mongodProcess = mongodExecutable.start(); - client = new MongoClient("localhost", port); + client = MongoClients.create("mongodb://localhost:" + port); database = client.getDatabase(DATABASE_NAME); LOG.info("Insert test data"); From 58ef7fc4a8a31fae0b208a64550cc7edac5cb905 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 11:02:01 -0400 Subject: [PATCH 2/7] refactor(mongodb): update MongoDB client usage to modern API Replace deprecated MongoClient with MongoClients.create() and update database drop method --- .../sql/meta/provider/mongodb/MongoDbReadWriteIT.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java index 76be08fe9a6e..804639cacfc3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java @@ -31,7 +31,8 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; @@ -128,14 +129,14 @@ public static void setUp() throws Exception { .build(); mongodExecutable = mongodStarter.prepare(mongodConfig); mongodProcess = mongodExecutable.start(); - client = new MongoClient(hostname, port); + client = MongoClients.create("mongodb://" + hostname + ":" + port); mongoSqlUrl = String.format("mongodb://%s:%d/%s/%s", hostname, port, database, collection); } @AfterClass public static void tearDown() throws Exception { - client.dropDatabase(database); + client.getDatabase(database).drop(); client.close(); mongodProcess.stop(); mongodExecutable.stop(); From ea4ed1b8f889946e3b286f28be24b2ecb480a8db Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 12:07:01 -0400 Subject: [PATCH 3/7] build(dependencies): add mongodb driver core dependency Add mongodb-driver-core to support MongoDB Java driver functionality. Also mark mongo_java_driver as permitUnusedDeclared and add testImplementation. --- .../groovy/org/apache/beam/gradle/BeamModulePlugin.groovy | 1 + sdks/java/extensions/sql/build.gradle | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index febc73c12cb9..5d3b26de2560 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -842,6 +842,7 @@ class BeamModulePlugin implements Plugin { mockito_inline : "org.mockito:mockito-inline:4.11.0", mongo_java_driver : "org.mongodb:mongodb-driver-sync:5.5.0", mongo_bson : "org.mongodb:bson:5.5.0", + mongodb_driver_core : "org.mongodb:mongodb-driver-core:5.5.0", nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version", netty_all : "io.netty:netty-all:$netty_version", netty_handler : "io.netty:netty-handler:$netty_version", diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index af8b6cba1742..5527493200f7 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -92,6 +92,9 @@ dependencies { implementation "org.codehaus.janino:commons-compiler:3.0.11" implementation library.java.jackson_core implementation library.java.mongo_java_driver + permitUnusedDeclared library.java.mongo_java_driver + implementation library.java.mongo_bson + implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.vendored_guava_32_1_2_jre @@ -131,6 +134,7 @@ dependencies { testImplementation library.java.kafka_clients testImplementation project(":sdks:java:io:kafka") testImplementation project(path: ":sdks:java:io:mongodb", configuration: "testRuntimeMigration") + testImplementation library.java.mongo_java_driver testImplementation project(path: ":sdks:java:io:thrift", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testCompileOnly project(":sdks:java:extensions:sql:udf-test-provider") From edb3e1232f488d78ecf36f2edcca4cf494df7ab1 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 13:17:49 -0400 Subject: [PATCH 4/7] fix(mongodb): update embedded mongo version and fix split key filtering Update embedded MongoDB test dependency to version 3.5.4 and simplify split key filtering logic by using BsonObjectId for range queries. This ensures proper type handling when filtering MongoDB documents by _id field. --- sdks/java/io/mongodb/build.gradle | 3 +-- .../apache/beam/sdk/io/mongodb/MongoDbIO.java | 27 +++++++++++++------ 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index e609040000e0..fe3f9b414b9c 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -34,8 +34,7 @@ dependencies { testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") - testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.mongo:3.0.0" - testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.process:3.0.0" + testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.mongo:3.5.4" testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 4bb26fe55921..1283e873f2b6 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -67,6 +67,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonObjectId; import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; @@ -677,20 +678,30 @@ static List splitKeysToMatch(List splitKeys) { if (i == 0) { aggregates.add(Aggregates.match(Filters.lte("_id", splitKey))); if (splitKeys.size() == 1) { - aggregates.add(Aggregates.match(Filters.and(Filters.gt("_id", splitKey)))); + aggregates.add(Aggregates.match(Filters.gt("_id", splitKey))); } } else if (i == splitKeys.size() - 1) { // this is the last split in the list, the filters define // the range from the previous split to the current split and also // the current split to the end - aggregates.add( - Aggregates.match( - Filters.and(Filters.gt("_id", lowestBound), Filters.lte("_id", splitKey)))); - aggregates.add(Aggregates.match(Filters.and(Filters.gt("_id", splitKey)))); + // Create a custom BSON document with multiple conditions on the same field + BsonDocument rangeFilter = + new BsonDocument( + "_id", + new BsonDocument( + "$gt", new BsonObjectId(Preconditions.checkStateNotNull(lowestBound))) + .append("$lte", new BsonObjectId(splitKey))); + aggregates.add(Aggregates.match(rangeFilter)); + aggregates.add(Aggregates.match(Filters.gt("_id", splitKey))); } else { - aggregates.add( - Aggregates.match( - Filters.and(Filters.gt("_id", lowestBound), Filters.lte("_id", splitKey)))); + // Create a custom BSON document with multiple conditions on the same field + BsonDocument rangeFilter = + new BsonDocument( + "_id", + new BsonDocument( + "$gt", new BsonObjectId(Preconditions.checkStateNotNull(lowestBound))) + .append("$lte", new BsonObjectId(splitKey))); + aggregates.add(Aggregates.match(rangeFilter)); } lowestBound = splitKey; From 10c122d0c6ed3bebca7cec54054d5f239c905680 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 13:41:12 -0400 Subject: [PATCH 5/7] build: add mongodb-driver-core dependency Add mongodb-driver-core version 5.5.0 to support MongoDB Java driver functionality --- sdks/java/io/mongodb/build.gradle | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index fe3f9b414b9c..3cb153aaec14 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -29,6 +29,7 @@ dependencies { implementation library.java.joda_time implementation library.java.mongo_java_driver implementation library.java.mongo_bson + implementation "org.mongodb:mongodb-driver-core:5.5.0" implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit From 4295332af4b7335aa1caaf3f60d856b7ca54e752 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 16:58:11 -0400 Subject: [PATCH 6/7] use version --- sdks/java/io/mongodb/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index 3cb153aaec14..56d29750dead 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -29,7 +29,7 @@ dependencies { implementation library.java.joda_time implementation library.java.mongo_java_driver implementation library.java.mongo_bson - implementation "org.mongodb:mongodb-driver-core:5.5.0" + implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit From 9aa767bf63325a7b978aad6721c8cbdaa5ac1cb2 Mon Sep 17 00:00:00 2001 From: liferoad Date: Sun, 24 Aug 2025 17:02:46 -0400 Subject: [PATCH 7/7] refactor: simplify mongo client creation logic Remove redundant null check and consolidate uri handling in MongoDbGridFSIO --- .../beam/sdk/io/mongodb/MongoDbGridFSIO.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index ebc436635f82..71f8b291e0d5 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -204,17 +204,12 @@ static ConnectionConfiguration create( MongoClient setupMongo() { if (uri() == null) { return MongoClients.create(); - } else { - String uriString = uri(); - if (uriString == null) { - return MongoClients.create(); - } - MongoClientSettings settings = - MongoClientSettings.builder() - .applyConnectionString(new ConnectionString(uriString)) - .build(); - return MongoClients.create(settings); } + MongoClientSettings settings = + MongoClientSettings.builder() + .applyConnectionString(new ConnectionString(Preconditions.checkStateNotNull(uri()))) + .build(); + return MongoClients.create(settings); } GridFSBucket setupGridFS(MongoClient mongo) {