-
Notifications
You must be signed in to change notification settings - Fork 5
Description
Describe the bug
It's not a bug. However, it seems a little inconsistent that it might throw an exception when parsing of the file failed.
To Reproduce
Create a checkpoint file with two lines
version line // these two lines are ignored
metadata line // so I can put here whatever
And execute the method CheckpointService#getOffsetsFromFile on that created checkpoint file. This will throw an exception instead of returning Try.Failure.
Expected behavior
HdfsService#parseFileAndClose[R] will capture thrown exception by parseFn inside returned Try or Future monad.
Additional context
If we decide to make unification across services and return Future, it might improve even performance. One of the proposals we discussed with @jozefbakus in PR: #791 (comment).
The basic idea is that every call on HdfsService will return Future because they are all IO bound.
class HdfsServiceImpl extends HdfsService {
private implicit val ec: ExecutionContext = ???
// code
def open(path: Path)(implicit ugi: UserGroupInformation): Future[FSDataInputStream] = ???
def readFileIfExists[R](path: Path, parse: InputStream => Future[R])(implicit ugi: UserGroupInformation): Future[Option[R]] =
exists(path).flatMap {
case true => open(path).flatMap(execAndClose(parse))
case _ => Future.successful(None)
}
def execAndClose[R <: Closeable, O](f: R => Future[O])(res: => R): Future[O] =
f(res).onComplete(_ => IOUtils.closeQuietly(res))
}The snippet above is just a suggestion for tackling some of the problems.