Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions web-console/script/druid
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ function _build_distribution() {
(
# Add HEAD as an allowed HTTP method since this is how we check when the Druid service is ready.
cd "$(_get_code_root)" \
&& mvn -Pdist,skip-static-checks,skip-tests -Dmaven.javadoc.skip=true -q -T1C install \
&& mvn -Pdist,bundle-contrib-exts,skip-static-checks,skip-tests -Dforbiddenapis.skip=true -Dcheckstyle.skip=true -Dpmd.skip=true -Dmaven.javadoc.skip=true -Danimal.sniffer.skip=true -Denforcer.skip=true -Dcyclonedx.skip=true -q -T1C install \
&& cd distribution/target \
&& tar xzf "apache-druid-$(_get_druid_version)-bin.tar.gz" \
&& cd apache-druid-$(_get_druid_version) \
&& mkdir -p extensions/druid-testing-tools \
&& cp "$(_get_code_root)/extensions-core/testing-tools/target/druid-testing-tools-$(_get_druid_version).jar" extensions/druid-testing-tools/ \
&& mkdir -p extensions/druid-compressed-bigdecimal \
&& cp "$(_get_code_root)/extensions-contrib/compressed-bigdecimal/target/druid-compressed-bigdecimal-$(_get_druid_version).jar" extensions/druid-compressed-bigdecimal/ \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.extensions.loadList=[\"druid-hdfs-storage\", \"druid-kafka-indexing-service\", \"druid-multi-stage-query\", \"druid-testing-tools\", \"druid-bloom-filter\", \"druid-datasketches\", \"druid-histogram\", \"druid-stats\", \"druid-compressed-bigdecimal\", \"druid-parquet-extensions\", \"druid-deltalake-extensions\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.server.http.allowedHttpMethods=[\"HEAD\"]" >> conf/druid/auto/_common/common.runtime.properties \
&& echo -e "\n\ndruid.export.storage.baseDir=/" >> conf/druid/auto/_common/common.runtime.properties \
)
Expand Down
5 changes: 2 additions & 3 deletions web-console/src/components/json-input/json-input.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,8 @@ import AceEditor from 'react-ace';

import './json-input.scss';

function parseHjson(str: string) {
// Throwing on empty input is more consistent with how JSON.parse works
if (str.trim() === '') throw new Error('empty hjson');
function parseHjson(str: string): any {
if (str.trim() === '') return;
return Hjson.parse(str);
}

Expand Down
10 changes: 8 additions & 2 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,10 @@ export function isDruidSource(spec: Partial<IngestionSpec>): boolean {
return deepGet(spec, 'spec.ioConfig.inputSource.type') === 'druid';
}

/**
 * Reports whether the input source type of the given ingestion spec is one of
 * the "fixed format" types listed here: `druid` or `delta`.
 *
 * The type is read from `spec.ioConfig.inputSource.type`.
 * NOTE(review): presumably these sources carry their own data format, so no
 * separate input format needs to be chosen for them — confirm against callers.
 *
 * @param spec - a possibly partial ingestion spec
 * @returns the result of checking the input source type against `druid` and `delta`
 */
export function isFixedFormatSource(spec: Partial<IngestionSpec>): boolean {
  const inputSourceType = deepGet(spec, 'spec.ioConfig.inputSource.type');
  return oneOf(inputSourceType, 'druid', 'delta');
}

export function getPossibleSystemFieldsForSpec(spec: Partial<IngestionSpec>): string[] {
const inputSource = deepGet(spec, 'spec.ioConfig.inputSource');
if (!inputSource) return [];
Expand Down Expand Up @@ -1064,7 +1068,6 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta filter',
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defaultValue: {},
info: (
<>
<ExternalLink
Expand All @@ -1081,7 +1084,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
defaultValue: {},
zeroMeansUndefined: true,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is
Expand Down Expand Up @@ -1616,6 +1619,9 @@ export function guessDataSourceNameFromInputSource(inputSource: InputSource): st
return actualPath ? actualPath.path : uriPath ? filenameFromPath(uriPath) : undefined;
}

case 'delta':
return inputSource.tablePath ? filenameFromPath(inputSource.tablePath) : undefined;

case 'http':
return Array.isArray(inputSource.uris) ? filenameFromPath(inputSource.uris[0]) : undefined;

Expand Down
3 changes: 1 addition & 2 deletions web-console/src/druid-models/input-source/input-source.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,6 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
type: 'json',
placeholder: '{"type": "=", "column": "name", "value": "foo"}',
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
<ExternalLink href={`${getLink('DOCS')}/ingestion/input-sources/#delta-filter-object`}>
Expand All @@ -668,8 +667,8 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
label: 'Delta snapshot version',
type: 'number',
placeholder: '(latest)',
zeroMeansUndefined: true,
defined: typeIsKnown(KNOWN_TYPES, 'delta'),
required: false,
info: (
<>
The snapshot version to read from the Delta table. By default, the latest snapshot is read.
Expand Down
13 changes: 9 additions & 4 deletions web-console/src/utils/sampler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/

import { dedupe, F, SqlExpression, SqlFunction } from '@druid-toolkit/query';
import type { CancelToken } from 'axios';
import * as JSONBig from 'json-bigint-native';

import type {
Expand All @@ -40,6 +41,7 @@ import {
getSpecType,
getTimestampSchema,
isDruidSource,
isFixedFormatSource,
PLACEHOLDER_TIMESTAMP_SPEC,
REINDEX_TIMESTAMP_SPEC,
TIME_COLUMN,
Expand Down Expand Up @@ -187,12 +189,15 @@ export async function getProxyOverlordModules(): Promise<string[]> {
export async function postToSampler(
sampleSpec: SampleSpec,
forStr: string,
cancelToken?: CancelToken,
): Promise<SampleResponse> {
sampleSpec = fixSamplerLookups(fixSamplerTypes(sampleSpec));

let sampleResp: any;
try {
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec);
sampleResp = await Api.instance.post(`/druid/indexer/v1/sampler?for=${forStr}`, sampleSpec, {
cancelToken,
});
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
Expand Down Expand Up @@ -269,8 +274,7 @@ export async function sampleForConnect(
sampleStrategy,
);

const reingestMode = isDruidSource(spec);
if (!reingestMode) {
if (!isFixedFormatSource(spec)) {
ioConfig = deepSet(
ioConfig,
'inputFormat',
Expand All @@ -282,6 +286,7 @@ export async function sampleForConnect(
);
}

const reingestMode = isDruidSource(spec);
const sampleSpec: SampleSpec = {
type: samplerType,
spec: {
Expand All @@ -290,7 +295,7 @@ export async function sampleForConnect(
dataSchema: {
dataSource: 'sample',
timestampSpec: reingestMode ? REINDEX_TIMESTAMP_SPEC : PLACEHOLDER_TIMESTAMP_SPEC,
dimensionsSpec: {},
dimensionsSpec: { useSchemaDiscovery: true },
granularitySpec: {
rollup: false,
},
Expand Down
77 changes: 45 additions & 32 deletions web-console/src/views/load-data-view/load-data-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ import {
invalidPartitionConfig,
isDruidSource,
isEmptyIngestionSpec,
isFixedFormatSource,
isKafkaOrKinesis,
isStreamingSpec,
issueWithIoConfig,
Expand Down Expand Up @@ -267,26 +268,27 @@ function showBlankLine(line: SampleEntry): string {

function formatSampleEntries(
sampleEntries: SampleEntry[],
specialSource: undefined | 'druid' | 'kafka' | 'kinesis',
): string {
if (!sampleEntries.length) return 'No data returned from sampler';
specialSource: undefined | 'fixedFormat' | 'druid' | 'kafka' | 'kinesis',
): string[] {
if (!sampleEntries.length) return ['No data returned from sampler'];

switch (specialSource) {
case 'fixedFormat':
return sampleEntries.map(l => JSONBig.stringify(l.parsed));

case 'druid':
return sampleEntries.map(showDruidLine).join('\n');
return sampleEntries.map(showDruidLine);

case 'kafka':
return sampleEntries.map(showKafkaLine).join('\n');
return sampleEntries.map(showKafkaLine);

case 'kinesis':
return sampleEntries.map(showKinesisLine).join('\n');
return sampleEntries.map(showKinesisLine);

default:
return (
sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine)
).join('\n');
return sampleEntries.every(l => !l.parsed)
? sampleEntries.map(showBlankLine)
: sampleEntries.map(showRawLine);
}
}

Expand Down Expand Up @@ -553,18 +555,19 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat

isStepEnabled(step: Step): boolean {
const { spec, cacheRows } = this.state;
const druidSource = isDruidSource(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;

switch (step) {
case 'connect':
return Boolean(spec.type);

case 'parser':
return Boolean(!druidSource && spec.type && !issueWithIoConfig(ioConfig));
return Boolean(!isFixedFormatSource(spec) && spec.type && !issueWithIoConfig(ioConfig));

case 'timestamp':
return Boolean(!druidSource && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'));
return Boolean(
!isDruidSource(spec) && cacheRows && deepGet(spec, 'spec.dataSchema.timestampSpec'),
);

case 'transform':
case 'filter':
Expand Down Expand Up @@ -1258,7 +1261,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const deltaState: Partial<LoadDataViewState> = {
inputQueryState: new QueryState({ data: sampleResponse }),
};
if (isDruidSource(spec)) {
if (isFixedFormatSource(spec)) {
deltaState.cacheRows = getCacheRowsFromSampleResponse(sampleResponse);
}
this.setState(deltaState as LoadDataViewState);
Expand All @@ -1270,8 +1273,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
const specType = getSpecType(spec);
const ioConfig: IoConfig = deepGet(spec, 'spec.ioConfig') || EMPTY_OBJECT;
const inlineMode = deepGet(spec, 'spec.ioConfig.inputSource.type') === 'inline';
const fixedFormatSource = isFixedFormatSource(spec);
const druidSource = isDruidSource(spec);
const specialSource = druidSource ? 'druid' : isKafkaOrKinesis(specType) ? specType : undefined;
const specialSource = druidSource
? 'druid'
: fixedFormatSource
? 'fixedFormat'
: isKafkaOrKinesis(specType)
? specType
: undefined;

let mainFill: JSX.Element | string;
if (inlineMode) {
Expand Down Expand Up @@ -1303,7 +1313,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
<TextArea
className="raw-lines"
readOnly
value={formatSampleEntries(inputData, specialSource)}
value={formatSampleEntries(inputData, specialSource).join('\n')}
/>
)}
{inputQueryState.isLoading() && <Loader />}
Expand Down Expand Up @@ -1375,7 +1385,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
</div>
{this.renderNextBar({
disabled: !inputQueryState.data,
nextStep: druidSource ? 'transform' : 'parser',
nextStep: druidSource ? 'transform' : fixedFormatSource ? 'timestamp' : 'parser',
onNextStep: () => {
if (!inputQueryState.data) return false;
const inputData = inputQueryState.data;
Expand Down Expand Up @@ -1423,6 +1433,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
}
}

this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
}
if (fixedFormatSource) {
const newSpec = deepSet(
spec,
'spec.dataSchema.timestampSpec',
getTimestampSpec(inputQueryState.data),
);

this.updateSpec(fillDataSourceNameIfNeeded(newSpec));
} else {
const issue = issueWithSampleData(
Expand Down Expand Up @@ -1675,21 +1694,15 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
disabled: !parserQueryState.data,
onNextStep: () => {
if (!parserQueryState.data) return false;
let possibleTimestampSpec: TimestampSpec;
if (isDruidSource(spec)) {
possibleTimestampSpec = {
column: TIME_COLUMN,
format: 'auto',
};
} else {
possibleTimestampSpec = getTimestampSpec(parserQueryState.data);
}

if (possibleTimestampSpec) {
const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec);
this.updateSpec(newSpec);
}
const possibleTimestampSpec = isDruidSource(spec)
? {
column: TIME_COLUMN,
format: 'auto',
}
: getTimestampSpec(parserQueryState.data);

const newSpec = deepSet(spec, 'spec.dataSchema.timestampSpec', possibleTimestampSpec);
this.updateSpec(newSpec);
return true;
},
})}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ export const SqlDataLoaderView = React.memo(function SqlDataLoaderView(
<TitleFrame title="Load data" subtitle="Select input type">
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat) => {
setExternalConfigStep({ inputSource, inputFormat });
}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ export const ConnectExternalDataDialog = React.memo(function ConnectExternalData
) : (
<InputSourceStep
initInputSource={inputSource}
mode="sampler"
onSet={(inputSource, inputFormat, partitionedByHint) => {
setExternalConfigStep({ inputSource, inputFormat, partitionedByHint });
}}
Expand Down
Loading