
Conversation

@youngoli
Contributor

This is a follow-up to a previous PR (#16908), which added the feature but didn't switch existing tests over to it. This PR switches the existing tests to use the feature (and makes sure it's set up properly so that it actually works).


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@youngoli
Contributor Author

R: @lostluck @riteshghorse

@youngoli
Contributor Author

I'd like to note for future reference that the cross-language postcommits for Python Direct, Flink, and Spark all currently seem to be failing. While running them I won't be checking that they succeed, but rather that the failures don't change, so we can be reasonably sure this PR didn't break anything. Since this only affects how expansion services are initialized for Go, any new errors should be visible on top of the existing ones.
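To illustrate the initialization pattern being discussed (with hypothetical helper names, not the Beam Go SDK's actual API): a test can prefer an explicitly supplied expansion-service address and fall back to starting one automatically only when no address was given.

```go
package main

import (
	"errors"
	"fmt"
)

// expansionAddr returns the expansion-service address a test should use.
// An explicitly supplied address (e.g. from a test flag) wins; otherwise
// autoStart stands in for the automated expansion-service startup.
// All names here are illustrative, not real SDK identifiers.
func expansionAddr(flagAddr string, autoStart func() (string, error)) (string, error) {
	if flagAddr != "" {
		return flagAddr, nil
	}
	if autoStart == nil {
		return "", errors.New("no expansion address given and no automated service available")
	}
	return autoStart()
}

func main() {
	// An explicit address takes precedence.
	addr, _ := expansionAddr("localhost:8097", nil)
	fmt.Println(addr)

	// With no explicit address, the automated service is started.
	addr, _ = expansionAddr("", func() (string, error) { return "localhost:12345", nil })
	fmt.Println(addr)
}
```

The point of the fallback is exactly what this PR exercises: tests keep working when a user overrides the address, and set up the service themselves otherwise.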

@youngoli
Contributor Author

Run XVR_Direct PostCommit

@youngoli
Contributor Author

Run XVR_Flink PostCommit

@youngoli
Contributor Author

Run XVR_Spark PostCommit

@codecov

codecov bot commented Mar 23, 2022

Codecov Report

Merging #17161 (b5907f6) into master (555cc26) will decrease coverage by 0.01%.
The diff coverage is n/a.

@@            Coverage Diff             @@
##           master   #17161      +/-   ##
==========================================
- Coverage   73.96%   73.95%   -0.01%     
==========================================
  Files         671      671              
  Lines       88245    88245              
==========================================
- Hits        65270    65264       -6     
- Misses      21863    21869       +6     
  Partials     1112     1112              
Flag Coverage Δ
python 83.63% <ø> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
.../python/apache_beam/testing/test_stream_service.py 88.37% <0.00%> (-4.66%) ⬇️
...che_beam/runners/interactive/interactive_runner.py 92.14% <0.00%> (-1.43%) ⬇️
sdks/python/apache_beam/io/source_test_utils.py 88.01% <0.00%> (-1.39%) ⬇️
...ks/python/apache_beam/runners/worker/sdk_worker.py 88.90% <0.00%> (-0.16%) ⬇️
...hon/apache_beam/runners/worker/bundle_processor.py 93.51% <0.00%> (-0.13%) ⬇️
sdks/python/apache_beam/io/gcp/bigquery.py 63.63% <0.00%> (-0.08%) ⬇️
...s/interactive/dataproc/dataproc_cluster_manager.py 88.32% <0.00%> (ø)
...runners/interactive/display/pcoll_visualization.py 86.36% <0.00%> (+0.50%) ⬆️
...pache_beam/runners/interactive/interactive_beam.py 81.39% <0.00%> (+0.70%) ⬆️
sdks/python/apache_beam/utils/interactive_utils.py 95.12% <0.00%> (+2.43%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 555cc26...b5907f6. Read the comment docs.

@github-actions
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.

@lostluck
Contributor

Flink failed with this, rerunning.


2022/03/23 03:22:10  (): org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
	at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
	at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
	at akka.dispatch.OnComplete.internal(Future.scala:264)
	at akka.dispatch.OnComplete.internal(Future.scala:261)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
	at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
	at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
	at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
	at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
	at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
	at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
	at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
	at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
	at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
	at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
	at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
	at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
	at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
	at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
	at sun.reflect.GeneratedMethodAccessor23.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
	at akka.actor.ActorCell.invoke(ActorCell.scala:561)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
	at akka.dispatch.Mailbox.run(Mailbox.scala:225)
	at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
	... 4 more
Caused by: java.io.IOException: Insufficient number of network buffers: required 17, but only 14 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:372)
	at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:350)
	at org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory$0(ResultPartitionFactory.java:280)
	at org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:150)
	at org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.setup(BufferWritingResultPartition.java:95)
	at org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:946)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:655)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
	at java.lang.Thread.run(Thread.java:748)
2022/03/23 03:22:10  (): java.io.IOException: Insufficient number of network buffers: required 17, but only 14 available. The total number of network buffers is currently set to 2048 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
2022/03/23 03:22:11 Job state: FAILED
    ptest.go:102: Failed to execute job: job go0testxlang0multi-jenkins-0323032139-18f8f1d8_496baae7-955f-404f-af5f-4b9df0770d08 failed
--- FAIL: TestXLang_Multi (34.88s)
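For reference, the network-buffer shortage in the log above is usually addressed by raising Flink's network memory. The three keys named in the error message are real Flink configuration options, though the values below are only illustrative:

```yaml
# flink-conf.yaml — illustrative values, tune for your cluster.
taskmanager.memory.network.fraction: 0.2   # share of total Flink memory for network buffers
taskmanager.memory.network.min: 128mb
taskmanager.memory.network.max: 1gb
```

Any of the three can relieve the "Insufficient number of network buffers" failure; the fraction is bounded below and above by the min/max settings.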

@lostluck
Contributor

Run XVR_Flink PostCommit

Contributor

@lostluck lostluck left a comment


Assuming the flink failure is a flake of some kind, this LGTM. Thanks! I wish we had a better/cleaner mechanism to put a prefix like that for testing, but I can't think of anything preferable to the current no-magic approach.

@lostluck
Contributor

Run Flink CrossLanguageValidatesRunner Tests

@lostluck
Contributor

Run XVR_Flink PostCommit

@youngoli
Contributor Author

I'm inclined to think this is a flake in the Flink XVR tests, since this change shouldn't affect the specifics of Flink connections. Looking at the non-PR runs, they're failing with the same error (e.g. https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/5215/).

I already know the Java PreCommit is being flaky due to timeouts, and this change doesn't modify Java at all. The CommunityMetrics PreCommit is similarly broken for unrelated reasons.

@youngoli
Contributor Author

That said, before I merge this I'm going to double check it doesn't conflict with #16819

@lostluck
Contributor

lostluck commented Apr 6, 2022

Run Flink CrossLanguageValidatesRunner Tests

@lostluck
Contributor

lostluck commented Apr 6, 2022

Run XVR_Flink PostCommit

@lostluck
Contributor

lostluck commented Apr 6, 2022

Run CommunityMetrics PreCommit

@lostluck
Contributor

lostluck commented Apr 7, 2022

Seems like a flake.

=== RUN   TestKafkaIO_BasicReadWrite
--- FAIL: TestKafkaIO_BasicReadWrite (0.01s)
panic: 	tried cross-language for beam:transform:org.apache.beam:kafka_write:v1 against localhost:45851 and failed
	expanding external transform
	expanding transform with ExpansionRequest: components:{pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c1@GihEXlkPLs" is_bounded:BOUNDED windowing_strategy_id:"w0@GihEXlkPLs"}} windowing_strategies:{key:"w0@GihEXlkPLs" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c2@GihEXlkPLs" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY OnTimeBehavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0@GihEXlkPLs" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1@GihEXlkPLs" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c0@GihEXlkPLs" component_coder_ids:"c0@GihEXlkPLs"}} coders:{key:"c2@GihEXlkPLs" value:{spec:{urn:"beam:coder:global_window:v1"}}} environments:{key:"go" value:{}}} transform:{unique_name:"External" spec:{urn:"beam:transform:org.apache.beam:kafka_write:v1" payload:"\n}\n\x1c\n\x0eProducerConfig\x1a\n*\x08\n\x02\x10\x07\x12\x02\x10\x07\n\x0b\n\x05Topic\x1a\x02\x10\x07\n\x13\n\rKeySerializer\x1a\x02\x10\x07\n\x15\n\x0fValueSerializer\x1a\x02\x10\x07\x12$d97fbd8d-dde2-46d8-babd-f5372b6ab5e6\x12\xda\x01\x04\x00\x00\x00\x00\x01\x11bootstrap.servers\x0flocalhost:38215=xlang_kafkaio_basic_test_1f77995b-3f76-4225-a30a-37f34017bee79org.apache.kafka.common.serialization.ByteArraySerializer9org.apache.kafka.common.serialization.ByteArraySerializer"} inputs:{key:"i0" value:"n4"} environment_id:"go"} namespace:"GihEXlkPLs"
expansion failed
	caused by:
rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing dial tcp 127.0.0.1:45851: connect: connection refused" [recovered]
	panic: 	tried cross-language for beam:transform:org.apache.beam:kafka_write:v1 against localhost:45851 and failed
	expanding external transform
	expanding transform with ExpansionRequest: components:{pcollections:{key:"n4" value:{unique_name:"n4" coder_id:"c1@GihEXlkPLs" is_bounded:BOUNDED windowing_strategy_id:"w0@GihEXlkPLs"}} windowing_strategies:{key:"w0@GihEXlkPLs" value:{window_fn:{urn:"beam:window_fn:global_windows:v1"} merge_status:NON_MERGING window_coder_id:"c2@GihEXlkPLs" trigger:{default:{}} accumulation_mode:DISCARDING output_time:END_OF_WINDOW closing_behavior:EMIT_IF_NONEMPTY OnTimeBehavior:FIRE_IF_NONEMPTY environment_id:"go"}} coders:{key:"c0@GihEXlkPLs" value:{spec:{urn:"beam:coder:bytes:v1"}}} coders:{key:"c1@GihEXlkPLs" value:{spec:{urn:"beam:coder:kv:v1"} component_coder_ids:"c0@GihEXlkPLs" component_coder_ids:"c0@GihEXlkPLs"}} coders:{key:"c2@GihEXlkPLs" value:{spec:{urn:"beam:coder:global_window:v1"}}} environments:{key:"go" value:{}}} transform:{unique_name:"External" spec:{urn:"beam:transform:org.apache.beam:kafka_write:v1" payload:"\n}\n\x1c\n\x0eProducerConfig\x1a\n*\x08\n\x02\x10\x07\x12\x02\x10\x07\n\x0b\n\x05Topic\x1a\x02\x10\x07\n\x13\n\rKeySerializer\x1a\x02\x10\x07\n\x15\n\x0fValueSerializer\x1a\x02\x10\x07\x12$d97fbd8d-dde2-46d8-babd-f5372b6ab5e6\x12\xda\x01\x04\x00\x00\x00\x00\x01\x11bootstrap.servers\x0flocalhost:38215=xlang_kafkaio_basic_test_1f77995b-3f76-4225-a30a-37f34017bee79org.apache.kafka.common.serialization.ByteArraySerializer9org.apache.kafka.common.serialization.ByteArraySerializer"} inputs:{key:"i0" value:"n4"} environment_id:"go"} namespace:"GihEXlkPLs"
expansion failed
	caused by:
rpc error: code = Unavailable desc = connection error: desc = "transport: Error while dialing dial tcp 127.0.0.1:45851: connect: connection refused"

@lostluck
Contributor

lostluck commented Apr 7, 2022

Run XVR_Flink PostCommit

@lostluck
Contributor

lostluck commented Apr 7, 2022

Yep, just an unrelated flake. Merging.

@lostluck lostluck merged commit f42d437 into apache:master Apr 7, 2022