diff --git a/CHANGELOG.md b/CHANGELOG.md index 7dc4f0ab43..cb41e08563 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,18 +1,28 @@ ## Changelog #### Version 0.10.0 (TBD) + +##### New Features * Added an iterative connection builder that is accessible using the `Concourse.at()` static factory method. -* Refactored the `concourse-import` framework to take advantage of version `1.1.0+` of the `data-transform-api` which has a more flexible notion of data transformations. As a result of this change, the `Importables` utility class has been removed. Custom importers that extend `DelimitedLineImporter` can leverage the protected `parseObject` and `importLines` methods to hook into the extraction and import logic in a manner similar to what was possible using the `Importables` functions. * Added the `com.cinchapi.concourse.valididate.Keys` utility class which contains the `#isWritable` method that determines if a proposed key can be written to Concourse. -* Fixed a bug that caused data imported from STDIN to not have a `__datasource` tag, even if the `--annotate-data-source` flag was included with the CLI invocation. * Added `Parsers#create` static factory methods that accept a `Criteria` object as a parameter. These new methods compliment existing ones which take a CCL `String` and `TCriteria` object respectively. -* Upgrade the `ccl` dependency to the latest version, which adds support for local criteria evaluation using the `Parser#evaluate` method. The parsers returned from the `Parsers#create` factories all support local evaluation using the function defined in the newly created `Operators#evaluate` utility. +* Upgraded the `ccl` dependency to the latest version, which adds support for local criteria evaluation using the `Parser#evaluate` method. The parsers returned from the `Parsers#create` factories all support local evaluation using the function defined in the newly created `Operators#evaluate` utility. * Added the `com.cinchapi.concourse.etl` package that contains data processing utilities: * A `Strainer` can be used to process a `Map` using Concourse's data model rules. In particular, the `Strainer` encapsulates logic to break down top-level sequence values and process their elements individually. * The `Transform` class contains functions for common data transformations. + +##### Improvements +* Refactored the `concourse-import` framework to take advantage of version `1.1.0+` of the `data-transform-api` which has a more flexible notion of data transformations. As a result of this change, the `Importables` utility class has been removed. Custom importers that extend `DelimitedLineImporter` can leverage the protected `parseObject` and `importLines` methods to hook into the extraction and import logic in a manner similar to what was possible using the `Importables` functions. +* Refactored the `Criteria` class into an interface that is implemented by any language symbols that can be immediately transformed to a well-built criteria (e.g. `ValueState` and `TimestampState`). The primary benefit of this change is that methods that took a generic Object parameter and checked whether that object could be built into a `Criteria` have now been removed from the `Concourse` driver since that logic is automatically captured within the new class hiearchy. Another positive side effect of this change is that it is no longer necessary to explicitly build a nested `Criteria` when using the `group` functionality of the `Criteria` builder. + +##### Bug Fixes +* Fixed a bug that caused data imported from STDIN to not have a `__datasource` tag, even if the `--annotate-data-source` flag was included with the CLI invocation. +* Fixed a bug that allowed Concourse Server to start an environment's storage engine in a partially or wholly unreadable state if the Engine partially completed a block sync while Concourse Server was going through its shutdown routine. In this scenario, the partially written block is malformed and should not be processed by the Engine since the data contained in the malformed block is still contained in the Buffer. While the malformed block files can be safely deleted, the implemented fix causes the Engine to simply ignore them if they are encountered upon initialization. +* Added checks to ensure that a storage Engine cannot transport writes from the Buffer to the Database while Concourse Server is shutting down. + +##### Deprecations and Removed Features * Removed the `Strings` utility class in favor of `AnyStrings` from `accent4j`. * Removed the `StringSplitter` framework in favor of the same from `accent4j`. -* Refactored the `Criteria` class into an interface that is implemented by any language symbols that can be immediately transformed to a well-built criteria (e.g. `ValueState` and `TimestampState`). The primary benefit of this change is that methods that took a generic Object parameter and checked whether that object could be built into a `Criteria` have now been removed from the `Concourse` driver since that logic is automatically captured within the new class hiearchy. Another positive side effect of this change is that it is no longer necessary to explicitly build a nested `Criteria` when using the `group` functionality of the `Criteria` builder. * Deprecated `Criteria#getCclString` in favor of `Criteria#ccl`. #### Version 0.9.6 (February 16, 2019) diff --git a/concourse-ete-tests/build.gradle b/concourse-ete-tests/build.gradle new file mode 100644 index 0000000000..d4cc4f4a43 --- /dev/null +++ b/concourse-ete-tests/build.gradle @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2013-2019 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +dependencies { + testCompile project(':concourse-driver-java') + testCompile project(':concourse-ete-test-core') +} + +test { + exclude '**' +} diff --git a/concourse-ete-tests/gradlew b/concourse-ete-tests/gradlew new file mode 100755 index 0000000000..b80c85f619 --- /dev/null +++ b/concourse-ete-tests/gradlew @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +# Copyright (c) 2013-2019 Cinchapi Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This script wraps the gradlew script in the root of the project directory +# and automatically invokes each task with the project in this directory as +# the prefix. + +# Get the path to the real gradlew script +REAL_GRADLEW_DIR=`dirname $0`"/.." +cd $REAL_GRADLEW_DIR +REAL_GRADLEW_DIR=`pwd -P` +cd - > /dev/null +REAL_GRADLEW_SCRIPT=$REAL_GRADLEW_DIR"/gradlew" + +# Get the name of this project, as known to Gradle +PROJECT=`pwd -P` +PROJECT="${PROJECT/$REAL_GRADLEW_DIR/}" +PROJECT=${PROJECT/\//} + +# Go to the REAL_GRADLEW_DIR and operate from there +cd $REAL_GRADLEW_DIR + +# Prepend the project name to all the input args +args=$@ +newargs="" +for i in ${args[@]} +do + if [[ $i != -* ]]; then + i=:$PROJECT:$i + fi + newargs+="$i " +done + +# Call the real gradle +bash $REAL_GRADLEW_SCRIPT $newargs + +exit $? diff --git a/concourse-ete-tests/src/test/java/com/cinchapi/concourse/bugrepro/CON649.java b/concourse-ete-tests/src/test/java/com/cinchapi/concourse/bugrepro/CON649.java new file mode 100644 index 0000000000..0300165855 --- /dev/null +++ b/concourse-ete-tests/src/test/java/com/cinchapi/concourse/bugrepro/CON649.java @@ -0,0 +1,114 @@ +/* + * Copyright (c) 2013-2019 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cinchapi.concourse.bugrepro; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.thrift.transport.TTransportException; +import org.junit.Assert; +import org.junit.Test; + +import com.cinchapi.concourse.Concourse; +import com.cinchapi.concourse.test.ClientServerTest; +import com.cinchapi.concourse.util.Random; +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.google.common.collect.Table; +import com.google.common.collect.Table.Cell; + +/** + * Unit test to reproduce the issues described in CON-649. + * + * @author Jeff Nelson + */ +public class CON649 extends ClientServerTest { + + @Test + public void repro() + throws IOException, TTransportException, InterruptedException { + List clients = Lists.newArrayList(); + Table expected = HashBasedTable.create(); + for (int i = 0; i < Runtime.getRuntime().availableProcessors(); ++i) { + clients.add(new Thread(() -> { + Concourse $client = server.connect(); + while (!Thread.currentThread().isInterrupted()) { + try { + long id = Random.getLong(); + String key = Random.getSimpleString(); + String value = Random.getSimpleString(); + $client.add(key, value, id); + expected.put(id, key, value); + Random.tinySleep(); // allow some transports to go + // through... + } + catch (Exception e) { + Thread.currentThread().interrupt(); + } + } + })); + } + clients.forEach(Thread::start); + Thread.sleep(10000); + Random.microSleep(); + server.stop(); + Path db = server.getDatabaseDirectory().resolve("default"); + List directories = ImmutableList.of(db.resolve("cpb"), + db.resolve("csb"), db.resolve("ctb")); + Map counts = Maps.newLinkedHashMap(); + directories.forEach(directory -> { + try { + Files.list(directory).forEach(file -> { + System.out.println(file); + String name = file.getFileName().toString().split("\\.")[0]; + counts.computeIfAbsent(name, key -> new AtomicInteger(0)) + .incrementAndGet(); + }); + } + catch (IOException e) { + e.printStackTrace(); + } + }); + Set distinct = Sets.newHashSet(); + counts.forEach((path, count) -> { + System.out.println(path + " = " + count); + distinct.add(count.get()); + }); + System.out.println(counts.size()); + server.start(); + client = server.connect(); + for (Cell cell : expected.cellSet()) { + long record = cell.getRowKey(); + String key = cell.getColumnKey(); + Object value = cell.getValue(); + Assert.assertEquals(value, client.get(key, record)); + } + } + + @Override + protected String getServerVersion() { + return ClientServerTest.LATEST_SNAPSHOT_VERSION; + } + +} diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/io/FileSystem.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/io/FileSystem.java index f44bf60699..633658eac7 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/io/FileSystem.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/io/FileSystem.java @@ -34,6 +34,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.Set; +import java.util.stream.Stream; import com.cinchapi.common.base.CheckedExceptions; import com.cinchapi.concourse.util.FileOps; @@ -146,8 +147,8 @@ public static Iterator fileOnlyIterator(final String directory) { return new ReadOnlyIterator() { private final File[] files = new File(directory).listFiles(); - private int position = 0; private File next = null; + private int position = 0; { findNext(); } @@ -278,8 +279,8 @@ public static boolean hasDir(String dir) { * @param file * @return {@code true} if {@code file} exists */ - public static boolean hasFile(String file) { - return hasFile(Paths.get(file)); + public static boolean hasFile(Path file) { + return Files.exists(file) && !Files.isDirectory(file); } /** @@ -289,8 +290,8 @@ public static boolean hasFile(String file) { * @param file * @return {@code true} if {@code file} exists */ - public static boolean hasFile(Path file) { - return Files.exists(file) && !Files.isDirectory(file); + public static boolean hasFile(String file) { + return hasFile(Paths.get(file)); } /** @@ -320,6 +321,22 @@ public static void lock(String path) { } } + /** + * Return a {@link Stream} that contains a non-recursive list of all the + * files in the {@code directory}. + * + * @param directory + * @return the files in the directory + */ + public static Stream ls(Path directory) { + try { + return Files.list(directory); + } + catch (IOException e) { + throw CheckedExceptions.wrapAsRuntimeException(e); + } + } + /** * Create a valid path that contains separators in the appropriate places * by joining all the {@link parts} together with the {@link File#separator} diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/cache/BloomFilter.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/cache/BloomFilter.java index c95ac1d440..d3ac98c529 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/cache/BloomFilter.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/cache/BloomFilter.java @@ -22,6 +22,7 @@ import java.io.ObjectInputStream; import java.io.ObjectStreamClass; import java.nio.channels.FileChannel; +import java.nio.file.Path; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.StampedLock; @@ -63,6 +64,24 @@ public static BloomFilter create(int expectedInsertions) { return new BloomFilter(null, expectedInsertions); } + /** + * Create a new BloomFilter with enough capacity for + * {@code expectedInsertions}. + *

