diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx index d51fd34e901a..3a7f0ae56748 100644 --- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx +++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx @@ -2441,11 +2441,12 @@ export function fillInputFormatIfNeeded( sampleResponse: SampleResponse, ): Partial { if (deepGet(spec, 'spec.ioConfig.inputFormat.type')) return spec; + const specType = getSpecType(spec); return deepSet( spec, 'spec.ioConfig.inputFormat', - getSpecType(spec) === 'kafka' + specType === 'kafka' ? guessKafkaInputFormat( filterMap(sampleResponse.data, l => l.input), typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string', diff --git a/web-console/src/druid-models/input-format/input-format.tsx b/web-console/src/druid-models/input-format/input-format.tsx index a5a1fbdfa073..970e88e2b99e 100644 --- a/web-console/src/druid-models/input-format/input-format.tsx +++ b/web-console/src/druid-models/input-format/input-format.tsx @@ -60,16 +60,29 @@ const KNOWN_TYPES = [ 'avro_stream', 'protobuf', 'regex', - 'kafka', 'javascript', + 'kafka', + 'kinesis', ]; + function generateInputFormatFields(streaming: boolean) { return compact([ { name: 'type', label: 'Input format', type: 'string', - suggestions: KNOWN_TYPES, + suggestions: [ + 'json', + 'csv', + 'tsv', + 'parquet', + 'orc', + 'avro_ocf', + 'avro_stream', + 'protobuf', + 'regex', + 'javascript', + ], required: true, info: ( <> @@ -606,12 +619,35 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field[] = [ }, ]; +export const KINESIS_METADATA_INPUT_FORMAT_FIELDS: Field[] = [ + { + name: 'timestampColumnName', + label: 'Kinesis timestamp column name', + type: 'string', + defaultValue: 'kinesis.timestamp', + defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), + info: `The name of the column for the Kinesis timestamp.`, + }, + { + name: 'partitionKeyColumnName', + label: 'Kinesis partition key column name', + type: 'string', + defaultValue: 'kinesis.partitionKey', + defined: typeIsKnown(KNOWN_TYPES, 'kinesis'), + info: `The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource.`, + }, +]; + export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined { return AutoForm.issueWithModel(inputFormat, BATCH_INPUT_FORMAT_FIELDS); } +export function isKafkaOrKinesis(type: string | undefined): type is 'kafka' | 'kinesis' { + return type === 'kafka' || type === 'kinesis'; +} + export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boolean { - if (inputFormat.type === 'kafka') { + if (isKafkaOrKinesis(inputFormat.type)) { return Boolean( inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat), ); diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts index 28489c524006..cc9ae32b8aef 100644 --- a/web-console/src/utils/sampler.ts +++ b/web-console/src/utils/sampler.ts @@ -251,6 +251,11 @@ const KAFKA_SAMPLE_INPUT_FORMAT: InputFormat = { valueFormat: WHOLE_ROW_INPUT_FORMAT, }; +const KINESIS_SAMPLE_INPUT_FORMAT: InputFormat = { + type: 'kinesis', + valueFormat: WHOLE_ROW_INPUT_FORMAT, +}; + export async function sampleForConnect( spec: Partial, sampleStrategy: SampleStrategy, @@ -267,7 +272,11 @@ export async function sampleForConnect( ioConfig = deepSet( ioConfig, 'inputFormat', - samplerType === 'kafka' ? KAFKA_SAMPLE_INPUT_FORMAT : WHOLE_ROW_INPUT_FORMAT, + samplerType === 'kafka' + ? KAFKA_SAMPLE_INPUT_FORMAT + : samplerType === 'kinesis' + ? KINESIS_SAMPLE_INPUT_FORMAT + : WHOLE_ROW_INPUT_FORMAT, ); } diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx index 0fdf68c88c1f..968b53888c62 100644 --- a/web-console/src/views/load-data-view/load-data-view.tsx +++ b/web-console/src/views/load-data-view/load-data-view.tsx @@ -113,11 +113,13 @@ import { invalidPartitionConfig, isDruidSource, isEmptyIngestionSpec, + isKafkaOrKinesis, isStreamingSpec, issueWithIoConfig, issueWithSampleData, joinFilter, KAFKA_METADATA_INPUT_FORMAT_FIELDS, + KINESIS_METADATA_INPUT_FORMAT_FIELDS, KNOWN_FILTER_TYPES, MAX_INLINE_DATA_LENGTH, METRIC_SPEC_FIELDS, @@ -244,30 +246,44 @@ function showKafkaLine(line: SampleEntry): string { ]).join('\n'); } +function showKinesisLine(line: SampleEntry): string { + const { input } = line; + if (!input) return 'Invalid kinesis row'; + return compact([ + `[ Kinesis timestamp: ${input['kinesis.timestamp']}`, + input['kinesis.partitionKey'] ? ` Partition key: ${input['kinesis.partitionKey']}` : undefined, + ` Payload: ${input.raw}`, + ']', + ]).join('\n'); +} + function showBlankLine(line: SampleEntry): string { return line.parsed ? `[Row: ${JSONBig.stringify(line.parsed)}]` : '[Binary data]'; } function formatSampleEntries( sampleEntries: SampleEntry[], - druidSource: boolean, - kafkaSource: boolean, + specialSource: undefined | 'druid' | 'kafka' | 'kinesis', ): string { if (!sampleEntries.length) return 'No data returned from sampler'; - if (druidSource) { - return sampleEntries.map(showDruidLine).join('\n'); - } + switch (specialSource) { + case 'druid': + return sampleEntries.map(showDruidLine).join('\n'); - if (kafkaSource) { - return sampleEntries.map(showKafkaLine).join('\n'); - } + case 'kafka': + return sampleEntries.map(showKafkaLine).join('\n'); - return ( - sampleEntries.every(l => !l.parsed) - ? sampleEntries.map(showBlankLine) - : sampleEntries.map(showRawLine) - ).join('\n'); + case 'kinesis': + return sampleEntries.map(showKinesisLine).join('\n'); + + default: + return ( + sampleEntries.every(l => !l.parsed) + ? sampleEntries.map(showBlankLine) + : sampleEntries.map(showRawLine) + ).join('\n'); + } } function getTimestampSpec(sampleResponse: SampleResponse | null): TimestampSpec { @@ -1239,10 +1255,11 @@ export class LoadDataView extends React.PureComponent )} {inputQueryState.isLoading() && } @@ -1544,11 +1561,11 @@ export class LoadDataView extends React.PureComponent {!selectedFlattenField && ( <> - {specType !== 'kafka' ? ( + {!isKafkaOrKinesis(specType) ? ( normalInputAutoForm ) : ( <> - {inputFormat?.type !== 'kafka' ? ( + {!isKafkaOrKinesis(inputFormat?.type) ? ( normalInputAutoForm ) : ( { this.updateSpecPreview( - inputFormat?.type === 'kafka' + isKafkaOrKinesis(inputFormat?.type) ? deepMove( spec, 'spec.ioConfig.inputFormat.valueFormat', 'spec.ioConfig.inputFormat', ) : deepSet(spec, 'spec.ioConfig.inputFormat', { - type: 'kafka', + type: specType, valueFormat: inputFormat, }), ); @@ -1590,6 +1611,15 @@ export class LoadDataView extends React.PureComponent )} + {inputFormat?.type === 'kinesis' && ( + + this.updateSpecPreview(deepSet(spec, 'spec.ioConfig.inputFormat', p)) + } + /> + )} )} {possibleSystemFields.length > 0 && (