Conversation
Codecov Report

```
@@            Coverage Diff            @@
##           master    #4632    +/-   ##
=========================================
+ Coverage   77.92%    78.6%    +0.68%
=========================================
  Files         198      199        +1
  Lines        8841     8879       +38
  Branches      614      624       +10
=========================================
+ Hits         6889     6979       +90
+ Misses       1952     1900       -52
```

Continue to review full report at Codecov.
```
# Maximum request size. By default it uses the MAX_ACTIVATION_LIMIT as computed from
# `whisk.activation.payload`. Bump it to a higher value if the activation result also
# includes the logs
# max-request-size = 2 MB
```
is it possible to have conflicting size limits (what if this is lower than the activation payload)?
Yes, that's possible. Probably I can add a programmatic check in KafkaActivationStore where we pick the max and log a warning. The purpose here is to ensure that no activation is rejected due to a max payload size error, as the fallback route to add to db is disabled.
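Such a programmatic check could look like the sketch below. This is only an illustration of the reconciliation logic (the helper name and plain-`Long` byte sizes are hypothetical; the real check would work with OpenWhisk's `ByteSize` and live in `KafkaActivationStore`):

```scala
// Sketch: reconcile a configured max-request-size with the activation payload
// limit, so no activation is rejected for exceeding the Kafka request cap.
// Names and types are illustrative, not the actual OpenWhisk API.
def effectiveMaxRequestSize(configuredBytes: Option[Long], payloadLimitBytes: Long): Long =
  configuredBytes match {
    case Some(c) if c < payloadLimitBytes =>
      // In the real code this would use the logging facility, not println
      println(
        s"configured max-request-size ($c) is below the activation payload limit " +
          s"($payloadLimitBytes); using the larger value")
      payloadLimitBytes
    case Some(c) => c
    case None    => payloadLimitBytes
  }
```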
```scala
logging.info(
  this,
  s"posted activation to Kafka topic ${config.activationsTopic} - ${activation.activationId}")
}
```
common/scala/src/main/scala/org/apache/openwhisk/http/BasicRasService.scala
```
activations {
  # Enable this if ActivationStore is to be disabled and all activation results are
  # to be sent to Kafka topic for further processing by ActivationPersisterService
  activation-store-enabled = true
```
this is strangely named: you enable this property to send to kafka instead of the "activation store" (which i'd think is the existing store)...
The description here was wrong. Instead it should be:

```
# Disable this if ActivationStore should not be used and all activation results are
# to be sent to Kafka topic for further processing by ActivationPersisterService
```
```scala
activationStore.storeAfterCheck(activation, context)(tid, notifier = None)
private val store = if (activationStorageConfig.activationStoreEnabled) {
  (tid: TransactionId, activation: WhiskActivation, context: UserContext) =>
    {
```
Technically it's not needed as it's a single statement, but it helps in clarifying the boundaries.
I'm getting a little confused. If we set activationStoreEnabled to false, the store will do nothing here (the code will not reach KafkaActivationStore.store either), so how can activations be sent to Kafka? I see that only blocking activation results will be sent to the completedN topic, but non-blocking activations will be missing.
Support for non-blocking activations is currently pending.
Is there any update on whether this will support non-blocking activations?
```scala
case class KafkaActivationStoreConfig(activationsTopic: String, db: Boolean, maxRequestSize: Option[ByteSize])

class KafkaActivationStore(producer: MessageProducer,
```
is this loaded/activated via SPI?
Yes via ActivationStore SPI
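For readers unfamiliar with the mechanism, a much-simplified sketch of the SPI provider shape follows. The trait names are stand-ins and the real OpenWhisk `Spi`/`SpiLoader` machinery (which resolves the provider class name from configuration) has different signatures:

```scala
// Simplified sketch of the SPI pattern used to select an ActivationStore
// implementation at startup. In OpenWhisk, a provider object is named in
// config and the loader instantiates the store through it.
trait ActivationStoreLike {
  def name: String
}

trait ActivationStoreProviderLike {
  def instance(): ActivationStoreLike
}

object KafkaActivationStoreProviderSketch extends ActivationStoreProviderLike {
  def instance(): ActivationStoreLike = new ActivationStoreLike {
    val name = "KafkaActivationStore"
  }
}
```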
```scala
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
  if (config.db) {
    sendToKafka(activation).flatMap(_ => super.store(activation, context))
```
should there be a series of stores that are configured at top level, all of which receive the activation - to keep the db setting from trickling down to subtypes?
With the recent addition of ActivationStoreWrapper this is now handled, and KafkaActivationStore now delegates to the configured primary store here if one is configured.
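The delegation flow can be sketched with deliberately simplified types (plain `String` activations and `Unit`-returning sinks instead of `WhiskActivation` and `Future[DocInfo]`; the class name is illustrative):

```scala
// Sketch of the decorator flow: every activation is sent to Kafka, and is
// additionally written to the configured primary store when the
// store-in-primary flag is enabled. Simplified types for illustration.
class KafkaActivationStoreSketch(
    sendToKafka: String => Unit,
    primaryStore: String => Unit,
    storeInPrimary: Boolean) {

  def store(activation: String): Unit = {
    // Kafka is always the first destination
    sendToKafka(activation)
    // Optional direct write, useful for trial runs with dual writes
    if (storeInPrimary) primaryStore(activation)
  }
}
```

This keeps the `db`/`store-in-primary` decision in one place instead of trickling it down to subtypes, which was the concern raised above.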
```scala
ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
  .withGroupId(config.groupId)
  .withBootstrapServers(config.kafkaHosts)
  .withProperty(ConsumerConfig.CLIENT_ID_CONFIG, config.clientId)
```
Would it be worth making this blank, or using a unique ID for each instance? We may want to scale out the microservice at some point, and then each instance would form one consumer group.
Can be done. Currently client.id is used to access the lag stats from the JMX MBean, hence it was made deterministic via config.

From the docs for client.id I was not sure of its significance in multi consumer group settings. Currently if you do not specify the id then it gets something like consumer-0 based on a static counter, so 2 consumers in the same group would still end up getting the same id.

> An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
Oh yes I got confused.
I thought the client id should be unique.
But actually a consumer id should be unique.
And it is generated with random UUID.
https://github.com/apache/kafka/blob/e24d0e22abb0fb3e4cb3974284a3dad126544584/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala#L373
```
TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST           CLIENT-ID
completed0  0          347324          347324          0    consumer-1-0da778fb-61f2-414c-8c36-82b60e60fdc4  /10.64.74.90   consumer-1
-           -          -               -               -    dominic-7fc8c30b-8b99-46b0-84e9-33024f466d15     /10.64.74.90   dominic
-           -          -               -               -    consumer-1-8e518874-1789-42cd-b0f4-914b8e610ba5  /10.64.71.195  consumer-1
-           -          -               -               -    dominic-397e49df-5cd7-4ef8-a0cd-7cc75ad198c8     /10.64.74.90   dominic
-           -          -               -               -    consumer-1-61d8894f-0d53-43c8-b2d4-cb33bb47be96  /10.64.74.90   consumer-1
```
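If a per-instance client.id were still desired (e.g. to tell scaled-out instances apart in JMX output while keeping the configured logical name), a common approach is a UUID suffix. A sketch, with an illustrative helper name:

```scala
import java.util.UUID

// Sketch: derive a per-instance client.id by suffixing the configured id
// with a random UUID. Each process gets a distinguishable id while the
// configured prefix stays usable for server-side request logging.
def instanceClientId(configuredId: String): String =
  s"$configuredId-${UUID.randomUUID()}"
```

The trade-off noted above still applies: a random suffix makes the JMX MBean name non-deterministic, which complicates the lag-stats lookup.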
```
# - earliest : automatically reset the offset to the earliest offset
# - latest : automatically reset the offset to the latest offset
# - none : throw exception to the consumer if no previous offset is found for the consumer's group
auto.offset.reset = "earliest"
```
This implies the multiple processing of the same activation.
Some activation stores would just override existing activations but others may not.
Yes, the semantics being implemented here are at-least-once processing, so the same activation may be processed multiple times.
```scala
// Recover for the conflict case, as in case of an unclean shutdown the persister
// may need to process the same activation again.
// Checking the chain as the exception may get wrapped
case t if Throwables.getRootCause(t).isInstanceOf[DocumentConflictException] => Future.successful(Done)
```
Oh, here it would take care of the situation where the same activations are processed multiple times.
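The idea behind that recovery case can be shown in isolation. The sketch below reimplements the root-cause walk (the real code uses Guava's `Throwables.getRootCause`) and uses a stand-in `ConflictException` in place of OpenWhisk's `DocumentConflictException`:

```scala
// Sketch of the at-least-once recovery check: a redelivered activation
// produces a document conflict on the second store attempt, and the
// conflict exception may arrive wrapped in other exceptions, so the
// cause chain must be walked to its root.
class ConflictException extends RuntimeException("document conflict")

@annotation.tailrec
def rootCause(t: Throwable): Throwable =
  if (t.getCause == null || t.getCause == t) t else rootCause(t.getCause)

def isRecoverableConflict(t: Throwable): Boolean =
  rootCause(t).isInstanceOf[ConflictException]
```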
```scala
actorSystem: ActorSystem,
actorMaterializer: ActorMaterializer,
logging: Logging)
  extends ArtifactActivationStore(actorSystem, actorMaterializer, logging) {
```
Is there any possibility that KafkaActivationStore can be used with other ActivationStore rather than just ArtifactActivationStore?
I think it's hard for KafkaActivationStore to be compatible with other ActivationStore implementations, as different ones may have different definitions for methods such as store, get, delete and countActivationsInNamespace, and we cannot define these methods (except store) in the KafkaActivationStore.
The current ActivationPersisterService also uses the ActivationStore for storage, so in theory we can support others. I can introduce a config to capture the base activation store and then have KafkaActivationStore act as a decorator for it. Will give it a try.
This is implemented now. There is now a config to configure the primary store via primary-store-provider which defaults to ArtifactActivationStoreProvider
```
whisk {
  kafka-activation-store {
    # Name of the Kafka topic for sending WhiskActivations
    # Set to
    # - `completed-others` - When LogDriverLogStore is used
    # - `activations` - When other LogStore is used where logs are stored as part of activation result
    activations-topic = "completed-others"
    # Primary activation store used for eventual storage and querying of activation records
    primary-store-provider = org.apache.openwhisk.core.database.ArtifactActivationStoreProvider
    # Also store directly to primary ActivationStore. This mode can be used for initial
    # trial runs where data is sent to both primary store and also via Kafka to ActivationPersisterService
    store-in-primary = false
    # Maximum request size. By default it uses the MAX_ACTIVATION_LIMIT as computed from
    # `whisk.activation.payload`. Bump it to a higher value if the activation result also
    # includes the logs
    # max-request-size = 2 MB
  }
}
```

So one can configure any ActivationStore impl and KafkaActivationStore would wrap it and handle the store flow. The rest of the flow is directed back to the primary store.
```
# Set to
# - `completed-others` - When LogDriverLogStore is used
# - `activations` - When other LogStore is used where logs are stored as part of activation result
activations-topic = "completed-others"
```
I feel there may be a more intuitive name for this topic (completed-others). It makes me envision activations sent to some miscellaneous controllers at first sight. I am not quite sure it's worth doing, and I know this is to use the topic pattern, but how about a more intuitive name, such as activations-with-logs and just activations? Opinions from others would be appreciated.
I also do not like this name :). For the LogDriverLogStore case I need a name which conforms to the pattern completed.*, as the consumer listens to all topics with that pattern. So any name conforming to that can be used.
Was also thinking about the name. Would completed-async work? It's at the very least much more descriptive than "others".
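Since the consumer subscribes by regex pattern, any candidate name can be checked against `completed.*` directly; a quick sketch of that constraint (helper name is illustrative):

```scala
// The persister subscribes to topics by pattern, so a renamed topic must
// still fully match the completed.* regex to be picked up.
def matchesCompletedPattern(topic: String): Boolean =
  topic.matches("completed.*")
```

Both `completed-others` and the proposed `completed-async` satisfy the pattern, while a plain `activations` topic would need its own subscription.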
This is done to ensure that we can override the settings in application.conf defined in common
@chetanmeh
This PR introduces a new micro service - Activation Persister Service (APS). This is based on discussion done here and here.

Description
This service fulfills the following objective:

With this service the activation results would now instead be written to a Kafka topic, then picked up by this service and saved in db. The approach taken here varies along the following parameters.

Components

The implementation consists of the following high level components:

- `core/persister` module. It is based on Alpakka Kafka and reads the activation records from Kafka and saves them to db
- `KafkaActivationStore` - An extension of `ArtifactActivationStore` which also routes to Kafka via the `MessageProducer` connector; the `send` logic would be modified (details below)
- `completed-others` - This topic is meant to be used for setups using `LogDriverLogStore`, i.e. where logs are not stored in the activation result. Further note that the name of the topic conforms to the pattern `completed.*` such that the Kafka consumer used within ActivationPersisterService can pick up this topic along with the other completed topics created per controller (`completed<controllerId>`), as those are strictly used for the blocking case
- `activations` - This topic name would be used when the setup stores the log within the activation result. Note that in this case, if APS is used, one needs to ensure that Kafka can carry messages of size up to 10MB (or the configured log limits)

Some key aspects which impact the implementation are:
- The message sent to the `completed` topic varies:
  a. Blocking - The `ResultMessage` contains the activation result without logs
  b. Non Blocking - Only a `CompletionMessage` is sent

Activation Persister Service
This service implements a streaming flow based on Alpakka Kafka. It runs in 2 modes
ActivationResult without Logs
If the setup does not store the activation with logs (uses some `LogDriverLogStore`) then the persister service would listen for `ResultMessage` (and `CombinedCompletionAndResultMessage` post #4624) on all `completed.*` topics. This would also pick up records from the `completed-others` topic.

Further, how `ActivationStore` gets used would change:

- Controller - Store all activations to `completed-others` via `KafkaActivationStore`
- Invoker - The `store` function would be initialized to a noop function. Here we did not go for a `NoopActivationStore`, as that would cause an issue when configuring this setup with `StandaloneOpenWhisk` for a test setup, since such a setup can only have a single `ActivationStore` impl

The InvokerReactive send flow would send all `WhiskActivation` results for blocking calls to the existing `completed<instanceId>` topic. In addition, non-blocking results would also get routed to Kafka to `completed-others`.

ActivationResult with Logs

In this mode APS would listen for messages on the `activations` topic. `KafkaActivationStore` would route all activations to this topic.

Related issue and scope
My changes affect the following components
Types of changes
Checklist: