diff --git a/contrib/cassandra-connector/pom.xml b/contrib/cassandra-connector/pom.xml
new file mode 100644
index 000000000000..48fb693a062d
--- /dev/null
+++ b/contrib/cassandra-connector/pom.xml
@@ -0,0 +1,150 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>com.google.cloud.dataflow</groupId>
+    <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
+    <version>1.4.0</version>
+  </parent>
+
+  <groupId>com.google.cloud.dataflow</groupId>
+  <artifactId>google-cloud-dataflow-cassandra-connector</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>cassandra-connector</name>
+
+  <description>source and sink connector for cassandra</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-storage</artifactId>
+      <version>v1-rev25-1.19.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>v2-rev187-1.19.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson2</artifactId>
+      <version>1.19.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>19.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.4.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.4.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-mapping</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>3.2.1</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java
new file mode 100644
index 000000000000..ec3270505ab2
--- /dev/null
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java
@@ -0,0 +1,307 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.DriverInternalError;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionView;
+import com.google.cloud.dataflow.sdk.values.PDone;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+public class CassandraIO {
+
+ /**
+ * Write sink for Cassandra.
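+ *
+ * <p>A minimal usage sketch (assumes a mapped entity class {@code Person};
+ * the hosts, keyspace and port are hypothetical):
+ *
+ * <pre>{@code
+ * PCollection<Person> persons = ...;
+ * persons.apply(CassandraIO.Write.hosts("127.0.0.1")
+ *     .keyspace("demo")
+ *     .port(9042)
+ *     .<Person>bind());
+ * }</pre>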
+ */
+ public static class Write {
+
+ /** create CassandraIO.Write.Unbound with hosts */
+ public static Unbound hosts(String... hosts) {
+ return new Unbound().hosts(hosts);
+ }
+
+ /** create CassandraIO.Write.Unbound with keyspace */
+ public static Unbound keyspace(String keyspace) {
+ return new Unbound().keyspace(keyspace);
+ }
+
+ /** create CassandraIO.Write.Unbound with port */
+ public static Unbound port(int port) {
+ return new Unbound().port(port);
+ }
+
+ /** Builder that accumulates the Cassandra hosts, keyspace and port. */
+ public static class Unbound {
+
+ private final String[] _hosts;
+ private final String _keyspace;
+ private final int _port;
+
+ Unbound() {
+ _hosts = null;
+ _keyspace = null;
+ _port = 9042;
+ }
+
+ Unbound(String[] hosts, String keyspace, int port) {
+ _hosts = hosts;
+ _keyspace = keyspace;
+ _port = port;
+ }
+
+ /** create CassandraIO.Write.Bound */
+ public <T> Bound<T> bind() {
+ return new Bound<T>(_hosts, _keyspace, _port);
+ }
+
+ /** create CassandraIO.Write.Unbound with hosts */
+ public Unbound hosts(String... hosts) {
+ return new Unbound(hosts, _keyspace, _port);
+ }
+
+ /** create CassandraIO.Write.Unbound with keyspace */
+ public Unbound keyspace(String keyspace) {
+ return new Unbound(_hosts, keyspace, _port);
+ }
+
+ /** create CassandraIO.Write.Unbound with port */
+ public Unbound port(int port) {
+ return new Unbound(_hosts, _keyspace, port);
+ }
+ }
+
+ /**
+ * Bound write transform that holds the Cassandra hosts, keyspace and port.
+ */
+ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
+
+ private static final long serialVersionUID = 0;
+
+ private final String[] _hosts;
+ private final String _keyspace;
+ private final int _port;
+
+ Bound(String[] hosts, String keyspace, int port) {
+ _hosts = hosts;
+ _keyspace = keyspace;
+ _port = port;
+ }
+
+ public String[] getHosts() {
+ return _hosts;
+ }
+
+ public String getKeyspace() {
+ return _keyspace;
+ }
+
+ public int getPort() {
+ return _port;
+ }
+
+ /**
+ * Creates the CassandraWriteOperation and its coder, materializes the
+ * operation as a singleton side input, and writes each element of the
+ * input PCollection to Cassandra.
+ */
+ @SuppressWarnings("unchecked")
+ public PDone apply(PCollection<T> input) {
+ Pipeline p = input.getPipeline();
+ CassandraWriteOperation<T> op = new CassandraWriteOperation<T>(
+ this);
+
+ Coder<CassandraWriteOperation<T>> coder = (Coder<CassandraWriteOperation<T>>) SerializableCoder
+ .of(op.getClass());
+
+ PCollection<CassandraWriteOperation<T>> opSingleton = p
+ .apply(Create.<CassandraWriteOperation<T>> of(op)
+ .withCoder(coder));
+ final PCollectionView<CassandraWriteOperation<T>> opSingletonView = opSingleton
+ .apply(View.<CassandraWriteOperation<T>> asSingleton());
+
+ PCollection<Void> results = input.apply(ParDo.of(
+ new DoFn<T, Void>() {
+
+ private static final long serialVersionUID = 0;
+ private CassandraWriter<T> writer = null;
+
+ @Override
+ public void processElement(ProcessContext c)
+ throws Exception {
+ if (writer == null) {
+ CassandraWriteOperation<T> op = c
+ .sideInput(opSingletonView);
+ writer = op.createWriter();
+ }
+
+ try {
+ writer.write(c.element());
+ } catch (Exception e) {
+ try {
+ // best-effort flush of pending writes
+ writer.flush();
+ } catch (Exception ec) {
+ // ignored: the original write failure is
+ // rethrown below
+ }
+ throw e;
+ }
+ }
+
+ @Override
+ public void finishBundle(Context c)
+ throws Exception {
+ if (writer != null)
+ writer.flush();
+ }
+ }).withSideInputs(opSingletonView));
+
+ PCollectionView<Iterable<Void>> voidView = results.apply(View
+ .<Void> asIterable());
+
+ opSingleton.apply(ParDo.of(
+ new DoFn<CassandraWriteOperation<T>, Void>() {
+ private static final long serialVersionUID = 0;
+
+ @Override
+ public void processElement(ProcessContext c) {
+ CassandraWriteOperation<T> op = c.element();
+ op.finalize();
+ }
+
+ }).withSideInputs(voidView));
+ return PDone.in(p);
+ }
+ }
+
+ /**
+ * Serializable write operation that lazily creates the Cluster, Session
+ * and entity MappingManager on the worker.
+ */
+ static class CassandraWriteOperation<T> implements java.io.Serializable {
+
+ private static final long serialVersionUID = 0;
+ private final String[] _hosts;
+ private final int _port;
+ private final String _keyspace;
+
+ private transient Cluster _cluster;
+ private transient Session _session;
+ private transient MappingManager _manager;
+
+ private synchronized Cluster getCluster() {
+ if (_cluster == null) {
+ _cluster = Cluster.builder().addContactPoints(_hosts)
+ .withPort(_port).withoutMetrics()
+ .withoutJMXReporting().build();
+ }
+
+ return _cluster;
+ }
+
+ private synchronized Session getSession() {
+
+ if (_session == null) {
+ Cluster cluster = getCluster();
+ _session = cluster.connect(_keyspace);
+ }
+ return _session;
+ }
+
+ private synchronized MappingManager getManager() {
+ if (_manager == null) {
+ _manager = new MappingManager(getSession());
+ }
+ return _manager;
+ }
+
+ public CassandraWriteOperation(Bound bound) {
+
+ _hosts = bound.getHosts();
+ _port = bound.getPort();
+ _keyspace = bound.getKeyspace();
+ }
+
+ public CassandraWriter<T> createWriter() {
+ return new CassandraWriter<T>(this, getManager());
+ }
+
+ public void finalize() {
+ getSession().close();
+ getCluster().close();
+ }
+ }
+
+ /** Writer that batches asynchronous saves and flushes them in groups **/
+ private static class CassandraWriter<T> {
+ private static final int BATCH_SIZE = 20000;
+ private final CassandraWriteOperation<T> _op;
+ private final MappingManager _manager;
+ private final List<ListenableFuture<Void>> _results = new ArrayList<ListenableFuture<Void>>();
+ private Mapper<T> _mapper;
+
+ public CassandraWriter(CassandraWriteOperation<T> op,
+ MappingManager manager) {
+ _op = op;
+ _manager = manager;
+ }
+
+ public void flush() {
+ for (ListenableFuture<Void> result : _results) {
+ try {
+ Uninterruptibles.getUninterruptibly(result);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof DriverException)
+ throw ((DriverException) e.getCause()).copy();
+ else
+ throw new DriverInternalError(
+ "Unexpected exception thrown", e.getCause());
+ }
+ }
+ _results.clear();
+ }
+
+ /** Get the owning CassandraWriteOperation **/
+ public CassandraWriteOperation<T> getWriteOperation() {
+ return _op;
+ }
+
+ /** Queue an entity for an asynchronous write, flushing when the batch is full **/
+ @SuppressWarnings("unchecked")
+ public void write(T entity) {
+ if (_mapper == null)
+ _mapper = (Mapper<T>) _manager.mapper(entity.getClass());
+ if (_results.size() >= BATCH_SIZE)
+ flush();
+ _results.add(_mapper.saveAsync(entity));
+ }
+ }
+ }
+}
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
new file mode 100644
index 000000000000..c51e5c0e5d60
--- /dev/null
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
@@ -0,0 +1,99 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import java.io.Serializable;
+
+/*
+ * Configuration class that holds the details required for a Cassandra connection and read query.
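+ *
+ * Example construction (an illustrative sketch; the values are hypothetical
+ * and Person is a mapped entity class):
+ *
+ *   CassandraReadConfiguration config = new CassandraReadConfiguration(
+ *       new String[] {"localhost"}, "demo", 9042, "person",
+ *       "SELECT * FROM demo.person", "person_id", Person.class, 7199);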
+ */
+public class CassandraReadConfiguration implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private String[] host;
+ private String keyspace;
+ private int port;
+ private String table;
+ private String query;
+ private String rowKey;
+ private Class entityName;
+ private int jmxPort;
+
+ CassandraReadConfiguration(String[] hosts, String keyspace, int port,
+ String table, String query, String rowKey, Class entityName, int jmxPort) {
+ this.host = hosts;
+ this.keyspace = keyspace;
+ this.port = port;
+ this.table = table;
+ this.query = query;
+ this.rowKey = rowKey;
+ this.entityName = entityName;
+ this.jmxPort = jmxPort;
+ }
+
+ public String[] getHost() {
+ return host;
+ }
+
+ public void setHost(String[] host) {
+ this.host = host;
+ }
+
+ public String getKeyspace() {
+ return keyspace;
+ }
+
+ public void setKeyspace(String keyspace) {
+ this.keyspace = keyspace;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public String getTable() {
+ return table;
+ }
+
+ public void setTable(String table) {
+ this.table = table;
+ }
+
+ public String getQuery() {
+ return query;
+ }
+
+ public void setQuery(String query) {
+ this.query = query;
+ }
+
+ public String getRowKey() {
+ if (rowKey == null || rowKey.isEmpty()) {
+ return null;
+ }
+ return rowKey;
+ }
+
+ public void setRowKey(String rowKey) {
+ this.rowKey = rowKey;
+ }
+
+ public Class getEntityName() {
+ return entityName;
+ }
+
+ public void setEntityName(Class entityName) {
+ this.entityName = entityName;
+ }
+
+ public int getJmxPort() {
+ return jmxPort;
+ }
+
+ public void setJmxPort(int jmxPort) {
+ this.jmxPort = jmxPort;
+ }
+}
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
new file mode 100644
index 000000000000..66db79a5a0a8
--- /dev/null
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
@@ -0,0 +1,340 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.regex.Pattern;
+
+import org.apache.cassandra.tools.NodeProbe;
+import org.joda.time.Instant;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.mapping.Mapper;
+import com.datastax.driver.mapping.MappingManager;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.common.base.Preconditions;
+
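+/**
+ * A bounded Dataflow source for Cassandra reads. A minimal usage sketch
+ * (assumes the CassandraReadConfiguration example shown in that class and an
+ * existing Pipeline {@code p}):
+ *
+ * <pre>{@code
+ * PCollection<Object> rows = p.apply(Read.from(CassandraReadIO.read(config)));
+ * }</pre>
+ */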
+public class CassandraReadIO {
+ // A bounded source to read from Cassandra.
+ static class Source extends BoundedSource<Object> {
+ private CassandraReadConfiguration configuration;
+ private static final String SSTABLE_DISK_SPACE = "LiveDiskSpaceUsed";
+ private static final String MEMTABLE_SPACE = "MemtableLiveDataSize";
+ private static final int DEFAULT_JMX_PORT = 7199;
+ private static String DEFAULT_ROW_COUNT_QUERY = null;
+ private static final String FROM_KEYWORD = "from";
+ private static final String WHERE_CLAUSE = "where";
+
+ Source(CassandraReadConfiguration config) throws IOException {
+ this.configuration = config;
+ validate();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Coder<Object> getDefaultOutputCoder() {
+ return (Coder<Object>) SerializableCoder.of(configuration
+ .getEntityName());
+ }
+
+ @Override
+ public void validate() {
+ Preconditions.checkNotNull(configuration.getHost(), "host");
+ Preconditions.checkNotNull(configuration.getPort(), "port");
+ Preconditions.checkNotNull(configuration.getKeyspace(), "keyspace");
+ Preconditions.checkNotNull(configuration.getTable(), "table");
+ Preconditions.checkNotNull(configuration.getRowKey(), "rowKey/partitionKey");
+ }
+
+ private static class Reader extends BoundedReader<Object> {
+ CassandraReadConfiguration configuration;
+ Cluster cluster;
+ Session session;
+ ResultSet rs;
+ Iterator<?> itr;
+ Object currentEntity;
+
+ Reader(CassandraReadConfiguration configuration) {
+ this.configuration = configuration;
+ }
+
+ /*
+ * advances the reader to the next record
+ */
+ @Override
+ public boolean advance() throws IOException {
+ if (itr.hasNext()) {
+ currentEntity = itr.next();
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /*
+ * Creates the Cassandra connection and session, executes the
+ * configured query, maps the result set to the entity type, and
+ * advances the reader to the first record.
+ *
+ * @return true if a record was read, false if no more input is
+ * available.
+ */
+ @Override
+ public boolean start() throws IOException {
+ cluster = Cluster.builder()
+ .addContactPoints(configuration.getHost())
+ .withPort(configuration.getPort()).build();
+ session = cluster.connect();
+ rs = session.execute(configuration.getQuery());
+ final MappingManager manager = new MappingManager(session);
+ Mapper<?> mapper = manager.mapper(configuration.getEntityName());
+ itr = mapper.map(rs).iterator();
+ return advance();
+ }
+
+ @Override
+ public void close() throws IOException {
+ session.close();
+ cluster.close();
+ }
+
+ @Override
+ public Object getCurrent() throws NoSuchElementException {
+ return currentEntity;
+ }
+
+ @Override
+ public Instant getCurrentTimestamp() throws NoSuchElementException {
+ return Instant.now();
+ }
+
+ @Override
+ public BoundedSource<Object> getCurrentSource() {
+ // The originating source is not tracked by this reader.
+ return null;
+ }
+
+ }
+
+ /*
+ * Splits this source into the desired number of sources. (non-Javadoc)
+ *
+ * @see
+ * com.google.cloud.dataflow.sdk.io.BoundedSource#splitIntoBundles(long,
+ * com.google.cloud.dataflow.sdk.options.PipelineOptions)
+ *
+ * The Cassandra cluster token range is divided into the desired number
+ * of smaller ranges, a token-range query is built for each split, and
+ * a split source is created from each query plus the other
+ * configuration values. The rowKey is the partition key of the
+ * Cassandra table.
+ */
+ @Override
+ public List<? extends BoundedSource<Object>> splitIntoBundles(
+ long desiredBundleSizeBytes, PipelineOptions options)
+ throws Exception {
+ int exponent = 63;
+ long numSplits = 10;
+ long startToken, endToken = 0L;
+ List<Source> sourceList = new ArrayList<Source>();
+ try {
+ if (desiredBundleSizeBytes > 0) {
+ numSplits = getEstimatedSizeBytes(options)
+ / desiredBundleSizeBytes;
+ }
+ } catch (Exception e) {
+ // Fall back to Dataflow's configured number of workers.
+ DataflowPipelineWorkerPoolOptions poolOptions =
+ options.as(DataflowPipelineWorkerPoolOptions.class);
+ if (poolOptions.getNumWorkers() > 0) {
+ numSplits = poolOptions.getNumWorkers();
+ } else {
+ // Default to 10, equal to Cassandra's maxConnectionsPerHost.
+ numSplits = 10;
+ }
+ }
+
+ if (numSplits <= 0) {
+ numSplits = 1;
+ }
+ // If the desired bundle size or number of workers results in a
+ // single split, simply return the current source.
+ if (numSplits == 1) {
+ sourceList.add(this);
+ return sourceList;
+ }
+
+ BigInteger startRange = BigInteger.valueOf(-(long) Math.pow(2, exponent));
+ BigInteger endRange = BigInteger.valueOf((long) Math.pow(2, exponent));
+ endToken = startRange.longValue();
+ BigInteger incrementValue = (endRange.subtract(startRange))
+ .divide(BigInteger.valueOf(numSplits));
+ String splitQuery = null;
+ for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+ startToken = endToken;
+ endToken = startToken + incrementValue.longValue();
+ if (splitCount == numSplits) {
+ // Set the end token of the last split to the maximum token value.
+ endToken = (long) Math.pow(2, exponent);
+ }
+ splitQuery = queryBuilder(startToken, endToken);
+ // Give each split its own configuration; mutating the shared
+ // instance would leave every split with the last query.
+ CassandraReadConfiguration splitConfig = new CassandraReadConfiguration(
+ configuration.getHost(), configuration.getKeyspace(),
+ configuration.getPort(), configuration.getTable(),
+ splitQuery, configuration.getRowKey(),
+ configuration.getEntityName(), configuration.getJmxPort());
+ sourceList.add(new CassandraReadIO.Source(splitConfig));
+ }
+ return sourceList;
+ }
+
+ /*
+ * Builds a token-range query from startToken (inclusive) and endToken
+ * (exclusive).
+ */
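+ /*
+ * For example, with keyspace "demo", table "person" and rowKey
+ * "person_id", the generated CQL is of the form (illustrative):
+ * SELECT * FROM demo.person WHERE token(person_id) >= <startToken>
+ * AND token(person_id) < <endToken>;
+ */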
+ public String queryBuilder(long startToken, long endToken) {
+ String query = QueryBuilder
+ .select()
+ .from(configuration.getKeypace(), configuration.getTable())
+ .where(QueryBuilder.gte(
+ "token(" + configuration.getRowKey() + ")",
+ startToken))
+ .and(QueryBuilder.lt("token(" + configuration.getRowKey()
+ + ")", endToken)).toString();
+ return query;
+ }
+
+ /*
+ * Cassandra provides no direct way to estimate the byte size of a
+ * query result. As a rough approximation:
+ * 1. fetch the byte count of the entire column family over JMX,
+ * 2. get the row count for the column family and for the supplied
+ * query,
+ * 3. scale the column-family byte size by the ratio of the two row
+ * counts.
+ */
+ @Override
+ public long getEstimatedSizeBytes(PipelineOptions options)
+ throws Exception {
+ NodeProbe probe = null;
+ Long estimatedByteSize = 0L;
+ Long totalCfByteSize = 0L;
+ DEFAULT_ROW_COUNT_QUERY = "select count(*) from "
+ + configuration.getKeyspace() + "."
+ + configuration.getTable();
+ try {
+ if (configuration.getJmxPort() == 0) {
+ probe = new NodeProbe(configuration.getHost()[0],
+ DEFAULT_JMX_PORT);
+ } else {
+ probe = new NodeProbe(configuration.getHost()[0],
+ configuration.getJmxPort());
+ }
+ totalCfByteSize = (Long) probe.getColumnFamilyMetric(
+ configuration.getKeyspace(), configuration.getTable(),
+ SSTABLE_DISK_SPACE) + (Long) probe.getColumnFamilyMetric(
+ configuration.getKeyspace(), configuration.getTable(),
+ MEMTABLE_SPACE);
+
+ if (configuration.getQuery() != null
+ && !configuration.getQuery().isEmpty()
+ && Pattern
+ .compile(WHERE_CLAUSE,
+ Pattern.CASE_INSENSITIVE | Pattern.LITERAL)
+ .matcher(configuration.getQuery()).find()) {
+ estimatedByteSize = getQueryResultBytesSize(totalCfByteSize);
+ if (estimatedByteSize == 0) {
+ estimatedByteSize = 1L;
+ }
+ } else {
+ estimatedByteSize = totalCfByteSize;
+ }
+
+ } catch (IOException e) {
+ throw e;
+ } finally {
+ if (probe != null)
+ probe.close();
+ }
+ return estimatedByteSize;
+ }
+
+ // Get the row-count query: for queries with a where clause the count
+ // is restricted to the same clause, otherwise the whole column family
+ // is counted.
+ private String getQueryForRowCount() {
+ String rowCountQuery;
+ if ((configuration.getQuery() != null && !configuration.getQuery()
+ .isEmpty())
+ && Pattern
+ .compile(WHERE_CLAUSE,
+ Pattern.CASE_INSENSITIVE | Pattern.LITERAL)
+ .matcher(configuration.getQuery()).find()) {
+ rowCountQuery = "select count(*) "
+ + configuration.getQuery().substring(
+ configuration.getQuery().indexOf(FROM_KEYWORD));
+ } else {
+ rowCountQuery = DEFAULT_ROW_COUNT_QUERY;
+ }
+ return rowCountQuery;
+ }
+
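+ // Worked example for the estimate below (illustrative numbers): a
+ // column family occupying 400 MB whose query matches 25% of its rows
+ // yields an estimated result size of 100 MB.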
+ // Get the result-set size in bytes for the supplied query.
+ private long getQueryResultBytesSize(long cfByteSize) throws Exception {
+ long columnFamilyByteSize = cfByteSize;
+ long totalQueryByteSize = 0L;
+ long totalCfRowCount = getRowCount(DEFAULT_ROW_COUNT_QUERY);
+ long queryRowCount = getRowCount(getQueryForRowCount());
+ int percentage = (int) ((queryRowCount * 100) / totalCfRowCount);
+ totalQueryByteSize = columnFamilyByteSize * percentage / 100;
+ return totalQueryByteSize;
+ }
+ // Get the row count for the supplied query.
+ private long getRowCount(String query) {
+ Cluster cluster = null;
+ Session session = null;
+ long rowCount = 0L;
+ try {
+ cluster = Cluster.builder()
+ .addContactPoints(configuration.getHost())
+ .withPort(configuration.getPort()).build();
+ session = cluster.connect();
+ ResultSet queryResult = session.execute(query);
+ Row row = queryResult.one();
+ rowCount = row.getLong("count");
+ } finally {
+ if (session != null) {
+ session.close();
+ }
+ if (cluster != null) {
+ cluster.close();
+ }
+ }
+ return rowCount;
+ }
+
+ @Override
+ public boolean producesSortedKeys(PipelineOptions paramPipelineOptions)
+ throws Exception {
+ return false;
+ }
+
+ @Override
+ public BoundedReader<Object> createReader(PipelineOptions paramPipelineOptions)
+ throws IOException {
+ return new Reader(configuration);
+ }
+ }
+
+ public static BoundedSource<Object> read(CassandraReadConfiguration config)
+ throws IOException {
+ return new Source(config);
+ }
+
+}
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
new file mode 100644
index 000000000000..0e5f44d12f11
--- /dev/null
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
@@ -0,0 +1,148 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import com.datastax.driver.mapping.annotations.Table;
+import com.google.cloud.cassandra.dataflow.io.CassandraReadIO.Source;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.Read.Bounded;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.POutput;
+
+public class CassandraReadIOWithMockTest {
+ private static CassandraReadIO.Source mockCassandraRead;
+ private long desiredBundleSizeBytes = 64 * (1 << 20);
+ private static Iterator mockIterator;
+ private static List mockSplitedSourceList;
+
+ private static PipelineOptions mockPipelineOptions;
+ private static Pipeline mockPipeline;
+
+ /**
+ * Initial setup for mocking objects
+ */
+ @Before
+ public void setUp() {
+ mockPipelineOptions = Mockito.spy(PipelineOptions.class);
+ mockPipeline = Mockito.mock(Pipeline.class);
+ mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class,Mockito.withSettings().serializable());
+ mockSplitedSourceList = Mockito.spy(ArrayList.class);
+ mockIterator = Mockito.spy(Iterator.class);
+ }
+ /**
+ * Test for checking single source split and PCollection object
+ */
+ @Test
+ public void testToGetSingleSource() {
+ try {
+
+ mockSplitedSourceList.add(mockCassandraRead);
+ Mockito.when(mockCassandraRead.splitIntoBundles(desiredBundleSizeBytes, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
+ Assert.assertEquals(1, mockSplitedSourceList.size());
+ Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator);
+ PCollection mockPCollection = Mockito.mock(PCollection.class);
+ Mockito.when(mockPipeline.apply(Mockito.mock(Bounded.class))).thenReturn(mockPCollection);
+ mockPipeline.run();
+ Assert.assertNotNull(mockPCollection);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test for checking multiple source splits and PCollection Object
+ */
+ @Test
+ public void testToGetMultipleSplitedSource() {
+ try {
+ desiredBundleSizeBytes = 1024;
+ mockSplitedSourceList.add(mockCassandraRead);
+ mockSplitedSourceList.add(mockCassandraRead);
+ Mockito.when(mockCassandraRead.splitIntoBundles(desiredBundleSizeBytes, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
+
+ Assert.assertEquals(2, mockSplitedSourceList.size());
+ PCollectionList mockPCollectionList = Mockito.mock(PCollectionList.class);
+ Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator);
+ Mockito.when(mockIterator.hasNext()).thenReturn(true,false);
+ Mockito.when(mockPipeline.apply(Mockito.mock(Bounded.class))).thenReturn(mockPCollectionList);
+
+ PCollection mockMergedPColl = Mockito.mock(PCollection.class);
+ Mockito.when(mockPCollectionList.apply(Flatten.pCollections())).thenReturn(mockMergedPColl);
+ mockPipeline.run();
+ Assert.assertNotNull(mockMergedPColl);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Static nested class holding employee details.
+ */
+ @Table(name = "emp_info1", keyspace = "demo1")
+ public static class EmployeeDetails implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private int emp_id;
+ private String emp_first;
+ private String emp_last;
+ private String emp_address;
+ private String emp_dept;
+
+ public int getEmp_id() {
+ return emp_id;
+ }
+
+ public void setEmp_id(int emp_id) {
+ this.emp_id = emp_id;
+ }
+
+ public String getEmp_first() {
+ return emp_first;
+ }
+
+ public void setEmp_first(String emp_first) {
+ this.emp_first = emp_first;
+ }
+
+ public String getEmp_last() {
+ return emp_last;
+ }
+
+ public void setEmp_last(String emp_last) {
+ this.emp_last = emp_last;
+ }
+
+ public String getEmp_address() {
+ return emp_address;
+ }
+
+ public void setEmp_address(String emp_address) {
+ this.emp_address = emp_address;
+ }
+
+ public String getEmp_dept() {
+ return emp_dept;
+ }
+
+ public void setEmp_dept(String emp_dept) {
+ this.emp_dept = emp_dept;
+ }
+
+ }
+
+}
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
new file mode 100644
index 000000000000..e07281be83f0
--- /dev/null
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
@@ -0,0 +1,200 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.datastax.driver.mapping.MappingManager;
+import com.datastax.driver.mapping.annotations.Table;
+import com.google.cloud.cassandra.dataflow.io.CassandraReadIO.Source;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.io.BoundedSource;
+import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PCollectionList;
+
+/**
+ * JUnit tests that run against a live Cassandra instance (local or cloud).
+ */
+public class CassandraReadIOWithoutMockTest {
+ private static String[] hosts;
+ private static int port;
+ private static String keyspace;
+ private static Class entityName;
+ private static String query;
+ private static String tableName;
+ private static String rowKey;
+ private long desiredBundleSizeBytes = 64 * (1 << 20);
+
+ private static Cluster cluster;
+ private static Session session;
+ private static MappingManager manager;
+
+ private static PipelineOptions options;
+ private static Pipeline p;
+
+ /**
+ * Initial setup for the Cassandra connection:
+ * hosts - Cassandra server hosts; keyspace - schema name; port -
+ * Cassandra server port; entityName - the mapped POJO class; query -
+ * a simple select query.
+ */
+ @BeforeClass
+ public static void oneTimeSetUp() {
+ hosts = new String[] { "localhost" };
+ keyspace = "demo1";
+ port = 9042;
+ tableName = "emp_info1";
+ rowKey = "emp_id";
+ entityName = CassandraReadIOWithoutMockTest.EmployeeDetails.class;
+ query = QueryBuilder.select().all().from(keyspace, tableName)
+ .toString();
+ }
+
+ /**
+ * Creating a pipeline
+ */
+ @Before
+ public void setUp() {
+ options = PipelineOptionsFactory.create();
+ p = Pipeline.create(options);
+ }
+
+ /**
+ * Test for checking single source split and PCollection object
+ */
+
+ @Test
+ public void testToGetSingleSource() {
+ try {
+
+ List splitedSourceList = (List) new CassandraReadIO.Source(
+ new CassandraReadConfiguration(hosts, keyspace, 9042,
+ tableName, query, rowKey, entityName, 7199))
+ .splitIntoBundles(desiredBundleSizeBytes, options);
+ Assert.assertEquals(1, splitedSourceList.size());
+
+ Iterator itr = splitedSourceList.iterator();
+ CassandraReadIO.Source cs = (Source) itr.next();
+ PCollection pCollection = (PCollection) p.apply(Read
+ .from((Source) cs));
+ p.run();
+ Assert.assertNotNull(pCollection);
+
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test for checking multiple source splits and the merged PCollection.
+ * desiredBundleSizeBytes is set low so that more than one source is
+ * built.
+ */
+ @Test
+ public void testToGetMultipleSplitedSource() {
+ try {
+ desiredBundleSizeBytes = 1024;
+ List splitedSourceList = (List) new CassandraReadIO.Source(
+ new CassandraReadConfiguration(hosts, keyspace, port,
+ tableName, "", rowKey, entityName, 7199))
+ .splitIntoBundles(desiredBundleSizeBytes, options);
+ Assert.assertTrue(splitedSourceList.size() >= 1);
+
+ Iterator itr = splitedSourceList.iterator();
+ List sourceList = new ArrayList();
+ while (itr.hasNext()) {
+ CassandraReadIO.Source cs = (Source) itr.next();
+ sourceList.add((PCollection) p.apply(Read.from((Source) cs)));
+ }
+ PCollectionList pCollectionList = null;
+ Iterator sListIterator = sourceList.iterator();
+
+ int pcollCount = 0;
+ while (sListIterator.hasNext()) {
+ PCollection pCollection = (PCollection) sListIterator.next();
+ if (pcollCount == 0) {
+ pCollectionList = PCollectionList.of(pCollection);
+ } else {
+ pCollectionList = pCollectionList.and(pCollection);
+ }
+ pcollCount++;
+ }
+ PCollection merged = (PCollection) pCollectionList.apply(Flatten
+ .pCollections());
+ p.run();
+ Assert.assertNotNull(merged);
+ } catch (Exception e) {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Static nested class holding employee details.
+ */
+ @Table(name = "emp_info1", keyspace = "demo1")
+ public static class EmployeeDetails implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+ private int emp_id;
+ private String emp_first;
+ private String emp_last;
+ private String emp_address;
+ private String emp_dept;
+
+ public int getEmp_id() {
+ return emp_id;
+ }
+
+ public void setEmp_id(int emp_id) {
+ this.emp_id = emp_id;
+ }
+
+ public String getEmp_first() {
+ return emp_first;
+ }
+
+ public void setEmp_first(String emp_first) {
+ this.emp_first = emp_first;
+ }
+
+ public String getEmp_last() {
+ return emp_last;
+ }
+
+ public void setEmp_last(String emp_last) {
+ this.emp_last = emp_last;
+ }
+
+ public String getEmp_address() {
+ return emp_address;
+ }
+
+ public void setEmp_address(String emp_address) {
+ this.emp_address = emp_address;
+ }
+
+ public String getEmp_dept() {
+ return emp_dept;
+ }
+
+ public void setEmp_dept(String emp_dept) {
+ this.emp_dept = emp_dept;
+ }
+
+ }
+
+}
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java
new file mode 100644
index 000000000000..d5187ba3903f
--- /dev/null
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java
@@ -0,0 +1,97 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Class to test CassandraIO.Write using mock objects.
+ */
+
+public class CassndaraIOWithMockTest {
+ private CassandraIO.Write.Bound mockCassndaraIOWriter;
+ private PCollection pcollection;
+ private Pipeline pipeline;
+
+ /**
+ * Creates a mocked CassandraIO.Write.Bound and stubs its getter and
+ * apply methods.
+ **/
+ @Before
+ public void setup() {
+ mockCassndaraIOWriter = Mockito.mock(CassandraIO.Write.Bound.class);
+ Mockito.when(mockCassndaraIOWriter.getHosts()).thenReturn(new String[] { "localhost"});
+ Mockito.when(mockCassndaraIOWriter.getPort()).thenReturn(9042);
+ Mockito.when(mockCassndaraIOWriter.getKeyspace()).thenReturn("dev_keyspace");
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ pipeline = Pipeline.create(options);
+ pcollection = pipeline.apply(Create.of(Arrays
+ .asList(getColumnFamilyRows())));
+
+ Mockito.when(mockCassndaraIOWriter.apply(pcollection)).thenReturn(
+ PDone.in(pipeline));
+ }
+
+ /** To Test Write to Cassandra **/
+ @Test
+ public void cassandraWriteTest() {
+ PDone pdone = mockCassndaraIOWriter.apply(pcollection);
+ System.out.println("Ponde "+pdone);
+ PipelineResult result = pipeline.run();
+ assertNotNull(result);
+ }
+
+ /** Get entities to persist */
+ private Person[] getColumnFamilyRows() {
+ Random rn = new Random();
+ int range = 10000;
+ int pId = rn.nextInt(range);
+ CassndaraIOWithMockTest.Person person = new CassndaraIOWithMockTest.Person();
+ person.setPersonId(pId);
+ person.setName("PERSON-" + pId);
+ Person[] persons = { person };
+ return persons;
+ }
+
+ @Table(name = "person")
+ public static class Person implements Serializable {
+ private static final long serialVersionUID = 1L;
+ @Column(name = "PERSON_NAME")
+ private String name;
+ @Column(name = "PERSON_ID")
+ private int personId;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getPersonId() {
+ return personId;
+ }
+
+ public void setPersonId(int personId) {
+ this.personId = personId;
+ }
+ }
+
+}
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java
new file mode 100644
index 000000000000..49d4f07e910b
--- /dev/null
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java
@@ -0,0 +1,179 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Tests writing to Cassandra without mocks.
+ * To run this test, a Cassandra instance is required locally or on Google
+ * Cloud. The test uses a column family (table) "person" under the keyspace
+ * "dev_keyspace", with the structure:
+ * CREATE TABLE dev_keyspace.person (person_id int PRIMARY KEY, person_name text);
+ * The keyspace and table are created by setup() if they do not exist.
+ */
+public class CassndaraIOWithoutMockTest {
+
+ private CassandraIO.Write.Bound<Person> cassandraIOWriter;
+ private String[] hosts;
+ private String keyspace;
+ private int port;
+ private Cluster cluster;
+ private Session session;
+
+
+ /**
+ * Sets hosts, keyspace and port, connects to Cassandra, creates and
+ * uses the keyspace, creates the column family, and creates the
+ * CassandraIO writer.
+ **/
+
+ @Before
+ public void setup() {
+ this.hosts = new String[] { "localhost" };
+ this.keyspace = "dev_keyspace";
+ this.port = 9042;
+ try{
+ connect(getCassandraHostAndPort());
+ createKeyspace();
+ useKeyspace();
+ createColumnFamily();
+ cassandraIOWriter = new CassandraIO.Write.Bound<Person>(hosts, keyspace, port);
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+
+ /** Connect to Cassandra */
+ public void connect(Collection<InetSocketAddress> inetSocketAddresses) {
+ cluster = Cluster.builder()
+ .addContactPointsWithPorts(inetSocketAddresses).build();
+ session = cluster.connect();
+ }
+
+ /** Create the keyspace, ignoring the exception if it already exists */
+ private void createKeyspace() {
+ try {
+ session.execute("CREATE KEYSPACE " + keyspace
+ + " WITH replication "
+ + "= {'class':'SimpleStrategy', 'replication_factor':3};");
+ } catch (AlreadyExistsException ex) {
+ // keyspace already exists; nothing to do
+ }
+ }
+
+ /** Use the created keyspace */
+ private void useKeyspace() {
+ session.execute("USE " + keyspace);
+ }
+
+ /** Create the column family, ignoring the exception if it already exists */
+ private void createColumnFamily() {
+ try {
+ session.execute("CREATE TABLE " + "person("
+ + "person_id int PRIMARY KEY" + "," + "person_name text);");
+ } catch (AlreadyExistsException ex) {
+ // table already exists; nothing to do
+ }
+ }
+
+
+ /** Create the collection of Cassandra host socket addresses */
+ private Collection<InetSocketAddress> getCassandraHostAndPort() {
+ Collection<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
+ for (String host : hosts) {
+ addresses.add(new InetSocketAddress(host, port));
+ }
+ return addresses;
+ }
+
+ /** To Test Write to Cassandra **/
+ @Test
+ public void cassandraWriteTest() {
+ try {
+ PipelineOptions options = PipelineOptionsFactory.create();
+ Pipeline p = Pipeline.create(options);
+ PCollection<Person> pcollection = p.apply(Create.of(Arrays
+ .asList(getColumnFamilyRows())));
+ @SuppressWarnings("unused")
+ PDone pDone = cassandraIOWriter.apply(pcollection);
+ PipelineResult result = p.run();
+ assertNotNull(result);
+ } catch (Exception e) {
+ Assert.fail("Test failed: " + e.getMessage());
+ }
+ }
+
+ /** Get entities to persist */
+ private Person[] getColumnFamilyRows() {
+ Random rn = new Random();
+ int range = 10000;
+ int pId = rn.nextInt(range);
+ CassndaraIOWithoutMockTest.Person person = new CassndaraIOWithoutMockTest.Person();
+ person.setPersonId(pId);
+ person.setName("PERSON-" + pId);
+ Person[] persons = { person };
+ return persons;
+ }
+
+ @Table(name = "person")
+ public static class Person implements Serializable {
+ private static final long serialVersionUID = 1L;
+ @Column(name = "PERSON_NAME")
+ private String name;
+ @Column(name = "PERSON_ID")
+ private int personId;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getPersonId() {
+ return personId;
+ }
+
+ public void setPersonId(int personId) {
+ this.personId = personId;
+ }
+ }
+
+ @After
+ public void closeResources() {
+ try{
+ session.close();
+ cluster.close();
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+ }
+
+}