diff --git a/src/jobManagerClient.ts b/src/jobManagerClient.ts index 4871546..23c00c4 100644 --- a/src/jobManagerClient.ts +++ b/src/jobManagerClient.ts @@ -17,7 +17,6 @@ import { httpClientConfig } from './models/utils'; export class JobManagerClient extends HttpClient { public constructor( protected readonly logger: Logger, - protected jobType: string, protected jobManagerBaseUrl: string, httpRetryConfig: IHttpRetryConfig | undefined = httpClientConfig, targetService: string | undefined = 'jobManagerClient', @@ -144,8 +143,8 @@ export class JobManagerClient extends HttpClient { } } - public async consume(taskType: string): Promise | null> { - const consumeTaskUrl = `/tasks/${this.jobType}/${taskType}/startPending`; + public async consume(jobType: string, taskType: string): Promise | null> { + const consumeTaskUrl = `/tasks/${jobType}/${taskType}/startPending`; try { const taskResponse = await this.post>(consumeTaskUrl); return taskResponse; @@ -157,7 +156,7 @@ export class JobManagerClient extends HttpClient { err, url: consumeTaskUrl, targetService: this.targetService, - jobType: this.jobType, + jobType, taskType: taskType, msg: `failed to consume task`, errorMessage: (err as { message: string }).message, diff --git a/src/taskHandler.ts b/src/taskHandler.ts index 8f977db..15e4285 100644 --- a/src/taskHandler.ts +++ b/src/taskHandler.ts @@ -15,7 +15,6 @@ export class TaskHandler { public constructor( protected readonly logger: Logger, - protected jobType: string, protected jobManagerBaseUrl: string, protected heartbeatUrl: string, protected dequeueIntervalMs: number, @@ -26,7 +25,7 @@ export class TaskHandler { disableJobManagerDebugLogs: boolean | undefined = true, disableHeartbeatDebugLogs: boolean | undefined = true ) { - this.jobManagerClient = new JobManagerClient(logger, jobType, jobManagerBaseUrl, httpRetryConfig, jobTargetService, disableJobManagerDebugLogs); + this.jobManagerClient = new JobManagerClient(logger, jobManagerBaseUrl, httpRetryConfig, jobTargetService, disableJobManagerDebugLogs); this.heartbeatClient = new HeartbeatClient( logger, heartbeatIntervalMs, @@ -37,23 +36,23 @@ export class TaskHandler { ); } - public async waitForTask(taskType: string): Promise> { + public async waitForTask(jobType: string, taskType: string): Promise> { let task: ITaskResponse | null; this.logger.info({ - jobType: this.jobType, + jobType, taskType, - msg: `waitForTask jobType=${this.jobType}, taskType=${taskType}`, + msg: `waitForTask jobType=${jobType}, taskType=${taskType}`, }); do { - task = await this.dequeue(taskType); + task = await this.dequeue(jobType, taskType); await new Promise((resolve) => setTimeout(resolve, this.dequeueIntervalMs)); } while (!task); return task; } - public async dequeue(taskType: string): Promise | null> { + public async dequeue(jobType: string, taskType: string): Promise | null> { try { - const response = await this.jobManagerClient.consume(taskType); + const response = await this.jobManagerClient.consume(jobType, taskType); if (response) { const taskId = response.id; this.heartbeatClient.start(taskId); @@ -65,9 +64,9 @@ export class TaskHandler { } else { this.logger.error({ err, - jobType: this.jobType, + jobType, taskType, - msg: `dequeue FAILED for jobType=${this.jobType}, taskType=${taskType}`, + msg: `dequeue FAILED for jobType=${jobType}, taskType=${taskType}`, errorMessage: (err as { message: string }).message, }); throw err;