Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 99 additions & 12 deletions web-console/src/druid-models/input-format/input-format.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import React from 'react';
import type { Field } from '../../components';
import { AutoForm, ExternalLink } from '../../components';
import { getLink } from '../../links';
import { compact, deepGet, deepSet, oneOf, typeIsKnown } from '../../utils';
import { compact, deepGet, deepSet, oneOf, oneOfKnown, typeIsKnown } from '../../utils';
import type { FlattenSpec } from '../flatten-spec/flatten-spec';

export interface InputFormat {
Expand Down Expand Up @@ -58,6 +58,7 @@ const KNOWN_TYPES = [
'orc',
'avro_ocf',
'avro_stream',
'protobuf',
'regex',
'kafka',
'javascript',
Expand Down Expand Up @@ -230,6 +231,44 @@ function generateInputFormatFields(streaming: boolean) {
defined: typeIsKnown(KNOWN_TYPES, 'csv', 'tsv', 'regex'),
info: <>A custom delimiter for multi-value dimensions.</>,
},
{
// Form field for the `avroBytesDecoder` property of the input format, edited as raw JSON.
name: 'avroBytesDecoder',
type: 'json',
// Visibility is delegated to typeIsKnown(KNOWN_TYPES, 'avro_stream').
// NOTE(review): presumably this shows the field only for the avro_stream type — confirm
// against the typeIsKnown helper.
defined: typeIsKnown(KNOWN_TYPES, 'avro_stream'),
required: true,
placeholder: `{ type: "schema_repo", ... }`,
// Help content: a short description plus a link built from getLink('DOCS').
info: (
<>
<p>Specifies how to decode bytes to Avro record.</p>
<p>
For more details refer to the{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
documentation
</ExternalLink>
.
</p>
</>
),
},
{
// Form field for the `schema` property (an Avro reader schema per the help text below),
// edited as raw JSON. Unlike the decoder fields it is not marked `required`.
name: 'schema',
type: 'json',
// Visibility is delegated to typeIsKnown(KNOWN_TYPES, 'avro_ocf').
// NOTE(review): presumably limits the field to the avro_ocf type — confirm against typeIsKnown.
defined: typeIsKnown(KNOWN_TYPES, 'avro_ocf'),
info: (
<>
Define a reader schema to be used when parsing Avro records. This is useful when parsing
multiple versions of Avro OCF file data.
</>
),
},
{
// Form field for the `protoBytesDecoder` property of the input format, edited as raw JSON.
name: 'protoBytesDecoder',
type: 'json',
// Visibility is delegated to typeIsKnown(KNOWN_TYPES, 'protobuf').
// NOTE(review): presumably limits the field to the protobuf type — confirm against typeIsKnown.
defined: typeIsKnown(KNOWN_TYPES, 'protobuf'),
required: true,
placeholder: `{ ... }`,
info: <>Specifies how to decode bytes to Protobuf record.</>,
},
{
name: 'binaryAsString',
type: 'boolean',
Expand Down Expand Up @@ -320,7 +359,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
name: 'keyFormat.featureSpec',
label: 'Kafka key JSON parser features',
type: 'json',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
hideInMore: true,
info: (
<>
Expand All @@ -342,7 +381,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
name: 'keyFormat.assumeNewlineDelimited',
label: 'Kafka key assume newline delimited',
type: 'boolean',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.useJsonNodeReader),
defaultValue: false,
hideInMore: true,
Expand Down Expand Up @@ -370,7 +409,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
name: 'keyFormat.useJsonNodeReader',
label: 'Kafka key use JSON node reader',
type: 'boolean',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'json',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'json'),
disabled: inputFormat => Boolean(inputFormat.assumeNewlineDelimited),
defaultValue: false,
hideInMore: true,
Expand Down Expand Up @@ -400,22 +439,24 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
type: 'string',
defaultValue: '\t',
suggestions: ['\t', ';', '|', '#'],
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'tsv',
defined: inputFormat => oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'tsv'),
info: <>A custom delimiter for data values.</>,
},
{
name: 'keyFormat.pattern',
label: 'Kafka key pattern',
type: 'string',
defined: inputFormat => deepGet(inputFormat, 'keyFormat.type') === 'regex',
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'regex'),
required: true,
},
{
name: 'keyFormat.skipHeaderRows',
label: 'Kafka key skip header rows',
type: 'number',
defaultValue: 0,
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
min: 0,
info: (
<>
Expand All @@ -427,7 +468,8 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
name: 'keyFormat.findColumnsFromHeader',
label: 'Kafka key find columns from header',
type: 'boolean',
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv'),
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv'),
required: true,
hideInMore: true,
info: (
Expand Down Expand Up @@ -463,12 +505,57 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
type: 'string',
defaultValue: '\x01',
suggestions: ['\x01', '\x00'],
defined: inputFormat => oneOf(deepGet(inputFormat, 'keyFormat.type'), 'csv', 'tsv', 'regex'),
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'csv', 'tsv', 'regex'),
info: <>A custom delimiter for multi-value dimensions.</>,
},
{
// Form field for the nested `keyFormat.avroBytesDecoder` property, edited as raw JSON.
name: 'keyFormat.avroBytesDecoder',
label: 'Kafka key Avro bytes decoder',
type: 'json',
// Visibility depends on the nested `keyFormat.type` value (read with deepGet), which is
// passed to oneOfKnown together with KNOWN_TYPES and 'avro_stream'.
// NOTE(review): exact matching rules live in the oneOfKnown helper — confirm there.
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_stream'),
required: true,
placeholder: `{ type: "schema_repo", ... }`,
// Help content: a short description plus a link built from getLink('DOCS').
info: (
<>
<p>Specifies how to decode bytes to Avro record.</p>
<p>
For more details refer to the{' '}
<ExternalLink href={`${getLink('DOCS')}/ingestion/data-formats/#avro-bytes-decoder`}>
documentation
</ExternalLink>
.
</p>
</>
),
},
{
// Form field for the nested `keyFormat.schema` property (an Avro reader schema per the help
// text below), edited as raw JSON.
name: 'keyFormat.schema',
// Label follows the 'Kafka key …' wording used by the other keyFormat.* fields.
label: 'Kafka key schema',
type: 'json',
// Visibility depends on the nested `keyFormat.type` value (read with deepGet), which is
// passed to oneOfKnown together with KNOWN_TYPES and 'avro_ocf'.
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'avro_ocf'),
info: (
<>
Define a reader schema to be used when parsing Avro records. This is useful when parsing
multiple versions of Avro OCF file data.
</>
),
},
{
// Form field for the nested `keyFormat.protoBytesDecoder` property, edited as raw JSON.
name: 'keyFormat.protoBytesDecoder',
label: 'Kafka key proto bytes decoder',
type: 'json',
// Visibility depends on the nested `keyFormat.type` value (read with deepGet), which is
// passed to oneOfKnown together with KNOWN_TYPES and 'protobuf'.
// NOTE(review): exact matching rules live in the oneOfKnown helper — confirm there.
defined: inputFormat =>
oneOfKnown(deepGet(inputFormat, 'keyFormat.type'), KNOWN_TYPES, 'protobuf'),
required: true,
placeholder: `{ ... }`,
info: <>Specifies how to decode bytes to Protobuf record.</>,
},
{
name: 'keyFormat.binaryAsString',
label: 'Kafka key list binary as string',
label: 'Kafka key binary as string',
type: 'boolean',
defaultValue: false,
defined: inputFormat =>
Expand Down Expand Up @@ -498,7 +585,7 @@ export const KAFKA_METADATA_INPUT_FORMAT_FIELDS: Field<InputFormat>[] = [
label: 'Kafka header format type',
type: 'string',
defined: typeIsKnown(KNOWN_TYPES, 'kafka'),
placeholder: `(don't parse Kafka herders)`,
placeholder: `(don't parse Kafka headers)`,
suggestions: [undefined, 'string'],
},
{
Expand Down Expand Up @@ -529,5 +616,5 @@ export function inputFormatCanProduceNestedData(inputFormat: InputFormat): boole
inputFormat.valueFormat && inputFormatCanProduceNestedData(inputFormat.valueFormat),
);
}
return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream');
return oneOf(inputFormat.type, 'json', 'parquet', 'orc', 'avro_ocf', 'avro_stream', 'protobuf');
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Spacing for the download button rendered inside the destination pages pane.
.destination-pages-pane .download-button {
  margin-top: 4px;
  margin-left: 2px;
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import {
wait,
} from '../../../utils';

import './destination-pages-pane.scss';

type ResultFormat = 'object' | 'array' | 'objectLines' | 'arrayLines' | 'csv';

const RESULT_FORMATS: ResultFormat[] = ['objectLines', 'object', 'arrayLines', 'array', 'csv'];
Expand Down Expand Up @@ -86,24 +88,28 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
);
}

