Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
cleanSpec,
guessColumnTypeFromInput,
guessColumnTypeFromSampleResponse,
guessKafkaInputFormat,
guessSimpleInputFormat,
updateSchemaWithSample,
upgradeSpec,
Expand Down Expand Up @@ -669,6 +670,36 @@ describe('ingestion-spec', () => {
});
});
});

describe('guessKafkaInputFormat', () => {
  // Builds one sampled row: Kafka metadata for the 'kttm2' topic plus the raw message payload.
  // None of the rows built here carry a 'kafka.key' or any 'kafka.header.*' field.
  const kttmRow = (kafkaTimestamp: number, raw: string): Record<string, any> => ({
    'kafka.timestamp': kafkaTimestamp,
    'kafka.topic': 'kttm2',
    raw,
  });

  const firstPayload =
    '{"timestamp":"2019-08-25T00:00:00.031Z","session":"S56194838","number":"16","event":{"type":"PercentClear","percentage":55},"agent":{"type":"Browser","category":"Personal computer","browser":"Chrome","browser_version":"76.0.3809.100","os":"Windows 7","platform":"Windows"},"client_ip":"181.13.41.82","geo_ip":{"continent":"South America","country":"Argentina","region":"Santa Fe","city":"Rosario"},"language":["es","es-419"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"http://www.koalastothemax.com/","loaded_image":"http://www.koalastothemax.com/img/koalas2.jpg","referrer":"Direct","referrer_host":"Direct","server_ip":"172.31.57.89","screen":"1680x1050","window":"1680x939","session_length":76261,"timezone":"N/A","timezone_offset":"180"}';

  const secondPayload =
    '{"timestamp":"2019-08-25T00:00:00.059Z","session":"S46093731","number":"24","event":{"type":"PercentClear","percentage":85},"agent":{"type":"Mobile Browser","category":"Smartphone","browser":"Chrome Mobile","browser_version":"50.0.2661.89","os":"Android","platform":"Android"},"client_ip":"177.242.100.0","geo_ip":{"continent":"North America","country":"Mexico","region":"Chihuahua","city":"Nuevo Casas Grandes"},"language":["en","es","es-419","es-MX"],"adblock_list":"NoAdblock","app_version":"1.9.6","path":"https://koalastothemax.com/","loaded_image":"https://koalastothemax.com/img/koalas1.jpg","referrer":"https://www.google.com/","referrer_host":"www.google.com","server_ip":"172.31.11.5","screen":"320x570","window":"540x743","session_length":252689,"timezone":"CDT","timezone_offset":"300"}';

  const rows = [kttmRow(1710962988515, firstPayload), kttmRow(1710962988518, secondPayload)];

  it('works when single topic', () => {
    // Single topic, no keys, no headers: the bare value format is expected.
    const guessed = guessKafkaInputFormat(rows, false);
    expect(guessed).toEqual({ type: 'json' });
  });

  it('works when multi-topic', () => {
    // Multi-topic: the value format is expected to be wrapped in the 'kafka' format.
    const guessed = guessKafkaInputFormat(rows, true);
    expect(guessed).toEqual({ type: 'kafka', valueFormat: { type: 'json' } });
  });
});
});

describe('spec utils', () => {
Expand Down
23 changes: 19 additions & 4 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2417,7 +2417,10 @@ export function fillInputFormatIfNeeded(
spec,
'spec.ioConfig.inputFormat',
getSpecType(spec) === 'kafka'
? guessKafkaInputFormat(filterMap(sampleResponse.data, l => l.input))
? guessKafkaInputFormat(
filterMap(sampleResponse.data, l => l.input),
typeof deepGet(spec, 'spec.ioConfig.topicPattern') === 'string',
)
: guessSimpleInputFormat(
filterMap(sampleResponse.data, l => l.input?.raw),
isStreamingSpec(spec),
Expand All @@ -2429,15 +2432,27 @@ function noNumbers(xs: string[]): boolean {
// True when every element of `xs` converts to NaN under `Number(...)`, i.e. none of them
// reads as a number. Vacuously true for an empty array (`every` on [] returns true).
// Note: `Number('')` is 0, so an empty string counts as a number here.
return xs.every(x => isNaN(Number(x)));
}

/**
 * Guesses an input format for rows sampled from a Kafka source.
 *
 * The value format is always guessed from the rows' `raw` payloads. It is returned
 * on its own when no row has a `kafka.header.*` field, no `kafka.key` values were
 * collected, and `multiTopic` is false. Otherwise it is wrapped in a `'kafka'`
 * input format, with `headerFormat` / `keyFormat` set only when headers / keys
 * were seen in the sample.
 *
 * @param sampleRaw - sampled rows; the `raw`, `kafka.key` and `kafka.header.*` fields are read
 * @param multiTopic - when true, the `'kafka'` format is always used, even without headers or keys
 * @returns the guessed input format
 */
export function guessKafkaInputFormat(
  sampleRaw: Record<string, any>[],
  multiTopic: boolean,
): InputFormat {
  const hasHeader = sampleRaw.some(x => Object.keys(x).some(k => k.startsWith('kafka.header.')));
  // NOTE(review): presumably filterMap drops rows whose 'kafka.key' is missing - confirm against its definition
  const keys = filterMap(sampleRaw, x => x['kafka.key']);
  const valueFormat = guessSimpleInputFormat(
    filterMap(sampleRaw, x => x.raw),
    true,
  );

  if (!hasHeader && !keys.length && !multiTopic) {
    // No headers or keys and just a single topic means do not pick the 'kafka' format by default as it is less performant
    return valueFormat;
  }

  return {
    type: 'kafka',
    headerFormat: hasHeader ? { type: 'string' } : undefined,
    keyFormat: keys.length ? guessSimpleInputFormat(keys, true) : undefined,
    valueFormat,
  };
}

Expand Down