From a229ce8559ea8c493997e51af4a739b0555263f1 Mon Sep 17 00:00:00 2001 From: adesharatushar Date: Mon, 26 Dec 2016 21:28:25 +0530 Subject: [PATCH 1/2] Added Java example for using connections with streaming. --- docs/streaming-programming-guide.md | 64 +++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 1fcd198685a51..2e2d0a37b623d 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1246,6 +1246,21 @@ dstream.foreachRDD { rdd => } {% endhighlight %} +
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override public void call(JavaRDD rdd) throws Exception { + final Connection connection = new Connection(connectionString); // executed at the driver + rdd.foreach(new VoidFunction() { + @Override public void call(String record) throws Exception { + connection.send(record); // executed at the worker + } + }); + connection.close(); + } +}); +{% endhighlight %} +
{% highlight python %} def sendRecord(rdd): @@ -1279,6 +1294,21 @@ dstream.foreachRDD { rdd => } {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override public void call(JavaRDD rdd) throws Exception { + rdd.foreach(new VoidFunction() { + @Override public void call(String record) throws Exception { + Connection connection = new Connection(connectionString); + connection.send(record); + connection.close(); + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendRecord(record): @@ -1309,6 +1339,23 @@ dstream.foreachRDD { rdd => } {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override public void call(JavaRDD rdd) throws Exception { + rdd.foreachPartition(new VoidFunction>() { + @Override public void call(Iterator partitionOfRecords) throws Exception { + Connection connection = new Connection(connectionString); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + connection.close(); + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendPartition(iter): @@ -1342,6 +1389,23 @@ dstream.foreachRDD { rdd => {% endhighlight %}
+
+{% highlight java %} +dstream.foreachRDD(new VoidFunction>() { + @Override public void call(JavaRDD rdd) throws Exception { + rdd.foreachPartition(new VoidFunction>() { + @Override public void call(Iterator partitionOfRecords) throws Exception { + Connection connection = ConnectionPool.getConnection(connectionString); + while (partitionOfRecords.hasNext()) { + connection.send(partitionOfRecords.next()); + } + ConnectionPool.releaseConnection(connection); + } + }); + } +}); +{% endhighlight %} +
{% highlight python %} def sendPartition(iter): From ebb5e5d0c9df3c7415fe9739d82093a1105a40af Mon Sep 17 00:00:00 2001 From: adesharatushar Date: Wed, 28 Dec 2016 14:54:18 +0530 Subject: [PATCH 2/2] Made code similar to Scala, corrected formatting. --- docs/streaming-programming-guide.md | 36 ++++++++++++++++++----------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md index 2e2d0a37b623d..38b4f7817713d 100644 --- a/docs/streaming-programming-guide.md +++ b/docs/streaming-programming-guide.md @@ -1249,14 +1249,15 @@ dstream.foreachRDD { rdd =>
{% highlight java %} dstream.foreachRDD(new VoidFunction>() { - @Override public void call(JavaRDD rdd) throws Exception { - final Connection connection = new Connection(connectionString); // executed at the driver + @Override + public void call(JavaRDD rdd) { + final Connection connection = createNewConnection(); // executed at the driver rdd.foreach(new VoidFunction() { - @Override public void call(String record) throws Exception { + @Override + public void call(String record) { connection.send(record); // executed at the worker } }); - connection.close(); } }); {% endhighlight %} @@ -1297,10 +1298,12 @@ dstream.foreachRDD { rdd =>
{% highlight java %} dstream.foreachRDD(new VoidFunction>() { - @Override public void call(JavaRDD rdd) throws Exception { + @Override + public void call(JavaRDD rdd) { rdd.foreach(new VoidFunction() { - @Override public void call(String record) throws Exception { - Connection connection = new Connection(connectionString); + @Override + public void call(String record) { + Connection connection = createNewConnection(); connection.send(record); connection.close(); } @@ -1342,10 +1345,12 @@ dstream.foreachRDD { rdd =>
{% highlight java %} dstream.foreachRDD(new VoidFunction>() { - @Override public void call(JavaRDD rdd) throws Exception { + @Override + public void call(JavaRDD rdd) { rdd.foreachPartition(new VoidFunction>() { - @Override public void call(Iterator partitionOfRecords) throws Exception { - Connection connection = new Connection(connectionString); + @Override + public void call(Iterator partitionOfRecords) { + Connection connection = createNewConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } @@ -1392,14 +1397,17 @@ dstream.foreachRDD { rdd =>
{% highlight java %} dstream.foreachRDD(new VoidFunction>() { - @Override public void call(JavaRDD rdd) throws Exception { + @Override + public void call(JavaRDD rdd) { rdd.foreachPartition(new VoidFunction>() { - @Override public void call(Iterator partitionOfRecords) throws Exception { - Connection connection = ConnectionPool.getConnection(connectionString); + @Override + public void call(Iterator partitionOfRecords) { + // ConnectionPool is a static, lazily initialized pool of connections + Connection connection = ConnectionPool.getConnection(); while (partitionOfRecords.hasNext()) { connection.send(partitionOfRecords.next()); } - ConnectionPool.releaseConnection(connection); + ConnectionPool.returnConnection(connection); // return to the pool for future reuse } }); }