/**
 * Builds the download filename for a single result page.
 *
 * The page number in the name is one-based and left-padded with zeros to the width of
 * `numPages`, so every filename of one result set has the same length. `id` and
 * `desiredExtension` are taken from the enclosing scope.
 *
 * @param pageIndex zero-based index of the page
 * @param numPages total number of pages
 * @returns a name of the form `<id>_page_<n>_of_<total>.<extension>`
 */
function getPageFilename(pageIndex: number, numPages: number) {
  const numPagesString = String(numPages);
  const pageNumberString = String(pageIndex + 1).padStart(numPagesString.length, '0');
  return `${id}_page_${pageNumberString}_of_${numPagesString}.${desiredExtension}`;
}

/**
 * Starts a download for every page, one after another.
 *
 * Returns immediately when `pages` is not set. Each page is downloaded exactly once, using
 * the URL from `getPageUrl` and the filename from `getPageFilename`.
 */
async function downloadAllPages() {
  if (!pages) return;
  const numPages = pages.length;
  for (let i = 0; i < numPages; i++) {
    downloadUrl(getPageUrl(i), getPageFilename(i, numPages));
    // Pause between consecutive downloads.
    // NOTE(review): presumably this keeps the browser from dropping rapid-fire downloads — confirm.
    await wait(100);
  }
}

