Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2441,11 +2441,12 @@ export function fillInputFormatIfNeeded(
sampleResponse: SampleResponse,
): Partial<IngestionSpec> {
if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec;
const specType = getSpecType(spec);

return deepSet(
spec,
'spec.ioConfig.inputFormat',
getSpecType(spec) === 'kafka'
specType === 'kafka'
? guessKafkaInputFormat(
filterMap(sampleResponse.data, l => l.input),
typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
Expand Down
42 changes: 39 additions & 3 deletions web-console/src/druid-models/input-format/input-format.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,29 @@ const KNOWN_TYPES = [
'avro_stream',
'protobuf',
'regex',
'kafka',
'javascript',
'kafka',
'kinesis',
];

function generateInputFormatFields(streaming: boolean) {
return compact([
{
name: 'type',
label: 'Input format',
type: 'string',
suggestions: KNOWN_TYPES,
suggestions: [
'json',
'csv',
'tsv',
'parquet',
'orc',
'avro_ocf',
'avro_stream',
'protobuf',
'regex',
'javascript',
],
required: true,
info: (
<>
Expand Down Expand Up @@ -606,12 +619,35 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
},
];

// AutoForm field definitions shown when the 'kinesis' wrapper input format is
// selected. These configure the names of the extra columns that surface
// Kinesis record metadata alongside the decoded payload.
export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
  {
    // Output column that receives each record's Kinesis timestamp.
    name: 'timestampColumnName',
    label: 'Kinesis timestamp column name',
    type: 'string',
    defaultValue: 'kinesis.timestamp',
    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
    info: `The name of the column for the Kinesis timestamp.`,
  },
  {
    // Output column that receives each record's Kinesis partition key.
    name: 'partitionKeyColumnName',
    label: 'Kinesis partition key column name',
    type: 'string',
    defaultValue: 'kinesis.partitionKey',
    defined: typeIsKnown(KNOWN_TYPES, 'kinesis'),
    info: `The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource.`,
  },
];

/**
 * Validates the given input format model against the batch input format field
 * definitions. Returns a human-readable description of the first problem
 * found, or undefined when the model is valid.
 */
export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined {
  const issue = AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS);
  return issue;
}

/**
 * Type guard: true when the given input-format/spec type string is one of the
 * streaming metadata wrapper types ('kafka' or 'kinesis').
 */
export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 'kinesis' {
  switch (type) {
    case 'kafka':
    case 'kinesis':
      return true;
    default:
      return false;
  }
}

export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean {
if (inputFormat.type === 'kafka') {
if (isKafkaOrKinesis(inputFormat.type)) {
return Boolean(
inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
);
Expand Down
11 changes: 10 additions & 1 deletion web-console/src/utils/sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,11 @@ const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = {
valueFormat: WHOLE_ROW_INPUT_FORMAT,
};

// Input format used when sampling a Kinesis stream at the connect step:
// wraps the whole-row format so each record's raw payload is returned
// unparsed (mirrors KAFKA_SAMPLE_INPUT_FORMAT above).
const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = {
  type: 'kinesis',
  valueFormat: WHOLE_ROW_INPUT_FORMAT,
};

export async function sampleForConnect(
spec: Partial<IngestionSpec>,
sampleStrategy: SampleStrategy,
Expand All @@ -267,7 +272,11 @@ export async function sampleForConnect(
ioConfig = deepSet(
ioConfig,
'inputFormat',
samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : WHOLE_ROW_INPUT_FORMAT,
samplerType === 'kafka'
? KAFKA_SAMPLE_INPUT_FORMAT
: samplerType === 'kinesis'
? KINESIS_SAMPLE_INPUT_FORMAT
: WHOLE_ROW_INPUT_FORMAT,
);
}

Expand Down
72 changes: 51 additions & 21 deletions web-console/src/views/load-data-view/load-data-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,13 @@ import {
invalidPartitionConfig,
isDruidSource,
isEmptyIngestionSpec,
isKafkaOrKinesis,
isStreamingSpec,
issueWithIoConfig,
issueWithSampleData,
joinFilter,
KAFKA_METADATA_INPUT_FORMAT_FIELDS,
KINESIS_METADATA_INPUT_FORMAT_FIELDS,
KNOWN_FILTER_TYPES,
MAX_INLINE_DATA_LENGTH,
METRIC_SPEC_FIELDS,
Expand Down Expand Up @@ -244,30 +246,44 @@ function showKafkaLine(line: SampleEntry): string {
]).join('\n');
}

// Renders a sampled Kinesis record as a multi-line string showing its
// timestamp, partition key (when present), and raw payload.
function showKinesisLine(line: SampleEntry): string {
  const input = line.input;
  if (!input) return 'Invalid kinesis row';
  const parts = [
    `[ Kinesis timestamp: ${input['kinesis.timestamp']}`,
    input['kinesis.partitionKey'] ? ` Partition key: ${input['kinesis.partitionKey']}` : undefined,
    ` Payload: ${input.raw}`,
    ']',
  ];
  return compact(parts).join('\n');
}

// Fallback renderer for a sampled entry with no raw text: shows the parsed
// row as JSON when available, otherwise marks the entry as binary.
function showBlankLine(line: SampleEntry): string {
  if (!line.parsed) return '[Binary data]';
  return `[Row: ${JSONBig.stringify(line.parsed)}]`;
}

// Formats sampled entries for display in the connect step's raw preview.
// The renderer is chosen by the source kind: Druid/Kafka/Kinesis sources get
// their metadata-aware line formatters; everything else falls back to raw
// lines (or blank-line placeholders when nothing parsed).
function formatSampleEntries(
  sampleEntries: SampleEntry[],
  specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
): string {
  if (!sampleEntries.length) return 'No data returned from sampler';

  let lineFormatter: (line: SampleEntry) => string;
  if (specialSource === 'druid') {
    lineFormatter = showDruidLine;
  } else if (specialSource === 'kafka') {
    lineFormatter = showKafkaLine;
  } else if (specialSource === 'kinesis') {
    lineFormatter = showKinesisLine;
  } else {
    // No special source: if nothing parsed at all, show placeholders instead
    // of raw text.
    lineFormatter = sampleEntries.every(l => !l.parsed) ? showBlankLine : showRawLine;
  }

  return sampleEntries.map(lineFormatter).join('\n');
}

function getTimestampSpec(sampleResponse: SampleResponse | null): TimestampSpec {
Expand Down Expand Up @@ -1239,10 +1255,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
renderConnectStep() {
const { inputQueryState, sampleStrategy } = this.state;
const spec = this.getEffectiveSpec();
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const druidSource = isDruidSource(spec);
const kafkaSource = getSpecType(spec) === 'kafka';
const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;

let mainFill: JSX.Element | string;
if (inlineMode) {
Expand Down Expand Up @@ -1274,7 +1291,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
value={formatSampleEntries(inputData, druidSource, kafkaSource)}
value={formatSampleEntries(inputData, specialSource)}
/>
)}
{inputQueryState.isLoading() && <Loader />}
Expand Down Expand Up @@ -1544,11 +1561,11 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<ParserMessage />
{!selectedFlattenField && (
<>
{specType !== 'kafka' ? (
{!isKafkaOrKinesis(specType) ? (
normalInputAutoForm
) : (
<>
{inputFormat?.type !== 'kafka' ? (
{!isKafkaOrKinesis(inputFormat?.type) ? (
normalInputAutoForm
) : (
<AutoForm
Expand All @@ -1563,18 +1580,22 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
)}
<FormGroup className="parse-metadata">
<Switch
label="Parse Kafka metadata (ts, headers, key)"
checked={inputFormat?.type === 'kafka'}
label={
specType === 'kafka'
? 'Parse Kafka metadata (ts, headers, key)'
: 'Parse Kinesis metadata (ts, partition key)'
}
checked={isKafkaOrKinesis(inputFormat?.type)}
onChange={() => {
this.updateSpecPreview(
inputFormat?.type === 'kafka'
isKafkaOrKinesis(inputFormat?.type)
? deepMove(
spec,
'spec.ioConfig.inputFormat.valueFormat',
'spec.ioConfig.inputFormat',
)
: deepSet(spec, 'spec.ioConfig.inputFormat', {
type: 'kafka',
type: specType,
valueFormat: inputFormat,
}),
);
Expand All @@ -1590,6 +1611,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
/>
)}
{inputFormat?.type === 'kinesis' && (
<AutoForm
fields={KINESIS_METADATA_INPUT_FORMAT_FIELDS}
model={inputFormat}
onChange={p =>
this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p))
}
/>
)}
</>
)}
{possibleSystemFields.length > 0 && (
Expand Down