From 1a792d068f2652118ddfdc7c4021876dea3f7422 Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Mon, 4 Nov 2019 22:31:24 -0800 Subject: [PATCH 01/27] converter v1 --- web-console/src/utils/ingestion-spec.spec.ts | 86 ++++++++++++++++++++ web-console/src/utils/ingestion-spec.tsx | 12 ++- web-console/src/utils/object-change.ts | 10 +++ 3 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 web-console/src/utils/ingestion-spec.spec.ts diff --git a/web-console/src/utils/ingestion-spec.spec.ts b/web-console/src/utils/ingestion-spec.spec.ts new file mode 100644 index 000000000000..fdd4540ce6eb --- /dev/null +++ b/web-console/src/utils/ingestion-spec.spec.ts @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { upgradeSpec } from './ingestion-spec'; + +describe('ingestion-spec', () => { + it('upgrades', () => { + const oldSpec = { + type: 'index_parallel', + ioConfig: { + type: 'index_parallel', + firehose: { + type: 'http', + uris: ['https://static.druid.io/data/wikipedia.json.gz'], + }, + }, + tuningConfig: { + type: 'index_parallel', + }, + dataSchema: { + dataSource: 'wikipedia', + granularitySpec: { + type: 'uniform', + segmentGranularity: 'DAY', + queryGranularity: 'HOUR', + rollup: true, + }, + parser: { + type: 'string', + parseSpec: { + format: 'json', + timestampSpec: { + column: 'timestamp', + format: 'iso', + }, + dimensionsSpec: { + dimensions: ['channel', 'cityName', 'comment'], + }, + }, + }, + transformSpec: { + transforms: [ + { + type: 'expression', + name: 'channel', + expression: 'concat("channel", \'lol\')', + }, + ], + filter: { + type: 'selector', + dimension: 'commentLength', + value: '35', + }, + }, + metricsSpec: [ + { + name: 'count', + type: 'count', + }, + { + name: 'sum_added', + type: 'longSum', + fieldName: 'added', + }, + ], + }, + }; + + expect(upgradeSpec(oldSpec)).toMatchSnapshot(); + }); +}); diff --git a/web-console/src/utils/ingestion-spec.tsx b/web-console/src/utils/ingestion-spec.tsx index 32053eee681f..721507e6531c 100644 --- a/web-console/src/utils/ingestion-spec.tsx +++ b/web-console/src/utils/ingestion-spec.tsx @@ -29,7 +29,7 @@ import { DATETIME_TIME_FORMATS, OTHER_TIME_FORMATS, } from './druid-time'; -import { deepGet, deepSet } from './object-change'; +import { deepGet, deepMove, deepSet } from './object-change'; export const MAX_INLINE_DATA_LENGTH = 65536; @@ -2442,3 +2442,13 @@ const FILTER_FORM_FIELDS: Field[] = [ export function getFilterFormFields() { return FILTER_FORM_FIELDS; } + +export function upgradeSpec(spec: any): any { + if (deepGet(spec, 'ioConfig.firehose')) { + spec = deepMove(spec, 'ioConfig.firehose', 'ioConfig.inputSource'); + spec = deepMove(spec, 'dataSchema.parser.parseSpec', 'ioConfig.inputFormat'); + spec = deepMove(spec, 'ioConfig.inputFormat.timestampSpec', 'dataSchema.timestampSpec'); + spec = deepMove(spec, 'ioConfig.inputFormat.dimensionsSpec', 'dataSchema.dimensionsSpec'); + } + return spec; +} diff --git a/web-console/src/utils/object-change.ts b/web-console/src/utils/object-change.ts index 2ff608fd52d1..83b7cceaa2b7 100644 --- a/web-console/src/utils/object-change.ts +++ b/web-console/src/utils/object-change.ts @@ -111,6 +111,16 @@ export function deepDelete>(value: T, path: string return valueCopy; } +export function deepMove>( + value: T, + fromPath: string, + toPath: string, +): T { + value = deepSet(value, toPath, deepGet(value, fromPath)); + value = deepDelete(value, fromPath); + return value; +} + export function deepExtend>(target: T, diff: Record): T { if (typeof target !== 'object') throw new TypeError(`Invalid target`); if (typeof diff !== 'object') throw new TypeError(`Invalid diff`); From 7626cb357d19efd75b99448e97a8323f6a20a1f9 Mon Sep 17 00:00:00 2001 From: Vadim Ogievetsky Date: Tue, 5 Nov 2019 15:43:24 -0800 Subject: [PATCH 02/27] working v1 --- .../dialogs/doctor-dialog/doctor-checks.tsx | 20 +- web-console/src/utils/druid-type.ts | 10 +- web-console/src/utils/ingestion-spec.spec.ts | 117 ++++--- web-console/src/utils/ingestion-spec.tsx | 312 +++++++++--------- web-console/src/utils/sampler.ts | 180 +++++----- web-console/src/utils/utils.spec.ts | 79 ++--- .../views/load-data-view/load-data-view.tsx | 162 ++++----- 7 files changed, 410 insertions(+), 470 deletions(-) diff --git a/web-console/src/dialogs/doctor-dialog/doctor-checks.tsx b/web-console/src/dialogs/doctor-dialog/doctor-checks.tsx index e8b90da8a277..aaaf8a713875 100644 --- a/web-console/src/dialogs/doctor-dialog/doctor-checks.tsx +++ b/web-console/src/dialogs/doctor-dialog/doctor-checks.tsx @@ -233,20 +233,18 @@ export const DOCTOR_CHECKS: DoctorCheck[] = [ type: 'index', spec: { type: 'index', - ioConfig: { type: 'index', firehose: { type: 'inline', data: '{"test":"Data"}' } }, + ioConfig: { + type: 'index', + inputSource: { type: 'inline', data: '{"test":"Data"}' }, + inputFormat: { type: 'json' }, + }, dataSchema: { dataSource: 'sample', - parser: { - type: 'string', - parseSpec: { - format: 'json', - timestampSpec: { - column: '!!!_no_such_column_!!!', - missingValue: '2010-01-01T00:00:00Z', - }, - dimensionsSpec: { dimensions: ['test'] }, - }, + timestampSpec: { + column: '!!!_no_such_column_!!!', + missingValue: '2010-01-01T00:00:00Z', }, + dimensionsSpec: { dimensions: ['test'] }, transformSpec: {}, metricsSpec: [], granularitySpec: { queryGranularity: 'NONE' }, diff --git a/web-console/src/utils/druid-type.ts b/web-console/src/utils/druid-type.ts index b0540d35cab6..9f5a6f93980d 100644 --- a/web-console/src/utils/druid-type.ts +++ b/web-console/src/utils/druid-type.ts @@ -88,17 +88,13 @@ export function updateSchemaWithSample( let newSpec = spec; if (dimensionMode === 'auto-detect') { - newSpec = deepSet(newSpec, 'dataSchema.parser.parseSpec.dimensionsSpec.dimensions', []); + newSpec = deepSet(newSpec, 'dataSchema.dimensionsSpec.dimensions', []); } else { - newSpec = deepDelete(newSpec, 'dataSchema.parser.parseSpec.dimensionsSpec.dimensionExclusions'); + newSpec = deepDelete(newSpec, 'dataSchema.dimensionsSpec.dimensionExclusions'); const dimensions = getDimensionSpecs(headerAndRows, rollup); if (dimensions) { - newSpec = deepSet( - newSpec, - 'dataSchema.parser.parseSpec.dimensionsSpec.dimensions', - dimensions, - ); + newSpec = deepSet(newSpec, 'dataSchema.dimensionsSpec.dimensions', dimensions); } } diff --git a/web-console/src/utils/ingestion-spec.spec.ts b/web-console/src/utils/ingestion-spec.spec.ts index fdd4540ce6eb..f56617ccdf79 100644 --- a/web-console/src/utils/ingestion-spec.spec.ts +++ b/web-console/src/utils/ingestion-spec.spec.ts @@ -16,71 +16,84 @@ * limitations under the License. */ -import { upgradeSpec } from './ingestion-spec'; +import { downgradeSpec, upgradeSpec } from './ingestion-spec'; describe('ingestion-spec', () => { - it('upgrades', () => { - const oldSpec = { + const oldSpec = { + type: 'index_parallel', + ioConfig: { type: 'index_parallel', - ioConfig: { - type: 'index_parallel', - firehose: { - type: 'http', - uris: ['https://static.druid.io/data/wikipedia.json.gz'], - }, + firehose: { + type: 'http', + uris: ['https://static.imply.io/data/wikipedia.json.gz'], }, - tuningConfig: { - type: 'index_parallel', + }, + tuningConfig: { + type: 'index_parallel', + }, + dataSchema: { + dataSource: 'wikipedia', + granularitySpec: { + type: 'uniform', + segmentGranularity: 'DAY', + queryGranularity: 'HOUR', + rollup: true, }, - dataSchema: { - dataSource: 'wikipedia', - granularitySpec: { - type: 'uniform', - segmentGranularity: 'DAY', - queryGranularity: 'HOUR', - rollup: true, - }, - parser: { - type: 'string', - parseSpec: { - format: 'json', - timestampSpec: { - column: 'timestamp', - format: 'iso', - }, - dimensionsSpec: { - dimensions: ['channel', 'cityName', 'comment'], - }, + parser: { + type: 'string', + parseSpec: { + format: 'json', + timestampSpec: { + column: 'timestamp', + format: 'iso', }, - }, - transformSpec: { - transforms: [ - { - type: 'expression', - name: 'channel', - expression: 'concat("channel", \'lol\')', - }, - ], - filter: { - type: 'selector', - dimension: 'commentLength', - value: '35', + dimensionsSpec: { + dimensions: ['channel', 'cityName', 'comment'], }, - }, - metricsSpec: [ - { - name: 'count', - type: 'count', + flattenSpec: { + fields: [ + { + type: 'path', + name: 'cityNameAlt', + expr: '$.cityName', + }, + ], }, + }, + }, + transformSpec: { + transforms: [ { - name: 'sum_added', - type: 'longSum', - fieldName: 'added', + type: 'expression', + name: 'channel', + expression: 'concat("channel", \'lol\')', }, ], + filter: { + type: 'selector', + dimension: 'commentLength', + value: '35', + }, }, - }; + metricsSpec: [ + { + name: 'count', + type: 'count', + }, + { + name: 'sum_added', + type: 'longSum', + fieldName: 'added', + }, + ], + }, + }; + it('upgrades', () => { expect(upgradeSpec(oldSpec)).toMatchSnapshot(); }); + + it('round trips', () => { + expect(downgradeSpec(upgradeSpec(oldSpec))).toMatchObject(oldSpec); + }); }); diff --git a/web-console/src/utils/ingestion-spec.tsx b/web-console/src/utils/ingestion-spec.tsx index 721507e6531c..f9c3253f6716 100644 --- a/web-console/src/utils/ingestion-spec.tsx +++ b/web-console/src/utils/ingestion-spec.tsx @@ -29,7 +29,7 @@ import { DATETIME_TIME_FORMATS, OTHER_TIME_FORMATS, } from './druid-time'; -import { deepGet, deepMove, deepSet } from './object-change'; +import { deepDelete, deepGet, deepMove, deepSet } from './object-change'; export const MAX_INLINE_DATA_LENGTH = 65536; @@ -41,8 +41,8 @@ const CURRENT_YEAR = new Date().getUTCFullYear(); export interface IngestionSpec { type?: IngestionType; - dataSchema: DataSchema; ioConfig: IoConfig; + dataSchema: DataSchema; tuningConfig?: TuningConfig; } @@ -52,7 +52,7 @@ export function isEmptyIngestionSpec(spec: IngestionSpec) { export type IngestionType = 'kafka' | 'kinesis' | 'index_hadoop' | 'index' | 'index_parallel'; -// A combination of IngestionType and firehose +// A combination of IngestionType and inputSourceType export type IngestionComboType = | 'kafka' | 'kinesis' @@ -93,8 +93,8 @@ export function getIngestionComboType(spec: IngestionSpec): IngestionComboType | case 'index': case 'index_parallel': - const firehose = deepGet(spec, 'ioConfig.firehose') || EMPTY_OBJECT; - switch (firehose.type) { + const inputSource = deepGet(spec, 'ioConfig.inputSource') || EMPTY_OBJECT; + switch (inputSource.type) { case 'local': case 'http': case 'ingestSegment': @@ -102,7 +102,7 @@ export function getIngestionComboType(spec: IngestionSpec): IngestionComboType | case 'static-s3': case 'static-google-blobstore': case 'hdfs': - return `index:${firehose.type}` as IngestionComboType; + return `index:${inputSource.type}` as IngestionComboType; } } @@ -199,28 +199,21 @@ export function getRequiredModule(ingestionType: IngestionComboTypeWithExtra): s export interface DataSchema { dataSource: string; - parser: Parser; + timestampSpec: TimestampSpec; transformSpec?: TransformSpec; granularitySpec?: GranularitySpec; + dimensionsSpec: DimensionsSpec; metricsSpec?: MetricSpec[]; } -export interface Parser { - type?: string; - parseSpec: ParseSpec; -} - -export interface ParseSpec { - format: string; +export interface InputFormat { + type: string; hasHeaderRow?: boolean; skipHeaderRows?: number; columns?: string[]; listDelimiter?: string; pattern?: string; function?: string; - - timestampSpec: TimestampSpec; - dimensionsSpec: DimensionsSpec; flattenSpec?: FlattenSpec; } @@ -237,8 +230,7 @@ export function isParallel(spec: IngestionSpec): boolean { export type DimensionMode = 'specific' | 'auto-detect'; export function getDimensionMode(spec: IngestionSpec): DimensionMode { - const dimensions = - deepGet(spec, 'dataSchema.parser.parseSpec.dimensionsSpec.dimensions') || EMPTY_ARRAY; + const dimensions = deepGet(spec, 'dataSchema.dimensionsSpec.dimensions') || EMPTY_ARRAY; return Array.isArray(dimensions) && dimensions.length === 0 ? 'auto-detect' : 'specific'; } @@ -262,7 +254,7 @@ export function isTask(spec: IngestionSpec) { } export function isIngestSegment(spec: IngestionSpec): boolean { - return deepGet(spec, 'ioConfig.firehose.type') === 'ingestSegment'; + return deepGet(spec, 'ioConfig.inputSource.type') === 'ingestSegment'; } export function changeParallel(spec: IngestionSpec, parallel: boolean): IngestionSpec { @@ -296,10 +288,9 @@ export function normalizeSpec(spec: Partial): IngestionSpec { return spec as IngestionSpec; } -const PARSE_SPEC_FORM_FIELDS: Field[] = [ +const INPUT_FORMAT_FORM_FIELDS: Field[] = [ { - name: 'format', - label: 'Parser to use', + name: 'type', type: 'string', suggestions: ['json', 'csv', 'tsv', 'regex'], info: ( @@ -321,25 +312,25 @@ const PARSE_SPEC_FORM_FIELDS: Field[] = [ name: 'pattern', type: 'string', required: true, - defined: (p: ParseSpec) => p.format === 'regex', + defined: (p: InputFormat) => p.type === 'regex', }, { name: 'function', type: 'string', required: true, - defined: (p: ParseSpec) => p.format === 'javascript', + defined: (p: InputFormat) => p.type === 'javascript', }, { name: 'hasHeaderRow', type: 'boolean', defaultValue: false, - defined: (p: ParseSpec) => p.format === 'csv' || p.format === 'tsv', + defined: (p: InputFormat) => p.type === 'csv' || p.type === 'tsv', }, { name: 'skipHeaderRows', type: 'number', defaultValue: 0, - defined: (p: ParseSpec) => p.format === 'csv' || p.format === 'tsv', + defined: (p: InputFormat) => p.type === 'csv' || p.type === 'tsv', min: 0, info: ( <> @@ -352,51 +343,47 @@ const PARSE_SPEC_FORM_FIELDS: Field[] = [ { name: 'columns', type: 'string-array', - required: (p: ParseSpec) => - ((p.format === 'csv' || p.format === 'tsv') && !p.hasHeaderRow) || p.format === 'regex', - defined: (p: ParseSpec) => - ((p.format === 'csv' || p.format === 'tsv') && !p.hasHeaderRow) || p.format === 'regex', + required: (p: InputFormat) => + ((p.type === 'csv' || p.type === 'tsv') && !p.hasHeaderRow) || p.type === 'regex', + defined: (p: InputFormat) => + ((p.type === 'csv' || p.type === 'tsv') && !p.hasHeaderRow) || p.type === 'regex', }, { name: 'delimiter', type: 'string', defaultValue: '\t', - defined: (p: ParseSpec) => p.format === 'tsv', + defined: (p: InputFormat) => p.type === 'tsv', info: <>A custom delimiter for data values., }, { name: 'listDelimiter', type: 'string', - defined: (p: ParseSpec) => p.format === 'csv' || p.format === 'tsv', + defined: (p: InputFormat) => p.type === 'csv' || p.type === 'tsv', info: <>A custom delimiter for multi-value dimensions., }, ]; -export function getParseSpecFormFields() { - return PARSE_SPEC_FORM_FIELDS; +export function getInputFormatFormFields() { + return INPUT_FORMAT_FORM_FIELDS; } -export function issueWithParser(parser: Parser | undefined): string | undefined { - if (!parser) return 'no parser'; - if (parser.type === 'map') return; - - const { parseSpec } = parser; - if (!parseSpec) return 'no parse spec'; - if (!parseSpec.format) return 'missing a format'; - switch (parseSpec.format) { +export function issueWithInputFormat(inputFormat: InputFormat | undefined): string | undefined { + if (!inputFormat) return 'no input format'; + if (!inputFormat.type) return 'missing a type'; + switch (inputFormat.type) { case 'regex': - if (!parseSpec.pattern) return "must have a 'pattern'"; + if (!inputFormat.pattern) return "must have a 'pattern'"; break; case 'javascript': - if (!parseSpec['function']) return "must have a 'function'"; + if (!inputFormat['function']) return "must have a 'function'"; break; } return; } -export function parseSpecHasFlatten(parseSpec: ParseSpec): boolean { - return parseSpec.format === 'json'; +export function inputFormatCanFlatten(inputFormat: InputFormat): boolean { + return inputFormat.type === 'json'; } export interface TimestampSpec { @@ -936,7 +923,8 @@ export function getMetricSpecName(metricSpec: MetricSpec): string { export interface IoConfig { type: string; - firehose?: Firehose; + inputSource?: InputSource; + inputFormat?: InputFormat; appendToExisting?: boolean; topic?: string; consumerProperties?: any; @@ -958,7 +946,7 @@ export function invalidIoConfig(ioConfig: IoConfig): boolean { ); } -export interface Firehose { +export interface InputSource { type: string; baseDir?: string; filter?: any; @@ -982,8 +970,8 @@ export interface Firehose { } export function getIoConfigFormFields(ingestionComboType: IngestionComboType): Field[] { - const firehoseType: Field = { - name: 'firehose.type', + const inputSourceType: Field = { + name: 'inputSource.type', label: 'Firehose type', type: 'string', suggestions: ['local', 'http', 'inline', 'static-s3', 'static-google-blobstore', 'hdfs'], @@ -993,9 +981,9 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F - firehoses + inputSources - . You can change your selected firehose here. + . You can change your selected inputSource here.

), }; @@ -1003,9 +991,9 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F switch (ingestionComboType) { case 'index:http': return [ - firehoseType, + inputSourceType, { - name: 'firehose.uris', + name: 'inputSource.uris', label: 'URIs', type: 'string-array', placeholder: @@ -1019,14 +1007,14 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F ), }, { - name: 'firehose.httpAuthenticationUsername', + name: 'inputSource.httpAuthenticationUsername', label: 'HTTP auth username', type: 'string', placeholder: '(optional)', info:

Username to use for authentication with specified URIs

, }, { - name: 'firehose.httpAuthenticationPassword', + name: 'inputSource.httpAuthenticationPassword', label: 'HTTP auth password', type: 'string', placeholder: '(optional)', @@ -1036,9 +1024,9 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F case 'index:local': return [ - firehoseType, + inputSourceType, { - name: 'firehose.baseDir', + name: 'inputSource.baseDir', label: 'Base directory', type: 'string', placeholder: '/path/to/files/', @@ -1048,14 +1036,14 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F - firehose.baseDir + inputSource.baseDir

Specifies the directory to search recursively for files to be ingested.

), }, { - name: 'firehose.filter', + name: 'inputSource.filter', label: 'File filter', type: 'string', required: true, @@ -1065,7 +1053,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F - firehose.filter + inputSource.filter

A wildcard filter for files. See{' '} @@ -1081,16 +1069,16 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F case 'index:ingestSegment': return [ - firehoseType, + inputSourceType, { - name: 'firehose.dataSource', + name: 'inputSource.dataSource', label: 'Datasource', type: 'string', required: true, info:

The datasource to fetch rows from.

, }, { - name: 'firehose.interval', + name: 'inputSource.interval', label: 'Interval', type: 'string', placeholder: `${CURRENT_YEAR}-01-01/${CURRENT_YEAR + 1}-01-01`, @@ -1108,7 +1096,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F ), }, { - name: 'firehose.dimensions', + name: 'inputSource.dimensions', label: 'Dimensions', type: 'string-array', placeholder: '(optional)', @@ -1120,7 +1108,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F ), }, { - name: 'firehose.metrics', + name: 'inputSource.metrics', label: 'Metrics', type: 'string-array', placeholder: '(optional)', @@ -1132,7 +1120,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F ), }, { - name: 'firehose.filter', + name: 'inputSource.filter', label: 'Filter', type: 'json', placeholder: '(optional)', @@ -1152,20 +1140,20 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F case 'index:inline': return [ - firehoseType, + inputSourceType, // do not add 'data' here as it has special handling in the load-data view ]; case 'index:static-s3': return [ - firehoseType, + inputSourceType, { - name: 'firehose.uris', + name: 'inputSource.uris', label: 'S3 URIs', type: 'string-array', placeholder: 's3://your-bucket/some-file1.ext, s3://your-bucket/some-file2.ext', required: true, - defined: ioConfig => !deepGet(ioConfig, 'firehose.prefixes'), + defined: ioConfig => !deepGet(ioConfig, 'inputSource.prefixes'), info: ( <>

@@ -1177,12 +1165,12 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F ), }, { - name: 'firehose.prefixes', + name: 'inputSource.prefixes', label: 'S3 prefixes', type: 'string-array', placeholder: 's3://your-bucket/some-path1, s3://your-bucket/some-path2', required: true, - defined: ioConfig => !deepGet(ioConfig, 'firehose.uris'), + defined: ioConfig => !deepGet(ioConfig, 'inputSource.uris'), info: ( <>

A list of paths (with bucket) where your files are stored.

@@ -1194,9 +1182,9 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F case 'index:static-google-blobstore': return [ - firehoseType, + inputSourceType, { - name: 'firehose.blobs', + name: 'inputSource.blobs', label: 'Google blobs', type: 'json', required: true, @@ -1218,9 +1206,9 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F case 'index:hdfs': return [ - firehoseType, + inputSourceType, { - name: 'firehose.paths', + name: 'inputSource.paths', label: 'Paths', type: 'string', placeholder: '/path/to/file.ext', @@ -1346,44 +1334,44 @@ function nonEmptyArray(a: any) { return Array.isArray(a) && Boolean(a.length); } -function issueWithFirehose(firehose: Firehose | undefined): string | undefined { - if (!firehose) return 'does not exist'; - if (!firehose.type) return 'missing a type'; - switch (firehose.type) { +function issueWithInputSource(inputSource: InputSource | undefined): string | undefined { + if (!inputSource) return 'does not exist'; + if (!inputSource.type) return 'missing a type'; + switch (inputSource.type) { case 'local': - if (!firehose.baseDir) return `must have a 'baseDir'`; - if (!firehose.filter) return `must have a 'filter'`; + if (!inputSource.baseDir) return `must have a 'baseDir'`; + if (!inputSource.filter) return `must have a 'filter'`; break; case 'http': - if (!nonEmptyArray(firehose.uris)) { + if (!nonEmptyArray(inputSource.uris)) { return 'must have at least one uri'; } break; case 'ingestSegment': - if (!firehose.dataSource) return `must have a 'dataSource'`; - if (!firehose.interval) return `must have an 'interval'`; + if (!inputSource.dataSource) return `must have a 'dataSource'`; + if (!inputSource.interval) return `must have an 'interval'`; break; case 'inline': - if (!firehose.data) return `must have 'data'`; + if (!inputSource.data) return `must have 'data'`; break; case 'static-s3': - if (!nonEmptyArray(firehose.uris) && !nonEmptyArray(firehose.prefixes)) { + if (!nonEmptyArray(inputSource.uris) && !nonEmptyArray(inputSource.prefixes)) { return 'must have at least one uri or prefix'; } break; case 'static-google-blobstore': - if (!nonEmptyArray(firehose.blobs)) { + if (!nonEmptyArray(inputSource.blobs)) { return 'must have at least one blob'; } break; case 'hdfs': - if (!firehose.paths) { + if (!inputSource.paths) { return 'must have paths'; } break; @@ -1391,14 +1379,17 @@ function issueWithFirehose(firehose: Firehose | undefined): string | undefined { return; } -export function issueWithIoConfig(ioConfig: IoConfig | undefined): string | undefined { +export function issueWithIoConfig( + ioConfig: IoConfig | undefined, + ignoreInputFormat = false, +): string | undefined { if (!ioConfig) return 'does not exist'; if (!ioConfig.type) return 'missing a type'; switch (ioConfig.type) { case 'index': case 'index_parallel': - if (issueWithFirehose(ioConfig.firehose)) { - return `firehose: '${issueWithFirehose(ioConfig.firehose)}'`; + if (issueWithInputSource(ioConfig.inputSource)) { + return `inputSource: '${issueWithInputSource(ioConfig.inputSource)}'`; } break; @@ -1411,6 +1402,10 @@ export function issueWithIoConfig(ioConfig: IoConfig | undefined): string | unde break; } + if (!ignoreInputFormat && issueWithInputFormat(ioConfig.inputFormat)) { + return `inputFormat: '${issueWithInputFormat(ioConfig.inputFormat)}'`; + } + return; } @@ -1424,7 +1419,7 @@ export function getIoConfigTuningFormFields( case 'index:hdfs': return [ { - name: 'firehose.fetchTimeout', + name: 'inputSource.fetchTimeout', label: 'Fetch timeout', type: 'number', defaultValue: 60000, @@ -1435,7 +1430,7 @@ export function getIoConfigTuningFormFields( ), }, { - name: 'firehose.maxFetchRetry', + name: 'inputSource.maxFetchRetry', label: 'Max fetch retry', type: 'number', defaultValue: 3, @@ -1446,7 +1441,7 @@ export function getIoConfigTuningFormFields( ), }, { - name: 'firehose.maxCacheCapacityBytes', + name: 'inputSource.maxCacheCapacityBytes', label: 'Max cache capacity bytes', type: 'number', defaultValue: 1073741824, @@ -1460,7 +1455,7 @@ export function getIoConfigTuningFormFields( ), }, { - name: 'firehose.maxFetchCapacityBytes', + name: 'inputSource.maxFetchCapacityBytes', label: 'Max fetch capacity bytes', type: 'number', defaultValue: 1073741824, @@ -1474,7 +1469,7 @@ export function getIoConfigTuningFormFields( ), }, { - name: 'firehose.prefetchTriggerBytes', + name: 'inputSource.prefetchTriggerBytes', label: 'Prefetch trigger bytes', type: 'number', placeholder: 'maxFetchCapacityBytes / 2', @@ -1493,7 +1488,7 @@ export function getIoConfigTuningFormFields( case 'index:ingestSegment': return [ { - name: 'firehose.maxFetchCapacityBytes', + name: 'inputSource.maxFetchCapacityBytes', label: 'Max fetch capacity bytes', type: 'number', defaultValue: 157286400, @@ -1743,28 +1738,31 @@ export function guessDataSourceName(spec: IngestionSpec): string | undefined { switch (ioConfig.type) { case 'index': case 'index_parallel': - const firehose = ioConfig.firehose; - if (!firehose) return; + const inputSource = ioConfig.inputSource; + if (!inputSource) return; - switch (firehose.type) { + switch (inputSource.type) { case 'local': - if (firehose.filter && filterIsFilename(firehose.filter)) { - return basenameFromFilename(firehose.filter); - } else if (firehose.baseDir) { - return filenameFromPath(firehose.baseDir); + if (inputSource.filter && filterIsFilename(inputSource.filter)) { + return basenameFromFilename(inputSource.filter); + } else if (inputSource.baseDir) { + return filenameFromPath(inputSource.baseDir); } else { return; } case 'static-s3': - const s3Path = (firehose.uris || EMPTY_ARRAY)[0] || (firehose.prefixes || EMPTY_ARRAY)[0]; + const s3Path = + (inputSource.uris || EMPTY_ARRAY)[0] || (inputSource.prefixes || EMPTY_ARRAY)[0]; return s3Path ? filenameFromPath(s3Path) : undefined; case 'http': - return Array.isArray(firehose.uris) ? filenameFromPath(firehose.uris[0]) : undefined; + return Array.isArray(inputSource.uris) + ? filenameFromPath(inputSource.uris[0]) + : undefined; case 'ingestSegment': - return firehose.dataSource; + return inputSource.dataSource; case 'inline': return 'inline_data'; @@ -2242,7 +2240,7 @@ export function updateIngestionType( spec: IngestionSpec, comboType: IngestionComboType, ): IngestionSpec { - let [ingestionType, firehoseType] = comboType.split(':'); + let [ingestionType, inputSourceType] = comboType.split(':'); if (ingestionType === 'index') ingestionType = 'index_parallel'; const ioAndTuningConfigType = ingestionTypeToIoAndTuningConfigType( ingestionType as IngestionType, @@ -2253,11 +2251,11 @@ export function updateIngestionType( newSpec = deepSet(newSpec, 'ioConfig.type', ioAndTuningConfigType); newSpec = deepSet(newSpec, 'tuningConfig.type', ioAndTuningConfigType); - if (firehoseType) { - newSpec = deepSet(newSpec, 'ioConfig.firehose', { type: firehoseType }); + if (inputSourceType) { + newSpec = deepSet(newSpec, 'ioConfig.inputSource', { type: inputSourceType }); - if (firehoseType === 'local') { - newSpec = deepSet(newSpec, 'ioConfig.firehose.filter', '*'); + if (inputSourceType === 'local') { + newSpec = deepSet(newSpec, 'ioConfig.inputSource.filter', '*'); } } @@ -2280,62 +2278,58 @@ export function updateIngestionType( return newSpec; } -export function fillParser(spec: IngestionSpec, sampleData: string[]): IngestionSpec { - const firehoseType = deepGet(spec, 'ioConfig.firehose.type'); - - if (firehoseType === 'sql') { - return deepSet(spec, 'dataSchema.parser', { type: 'map' }); - } +export function fillInputFormat(spec: IngestionSpec, sampleData: string[]): IngestionSpec { + // const inputSourceType = deepGet(spec, 'ioConfig.inputSource.type'); - if (firehoseType === 'ingestSegment') { - return deepSet(spec, 'dataSchema.parser', { - type: 'string', - parseSpec: { format: 'timeAndDims' }, - }); - } + // ToDo: !!! + // if (inputSourceType === 'sql') { + // return deepSet(spec, 'dataSchema.parser', { type: 'map' }); + // } + // + // if (inputSourceType === 'ingestSegment') { + // return deepSet(spec, 'dataSchema.parser', { + // type: 'string', + // }); + // } - const parseSpec = guessParseSpec(sampleData); - if (!parseSpec) return spec; + const inputFormat = guessInputFormat(sampleData); + if (!inputFormat) return spec; - return deepSet(spec, 'dataSchema.parser', { type: 'string', parseSpec }); + return deepSet(spec, 'ioConfig.inputFormat', inputFormat); } -function guessParseSpec(sampleData: string[]): ParseSpec | undefined { +function guessInputFormat(sampleData: string[]): InputFormat | undefined { const sampleDatum = sampleData[0]; if (!sampleDatum) return; if (sampleDatum.startsWith('{') && sampleDatum.endsWith('}')) { - return parseSpecFromFormat('json'); + return inputFormatFromType('json'); } if (sampleDatum.split('\t').length > 3) { - return parseSpecFromFormat('tsv', !/\t\d+\t/.test(sampleDatum)); + return inputFormatFromType('tsv', !/\t\d+\t/.test(sampleDatum)); } if (sampleDatum.split(',').length > 3) { - return parseSpecFromFormat('csv', !/,\d+,/.test(sampleDatum)); + return inputFormatFromType('csv', !/,\d+,/.test(sampleDatum)); } - return parseSpecFromFormat('regex'); + return inputFormatFromType('regex'); } -function parseSpecFromFormat(format: string, hasHeaderRow?: boolean): ParseSpec { - const parseSpec: ParseSpec = { - format, - timestampSpec: {}, - dimensionsSpec: {}, - }; +function inputFormatFromType(type: string, hasHeaderRow?: boolean): InputFormat { + const inputFormat: InputFormat = { type }; - if (format === 'regex') { - parseSpec.pattern = '(.*)'; - parseSpec.columns = ['column1']; + if (type === 'regex') { + inputFormat.pattern = '(.*)'; + inputFormat.columns = ['column1']; } if (typeof hasHeaderRow === 'boolean') { - parseSpec.hasHeaderRow = hasHeaderRow; + inputFormat.hasHeaderRow = hasHeaderRow; } - return parseSpec; + return inputFormat; } export type DruidFilter = Record; @@ -2446,9 +2440,31 @@ export function getFilterFormFields() { export function upgradeSpec(spec: any): any { if (deepGet(spec, 'ioConfig.firehose')) { spec = deepMove(spec, 'ioConfig.firehose', 'ioConfig.inputSource'); + spec = deepMove(spec, 'dataSchema.parser.parseSpec.timestampSpec', 'dataSchema.timestampSpec'); + spec = deepMove( + spec, + 'dataSchema.parser.parseSpec.dimensionsSpec', + 'dataSchema.dimensionsSpec', + ); spec = deepMove(spec, 'dataSchema.parser.parseSpec', 'ioConfig.inputFormat'); - spec = deepMove(spec, 'ioConfig.inputFormat.timestampSpec', 'dataSchema.timestampSpec'); - spec = deepMove(spec, 'ioConfig.inputFormat.dimensionsSpec', 'dataSchema.dimensionsSpec'); + spec = deepDelete(spec, 'dataSchema.parser'); + spec = deepMove(spec, 'ioConfig.inputFormat.format', 'ioConfig.inputFormat.type'); + } + return spec; +} + +export function downgradeSpec(spec: any): any { + if (deepGet(spec, 'ioConfig.inputSource')) { + spec = deepMove(spec, 'ioConfig.inputFormat.type', 'ioConfig.inputFormat.format'); + spec = deepSet(spec, 'dataSchema.parser', { type: 'string' }); + spec = deepMove(spec, 'ioConfig.inputFormat', 'dataSchema.parser.parseSpec'); + spec = deepMove( + spec, + 'dataSchema.dimensionsSpec', + 'dataSchema.parser.parseSpec.dimensionsSpec', + ); + spec = deepMove(spec, 'dataSchema.timestampSpec', 'dataSchema.parser.parseSpec.timestampSpec'); + spec = deepMove(spec, 'ioConfig.inputSource', 'ioConfig.firehose'); } return spec; } diff --git a/web-console/src/utils/sampler.ts b/web-console/src/utils/sampler.ts index 83c0401a81ca..2324e14c9d4b 100644 --- a/web-console/src/utils/sampler.ts +++ b/web-console/src/utils/sampler.ts @@ -22,6 +22,7 @@ import { getDruidErrorMessage, queryDruidRune } from './druid-query'; import { alphanumericCompare, filterMap, sortWithPrefixSuffix } from './general'; import { DimensionsSpec, + downgradeSpec, getEmptyTimestampSpec, getSpecType, IngestionSpec, @@ -29,12 +30,11 @@ import { isColumnTimestampSpec, isIngestSegment, MetricSpec, - Parser, - ParseSpec, + TimestampSpec, Transform, TransformSpec, } from './ingestion-spec'; -import { deepGet, deepSet, whitelistKeys } from './object-change'; +import { deepGet, deepSet } from './object-change'; const MS_IN_HOUR = 60 * 60 * 1000; @@ -140,7 +140,7 @@ export function headerAndRowsFromSampleResponse( }; } -export async function getOverlordModules(): Promise { +export async function getProxyOverlordModules(): Promise { let statusResp: any; try { statusResp = await axios.get(`/proxy/overlord/status`); @@ -155,6 +155,11 @@ export async function postToSampler( sampleSpec: SampleSpec, forStr: string, ): Promise { + sampleSpec = fixSamplerTypes(sampleSpec); + console.log('pre downgrade', sampleSpec); + sampleSpec = deepSet(sampleSpec, 'spec', downgradeSpec(sampleSpec.spec)); // ToDo: remove this line when new format is supported by sampler + console.log('post downgrade', sampleSpec); + let sampleResp: any; try { sampleResp = await axios.post(`${SAMPLER_URL}?for=${forStr}`, sampleSpec); @@ -181,6 +186,15 @@ function makeSamplerIoConfig( return ioConfig; } +function fixSamplerTypes(sampleSpec: SampleSpec): SampleSpec { + const samplerType = getSamplerType(sampleSpec.spec); + sampleSpec = deepSet(sampleSpec, 'type', samplerType); + sampleSpec = deepSet(sampleSpec, 'spec.type', samplerType); + sampleSpec = deepSet(sampleSpec, 'spec.ioConfig.type', samplerType); + sampleSpec = deepSet(sampleSpec, 'spec.tuningConfig.type', samplerType); + return sampleSpec; +} + /** * This function scopes down the interval of an ingestSegment firehose for the data sampler * this is needed because the ingestSegment firehose gets the interval you are sampling over, @@ -191,7 +205,7 @@ function makeSamplerIoConfig( * This is essentially a workaround for https://github.com/apache/incubator-druid/issues/8448 * @param ioConfig The IO Config to scope down the interval of */ -export async function scopeDownIngestSegmentFirehoseIntervalIfNeeded( +export async function scopeDownIngestSegmentInputSourceIntervalIfNeeded( ioConfig: IoConfig, ): Promise { if (deepGet(ioConfig, 'firehose.type') !== 'ingestSegment') return ioConfig; @@ -235,7 +249,7 @@ export async function sampleForConnect( sampleStrategy: SampleStrategy, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); @@ -245,19 +259,15 @@ export async function sampleForConnect( type: samplerType, spec: { type: samplerType, - ioConfig, + ioConfig: deepSet(ioConfig, 'inputFormat', { + type: 'regex', + pattern: '(.*)', + columns: ['a'], + }), dataSchema: { dataSource: 'sample', - parser: { - type: 'string', - parseSpec: { - format: 'regex', - pattern: '(.*)', - columns: ['a'], - dimensionsSpec: {}, - timestampSpec: getEmptyTimestampSpec(), - }, - }, + timestampSpec: getEmptyTimestampSpec(), + dimensionsSpec: {}, }, } as any, samplerConfig: BASE_SAMPLER_CONFIG, @@ -298,27 +308,19 @@ export async function sampleForParser( cacheKey: string | undefined, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); - const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; const sampleSpec: SampleSpec = { type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: (parser.parseSpec - ? Object.assign({}, parser.parseSpec, { - dimensionsSpec: {}, - timestampSpec: getEmptyTimestampSpec(), - }) - : undefined) as any, - }, + timestampSpec: getEmptyTimestampSpec(), + dimensionsSpec: {}, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -335,13 +337,11 @@ export async function sampleForTimestamp( cacheKey: string | undefined, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); - const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; - const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; - const timestampSpec: ParseSpec = - deepGet(spec, 'dataSchema.parser.parseSpec.timestampSpec') || getEmptyTimestampSpec(); + const timestampSpec: TimestampSpec = + deepGet(spec, 'dataSchema.timestampSpec') || getEmptyTimestampSpec(); const columnTimestampSpec = isColumnTimestampSpec(timestampSpec); // First do a query with a static timestamp spec @@ -349,18 +349,11 @@ export async function sampleForTimestamp( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: (parser.parseSpec - ? Object.assign({}, parseSpec, { - dimensionsSpec: {}, - timestampSpec: columnTimestampSpec ? getEmptyTimestampSpec() : timestampSpec, - }) - : undefined) as any, - }, + dimensionsSpec: {}, + timestampSpec: columnTimestampSpec ? getEmptyTimestampSpec() : timestampSpec, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -379,15 +372,11 @@ export async function sampleForTimestamp( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: Object.assign({}, parseSpec, { - dimensionsSpec: {}, - }), - }, + dimensionsSpec: {}, + timestampSpec, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -424,12 +413,11 @@ export async function sampleForTransform( cacheKey: string | undefined, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); - const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; - const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; - const parserColumns: string[] = deepGet(parseSpec, 'columns') || []; + const inputFormatColumns: string[] = deepGet(spec, 'ioConfig.inputFormat.columns') || []; + const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec'); const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || []; // Extra step to simulate auto detecting dimension with transforms @@ -439,15 +427,11 @@ export async function sampleForTransform( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: Object.assign({}, parseSpec, { - dimensionsSpec: {}, - }), - }, + timestampSpec, + dimensionsSpec: {}, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -461,7 +445,7 @@ export async function sampleForTransform( headerFromSampleResponse( sampleResponseHack, '__time', - ['__time'].concat(parserColumns), + ['__time'].concat(inputFormatColumns), ).concat(transforms.map(t => t.name)), ); } @@ -470,15 +454,11 @@ export async function sampleForTransform( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: Object.assign({}, parseSpec, { - dimensionsSpec: specialDimensionSpec, // Hack Hack Hack - }), - }, + timestampSpec, + dimensionsSpec: specialDimensionSpec, // Hack Hack Hack transformSpec: { transforms, }, @@ -498,12 +478,11 @@ export async function sampleForFilter( cacheKey: string | undefined, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); - const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; - const parseSpec: ParseSpec = deepGet(spec, 'dataSchema.parser.parseSpec') || {}; - const parserColumns: string[] = deepGet(parser, 'columns') || []; + const inputFormatColumns: string[] = deepGet(spec, 'ioConfig.inputFormat.columns') || []; + const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec'); const transforms: Transform[] = deepGet(spec, 'dataSchema.transformSpec.transforms') || []; const filter: any = deepGet(spec, 'dataSchema.transformSpec.filter'); @@ -514,15 +493,11 @@ export async function sampleForFilter( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: Object.assign({}, parseSpec, { - dimensionsSpec: {}, - }), - }, + timestampSpec, + dimensionsSpec: {}, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -536,7 +511,7 @@ export async function sampleForFilter( headerFromSampleResponse( sampleResponseHack, '__time', - ['__time'].concat(parserColumns), + ['__time'].concat(inputFormatColumns), ).concat(transforms.map(t => t.name)), ); } @@ -545,15 +520,11 @@ export async function sampleForFilter( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: { - type: parser.type, - parseSpec: Object.assign({}, parseSpec, { - dimensionsSpec: specialDimensionSpec, // Hack Hack Hack - }), - }, + timestampSpec, + dimensionsSpec: specialDimensionSpec, // Hack Hack Hack transformSpec: { transforms, filter, @@ -574,12 +545,14 @@ export async function sampleForSchema( cacheKey: string | undefined, ): Promise { const samplerType = getSamplerType(spec); - const ioConfig: IoConfig = await scopeDownIngestSegmentFirehoseIntervalIfNeeded( + const ioConfig: IoConfig = await scopeDownIngestSegmentInputSourceIntervalIfNeeded( makeSamplerIoConfig(deepGet(spec, 'ioConfig'), samplerType, sampleStrategy), ); - const parser: Parser = deepGet(spec, 'dataSchema.parser') || {}; + + const timestampSpec: TimestampSpec = deepGet(spec, 'dataSchema.timestampSpec'); const transformSpec: TransformSpec = deepGet(spec, 'dataSchema.transformSpec') || ({} as TransformSpec); + const dimensionsSpec: DimensionsSpec = deepGet(spec, 'dataSchema.dimensionsSpec'); const metricsSpec: MetricSpec[] = deepGet(spec, 'dataSchema.metricsSpec') || []; const queryGranularity: string = deepGet(spec, 'dataSchema.granularitySpec.queryGranularity') || 'NONE'; @@ -588,15 +561,16 @@ export async function sampleForSchema( type: samplerType, spec: { type: samplerType, - ioConfig: deepSet(ioConfig, 'type', samplerType), + ioConfig, dataSchema: { dataSource: 'sample', - parser: whitelistKeys(parser, ['type', 'parseSpec']) as Parser, + timestampSpec, transformSpec, - metricsSpec, granularitySpec: { queryGranularity, }, + dimensionsSpec, + metricsSpec, }, }, samplerConfig: Object.assign({}, BASE_SAMPLER_CONFIG, { @@ -616,22 +590,16 @@ export async function sampleForExampleManifests( type: 'index', ioConfig: { type: 'index', - firehose: { type: 'http', uris: [exampleManifestUrl] }, + inputSource: { type: 'http', uris: [exampleManifestUrl] }, + inputFormat: { type: 'tsv', hasHeaderRow: true }, }, dataSchema: { dataSource: 'sample', - parser: { - type: 'string', - parseSpec: { - format: 'tsv', - timestampSpec: { - column: 'timestamp', - missingValue: '2010-01-01T00:00:00Z', - }, - dimensionsSpec: {}, - hasHeaderRow: true, - }, + timestampSpec: { + column: 'timestamp', + missingValue: '2010-01-01T00:00:00Z', }, + dimensionsSpec: {}, }, }, samplerConfig: { numRows: 50, timeoutMs: 10000, skipCache: true }, diff --git a/web-console/src/utils/utils.spec.ts b/web-console/src/utils/utils.spec.ts index dda3c73b3ae8..32f1d9417332 100644 --- a/web-console/src/utils/utils.spec.ts +++ b/web-console/src/utils/utils.spec.ts @@ -38,14 +38,17 @@ import { } from './sampler'; describe('test-utils', () => { - const ingestionSpec = { + const ingestionSpec: IngestionSpec = { type: 'index_parallel', ioConfig: { type: 'index_parallel', - firehose: { + inputSource: { type: 'http', uris: ['https://static.imply.io/data/wikipedia.json.gz'], }, + inputFormat: { + type: 'json', + }, }, tuningConfig: { type: 'index_parallel', @@ -57,21 +60,15 @@ describe('test-utils', () => { segmentGranularity: 'DAY', queryGranularity: 'HOUR', }, - parser: { - type: 'string', - parseSpec: { - format: 'json', - timestampSpec: { - column: 'timestamp', - format: 'iso', - }, - dimensionsSpec: {}, - }, + timestampSpec: { + column: 'timestamp', + format: 'iso', }, + dimensionsSpec: {}, }, }; it('spec-utils getSamplerType', () => { - expect(getSamplerType(ingestionSpec as IngestionSpec)).toMatchInlineSnapshot(`"index"`); + expect(getSamplerType(ingestionSpec)).toMatchInlineSnapshot(`"index"`); }); it('spec-utils headerFromSampleResponse', () => { expect(headerFromSampleResponse({ cacheKey: 'abc123', data: [] })).toMatchInlineSnapshot( @@ -79,34 +76,26 @@ describe('test-utils', () => { ); }); it('spec-utils sampleForParser', () => { - expect( - sampleForParser(ingestionSpec as IngestionSpec, 'start', 'abc123'), - ).toMatchInlineSnapshot(`Promise {}`); + expect(sampleForParser(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot(`Promise {}`); }); it('spec-utils SampleSpec', () => { - expect(sampleForConnect(ingestionSpec as IngestionSpec, 'start')).toMatchInlineSnapshot( - `Promise {}`, - ); + expect(sampleForConnect(ingestionSpec, 'start')).toMatchInlineSnapshot(`Promise {}`); }); it('spec-utils sampleForTimestamp', () => { - expect( - sampleForTimestamp(ingestionSpec as IngestionSpec, 'start', 'abc123'), - ).toMatchInlineSnapshot(`Promise {}`); + expect(sampleForTimestamp(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot( + `Promise {}`, + ); }); it('spec-utils sampleForTransform', () => { - expect( - sampleForTransform(ingestionSpec as IngestionSpec, 'start', 'abc123'), - ).toMatchInlineSnapshot(`Promise {}`); + expect(sampleForTransform(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot( + `Promise {}`, + ); }); it('spec-utils sampleForFilter', () => { - expect( - sampleForFilter(ingestionSpec as IngestionSpec, 'start', 'abc123'), - ).toMatchInlineSnapshot(`Promise {}`); + expect(sampleForFilter(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot(`Promise {}`); }); it('spec-utils sampleForSchema', () => { - expect( - sampleForSchema(ingestionSpec as IngestionSpec, 'start', 'abc123'), - ).toMatchInlineSnapshot(`Promise {}`); + expect(sampleForSchema(ingestionSpec, 'start', 'abc123')).toMatchInlineSnapshot(`Promise {}`); }); it('spec-utils sampleForExampleManifests', () => { expect(sampleForExampleManifests('abc123')).toMatchInlineSnapshot(`Promise {}`); @@ -114,14 +103,17 @@ describe('test-utils', () => { }); describe('druid-type.ts', () => { - const ingestionSpec = { + const ingestionSpec: IngestionSpec = { type: 'index_parallel', ioConfig: { type: 'index_parallel', - firehose: { + inputSource: { type: 'http', uris: ['https://static.imply.io/data/wikipedia.json.gz'], }, + inputFormat: { + type: 'json', + }, }, tuningConfig: { type: 'index_parallel', @@ -133,17 +125,11 @@ describe('druid-type.ts', () => { segmentGranularity: 'DAY', queryGranularity: 'HOUR', }, - parser: { - type: 'string', - parseSpec: { - format: 'json', - timestampSpec: { - column: 'timestamp', - format: 'iso', - }, - dimensionsSpec: {}, - }, + timestampSpec: { + column: 'timestamp', + format: 'iso', }, + dimensionsSpec: {}, }, }; it('spec-utils getSamplerType', () => { @@ -173,12 +159,7 @@ describe('druid-type.ts', () => { }); it('spec-utils updateSchemaWithSample', () => { expect( - updateSchemaWithSample( - ingestionSpec as IngestionSpec, - { header: ['header'], rows: [] }, - 'specific', - true, - ), + updateSchemaWithSample(ingestionSpec, { header: ['header'], rows: [] }, 'specific', true), ).toMatchInlineSnapshot(` Object { "dataSchema": Object { diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx index e471c048e5d7..d4b1a93b568a 100644 --- a/web-console/src/views/load-data-view/load-data-view.tsx +++ b/web-console/src/views/load-data-view/load-data-view.tsx @@ -75,7 +75,7 @@ import { EMPTY_ARRAY, EMPTY_OBJECT, fillDataSourceNameIfNeeded, - fillParser, + fillInputFormat, FlattenField, getDimensionMode, getDimensionSpecFormFields, @@ -86,10 +86,10 @@ import { getIngestionDocLink, getIngestionImage, getIngestionTitle, + getInputFormatFormFields, getIoConfigFormFields, getIoConfigTuningFormFields, getMetricSpecFormFields, - getParseSpecFormFields, getPartitionRelatedTuningSpecFormFields, getRequiredModule, getRollup, @@ -101,6 +101,8 @@ import { hasParallelAbility, IngestionComboTypeWithExtra, IngestionSpec, + InputFormat, + inputFormatCanFlatten, invalidIoConfig, invalidTuningConfig, IoConfig, @@ -108,16 +110,13 @@ import { isEmptyIngestionSpec, isIngestSegment, isParallel, + issueWithInputFormat, issueWithIoConfig, - issueWithParser, isTask, joinFilter, MAX_INLINE_DATA_LENGTH, MetricSpec, normalizeSpec, - Parser, - ParseSpec, - parseSpecHasFlatten, splitFilter, TimestampSpec, Transform, @@ -127,7 +126,7 @@ import { import { deepDelete, deepGet, deepSet } from '../../utils/object-change'; import { ExampleManifest, - getOverlordModules, + getProxyOverlordModules, HeaderAndRows, headerAndRowsFromSampleResponse, SampleEntry, @@ -339,7 +338,7 @@ export class LoadDataView extends React.PureComponent { const stringValue = e.target.value.substr(0, MAX_INLINE_DATA_LENGTH); - this.updateSpecPreview(deepSet(spec, 'ioConfig.firehose.data', stringValue)); + this.updateSpecPreview(deepSet(spec, 'ioConfig.inputSource.data', stringValue)); }} /> ); @@ -1068,7 +1069,7 @@ export class LoadDataView extends React.PureComponent )} - {deepGet(spec, 'ioConfig.firehose.type') === 'local' && ( + {deepGet(spec, 'ioConfig.inputSource.type') === 'local' && ( This path must be available on the local filesystem of all Druid servers. @@ -1095,18 +1096,14 @@ export class LoadDataView extends React.PureComponent k !== '__time' && !aggregators[k]) .map(k => ({ @@ -1143,7 +1140,7 @@ export class LoadDataView extends React.PureComponent l.raw)), + fillInputFormat(spec, inputQueryState.data.data.map(l => l.raw)), ), ); } @@ -1158,14 +1155,12 @@ export class LoadDataView extends React.PureComponent - this.updateSpecPreview(deepSet(spec, 'dataSchema.parser.parseSpec', p)) - } + fields={getInputFormatFormFields()} + model={inputFormat} + onChange={p => this.updateSpecPreview(deepSet(spec, 'ioConfig.inputFormat', p))} /> {this.renderApplyButtonBar()} @@ -1307,11 +1300,7 @@ export class LoadDataView extends React.PureComponent { this.updateSpec( - deepSet( - spec, - 'dataSchema.parser.parseSpec.flattenSpec.fields', - sugestedFlattenFields, - ), + deepSet(spec, 'ioConfig.inputFormat.flattenSpec.fields', sugestedFlattenFields), ); }} /> @@ -1335,7 +1324,7 @@ export class LoadDataView extends React.PureComponent { this.setState({ @@ -1381,7 +1370,7 @@ export class LoadDataView extends React.PureComponent