Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions web-console/src/components/segment-timeline/segment-timeline.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,23 @@ ORDER BY "start" DESC`;
};

this.dataQueryManager = new QueryManager({
processQuery: async ({ capabilities, dateRange }) => {
processQuery: async ({ capabilities, dateRange }, cancelToken) => {
let intervals: IntervalRow[];
let datasources: string[];
if (capabilities.hasSql()) {
intervals = await queryDruidSql({
query: SegmentTimeline.getSqlQuery(dateRange),
});
intervals = await queryDruidSql(
{
query: SegmentTimeline.getSqlQuery(dateRange),
},
cancelToken,
);
datasources = uniq(intervals.map(r => r.datasource).sort());
} else if (capabilities.hasCoordinatorAccess()) {
const startIso = dateRange[0].toISOString();

datasources = (await Api.instance.get(`/druid/coordinator/v1/datasources`)).data;
datasources = (
await Api.instance.get(`/druid/coordinator/v1/datasources`, { cancelToken })
).data;
intervals = (
await Promise.all(
datasources.map(async datasource => {
Expand All @@ -289,6 +294,7 @@ ORDER BY "start" DESC`;
`/druid/coordinator/v1/datasources/${Api.encodePath(
datasource,
)}/intervals?simple`,
{ cancelToken },
)
).data;

