From 16e104cd0cc7859f346315f63061f3f5125a654c Mon Sep 17 00:00:00 2001
From: unknown
Date: Thu, 31 Mar 2016 14:54:28 +0530
Subject: [PATCH 1/3] Cassandra source and sink connector classes

---
 contrib/cassandra-connector/pom.xml | 144 ++++++++
 .../cassandra/dataflow/io/CassandraIO.java | 307 ++++++++++++++++++
 .../io/CassandraReadConfiguration.java | 81 +++++
 .../dataflow/io/CassandraReadIO.java | 188 +++++++++++
 .../io/CassandraReadIOWithMockTest.java | 148 +++++++++
 .../io/CassandraReadIOWithoutMockTest.java | 186 +++++++++++
 .../dataflow/io/CassndaraIOWithMockTest.java | 97 ++++++
 .../io/CassndaraIOWithoutMockTest.java | 179 ++++++++++
 8 files changed, 1330 insertions(+)
 create mode 100644 contrib/cassandra-connector/pom.xml
 create mode 100644 contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java
 create mode 100644 contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
 create mode 100644 contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
 create mode 100644 contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
 create mode 100644 contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
 create mode 100644 contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java
 create mode 100644 contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java

diff --git a/contrib/cassandra-connector/pom.xml b/contrib/cassandra-connector/pom.xml
new file mode 100644
index 000000000000..a30f255ccc9d
--- /dev/null
+++ b/contrib/cassandra-connector/pom.xml
@@ -0,0 +1,144 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>com.google.cloud.dataflow</groupId>
+    <artifactId>google-cloud-dataflow-java-sdk-parent</artifactId>
+    <version>1.4.0</version>
+  </parent>
+  <groupId>com.google.cloud.dataflow</groupId>
+  <artifactId>google-cloud-dataflow-cassandra-connector</artifactId>
+  <version>0.0.1-SNAPSHOT</version>
+  <name>cassandra-connector</name>
+  <description>source and sink connector for cassandra</description>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>com.google.cloud.dataflow</groupId>
+      <artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
+      <version>1.4.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-storage</artifactId>
+      <version>v1-rev25-1.19.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.apis</groupId>
+      <artifactId>google-api-services-bigquery</artifactId>
+      <version>v2-rev187-1.19.1</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.http-client</groupId>
+      <artifactId>google-http-client-jackson2</artifactId>
+      <version>1.19.0</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava-jdk5</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>19.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <version>2.4.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.4.2</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-jdk14</artifactId>
+      <version>1.7.7</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-mapping</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.datastax.cassandra</groupId>
+      <artifactId>cassandra-driver-core</artifactId>
+      <version>3.0.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <version>1.10.19</version>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java
new file mode 100644
index 000000000000..ec3270505ab2
--- /dev/null
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraIO.java
@@ -0,0 +1,307 @@
+package
com.google.cloud.cassandra.dataflow.io; + +/* + * Copyright (C) 2015 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.DriverException; +import com.datastax.driver.core.exceptions.DriverInternalError; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.SerializableCoder; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionView; +import com.google.cloud.dataflow.sdk.values.PDone; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.Uninterruptibles; + +public class CassandraIO { + + /** + * Write sink for Cassandra. + */ + public static class Write { + + /** create CassandraIO.Write.Unbound with hosts */ + public static Unbound hosts(String... hosts) { + return new Unbound().hosts(hosts); + } + + /** create CassandraIO.Write.Unbound with keyspace */ + public static Unbound keyspace(String keyspace) { + return new Unbound().keyspace(keyspace); + } + + /** create CassandraIO.Write.Unbound with port */ + public static Unbound port(int port) { + return new Unbound().port(port); + } + + /** Set cassandra hosts,keyspace and port */ + public static class Unbound { + + private final String[] _hosts; + private final String _keyspace; + private final int _port; + + Unbound() { + _hosts = null; + _keyspace = null; + _port = 9042; + } + + Unbound(String[] hosts, String keyspace, int port) { + _hosts = hosts; + _keyspace = keyspace; + _port = port; + } + + /** create CassandraIO.Write.Bound */ + public Bound bind() { + return new Bound(_hosts, _keyspace, _port); + } + + /** create CassandraIO.Write.Unbound with hosts */ + + public Unbound hosts(String... 
hosts) { + return new Unbound(hosts, _keyspace, _port); + } + + /** create CassandraIO.Write.Unbound with keyspace */ + public Unbound keyspace(String keyspace) { + return new Unbound(_hosts, keyspace, _port); + } + + /** create CassandraIO.Write.Unbound with port */ + public Unbound port(int port) { + return new Unbound(_hosts, _keyspace, port); + } + } + + /** + * Set Cassandra hosts,keyspace and port Apply transformation + */ + public static class Bound extends PTransform, PDone> { + + private static final long serialVersionUID = 0; + + private final String[] _hosts; + private final String _keyspace; + private final int _port; + + Bound(String[] hosts, String keyspace, int port) { + _hosts = hosts; + _keyspace = keyspace; + _port = port; + } + + public String[] getHosts() { + return _hosts; + } + + public String getKeyspace() { + return _keyspace; + } + + public int getPort() { + return _port; + } + + /** + * Create CassandraWriteOperation Create Coder write entity + */ + @SuppressWarnings("unchecked") + public PDone apply(PCollection input) { + Pipeline p = input.getPipeline(); + CassandraWriteOperation op = new CassandraWriteOperation( + this); + + Coder> coder = (Coder>) SerializableCoder + .of(op.getClass()); + + PCollection> opSingleton = p + .apply(Create.> of(op) + .withCoder(coder)); + final PCollectionView> opSingletonView = opSingleton + .apply(View.> asSingleton()); + + PCollection results = input.apply(ParDo.of( + new DoFn() { + + private static final long serialVersionUID = 0; + private CassandraWriter writer = null; + + @Override + public void processElement(ProcessContext c) + throws Exception { + if (writer == null) { + CassandraWriteOperation op = c + .sideInput(opSingletonView); + writer = op.createWriter(); + } + + try { + writer.write(c.element()); + } catch (Exception e) { + try { + writer.flush(); + } catch (Exception ec) { + + } + throw e; + } + } + + @Override + public void finishBundle(Context c) + throws Exception { + if (writer != null) + writer.flush(); + } + }).withSideInputs(opSingletonView)); + + PCollectionView> voidView = results.apply(View + . 
asIterable()); + + opSingleton.apply(ParDo.of( + new DoFn, Void>() { + private static final long serialVersionUID = 0; + + @Override + public void processElement(ProcessContext c) { + CassandraWriteOperation op = c.element(); + op.finalize(); + } + + }).withSideInputs(voidView)); + return PDone.in(p); + } + } + + /** + * Create Cluster object Create Session object Create entity mapper + * **/ + static class CassandraWriteOperation implements java.io.Serializable { + + private static final long serialVersionUID = 0; + private final String[] _hosts; + private final int _port; + private final String _keyspace; + + private transient Cluster _cluster; + private transient Session _session; + private transient MappingManager _manager; + + private synchronized Cluster getCluster() { + if (_cluster == null) { + _cluster = Cluster.builder().addContactPoints(_hosts) + .withPort(_port).withoutMetrics() + .withoutJMXReporting().build(); + } + + return _cluster; + } + + private synchronized Session getSession() { + + if (_session == null) { + Cluster cluster = getCluster(); + _session = cluster.connect(_keyspace); + } + return _session; + } + + private synchronized MappingManager getManager() { + if (_manager == null) { + Session session = getSession(); + _manager = new MappingManager(_session); + } + return _manager; + } + + public CassandraWriteOperation(Bound bound) { + + _hosts = bound.getHosts(); + _port = bound.getPort(); + _keyspace = bound.getKeyspace(); + } + + public CassandraWriter createWriter() { + return new CassandraWriter(this, getManager()); + } + + public void finalize() { + getSession().close(); + getCluster().close(); + } + } + + /** Create cassandra writer **/ + private static class CassandraWriter { + private static int BATCH_SIZE = 20000; + private final CassandraWriteOperation _op; + private final MappingManager _manager; + private final List> _results = new ArrayList>(); + private Mapper _mapper; + + public CassandraWriter(CassandraWriteOperation op, + MappingManager manager) { + _op = op; + _manager = manager; + } + + public void flush() { + for (ListenableFuture result : _results) { + try { + Uninterruptibles.getUninterruptibly(result); + } catch (ExecutionException e) { + if (e.getCause() instanceof DriverException) + throw ((DriverException) e.getCause()).copy(); + else + throw new DriverInternalError( + "Unexpected exception thrown", e.getCause()); + } + } + _results.clear(); + } + + /** Get CassandraWriteOperation **/ + public CassandraWriteOperation getWriteOperation() { + return _op; + } + + /** Write entity to the database table **/ + @SuppressWarnings("unchecked") + public void write(T entity) { + if (_mapper == null) + _mapper = (Mapper) _manager.mapper(entity.getClass()); + if (_results.size() >= BATCH_SIZE) + flush(); + _results.add(_mapper.saveAsync((T) entity)); + } + } + } +} diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java new file mode 100644 index 000000000000..dc6068d0024f --- /dev/null +++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java @@ -0,0 +1,81 @@ +package com.google.cloud.cassandra.dataflow.io; +/* + * Configuration class that holds the details required for cassandra connection + */ +public class CassandraReadConfiguration { + + private String[] host; + private String keypace; + private int port; 
+ private String table; + private String query; + private String rowKey; + private Class entityName; + + CassandraReadConfiguration(String[] hosts, String keyspace, int port, + String table,String query,String rowKey,Class entityName) { + this.host = hosts; + this.keypace = keyspace; + this.port = port; + this.table = table; + this.query = query; + this.rowKey = rowKey; + this.entityName = entityName; + } + + public String[] getHost() { + return host; + } + + public void setHost(String[] host) { + this.host = host; + } + + public String getKeypace() { + return keypace; + } + + public void setKeypace(String keypace) { + this.keypace = keypace; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public String getTable() { + return table; + } + + public void setTable(String table) { + this.table = table; + } + + public String getQuery() { + return query; + } + + public void setQuery(String query) { + this.query = query; + } + + public String getRowKey() { + return rowKey; + } + + public void setRowKey(String rowKey) { + this.rowKey = rowKey; + } + + public Class get_entityName() { + return entityName; + } + + public void set_entityName(Class _entityName) { + this.entityName = _entityName; + } +} diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java new file mode 100644 index 000000000000..d791dfc9ec54 --- /dev/null +++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java @@ -0,0 +1,188 @@ +package com.google.cloud.cassandra.dataflow.io; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +import javax.swing.text.html.parser.Entity; + +import org.joda.time.Instant; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.mapping.Mapper; +import com.datastax.driver.mapping.MappingManager; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.coders.SerializableCoder; +import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; + +public class CassandraReadIO { + + static class Source extends BoundedSource{ + CassandraReadConfiguration configuration; + + Source(CassandraReadConfiguration config) throws IOException{ + this.configuration = config; + } + + @Override + public Coder getDefaultOutputCoder() { + return (Coder) SerializableCoder.of(configuration.get_entityName()); + } + + @Override + public void validate(){} + + private static class Reader extends BoundedReader{ + CassandraReadConfiguration configuration; + static Cluster cluster; + static Session session; + ResultSet rs; + Iterator itr; + Object currentEntity; + + Reader(CassandraReadConfiguration configuration){ + this.configuration = configuration; + } + + /* + * advances the reader to the next record + */ + @Override + public boolean advance() throws IOException { + if (itr.hasNext()) { + currentEntity = itr.next(); + return true; + } else { + return false; + } + } + + /* Creates a cassandra connection, session and resultSet and + * map the entity with resultSet and advances to the next result. 
+ * Initializes the reader and advances the reader to the first record. + * @return true, if a record was read else false if there is no more input available. + */ + @Override + public boolean start() throws IOException { + cluster = Cluster.builder().addContactPoints(configuration.getHost()).withPort(configuration.getPort()).build(); + session = cluster.connect(); + rs = session.execute(configuration.getQuery()); + final MappingManager _manager =new MappingManager(session); + Mapper _mapper = null; + if(_mapper==null){ + _mapper = (Mapper) _manager.mapper(configuration.get_entityName()); + } + itr=_mapper.map(rs).iterator(); + return advance(); + } + + @Override + public void close() throws IOException { + session.close(); + cluster.close(); + } + + @Override + public Object getCurrent() throws NoSuchElementException { + return currentEntity; + } + + @Override + public Instant getCurrentTimestamp() throws NoSuchElementException { + return Instant.now(); + } + + @Override + public BoundedSource getCurrentSource() { + return null; + } + + } + + /* method splits a source into desired number of sources. + * (non-Javadoc) + * @see com.google.cloud.dataflow.sdk.io.BoundedSource#splitIntoBundles(long, com.google.cloud.dataflow.sdk.options.PipelineOptions) + * + * Cassandra cluster token range is split into desired number of smaller ranges + * queries are built from start and end token ranges + * split sources are built by providing token range queries and other configurations + * rowkey is the partitionkey of the cassandara table + */ + @Override + public List splitIntoBundles(long desiredSplits, + PipelineOptions paramPipelineOptions) throws Exception { + int max_exponent =127; + long split_exponent = max_exponent/desiredSplits; + long token_split_range = (long) Math.pow(2, (split_exponent)); + long min_start_token = -(long) Math.pow(2, max_exponent); + long start_token=0L,end_token = 0; + List sourceList = new ArrayList<>(); + if(desiredSplits==1){ + sourceList.add(this); + return sourceList; + } + String query = null; + for(int split =1;split<=desiredSplits;split++){ + if(split==1){ //for first split will query token range starting from -2**(127-1) + start_token = min_start_token; + end_token = token_split_range; + query = queryBuilder(start_token, end_token); + configuration.setQuery(query); + sourceList.add(new CassandraReadIO.Source( + new CassandraReadConfiguration(configuration.getHost(), configuration.getKeypace(), + configuration.getPort(), configuration.getTable(),configuration.getQuery(),configuration.getRowKey(),configuration.get_entityName()))); + }else{ + token_split_range = (long) Math.pow(2, (split_exponent)); + start_token = end_token; + + end_token = token_split_range; + query = queryBuilder(start_token, end_token); + configuration.setQuery(query); + sourceList.add(new CassandraReadIO.Source( + new CassandraReadConfiguration(configuration.getHost(), configuration.getKeypace(), + configuration.getPort(), configuration.getTable(),configuration.getQuery(),configuration.getRowKey(),configuration.get_entityName()))); + } + split_exponent = split_exponent*2; + } + return sourceList; + } + /* + * builds query using starttoken and endtoken + */ + public String queryBuilder(long startToken,long endToken){ + String query= QueryBuilder.select().from(configuration.getKeypace(),configuration.getTable()).where(QueryBuilder.gte("token("+configuration.getRowKey()+")",startToken)).and(QueryBuilder.lt("token("+configuration.getRowKey()+")",endToken)).toString(); + return query; + } + + + 
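	/*
	 * Worked example (values illustrative, assuming the Murmur3
	 * partitioner's token range of [-2^63, 2^63 - 1] and the sample
	 * emp_info1/emp_id schema used in the tests):
	 *
	 *   queryBuilder(-9223372036854775808L, -4611686018427387904L)
	 *
	 * yields CQL along the lines of
	 *
	 *   SELECT * FROM demo1.emp_info1
	 *   WHERE token(emp_id) >= -9223372036854775808
	 *     AND token(emp_id) < -4611686018427387904;
	 *
	 * so each split source scans one contiguous slice of the token ring
	 * instead of the whole column family.
	 */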
@Override + public long getEstimatedSizeBytes(PipelineOptions paramPipelineOptions) + throws Exception { + return 0; + } + + @Override + public boolean producesSortedKeys(PipelineOptions paramPipelineOptions) + throws Exception { + return false; + } + + @Override + public BoundedReader createReader(PipelineOptions paramPipelineOptions) + throws IOException { + return new Reader(configuration); + } + } + + public static BoundedSource read( + CassandraReadConfiguration config) throws IOException { + return new Source(config); + } + + +} diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java new file mode 100644 index 000000000000..39a42783df4c --- /dev/null +++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java @@ -0,0 +1,148 @@ +package com.google.cloud.cassandra.dataflow.io; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.datastax.driver.mapping.annotations.Table; +import com.google.cloud.cassandra.dataflow.io.CassandraReadIO.Source; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; + +public class CassandraReadIOWithMockTest { + private static CassandraReadIO.Source mockCassandraRead; + private static int desiredNoOfSplits; + private static Iterator mockIterator; + private static List mockSplitedSourceList; + + private static PipelineOptions mockPipelineOptions; + private static Pipeline mockPipeline; + + /** + * Initial setup for mocking objects + */ + @Before + public void setUp() { + mockPipelineOptions = Mockito.spy(PipelineOptions.class); + mockPipeline = Mockito.mock(Pipeline.class); + mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class); + mockSplitedSourceList = Mockito.spy(ArrayList.class); + mockIterator = Mockito.spy(Iterator.class); + } + + /** + * Test for checking single source split and PCollection object + */ + @Test + public void testToGetSingleSource() { + try { + desiredNoOfSplits = 1; + mockSplitedSourceList.add(mockCassandraRead); + Mockito.when(mockCassandraRead.splitIntoBundles(desiredNoOfSplits, mockPipelineOptions)).thenReturn(mockSplitedSourceList); + + Assert.assertEquals(1, mockSplitedSourceList.size()); + + Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator); + PCollection mockPCollection = Mockito.mock(PCollection.class); + Mockito.when(mockPipeline.apply(Read.from((Source) mockIterator.next()))).thenReturn(mockPCollection); + mockPipeline.run(); + Assert.assertNotNull(mockPCollection); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Test for checking multiple source splits and PCollection Object + */ + @Test + public void testToGetMultipleSplitedSource() { + try { + desiredNoOfSplits = 2; + mockSplitedSourceList.add(mockCassandraRead); + mockSplitedSourceList.add(mockCassandraRead); + 
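			// Stub the mocked source so splitIntoBundles() hands back the two
			// prepared mock sources; no live Cassandra cluster is contacted.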
Mockito.when(mockCassandraRead.splitIntoBundles(desiredNoOfSplits, mockPipelineOptions)).thenReturn(mockSplitedSourceList); + + Assert.assertEquals(2, mockSplitedSourceList.size()); + PCollectionList mockPCollectionList = Mockito.mock(PCollectionList.class); + Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator); + Mockito.when(mockIterator.hasNext()).thenReturn(true,false); + Mockito.when(mockPipeline.apply(Read.from((Source) mockIterator.next()))).thenReturn(mockPCollectionList); + + PCollection mockMergedPColl = Mockito.mock(PCollection.class); + Mockito.when(mockPCollectionList.apply(Flatten.pCollections())).thenReturn(mockMergedPColl); + mockPipeline.run(); + Assert.assertNotNull(mockMergedPColl); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * static inner class contains employee details + */ + + @Table(name = "emp_info1", keyspace = "demo1") + public static class EmployeeDetails implements Serializable { + + private static final long serialVersionUID = 1L; + private int emp_id; + private String emp_first; + private String emp_last; + private String emp_address; + private String emp_dept; + + public int getEmp_id() { + return emp_id; + } + + public void setEmp_id(int emp_id) { + this.emp_id = emp_id; + } + + public String getEmp_first() { + return emp_first; + } + + public void setEmp_first(String emp_first) { + this.emp_first = emp_first; + } + + public String getEmp_last() { + return emp_last; + } + + public void setEmp_last(String emp_last) { + this.emp_last = emp_last; + } + + public String getEmp_address() { + return emp_address; + } + + public void setEmp_address(String emp_address) { + this.emp_address = emp_address; + } + + public String getEmp_dept() { + return emp_dept; + } + + public void setEmp_dept(String emp_dept) { + this.emp_dept = emp_dept; + } + + } + +} diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java new file mode 100644 index 000000000000..ace386d56386 --- /dev/null +++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java @@ -0,0 +1,186 @@ +package com.google.cloud.cassandra.dataflow.io; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.mapping.MappingManager; +import com.datastax.driver.mapping.annotations.Table; +import com.google.cloud.cassandra.dataflow.io.CassandraReadIO.Source; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PCollectionList; + +/** + * Class contains JUnit test case that can be tested in cloud. 
+ */ +public class CassandraReadIOWithoutMockTest { + private static String[] hosts; + private static int port; + private static String keyspace; + private static Class entityName; + private static String query; + private static String tableName; + private static String rowKey; + private static int desiredNoOfSplits; + + private static Cluster cluster; + private static Session session; + private static MappingManager manager; + + private static PipelineOptions options; + private static Pipeline p; + + /** + * Initial setup for cassandra connection hosts : cassandra server hosts + * keyspace : schema name port : port of the cassandra server entityName : + * is the POJO class query : simple query conditionalBasedQuery : + * conditional based query + */ + @BeforeClass + public static void oneTimeSetUp() { + hosts = new String[] { "localhost" }; + keyspace = "demo1"; + port = 9042; + tableName = "emp_info1"; + rowKey = "emp_id"; + entityName = CassandraReadIOWithoutMockTest.EmployeeDetails.class; + query = QueryBuilder.select().all().from(keyspace, tableName) + .toString(); + + } + + /** + * Creating a pipeline + */ + @Before + public void setUp() { + options = PipelineOptionsFactory.create(); + p = Pipeline.create(options); + } + + /** + * Test for checking single source split and PCollection object + */ + @Test + public void testToGetSingleSource() { + try { + desiredNoOfSplits = 1; + List splitedSourceList = (List) new CassandraReadIO.Source( + new CassandraReadConfiguration(hosts, keyspace, 9042, + tableName, query, rowKey, entityName)) + .splitIntoBundles(desiredNoOfSplits, options); + Assert.assertEquals(1, splitedSourceList.size()); + + Iterator itr = splitedSourceList.iterator(); + CassandraReadIO.Source cs = (Source) itr.next(); + PCollection pCollection = (PCollection) p.apply(Read.from((Source) cs)); + p.run(); + Assert.assertNotNull(pCollection); + + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * Test for checking multiple source splits and PCollection Object + */ + @Test + public void testToGetMultipleSplitedSource() { + try { + desiredNoOfSplits = 4; + List splitedSourceList = (List) new CassandraReadIO.Source( + new CassandraReadConfiguration(hosts, keyspace, port, + tableName, "", rowKey, entityName)) + .splitIntoBundles(desiredNoOfSplits, options); + Assert.assertEquals(4, splitedSourceList.size()); + + Iterator itr = splitedSourceList.iterator(); + List pcoll = new ArrayList(); + while (itr.hasNext()) { + CassandraReadIO.Source cs = (Source) itr.next(); + pcoll.add((PCollection)p.apply(Read.from((Source) cs))); + } + PCollectionList pCollectionList = PCollectionList.of(pcoll.get(0)) + .and(pcoll.get(1)).and(pcoll.get(2)).and(pcoll.get(3)); + PCollection merged = (PCollection) pCollectionList.apply(Flatten + .pCollections()); + p.run(); + Assert.assertNotNull(merged); + } catch (Exception e) { + e.printStackTrace(); + } + } + + /** + * static inner class contains employee details + */ + + @Table(name = "emp_info1", keyspace = "demo1") + public static class EmployeeDetails implements Serializable { + + private static final long serialVersionUID = 1L; + private int emp_id; + private String emp_first; + private String emp_last; + private String emp_address; + private String emp_dept; + + public int getEmp_id() { + return emp_id; + } + + public void setEmp_id(int emp_id) { + this.emp_id = emp_id; + } + + public String getEmp_first() { + return emp_first; + } + + public void setEmp_first(String emp_first) { + this.emp_first = emp_first; + } + + public String 
getEmp_last() { + return emp_last; + } + + public void setEmp_last(String emp_last) { + this.emp_last = emp_last; + } + + public String getEmp_address() { + return emp_address; + } + + public void setEmp_address(String emp_address) { + this.emp_address = emp_address; + } + + public String getEmp_dept() { + return emp_dept; + } + + public void setEmp_dept(String emp_dept) { + this.emp_dept = emp_dept; + } + + } + +} diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java new file mode 100644 index 000000000000..d5187ba3903f --- /dev/null +++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithMockTest.java @@ -0,0 +1,97 @@ +package com.google.cloud.cassandra.dataflow.io; + +import static org.junit.Assert.assertNotNull; + +import java.io.Serializable; +import java.util.Arrays; +import java.util.Random; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import com.datastax.driver.mapping.annotations.Column; +import com.datastax.driver.mapping.annotations.Table; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.PipelineResult; +import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.PDone; + +/** + * Class to test CassndraIO.Write using mock objects + */ + +public class CassndaraIOWithMockTest { + private CassandraIO.Write.Bound mockCassndaraIOWriter; + private PCollection pcollection; + private Pipeline pipeline; + + /** + * Create mocked objects for CassandraIO.Write.Bound + * set method returns for mocked object + **/ + @Before + public void setup() { + mockCassndaraIOWriter = Mockito.mock(CassandraIO.Write.Bound.class); + Mockito.when(mockCassndaraIOWriter.getHosts()).thenReturn(new String[] { "localhost"}); + Mockito.when(mockCassndaraIOWriter.getPort()).thenReturn(9042); + Mockito.when(mockCassndaraIOWriter.getKeyspace()).thenReturn("dev_keyspace"); + + PipelineOptions options = PipelineOptionsFactory.create(); + pipeline = Pipeline.create(options); + pcollection = pipeline.apply(Create.of(Arrays + .asList(getCloumnFamilyRows()))); + + Mockito.when(mockCassndaraIOWriter.apply(pcollection)).thenReturn( + PDone.in(pipeline)); + } + + /** To Test Write to Cassandra **/ + @Test + public void cassandraWriteTest() { + PDone pdone = mockCassndaraIOWriter.apply(pcollection); + System.out.println("Ponde "+pdone); + PipelineResult result = pipeline.run(); + assertNotNull(result); + } + + /** Get entities to persist */ + private Person[] getCloumnFamilyRows() { + Random rn = new Random(); + int range = 10000; + int pId = rn.nextInt(range); + CassndaraIOWithMockTest.Person person = new CassndaraIOWithMockTest.Person(); + person.setPersonId(pId); + person.setName("PERSON-" + pId); + Person[] persons = { person }; + return persons; + } + + @Table(name = "person") + public static class Person implements Serializable { + private static final long serialVersionUID = 1L; + @Column(name = "PERSON_NAME") + private String name; + @Column(name = "PERSON_ID") + private int personId; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } 
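+	// Bean accessors used by the DataStax object mapper; the @Column
+	// annotations above bind them to the PERSON_NAME and PERSON_ID columns
+	// of the "person" table.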
+
+	public int getPersonId() {
+		return personId;
+	}
+
+	public void setPersonId(int personId) {
+		this.personId = personId;
+	}
+	}
+
+}
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java
new file mode 100644
index 000000000000..49d4f07e910b
--- /dev/null
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassndaraIOWithoutMockTest.java
@@ -0,0 +1,179 @@
+package com.google.cloud.cassandra.dataflow.io;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.AlreadyExistsException;
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+import com.google.cloud.dataflow.sdk.Pipeline;
+import com.google.cloud.dataflow.sdk.PipelineResult;
+import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.cloud.dataflow.sdk.values.PDone;
+
+/**
+ * Tests writes to Cassandra.
+ * To run this test, a Cassandra instance must be reachable locally or on Google Cloud. The test requires a
+ * column family (table) "person" in the Cassandra database, under the "dev_keyspace" keyspace.
+ * Following is the structure of "Person" table CREATE TABLE dev_keyspace.person + * (person_id int PRIMARY KEY, person_name text); + * + */ +public class CassndaraIOWithoutMockTest { + + private CassandraIO.Write.Bound CassndaraIOWriter; + private String[] hosts; + private String keyspace; + private int port; + private Cluster cluster; + private Session session; + + + /** set hosts,keyspace,port + * Connect to Cassandra + * Create Keyspace + * Use Created Keyspace + * Create Column family + * Create CassandraIO Writer + **/ + + @Before + public void setup() { + this.hosts = new String[] { "localhost" }; + this.keyspace = "dev_keyspace"; + this.port = 9042; + try{ + connect(getCassandaraHostAndPort()); + createKeyspace(); + useKeyspace(); + createColumnFamily(); + CassndaraIOWriter = new CassandraIO.Write.Bound(hosts, keyspace, port); + }catch(Exception e){ + e.printStackTrace(); + } + } + + /** Connect to Cassandra */ + public void connect(Collection inetSocketAddresses) { + cluster = Cluster.builder() + .addContactPointsWithPorts(inetSocketAddresses).build(); + session = cluster.connect(); + } + + /** Create Keyspace Keyspace,ignore exception if already exists */ + private void createKeyspace() { + try { + session.execute("CREATE KEYSPACE " + keyspace + + " WITH replication " + + "= {'class':'SimpleStrategy', 'replication_factor':3};"); + } catch (AlreadyExistsException ex) { + } + } + + /** Use Created Keyspace */ + private void useKeyspace() { + session.execute("USE " + keyspace); + } + + /** Create Column Family ,ignore exception if already exists */ + private void createColumnFamily() { + try { + session.execute("CREATE TABLE " + "person(" + + "person_id int PRIMARY KEY" + "," + "person_name text);"); + } catch (AlreadyExistsException ex) { + } + } + + + /** Create Cassandra ip socket address collection */ + private Collection getCassandaraHostAndPort() { + Collection addresses = new ArrayList(); + for (String host : hosts) { + addresses.add(new InetSocketAddress(host, port)); + } + return addresses; + } + + /** To Test Write to Cassandra **/ + @Test + public void cassandraWriteTest() { + try{ + PipelineOptions options = PipelineOptionsFactory.create(); + Pipeline p = Pipeline.create(options); + @SuppressWarnings("rawtypes") + PCollection pcollection = p.apply(Create.of(Arrays + .asList(getCloumnFamilyRows()))); + @SuppressWarnings("unused") + PDone pDone = CassndaraIOWriter.apply(pcollection); + PipelineResult result=null; + result= p.run(); + assertNotNull(result); + }catch(Exception e){ + Assert.fail("Test failed"); + } + } + + /** Get entities to persist */ + private Person[] getCloumnFamilyRows() { + Random rn = new Random(); + int range = 10000; + int pId = rn.nextInt(range); + CassndaraIOWithoutMockTest.Person person = new CassndaraIOWithoutMockTest.Person(); + person.setPersonId(pId); + person.setName("PERSON-" + pId); + Person[] persons = { person }; + return persons; + } + + @Table(name = "person") + public static class Person implements Serializable { + private static final long serialVersionUID = 1L; + @Column(name = "PERSON_NAME") + private String name; + @Column(name = "PERSON_ID") + private int personId; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public int getPersonId() { + return personId; + } + + public void setPersonId(int personId) { + this.personId = personId; + } + } + + @After + public void closeResources() { + try{ + cluster.close(); + session.close(); + }catch(Exception e){ + 
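+			// Best-effort teardown: a failure while closing the cluster or
+			// session is only logged rather than failing the test.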
+			e.printStackTrace();
+		}
+	}
+
+}

From f61863bff0f91a2a345720ef83f0c397ac793589 Mon Sep 17 00:00:00 2001
From: unknown
Date: Thu, 14 Apr 2016 17:41:11 +0530
Subject: [PATCH 2/3] implemented getEstimatedSizeBytes

---
 contrib/cassandra-connector/pom.xml | 12 +-
 .../io/CassandraReadConfiguration.java | 24 +-
 .../dataflow/io/CassandraReadIO.java | 312 +++++++++++++-----
 .../io/CassandraReadIOWithMockTest.java | 24 +-
 .../io/CassandraReadIOWithoutMockTest.java | 50 ++-
 5 files changed, 311 insertions(+), 111 deletions(-)

diff --git a/contrib/cassandra-connector/pom.xml b/contrib/cassandra-connector/pom.xml
index a30f255ccc9d..48fb693a062d 100644
--- a/contrib/cassandra-connector/pom.xml
+++ b/contrib/cassandra-connector/pom.xml
@@ -10,10 +10,10 @@
   <artifactId>google-cloud-dataflow-cassandra-connector</artifactId>
   <version>0.0.1-SNAPSHOT</version>
   <name>cassandra-connector</name>
-
+
   <description>source and sink connector for cassandra</description>
-
-
+
+
@@ -140,5 +140,11 @@
       <artifactId>mockito-all</artifactId>
       <version>1.10.19</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.apache.cassandra</groupId>
+      <artifactId>cassandra-all</artifactId>
+      <version>3.2.1</version>
+    </dependency>
   </dependencies>
 </project>
\ No newline at end of file
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
index dc6068d0024f..c51e5c0e5d60 100644
--- a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadConfiguration.java
@@ -1,9 +1,12 @@
 package com.google.cloud.cassandra.dataflow.io;
+
+import java.io.Serializable;
+
 /*
  * Configuration class that holds the details required for cassandra connection
  */
-public class CassandraReadConfiguration {
-
+public class CassandraReadConfiguration implements Serializable {
+	private static final long serialVersionUID = 1L;
 	private String[] host;
 	private String keypace;
 	private int port;
@@ -11,9 +14,10 @@ public class CassandraReadConfiguration {
 	private String query;
 	private String rowKey;
 	private Class entityName;
+	private int jmxPort;
 
 	CassandraReadConfiguration(String[] hosts, String keyspace, int port,
-			String table,String query,String rowKey,Class entityName) {
+			String table,String query,String rowKey,Class entityName,int jmxPort) {
 		this.host = hosts;
 		this.keypace = keyspace;
 		this.port = port;
@@ -21,6 +25,7 @@ public class CassandraReadConfiguration {
 		this.query = query;
 		this.rowKey = rowKey;
 		this.entityName = entityName;
+		this.jmxPort = jmxPort;
 	}
 
 	public String[] getHost() {
@@ -64,7 +69,12 @@ public void setQuery(String query) {
 	}
 
 	public String getRowKey() {
+		if (!rowKey.isEmpty()) {
 		return rowKey;
+		}
+		else {
+			return null;
+		}
 	}
 
 	public void setRowKey(String rowKey) {
@@ -78,4 +88,12 @@ public Class get_entityName() {
 	public void set_entityName(Class _entityName) {
 		this.entityName = _entityName;
 	}
+
+	public int getJmxPort() {
+		return jmxPort;
+	}
+
+	public void setJmxPort(int jmxPort) {
+		this.jmxPort = jmxPort;
+	}
 }
diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
index d791dfc9ec54..6c98c4255b89 100644
--- a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
@@ -1,16 +1,22 @@
 package
com.google.cloud.cassandra.dataflow.io; + +import static com.google.common.base.Strings.isNullOrEmpty; + import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.regex.Pattern; import javax.swing.text.html.parser.Entity; +import org.apache.cassandra.tools.NodeProbe; import org.joda.time.Instant; import com.datastax.driver.core.Cluster; import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; import com.datastax.driver.core.Session; import com.datastax.driver.core.querybuilder.QueryBuilder; import com.datastax.driver.mapping.Mapper; @@ -18,37 +24,53 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.SerializableCoder; import com.google.cloud.dataflow.sdk.io.BoundedSource; +import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.common.base.Preconditions; public class CassandraReadIO { - - static class Source extends BoundedSource{ + //A Bounded source to read from cassandra + static class Source extends BoundedSource { CassandraReadConfiguration configuration; + private static final String SSTABLE_DISK_SPACE = "LiveDiskSpaceUsed"; + private static final String MEMTABLE_SPACE = "MemtableLiveDataSize"; + private static final int DEFAULT_JMX_PORT = 7199; + private static String DEFAULT_ROW_COUNT_QUERY = null; + private static final String FROM_KEYWORD = "from"; + private static final String WHERE_CLAUSE = "where"; - Source(CassandraReadConfiguration config) throws IOException{ + Source(CassandraReadConfiguration config) throws IOException { this.configuration = config; + validate(); } @Override public Coder getDefaultOutputCoder() { - return (Coder) SerializableCoder.of(configuration.get_entityName()); + return (Coder) SerializableCoder.of(configuration + .get_entityName()); } @Override - public void validate(){} - - private static class Reader extends BoundedReader{ + public void validate() { + Preconditions.checkNotNull(configuration.getHost(), "host"); + Preconditions.checkNotNull(configuration.getPort(), "port"); + Preconditions.checkNotNull(configuration.getKeypace(), "keypace"); + Preconditions.checkNotNull(configuration.getTable(), "table"); + Preconditions.checkNotNull(configuration.getRowKey(),"rowKey/partionKey"); + } + + private static class Reader extends BoundedReader { CassandraReadConfiguration configuration; static Cluster cluster; static Session session; ResultSet rs; Iterator itr; Object currentEntity; - - Reader(CassandraReadConfiguration configuration){ + + Reader(CassandraReadConfiguration configuration) { this.configuration = configuration; } - + /* * advances the reader to the next record */ @@ -62,22 +84,29 @@ public boolean advance() throws IOException { } } - /* Creates a cassandra connection, session and resultSet and - * map the entity with resultSet and advances to the next result. - * Initializes the reader and advances the reader to the first record. - * @return true, if a record was read else false if there is no more input available. + /* + * Creates a cassandra connection, session and resultSet and map the + * entity with resultSet and advances to the next result. + * Initializes the reader and advances the reader to the first + * record. + * + * @return true, if a record was read else false if there is no more + * input available. 
*/ @Override - public boolean start() throws IOException { - cluster = Cluster.builder().addContactPoints(configuration.getHost()).withPort(configuration.getPort()).build(); + public boolean start() throws IOException { + cluster = Cluster.builder() + .addContactPoints(configuration.getHost()) + .withPort(configuration.getPort()).build(); session = cluster.connect(); rs = session.execute(configuration.getQuery()); - final MappingManager _manager =new MappingManager(session); - Mapper _mapper = null; - if(_mapper==null){ - _mapper = (Mapper) _manager.mapper(configuration.get_entityName()); - } - itr=_mapper.map(rs).iterator(); + final MappingManager _manager = new MappingManager(session); + Mapper _mapper = null; + if (_mapper == null) { + _mapper = (Mapper) _manager.mapper(configuration + .get_entityName()); + } + itr = _mapper.map(rs).iterator(); return advance(); } @@ -100,70 +129,204 @@ public Instant getCurrentTimestamp() throws NoSuchElementException { @Override public BoundedSource getCurrentSource() { return null; - } - + } + } - /* method splits a source into desired number of sources. - * (non-Javadoc) - * @see com.google.cloud.dataflow.sdk.io.BoundedSource#splitIntoBundles(long, com.google.cloud.dataflow.sdk.options.PipelineOptions) + /* + * method splits a source into desired number of sources. (non-Javadoc) * - * Cassandra cluster token range is split into desired number of smaller ranges - * queries are built from start and end token ranges - * split sources are built by providing token range queries and other configurations - * rowkey is the partitionkey of the cassandara table + * @see + * com.google.cloud.dataflow.sdk.io.BoundedSource#splitIntoBundles(long, + * com.google.cloud.dataflow.sdk.options.PipelineOptions) + * + * Cassandra cluster token range is split into desired number of smaller + * ranges. 
queries are built from start and end token ranges split + * sources are built by providing token range queries and other + * configurations rowkey is the partitionkey of the cassandara table */ @Override - public List splitIntoBundles(long desiredSplits, - PipelineOptions paramPipelineOptions) throws Exception { - int max_exponent =127; - long split_exponent = max_exponent/desiredSplits; - long token_split_range = (long) Math.pow(2, (split_exponent)); + public List splitIntoBundles(long desiredBundleSizeBytes, + PipelineOptions options) throws Exception { + int max_exponent = 63; + long numSplits=10; long min_start_token = -(long) Math.pow(2, max_exponent); - long start_token=0L,end_token = 0; - List sourceList = new ArrayList<>(); - if(desiredSplits==1){ + long start_token = 0L, end_token = 0; + List sourceList = new ArrayList(); + try{ + if(desiredBundleSizeBytes >0){ + numSplits = getEstimatedSizeBytes(options)/desiredBundleSizeBytes; + } + } catch (Exception e) { + // set numSplits equal to Dataflow's numWorkers + DataflowPipelineWorkerPoolOptions poolOptions = + options.as(DataflowPipelineWorkerPoolOptions.class); + if (poolOptions.getNumWorkers() > 0) { + numSplits = poolOptions.getNumWorkers(); + } else { + //Default to 10 , equals to Cassandra maxConnectionPerHost + numSplits = 10; + } + } + + if(numSplits <=0){ + numSplits =1; + } + if(numSplits>63){ //restrict numSplits to max token range exponent + numSplits = 63; + } + long split_exponent = max_exponent / numSplits; + long token_split_range = (long) Math.pow(2, (split_exponent)); + + // If the desiredBundleSize or number of workers results in 1 split, simply return current source + if (numSplits == 1) { sourceList.add(this); return sourceList; } - String query = null; - for(int split =1;split<=desiredSplits;split++){ - if(split==1){ //for first split will query token range starting from -2**(127-1) - start_token = min_start_token; - end_token = token_split_range; - query = queryBuilder(start_token, end_token); - configuration.setQuery(query); - sourceList.add(new CassandraReadIO.Source( - new CassandraReadConfiguration(configuration.getHost(), configuration.getKeypace(), - configuration.getPort(), configuration.getTable(),configuration.getQuery(),configuration.getRowKey(),configuration.get_entityName()))); - }else{ + String splitQuery = null; + for (int split = 1; split <= numSplits; split++) { + if (split == 1) { // for first split will query token range + // starting from -2**(63) + start_token = min_start_token; + end_token = token_split_range; + splitQuery = queryBuilder(start_token, end_token); + configuration.setQuery(splitQuery); + sourceList.add(new CassandraReadIO.Source(configuration)); + System.out.println("first split query "+ configuration.getQuery()); + } else { token_split_range = (long) Math.pow(2, (split_exponent)); - start_token = end_token; - - end_token = token_split_range; - query = queryBuilder(start_token, end_token); - configuration.setQuery(query); - sourceList.add(new CassandraReadIO.Source( - new CassandraReadConfiguration(configuration.getHost(), configuration.getKeypace(), - configuration.getPort(), configuration.getTable(),configuration.getQuery(),configuration.getRowKey(),configuration.get_entityName()))); - } - split_exponent = split_exponent*2; + start_token = end_token; + end_token = token_split_range; + splitQuery = queryBuilder(start_token, end_token); + configuration.setQuery(splitQuery); + sourceList.add(new CassandraReadIO.Source(configuration)); + System.out.println(split + " split query 
"+ configuration.getQuery()); + } + split_exponent = split_exponent * 2; } return sourceList; } + /* - * builds query using starttoken and endtoken + * builds query using startToken and endToken */ - public String queryBuilder(long startToken,long endToken){ - String query= QueryBuilder.select().from(configuration.getKeypace(),configuration.getTable()).where(QueryBuilder.gte("token("+configuration.getRowKey()+")",startToken)).and(QueryBuilder.lt("token("+configuration.getRowKey()+")",endToken)).toString(); + public String queryBuilder(long startToken, long endToken) { + String query = QueryBuilder + .select() + .from(configuration.getKeypace(), configuration.getTable()) + .where(QueryBuilder.gte( + "token(" + configuration.getRowKey() + ")", + startToken)) + .and(QueryBuilder.lt("token(" + configuration.getRowKey() + + ")", endToken)).toString(); return query; } - - + @Override - public long getEstimatedSizeBytes(PipelineOptions paramPipelineOptions) + // Cassandra provides no direct way to get the estimate of a query + // result size in bytes + // As a rough approximation, + //1. We fetch the byte count for entire column family + //2. we get the row count for column family and row count for supplied query + //3. we calculate the query resultSet byte size + public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { - return 0; + NodeProbe probe = null; + Long estimatedByteSize = 0l; + Long totalCfByteSize = 0l; + DEFAULT_ROW_COUNT_QUERY = "select count(*) from" + "\t" + + configuration.getKeypace() + "." + + configuration.getTable(); + try { + if (configuration.getJmxPort() == 0) { + probe = new NodeProbe(configuration.getHost()[0], + DEFAULT_JMX_PORT); + } else { + probe = new NodeProbe(configuration.getHost()[0], + configuration.getJmxPort()); + } + totalCfByteSize = (Long) probe.getColumnFamilyMetric( + configuration.getKeypace(), configuration.getTable(), + SSTABLE_DISK_SPACE) + (Long) probe.getColumnFamilyMetric( + configuration.getKeypace(), configuration.getTable(), + MEMTABLE_SPACE) ; + + // start + if (configuration.getQuery() != null + && !configuration.getQuery().isEmpty() + && Pattern + .compile( + WHERE_CLAUSE, + Pattern.CASE_INSENSITIVE + + Pattern.LITERAL) + .matcher(configuration.getQuery()).find()) { + estimatedByteSize = getQueryResultBytesSize(totalCfByteSize); + if (estimatedByteSize == 0) { + estimatedByteSize = 1l; + } + } else { + estimatedByteSize = totalCfByteSize; + } + + } catch (IOException e) { + throw e; + } finally { + if (probe != null) + probe.close(); + } + return estimatedByteSize; + } + + // Get the query to fetch row count + // for entire column family or for supplied query + private String getQueryForRowCount() { + String rowCountQuery; + if ((configuration.getQuery() != null && !configuration.getQuery() + .isEmpty()) + && Pattern + .compile(WHERE_CLAUSE, + Pattern.CASE_INSENSITIVE + Pattern.LITERAL) + .matcher(configuration.getQuery()).find()) { + rowCountQuery = "select count(*)\t" + + configuration.getQuery().substring( + configuration.getQuery().indexOf(FROM_KEYWORD)); + } else { + rowCountQuery = DEFAULT_ROW_COUNT_QUERY; + } + return rowCountQuery; + } + + //Get the resultSet size in bytes for the supplied query + private long getQueryResultBytesSize(long cfByteSize) throws Exception { + long CoumanFamilyByteSize = cfByteSize; + long totalQueryByteSize = 0l; + long totalCfRowCount = getRowCount(DEFAULT_ROW_COUNT_QUERY); + long queryRowCount = getRowCount(getQueryForRowCount()); + int percentage = (int) ((queryRowCount * 100) / 
totalCfRowCount); + totalQueryByteSize = CoumanFamilyByteSize * percentage / 100; + return totalQueryByteSize; + } + //Get row count for the supplied query + private long getRowCount(String query) { + Cluster cluster = null; + Session session = null; + long rowCount = 0l; + try { + cluster = Cluster.builder() + .addContactPoints(configuration.getHost()) + .withPort(configuration.getPort()).build(); + session = cluster.connect(); + ResultSet queryResult = session.execute(query); + Row row = queryResult.one(); + rowCount = row.getLong("count"); + } catch (Exception ex) { + throw ex; + } finally { + if (session != null) { + session.close(); + } + } + return rowCount; } @Override @@ -178,11 +341,10 @@ public BoundedReader createReader(PipelineOptions paramPipelineOptions) return new Reader(configuration); } } - - public static BoundedSource read( - CassandraReadConfiguration config) throws IOException { - return new Source(config); - } - - -} + + public static BoundedSource read(CassandraReadConfiguration config) + throws IOException { + return new Source(config); + } + +} diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java index 39a42783df4c..0e5f44d12f11 100644 --- a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java +++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java @@ -15,14 +15,17 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.io.BoundedSource; import com.google.cloud.dataflow.sdk.io.Read; +import com.google.cloud.dataflow.sdk.io.Read.Bounded; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.transforms.Flatten; +import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollectionList; +import com.google.cloud.dataflow.sdk.values.POutput; public class CassandraReadIOWithMockTest { private static CassandraReadIO.Source mockCassandraRead; - private static int desiredNoOfSplits; + private long desiredBundleSizeBytes = 64 * (1 << 20); private static Iterator mockIterator; private static List mockSplitedSourceList; @@ -36,26 +39,23 @@ public class CassandraReadIOWithMockTest { public void setUp() { mockPipelineOptions = Mockito.spy(PipelineOptions.class); mockPipeline = Mockito.mock(Pipeline.class); - mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class); + mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class,Mockito.withSettings().serializable()); mockSplitedSourceList = Mockito.spy(ArrayList.class); mockIterator = Mockito.spy(Iterator.class); } - /** * Test for checking single source split and PCollection object */ @Test public void testToGetSingleSource() { try { - desiredNoOfSplits = 1; - mockSplitedSourceList.add(mockCassandraRead); - Mockito.when(mockCassandraRead.splitIntoBundles(desiredNoOfSplits, mockPipelineOptions)).thenReturn(mockSplitedSourceList); + mockSplitedSourceList.add(mockCassandraRead); + Mockito.when(mockCassandraRead.splitIntoBundles(desiredBundleSizeBytes, mockPipelineOptions)).thenReturn(mockSplitedSourceList); Assert.assertEquals(1, mockSplitedSourceList.size()); - Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator); 
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
index 39a42783df4c..0e5f44d12f11 100644
--- a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithMockTest.java
@@ -15,14 +15,17 @@
 import com.google.cloud.dataflow.sdk.Pipeline;
 import com.google.cloud.dataflow.sdk.io.BoundedSource;
 import com.google.cloud.dataflow.sdk.io.Read;
+import com.google.cloud.dataflow.sdk.io.Read.Bounded;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Flatten;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionList;
+import com.google.cloud.dataflow.sdk.values.POutput;
 
 public class CassandraReadIOWithMockTest {
 	private static CassandraReadIO.Source mockCassandraRead;
-	private static int desiredNoOfSplits;
+	private long desiredBundleSizeBytes = 64 * (1 << 20);
 	private static Iterator mockIterator;
 	private static List mockSplitedSourceList;
@@ -36,26 +39,23 @@ public class CassandraReadIOWithMockTest {
 	public void setUp() {
 		mockPipelineOptions = Mockito.spy(PipelineOptions.class);
 		mockPipeline = Mockito.mock(Pipeline.class);
-		mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class);
+		mockCassandraRead = Mockito.mock(CassandraReadIO.Source.class,
+				Mockito.withSettings().serializable());
 		mockSplitedSourceList = Mockito.spy(ArrayList.class);
 		mockIterator = Mockito.spy(Iterator.class);
 	}
-
 	/**
 	 * Test for checking single source split and PCollection object
 	 */
 	@Test
 	public void testToGetSingleSource() {
 		try {
-			desiredNoOfSplits = 1;
-			mockSplitedSourceList.add(mockCassandraRead);
-			Mockito.when(mockCassandraRead.splitIntoBundles(desiredNoOfSplits, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
+			mockSplitedSourceList.add(mockCassandraRead);
+			Mockito.when(mockCassandraRead.splitIntoBundles(desiredBundleSizeBytes, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
 			Assert.assertEquals(1, mockSplitedSourceList.size());
-			Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator);
 			PCollection mockPCollection = Mockito.mock(PCollection.class);
-			Mockito.when(mockPipeline.apply(Read.from((Source) mockIterator.next()))).thenReturn(mockPCollection);
+			Mockito.when(mockPipeline.apply(Mockito.mock(Bounded.class))).thenReturn(mockPCollection);
 			mockPipeline.run();
 			Assert.assertNotNull(mockPCollection);
 		} catch (Exception e) {
@@ -69,16 +69,16 @@ public void testToGetSingleSource() {
 	@Test
 	public void testToGetMultipleSplitedSource() {
 		try {
-			desiredNoOfSplits = 2;
+			desiredBundleSizeBytes = 1024;
 			mockSplitedSourceList.add(mockCassandraRead);
 			mockSplitedSourceList.add(mockCassandraRead);
-			Mockito.when(mockCassandraRead.splitIntoBundles(desiredNoOfSplits, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
+			Mockito.when(mockCassandraRead.splitIntoBundles(desiredBundleSizeBytes, mockPipelineOptions)).thenReturn(mockSplitedSourceList);
-			Assert.assertEquals(2, mockSplitedSourceList.size());
+			Assert.assertTrue(mockSplitedSourceList.size() > 0);
 
 			PCollectionList mockPCollectionList = Mockito.mock(PCollectionList.class);
 			Mockito.when(mockSplitedSourceList.iterator()).thenReturn(mockIterator);
 			Mockito.when(mockIterator.hasNext()).thenReturn(true, false);
-			Mockito.when(mockPipeline.apply(Read.from((Source) mockIterator.next()))).thenReturn(mockPCollectionList);
+			Mockito.when(mockPipeline.apply(Mockito.mock(Bounded.class))).thenReturn(mockPCollectionList);
 			PCollection mockMergedPColl = Mockito.mock(PCollection.class);
 			Mockito.when(mockPCollectionList.apply(Flatten.pCollections())).thenReturn(mockMergedPColl);
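The switch from an explicit desiredNoOfSplits to desiredBundleSizeBytes in these tests follows the BoundedSource contract: the runner supplies a target bundle size and the source derives its own split count from its size estimate. The exact expression lives outside the hunks shown here, so the sketch below is illustrative only; the 10-split fallback and the floor of one split mirror the defaults visible in splitIntoBundles in the third patch:

    public class BundleSizing {
        // Illustrative arithmetic only, not the connector's exact code:
        // how a size estimate and a desired bundle size map to a split count.
        static long numSplitsFor(long estimatedSizeBytes, long desiredBundleSizeBytes) {
            if (desiredBundleSizeBytes <= 0) {
                return 10; // default when no usable bundle size hint is given
            }
            long numSplits = estimatedSizeBytes / desiredBundleSizeBytes;
            return numSplits <= 0 ? 1 : numSplits;
        }

        public static void main(String[] args) {
            // A 512 MiB estimate with a 64 MiB target yields 8 splits.
            System.out.println(numSplitsFor(512L << 20, 64L << 20));
        }
    }

With the 64 MiB default above, a small test table falls below one bundle and yields a single source, which is why testToGetSingleSource expects a list of size 1; the multi-split tests pass 1024 bytes precisely to force several sources.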
diff --git a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
index ace386d56386..e07281be83f0 100644
--- a/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
+++ b/contrib/cassandra-connector/src/test/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIOWithoutMockTest.java
@@ -36,7 +36,7 @@ public class CassandraReadIOWithoutMockTest {
 	private static String query;
 	private static String tableName;
 	private static String rowKey;
-	private static int desiredNoOfSplits;
+	private long desiredBundleSizeBytes = 64 * (1 << 20);
 	private static Cluster cluster;
 	private static Session session;
@@ -61,7 +61,6 @@ public static void oneTimeSetUp() {
 		entityName = CassandraReadIOWithoutMockTest.EmployeeDetails.class;
 		query = QueryBuilder.select().all().from(keyspace, tableName)
 				.toString();
-
 	}
 
 	/**
@@ -76,19 +75,21 @@ public void setUp() {
 	/**
 	 * Test for checking single source split and PCollection object
 	 */
+	@Test
 	public void testToGetSingleSource() {
 		try {
-			desiredNoOfSplits = 1;
+
 			List splitedSourceList = (List) new CassandraReadIO.Source(
 					new CassandraReadConfiguration(hosts, keyspace, 9042,
-							tableName, query, rowKey, entityName))
-					.splitIntoBundles(desiredNoOfSplits, options);
+							tableName, query, rowKey, entityName, 7199))
+					.splitIntoBundles(desiredBundleSizeBytes, options);
 			Assert.assertEquals(1, splitedSourceList.size());
-
+
 			Iterator itr = splitedSourceList.iterator();
 			CassandraReadIO.Source cs = (Source) itr.next();
-			PCollection pCollection = (PCollection) p.apply(Read.from((Source) cs));
+			PCollection pCollection = (PCollection) p.apply(Read
+					.from((Source) cs));
 			p.run();
 
 			Assert.assertNotNull(pCollection);
@@ -98,26 +99,39 @@ public void testToGetSingleSource() {
 	}
 
 	/**
-	 * Test for checking multiple source splits and PCollection Object
+	 * Test for checking multiple source splits and PCollection object.
+	 * desiredBundleSizeBytes is set low so that more than one source is
+	 * built.
 	 */
 	@Test
 	public void testToGetMultipleSplitedSource() {
 		try {
-			desiredNoOfSplits = 4;
+			desiredBundleSizeBytes = 1024;
 			List splitedSourceList = (List) new CassandraReadIO.Source(
 					new CassandraReadConfiguration(hosts, keyspace, port,
-							tableName, "", rowKey, entityName))
-					.splitIntoBundles(desiredNoOfSplits, options);
-			Assert.assertEquals(4, splitedSourceList.size());
-
+							tableName, "", rowKey, entityName, 7199))
+					.splitIntoBundles(desiredBundleSizeBytes, options);
+			Assert.assertTrue(splitedSourceList.size() > 0);
+
 			Iterator itr = splitedSourceList.iterator();
-			List pcoll = new ArrayList();
+			List pCollections = new ArrayList();
 			while (itr.hasNext()) {
 				CassandraReadIO.Source cs = (Source) itr.next();
-				pcoll.add((PCollection)p.apply(Read.from((Source) cs)));
+				pCollections.add((PCollection) p.apply(Read.from((Source) cs)));
+			}
+			PCollectionList pCollectionList = null;
+			Iterator pCollectionIterator = pCollections.iterator();
+
+			int pcollCount = 0;
+			while (pCollectionIterator.hasNext()) {
+				PCollection pCollection = (PCollection) pCollectionIterator.next();
+				if (pcollCount == 0) {
+					pCollectionList = PCollectionList.of(pCollection);
+				} else {
+					pCollectionList = pCollectionList.and(pCollection);
+				}
+				pcollCount++;
 			}
-			PCollectionList pCollectionList = PCollectionList.of(pcoll.get(0))
-					.and(pcoll.get(1)).and(pcoll.get(2)).and(pcoll.get(3));
 			PCollection merged = (PCollection) pCollectionList.apply(Flatten
 					.pCollections());
 			p.run();
@@ -133,7 +147,7 @@ public void testToGetMultipleSplitedSource() {
 	@Table(name = "emp_info1", keyspace = "demo1")
 	public static class EmployeeDetails implements Serializable {
-
+		private static final long serialVersionUID = 1L;
 		private int emp_id;
 		private String emp_first;
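The third patch below drops the exponent-doubling walk over the token space in favor of dividing the Murmur3 token ring evenly. A standalone sketch of the boundary arithmetic it implements; BigInteger is required because the ring's span, 2^64 - 1, overflows 64-bit arithmetic:

    import java.math.BigInteger;

    public class TokenRangeSplitter {
        // Partition the Murmur3 token ring [-2^63, 2^63 - 1] into
        // numSplits contiguous [start, end) ranges.
        static long[][] split(int numSplits) {
            BigInteger start = BigInteger.valueOf(Long.MIN_VALUE);
            BigInteger end = BigInteger.valueOf(Long.MAX_VALUE);
            BigInteger increment = end.subtract(start)
                    .divide(BigInteger.valueOf(numSplits));
            long[][] ranges = new long[numSplits][2];
            BigInteger rangeStart = start;
            for (int i = 0; i < numSplits; i++) {
                // The last range absorbs the division remainder.
                BigInteger rangeEnd = (i == numSplits - 1) ? end
                        : rangeStart.add(increment);
                ranges[i][0] = rangeStart.longValue();
                ranges[i][1] = rangeEnd.longValue();
                rangeStart = rangeEnd;
            }
            return ranges;
        }

        public static void main(String[] args) {
            for (long[] r : split(4)) {
                System.out.println("[" + r[0] + ", " + r[1] + ")");
            }
        }
    }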
From 632fc1b44cbba1b37a0937b5197218582bbf8bb6 Mon Sep 17 00:00:00 2001
From: unknown
Date: Fri, 15 Apr 2016 12:11:29 +0530
Subject: [PATCH 3/3] implemented getEstimatedSizeBytes

---
 .../dataflow/io/CassandraReadIO.java          | 52 ++++++++-----------
 1 file changed, 21 insertions(+), 31 deletions(-)

diff --git a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
index 6c98c4255b89..66db79a5a0a8 100644
--- a/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
+++ b/contrib/cassandra-connector/src/main/java/com/google/cloud/cassandra/dataflow/io/CassandraReadIO.java
@@ -3,6 +3,7 @@
 import static com.google.common.base.Strings.isNullOrEmpty;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,9 +30,9 @@
 import com.google.common.base.Preconditions;
 
 public class CassandraReadIO {
-	//A Bounded source to read from cassandra
+	// A bounded source to read from Cassandra
 	static class Source extends BoundedSource {
-		CassandraReadConfiguration configuration;
+		private CassandraReadConfiguration configuration;
 		private static final String SSTABLE_DISK_SPACE = "LiveDiskSpaceUsed";
 		private static final String MEMTABLE_SPACE = "MemtableLiveDataSize";
 		private static final int DEFAULT_JMX_PORT = 7199;
@@ -148,10 +149,9 @@ public BoundedSource getCurrentSource() {
 
 		@Override
 		public List splitIntoBundles(long desiredBundleSizeBytes,
 				PipelineOptions options) throws Exception {
-			int max_exponent = 63;
 			long numSplits=10;
-			long min_start_token = -(long) Math.pow(2, max_exponent);
-			long start_token = 0L, end_token = 0;
+			long startToken, endToken = 0L;
 			List sourceList = new ArrayList();
 			try{
 				if(desiredBundleSizeBytes >0){
@@ -168,41 +168,31 @@ public List splitIntoBundles(long desiredBundleSizeBytes,
 					numSplits = 10;
 				}
 			}
-
+
 			if(numSplits <=0){
 				numSplits =1;
 			}
-			if(numSplits>63){ //restrict numSplits to max token range exponent
-				numSplits = 63;
-			}
-			long split_exponent = max_exponent / numSplits;
-			long token_split_range = (long) Math.pow(2, (split_exponent));
 			// If the desiredBundleSize or number of workers results in 1 split, simply return current source
 			if (numSplits == 1) {
 				sourceList.add(this);
 				return sourceList;
 			}
+
+			// Split the full Murmur3 token ring [-2^63, 2^63 - 1] into
+			// numSplits even ranges; BigInteger avoids long overflow when
+			// computing the ring's span.
+			BigInteger startRange = BigInteger.valueOf(Long.MIN_VALUE);
+			BigInteger endRange = BigInteger.valueOf(Long.MAX_VALUE);
+			endToken = startRange.longValue();
+			BigInteger incrementValue = (endRange.subtract(startRange))
+					.divide(BigInteger.valueOf(numSplits));
 			String splitQuery = null;
-			for (int split = 1; split <= numSplits; split++) {
-				if (split == 1) { // for first split will query token range
-					// starting from -2**(63)
-					start_token = min_start_token;
-					end_token = token_split_range;
-					splitQuery = queryBuilder(start_token, end_token);
-					configuration.setQuery(splitQuery);
-					sourceList.add(new CassandraReadIO.Source(configuration));
-					System.out.println("first split query "+ configuration.getQuery());
-				} else {
-					token_split_range = (long) Math.pow(2, (split_exponent));
-					start_token = end_token;
-					end_token = token_split_range;
-					splitQuery = queryBuilder(start_token, end_token);
-					configuration.setQuery(splitQuery);
-					sourceList.add(new CassandraReadIO.Source(configuration));
-					System.out.println(split + " split query "+ configuration.getQuery());
-				}
-				split_exponent = split_exponent * 2;
+			for (int splitCount = 1; splitCount <= numSplits; splitCount++) {
+				startToken = endToken;
+				endToken = startToken + incrementValue.longValue();
+				if (splitCount == numSplits) {
+					endToken = Long.MAX_VALUE; // last split ends at the maximum token
+				}
+				splitQuery = queryBuilder(startToken, endToken);
+				configuration.setQuery(splitQuery);
+				sourceList.add(new CassandraReadIO.Source(configuration));
 			}
 			return sourceList;
 		}
@@ -250,7 +240,7 @@ public long getEstimatedSizeBytes(PipelineOptions options)
 					SSTABLE_DISK_SPACE) + (Long) probe.getColumnFamilyMetric(
 					configuration.getKeypace(), configuration.getTable(),
 					MEMTABLE_SPACE);
-
+
 			if (configuration.getQuery() != null
 					&& !configuration.getQuery().isEmpty()
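Taken together, the read path after this series estimates the result size, derives a split count from the desired bundle size, partitions the token ring, and runs one bounded source per range. A minimal end-to-end usage sketch, assuming the eight-argument CassandraReadConfiguration constructor exercised by the tests (hosts, keyspace, port, table, query, row key, mapped entity class, JMX port) and a reachable cluster; EmployeeDetails stands in for any @Table-mapped entity such as the one the tests define:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.Read;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.values.PCollection;

    public class CassandraReadExample {
        public static void main(String[] args) throws Exception {
            Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
            // An empty query string reads the whole table; splitIntoBundles
            // rewrites it into one token-range query per source.
            CassandraReadConfiguration config = new CassandraReadConfiguration(
                    new String[] { "127.0.0.1" }, "demo1", 9042, "emp_info1",
                    "", "emp_id", EmployeeDetails.class, 7199);
            PCollection rows = (PCollection) p.apply(
                    Read.from(CassandraReadIO.read(config)));
            p.run();
        }
    }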