
Optional support for queued writes for CosmosDB#4513

Closed
chetanmeh wants to merge 10 commits into apache:master from chetanmeh:cosmosdb-queued-writes

Conversation


@chetanmeh commented Jun 14, 2019

For activations at high volume we are seeing connection-pool-related errors. This PR adds optional support for queued writes.

Description

With a high volume of writes on the Invoker we see errors like:

[2019-06-11T00:37:10.196Z] [ERROR] Network failure io.reactivex.netty.client.PoolExhaustedException: null
    at io.reactivex.netty.client.ConnectionPoolImpl.performAquire(ConnectionPoolImpl.java:177)  
    at io.reactivex.netty.client.ConnectionPoolImpl.access$300(ConnectionPoolImpl.java:45)  
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:139) 
    at io.reactivex.netty.client.ConnectionPoolImpl$1.call(ConnectionPoolImpl.java:124) 
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:48)  
    at rx.internal.operators.OnSubscribeLift.call(OnSubscribeLift.java:30)  

This was observed with a pool size of 1000. To prevent such cases it would be better to have a more controlled way of writing documents to the db. For CouchDB a similar approach was taken earlier via #2812, which used batched writes. Currently CosmosDB does not provide an easy way to perform batched writes (Azure/azure-cosmosdb-java#182).

Design

To enable controlled writes this PR introduces a QueuedExecutor (similar in spirit to the existing Batcher implementation) which ensures that writes are queued and then processed in a controlled way. Key points:

  • The queue size is tracked as a gauge metric
  • The implementation ensures that upon close any entries remaining in the queue are persisted
  • Proper backpressure is not possible, so under very heavy write load entries would be dropped. The other option is to configure a larger queue size, but that can lead to an out-of-memory scenario.
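The behavior described by these bullet points can be sketched with plain JVM concurrency primitives. This is an illustrative sketch, not the actual QueuedExecutor from this PR (which is built on Akka Streams): the class name, constructor parameters, and the `offer`/`close` methods are assumptions chosen to mirror the bullets — a bounded queue whose depth is observable (the gauge), rejection when full, and draining on close.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical sketch of the queued-write idea: a bounded in-memory queue
// drained by a fixed number of workers. `offer` rejects (drops) the entry
// when the queue is full; `close` drains remaining entries before exiting.
class QueuedWriter[T](queueSize: Int, concurrency: Int)(write: T => Unit) {
  private val queue = new ArrayBlockingQueue[T](queueSize)
  private val processed = new AtomicInteger(0)
  @volatile private var running = true
  private val pool = Executors.newFixedThreadPool(concurrency)

  (1 to concurrency).foreach { _ =>
    pool.execute { () =>
      // Keep polling until close() was called AND the queue is drained.
      while (running || !queue.isEmpty) {
        val item = queue.poll(10, TimeUnit.MILLISECONDS)
        if (item != null) {
          write(item)
          processed.incrementAndGet()
        }
      }
    }
  }

  /** Non-blocking put; returns false (entry dropped) when the queue is full. */
  def offer(item: T): Boolean = queue.offer(item)

  /** Current queue depth — what the PR exposes as a gauge metric. */
  def size: Int = queue.size

  /** Stop accepting work, let workers drain what is queued, then shut down. */
  def close(): Unit = {
    running = false
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
  }

  def processedCount: Int = processed.get()
}
```

The real implementation uses Akka Stream stages rather than a thread pool, but the contract is the same: `concurrency` bounds the number of in-flight db writes (avoiding the PoolExhaustedException above) and `queueSize` bounds memory.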

Usage

  cosmosdb {
    collections {
      WhiskActivation {
        write-queue-config = {
          # Size of the in-memory queue. If the queue gets full, put calls are rejected
          queue-size = 100000

          # Number of concurrent connections used to perform writes to the db
          concurrency = 500
        }
      }
    }
  }

Future Enhancement

We can possibly optimize the write throughput by implementing the bulk write logic used in the CosmosDB Bulk Importer. This would require us to batch the inserts, group them by partition, and send the calls to the respective partitions.
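The batching step this would need can be sketched as follows. This is a hedged illustration, not the Bulk Importer API: `Doc`, `partitionKeyOf`, and the batch size are hypothetical stand-ins; the only point is that pending writes are grouped by partition key so each bulk call targets a single CosmosDB partition.

```scala
// Hypothetical document shape; assume `namespace` is the partition key.
final case class Doc(id: String, namespace: String)

def partitionKeyOf(d: Doc): String = d.namespace

// Group pending inserts by partition key, then split each group into
// batches of at most `maxBatchSize` documents. Each resulting batch can
// be sent as one call to its partition.
def batchByPartition(pending: Seq[Doc], maxBatchSize: Int): Seq[(String, Seq[Doc])] =
  pending
    .groupBy(partitionKeyOf)
    .toSeq
    .flatMap { case (pk, docs) =>
      docs.grouped(maxBatchSize).map(batch => pk -> batch)
    }
```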

Another possible option to reduce heap pressure would be to store the object in byte-array form in the queue and deserialize it just before passing it to the ArtifactStore.
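The shape of that idea, hedged: the JSON round-trip details are elided, and these helper names are illustrative rather than anything in the PR. The point is simply encode-on-enqueue / decode-on-dequeue, so the queue holds compact byte arrays instead of parsed object graphs.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical helpers: serialize the document's JSON to UTF-8 bytes when
// enqueuing, and decode back to a string only right before the db write.
def toQueueEntry(docJson: String): Array[Byte] =
  docJson.getBytes(StandardCharsets.UTF_8)

def fromQueueEntry(bytes: Array[Byte]): String =
  new String(bytes, StandardCharsets.UTF_8)
```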

Related issue and scope

  • I opened an issue to propose and discuss this change (#????)

My changes affect the following components

  • API
  • Controller
  • Message Bus (e.g., Kafka)
  • Loadbalancer
  • Invoker
  • Intrinsic actions (e.g., sequences, conductors)
  • Data stores (e.g., CouchDB)
  • Tests
  • Deployment
  • CLI
  • General tooling
  • Documentation

Types of changes

  • Bug fix (generally a non-breaking change which closes an issue).
  • Enhancement or new feature (adds new functionality).
  • Breaking change (a bug fix or enhancement which changes existing behavior).

Checklist:

  • I signed an Apache CLA.
  • I reviewed the style guides and followed the recommendations (Travis CI will check :).
  • I added tests to cover my changes.
  • My changes require further changes to the documentation.
  • I updated the documentation where necessary.

@chetanmeh added the cosmosdb label (Issues related to CosmosDB support) on Jun 14, 2019
@chetanmeh force-pushed the cosmosdb-queued-writes branch from 0f61278 to da594a6 on June 19, 2019 at 04:22
@chetanmeh (author) commented:

@markusthoemmes @cbickel It would be helpful if you could review the QueuedExecutor logic (which is not related to the CosmosDB part). It is similar to the Batcher implementation. I wanted to confirm that the Akka Stream semantics are handled properly.

@chetanmeh (author) commented:

> gauge I think is never passed to QueuedExecutor ctor

Good catch. That explains why the queue size was coming out as zero in the tests. That's the drawback of using parameters with default values. Fixed now.

@chetanmeh closed this on Oct 16, 2019
@chetanmeh (author) commented:

Closing for now, as we are pursuing the Activation Persister Service approach instead (#4632).