const numPages = pages.length;
return (
<div className="execution-details-pane">
<div className="destination-pages-pane">
<p>
{`${
typeof numTotalRows === 'number' ? pluralIfNeeded(numTotalRows, 'row') : 'Results'
} have been written to ${pluralIfNeeded(pages.length, 'page')}. `}
} have been written to ${pluralIfNeeded(numPages, 'page')}. `}
</p>
<p>
Format when downloading:{' '}
Expand Down Expand Up @@ -133,7 +139,7 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
<Button
intent={Intent.PRIMARY}
icon={IconNames.DOWNLOAD}
text={`Download all data (${pluralIfNeeded(pages.length, 'file')})`}
text={`Download all data (${pluralIfNeeded(numPages, 'file')})`}
onClick={() => void downloadAllPages()}
/>
)}
Expand All @@ -142,11 +148,11 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
data={pages}
loading={false}
sortable={false}
defaultPageSize={clamp(pages.length, 1, SMALL_TABLE_PAGE_SIZE)}
showPagination={pages.length > SMALL_TABLE_PAGE_SIZE}
defaultPageSize={clamp(numPages, 1, SMALL_TABLE_PAGE_SIZE)}
showPagination={numPages > SMALL_TABLE_PAGE_SIZE}
columns={[
{
Header: 'Page number',
Header: 'Page ID',
id: 'id',
accessor: 'id',
className: 'padded',
Expand Down Expand Up @@ -175,11 +181,12 @@ export const DestinationPagesPane = React.memo(function DestinationPagesPane(
width: 300,
Cell: ({ value }) => (
<AnchorButton
className="download-button"
icon={IconNames.DOWNLOAD}
text="Download"
minimal
href={getPageUrl(value)}
download={getPageFilename(value)}
download={getPageFilename(value, numPages)}
/>
),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,9 @@ export const InputSourceStep = React.memo(function InputSourceStep(props: InputS
<li>
<ExternalLink href="https://avro.apache.org">Avro</ExternalLink>
</li>
<li>
<ExternalLink href="https://protobuf.dev">Protobuf</ExternalLink>
</li>
<li>
Any line format that can be parsed with a custom regular expression (regex)
</li>
Expand Down