
Conversation

@sunchao
Member

@sunchao sunchao commented Oct 12, 2020

What changes were proposed in this pull request?

This PR removes the restriction in HadoopFSUtils.parallelListLeafFiles which only calls listLocatedStatus when the FileSystem is either DistributedFileSystem or ViewFileSystem. Instead, it checks the locality flag and calls listLocatedStatus whenever that flag is set.

Consequently, this also removes the special handling for the case where locality info is required but the file system implementation is neither of the above. Currently, in that case we call getFileBlockLocations on each FileStatus obtained, with some extra handling for missing files as well.

Why are the changes needed?

HadoopFSUtils.parallelListLeafFiles currently only calls listLocatedStatus when the FileSystem implementation is DistributedFileSystem or ViewFileSystem. For other types of FileSystem, it calls listStatus and then calls getFileBlockLocations on each of the resulting FileStatuses.

In the Hadoop client, listLocatedStatus is a well-defined API, and it is often overridden by specific file system implementations such as S3A. The default listLocatedStatus also behaves similarly to what Spark does today.

Therefore, instead of re-implementing the logic in Spark itself, it's better to rely on the FileSystem-specific implementation of listLocatedStatus, which may include its own optimizations in that code path.
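
For illustration, a minimal sketch of the simplified logic (not the exact PR diff; ignoreLocality stands in for the locality flag described above):

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Minimal sketch, not the actual PR diff: rely on the FileSystem's own
// listLocatedStatus when locality is wanted, instead of special-casing
// DistributedFileSystem/ViewFileSystem and calling getFileBlockLocations.
def listLeafStatuses(fs: FileSystem, path: Path, ignoreLocality: Boolean): Array[FileStatus] = {
  if (ignoreLocality) {
    fs.listStatus(path)
  } else {
    val remoteIter = fs.listLocatedStatus(path) // FS impls (HDFS, S3A, ...) can optimize this
    new Iterator[FileStatus] {
      override def hasNext: Boolean = remoteIter.hasNext
      override def next(): FileStatus = remoteIter.next()
    }.toArray
  }
}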

Does this PR introduce any user-facing change?

There could be some behavior change when spark.sql.files.ignoreMissingFiles is set and there are race conditions during listing.
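
For reference, the flag involved is a standard Spark SQL conf (shown here via the runtime API; its default is false):

// Controls whether files that go missing during listing/reading are ignored.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")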

For instance, assume locality info is required and some files in the listing result were deleted right after the listing operation, but before the subsequent getFileBlockLocations calls in the previous code. The previous implementation would return a partial listing result, while the current implementation will return an empty set, since the default FileSystem.listLocatedStatus calls getFileBlockLocations internally, which causes the whole operation to fail.

On the other hand, it's also possible that the new implementation returns a full list where the previous implementation returned a partial one: since we no longer call getFileBlockLocations as a separate step, there is no window in which a file can be deleted between the listing and the getFileBlockLocations call.

How was this patch tested?

Relying on existing tests.

@SparkQA

SparkQA commented Oct 12, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34313/

@SparkQA

SparkQA commented Oct 12, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34313/

Member

@dongjoon-hyun dongjoon-hyun left a comment


This sounds like a new JIRA issue. Please file a new one, @sunchao.

This PR removes the restriction in HadoopFSUtils.parallelListLeafFiles which only calls listLocatedStatus when the FileSystem is either DistributedFileSystem or ViewFileSystem. Instead

@sunchao
Member Author

sunchao commented Oct 13, 2020

Will do, @dongjoon-hyun. Should I open a new PR or change the title of this one?

@SparkQA

SparkQA commented Oct 13, 2020

Test build #129707 has finished for PR 30019 at commit 651f6a6.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao sunchao changed the title [SPARK-32381][CORE][SQL][FOLLOWUP] Reply on listLocatedStatus from FileSystem implementations [SPARK-33135][CORE] Use listLocatedStatus from FileSystem implementations Oct 13, 2020
@sunchao
Member Author

sunchao commented Oct 13, 2020

@dongjoon-hyun created SPARK-33135 and changed this PR's title.

@sunchao
Member Author

sunchao commented Oct 13, 2020

I still need to look into the failing tests.

@xkrogen
Contributor

xkrogen commented Oct 13, 2020

+1 on this effort from me, @sunchao! Checking FS instance types is brittle and has caused us lots of headaches when we leverage wrapper-type FS instances. Allowing the FileSystem instance to understand its own capabilities and delegate appropriately is much better.

Contributor

@steveloughran steveloughran left a comment


It's great that you're doing this; it ties in nicely with recent work on S3A and with what's being discussed for ABFS.

If you look at what is slow against the stores for read-only operations, it is:

  • LIST
  • HEAD
  • GET
  • renegotiating GET requests after aborting an HTTPS request on seek() calls

On my latest IOStatistics test runs, I'm seeing S3A HEAD slower than GET, interestingly, though that's in test runs where a HEAD is invariably the first HTTPS request to the store after FS instantiation, so it includes all the setup costs.

What to do in Spark:

HEAD: don't call exists/getFileStatus/etc. if you know a file is there. It's also better to do try { open() } catch (f: FileNotFoundException) than a probe + open, as you will save a HEAD.
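
For illustration, a minimal sketch of that pattern (openIfPresent is a hypothetical helper, not a Hadoop API):

import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FSDataInputStream, FileSystem, Path}

// Hypothetical helper: open directly and let a missing file surface as an
// exception, saving the HEAD that a separate exists()/getFileStatus() probe costs.
def openIfPresent(fs: FileSystem, path: Path): Option[FSDataInputStream] = {
  try {
    Some(fs.open(path))
  } catch {
    case _: FileNotFoundException => None
  }
}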

The new openFile() API will let the caller specify seek policy (sequential, random, adaptive,...) and, if you pass in the known file length, skip the HEAD. It's also an async operation on S3A, even when a HEAD is needed.
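
A sketch against the Hadoop 3.3+ builder (option key names vary by version; "fs.s3a.experimental.input.fadvise" is the S3A-specific seek-policy key):

import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem}

// Sketch, assuming Hadoop 3.3+.
def openForRandomIO(fs: FileSystem, status: FileStatus): FSDataInputStream = {
  fs.openFile(status.getPath)
    .opt("fs.s3a.experimental.input.fadvise", "random") // seek policy hint
    .withFileStatus(status) // pass the known status so S3A can skip the HEAD
    .build()                // asynchronous on S3A
    .get()                  // block until the stream is ready
}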

GET: Spark needs to let apps/libraries set the seek policy there; the vectored IO work underway is even better.

LIST:

  • make API calls which return deep trees
  • don't treewalk yourself
  • make the most of paged listings by doing useful work per element in the iterator, i.e. avoid converting to an array/list until as late as possible; better to simply wrap the iterators (call them "monad functions" if you need to impress people); a sketch follows below
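
A minimal sketch of that per-element style, using the deep-listing API (process is a hypothetical callback):

import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

// Sketch: listFiles(dir, recursive = true) returns a paged RemoteIterator,
// so doing the work per element overlaps processing with the LIST pages.
def forEachLeafFile(fs: FileSystem, dir: Path)(process: LocatedFileStatus => Unit): Unit = {
  val it = fs.listFiles(dir, true) // deep tree listing; no treewalk in our code
  while (it.hasNext) {
    process(it.next()) // useful work per element; no intermediate array built
  }
}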

The work I'm doing on IOStatistics (HADOOP-16830) lets apps/libraries collect statistics on the IO performed by streams, remote iterators, etc., i.e. anything which can implement IOStatisticsSource. But, as well as being a Hadoop 3.3.1+ API, trying to collect/collate those stats across an entire workflow is unsustainable.

What you can do right now is add an option to log the toString() values of input streams/output streams/remote iterators at debug level to some performance-only log. S3A and ABFS input streams today will print out their stats; we are doing the same for the listings. With the logging, at least someone who wants to do some analysis can look inside, and you don't need to use any new APIs. (FWIW, I now need to extend the stats model with a thread-local statistics context which can be passed in to helper threads, so Spark/Hive/Flink threads can simply collect all their results at the end of each task attempt.)
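
A minimal sketch of that logging option (the logger name is hypothetical):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.slf4j.LoggerFactory

// Sketch: log stream statistics at debug level to a performance-only logger.
// S3A and ABFS input streams already include their IO statistics in toString(),
// so no Hadoop 3.3.1+ API is needed.
object IoPerfLogging {
  private val perfLog = LoggerFactory.getLogger("spark.io.perf") // hypothetical name

  def readAndLog(fs: FileSystem, path: Path): Unit = {
    val in = fs.open(path)
    try {
      // ... read from the stream ...
    } finally {
      if (perfLog.isDebugEnabled) perfLog.debug("input stream stats: {}", in)
      in.close()
    }
  }
}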

To close then:

  • yes for the listing
  • see if you can keep feeding incremental results into other code...via yield, maybe?
  • I'd love to get some comparisons here.

It may be interesting to move the deleted code into a test module, so you can use it for regression testing.

  fs.listStatus(path)
} else {
  val remoteIter = fs.listLocatedStatus(path)
  new Iterator[LocatedFileStatus]() {
Contributor


Might be good to pull this out into something reusable for any RemoteIterator[T], as it gets used in a number of API calls (all because of Java's checked exceptions...).
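
For example, a sketch of such a reusable adapter:

import org.apache.hadoop.fs.RemoteIterator

// Sketch: adapts any Hadoop RemoteIterator[T] to a Scala Iterator[T]; Scala
// does not track Java's checked IOException, so the wrapper stays trivial.
object RemoteIteratorConversions {
  implicit class RemoteIteratorAsScala[T](underlying: RemoteIterator[T]) {
    def asScala: Iterator[T] = new Iterator[T] {
      override def hasNext: Boolean = underlying.hasNext
      override def next(): T = underlying.next()
    }
  }
}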

  }.toArray
  case _ => fs.listStatus(path)

if (ignoreLocality) {
  fs.listStatus(path)
Contributor


Switch to listStatusIterator(path) and, again, provide a RemoteIterator. This will give you paged downloads on HDFS and WebHDFS, async page prefetch on the latest S3A builds, and, at worst elsewhere, exactly the same performance as listStatus.
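
A sketch of the suggested shape (reusing the asScala adapter sketched in the earlier comment):

if (ignoreLocality) {
  fs.listStatusIterator(path).asScala.toArray // paged on HDFS/WebHDFS, prefetched on recent S3A
} else {
  fs.listLocatedStatus(path).asScala.toArray
}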

Member Author


Sounds good. I'll switch to listStatusIterator and create a wrapper class for the returned RemoteIterator in both cases.

new Iterator[LocatedFileStatus]() {
  def next(): LocatedFileStatus = remoteIter.next
  def hasNext(): Boolean = remoteIter.hasNext
}.toArray
Contributor


The longer you can work incrementally per entry in the remote iterator, the more of the latency of talking to the object stores can be hidden. See HADOOP-17074 and HADOOP-17023 for details; one of the PRs shows some numbers.

If the Spark API could return an iterator/yield, and the processing of the listing used that, a lot of the listing cost could be absorbed entirely.

Member Author

@sunchao sunchao Oct 20, 2020


Yes, it would be lovely if we could get async listing here, but I think it requires much bigger surgery: all the way up the stack, Spark's RDD model currently requires all the input partitions to be ready before it can start processing (this is deeply embedded in its primitives such as map/reduce).

We could perhaps add the async logic here in this class, but I think the "local" processing we're doing here is far cheaper than the remote listing, so we probably can't gain much from the change.

We could wrap the iterator and make it look like a lazy array until certain info is needed, but again I think it won't go very far until we make extensive changes higher in the stack, such as in PartitioningAwareFileIndex or DataSourceScanExec. Anyway, I'll perhaps try this in a separate PR.

@dongjoon-hyun
Member

cc @holdenk

Member

@dongjoon-hyun dongjoon-hyun left a comment


Thank you for the updates.

@dongjoon-hyun dongjoon-hyun requested review from dongjoon-hyun and removed request for dongjoon-hyun October 16, 2020 21:53
@dongjoon-hyun
Member

Retest this please

@SparkQA

SparkQA commented Oct 19, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34624/

@SparkQA

SparkQA commented Oct 19, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34624/

@SparkQA

SparkQA commented Oct 19, 2020

Test build #130017 has finished for PR 30019 at commit 651f6a6.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sunchao
Member Author

sunchao commented Oct 20, 2020

Thanks, @steveloughran, for putting down all the context on S3A/ABFS, and sorry for the late comment.

HEAD: don't call exists/getFileStatus/etc. if you know a file is there. It's also better to do try { open() } catch (f: FileNotFoundException) than a probe + open, as you will save a HEAD.

Good point. I guess this applies in general to other FS implementations as well. I can spend some time checking the Spark codebase for this pattern and finding potential improvements.

The new openFile() API will let the caller specify seek policy (sequential, random, adaptive,...) and, if you pass in the known file length, skip the HEAD. It's also an async operation on S3A, even when a HEAD is needed.

Cool. This is in 3.3.0+ though. We can perhaps explore this once Spark makes the switch.

LIST ...

Yes, I wish Spark could benefit from the paged listing in FileSystem implementations (I know Presto has optimizations around this and it seems to work really well for them), but it will need some significant changes.

What you can do right now is add an option to log the toString() values of input streams/output streams/remote iterators at debug level to some performance-only log

Sounds good. Let me try to add these.

I'd love to get some comparisons here.

I'll see what I can do... without the incremental listing support I think we won't see much difference here? Plus, I don't have a testing environment for S3A at hand :(

@SparkQA

SparkQA commented Oct 20, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34668/

@SparkQA

SparkQA commented Oct 20, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34668/

@SparkQA

SparkQA commented Oct 21, 2020

Test build #130059 has finished for PR 30019 at commit 3c0ad25.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@steveloughran
Contributor

The new openFile() API will let the caller specify seek policy (sequential, random, adaptive,...) and, if you pass in the known file length, skip the HEAD. It's also an async operation on S3A, even when a HEAD is needed.

Cool. This is in 3.3.0+ though. We can perhaps explore this once Spark makes the switch.

If someone wants to do the work, we could backport the API (with no optimised implementations) to 3.2.x. Same for other public FileSystem API changes. Here, work == backporting the API plus a subset of tests.

@sunchao
Member Author

sunchao commented Oct 23, 2020

@steveloughran that would be great. I'm also exploring having Spark directly use Hadoop 3.3.0+ in #30135, but it seems there are some bugs on the Hadoop side which caused test failures.

@steveloughran
Contributor

@sunchao if you do want the backporting, it's probably easiest if you lift it so I can review, though I could probably abuse the "I'm cherry-picking a subset of a larger patch" rule to do it too... I'd be pulling back the hadoop-common changes without any of the S3A optimisations.

@github-actions

github-actions bot commented Feb 4, 2021

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Feb 4, 2021
@github-actions github-actions bot closed this Feb 5, 2021
