diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4f40ed6fd04a..5d3b26de2560 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -840,7 +840,9 @@ class BeamModulePlugin implements Plugin { log4j2_log4j12_api : "org.apache.logging.log4j:log4j-1.2-api:$log4j2_version", mockito_core : "org.mockito:mockito-core:4.11.0", mockito_inline : "org.mockito:mockito-inline:4.11.0", - mongo_java_driver : "org.mongodb:mongo-java-driver:3.12.11", + mongo_java_driver : "org.mongodb:mongodb-driver-sync:5.5.0", + mongo_bson : "org.mongodb:bson:5.5.0", + mongodb_driver_core : "org.mongodb:mongodb-driver-core:5.5.0", nemo_compiler_frontend_beam : "org.apache.nemo:nemo-compiler-frontend-beam:$nemo_version", netty_all : "io.netty:netty-all:$netty_version", netty_handler : "io.netty:netty-handler:$netty_version", diff --git a/it/mongodb/build.gradle b/it/mongodb/build.gradle index 6be9b91f5b34..960e15af8394 100644 --- a/it/mongodb/build.gradle +++ b/it/mongodb/build.gradle @@ -35,6 +35,7 @@ dependencies { implementation library.java.testcontainers_mongodb implementation library.java.google_code_gson implementation library.java.mongo_java_driver + implementation library.java.mongo_bson implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.mockito_core diff --git a/sdks/java/extensions/sql/build.gradle b/sdks/java/extensions/sql/build.gradle index af8b6cba1742..5527493200f7 100644 --- a/sdks/java/extensions/sql/build.gradle +++ b/sdks/java/extensions/sql/build.gradle @@ -92,6 +92,9 @@ dependencies { implementation "org.codehaus.janino:commons-compiler:3.0.11" implementation library.java.jackson_core implementation library.java.mongo_java_driver + permitUnusedDeclared library.java.mongo_java_driver + implementation library.java.mongo_bson + implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.joda_time implementation library.java.vendored_guava_32_1_2_jre @@ -131,6 +134,7 @@ dependencies { testImplementation library.java.kafka_clients testImplementation project(":sdks:java:io:kafka") testImplementation project(path: ":sdks:java:io:mongodb", configuration: "testRuntimeMigration") + testImplementation library.java.mongo_java_driver testImplementation project(path: ":sdks:java:io:thrift", configuration: "testRuntimeMigration") testImplementation project(path: ":sdks:java:extensions:protobuf", configuration: "testRuntimeMigration") testCompileOnly project(":sdks:java:extensions:sql:udf-test-provider") diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java index 76be08fe9a6e..804639cacfc3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/mongodb/MongoDbReadWriteIT.java @@ -31,7 +31,8 @@ import static org.hamcrest.core.IsInstanceOf.instanceOf; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; @@ -128,14 +129,14 @@ public static void setUp() throws Exception { .build(); mongodExecutable = mongodStarter.prepare(mongodConfig); mongodProcess = mongodExecutable.start(); - client = new MongoClient(hostname, port); + client = MongoClients.create("mongodb://" + hostname + ":" + port); mongoSqlUrl = String.format("mongodb://%s:%d/%s/%s", hostname, port, database, collection); } @AfterClass public static void tearDown() throws Exception { - client.dropDatabase(database); + client.getDatabase(database).drop(); client.close(); mongodProcess.stop(); mongodExecutable.stop(); diff --git a/sdks/java/io/mongodb/build.gradle b/sdks/java/io/mongodb/build.gradle index b9e90082f0dc..56d29750dead 100644 --- a/sdks/java/io/mongodb/build.gradle +++ b/sdks/java/io/mongodb/build.gradle @@ -28,13 +28,14 @@ dependencies { implementation project(path: ":sdks:java:core", configuration: "shadow") implementation library.java.joda_time implementation library.java.mongo_java_driver + implementation library.java.mongo_bson + implementation library.java.mongodb_driver_core implementation library.java.slf4j_api implementation library.java.vendored_guava_32_1_2_jre testImplementation library.java.junit testImplementation project(path: ":sdks:java:io:common") testImplementation project(path: ":sdks:java:testing:test-utils") - testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.mongo:3.0.0" - testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.process:3.0.0" + testImplementation "de.flapdoodle.embed:de.flapdoodle.embed.mongo:3.5.4" testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") } diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java index 2131656d458a..d89db9dea54b 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/FindQuery.java @@ -21,7 +21,7 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Projections; @@ -79,7 +79,8 @@ private FindQuery withFilters(BsonDocument filters) { /** Convert the Bson filters into a BsonDocument via default encoding. */ static BsonDocument bson2BsonDocument(Bson filters) { - return filters.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry()); + return filters.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry()); } /** Sets the filters to find. */ diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java index 07cc238c7e6b..71f8b291e0d5 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO.java @@ -21,15 +21,18 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.google.auto.value.AutoValue; -import com.mongodb.DB; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientURI; -import com.mongodb.gridfs.GridFS; -import com.mongodb.gridfs.GridFSDBFile; -import com.mongodb.gridfs.GridFSInputFile; -import com.mongodb.util.JSON; +import com.mongodb.ConnectionString; +import com.mongodb.MongoClientSettings; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCursor; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.GridFSDownloadStream; +import com.mongodb.client.gridfs.GridFSUploadStream; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; @@ -53,6 +56,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; +import org.bson.Document; import org.bson.types.ObjectId; import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; @@ -117,16 +121,18 @@ public class MongoDbGridFSIO { /** Callback for the parser to use to submit data. */ public interface ParserCallback extends Serializable { - /** Output the object. The default timestamp will be the GridFSDBFile creation timestamp. */ + /** Output the object. The default timestamp will be the GridFSFile creation timestamp. */ void output(T output); /** Output the object using the specified timestamp. */ void output(T output, Instant timestamp); } - /** Interface for the parser that is used to parse the GridFSDBFile into the appropriate types. */ + /** Interface for the parser that is used to parse the GridFSFile into the appropriate types. */ public interface Parser extends Serializable { - void parse(GridFSDBFile input, ParserCallback callback) throws IOException; + void parse( + GridFSFile gridFSFile, GridFSDownloadStream downloadStream, ParserCallback callback) + throws IOException; } /** @@ -134,11 +140,10 @@ public interface Parser extends Serializable { * file into Strings. It uses the timestamp of the file for the event timestamp. */ private static final Parser TEXT_PARSER = - (input, callback) -> { - final Instant time = new Instant(input.getUploadDate().getTime()); + (gridFSFile, downloadStream, callback) -> { + final Instant time = new Instant(gridFSFile.getUploadDate().getTime()); try (BufferedReader reader = - new BufferedReader( - new InputStreamReader(input.getInputStream(), StandardCharsets.UTF_8))) { + new BufferedReader(new InputStreamReader(downloadStream, StandardCharsets.UTF_8))) { for (String line = reader.readLine(); line != null; line = reader.readLine()) { callback.output(line, time); } @@ -197,12 +202,20 @@ static ConnectionConfiguration create( } MongoClient setupMongo() { - return uri() == null ? new MongoClient() : new MongoClient(new MongoClientURI(uri())); + if (uri() == null) { + return MongoClients.create(); + } + MongoClientSettings settings = + MongoClientSettings.builder() + .applyConnectionString(new ConnectionString(Preconditions.checkStateNotNull(uri()))) + .build(); + return MongoClients.create(settings); } - GridFS setupGridFS(MongoClient mongo) { - DB db = database() == null ? mongo.getDB("gridfs") : mongo.getDB(database()); - return bucket() == null ? new GridFS(db) : new GridFS(db, bucket()); + GridFSBucket setupGridFS(MongoClient mongo) { + MongoDatabase db = + database() == null ? mongo.getDatabase("gridfs") : mongo.getDatabase(database()); + return bucket() == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket()); } } @@ -313,12 +326,12 @@ public PCollection expand(PBegin input) { ParDo.of( new DoFn() { @Nullable MongoClient mongo; - @Nullable GridFS gridfs; + @Nullable GridFSBucket gridFSBucket; @Setup public void setup() { mongo = source.spec.connectionConfiguration().setupMongo(); - gridfs = source.spec.connectionConfiguration().setupGridFS(mongo); + gridFSBucket = source.spec.connectionConfiguration().setupGridFS(mongo); } @Teardown @@ -331,12 +344,18 @@ public void teardown() { @ProcessElement public void processElement(final ProcessContext c) throws IOException { - Preconditions.checkStateNotNull(gridfs); + GridFSBucket bucket = Preconditions.checkStateNotNull(gridFSBucket); ObjectId oid = c.element(); - GridFSDBFile file = gridfs.find(oid); + GridFSDownloadStream downloadStream = bucket.openDownloadStream(oid); + GridFSFile gridFSFile = + bucket.find(com.mongodb.client.model.Filters.eq("_id", oid)).first(); + if (gridFSFile == null) { + return; // Skip if file not found + } Parser parser = Preconditions.checkStateNotNull(parser()); parser.parse( - file, + gridFSFile, + downloadStream, new ParserCallback() { @Override public void output(T output, Instant timestamp) { @@ -378,12 +397,12 @@ protected static class BoundedGridFSSource extends BoundedSource { this.objectIds = objectIds; } - private DBCursor createCursor(GridFS gridfs) { + private MongoCursor createCursor(GridFSBucket gridFSBucket) { if (spec.filter() != null) { - DBObject query = (DBObject) JSON.parse(spec.filter()); - return gridfs.getFileList(query); + Document query = Document.parse(spec.filter()); + return gridFSBucket.find(query).iterator(); } - return gridfs.getFileList(); + return gridFSBucket.find().iterator(); } @Override @@ -391,20 +410,20 @@ public List> split( long desiredBundleSizeBytes, PipelineOptions options) throws Exception { MongoClient mongo = spec.connectionConfiguration().setupMongo(); try { - GridFS gridfs = spec.connectionConfiguration().setupGridFS(mongo); - DBCursor cursor = createCursor(gridfs); + GridFSBucket gridFSBucket = spec.connectionConfiguration().setupGridFS(mongo); + MongoCursor cursor = createCursor(gridFSBucket); long size = 0; List list = new ArrayList<>(); List objects = new ArrayList<>(); while (cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); + GridFSFile file = cursor.next(); long len = file.getLength(); if ((size + len) > desiredBundleSizeBytes && !objects.isEmpty()) { list.add(new BoundedGridFSSource(spec, objects)); size = 0; objects = new ArrayList<>(); } - objects.add((ObjectId) file.getId()); + objects.add(file.getObjectId()); size += len; } if (!objects.isEmpty() || list.isEmpty()) { @@ -419,10 +438,11 @@ public List> split( @Override public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { try (MongoClient mongo = spec.connectionConfiguration().setupMongo(); - DBCursor cursor = createCursor(spec.connectionConfiguration().setupGridFS(mongo))) { + MongoCursor cursor = + createCursor(spec.connectionConfiguration().setupGridFS(mongo))) { long size = 0; while (cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); + GridFSFile file = cursor.next(); size += file.getLength(); } return size; @@ -456,7 +476,7 @@ static class GridFSReader extends BoundedSource.BoundedReader { final @Nullable List objects; @Nullable MongoClient mongo; - @Nullable DBCursor cursor; + @Nullable MongoCursor cursor; @Nullable Iterator iterator; @Nullable ObjectId current; @@ -474,8 +494,8 @@ public BoundedSource getCurrentSource() { public boolean start() throws IOException { if (objects == null) { mongo = source.spec.connectionConfiguration().setupMongo(); - GridFS gridfs = source.spec.connectionConfiguration().setupGridFS(mongo); - cursor = source.createCursor(gridfs); + GridFSBucket gridFSBucket = source.spec.connectionConfiguration().setupGridFS(mongo); + cursor = source.createCursor(gridFSBucket); } else { iterator = objects.iterator(); } @@ -488,8 +508,8 @@ public boolean advance() throws IOException { current = iterator.next(); return true; } else if (cursor != null && cursor.hasNext()) { - GridFSDBFile file = (GridFSDBFile) cursor.next(); - current = (ObjectId) file.getId(); + GridFSFile file = cursor.next(); + current = file.getObjectId(); return true; } current = null; @@ -628,9 +648,9 @@ private static class GridFsWriteFn extends DoFn { private final Write spec; private transient @Nullable MongoClient mongo; - private transient @Nullable GridFS gridfs; + private transient @Nullable GridFSBucket gridFSBucket; - private transient @Nullable GridFSInputFile gridFsFile; + private transient @Nullable GridFSUploadStream gridFsUploadStream; private transient @Nullable OutputStream outputStream; public GridFsWriteFn(Write spec) { @@ -640,20 +660,22 @@ public GridFsWriteFn(Write spec) { @Setup public void setup() throws Exception { mongo = spec.connectionConfiguration().setupMongo(); - gridfs = spec.connectionConfiguration().setupGridFS(mongo); + gridFSBucket = spec.connectionConfiguration().setupGridFS(mongo); } @StartBundle public void startBundle() { - GridFS gridfs = Preconditions.checkStateNotNull(this.gridfs); + GridFSBucket gridFSBucket = Preconditions.checkStateNotNull(this.gridFSBucket); String filename = Preconditions.checkStateNotNull(spec.filename()); - GridFSInputFile gridFsFile = gridfs.createFile(filename); + if (spec.chunkSize() != null) { - gridFsFile.setChunkSize(spec.chunkSize()); + gridFsUploadStream = + gridFSBucket.openUploadStream( + filename, new GridFSUploadOptions().chunkSizeBytes(spec.chunkSize().intValue())); + } else { + gridFsUploadStream = gridFSBucket.openUploadStream(filename); } - outputStream = gridFsFile.getOutputStream(); - - this.gridFsFile = gridFsFile; + outputStream = gridFsUploadStream; } @ProcessElement @@ -665,35 +687,20 @@ public void processElement(ProcessContext context) throws Exception { @FinishBundle public void finishBundle() throws Exception { - if (outputStream != null) { - OutputStream outputStream = this.outputStream; - outputStream.flush(); - outputStream.close(); - this.outputStream = null; - } - if (gridFsFile != null) { - gridFsFile = null; + GridFSUploadStream uploadStream = gridFsUploadStream; + if (uploadStream != null) { + uploadStream.flush(); + uploadStream.close(); + gridFsUploadStream = null; + outputStream = null; } } @Teardown public void teardown() throws Exception { - try { - if (outputStream != null) { - OutputStream outputStream = this.outputStream; - outputStream.flush(); - outputStream.close(); - this.outputStream = null; - } - if (gridFsFile != null) { - gridFsFile = null; - } - } finally { - if (mongo != null) { - mongo.close(); - mongo = null; - gridfs = null; - } + if (mongo != null) { + mongo.close(); + mongo = null; } } } diff --git a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java index 905c7418e26c..1283e873f2b6 100644 --- a/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java +++ b/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbIO.java @@ -22,12 +22,14 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; +import com.mongodb.ConnectionString; import com.mongodb.MongoBulkWriteException; -import com.mongodb.MongoClient; -import com.mongodb.MongoClientOptions; -import com.mongodb.MongoClientURI; +import com.mongodb.MongoClientSettings; +import com.mongodb.MongoClientSettings.Builder; import com.mongodb.MongoCommandException; import com.mongodb.client.AggregateIterable; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; @@ -46,6 +48,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Optional; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.net.ssl.SSLContext; import org.apache.beam.sdk.coders.Coder; @@ -64,6 +67,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonObjectId; import org.bson.BsonString; import org.bson.Document; import org.bson.conversions.Bson; @@ -362,22 +366,25 @@ public void populateDisplayData(DisplayData.Builder builder) { } } - private static MongoClientOptions.Builder getOptions( + private static MongoClientSettings.Builder getOptions( int maxConnectionIdleTime, boolean sslEnabled, boolean sslInvalidHostNameAllowed, boolean ignoreSSLCertificate) { - MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder(); - optionsBuilder.maxConnectionIdleTime(maxConnectionIdleTime); + MongoClientSettings.Builder settingsBuilder = MongoClientSettings.builder(); + settingsBuilder.applyToConnectionPoolSettings( + builder -> builder.maxConnectionIdleTime(maxConnectionIdleTime, TimeUnit.MILLISECONDS)); if (sslEnabled) { - optionsBuilder.sslEnabled(sslEnabled).sslInvalidHostNameAllowed(sslInvalidHostNameAllowed); - if (ignoreSSLCertificate) { - SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); - optionsBuilder.sslContext(sslContext); - optionsBuilder.socketFactory(sslContext.getSocketFactory()); - } + settingsBuilder.applyToSslSettings( + builder -> { + builder.enabled(sslEnabled).invalidHostNameAllowed(sslInvalidHostNameAllowed); + if (ignoreSSLCertificate) { + SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); + builder.context(sslContext); + } + }); } - return optionsBuilder; + return settingsBuilder; } /** A MongoDB {@link BoundedSource} reading {@link Document} from a given instance. */ @@ -414,15 +421,15 @@ long getDocumentCount() { String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { return getDocumentCount(mongoClient, database, collection); } catch (Exception e) { return -1; @@ -446,15 +453,15 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { try { return getEstimatedSizeBytes(mongoClient, database, collection); } catch (MongoCommandException exception) { @@ -483,15 +490,15 @@ public List> split( String uri = Preconditions.checkStateNotNull(spec.uri()); String database = Preconditions.checkStateNotNull(spec.database()); String collection = Preconditions.checkStateNotNull(spec.collection()); - try (MongoClient mongoClient = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate())))) { + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + try (MongoClient mongoClient = MongoClients.create(settings)) { MongoDatabase mongoDatabase = mongoClient.getDatabase(database); List splitKeys; @@ -671,26 +678,39 @@ static List splitKeysToMatch(List splitKeys) { if (i == 0) { aggregates.add(Aggregates.match(Filters.lte("_id", splitKey))); if (splitKeys.size() == 1) { - aggregates.add(Aggregates.match(Filters.and(Filters.gt("_id", splitKey)))); + aggregates.add(Aggregates.match(Filters.gt("_id", splitKey))); } } else if (i == splitKeys.size() - 1) { // this is the last split in the list, the filters define // the range from the previous split to the current split and also // the current split to the end - aggregates.add( - Aggregates.match( - Filters.and(Filters.gt("_id", lowestBound), Filters.lte("_id", splitKey)))); - aggregates.add(Aggregates.match(Filters.and(Filters.gt("_id", splitKey)))); + // Create a custom BSON document with multiple conditions on the same field + BsonDocument rangeFilter = + new BsonDocument( + "_id", + new BsonDocument( + "$gt", new BsonObjectId(Preconditions.checkStateNotNull(lowestBound))) + .append("$lte", new BsonObjectId(splitKey))); + aggregates.add(Aggregates.match(rangeFilter)); + aggregates.add(Aggregates.match(Filters.gt("_id", splitKey))); } else { - aggregates.add( - Aggregates.match( - Filters.and(Filters.gt("_id", lowestBound), Filters.lte("_id", splitKey)))); + // Create a custom BSON document with multiple conditions on the same field + BsonDocument rangeFilter = + new BsonDocument( + "_id", + new BsonDocument( + "$gt", new BsonObjectId(Preconditions.checkStateNotNull(lowestBound))) + .append("$lte", new BsonObjectId(splitKey))); + aggregates.add(Aggregates.match(rangeFilter)); } lowestBound = splitKey; } return aggregates.stream() - .map(s -> s.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry())) + .map( + s -> + s.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry())) .collect(Collectors.toList()); } @@ -786,14 +806,15 @@ public void close() { private MongoClient createClient(Read spec) { String uri = Preconditions.checkStateNotNull(spec.uri(), "withUri() is required"); - return new MongoClient( - new MongoClientURI( - uri, - getOptions( + MongoClientSettings settings = + getOptions( spec.maxConnectionIdleTime(), spec.sslEnabled(), spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate()))); + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + return MongoClients.create(settings); } } @@ -985,15 +1006,15 @@ static class WriteFn extends DoFn { @Setup public void createMongoClient() { String uri = Preconditions.checkStateNotNull(spec.uri()); - client = - new MongoClient( - new MongoClientURI( - uri, - getOptions( - spec.maxConnectionIdleTime(), - spec.sslEnabled(), - spec.sslInvalidHostNameAllowed(), - spec.ignoreSSLCertificate()))); + MongoClientSettings settings = + getOptions( + spec.maxConnectionIdleTime(), + spec.sslEnabled(), + spec.sslInvalidHostNameAllowed(), + spec.ignoreSSLCertificate()) + .applyConnectionString(new ConnectionString(uri)) + .build(); + client = MongoClients.create(settings); } @StartBundle diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java index df66179f3904..da90f92dc190 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/FindQueryTest.java @@ -21,7 +21,7 @@ import com.google.auto.value.AutoValue; import com.mongodb.BasicDBObject; -import com.mongodb.MongoClient; +import com.mongodb.MongoClientSettings; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.model.Projections; @@ -79,7 +79,8 @@ private FindQueryTest withFilters(BsonDocument filters) { /** Convert the Bson filters into a BsonDocument via default encoding. */ static BsonDocument bson2BsonDocument(Bson filters) { - return filters.toBsonDocument(BasicDBObject.class, MongoClient.getDefaultCodecRegistry()); + return filters.toBsonDocument( + BasicDBObject.class, MongoClientSettings.getDefaultCodecRegistry()); } /** Sets the filters to find. */ diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java index 09343606f228..d13185a08fb6 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.java @@ -20,11 +20,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import com.mongodb.DB; -import com.mongodb.MongoClient; -import com.mongodb.gridfs.GridFS; -import com.mongodb.gridfs.GridFSDBFile; -import com.mongodb.gridfs.GridFSInputFile; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.gridfs.GridFSBucket; +import com.mongodb.client.gridfs.GridFSBuckets; +import com.mongodb.client.gridfs.GridFSUploadStream; +import com.mongodb.client.gridfs.model.GridFSFile; import de.flapdoodle.embed.mongo.MongodExecutable; import de.flapdoodle.embed.mongo.MongodProcess; import de.flapdoodle.embed.mongo.MongodStarter; @@ -35,12 +37,10 @@ import de.flapdoodle.embed.mongo.distribution.Version; import de.flapdoodle.embed.process.runtime.Network; import java.io.BufferedReader; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -117,9 +117,9 @@ public static void start() throws Exception { LOG.info("Insert test data"); - MongoClient client = new MongoClient("localhost", port); - DB database = client.getDB(DATABASE); - GridFS gridfs = new GridFS(database); + MongoClient client = MongoClients.create("mongodb://localhost:" + port); + MongoDatabase database = client.getDatabase(DATABASE); + GridFSBucket gridfs = GridFSBuckets.create(database); ByteArrayOutputStream out = new ByteArrayOutputStream(); for (int x = 0; x < 100; x++) { @@ -129,10 +129,12 @@ public static void start() throws Exception { .getBytes(StandardCharsets.UTF_8)); } for (int x = 0; x < 5; x++) { - gridfs.createFile(new ByteArrayInputStream(out.toByteArray()), "file" + x).save(); + try (GridFSUploadStream uploadStream = gridfs.openUploadStream("file" + x)) { + uploadStream.write(out.toByteArray()); + } } - gridfs = new GridFS(database, "mapBucket"); + GridFSBucket mapBucketGridfs = GridFSBuckets.create(database, "mapBucket"); long now = System.currentTimeMillis(); Random random = new Random(); String[] scientists = { @@ -148,26 +150,25 @@ public static void start() throws Exception { "Maxwell" }; for (int x = 0; x < 10; x++) { - GridFSInputFile file = gridfs.createFile("file_" + x); - OutputStream outf = file.getOutputStream(); - OutputStreamWriter writer = new OutputStreamWriter(outf, StandardCharsets.UTF_8); - for (int y = 0; y < 5000; y++) { - long time = now - random.nextInt(3600000); - String name = scientists[y % scientists.length]; - writer.write(time + "\t"); - writer.write(name + "\t"); - writer.write(Integer.toString(random.nextInt(100))); - writer.write("\n"); - } - for (int y = 0; y < scientists.length; y++) { - String name = scientists[y % scientists.length]; - writer.write(now + "\t"); - writer.write(name + "\t"); - writer.write("101"); - writer.write("\n"); + try (GridFSUploadStream uploadStream = mapBucketGridfs.openUploadStream("file_" + x)) { + OutputStreamWriter writer = new OutputStreamWriter(uploadStream, StandardCharsets.UTF_8); + for (int y = 0; y < 5000; y++) { + long time = now - random.nextInt(3600000); + String name = scientists[y % scientists.length]; + writer.write(time + "\t"); + writer.write(name + "\t"); + writer.write(Integer.toString(random.nextInt(100))); + writer.write("\n"); + } + for (int y = 0; y < scientists.length; y++) { + String name = scientists[y % scientists.length]; + writer.write(now + "\t"); + writer.write(name + "\t"); + writer.write("101"); + writer.write("\n"); + } + writer.flush(); } - writer.flush(); - writer.close(); } client.close(); } @@ -208,11 +209,10 @@ public void testReadWithParser() { .withDatabase(DATABASE) .withBucket("mapBucket") .>withParser( - (input, callback) -> { + (gridFSFile, downloadStream, callback) -> { try (final BufferedReader reader = new BufferedReader( - new InputStreamReader( - input.getInputStream(), StandardCharsets.UTF_8))) { + new InputStreamReader(downloadStream, StandardCharsets.UTF_8))) { String line = reader.readLine(); while (line != null) { try (Scanner scanner = new Scanner(line.trim())) { @@ -311,19 +311,20 @@ public void testWriteMessage() throws Exception { MongoClient client = null; try { StringBuilder results = new StringBuilder(); - client = new MongoClient("localhost", port); - DB database = client.getDB(DATABASE); - GridFS gridfs = new GridFS(database, "WriteTest"); - List files = gridfs.find("WriteTestData"); - assertTrue(files.size() > 0); - for (GridFSDBFile file : files) { - assertEquals(100, file.getChunkSize()); - int l = (int) file.getLength(); - try (InputStream ins = file.getInputStream()) { - DataInputStream dis = new DataInputStream(ins); - byte[] b = new byte[l]; - dis.readFully(b); - results.append(new String(b, StandardCharsets.UTF_8)); + client = MongoClients.create("mongodb://localhost:" + port); + MongoDatabase database = client.getDatabase(DATABASE); + GridFSBucket gridfs = GridFSBuckets.create(database, "WriteTest"); + + for (GridFSFile file : gridfs.find()) { + if (file.getFilename().equals("WriteTestData")) { + assertEquals(100, file.getChunkSize()); + int l = (int) file.getLength(); + try (InputStream ins = gridfs.openDownloadStream(file.getObjectId())) { + DataInputStream dis = new DataInputStream(ins); + byte[] b = new byte[l]; + dis.readFully(b); + results.append(new String(b, StandardCharsets.UTF_8)); + } } } String dataString = results.toString(); @@ -331,16 +332,17 @@ public void testWriteMessage() throws Exception { assertTrue(dataString.contains("Message " + x)); } - files = gridfs.find("WriteTestIntData"); boolean[] intResults = new boolean[100]; - for (GridFSDBFile file : files) { - int l = (int) file.getLength(); - try (InputStream ins = file.getInputStream()) { - DataInputStream dis = new DataInputStream(ins); - byte[] b = new byte[l]; - dis.readFully(b); - for (byte aB : b) { - intResults[aB] = true; + for (GridFSFile file : gridfs.find()) { + if (file.getFilename().equals("WriteTestIntData")) { + int l = (int) file.getLength(); + try (InputStream ins = gridfs.openDownloadStream(file.getObjectId())) { + DataInputStream dis = new DataInputStream(ins); + byte[] b = new byte[l]; + dis.readFully(b); + for (byte aB : b) { + intResults[aB] = true; + } } } } diff --git a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java index 4dda988e355c..cc85db937975 100644 --- a/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java +++ b/sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbIOTest.java @@ -21,7 +21,8 @@ import static org.hamcrest.Matchers.greaterThan; import static org.junit.Assert.assertEquals; -import com.mongodb.MongoClient; +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; @@ -107,7 +108,7 @@ public static void beforeClass() throws Exception { .build(); mongodExecutable = mongodStarter.prepare(mongodConfig); mongodProcess = mongodExecutable.start(); - client = new MongoClient("localhost", port); + client = MongoClients.create("mongodb://localhost:" + port); database = client.getDatabase(DATABASE_NAME); LOG.info("Insert test data");