```diff
@@ -353,7 +353,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
      * the name of the file being compressed.
      */
     def zipFileToStream(file: Path, entryName: String, outputStream: ZipOutputStream): Unit = {
-      val fs = FileSystem.get(hadoopConf)
+      val fs = file.getFileSystem(hadoopConf)
       val inputStream = fs.open(file, 1 * 1024 * 1024) // 1MB Buffer
       try {
         outputStream.putNextEntry(new ZipEntry(entryName))
@@ -372,7 +372,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         attempt.attemptId.isEmpty || attemptId.isEmpty || attempt.attemptId.get == attemptId.get
       }.foreach { attempt =>
         val logPath = new Path(logDir, attempt.logPath)
-        zipFileToStream(new Path(logDir, attempt.logPath), attempt.logPath, zipStream)
+        zipFileToStream(logPath, attempt.logPath, zipStream)
       }
     } finally {
       zipStream.close()
```
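
For context, a minimal sketch (not part of the PR; the config value and paths are hypothetical, and the s3a example assumes the connector is on the classpath) of the failure mode this change avoids: `FileSystem.get(conf)` always returns the filesystem named by `fs.defaultFS`, whereas `Path.getFileSystem(conf)` resolves the filesystem from the path's own URI scheme.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical setup: the cluster's default filesystem is HDFS...
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", "hdfs://namenode:8020")

// ...but the event log being zipped lives on S3 (hypothetical path).
val logPath = new Path("s3a://spark-logs/app-20160321-0001")

val defaultFs = FileSystem.get(hadoopConf)     // HDFS, regardless of logPath
val pathFs = logPath.getFileSystem(hadoopConf) // S3A, from the path's scheme

// defaultFs.open(logPath) fails with "Wrong FS: s3a://..., expected: hdfs://...";
// pathFs.open(logPath) opens the file on the filesystem the path actually names.
```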
```diff
@@ -640,7 +640,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     val deletePreviousCheckpointFile: () => Unit = () =>
       previousCheckpointFile.foreach { file =>
         try {
-          FileSystem.get(sc.hadoopConfiguration).delete(new Path(file), true)
+          val checkpointFile = new Path(file)
+          checkpointFile.getFileSystem(sc.hadoopConfiguration).delete(checkpointFile, true)
         } catch {
           case e: IOException =>
             logWarning(s"Cannot delete checkpoint file $file:", e)
```
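
The same one-line pattern recurs in every file this PR touches. A hypothetical standalone helper (not in the PR) capturing it, with the same `IOException` handling ALS uses, might look like:

```scala
import java.io.IOException
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper: resolve the FileSystem from the Path itself, so the
// delete works whatever scheme the checkpoint was written to (hdfs, s3a, file).
def deleteCheckpointFile(file: String, hadoopConf: Configuration): Unit = {
  val checkpointFile = new Path(file)
  try {
    checkpointFile.getFileSystem(hadoopConf).delete(checkpointFile, true)
  } catch {
    case e: IOException =>
      // Mirror ALS: a checkpoint that cannot be deleted is not fatal, just noisy.
      Console.err.println(s"Cannot delete checkpoint file $file: $e")
  }
}
```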
```diff
@@ -273,8 +273,9 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext with DefaultRead
 
     // There should be 1 checkpoint remaining.
     assert(model.getCheckpointFiles.length === 1)
-    val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
-    assert(fs.exists(new Path(model.getCheckpointFiles.head)))
+    val checkpointFile = new Path(model.getCheckpointFiles.head)
+    val fs = checkpointFile.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    assert(fs.exists(checkpointFile))
     model.deleteCheckpointFiles()
     assert(model.getCheckpointFiles.isEmpty)
   }
```
```diff
@@ -178,7 +178,7 @@ protected void initialize(String path, List<String> columns) throws IOException
     config.set("spark.sql.parquet.writeLegacyFormat", "false");
 
     this.file = new Path(path);
-    long length = FileSystem.get(config).getFileStatus(this.file).getLen();
+    long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
     ParquetMetadata footer = readFooter(config, file, range(0, length));
 
     List<BlockMetaData> blocks = footer.getBlocks();
```
```diff
@@ -39,7 +39,7 @@ class FileStreamSource(
     providerName: String,
     dataFrameBuilder: Array[String] => DataFrame) extends Source with Logging {
 
-  private val fs = FileSystem.get(sqlContext.sparkContext.hadoopConfiguration)
+  private val fs = new Path(path).getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
```
> **Member** (inline review comment on the line above): All LGTM pending tests. I suppose this could even be fetched in the one place it's used later, during the method call, rather than holding on to a reference, but I can't recall a specific reason it's bad to hold onto a FileSystem handle, so leave it as you've done, I think.

```diff
   private val metadataLog = new HDFSMetadataLog[Seq[String]](sqlContext, metadataPath)
   private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)
```
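
On the reviewer's point about holding a `FileSystem` reference: Hadoop's `FileSystem.get` and `Path.getFileSystem` serve instances from an internal cache keyed by scheme, authority, and user (unless `fs.<scheme>.impl.disable.cache` is set), so fetching the handle at each use site would normally return the same object the field holds. A quick sketch (hypothetical path) of that behavior:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val conf = new Configuration()
val p = new Path("file:///tmp/data") // hypothetical path

// Both lookups hit Hadoop's FileSystem cache and return the same instance.
val fs1 = p.getFileSystem(conf)
val fs2 = p.getFileSystem(conf)
assert(fs1 eq fs2)
```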