From f89c83b02d0b694c75be9deb58eb9a082428d0d5 Mon Sep 17 00:00:00 2001 From: Jeff Nelson Date: Mon, 17 Jun 2019 08:52:48 -0400 Subject: [PATCH 1/4] set version number to 0.10.0 --- .version | 2 +- CHANGELOG.md | 2 +- README.md | 4 ++-- concourse-driver-java/README.md | 4 ++-- concourse-driver-php/composer.json | 2 +- interface/concourse.thrift | 2 +- 6 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.version b/.version index 3eefcb9dd5..78bc1abd14 100644 --- a/.version +++ b/.version @@ -1 +1 @@ -1.0.0 +0.10.0 diff --git a/CHANGELOG.md b/CHANGELOG.md index 65f593a648..7dc4f0ab43 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## Changelog -#### Version 1.0.0 (TBD) +#### Version 0.10.0 (TBD) * Added an iterative connection builder that is accessible using the `Concourse.at()` static factory method. * Refactored the `concourse-import` framework to take advantage of version `1.1.0+` of the `data-transform-api` which has a more flexible notion of data transformations. As a result of this change, the `Importables` utility class has been removed. Custom importers that extend `DelimitedLineImporter` can leverage the protected `parseObject` and `importLines` methods to hook into the extraction and import logic in a manner similar to what was possible using the `Importables` functions. * Added the `com.cinchapi.concourse.valididate.Keys` utility class which contains the `#isWritable` method that determines if a proposed key can be written to Concourse. diff --git a/README.md b/README.md index d2cb1d02fa..d05477e08a 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,13 @@ # Concourse - ![](https://img.shields.io/badge/version-1.0.0-green.svg) + ![](https://img.shields.io/badge/version-0.10.0-green.svg) ![](https://img.shields.io/badge/status-alpha-orange.svg) ![](https://img.shields.io/badge/license-Apache%202-blue.svg) [![Join the chat at https://gitter.im/cinchapi/concourse](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/cinchapi/concourse?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![](https://circleci.com/gh/cinchapi/concourse.svg?style=shield&circle-token=954a20e6114d649b1b6a046d95b953e7d05d2e2f)](https://circleci.com/gh/cinchapi/concourse) > [Concourse](http://concoursedb.com) is a distributed database warehouse for transactions search and analytics across time. Developers prefer Concourse because it simplifies building misssion-critical systems with on-demand data intelligence. Furthermore, Concourse makes end-to-end data management trivial by requiring no extra infrastructure, no prior configuration and no continuous tuning–all of which greatly reduce costs, and allow developers to focus on core business problems. -This is version 1.0.0 of Concourse. +This is version 0.10.0 of Concourse. ## Quickstart ### Docker diff --git a/concourse-driver-java/README.md b/concourse-driver-java/README.md index 6b01a25ec0..2c572fcfad 100644 --- a/concourse-driver-java/README.md +++ b/concourse-driver-java/README.md @@ -12,14 +12,14 @@ The concourse jar is available at [Maven Central](http://search.maven.org/#searc } dependencies { - compile 'com.cinchapi:concourse-driver-java:1.0.0+' + compile 'com.cinchapi:concourse-driver-java:0.10.0+' } If you prefer to use another dependency manager like Maven or Ivy, then use the following project information when declaring the dependency: GroupId: com.cinchapi ArtifactId: concourse-driver-java - Version: 1.0.0+ + Version: 0.10.0+ Alternatively, you can [download](http://cinchapi.org/concourse/download-api) the latest jar and manually add it to your project's classpath. diff --git a/concourse-driver-php/composer.json b/concourse-driver-php/composer.json index e8d2bdddc3..539ac12624 100644 --- a/concourse-driver-php/composer.json +++ b/concourse-driver-php/composer.json @@ -1,7 +1,7 @@ { "name": "cinchapi/concourse-driver-php", "description": "Official PHP driver for Concourse", - "version": "1.0.0", + "version": "0.10.0", "type": "library", "keywords": ["concourse", "nosql", "bigdata"], "homepage": "http://concoursedb.com", diff --git a/interface/concourse.thrift b/interface/concourse.thrift index a727822b89..9cb355f4b7 100644 --- a/interface/concourse.thrift +++ b/interface/concourse.thrift @@ -48,7 +48,7 @@ namespace rb concourse.thrift # # As much as possible, try to preserve backward compatibility so that # Concourse Server can always talk to older drivers. -const string VERSION = "1.0.0" +const string VERSION = "0.10.0" # This value is passed over the wire to represent a null value, usually # for get/select methods where a key/record has no data. From 1fb63d67b3ec32e0ebd9051379e12e5841b277eb Mon Sep 17 00:00:00 2001 From: Jeff Nelson Date: Tue, 18 Jun 2019 18:18:15 -0400 Subject: [PATCH 2/4] CON-654: Make database async tasks (loading, accepting and syncing writes) responsive to stop request. Use traditional ExecutorService to signal that background threads should stop when Database is stopping. Also use a thread factory that doesn't categorize async threads as daemon threads --- .../concourse/server/storage/db/Database.java | 91 +++++++++++++++---- 1 file changed, 74 insertions(+), 17 deletions(-) diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java index c50f003001..d595b32fad 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java @@ -26,6 +26,9 @@ import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Stream; @@ -33,10 +36,10 @@ import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import com.cinchapi.common.collect.concurrent.ThreadFactories; import com.cinchapi.common.reflect.Reflection; import com.cinchapi.concourse.annotate.Restricted; import com.cinchapi.concourse.server.GlobalState; -import com.cinchapi.concourse.server.concurrent.ConcourseExecutors; import com.cinchapi.concourse.server.io.Composite; import com.cinchapi.concourse.server.io.FileSystem; import com.cinchapi.concourse.server.jmx.ManagedOperation; @@ -62,6 +65,7 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -190,10 +194,8 @@ private static Cache buildCache() { * another is by the directory in which they are stored. */ private static final String PRIMARY_BLOCK_DIRECTORY = "cpb"; - private static final String SEARCH_BLOCK_DIRECTORY = "ctb"; private static final String SECONDARY_BLOCK_DIRECTORY = "csb"; - private static final String threadNamePrefix = "database-write-thread"; /** * A flag to indicate if the Database has verified the data it is seeing is @@ -255,6 +257,18 @@ private static Cache buildCache() { */ private transient boolean running = false; + /** + * An {@link ExecutorService} that handles asynchronous writing tasks in the + * background. + */ + private transient ExecutorService writer; + + /** + * An {@link ExecutorService} that handles asynchronous reading tasks in the + * background. + */ + private transient ExecutorService reader; + /** * Construct a Database that is backed by the default location which is in * {@link GlobalState#DATABASE_DIRECTORY}. @@ -273,6 +287,10 @@ public Database() { */ public Database(String backingStore) { this.backingStore = backingStore; + this.writer = Executors.newCachedThreadPool( + ThreadFactories.namingThreadFactory("database-write-thread")); + this.reader = Executors.newCachedThreadPool( + ThreadFactories.namingThreadFactory("Storage Block Loader")); } @Override @@ -293,9 +311,22 @@ && verify(write.getKey().toString(), // NOTE: Write locking happens in each individual Block, and // furthermore this method is only called from the Buffer, which // transports data serially. - ConcourseExecutors.executeAndAwaitTermination(threadNamePrefix, - new BlockWriter(cpb0, write), new BlockWriter(csb0, write), - new BlockWriter(ctb0, write)); + List> tasks = ImmutableList.of( + Executors.callable(new BlockWriter(cpb0, write)), + Executors.callable(new BlockWriter(csb0, write)), + Executors.callable(new BlockWriter(ctb0, write))); + try { + writer.invokeAll(tasks); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to accept {}. " + + "If the write could not be fully accepted, it will " + + "remain in the buffer and re-tried when the Database is able to accept writes.", + write); + Thread.currentThread().interrupt(); + return; + } } else { Logger.warn( @@ -474,14 +505,23 @@ public void start() { running = true; Logger.info("Database configured to store data in {}", backingStore); - ConcourseExecutors.executeAndAwaitTerminationAndShutdown( - "Storage Block Loader", - new BlockLoader(PrimaryBlock.class, - PRIMARY_BLOCK_DIRECTORY, cpb), - new BlockLoader(SecondaryBlock.class, - SECONDARY_BLOCK_DIRECTORY, csb), - new BlockLoader(SearchBlock.class, - SEARCH_BLOCK_DIRECTORY, ctb)); + List> tasks = ImmutableList.of( + Executors.callable(new BlockLoader( + PrimaryBlock.class, PRIMARY_BLOCK_DIRECTORY, cpb)), + Executors.callable(new BlockLoader( + SecondaryBlock.class, SECONDARY_BLOCK_DIRECTORY, + csb)), + Executors.callable(new BlockLoader( + SearchBlock.class, SEARCH_BLOCK_DIRECTORY, ctb))); + try { + reader.invokeAll(tasks); + } + catch (InterruptedException e) { + Logger.error("The Database was interrupted while starting...", + e); + Thread.currentThread().interrupt(); + return; + } // CON-83: Get rid of any blocks that aren't "balanced" (e.g. has // primary and secondary) under the assumption that the server @@ -504,6 +544,8 @@ public void start() { public void stop() { if(running) { running = false; + reader.shutdown(); + writer.shutdown(); } } @@ -657,9 +699,24 @@ private void triggerSync(boolean doSync) { if(doSync) { // TODO we need a transactional file system to ensure that these // blocks are written atomically (all or nothing) - ConcourseExecutors.executeAndAwaitTermination(threadNamePrefix, - new BlockSyncer(cpb0), new BlockSyncer(csb0), - new BlockSyncer(ctb0)); + List> tasks = ImmutableList.of( + Executors.callable(new BlockSyncer(cpb0)), + Executors.callable(new BlockSyncer(csb0)), + Executors.callable(new BlockSyncer(ctb0))); + try { + writer.invokeAll(tasks); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to " + + "sync storage blocks for {}. Since the blocks " + + "were not fully synced, the contained writes are " + + "still in the Buffer. Any partially synced blocks " + + "will be safely removed when the Database restarts.", + cpb0.getId()); + Thread.currentThread().interrupt(); + return; + } } String id = Long.toString(Time.now()); cpb.add((cpb0 = Block.createPrimaryBlock(id, From a8cd071ff85a4a8010839af33cde5487b9a31f61 Mon Sep 17 00:00:00 2001 From: Jeff Nelson Date: Tue, 18 Jun 2019 18:30:48 -0400 Subject: [PATCH 3/4] compiler hints --- .../com/cinchapi/concourse/server/storage/db/Database.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java index d595b32fad..48252128ee 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java @@ -311,7 +311,7 @@ && verify(write.getKey().toString(), // NOTE: Write locking happens in each individual Block, and // furthermore this method is only called from the Buffer, which // transports data serially. - List> tasks = ImmutableList.of( + List> tasks = ImmutableList.of( Executors.callable(new BlockWriter(cpb0, write)), Executors.callable(new BlockWriter(csb0, write)), Executors.callable(new BlockWriter(ctb0, write))); @@ -505,7 +505,7 @@ public void start() { running = true; Logger.info("Database configured to store data in {}", backingStore); - List> tasks = ImmutableList.of( + List> tasks = ImmutableList.of( Executors.callable(new BlockLoader( PrimaryBlock.class, PRIMARY_BLOCK_DIRECTORY, cpb)), Executors.callable(new BlockLoader( @@ -699,7 +699,7 @@ private void triggerSync(boolean doSync) { if(doSync) { // TODO we need a transactional file system to ensure that these // blocks are written atomically (all or nothing) - List> tasks = ImmutableList.of( + List> tasks = ImmutableList.of( Executors.callable(new BlockSyncer(cpb0)), Executors.callable(new BlockSyncer(csb0)), Executors.callable(new BlockSyncer(ctb0))); From 60ddf75fa0e9fb5ab822a26491527d5325b26552 Mon Sep 17 00:00:00 2001 From: Jeff Nelson Date: Wed, 19 Jun 2019 07:08:48 -0400 Subject: [PATCH 4/4] Fix unit test failure --- .../concourse/server/storage/db/Database.java | 86 ++++++++++++------- 1 file changed, 54 insertions(+), 32 deletions(-) diff --git a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java index 48252128ee..5e9776914d 100644 --- a/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java +++ b/concourse-server/src/main/java/com/cinchapi/concourse/server/storage/db/Database.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -287,10 +288,6 @@ public Database() { */ public Database(String backingStore) { this.backingStore = backingStore; - this.writer = Executors.newCachedThreadPool( - ThreadFactories.namingThreadFactory("database-write-thread")); - this.reader = Executors.newCachedThreadPool( - ThreadFactories.namingThreadFactory("Storage Block Loader")); } @Override @@ -311,21 +308,32 @@ && verify(write.getKey().toString(), // NOTE: Write locking happens in each individual Block, and // furthermore this method is only called from the Buffer, which // transports data serially. - List> tasks = ImmutableList.of( - Executors.callable(new BlockWriter(cpb0, write)), - Executors.callable(new BlockWriter(csb0, write)), - Executors.callable(new BlockWriter(ctb0, write))); - try { - writer.invokeAll(tasks); + List tasks = ImmutableList.of( + new BlockWriter(cpb0, write), new BlockWriter(csb0, write), + new BlockWriter(ctb0, write)); + if(running) { + try { + List> writes = tasks.stream() + .map(Executors::callable) + .collect(Collectors.toList()); + writer.invokeAll(writes); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to accept {}. " + + "If the write could not be fully accepted, it will " + + "remain in the buffer and re-tried when the Database is able to accept writes.", + write); + Thread.currentThread().interrupt(); + return; + } } - catch (InterruptedException e) { + else { + // The #accept method may be called when the database is stopped + // during test cases Logger.warn( - "The database was interrupted while trying to accept {}. " - + "If the write could not be fully accepted, it will " - + "remain in the buffer and re-tried when the Database is able to accept writes.", - write); - Thread.currentThread().interrupt(); - return; + "The database is being asked to accept a Write, even though it is not running."); + tasks.forEach(task -> task.run()); } } else { @@ -505,6 +513,10 @@ public void start() { running = true; Logger.info("Database configured to store data in {}", backingStore); + this.writer = Executors.newCachedThreadPool(ThreadFactories + .namingThreadFactory("database-write-thread")); + this.reader = Executors.newCachedThreadPool(ThreadFactories + .namingThreadFactory("Storage Block Loader")); List> tasks = ImmutableList.of( Executors.callable(new BlockLoader( PrimaryBlock.class, PRIMARY_BLOCK_DIRECTORY, cpb)), @@ -699,23 +711,33 @@ private void triggerSync(boolean doSync) { if(doSync) { // TODO we need a transactional file system to ensure that these // blocks are written atomically (all or nothing) - List> tasks = ImmutableList.of( - Executors.callable(new BlockSyncer(cpb0)), - Executors.callable(new BlockSyncer(csb0)), - Executors.callable(new BlockSyncer(ctb0))); - try { - writer.invokeAll(tasks); + List tasks = ImmutableList.of(new BlockSyncer(cpb0), + new BlockSyncer(csb0), new BlockSyncer(ctb0)); + if(running) { + try { + List> syncs = tasks.stream() + .map(Executors::callable) + .collect(Collectors.toList()); + writer.invokeAll(syncs); + } + catch (InterruptedException e) { + Logger.warn( + "The database was interrupted while trying to " + + "sync storage blocks for {}. Since the blocks " + + "were not fully synced, the contained writes are " + + "still in the Buffer. Any partially synced blocks " + + "will be safely removed when the Database restarts.", + cpb0.getId()); + Thread.currentThread().interrupt(); + return; + } } - catch (InterruptedException e) { + else { + // The #triggerSync method may be called when the database + // is stopped during test cases Logger.warn( - "The database was interrupted while trying to " - + "sync storage blocks for {}. Since the blocks " - + "were not fully synced, the contained writes are " - + "still in the Buffer. Any partially synced blocks " - + "will be safely removed when the Database restarts.", - cpb0.getId()); - Thread.currentThread().interrupt(); - return; + "The database is being asked to sync blocks, even though it is not running."); + tasks.forEach(task -> task.run()); } } String id = Long.toString(Time.now());