+ * Note that overflowing a BloomFilter with significantly more elements than + * specified, will result in its saturation, and a sharp deterioration of + * its false positive probability (source: + * {@link BloomFilter#create(com.google.common.hash.Funnel, int)}) + *

+ * + * @param file + * @param expectedInsertions + * @return the BloomFilter + */ + public static BloomFilter create(Path file, int expectedInsertions) { + return create(file.toString(), expectedInsertions); + } + /** * Create a new BloomFilter with enough capacity for * {@code expectedInsertions}. @@ -81,6 +100,16 @@ public static BloomFilter create(String file, int expectedInsertions) { return new BloomFilter(file, expectedInsertions); } + /** + * Return the BloomFilter that is stored on disk in {@code file}. + * + * @param file + * @return the BloomFilter + */ + public static BloomFilter open(Path file) { + return open(file.toString()); + } + /** * Return the BloomFilter that is stored on disk in {@code file}. * diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Block.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Block.java index 1bb6475568..695f3030fa 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Block.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Block.java @@ -24,6 +24,7 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Path; import java.nio.file.Paths; import java.util.Comparator; import java.util.Iterator; @@ -31,12 +32,14 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import java.util.stream.Collectors; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; import com.cinchapi.common.base.AdHocIterator; +import com.cinchapi.common.base.Array; import com.cinchapi.common.base.CheckedExceptions; import com.cinchapi.common.base.validate.BiCheck; import com.cinchapi.concourse.annotate.PackagePrivate; @@ -53,6 +56,7 @@ import com.cinchapi.concourse.util.Logger; import com.google.common.base.Preconditions; import com.google.common.collect.ComparisonChain; +import com.google.common.collect.ImmutableList; import com.google.common.collect.SortedMultiset; import com.google.common.collect.TreeMultiset; @@ -303,38 +307,44 @@ public static String getId(String filename) { * @param directory * @param diskLoad - set to {@code true} to deserialize the block {@code id} * from {@code directory} on disk + * @throws MalformedBlockException if a loaded Block does not have all of + * the required components */ - protected Block(String id, String directory, boolean diskLoad) { + protected Block(String id, String directory, boolean diskLoad) + throws MalformedBlockException { FileSystem.mkdirs(directory); this.id = id; this.file = directory + File.separator + id + BLOCK_NAME_EXTENSION; - this.stats = new BlockStats( - Paths.get(directory, id + STATS_NAME_EXTENSION)); + Path $stats = Paths.get(directory, id + STATS_NAME_EXTENSION); + Path $filter = Paths.get(directory, id + FILTER_NAME_EXTENSION); + Path $index = Paths.get(directory, id + INDEX_NAME_EXTENSION); + this.stats = new BlockStats($stats); if(diskLoad) { + String[] missing = ImmutableList.of($stats, $filter, $index) + .stream().filter(path -> !path.toFile().exists()) + .map(path -> path.getFileName().toString()) + .collect(Collectors.toList()).toArray(Array.containing()); + if(missing.length > 0) { + throw new MalformedBlockException(id, directory, missing); + } this.mutable = false; this.size = (int) FileSystem.getFileSize(this.file); try { - this.filter = BloomFilter.open(directory + File.separator + id - + FILTER_NAME_EXTENSION); + this.filter = BloomFilter.open($filter); filter.disableThreadSafety(); } catch (RuntimeException e) { repair(e); } - this.index = BlockIndex.open( - directory + File.separator + id + INDEX_NAME_EXTENSION); + this.index = BlockIndex.open($index); this.revisions = null; } else { this.mutable = true; this.size = 0; this.revisions = createBackingStore(Sorter.INSTANCE); - this.filter = BloomFilter.create( - (directory + File.separator + id + FILTER_NAME_EXTENSION), - EXPECTED_INSERTIONS); - this.index = BlockIndex.create( - directory + File.separator + id + INDEX_NAME_EXTENSION, - EXPECTED_INSERTIONS); + this.filter = BloomFilter.create($filter, EXPECTED_INSERTIONS); + this.index = BlockIndex.create($index, EXPECTED_INSERTIONS); stats.put(Attribute.SCHEMA_VERSION, SCHEMA_VERSION); } this.softRevisions = new SoftReference>>( diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/BlockIndex.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/BlockIndex.java index bc612d72c8..0742997c56 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/BlockIndex.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/BlockIndex.java @@ -20,6 +20,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileChannel.MapMode; +import java.nio.file.Path; import java.util.Iterator; import java.util.Map; @@ -61,6 +62,17 @@ public static BlockIndex create(String file, int expectedInsertions) { return new BlockIndex(file, expectedInsertions); } + /** + * Return a newly created BlockIndex. + * + * @param file + * @param expectedInsertions + * @return the BlockIndex + */ + public static BlockIndex create(Path file, int expectedInsertions) { + return create(file.toString(), expectedInsertions); + } + /** * Return the BlockIndex that is stored in {@code file}. * @@ -71,6 +83,16 @@ public static BlockIndex open(String file) { return new BlockIndex(file); } + /** + * Return the BlockIndex that is stored in {@code file}. + * + * @param file + * @return the BlockIndex + */ + public static BlockIndex open(Path file) { + return open(file.toString()); + } + /** * Represents an entry that has not been recorded. */ diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java index 7cdd2935e2..5e9776914d 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java @@ -18,23 +18,29 @@ import static com.cinchapi.concourse.server.GlobalState.*; import java.io.File; -import java.io.FilenameFilter; import java.io.IOException; -import java.lang.reflect.Constructor; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import com.cinchapi.common.collect.concurrent.ThreadFactories; +import com.cinchapi.common.reflect.Reflection; import com.cinchapi.concourse.annotate.Restricted; import com.cinchapi.concourse.server.GlobalState; -import com.cinchapi.concourse.server.concurrent.ConcourseExecutors; import com.cinchapi.concourse.server.io.Composite; import com.cinchapi.concourse.server.io.FileSystem; import com.cinchapi.concourse.server.jmx.ManagedOperation; @@ -60,6 +66,7 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -188,10 +195,8 @@ private static Cache buildCache() { * another is by the directory in which they are stored. */ private static final String PRIMARY_BLOCK_DIRECTORY = "cpb"; - private static final String SEARCH_BLOCK_DIRECTORY = "ctb"; private static final String SECONDARY_BLOCK_DIRECTORY = "csb"; - private static final String threadNamePrefix = "database-write-thread"; /** * A flag to indicate if the Database has verified the data it is seeing is @@ -253,6 +258,18 @@ private static Cache buildCache() { */ private transient boolean running = false; + /** + * An {@link ExecutorService} that handles asynchronous writing tasks in the + * background. + */ + private transient ExecutorService writer; + + /** + * An {@link ExecutorService} that handles asynchronous reading tasks in the + * background. + */ + private transient ExecutorService reader; + /** * Construct a Database that is backed by the default location which is in * {@link GlobalState#DATABASE_DIRECTORY}. @@ -291,9 +308,33 @@ && verify(write.getKey().toString(), // NOTE: Write locking happens in each individual Block, and // furthermore this method is only called from the Buffer, which // transports data serially. - ConcourseExecutors.executeAndAwaitTermination(threadNamePrefix, + List tasks = ImmutableList.of( new BlockWriter(cpb0, write), new BlockWriter(csb0, write), new BlockWriter(ctb0, write)); + if(running) { + try { + List> writes = tasks.stream() + .map(Executors::callable) + .collect(Collectors.toList()); + writer.invokeAll(writes); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to accept {}. " + + "If the write could not be fully accepted, it will " + + "remain in the buffer and re-tried when the Database is able to accept writes.", + write); + Thread.currentThread().interrupt(); + return; + } + } + else { + // The #accept method may be called when the database is stopped + // during test cases + Logger.warn( + "The database is being asked to accept a Write, even though it is not running."); + tasks.forEach(task -> task.run()); + } } else { Logger.warn( @@ -472,14 +513,27 @@ public void start() { running = true; Logger.info("Database configured to store data in {}", backingStore); - ConcourseExecutors.executeAndAwaitTerminationAndShutdown( - "Storage Block Loader", - new BlockLoader(PrimaryBlock.class, - PRIMARY_BLOCK_DIRECTORY, cpb), - new BlockLoader(SecondaryBlock.class, - SECONDARY_BLOCK_DIRECTORY, csb), - new BlockLoader(SearchBlock.class, - SEARCH_BLOCK_DIRECTORY, ctb)); + this.writer = Executors.newCachedThreadPool(ThreadFactories + .namingThreadFactory("database-write-thread")); + this.reader = Executors.newCachedThreadPool(ThreadFactories + .namingThreadFactory("Storage Block Loader")); + List> tasks = ImmutableList.of( + Executors.callable(new BlockLoader( + PrimaryBlock.class, PRIMARY_BLOCK_DIRECTORY, cpb)), + Executors.callable(new BlockLoader( + SecondaryBlock.class, SECONDARY_BLOCK_DIRECTORY, + csb)), + Executors.callable(new BlockLoader( + SearchBlock.class, SEARCH_BLOCK_DIRECTORY, ctb))); + try { + reader.invokeAll(tasks); + } + catch (InterruptedException e) { + Logger.error("The Database was interrupted while starting...", + e); + Thread.currentThread().interrupt(); + return; + } // CON-83: Get rid of any blocks that aren't "balanced" (e.g. has // primary and secondary) under the assumption that the server @@ -502,6 +556,8 @@ public void start() { public void stop() { if(running) { running = false; + reader.shutdown(); + writer.shutdown(); } } @@ -655,9 +711,34 @@ private void triggerSync(boolean doSync) { if(doSync) { // TODO we need a transactional file system to ensure that these // blocks are written atomically (all or nothing) - ConcourseExecutors.executeAndAwaitTermination(threadNamePrefix, - new BlockSyncer(cpb0), new BlockSyncer(csb0), - new BlockSyncer(ctb0)); + List tasks = ImmutableList.of(new BlockSyncer(cpb0), + new BlockSyncer(csb0), new BlockSyncer(ctb0)); + if(running) { + try { + List> syncs = tasks.stream() + .map(Executors::callable) + .collect(Collectors.toList()); + writer.invokeAll(syncs); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to " + + "sync storage blocks for {}. Since the blocks " + + "were not fully synced, the contained writes are " + + "still in the Buffer. Any partially synced blocks " + + "will be safely removed when the Database restarts.", + cpb0.getId()); + Thread.currentThread().interrupt(); + return; + } + } + else { + // The #triggerSync method may be called when the database + // is stopped during test cases + Logger.warn( + "The database is being asked to sync blocks, even though it is not running."); + tasks.forEach(task -> task.run()); + } } String id = Long.toString(Time.now()); cpb.add((cpb0 = Block.createPrimaryBlock(id, @@ -699,39 +780,35 @@ public BlockLoader(Class clazz, String directory, List blocks) { this.blocks = blocks; } - @SuppressWarnings("deprecation") @Override public void run() { - File _file = null; - try { - final String path = backingStore + File.separator + directory; - FileSystem.mkdirs(path); - SortedMap blockSorter = Maps - .newTreeMap(NaturalSorter.INSTANCE); - Set checksums = Sets.newHashSet(); - for (File file : new File(path).listFiles(new FilenameFilter() { - - @Override - public boolean accept(File dir, String name) { - return dir.getAbsolutePath() - .equals(new File(path).getAbsolutePath()) - && name.endsWith(Block.BLOCK_NAME_EXTENSION); - } - - })) { - _file = file; + Path path = Paths.get(backingStore, directory); + path.toFile().mkdirs(); + SortedMap sorted = Maps.newTreeMap(NaturalSorter.INSTANCE); + Set checksums = Sets.newHashSet(); + Stream files = FileSystem.ls(path) + .filter(file -> file.toString() + .endsWith(Block.BLOCK_NAME_EXTENSION)) + .map(Path::toFile); + files.forEach(file -> { + try { String id = Block.getId(file.getName()); - Constructor constructor = clazz.getDeclaredConstructor( - String.class, String.class, Boolean.TYPE); - constructor.setAccessible(true); String checksum = Files.asByteSource(file) - .hash(Hashing.md5()).toString(); + .hash(Hashing.murmur3_128()).toString(); if(!checksums.contains(checksum)) { - blockSorter.put(file, constructor.newInstance(id, - path.toString(), true)); - Logger.info("Loaded {} metadata for {}", - clazz.getSimpleName(), file.getName()); - checksums.add(checksum); + try { + T block = Reflection.newInstance(clazz, id, + path.toString(), true); + sorted.put(file, block); + checksums.add(checksum); + Logger.info("Loaded {} metadata for {}", + clazz.getSimpleName(), file.getName()); + } + catch (MalformedBlockException e) { + Logger.warn( + "{}. As a result the Block was NOT loaded. A malformed block is usually an indication that the Block was only partially synced to disk before Concourse Server shutdown. In this case, it is safe to delete any Block files that were written for id {}", + e.getMessage(), id); + } } else { Logger.warn( @@ -740,19 +817,16 @@ public boolean accept(File dir, String name) { + "delete this file.", clazz.getSimpleName(), id); } - } - blocks.addAll(blockSorter.values()); - } - catch (ReflectiveOperationException | IOException e) { - Logger.error( - "An error occured while loading {} metadata for {}", - clazz.getSimpleName(), _file.getName()); - Logger.error("", e); - } - + catch (IOException e) { + Logger.error( + "An error occured while loading {} metadata for {}", + clazz.getSimpleName(), file.getName()); + Logger.error("", e); + } + }); + blocks.addAll(sorted.values()); } - } /** diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/MalformedBlockException.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/MalformedBlockException.java new file mode 100644 index 0000000000..6ab2aa30e1 --- /dev/null +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/MalformedBlockException.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2013-2019 Cinchapi Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cinchapi.concourse.server.storage.db; + +import com.cinchapi.common.base.AnyStrings; + +/** + * A {@link MalformedBlockException} is thrown when a Block does not contain all + * of the components (e.g. stats, index, filter, etc) necessary to function + * properly. + *

+ * A "malformed" block is usually an indicator that the Database exited in the + * middle of a syncing operation. Malformed blocks can usually be safely + * discared. + *

+ * + * @author Jeff Nelson + */ +public class MalformedBlockException extends RuntimeException { + + private static final long serialVersionUID = -1721757690680045080L; + + /** + * Construct a new instance. + * + * @param id + * @param directory + * @param missing + */ + public MalformedBlockException(String id, String directory, + String... missing) { + super(AnyStrings.format("Block {} in {} is missing {}", id, directory, + String.join(", ", missing))); + } + +} diff --git a/settings.gradle b/settings.gradle index 0891c80aa1..3dba3a79dc 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,2 @@ rootProject.name = "concourse" -include "concourse-plugin-core", "concourse-driver-java", "concourse-server", "concourse-shell", "concourse-integration-tests", "concourse-cli", "concourse-import", "concourse-upgrade-tests", "concourse-unit-test-core", "concourse-ete-test-core", "concourse-plugin-core-tests" +include "concourse-plugin-core", "concourse-driver-java", "concourse-server", "concourse-shell", "concourse-integration-tests", "concourse-cli", "concourse-import", "concourse-upgrade-tests", "concourse-unit-test-core", "concourse-ete-test-core", "concourse-plugin-core-tests", "concourse-ete-tests"