From e857a59726f5f39268744a3c13be59c7a0792984 Mon Sep 17 00:00:00 2001 From: Vardhan Thigle Date: Thu, 6 Mar 2025 05:44:27 +0000 Subject: [PATCH] Rethrowing Exception from CassandraIO's ReadFn --- CHANGES.md | 1 + .../main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java | 7 ++++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index b47ffc3da8a9..b7fc5e851603 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -80,6 +80,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* (Java) Fixed cassandraIO ReadAll does not let a pipeline handle or retry exceptions ([#34191](https://github.com/apache/beam/pull/34191)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java index 44e3808f53f8..678c72d42ff2 100644 --- a/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java +++ b/sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/ReadFn.java @@ -42,7 +42,7 @@ class ReadFn extends DoFn, T> { private static final Logger LOG = LoggerFactory.getLogger(ReadFn.class); @ProcessElement - public void processElement(@Element Read read, OutputReceiver receiver) { + public void processElement(@Element Read read, OutputReceiver receiver) throws Exception { try { Session session = ConnectionManager.getSession(read); Mapper mapper = read.mapperFactoryFn().apply(session); @@ -89,6 +89,7 @@ public void processElement(@Element Read read, OutputReceiver receiver) { } } catch (Exception ex) { LOG.error("error", ex); + throw ex; } } @@ -107,7 +108,7 @@ private static String getHighestSplitQuery( String finalHighQuery = (spec.query() == null) ? buildInitialQuery(spec, true) + highestClause - : spec.query() + getJoinerClause(spec.query().get()) + highestClause; + : spec.query().get() + getJoinerClause(spec.query().get()) + highestClause; LOG.debug("CassandraIO generated a wrapAround query : {}", finalHighQuery); return finalHighQuery; } @@ -117,7 +118,7 @@ private static String getLowestSplitQuery(Read spec, String partitionKey, Big String finalLowQuery = (spec.query() == null) ? buildInitialQuery(spec, true) + lowestClause - : spec.query() + getJoinerClause(spec.query().get()) + lowestClause; + : spec.query().get() + getJoinerClause(spec.query().get()) + lowestClause; LOG.debug("CassandraIO generated a wrapAround query : {}", finalLowQuery); return finalLowQuery; }