From b8041fc3c01a5751314bd79bc5fc96b671dd4e81 Mon Sep 17 00:00:00 2001 From: Vincent Marquez Date: Thu, 9 Jun 2022 14:43:50 -0700 Subject: [PATCH] fixing the missing wrap around ring range read --- .../apache/beam/sdk/io/cassandra/ReadFn.java | 62 ++++++++++++++++--- .../sdk/io/cassandra/CassandraIOTest.java | 55 ++++++++++++++++ 2 files changed, 107 insertions(+), 10 deletions(-) diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java index e50010430e41..6bca1cf3d177 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java @@ -22,6 +22,7 @@ import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; import com.datastax.driver.core.Token; +import java.math.BigInteger; import java.util.Collections; import java.util.Iterator; import java.util.Set; @@ -59,27 +60,68 @@ public void processElement(@Element Read read, OutputReceiver receiver) { for (RingRange rr : ringRanges) { Token startToken = session.getCluster().getMetadata().newToken(rr.getStart().toString()); Token endToken = session.getCluster().getMetadata().newToken(rr.getEnd().toString()); - ResultSet rs = - session.execute(preparedStatement.bind().setToken(0, startToken).setToken(1, endToken)); - Iterator iter = mapper.map(rs); - while (iter.hasNext()) { - T n = iter.next(); - receiver.output(n); + if (rr.isWrapping()) { + // A wrapping range is one that overlaps from the end of the partitioner range and its + // start (i.e., when the start token of the split is greater than the end token). + // We need to generate two queries here: one that goes from the start token to the + // end of the partitioner range, + // and the other from the start of the partitioner range to the + // end token of the split. 
+ outputResults( + session.execute(getLowestSplitQuery(read, partitionKey, rr.getEnd())), + receiver, + mapper); + outputResults( + session.execute(getHighestSplitQuery(read, partitionKey, rr.getStart())), + receiver, + mapper); + } else { + ResultSet rs = + session.execute( + preparedStatement.bind().setToken(0, startToken).setToken(1, endToken)); + outputResults(rs, receiver, mapper); } } if (read.ringRanges() == null) { ResultSet rs = session.execute(preparedStatement.bind()); - Iterator iter = mapper.map(rs); - while (iter.hasNext()) { - receiver.output(iter.next()); - } + outputResults(rs, receiver, mapper); } } catch (Exception ex) { LOG.error("error", ex); } } + private static void outputResults( + ResultSet rs, OutputReceiver outputReceiver, Mapper mapper) { + Iterator iter = mapper.map(rs); + while (iter.hasNext()) { + T n = iter.next(); + outputReceiver.output(n); + } + } + + private static String getHighestSplitQuery( + Read spec, String partitionKey, BigInteger highest) { + String highestClause = String.format("(token(%s) >= %d)", partitionKey, highest); + String finalHighQuery = + (spec.query() == null) + ? buildInitialQuery(spec, true) + highestClause + : spec.query() + " AND " + highestClause; + LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery); + return finalHighQuery; + } + + private static String getLowestSplitQuery(Read spec, String partitionKey, BigInteger lowest) { + String lowestClause = String.format("(token(%s) < %d)", partitionKey, lowest); + String finalLowQuery = + (spec.query() == null) + ? 
buildInitialQuery(spec, true) + lowestClause + : spec.query() + " AND " + lowestClause; + LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery); + return finalLowQuery; + } + private static String generateRangeQuery( Read spec, String partitionKey, Boolean hasRingRange) { final String rangeFilter = diff --git a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java index ae29883e5506..7196556abc7c 100644 --- a/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java +++ b/sdks/java/io/cassandra/src/test/java/org/apache/beam/sdk/io/cassandra/CassandraIOTest.java @@ -96,6 +96,7 @@ public class CassandraIOTest implements Serializable { private static final String CASSANDRA_KEYSPACE = "beam_ks"; private static final String CASSANDRA_HOST = "127.0.0.1"; private static final String CASSANDRA_TABLE = "scientist"; + private static final String CASSANDRA_TABLE_SIMPLEDATA = "simpledata"; private static final Logger LOG = LoggerFactory.getLogger(CassandraIOTest.class); private static final String STORAGE_SERVICE_MBEAN = "org.apache.cassandra.db:type=StorageService"; private static final int FLUSH_TIMEOUT = 30000; @@ -190,6 +191,10 @@ private static void insertData() throws Exception { "CREATE TABLE IF NOT EXISTS %s.%s(person_department text, person_id int, person_name text, PRIMARY KEY" + "((person_department), person_id));", CASSANDRA_KEYSPACE, CASSANDRA_TABLE_WRITE)); + session.execute( + String.format( + "CREATE TABLE IF NOT EXISTS %s.%s(id int, data text, PRIMARY KEY (id))", + CASSANDRA_KEYSPACE, CASSANDRA_TABLE_SIMPLEDATA)); LOG.info("Insert records"); String[][] scientists = { @@ -221,6 +226,15 @@ private static void insertData() throws Exception { CASSANDRA_TABLE); session.execute(insertStr); } + for (int i = 0; i < 100; i++) { + String insertStr = + String.format( + "INSERT INTO %s.%s(id, 
data) VALUES(" + i + ",' data_" + i + "');", + CASSANDRA_KEYSPACE, + CASSANDRA_TABLE_SIMPLEDATA); + session.execute(insertStr); + } + flushMemTablesAndRefreshSizeEstimates(); } @@ -273,6 +287,35 @@ private static void disableAutoCompaction() throws Exception { Thread.sleep(JMX_CONF_TIMEOUT); } + /* + With 100 rows inserted, we can detect whether rows that land in the wrap-around ring range are missed: the total count read back must still be 100. + */ + @Test + public void testWrapAroundRingRanges() throws Exception { + PCollection simpledataPCollection = + pipeline.apply( + CassandraIO.read() + .withHosts(Collections.singletonList(CASSANDRA_HOST)) + .withPort(cassandraPort) + .withKeyspace(CASSANDRA_KEYSPACE) + .withTable(CASSANDRA_TABLE_SIMPLEDATA) + .withMinNumberOfSplits(50) + .withCoder(SerializableCoder.of(SimpleData.class)) + .withEntity(SimpleData.class)); + PCollection countPCollection = simpledataPCollection.apply("counting", Count.globally()); + PAssert.that(countPCollection) + .satisfies( + i -> { + long total = 0; + for (Long aLong : i) { + total = total + aLong; + } + assertEquals(100, total); + return null; + }); + pipeline.run(); + } + @Test public void testRead() throws Exception { PCollection output = @@ -654,6 +697,18 @@ public int hashCode() { } } + @Table(name = CASSANDRA_TABLE_SIMPLEDATA, keyspace = CASSANDRA_KEYSPACE) + static class SimpleData implements Serializable { + @PartitionKey int id; + + @Column String data; + + @Override + public String toString() { + return id + ", " + data; + } + } + private static RingRange fromEncodedKey(Metadata metadata, ByteBuffer... bb) { BigInteger bi = BigInteger.valueOf((long) metadata.newToken(bb).getValue()); return RingRange.of(bi, bi.add(BigInteger.valueOf(1L)));