Skip to content

Conversation

@reuvenlax
Copy link
Contributor

No description provided.

@reuvenlax reuvenlax changed the title Adding MapState support for Datalow. [BEAM-1474]. Adding MapState support for Datalow. Jan 23, 2021
@reuvenlax
Copy link
Contributor Author

R: @steveniemitz
R: @dpmills

@reuvenlax reuvenlax changed the title [BEAM-1474]. Adding MapState support for Datalow. [BEAM-1474]. Adding MapState and SetState support for the Dataflow runner Jan 23, 2021
@reuvenlax
Copy link
Contributor Author

run dataflow validatesrunner

@steveniemitz
Copy link
Contributor

I tried using SetState, but get this:

java.lang.NullPointerException
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillMap.getFuture(WindmillStateInternals.java:1533)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillMap.access$2000(WindmillStateInternals.java:1175)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillMap$4.readLater(WindmillStateInternals.java:1425)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillMap$2.readLater(WindmillStateInternals.java:1339)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillSet.readLater(WindmillStateInternals.java:1165)

It looks like WindmillSet is missing an override on initializeForWorkItem to initialize the underlying windmillMap.

@steveniemitz
Copy link
Contributor

steveniemitz commented Jan 23, 2021

Fixed up the SetState problem, now getting this:

java.lang.IllegalStateException: Didn't receive responses for all pending fetches. Missing: [StateTag{kind=VALUE_PREFIX, tag=<ByteString@2ce72ed1 size=22 contents="/gAABdzFC1wCAurcD/+ssk">, stateFamily=S15, requestPosition=null, sortedListRange=null}]
	at org.apache.beam.runners.dataflow.worker.WindmillStateReader.consumeResponse(WindmillStateReader.java:671)
	at org.apache.beam.runners.dataflow.worker.WindmillStateReader.startBatchAndBlock(WindmillStateReader.java:494)
	at org.apache.beam.runners.dataflow.worker.WindmillStateReader$WrappedFuture.get(WindmillStateReader.java:372)
	at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Futures$2.get(Futures.java:542)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.fetchData(WindmillStateInternals.java:1597)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillBag.read(WindmillStateInternals.java:1630)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.getAccum(WindmillStateInternals.java:2046)
	at org.apache.beam.runners.dataflow.worker.WindmillStateInternals$WindmillCombiningState.read(WindmillStateInternals.java:2011)

edit: I should mention I'm on appliance, using the windmill container dataflow.gcr.io/v1beta3/windmill:20210106-04-rc03

@reuvenlax
Copy link
Contributor Author

run dataflow validatesrunner

@reuvenlax
Copy link
Contributor Author

run dataflow validatesrunner

@reuvenlax
Copy link
Contributor Author

Run Dataflow ValidatesRunner

}
cleared = false;

