feat: Adds lag reporting and API for use in lag aware routing as desc#1
feat: Adds lag reporting and API for use in lag aware routing as desc#1AlanConfluent wants to merge 22 commits intomasterfrom
Conversation
| final long nowMs = clock.millis(); | ||
| final HostInfoEntity hostInfoEntity = lagReportingRequest.getHostInfo(); | ||
| final HostInfo hostInfo = new HostInfo(hostInfoEntity.getHost(), hostInfoEntity.getPort()); | ||
| receivedLagInfo.remove(hostInfo); |
There was a problem hiding this comment.
Why do you remove the entry here? All you want to do is update the offsets and lag information if the current request has newer information. Keep in mind, requests may arrive out of order
There was a problem hiding this comment.
I remove the entry so as to produce one consistent view of the data, per host. I could check the timestamps and ignore if it's an older request, if that's what you mean. I'll do that.
If you don't remove data for this host, then over time, it will be littered with state store that no longer exist or partitions that the host no longer supports. Maybe this isn't a huge deal, but why not be consistent.
vinothchandar
left a comment
There was a problem hiding this comment.
Made a high level pass. Can do a deeper pass once again sometime once I settled there
| * This represents a unique store in the system and the basis for getting lag information from | ||
| * KafkaStreams, exposed by the QueryMetadata. | ||
| */ | ||
| public final class LagInfoKey { |
There was a problem hiding this comment.
wonder if this class should reside somewhere close to the agent itself.. ksql-engine is more for the core sql objects/constructs I think?
There was a problem hiding this comment.
That makes sense, though we're in need of some way to refer to a state store more generally. At request time, we'll need to pick through persistent query state in order to create this, which isn't easy since it's kind of hidden. It might make sense to add some method getStateStoreId which returns this kind of object from one of the persistent query data structures (and has access to internal state like state store name), and generalizes it a bit instead of making this key lag related. What do you think? Also, which persistent query data structure would own that StateStoreId?
There was a problem hiding this comment.
I made a class QueryStateStoreId as a rest entity since it's used as part of the API and has jackson annotations. Tell me if you have another recommendation here.
| * KafkaStreams, exposed by the QueryMetadata. | ||
| */ | ||
| public final class LagInfoKey { | ||
| private static final String SEPARATOR = "$"; |
There was a problem hiding this comment.
there is a KLIP-16 on k$ dynamic views.. might want to check that once to see if the $ is still a safe separator
| /** | ||
| * A listener for heartbeat related events. | ||
| */ | ||
| public interface HeartbeatListener { |
There was a problem hiding this comment.
HostStatusListener?
nit: we use Node in the KsLocator code.. and use Host here? might be good to use one?
There was a problem hiding this comment.
I guess I was thinking there might be other events, but maybe that was overthinking it. Changed to HostStatusListener.
That's not a bad idea to factor out a single host entity, though I think Vicky is adding status to Node, and I'm planning on adding Lag, so Node will represent the joining of all of this data. Maybe it will make sense to revisit that idea after the routing changes are made.
| ).define( | ||
| KSQL_LAG_REPORTING_SEND_INTERVAL_MS_CONFIG, | ||
| Type.LONG, | ||
| 1000L, |
There was a problem hiding this comment.
can we make this something like 3 or 5 seconds. I know it sounds conservative.. but since this is RPC on kafka, best to err on that side
There was a problem hiding this comment.
Sure. Choosing 5 seconds
|
|
||
| final Map<String, Map<Integer, LagInfo>> storeToPartitionToLagMap = currentQueries.stream() | ||
| .filter(Objects::nonNull) | ||
| .flatMap(pmd -> pmd.getStoreToPartitionToLagMap().entrySet().stream() |
There was a problem hiding this comment.
could we just access the KafkaStreams instance here and fetch the lag map on this class, instead of QueryMetadata? Technically, the lag is not metadata about the query itself and it might be best to consolidate all the lag related code, to the agent
There was a problem hiding this comment.
Sure. Moved the code that fetches the map to this class.
| * and debug resources. | ||
| */ | ||
| public Map<String, Map<String, Map<Integer, LagInfoEntity>>> listAllLags() { | ||
| return receivedLagInfo.entrySet().stream() |
There was a problem hiding this comment.
this block of code, I find it hard to read.. Do you think we can just have a class TablePartitionLagInfo which wraps a queryId, storeName, partition, lagInfo and simply manage a list/map tht is just one level? I went with such a nested map because thats how Streams code was.. but I think we can add an abstraction here, to make this easier?
There was a problem hiding this comment.
To make it a single level which would be usable by Jackson (which this is since it's exposed in an endpoint) would require "stringifying" the host, storename, partition as a single key and would make it harder to pick apart
in tests. I'll instead use Jackson-friendly types rather than Strings if that would make it easier to read.
I'm fine to rewrite this as a normal block without using so many streams. I went a little overboard :-)
| Map<LagInfoKey, Map<Integer, LagInfo>> storeToPartitionToLagMap = null; | ||
| try { | ||
| storeToPartitionToLagMap = kafkaStreams.allLocalStorePartitionLags().entrySet().stream() | ||
| .map(e -> Pair.of(LagInfoKey.of(getQueryApplicationId(), e.getKey()), e.getValue())) |
There was a problem hiding this comment.
(I think you are handling this already). Note to self: When a new query is scheduled, it's possible that the sending node has the store created for it already i.e the aware of the query while the receiving node still may not have seen the queryID being scheduled on it.. So if we expect the receiving node to do any checks based on the queryApplicationId(), it may fail in a small window
|
|
||
| @Override | ||
| public void onHostStatusUpdated(final Map<String, HostStatusEntity> hostsStatusMap) { | ||
| aliveHosts.clear(); |
There was a problem hiding this comment.
do we need to synchronize between this update and the send lag iterating over the map to send lag to other nodes?
There was a problem hiding this comment.
Yes, good call. There's technically a race as it is here.
I changed it to create an immutable set and then updates an AtomicReference.
| import javax.ws.rs.core.MediaType; | ||
| import javax.ws.rs.core.Response; | ||
|
|
||
| @Path("/lag") |
There was a problem hiding this comment.
same comment as in vicky's PR.. should we just consolidate all of this into the cluster status resource.. atleast the active/standby info and lag info for the same partitions reported on that endpoint, can be consolidated?
There was a problem hiding this comment.
Done. Moved the lag info to that endpoint.
Reporting still belongs in its own resource though, so I left that here.
|
|
||
| @Immutable | ||
| @JsonIgnoreProperties(ignoreUnknown = true) | ||
| public class LagInfoEntity { |
There was a problem hiding this comment.
note to self : this class is pretty much same as LagInfo in streams.. but we need all these jackson annotations.. so had to create a new one
There was a problem hiding this comment.
There's that and also the fact that the constructor of LagInfo is not public. If you could make those changes to the original object, this could be removed.
d3e4f95 to
f35a7bc
Compare
Description
What behavior do you want to change, why, how does your patch achieve the changes?
Testing done
Describe the testing strategy. Unit and integration tests are expected for any behavior changes.
Reviewer checklist