diff --git a/pulsar-package-management/filesystem-storage/pom.xml b/pulsar-package-management/filesystem-storage/pom.xml
new file mode 100644
index 0000000000000..05c65daaa2b9d
--- /dev/null
+++ b/pulsar-package-management/filesystem-storage/pom.xml
@@ -0,0 +1,61 @@
+
+
+
+
+ pulsar-package-management
+ org.apache.pulsar
+ 2.10.0-SNAPSHOT
+
+ 4.0.0
+
+ pulsar-package-filesystem-storage
+ Apache Pulsar :: Package Management :: Filesystem Storage
+
+
+
+ ${project.groupId}
+ pulsar-package-core
+ ${project.parent.version}
+
+
+
+ com.google.guava
+ guava
+
+
+
+ ${project.groupId}
+ testmocks
+ ${project.parent.version}
+ test
+
+
+
+ junit
+ junit
+ test
+
+
+
+
diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java
new file mode 100644
index 0000000000000..ff34c482f15d3
--- /dev/null
+++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.filesystem;
+
+import com.google.common.io.ByteStreams;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import lombok.Cleanup;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+
+
+/**
+ * Packages management storage implementation with filesystem.
+ */
+@Slf4j
+public class FileSystemPackagesStorage implements PackagesStorage {
+
+ private static final String STORAGE_PATH = "STORAGE_PATH";
+ private static final String DEFAULT_STORAGE_PATH = "packages-storage";
+
+ private final File storagePath;
+
+ FileSystemPackagesStorage(PackagesStorageConfiguration configuration) {
+ String storagePath = configuration.getProperty(STORAGE_PATH);
+ if (storagePath != null) {
+ this.storagePath = new File(storagePath);
+ } else {
+ this.storagePath = new File(DEFAULT_STORAGE_PATH);
+ }
+ }
+
+ private File getPath(String path) {
+ File f = Paths.get(storagePath.toString(), path).toFile();
+ if (!f.getParentFile().exists()) {
+ if (!f.getParentFile().mkdirs()) {
+ throw new RuntimeException("Failed to create parent dirs for " + path);
+ }
+ }
+ return f;
+ }
+
+ @Override
+ public void initialize() {
+ if (!storagePath.exists()) {
+ if (!storagePath.mkdirs()) {
+ throw new RuntimeException("Failed to create base storage directory at " + storagePath);
+ }
+ }
+
+ log.info("Packages management filesystem storage initialized on {}", storagePath);
+ }
+
+ @Override
+ public CompletableFuture writeAsync(String path, InputStream inputStream) {
+ try {
+ File f = getPath(path);
+
+ @Cleanup
+ OutputStream os = new FileOutputStream(f);
+
+ @Cleanup
+ BufferedOutputStream bos = new BufferedOutputStream(os);
+ ByteStreams.copy(inputStream, bos);
+
+ return CompletableFuture.completedFuture(null);
+ } catch (IOException e) {
+ CompletableFuture f = new CompletableFuture<>();
+ f.completeExceptionally(e);
+ return f;
+ }
+ }
+
+ @Override
+ public CompletableFuture readAsync(String path, OutputStream outputStream) {
+ try {
+ @Cleanup
+ InputStream is = new FileInputStream(getPath(path));
+
+ @Cleanup
+ BufferedInputStream bis = new BufferedInputStream(is);
+ ByteStreams.copy(bis, outputStream);
+
+ return CompletableFuture.completedFuture(null);
+ } catch (IOException e) {
+ CompletableFuture f = new CompletableFuture<>();
+ f.completeExceptionally(e);
+ return f;
+ }
+ }
+
+ @Override
+ public CompletableFuture deleteAsync(String path) {
+ if (getPath(path).delete()) {
+ return CompletableFuture.completedFuture(null);
+ } else {
+ CompletableFuture f = new CompletableFuture<>();
+ f.completeExceptionally(new IOException("Failed to delete file at " + path));
+ return f;
+ }
+ }
+
+ @Override
+ public CompletableFuture> listAsync(String path) {
+ String[] files = getPath(path).list();
+ if (files == null) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ } else {
+ return CompletableFuture.completedFuture(Arrays.asList(files));
+ }
+ }
+
+ @Override
+ public CompletableFuture existAsync(String path) {
+ return CompletableFuture.completedFuture(getPath(path).exists());
+ }
+
+ @Override
+ public CompletableFuture closeAsync() {
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java
new file mode 100644
index 0000000000000..74b46e287024a
--- /dev/null
+++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.filesystem;
+
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+
+public class FileSystemPackagesStorageProvider implements PackagesStorageProvider {
+ @Override
+ public PackagesStorage getStorage(PackagesStorageConfiguration config) {
+ return new FileSystemPackagesStorage(config);
+ }
+}
diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java
new file mode 100644
index 0000000000000..1186e2bf08757
--- /dev/null
+++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Packages management storage implementation with filesystem.
+ */
+package org.apache.pulsar.packages.management.storage.filesystem;
diff --git a/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java
new file mode 100644
index 0000000000000..0b254ede5f4b5
--- /dev/null
+++ b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.packages.management.storage.filesystem;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.pulsar.packages.management.core.PackagesStorage;
+import org.apache.pulsar.packages.management.core.PackagesStorageProvider;
+import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Slf4j
+public class FileSystemPackagesStorageTest {
+ private PackagesStorage storage;
+ private Path storagePath;
+
+ @BeforeMethod()
+ public void setup() throws Exception {
+ this.storagePath = Files.createTempDirectory("package-storage-test");
+ log.info("Test using storage path: {}", storagePath);
+
+ PackagesStorageProvider provider = PackagesStorageProvider
+ .newProvider(FileSystemPackagesStorageProvider.class.getName());
+ DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration();
+ configuration.setProperty("STORAGE_PATH", storagePath.toString());
+ storage = provider.getStorage(configuration);
+ storage.initialize();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void teardown() throws Exception {
+ if (storage != null) {
+ storage.closeAsync().get();
+ }
+
+ storagePath.toFile().delete();
+ }
+
+ @Test(timeOut = 60000)
+ public void testReadWriteOperations() throws ExecutionException, InterruptedException {
+ String testData = "test-data";
+ ByteArrayInputStream testDataStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+ String testPath = "test-read-write";
+
+ // write some data to the package
+ storage.writeAsync(testPath, testDataStream).get();
+
+ // read the data from the package
+ ByteArrayOutputStream readData = new ByteArrayOutputStream();
+ storage.readAsync(testPath, readData).get();
+ String readResult = new String(readData.toByteArray(), StandardCharsets.UTF_8);
+
+ assertEquals(testData, readResult);
+ }
+
+ @Test(timeOut = 60000)
+ public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException {
+ byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096);
+ ByteArrayInputStream testDataStream = new ByteArrayInputStream(data);
+ String testPath = "test-large-read-write";
+
+ // write some data to the package
+ storage.writeAsync(testPath, testDataStream).get();
+
+ // read the data from the package
+ ByteArrayOutputStream readData = new ByteArrayOutputStream();
+ storage.readAsync(testPath, readData).get();
+ byte[] readResult = readData.toByteArray();
+
+ assertEquals(data, readResult);
+ }
+
+ @Test(timeOut = 60000)
+ public void testReadNonExistentData() {
+ String testPath = "non-existent-path";
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+
+ try {
+ storage.readAsync(testPath, outputStream).join();
+ } catch (CompletionException e) {
+ assertEquals(e.getCause().getClass(), FileNotFoundException.class);
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testListOperation() throws ExecutionException, InterruptedException {
+ // write the data to different path
+ String rootPath = "pulsar";
+ String testData = "test-data";
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+ List writePaths = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String path = "test-" + i;
+ writePaths.add(path);
+ storage.writeAsync(rootPath + "/" + path, inputStream).get();
+ }
+
+ // list all path under the root path
+ List paths = storage.listAsync(rootPath).get();
+
+ // verify the paths number
+ assertEquals(paths.size(), writePaths.size());
+ paths.forEach(p -> writePaths.remove(p));
+ assertEquals(writePaths.size(), 0);
+
+ // list non-existent path
+ storage.listAsync("non-existent").get();
+ }
+
+ @Test(timeOut = 60000)
+ public void testDeleteOperation() throws ExecutionException, InterruptedException {
+ String testPath = "test-delete-path";
+ String testData = "test-data";
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8));
+
+ // write the data to the test path
+ storage.writeAsync(testPath, inputStream).get();
+
+ // list path should have one file
+ List paths = storage.listAsync("").get();
+ assertEquals(paths.size(), 1);
+ assertEquals(paths.get(0), testPath);
+
+ // delete the path
+ storage.deleteAsync(testPath).get();
+
+ // list again and not file under the path
+ paths= storage.listAsync("").get();
+ assertEquals(paths.size(), 0);
+
+
+ // delete non-existent path
+ try {
+ storage.deleteAsync("non-existent").join();
+ fail("should throw exception");
+ } catch (Exception e) {
+ assertEquals(e.getCause().getClass(), IOException.class);
+ }
+ }
+
+ @Test(timeOut = 60000)
+ public void testExistOperation() throws ExecutionException, InterruptedException {
+ Boolean exist = storage.existAsync("test-path").get();
+ org.testng.Assert.assertFalse(exist);
+
+ storage.writeAsync("test-path", new ByteArrayInputStream("test".getBytes())).get();
+
+ exist = storage.existAsync("test-path").get();
+ assertTrue(exist);
+ }
+
+}
diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml
index bbd4e1fbb2b75..5cf86c9d063ce 100644
--- a/pulsar-package-management/pom.xml
+++ b/pulsar-package-management/pom.xml
@@ -36,6 +36,7 @@
core
bookkeeper-storage
+ filesystem-storage