for (K key : localAdditions) {
Copy link
Contributor

@steveniemitz steveniemitz Jan 25, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we avoid committing keys added in localAdditions if they already exist in the cached Map (and the values are equal)? Particularly for SetState, this could significantly reduce the commit volume.

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Jan 25, 2021 via email

@steveniemitz
Copy link
Contributor

Maybe. Requires a bit of thought, because the naive approach would force us to serialize the key and value on every write to cheeck for equality, and that could add quite a bit of CPU and memory cost.

Maybe we could special case for just primitive values? That'd get you the optimization for SetState, which I'd guess is going to be like 99% of the cases where you'd be adding the same value for a key repeatedly to the map. We're already serializing the key/value each commit anyways, so there wouldn't be any overhead there.

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Jan 25, 2021 via email

@steveniemitz
Copy link
Contributor

awesome, I think that'll be a big win.

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Jan 25, 2021 via email

@reuvenlax
Copy link
Contributor Author

run dataflow validatesrunner

@steveniemitz
Copy link
Contributor

Keep in mind that right now I don't think the cache will last between work items. To enable that, we'll need to track the byte size off every element so that we can efficiently track the weight of the map for the cache.

ah ok, that'll be really important to have also, but happy to get it working first. :D

* <p>When {@code read()} is called, a particular state implementation is encouraged to perform
* all pending reads in a single batch.
*/
ReadableState<V> getOrDefault(K key, @Nullable V defaultValue);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wdyt about replacing this with either computeIfAbsent or getOrDefault(K key, Supplier<V> defaultValue)?

Either would allow users to avoid allocating throw-away defaultValues for every getOrDefault operation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding computeIfAbsent is a good idea - just added it.

while computeIfAbsent is a generalization off putIfAbsent, it's not quite a generalization of getOrDefault, since getOrDefault doesn't modify the map.

def commonRunnerV2ExcludeCategories = [
'org.apache.beam.sdk.testing.UsesCommittedMetrics',
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesSetState',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are MapState and SetState also supported in dataflow runner v2(unified worker) within this changes?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Jan 28, 2021 via email

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Jan 28, 2021 via email

@reuvenlax
Copy link
Contributor Author

Friendly ping - any more comments?

@reuvenlax
Copy link
Contributor Author

@steveniemitz any more comments on this PR?

@steveniemitz
Copy link
Contributor

nothing from my end. I think we're in a good spot from the java side at least. Should the DataflowRunner reject jobs using Map/SetState though if streaming engine is enabled?

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

@reuvenlax
Copy link
Contributor Author

@steveniemitz does this look good to you know?

@steveniemitz
Copy link
Contributor

👍 lgtm

@reuvenlax
Copy link
Contributor Author

run dataflow validatesrunner

@reuvenlax reuvenlax merged commit 6308ef9 into apache:master Feb 26, 2021
@kennknowles
Copy link
Member

Ah, we have a runner v2 example integration test suite that picked this up and failed. I'll add it to the excluded tests list.

@kennknowles
Copy link
Member

Ah, it seems also that the streaming/batch auto test suite passed, but likely the tests only have bounded PCollections, or are excluded for other reasons. The tests fail in the streaming ValidatesRunner suite: https://ci-beam.apache.org/view/PostCommit/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming/552/console

kennknowles added a commit to kennknowles/beam that referenced this pull request Mar 4, 2021
…gger

* github/master: (123 commits)
  [BEAM-11899] Bump commons-pool to 2.8.1 and bump commons-dbcp to 2.8.0, Because there is a library dependency
  Update pillars.yaml (apache#14142)
  [BEAM-10632] Checkerframework nullness cleanups (apache#14107)
  [BEAM-11213] Instantiate SparkListenerApplicationStart in a Spark 3 compatible way
  Fix typos for excluding testMergingCustomWindowsWithoutCustomWindowTypes
  Specify the time resolution for TestStreamPayload.
  [BEAM-10961] Enable strict depdency checking for sdks/java/extensions/euphoria (second attempt)
  [BEAM-11848] Store Docker images in a variable for consistency.
  Splitting old Go Precommit and new ULR integration test precommit.
  Moving runner imports out of ptest.
  Add the TO_STRING capability to Java and Python
  [BEAM-11848] Fix Docker images list.
  jdbc python supported Dataflow runner (apache#13960)
  Adding a warning to use multi-workers on FnApiRunner
  Fix legend for Python Directrunner microbenchmarks
  [BEAM-11740] Estimate PCollection byte size
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/zetasketch (apache#14093)
  Map Dataflow JOB_STATE_CANCELLING to Beam RUNNING state
  [BEAM-11833] Fix reported watermark after restriction split in UnboundedSourceAsSDFRestrictionTracker
  [BEAM-10761] add reference to BEAM-11761
  [BEAM-10961] enable strict dependency checking for flink/job-server
  Exclude MapState example integration tests from Dataflow runner v2 suite
  Remove InvalidWindows from Java SDK, instead track "already merged" bit
  Fix checkstyle in watermark latency benchmark
  Fix compile breakage in WindmillStateInternals
  Improve test, error on ALREADY_MERGED.
  [BEAM-10961] Strict dependency checking for sdks/java/io/gcp (apache#13791)
  Initial watermark latency benchmark
  Attempting improvements on DirectRunner Python dash
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/google-cloud-platform-core (apache#14084)
  Merge pull request apache#13802: [BEAM-1474]. Adding MapState and SetState support for the Dataflow runner
  Remove some false positives
  Remove nullness warning suppression
  [BEAM-11861] Add methods to explicitly provide coder for ParquetIO's Parse and ParseFiles (apache#14078)
  [BEAM-11531] Use pandas 1.2 for python>=3.7 (apache#14099)
  [BEAM-10961] add reference to BEAM-11761
  [BEAM-10961] add explicit compile for auto_value_annotations in sdks/extensions/ml/build.gradle
  Attempting improvements on DirectRunner Python dash
  Recognize JOB_STATE_PENDING from Dataflow and map to RUNNING
  never run checkerframework on tests
  Puts more expensive BQ empty table check to the right of the 'and' condition (apache#14094)
  Use the windowing strategy of the input, not output, PCollection of GBK.
  Do not stage dataflow worker jar when use runner_v2.
  [BEAM-11870] Re-raise underlying exception for InvocationTargetException (apache#14098)
  [BEAM-11778] Create a wrapper for ZetaSQL catalog and refactor accordingly. (apache#13934)
  [BEAM-9378] Add ignored tests which fail in various ways when querying nested structures (apache#14077)
  Merge Fn API and runner v2 configurations for DataflowRunner
  Fix up! formatting
  Add validate runner test for testing custom merging windows fn without custom window types
  Revert "Revert "[BEAM-2914] Add portable merging window support to Python. (apache#12995)""
  [BEAM-10961] fix stray reordering of lines
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/sorter
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/sketching
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/schemaio-expansion-service
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/protobuf
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/ml
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/kyro
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/join-library
  [BEAM-10961] undo line moves (originally intended for alphabeticization)
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/jackson
  [BEAM-10961] Enable strict dependency checking on sdks/java/extensions/sql (apache#13830)
  [BEAM-10961] enable strict dependency checking for sdks/java/extensions/euphoria
  [BEAM-10961] enable strict dependency checking for sdks/java/io/parquet (apache#14062)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/thrift (apache#14066)
  Refactor ZetaSqlDialectSpecTest and add some passing tests. (apache#14080)
  [BEAM-11864] Use objects.equals instead of raw comparison
  [BEAM-11707] Change WindmillStateCache cache invalidation to be based upon reference invalidation instead of expensive set management. Reduce operations of shared cache by caching per-key object sets locally and flushing as groups to shared cache. Remove byte tracking which could be racy based upon background evictions in favor of just iterating for rendering the status page. This also lets us capture more stats.
  [BEAM-11730] Reduce context switching overhead for appliance reads by issuing reads directly from calling threads in the case that there is no reads being queued.
  Fix preview
  Show string from Dataflow service when job terminates in unrecognized state
  Log a warning when Dataflow returns an unrecognized state
  Merge pull request apache#14033 from [BEAM-11408] Integrate Python BigQuery sink with GroupIntoBatches
  Remove SYNCHRONIZED_PROCESSING_TIME from model proto
  Remove use of model SYNCHRONIZED_PROCESSING_TIME
  Merge redundant model feature columns in capability matrix
  Remove MapReduce runner from capability matrix, because it is on a branch and unreleased
  Remove JStorm runner from capability matrix, because it is on a branch and unreleased
  Remove retractions from capability matrix, because they do not exist yet
  Remove metadata-driven triggers from capability matrix, because they do not exist
  [BEAM-10937] Add Tour of Beam page (apache#13747)
  [BEAM-11344] Apply "Become a Committer" changes from Website Revamp (apache#14036)
  Merge pull request apache#14046 from [BEAM-11791] Adding a microbenchmark for TestStream
  Returning successful writes in FhirIO.Write.Result (apache#14034)
  Fixup
  [BEAM-10961] enable strict dependency checking for sdks/java/io/file-based-io-tests (apache#14052)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/contextualtextio (apache#14049)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/kinesis (apache#14058)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/bigquery-io-perf-tests (apache#14048)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/elasticsearch (apache#14050)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/expansion-service (apache#14051)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/jdbc (apache#14055)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/jms (apache#14056)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/kafka (apache#14057)
  [BEAM-10961] enable strict dependency checking for sdks/java/io/hcatalog (apache#14053)
  [BEAM-11859] Fixed bug in python S3 IO
  [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer
  [BEAM-10114] Fix PerSubscriptionPartitionSdf to not rely on the presence of BundleFinalizer
  [BEAM-10961] fix spacing
  [BEAM-10961] enable strict dependency checking for sdks/java/io/xml
  [BEAM-10961] enable strict dependency checking for sdks/java/io/tika
  ...
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants