Conversation

@milesgranger
Contributor

No description provided.

@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch 3 times, most recently from 1a09820 to be20e23 on September 28, 2023 11:37
@mrocklin
Member

Ha! This is fun. Is this working?

@milesgranger
Contributor Author

> Ha! This is fun. Is this working?

Works in a notebook, adapted from Florian's. Fails now for different, unrelated reasons.

@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch from be20e23 to e9a0b10 on September 29, 2023 12:06
@milesgranger milesgranger changed the title from [WIP] Add pyspark tpch benchmarks to Add pyspark tpch benchmarks on Sep 29, 2023
@milesgranger milesgranger marked this pull request as ready for review September 29, 2023 12:06
@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch from e9a0b10 to 083756e on September 29, 2023 13:02
@mrocklin
Member

It looks like this might work now? If so, I'll be curious to see what performance is like. (although I appreciate that that might take more work than what's here so far)

@milesgranger
Contributor Author

milesgranger commented Sep 29, 2023

Does indeed work; planning to do preliminary comparison to #971 on Monday. I'm sure some things will need to be changed/adjusted. :)

@fjetter
Contributor

fjetter commented Sep 29, 2023

I believe this was the cluster https://cloud.coiled.io/clusters/281807/information?account=dask-benchmarks&tab=Code

The hardware metrics all look a little disappointing. Nothing is utilized properly. I guess the 100 GB dataset is just too small to see any interesting activity. Well, memory seems to increase pretty consistently, but I assume this is JVM foo.

@ntabris
Member

ntabris commented Sep 29, 2023

> Well, memory seems to increase pretty consistently, but I assume this is JVM foo.

At my old job, there were QA folks using some automated tools, and I think I literally had to tell them at least 20 times that this pattern from the JVM didn't mean there was a memory leak. (We had the JVM configured not to deallocate to the OS, which I think is a pretty typical config.)
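For reference, a minimal sketch of how one could ask the executor JVMs to hand freed heap back to the OS, should smoother memory graphs ever matter for these benchmarks. The flags are standard HotSpot options; whether they are worth setting here is an open question:

```python
from pyspark.sql import SparkSession

# Sketch only: G1GC plus heap-shrinking ratios encourages the JVM to return
# freed pages to the OS, so worker memory graphs track real usage instead
# of the grow-and-hold curve discussed above.
spark = (
    SparkSession.builder
    .config(
        "spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=30",
    )
    .getOrCreate()
)
```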

@mrocklin
Member

I'm guessing that the troughs between the peaks are some setup/teardown code. Is that correct? Maybe this is because the WorkerPlugin is running repeatedly and downloading stuff repeatedly. Maybe we can make this a bit smoother by avoiding repeats there? There are probably some other things we could be sensitive to there.
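A minimal sketch of the kind of idempotence guard meant here, assuming a Dask WorkerPlugin does the provisioning; `download_and_install_spark` is a hypothetical placeholder for the expensive step:

```python
import os
from distributed.diagnostics.plugin import WorkerPlugin

def download_and_install_spark():
    """Hypothetical placeholder for the expensive per-worker download."""

class SparkSetup(WorkerPlugin):
    # Idempotence sketch: a marker file lets setup() short-circuit when a
    # previous benchmark on the same worker already provisioned everything.
    name = "spark-setup"
    marker = "/tmp/.spark-setup-done"

    def setup(self, worker):
        if os.path.exists(self.marker):
            return  # already provisioned on this worker; skip the download
        download_and_install_spark()
        open(self.marker, "w").close()

# client.register_worker_plugin(SparkSetup(), name=SparkSetup.name)
```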

@mrocklin
Member

Also, every time I've tried to use Spark I've missed some critical configuration parameter. We should run this by a Spark person. @fjetter do you have that covered or should I go hunting?

@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch from 5000543 to 45cfc40 on September 30, 2023 04:43
@milesgranger
Contributor Author

milesgranger commented Sep 30, 2023

There were some setup/teardown calls being made between the queries; I moved those to the cluster setup and then noticed it was running too fast, which turned out to be because we needed to materialize the dataframe. This cluster is more representative now: https://cloud.coiled.io/clusters/282553/information?account=dask-engineering&tab=Metrics


Edit: and I agree, there are surely some Spark configurations we ought to investigate and trial.
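For context, a minimal sketch of the materialization step meant above, assuming an existing `spark` session; the query itself is illustrative. Spark builds plans lazily, so without a terminal action the benchmark times plan construction rather than execution:

```python
# Spark evaluates lazily: defining a query costs almost nothing, so a
# benchmark that never triggers an action measures nearly nothing. Any
# terminal action forces the run; count() pulls every partition through
# the full plan without shipping rows back to the driver.
df = spark.read.parquet("s3a://coiled-runtime-ci/tpch_scale_100/lineitem")
query = df.groupBy("l_returnflag").count()  # lazy: nothing has run yet
query.count()                               # action: materializes the result
```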

@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch 2 times, most recently from fdc5aec to 623bf40 on September 30, 2023 05:43
@milesgranger
Contributor Author

milesgranger commented Sep 30, 2023

As a side note, occasionally I'm getting the following error with the queries, specifically when materializing the Spark DF:

java.nio.file.AccessDeniedException: s3a://coiled-runtime-ci/tpch_scale_100/part/part.0.parquet: getFileStatus on s3a://coiled-runtime-ci/tpch_scale_100/part/part.0.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: RWF629STDDBDBT3Q

Full error Exception: ('Long error message', , 'An error occurred while calling o36.showString.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 49 in stage 1.0 failed 4 times, most recent failure: Lost task 49.3 in stage 1.0 (TID 63) (10.0.29.67 executor 6): java.nio.file.AccessDeniedException: s3a://coiled-runtime-ci/tpch_scale_100/lineitem/part.765.parquet: getFileStatus on s3a://coiled-runtime-ci/tpch_scale_100/lineitem/part.765.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JKG9XPAYKWF912MY; S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==; Proxy: null), S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==:403 Forbidden\n\tat org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)\n\tat org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)\n\tat org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)\n\tat org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)\n\tat org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:39)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:211)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:210)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:213)\n\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:231)\n\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:293)\n\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)\n\tat org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)\n\tat 
scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)\n\tat org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:139)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)\n\tat org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1623)\nCaused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JKG9XPAYKWF912MY; S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==; Proxy: null), S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1879)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1418)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1387)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1157)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:814)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:781)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:755)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:715)\n\tat com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:697)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:561)\n\tat com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:541)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5456)\n\tat com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5403)\n\tat com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1372)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$10(S3AFileSystem.java:2545)\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:414)\n\tat org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:377)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2533)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2513)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3776)\n\t... 
34 more\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)\n\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)\n\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)\n\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)\n\tat scala.Option.foreach(Option.scala:407)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)\nCaused by: java.nio.file.AccessDeniedException: s3a://coiled-runtime-ci/tpch_scale_100/lineitem/part.765.parquet: getFileStatus on s3a://coiled-runtime-ci/tpch_scale_100/lineitem/part.765.parquet: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: JKG9XPAYKWF912MY; S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==; Proxy: null), S3 Extended Request ID: vuyILTZotzrJpWUFiTd6I5GPIt4guzoNkjQ2qfxOEbJoKLVZlKWluRDd457/sPLvSqNYqNnzUXH8Mll4NR1qyg==:403 Forbidden\n\tat org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)\n\tat org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3556)\n\tat org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)\n\tat org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)\n\tat org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3554)\n\tat org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:39)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFooterReader.readFooter(ParquetFooterReader.java:39)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$lzycompute$1(ParquetFileFormat.scala:211)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.footerFileMetaData$1(ParquetFileFormat.scala:210)\n\tat org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.')

/opt/coiled/env/lib/python3.10/site-packages/py4j/protocol.py:326: Exception

In this cluster for example: https://cloud.coiled.io/clusters/282430/information?account=dask-engineering&tab=Logs&filterPattern=&computation=&sinceMs=1696055246817&untilMs=1696055251817
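Possibly unrelated to the actual cause here, but a common source of intermittent S3A 403s is temporary STS credentials: the Hadoop connector ignores the session token unless it is told to use the temporary-credentials provider, and an expired token also surfaces as a blanket 403 on getFileStatus. A hedged sketch, assuming credentials arrive via the usual environment variables:

```python
import os
from pyspark.sql import SparkSession

# S3A defaults to plain access-key authentication; with STS temporary
# credentials the session token must be forwarded explicitly, otherwise
# getFileStatus calls come back 403 Forbidden.
spark = (
    SparkSession.builder
    .config(
        "spark.hadoop.fs.s3a.aws.credentials.provider",
        "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider",
    )
    .config("spark.hadoop.fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
    .config("spark.hadoop.fs.s3a.session.token", os.environ["AWS_SESSION_TOKEN"])
    .getOrCreate()
)
```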

@milesgranger milesgranger force-pushed the milesgranger/pyspark-tpch-benchmarks branch from 623bf40 to 11539b6 on September 30, 2023 11:07
@fjetter
Contributor

fjetter commented Oct 2, 2023

> At my old job, there were QA folks using some automated tools, and I think I literally had to tell them at least 20 times that this pattern from the JVM didn't mean there was a memory leak. (We had the JVM configured not to deallocate to the OS, which I think is a pretty typical config.)

Thanks for confirming. That's what I thought.

> Also, every time I've tried to use Spark I've missed some critical configuration parameter. We should run this by a Spark person. @fjetter do you have that covered or should I go hunting?

Yeah, I wouldn't be surprised if that was the case. We're only using the defaults.

> do you have that covered or should I go hunting?

I don't have a lot of Spark contacts. If you can find somebody, that'll be helpful. I'll poke Powers again, but that's it on my end, unfortunately.
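Until a Spark person weighs in, a sketch of the settings they would likely check first; the parameter names are standard Spark SQL configs, and the values are illustrative placeholders rather than recommendations:

```python
from pyspark.sql import SparkSession

# Knobs that a default install leaves at one-size-fits-all values; the
# numbers below are placeholders to be sized against the actual cluster.
conf = {
    "spark.sql.shuffle.partitions": "400",   # default 200; often ~2-3x total cores
    "spark.sql.adaptive.enabled": "true",    # AQE re-plans shuffles at runtime
    "spark.executor.memory": "8g",           # size to the instance type
    "spark.executor.memoryOverhead": "2g",   # off-heap headroom (shuffle, Netty)
}
builder = SparkSession.builder
for key, value in conf.items():
    builder = builder.config(key, value)
spark = builder.getOrCreate()
```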

@mrocklin
Member

mrocklin commented Oct 2, 2023 via email

@milesgranger
Contributor Author

Closing in favor of the work done in #1044.

@milesgranger milesgranger deleted the milesgranger/pyspark-tpch-benchmarks branch October 5, 2023 11:21