Support inputFormat and inputSource for sampler #8901
Travis is failing because of #8779.
null,
new KafkaSupervisorIOConfig(
    TOPIC,
    new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of()),
nit: this and other similar instantiations could just use JSONPathSpec.DEFAULT, I think.
public T next()
{
  if (!hasNext()) {
    throw new NoSuchElementException();
This might be kind of ugly if the transforming reader that wraps this iterator in the sampler calls hasNext, but this iterator is closed before the reader calls next. In practice I think this should be OK and handled by ExceptionThrowingIterator of CloseableIterator, but I wonder if there is a more graceful, less exception-throwing way to handle this.
Hmm, good point. I changed it to cache the next item in hasNext().
 * Caching the next item.
 * Not volatile since {@link #hasNext()} and {@link #next()} are supposed to be called by the same thread.
 */
T next;
I think this could be slightly simplified if you just check whether next is null instead of using validNext?
Good point. next is never null here. Tidied up.
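For readers following this thread, here is a minimal sketch of the pattern being discussed: hasNext() caches the next item, so a next() call that follows a successful hasNext() still returns that item even if the iterator was closed in between. The class name CachingClosableIterator and its close() method are hypothetical and are not taken from the PR; the sketch also assumes the delegate never returns null, which is what lets a null field stand in for a separate validNext flag.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;

/**
 * Hypothetical sketch (not the PR's class): an iterator that can be closed by
 * another thread, and that caches the next item in hasNext().
 */
class CachingClosableIterator<T> implements Iterator<T>
{
  private final Iterator<T> delegate;
  // Volatile because close() may be called from a different thread.
  private volatile boolean closed;
  // Not volatile: hasNext() and next() are expected to be called by the same thread.
  // null means "nothing cached"; this relies on the delegate never returning null.
  private T next;

  CachingClosableIterator(Iterator<T> delegate)
  {
    this.delegate = delegate;
  }

  @Override
  public boolean hasNext()
  {
    if (next != null) {
      return true;
    }
    if (closed || !delegate.hasNext()) {
      return false;
    }
    next = Objects.requireNonNull(delegate.next(), "delegate returned null");
    return true;
  }

  @Override
  public T next()
  {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    final T result = next;
    next = null;
    return result;
  }

  /** Stops reading from the delegate; an already cached item is still returned. */
  void close()
  {
    closed = true;
  }

  public static void main(String[] args)
  {
    CachingClosableIterator<String> it =
        new CachingClosableIterator<>(Arrays.asList("a", "b").iterator());
    it.hasNext();                     // caches "a"
    it.close();                       // closed between hasNext() and next()
    System.out.println(it.next());    // the cached item is still returned
    System.out.println(it.hasNext()); // nothing more is read after close()
  }
}
```

With this shape, NoSuchElementException is only thrown when the caller skips hasNext() or ignores a false result, not because of a close that raced with the caller.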
);
try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
     final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
     final Closer closer1 = closer) {
This with the closer seems kind of sad, but there are no great solutions, since it would be sort of funny to register the tmp dir inside the try after already using it.
Yeah, but we can tidy up later once we move to Java 11 (https://docs.oracle.com/javase/specs/jls/se9/html/jls-14.html#jls-14.20.3).
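As a rough illustration of the tidy-up mentioned here: the linked JLS section (Java SE 9) allows an existing final or effectively final variable to be named directly as a try-with-resources resource, so the extra closer1 alias would no longer be needed. The Recorder class below is a hypothetical stand-in for Closer and the iterator, used only to make the close order visible; it needs Java 9 or later to compile.

```java
import java.util.ArrayList;
import java.util.List;

class TryWithResourcesSketch
{
  /** Hypothetical resource that records when it is closed. */
  static class Recorder implements AutoCloseable
  {
    private final String name;
    private final List<String> log;

    Recorder(String name, List<String> log)
    {
      this.name = name;
      this.log = log;
    }

    @Override
    public void close()
    {
      log.add("close " + name);
    }
  }

  static List<String> run()
  {
    final List<String> log = new ArrayList<>();
    // Created and used before the try block, like the closer in the diff above.
    final Recorder closer = new Recorder("closer", log);

    // Java 8 needs an alias:  try (...; Recorder closer1 = closer) { ... }
    // Java 9+ accepts the effectively final variable itself.
    try (Recorder iterator = new Recorder("iterator", log); closer) {
      log.add("body");
    }
    // Resources are closed in reverse order of their position in the header.
    return log;
  }

  public static void main(String[] args)
  {
    System.out.println(run());
  }
}
```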
@jihoonson this has conflicts.
@fjy thanks, resolved conflicts now.
* Support inputFormat and inputSource for sampler
* Cleanup javadocs and names
* fix style
* fix timed shutoff input source reader
* fix timed shutoff input source reader again
* tidy up timed shutoff reader
* unused imports
* fix tc
Description
This PR adds sampler support for the new interfaces (InputSource and InputFormat) introduced in #8823. The key changes are:
* FirehoseSampler -> InputSourceSampler.
* IndexTaskSamplerSpec now supports both the old (parser and firehose) and new (inputFormat and inputSource) APIs. You cannot mix them, though.
* SeekableStreamSamplerSpec now supports both parser and inputFormat.
* parser and inputFormat: the supervisor will always internally fill parser if it was set in the supervisor spec to create sub tasks. Since the sub task understands both the new and old APIs, it should work during a rolling update.
* SamplerCache is removed now. This is because it caches the raw byte array of each row, which makes sense for row-oriented file formats.
* SamplerResponse now contains a Map of raw data instead of byte[]. This map will be serialized into JSON and sent to the caller, such as the UI.
* SeekableStreamSamplerFirehose has changed. It used to return a null InputRow if it couldn't fetch any data within POLL_TIMEOUT_MS. Now it waits until it fetches data.
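The "you cannot mix them" rule for the sampler spec can be pictured with a small sketch. This is not the PR's code: the class name, the chooseApi method, its Object-typed parameters, and the decision to reject a spec with neither API set are all assumptions made for illustration.

```java
/**
 * Hypothetical sketch of rejecting a spec that mixes the old
 * (parser/firehose) and new (inputFormat/inputSource) APIs.
 */
class SamplerSpecValidationSketch
{
  static String chooseApi(Object parser, Object firehose, Object inputFormat, Object inputSource)
  {
    final boolean oldApi = parser != null || firehose != null;
    final boolean newApi = inputFormat != null || inputSource != null;

    if (oldApi && newApi) {
      throw new IllegalArgumentException(
          "Cannot mix parser/firehose with inputFormat/inputSource"
      );
    }
    if (!oldApi && !newApi) {
      // Assumption: an empty spec is treated as an error in this sketch.
      throw new IllegalArgumentException("Either the old or the new API must be specified");
    }
    return newApi ? "new" : "old";
  }

  public static void main(String[] args)
  {
    System.out.println(chooseApi(null, null, "json", "local"));  // new API only
    System.out.println(chooseApi("parser", "firehose", null, null));  // old API only
  }
}
```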