|
39 | 39 | import java.util.List; |
40 | 40 | import java.util.stream.Collectors; |
41 | 41 | import javax.annotation.Nullable; |
| 42 | +import javax.net.ssl.SSLContext; |
42 | 43 | import org.apache.beam.sdk.annotations.Experimental; |
43 | 44 | import org.apache.beam.sdk.coders.Coder; |
44 | 45 | import org.apache.beam.sdk.coders.SerializableCoder; |
@@ -340,14 +341,19 @@ public void populateDisplayData(DisplayData.Builder builder) { |
340 | 341 | } |
341 | 342 |
|
342 | 343 | private static MongoClientOptions.Builder getOptions( |
343 | | - int maxConnectionIdleTime, boolean sslEnabled, boolean sslInvalidHostNameAllowed) { |
| 344 | + int maxConnectionIdleTime, |
| 345 | + boolean sslEnabled, |
| 346 | + boolean sslInvalidHostNameAllowed, |
| 347 | + boolean ignoreSSLCertificate) { |
344 | 348 | MongoClientOptions.Builder optionsBuilder = new MongoClientOptions.Builder(); |
345 | 349 | optionsBuilder.maxConnectionIdleTime(maxConnectionIdleTime); |
346 | 350 | if (sslEnabled) { |
347 | | - optionsBuilder |
348 | | - .sslEnabled(sslEnabled) |
349 | | - .sslInvalidHostNameAllowed(sslInvalidHostNameAllowed) |
350 | | - .sslContext(SSLUtils.ignoreSSLCertificate()); |
| 351 | + optionsBuilder.sslEnabled(sslEnabled).sslInvalidHostNameAllowed(sslInvalidHostNameAllowed); |
| 352 | + if (ignoreSSLCertificate) { |
| 353 | + SSLContext sslContext = SSLUtils.ignoreSSLCertificate(); |
| 354 | + optionsBuilder.sslContext(sslContext); |
| 355 | + optionsBuilder.socketFactory(sslContext.getSocketFactory()); |
| 356 | + } |
351 | 357 | } |
352 | 358 | return optionsBuilder; |
353 | 359 | } |
@@ -385,7 +391,8 @@ public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) { |
385 | 391 | getOptions( |
386 | 392 | spec.maxConnectionIdleTime(), |
387 | 393 | spec.sslEnabled(), |
388 | | - spec.sslInvalidHostNameAllowed())))) { |
| 394 | + spec.sslInvalidHostNameAllowed(), |
| 395 | + spec.ignoreSSLCertificate())))) { |
389 | 396 | return getEstimatedSizeBytes(mongoClient, spec.database(), spec.collection()); |
390 | 397 | } |
391 | 398 | } |
@@ -413,7 +420,8 @@ public List<BoundedSource<Document>> split( |
413 | 420 | getOptions( |
414 | 421 | spec.maxConnectionIdleTime(), |
415 | 422 | spec.sslEnabled(), |
416 | | - spec.sslInvalidHostNameAllowed())))) { |
| 423 | + spec.sslInvalidHostNameAllowed(), |
| 424 | + spec.ignoreSSLCertificate())))) { |
417 | 425 | MongoDatabase mongoDatabase = mongoClient.getDatabase(spec.database()); |
418 | 426 |
|
419 | 427 | List<Document> splitKeys; |
@@ -704,7 +712,8 @@ private MongoClient createClient(Read spec) { |
704 | 712 | getOptions( |
705 | 713 | spec.maxConnectionIdleTime(), |
706 | 714 | spec.sslEnabled(), |
707 | | - spec.sslInvalidHostNameAllowed()))); |
| 715 | + spec.sslInvalidHostNameAllowed(), |
| 716 | + spec.ignoreSSLCertificate()))); |
708 | 717 | } |
709 | 718 | } |
710 | 719 |
|
@@ -886,7 +895,8 @@ public void createMongoClient() { |
886 | 895 | getOptions( |
887 | 896 | spec.maxConnectionIdleTime(), |
888 | 897 | spec.sslEnabled(), |
889 | | - spec.sslInvalidHostNameAllowed()))); |
| 898 | + spec.sslInvalidHostNameAllowed(), |
| 899 | + spec.ignoreSSLCertificate()))); |
890 | 900 | } |
891 | 901 |
|
892 | 902 | @StartBundle |
|
0 commit comments