Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions migrations/1770846546000_import_performance_indexes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import type { Kysely } from "kysely";

/**
 * Forward migration: creates four lookup indexes, one at a time, in the
 * order they are listed in `indexSpecs`.
 *
 * @param db - Kysely instance whose schema builder is used to issue the
 *   `CREATE INDEX` statements.
 */
export async function up(db: Kysely<any>): Promise<void> {
  const indexSpecs: { name: string; table: string; columns: string[] }[] = [
    // Targets the importDataRecords filter:
    // dataSourceId = X AND needsImport = true
    {
      name: "dataRecordNeedsImportIndex",
      table: "dataRecord",
      columns: ["dataSourceId", "needsImport"],
    },
    // Targets the needsEnrich filter used by enrichDataRecords
    {
      name: "dataRecordNeedsEnrichIndex",
      table: "dataRecord",
      columns: ["dataSourceId", "needsEnrich"],
    },
    // Targets area lookups by code during geocoding (findAreaByCode)
    {
      name: "areaCodeAreaSetIdIndex",
      table: "area",
      columns: ["code", "areaSetId"],
    },
    // Targets area set lookups by code during enrichment
    {
      name: "areaSetCodeIndex",
      table: "areaSet",
      columns: ["code"],
    },
  ];

  // Awaited sequentially so the statements run in the listed order.
  for (const spec of indexSpecs) {
    await db.schema
      .createIndex(spec.name)
      .on(spec.table)
      .columns(spec.columns)
      .execute();
  }
}

/**
 * Reverse migration: drops the four indexes created by `up`, one at a time,
 * in the order they are listed in `indexNames`.
 *
 * @param db - Kysely instance whose schema builder is used to issue the
 *   `DROP INDEX` statements.
 */
export async function down(db: Kysely<any>): Promise<void> {
  const indexNames = [
    "dataRecordNeedsImportIndex",
    "dataRecordNeedsEnrichIndex",
    "areaCodeAreaSetIdIndex",
    "areaSetCodeIndex",
  ];

  // Awaited sequentially so the drops run in the listed order.
  for (const indexName of indexNames) {
    await db.schema.dropIndex(indexName).execute();
  }
}
49 changes: 15 additions & 34 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
"lint": "run-p prettier lint:fix typecheck madge",
"lint:ci": "eslint && tsc --noEmit && prettier --log-level warn --check . && madge --circular --extensions ts,tsx --ts-config tsconfig.json bin src",
"start": "next start",
"test": "./bin/prepare-test-env.sh && NODE_TLS_REJECT_UNAUTHORIZED=0 node --env-file=.env.testing ./node_modules/.bin/vitest"
"test": "./bin/prepare-test-env.sh && NODE_TLS_REJECT_UNAUTHORIZED=0 node --env-file=.env.testing ./node_modules/.bin/vitest --exclude tests/performance/**",
"test:perf": "./bin/prepare-test-env.sh && NODE_TLS_REJECT_UNAUTHORIZED=0 node --env-file=.env.testing ./node_modules/.bin/vitest tests/performance --no-file-parallelism"
},
"dependencies": {
"@dnd-kit/core": "^6.3.1",
Expand Down Expand Up @@ -151,9 +152,9 @@
"vitest": "^3.2.4"
},
"optionalDependencies": {
"@img/sharp-linux-x64": "^0.34.1",
"@floating-ui/core": "^1.7.3",
"@floating-ui/dom": "^1.7.4",
"@img/sharp-linux-x64": "^0.34.1",
"@ngrok/ngrok-linux-x64-gnu": "^1.5",
"@rollup/rollup-linux-x64-gnu": "^4.44",
"lightningcss-linux-x64-gnu": "^1.30.1"
Expand Down
34 changes: 32 additions & 2 deletions src/server/jobs/importDataSource.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { DATA_SOURCE_JOB_BATCH_SIZE } from "@/constants";
import { getDataSourceAdaptor } from "@/server/adaptors";
import { geocodeRecord } from "@/server/mapping/geocode";
import { ColumnType } from "@/server/models/DataSource";
import { bulkFetchPostcodes, geocodeRecord } from "@/server/mapping/geocode";
import { AreaSetCode } from "@/server/models/AreaSet";
import { ColumnType, GeocodingType } from "@/server/models/DataSource";
import {
deleteByDataSourceId,
upsertDataRecords,
Expand All @@ -13,6 +14,7 @@ import {
import logger from "@/server/services/logger";
import { getPubSub } from "@/server/services/pubsub";
import { batchAsync } from "@/server/utils";
import type { PostcodeResult } from "@/server/mapping/geocode";
import type { ColumnDef } from "@/server/models/DataSource";
import type { DataSource } from "@/server/models/DataSource";
import type { ExternalRecord } from "@/types";
Expand Down Expand Up @@ -108,13 +110,41 @@ export const importBatch = async (
dataSource: DataSource,
columnDefsAccumulator: ColumnDef[],
) => {
// Pre-fetch postcodes if using postcode geocoding
let prefetchedPostcodes:
| { results: Map<string, PostcodeResult>; notFound: Set<string> }
| undefined;
if (
dataSource.geocodingConfig.type === GeocodingType.Code &&
"areaSetCode" in dataSource.geocodingConfig &&
dataSource.geocodingConfig.areaSetCode === AreaSetCode.PC &&
"column" in dataSource.geocodingConfig
) {
const column = dataSource.geocodingConfig.column;
const uniquePostcodes = Array.from(
new Set(
batch
.map((record) => {
const pc = String(record.json[column] || "");
return pc ? pc.replace(/\s+/g, "").toUpperCase() : null;
})
.filter((pc): pc is string => pc !== null),
),
);

if (uniquePostcodes.length > 0) {
prefetchedPostcodes = await bulkFetchPostcodes(uniquePostcodes);
}
}

const updatedRecords = await Promise.all(
batch.map(async (record) => {
const { columnDefs, typedJson } = typeJson(record.json);
addColumnDefs(columnDefsAccumulator, columnDefs);
const geocodeResult = await geocodeRecord(
record,
dataSource.geocodingConfig,
prefetchedPostcodes,
);
return {
externalId: record.externalId,
Expand Down
Loading