From 644967831c4b2940cde3dde4b3fcd140e17ba6f2 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Mon, 8 Nov 2021 09:43:11 -0800 Subject: [PATCH 1/3] Added local filesystem backend for package manager --- .../filesystem-storage/pom.xml | 61 ++++++ .../filesystem/FileSystemPackagesStorage.java | 150 ++++++++++++++ .../FileSystemPackagesStorageProvider.java | 30 +++ .../storage/filesystem/package-info.java | 23 +++ .../FileSystemPackagesStorageTest.java | 185 ++++++++++++++++++ pulsar-package-management/pom.xml | 1 + 6 files changed, 450 insertions(+) create mode 100644 pulsar-package-management/filesystem-storage/pom.xml create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java create mode 100644 pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java create mode 100644 pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java diff --git a/pulsar-package-management/filesystem-storage/pom.xml b/pulsar-package-management/filesystem-storage/pom.xml new file mode 100644 index 0000000000000..05c65daaa2b9d --- /dev/null +++ b/pulsar-package-management/filesystem-storage/pom.xml @@ -0,0 +1,61 @@ + + + + + pulsar-package-management + org.apache.pulsar + 2.10.0-SNAPSHOT + + 4.0.0 + + pulsar-package-filesystem-storage + Apache Pulsar :: Package Management :: Filesystem Storage + + + + ${project.groupId} + pulsar-package-core + ${project.parent.version} + + + + com.google.guava + guava + + + + ${project.groupId} + testmocks + ${project.parent.version} + test + + + + junit + junit + test + + + + diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java new file mode 100644 index 0000000000000..ff34c482f15d3 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorage.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import com.google.common.io.ByteStreams; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration; + + +/** + * Packages management storage implementation with filesystem. + */ +@Slf4j +public class FileSystemPackagesStorage implements PackagesStorage { + + private static final String STORAGE_PATH = "STORAGE_PATH"; + private static final String DEFAULT_STORAGE_PATH = "packages-storage"; + + private final File storagePath; + + FileSystemPackagesStorage(PackagesStorageConfiguration configuration) { + String storagePath = configuration.getProperty(STORAGE_PATH); + if (storagePath != null) { + this.storagePath = new File(storagePath); + } else { + this.storagePath = new File(DEFAULT_STORAGE_PATH); + } + } + + private File getPath(String path) { + File f = Paths.get(storagePath.toString(), path).toFile(); + if (!f.getParentFile().exists()) { + if (!f.getParentFile().mkdirs()) { + throw new RuntimeException("Failed to create parent dirs for " + path); + } + } + return f; + } + + @Override + public void initialize() { + if (!storagePath.exists()) { + if (!storagePath.mkdirs()) { + throw new RuntimeException("Failed to create base storage directory at " + storagePath); + } + } + + log.info("Packages management filesystem storage initialized on {}", storagePath); + } + + @Override + public CompletableFuture writeAsync(String path, InputStream inputStream) { + try { + File f = getPath(path); + + @Cleanup + OutputStream os = new FileOutputStream(f); + + @Cleanup + BufferedOutputStream bos = new BufferedOutputStream(os); + ByteStreams.copy(inputStream, bos); + + return CompletableFuture.completedFuture(null); + } catch (IOException e) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(e); + return f; + } + } + + @Override + public CompletableFuture readAsync(String path, OutputStream outputStream) { + try { + @Cleanup + InputStream is = new FileInputStream(getPath(path)); + + @Cleanup + BufferedInputStream bis = new BufferedInputStream(is); + ByteStreams.copy(bis, outputStream); + + return CompletableFuture.completedFuture(null); + } catch (IOException e) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(e); + return f; + } + } + + @Override + public CompletableFuture deleteAsync(String path) { + if (getPath(path).delete()) { + return CompletableFuture.completedFuture(null); + } else { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new IOException("Failed to delete file at " + path)); + return f; + } + } + + @Override + public CompletableFuture> listAsync(String path) { + String[] files = getPath(path).list(); + if (files == null) { + return CompletableFuture.completedFuture(Collections.emptyList()); + } else { + return CompletableFuture.completedFuture(Arrays.asList(files)); + } + } + + @Override + public CompletableFuture existAsync(String path) { + return CompletableFuture.completedFuture(getPath(path).exists()); + } + + @Override + public CompletableFuture closeAsync() { + return CompletableFuture.completedFuture(null); + } +} diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java new file mode 100644 index 0000000000000..74b46e287024a --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageProvider.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageConfiguration; +import org.apache.pulsar.packages.management.core.PackagesStorageProvider; + +public class FileSystemPackagesStorageProvider implements PackagesStorageProvider { + @Override + public PackagesStorage getStorage(PackagesStorageConfiguration config) { + return new FileSystemPackagesStorage(config); + } +} diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java new file mode 100644 index 0000000000000..d82dba7fa1ad0 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * Packages management storage implementation with bookkeeper. + */ +package org.apache.pulsar.packages.management.storage.bookkeeper; diff --git a/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java new file mode 100644 index 0000000000000..0b254ede5f4b5 --- /dev/null +++ b/pulsar-package-management/filesystem-storage/src/test/java/org/apache/pulsar/packages/management/storage/filesystem/FileSystemPackagesStorageTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.packages.management.storage.filesystem; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.RandomUtils; +import org.apache.pulsar.packages.management.core.PackagesStorage; +import org.apache.pulsar.packages.management.core.PackagesStorageProvider; +import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Slf4j +public class FileSystemPackagesStorageTest { + private PackagesStorage storage; + private Path storagePath; + + @BeforeMethod() + public void setup() throws Exception { + this.storagePath = Files.createTempDirectory("package-storage-test"); + log.info("Test using storage path: {}", storagePath); + + PackagesStorageProvider provider = PackagesStorageProvider + .newProvider(FileSystemPackagesStorageProvider.class.getName()); + DefaultPackagesStorageConfiguration configuration = new DefaultPackagesStorageConfiguration(); + configuration.setProperty("STORAGE_PATH", storagePath.toString()); + storage = provider.getStorage(configuration); + storage.initialize(); + } + + @AfterMethod(alwaysRun = true) + public void teardown() throws Exception { + if (storage != null) { + storage.closeAsync().get(); + } + + storagePath.toFile().delete(); + } + + @Test(timeOut = 60000) + public void testReadWriteOperations() throws ExecutionException, InterruptedException { + String testData = "test-data"; + ByteArrayInputStream testDataStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + String testPath = "test-read-write"; + + // write some data to the package + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the package + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + String readResult = new String(readData.toByteArray(), StandardCharsets.UTF_8); + + assertEquals(testData, readResult); + } + + @Test(timeOut = 60000) + public void testReadWriteLargeDataOperations() throws ExecutionException, InterruptedException { + byte[] data = RandomUtils.nextBytes(8192 * 3 + 4096); + ByteArrayInputStream testDataStream = new ByteArrayInputStream(data); + String testPath = "test-large-read-write"; + + // write some data to the package + storage.writeAsync(testPath, testDataStream).get(); + + // read the data from the package + ByteArrayOutputStream readData = new ByteArrayOutputStream(); + storage.readAsync(testPath, readData).get(); + byte[] readResult = readData.toByteArray(); + + assertEquals(data, readResult); + } + + @Test(timeOut = 60000) + public void testReadNonExistentData() { + String testPath = "non-existent-path"; + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + + try { + storage.readAsync(testPath, outputStream).join(); + } catch (CompletionException e) { + assertEquals(e.getCause().getClass(), FileNotFoundException.class); + } + } + + @Test(timeOut = 60000) + public void testListOperation() throws ExecutionException, InterruptedException { + // write the data to different path + String rootPath = "pulsar"; + String testData = "test-data"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + + List writePaths = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String path = "test-" + i; + writePaths.add(path); + storage.writeAsync(rootPath + "/" + path, inputStream).get(); + } + + // list all path under the root path + List paths = storage.listAsync(rootPath).get(); + + // verify the paths number + assertEquals(paths.size(), writePaths.size()); + paths.forEach(p -> writePaths.remove(p)); + assertEquals(writePaths.size(), 0); + + // list non-existent path + storage.listAsync("non-existent").get(); + } + + @Test(timeOut = 60000) + public void testDeleteOperation() throws ExecutionException, InterruptedException { + String testPath = "test-delete-path"; + String testData = "test-data"; + ByteArrayInputStream inputStream = new ByteArrayInputStream(testData.getBytes(StandardCharsets.UTF_8)); + + // write the data to the test path + storage.writeAsync(testPath, inputStream).get(); + + // list path should have one file + List paths = storage.listAsync("").get(); + assertEquals(paths.size(), 1); + assertEquals(paths.get(0), testPath); + + // delete the path + storage.deleteAsync(testPath).get(); + + // list again and not file under the path + paths= storage.listAsync("").get(); + assertEquals(paths.size(), 0); + + + // delete non-existent path + try { + storage.deleteAsync("non-existent").join(); + fail("should throw exception"); + } catch (Exception e) { + assertEquals(e.getCause().getClass(), IOException.class); + } + } + + @Test(timeOut = 60000) + public void testExistOperation() throws ExecutionException, InterruptedException { + Boolean exist = storage.existAsync("test-path").get(); + org.testng.Assert.assertFalse(exist); + + storage.writeAsync("test-path", new ByteArrayInputStream("test".getBytes())).get(); + + exist = storage.existAsync("test-path").get(); + assertTrue(exist); + } + +} diff --git a/pulsar-package-management/pom.xml b/pulsar-package-management/pom.xml index bbd4e1fbb2b75..5cf86c9d063ce 100644 --- a/pulsar-package-management/pom.xml +++ b/pulsar-package-management/pom.xml @@ -36,6 +36,7 @@ core bookkeeper-storage + filesystem-storage From 08ca5d0cb8d07e671c317f69d26420483faf9d64 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 10 Nov 2021 07:59:02 -0800 Subject: [PATCH 2/3] fixed package name in package-info.java --- .../packages/management/storage/filesystem/package-info.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java index d82dba7fa1ad0..2ec21ae7ef58a 100644 --- a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java @@ -20,4 +20,4 @@ /** * Packages management storage implementation with bookkeeper. */ -package org.apache.pulsar.packages.management.storage.bookkeeper; +package org.apache.pulsar.packages.management.storage.filesystem; From 3b44584f0eeed6f1c188015eb816abb6d9bce468 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 10 Nov 2021 08:00:23 -0800 Subject: [PATCH 3/3] Fixed javadoc --- .../packages/management/storage/filesystem/package-info.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java index 2ec21ae7ef58a..1186e2bf08757 100644 --- a/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java +++ b/pulsar-package-management/filesystem-storage/src/main/java/org/apache/pulsar/packages/management/storage/filesystem/package-info.java @@ -18,6 +18,6 @@ */ /** - * Packages management storage implementation with bookkeeper. + * Packages management storage implementation with filesystem. */ package org.apache.pulsar.packages.management.storage.filesystem;