Expand Down
4 changes: 2 additions & 2 deletions web-console/src/components/show-json/show-json.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ export const ShowJson = React.memo(function ShowJson(props: ShowJsonProps) {
const { endpoint, transform, downloadFilename } = props;

const [jsonState] = useQueryManager<null, string>({
processQuery: async () => {
const resp = await Api.instance.get(endpoint);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get(endpoint, { cancelToken });
let data = resp.data;
if (transform) data = transform(data);
return typeof data === 'string' ? data : JSONBig.stringify(data, undefined, 2);
Expand Down
3 changes: 2 additions & 1 deletion web-console/src/components/show-log/show-log.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ export class ShowLog extends React.PureComponent<ShowLogProps, ShowLogState> {
};

this.showLogQueryManager = new QueryManager({
processQuery: async () => {
processQuery: async (_, cancelToken) => {
const { endpoint, tailOffset } = this.props;
const resp = await Api.instance.get(
endpoint + (tailOffset ? `?offset=-${tailOffset}` : ''),
{ cancelToken },
);
const data = resp.data;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,10 @@ export const SupervisorHistoryPanel = React.memo(function SupervisorHistoryPanel
const [diffIndex, setDiffIndex] = useState(-1);
const [historyState] = useQueryManager<string, SupervisorHistoryEntry[]>({
initQuery: supervisorId,
processQuery: async supervisorId => {
processQuery: async (supervisorId, cancelToken) => {
const resp = await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}/history`,
{ cancelToken },
);
return resp.data.map((vs: SupervisorHistoryEntry) => deepSet(vs, 'spec', cleanSpec(vs.spec)));
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ export const CompactionDynamicConfigDialog = React.memo(function CompactionDynam

useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const c = (await Api.instance.get('/druid/coordinator/v1/config/compaction')).data;
const c = (
await Api.instance.get('/druid/coordinator/v1/config/compaction', { cancelToken })
).data;
setDynamicConfig({
compactionTaskSlotRatio: c.compactionTaskSlotRatio ?? DEFAULT_RATIO,
maxCompactionTaskSlots: c.maxCompactionTaskSlots ?? DEFAULT_MAX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,11 @@ export const CompactionHistoryDialog = React.memo(function CompactionHistoryDial
const [diffIndex, setDiffIndex] = useState(-1);
const [historyState] = useQueryManager<string, CompactionHistoryEntry[]>({
initQuery: datasource,
processQuery: async datasource => {
processQuery: async (datasource, cancelToken) => {
try {
const resp = await Api.instance.get(
`/druid/coordinator/v1/config/compaction/${Api.encodePath(datasource)}/history?count=20`,
{ cancelToken },
);
return resp.data;
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,19 @@ export const CoordinatorDynamicConfigDialog = React.memo(function CoordinatorDyn

const [historyRecordsState] = useQueryManager<null, any[]>({
initQuery: null,
processQuery: async () => {
const historyResp = await Api.instance.get(`/druid/coordinator/v1/config/history?count=100`);
processQuery: async (_, cancelToken) => {
const historyResp = await Api.instance.get(`/druid/coordinator/v1/config/history?count=100`, {
cancelToken,
});
return historyResp.data;
},
});

useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const configResp = await Api.instance.get('/druid/coordinator/v1/config');
const configResp = await Api.instance.get('/druid/coordinator/v1/config', { cancelToken });
setDynamicConfig(configResp.data || {});
} catch (e) {
AppToaster.show({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ export const DatasourceColumnsTable = React.memo(function DatasourceColumnsTable
) {
const [columnsState] = useQueryManager<string, DatasourceColumnsTableRow[]>({
initQuery: props.datasource,
processQuery: async (datasourceId: string) => {
return await queryDruidSql<ColumnMetadata>({
query: `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
processQuery: async (datasourceId, cancelToken) => {
return await queryDruidSql<ColumnMetadata>(
{
query: `SELECT COLUMN_NAME, DATA_TYPE FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'druid' AND TABLE_NAME = ${L(datasourceId)}`,
});
},
cancelToken,
);
},
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@ export const LookupValuesTable = React.memo(function LookupValuesTable(
props: LookupValuesTableProps,
) {
const [entriesState] = useQueryManager<string, LookupRow[]>({
processQuery: async (lookupId: string) => {
return await queryDruidSql<LookupRow>({
query: `SELECT "k", "v" FROM ${N('lookup').table(lookupId)} LIMIT 5000`,
});
processQuery: async (lookupId, cancelToken) => {
return await queryDruidSql<LookupRow>(
{
query: `SELECT "k", "v" FROM ${N('lookup').table(lookupId)} LIMIT 5000`,
},
cancelToken,
);
},
initQuery: props.lookupId,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,19 @@ export const OverlordDynamicConfigDialog = React.memo(function OverlordDynamicCo

const [historyRecordsState] = useQueryManager<null, any[]>({
initQuery: null,
processQuery: async () => {
const historyResp = await Api.instance.get(`/druid/indexer/v1/worker/history?count=100`);
processQuery: async (_, cancelToken) => {
const historyResp = await Api.instance.get(`/druid/indexer/v1/worker/history?count=100`, {
cancelToken,
});
return historyResp.data;
},
});

useQueryManager<null, Record<string, any>>({
initQuery: null,
processQuery: async () => {
processQuery: async (_, cancelToken) => {
try {
const configResp = await Api.instance.get(`/druid/indexer/v1/worker`);
const configResp = await Api.instance.get(`/druid/indexer/v1/worker`, { cancelToken });
setDynamicConfig(configResp.data || {});
} catch (e) {
AppToaster.show({
Expand Down
18 changes: 12 additions & 6 deletions web-console/src/dialogs/retention-dialog/retention-dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,24 @@ export const RetentionDialog = React.memo(function RetentionDialog(props: Retent

const [tiersState] = useQueryManager<Capabilities, string[]>({
initQuery: capabilities,
processQuery: async capabilities => {
processQuery: async (capabilities, cancelToken) => {
if (capabilities.hasSql()) {
const sqlResp = await queryDruidSql<{ tier: string }>({
query: `SELECT "tier"
const sqlResp = await queryDruidSql<{ tier: string }>(
{
query: `SELECT "tier"
FROM "sys"."servers"
WHERE "server_type" = 'historical'
GROUP BY 1
ORDER BY 1`,
});
},
cancelToken,
);

return sqlResp.map(d => d.tier);
} else if (capabilities.hasCoordinatorAccess()) {
const allServiceResp = await Api.instance.get('/druid/coordinator/v1/servers?simple');
const allServiceResp = await Api.instance.get('/druid/coordinator/v1/servers?simple', {
cancelToken,
});
return filterMap(allServiceResp.data, (s: any) =>
s.type === 'historical' ? s.tier : undefined,
);
Expand All @@ -78,9 +83,10 @@ ORDER BY 1`,

const [historyQueryState] = useQueryManager<string, any[]>({
initQuery: props.datasource,
processQuery: async datasource => {
processQuery: async (datasource, cancelToken) => {
const historyResp = await Api.instance.get(
`/druid/coordinator/v1/rules/${Api.encodePath(datasource)}/history?count=200`,
{ cancelToken },
);
return historyResp.data;
},
Expand Down
4 changes: 2 additions & 2 deletions web-console/src/dialogs/status-dialog/status-dialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ export const StatusDialog = React.memo(function StatusDialog(props: StatusDialog

const [responseState] = useQueryManager<null, StatusResponse>({
initQuery: null,
processQuery: async () => {
const resp = await Api.instance.get(`/status`);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get(`/status`, { cancelToken });
return resp.data;
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,10 @@ export const SupervisorResetOffsetsDialog = React.memo(function SupervisorResetO

const [statusResp] = useQueryManager<string, SupervisorStatus>({
initQuery: supervisorId,
processQuery: async supervisorId => {
processQuery: async (supervisorId, cancelToken) => {
const statusResp = await Api.instance.get(
`/druid/indexer/v1/supervisor/${Api.encodePath(supervisorId)}/status`,
{ cancelToken },
);
return statusResp.data;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ export const SupervisorStatisticsTable = React.memo(function SupervisorStatistic
SupervisorStatisticsTableRow[]
>({
initQuery: null,
processQuery: async () => {
const resp = await Api.instance.get<SupervisorStats>(statsEndpoint);
processQuery: async (_, cancelToken) => {
const resp = await Api.instance.get<SupervisorStats>(statsEndpoint, { cancelToken });
return normalizeSupervisorStatisticsResults(resp.data);
},
});
Expand Down
37 changes: 34 additions & 3 deletions web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import { AutoForm, ExternalLink } from '../../components';
import { IndexSpecDialog } from '../../dialogs/index-spec-dialog/index-spec-dialog';
import { getLink } from '../../links';
import {
allowKeys,
deepDelete,
deepGet,
deepMove,
deepSet,
deepSetIfUnset,
deleteKeys,
EMPTY_ARRAY,
EMPTY_OBJECT,
filterMap,
Expand Down Expand Up @@ -79,6 +79,11 @@ export interface IngestionSpec {
readonly spec: IngestionSpecInner;
readonly context?: { useConcurrentLocks?: boolean };
readonly suspended?: boolean;

// Added by the server
readonly id?: string;
readonly groupId?: string;
readonly resource?: any;
}

export interface IngestionSpecInner {
Expand Down Expand Up @@ -490,11 +495,37 @@ export function normalizeSpec(spec: Partial<IngestionSpec>): IngestionSpec {
}

/**
* Make sure that any extra junk in the spec other than 'type', 'spec', and 'context' is removed
* This function cleans a spec that was returned by the server so that it can be re-opened in the data loader to be
* submitted again.
* @param spec - the spec to clean
*/
export function cleanSpec(spec: Partial<IngestionSpec>): Partial<IngestionSpec> {
return allowKeys(spec, ['type', 'spec', 'context', 'suspended']) as IngestionSpec;
const specSpec = spec.spec;

// For backwards compatible reasons the contents of `spec` (`dataSchema`, `ioConfig`, and `tuningConfig`)
// can be duplicated at the top level. This function removes these duplicates (if needed) so that there is no confusion
// which is the authoritative copy.
if (
specSpec &&
specSpec.dataSchema &&
specSpec.ioConfig &&
specSpec.tuningConfig &&
(spec as any).dataSchema &&
(spec as any).ioConfig &&
(spec as any).tuningConfig
) {
spec = deleteKeys(spec, ['dataSchema', 'ioConfig', 'tuningConfig'] as any[]);
}

// Sometimes the dataSource can (redundantly) make it to the top level for some reason - delete it
if (
typeof specSpec?.dataSchema?.dataSource === 'string' &&
typeof (spec as any).dataSource === 'string'
) {
spec = deleteKeys(spec, ['dataSource'] as any[]);
}

return deleteKeys(spec, ['id', 'groupId', 'resource']);
}

export function upgradeSpec(spec: any, yolo = false): Partial<IngestionSpec> {
Expand Down
7 changes: 5 additions & 2 deletions web-console/src/utils/druid-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,13 @@ export class DruidError extends Error {
}
}

export async function queryDruidRune(runeQuery: Record<string, any>): Promise<any> {
export async function queryDruidRune(
runeQuery: Record<string, any>,
cancelToken?: CancelToken,
): Promise<any> {
let runeResultResp: AxiosResponse;
try {
runeResultResp = await Api.instance.post('/druid/v2', runeQuery);
runeResultResp = await Api.instance.post('/druid/v2', runeQuery, { cancelToken });
} catch (e) {
throw new Error(getDruidErrorMessage(e));
}
Expand Down
9 changes: 6 additions & 3 deletions web-console/src/views/datasources-view/datasources-view.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -426,19 +426,22 @@ GROUP BY 1, 2`;
this.datasourceQueryManager = new QueryManager<DatasourceQuery, DatasourcesAndDefaultRules>({
processQuery: async (
{ capabilities, visibleColumns, showUnused },
_cancelToken,
cancelToken,
setIntermediateQuery,
) => {
let datasources: DatasourceQueryResultRow[];
if (capabilities.hasSql()) {
const query = DatasourcesView.query(visibleColumns);
setIntermediateQuery(query);
datasources = await queryDruidSql({ query });
datasources = await queryDruidSql({ query }, cancelToken);
} else if (capabilities.hasCoordinatorAccess()) {
const datasourcesResp = await Api.instance.get(
'/druid/coordinator/v1/datasources?simple',
{ cancelToken },
);
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple');
const loadstatusResp = await Api.instance.get('/druid/coordinator/v1/loadstatus?simple', {
cancelToken,
});
const loadstatus = loadstatusResp.data;
datasources = datasourcesResp.data.map((d: any): DatasourceQueryResultRow => {
const totalDataSize = deepGet(d, 'properties.segments.size') || -1;
Expand Down
Loading