Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,9 @@ export class RunEngineTriggerTaskService {

if (!queueSizeGuard.ok) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
15 changes: 14 additions & 1 deletion apps/webapp/app/v3/scheduleEngine.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { ScheduleEngine } from "@internal/schedule-engine";
import type { TriggerScheduledTaskErrorType } from "@internal/schedule-engine";
import { stringifyIO } from "@trigger.dev/core/v3";
import { prisma } from "~/db.server";
import { env } from "~/env.server";
Expand All @@ -8,6 +9,7 @@ import { singleton } from "~/utils/singleton";
import { TriggerTaskService } from "./services/triggerTask.server";
import { meter, tracer } from "./tracer.server";
import { workerQueue } from "~/services/worker.server";
import { ServiceValidationError } from "./services/common.server";

export const scheduleEngine = singleton("ScheduleEngine", createScheduleEngine);

Expand Down Expand Up @@ -111,9 +113,20 @@ function createScheduleEngine() {

return { success: !!result };
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
let errorType: TriggerScheduledTaskErrorType = "SYSTEM_ERROR";

if (
error instanceof ServiceValidationError &&
errorMessage.includes("queue size limit for this environment has been reached")
) {
errorType = "QUEUE_LIMIT";
}

return {
success: false,
error: error instanceof Error ? error.message : String(error),
error: errorMessage,
errorType,
};
}
},
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,9 @@ export class BatchTriggerV3Service extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${newRunCount} tasks as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}

Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/v3/services/common.server.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(message: string, public status?: number) {
constructor(
message: string,
public status?: number,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
}
Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/triggerTaskV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,9 @@ export class TriggerTaskServiceV1 extends BaseService {

if (!queueSizeGuard.isWithinLimits) {
throw new ServiceValidationError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
undefined,
"warn"
);
}
}
Expand Down
5 changes: 4 additions & 1 deletion internal-packages/run-engine/src/engine/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ export function runStatusFromError(
}
}

export type ServiceValidationErrorLevel = "error" | "warn" | "info";

export class ServiceValidationError extends Error {
constructor(
message: string,
public status?: number,
public metadata?: Record<string, unknown>
public metadata?: Record<string, unknown>,
public logLevel?: ServiceValidationErrorLevel
) {
super(message);
this.name = "ServiceValidationError";
Expand Down
10 changes: 5 additions & 5 deletions internal-packages/run-engine/src/run-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2545,7 +2545,7 @@ export class RunQueue {
return;
}

this.logger.info("Processing concurrency keys from stream", {
this.logger.debug("Processing concurrency keys from stream", {
keys: uniqueKeys,
});

Expand Down Expand Up @@ -2615,22 +2615,22 @@ export class RunQueue {
}

private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) {
this.logger.info("Processing concurrency set with runs", {
this.logger.debug("Processing concurrency set with runs", {
concurrencyKey,
runIds: runIds.slice(0, 5), // Log first 5 for debugging,
runIds: runIds.slice(0, 5),
runIdsLength: runIds.length,
});

// Call the callback to determine which runs are completed
const completedRuns = await this.options.concurrencySweeper?.callback(runIds);

if (!completedRuns) {
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
return;
}

if (completedRuns.length === 0) {
this.logger.info("No completed runs found in concurrency set", { concurrencyKey });
this.logger.debug("No completed runs found in concurrency set", { concurrencyKey });
return;
}

Expand Down
25 changes: 18 additions & 7 deletions internal-packages/schedule-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,28 @@ export class ScheduleEngine {

span.setAttribute("trigger_success", true);
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
const isQueueLimit = result.errorType === "QUEUE_LIMIT";

if (isQueueLimit) {
this.logger.warn("Scheduled task trigger skipped due to queue limit", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
} else {
this.logger.error("Failed to trigger scheduled task", {
instanceId: params.instanceId,
taskIdentifier: instance.taskSchedule.taskIdentifier,
durationMs: triggerDuration,
error: result.error,
});
}

this.scheduleExecutionFailureCounter.add(1, {
environment_type: environmentType,
schedule_type: scheduleType,
error_type: "task_failure",
error_type: isQueueLimit ? "queue_limit" : "task_failure",
});

span.setAttribute("trigger_success", false);
Expand Down
8 changes: 7 additions & 1 deletion internal-packages/schedule-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,14 @@ export type TriggerScheduledTaskParams = {
exactScheduleTime?: Date;
};

export type TriggerScheduledTaskErrorType = "QUEUE_LIMIT" | "SYSTEM_ERROR";

export interface TriggerScheduledTaskCallback {
(params: TriggerScheduledTaskParams): Promise<{ success: boolean; error?: string }>;
(params: TriggerScheduledTaskParams): Promise<{
success: boolean;
error?: string;
errorType?: TriggerScheduledTaskErrorType;
}>;
}

export interface ScheduleEngineOptions {
Expand Down
1 change: 1 addition & 0 deletions internal-packages/schedule-engine/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export type {
ScheduleEngineOptions,
TriggerScheduleParams,
TriggerScheduledTaskCallback,
TriggerScheduledTaskErrorType,
} from "./engine/types.js";
82 changes: 48 additions & 34 deletions packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -745,23 +745,25 @@ class Worker<TCatalog extends WorkerCatalog> {
).catch(async (error) => {
const errorMessage = error instanceof Error ? error.message : String(error);
const shouldLogError = catalogItem.logErrors ?? true;
const errorLogLevel =
error && typeof error === "object" && "logLevel" in error ? error.logLevel : undefined;

if (shouldLogError) {
this.logger.error(`Worker error processing batch`, {
name: this.options.name,
jobType,
batchSize: items.length,
error,
errorMessage,
});
const logAttributes = {
name: this.options.name,
jobType,
batchSize: items.length,
error,
errorMessage,
};

if (!shouldLogError) {
this.logger.info(`Worker failed to process batch`, logAttributes);
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker error processing batch`, logAttributes);
} else if (errorLogLevel === "info") {
this.logger.info(`Worker error processing batch`, logAttributes);
} else {
this.logger.info(`Worker failed to process batch`, {
name: this.options.name,
jobType,
batchSize: items.length,
error,
errorMessage,
});
this.logger.error(`Worker error processing batch`, logAttributes);
}

// Re-enqueue each item individually with retry logic
Expand All @@ -775,20 +777,21 @@ class Worker<TCatalog extends WorkerCatalog> {
const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

if (!retryDelay) {
if (shouldLogError) {
this.logger.error(`Worker batch item reached max attempts. Moving to DLQ.`, {
name: this.options.name,
id: item.id,
jobType,
attempt: newAttempt,
});
const dlqLogAttributes = {
name: this.options.name,
id: item.id,
jobType,
attempt: newAttempt,
};

if (!shouldLogError) {
this.logger.info(`Worker batch item reached max attempts. Moving to DLQ.`, dlqLogAttributes);
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker batch item reached max attempts. Moving to DLQ.`, dlqLogAttributes);
} else if (errorLogLevel === "info") {
this.logger.info(`Worker batch item reached max attempts. Moving to DLQ.`, dlqLogAttributes);
} else {
this.logger.info(`Worker batch item reached max attempts. Moving to DLQ.`, {
name: this.options.name,
id: item.id,
jobType,
attempt: newAttempt,
});
this.logger.error(`Worker batch item reached max attempts. Moving to DLQ.`, dlqLogAttributes);
}

await this.queue.moveToDeadLetterQueue(item.id, errorMessage);
Expand Down Expand Up @@ -895,6 +898,8 @@ class Worker<TCatalog extends WorkerCatalog> {
const errorMessage = error instanceof Error ? error.message : String(error);

const shouldLogError = catalogItem.logErrors ?? true;
const errorLogLevel =
error && typeof error === "object" && "logLevel" in error ? error.logLevel : undefined;

const logAttributes = {
name: this.options.name,
Expand All @@ -906,10 +911,14 @@ class Worker<TCatalog extends WorkerCatalog> {
errorMessage,
};

if (shouldLogError) {
this.logger.error(`Worker error processing item`, logAttributes);
} else {
if (!shouldLogError) {
this.logger.info(`Worker failed to process item`, logAttributes);
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker error processing item`, logAttributes);
} else if (errorLogLevel === "info") {
this.logger.info(`Worker error processing item`, logAttributes);
} else {
this.logger.error(`Worker error processing item`, logAttributes);
}

// Attempt requeue logic.
Expand All @@ -922,13 +931,18 @@ class Worker<TCatalog extends WorkerCatalog> {
const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

if (!retryDelay) {
if (shouldLogError) {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
if (!shouldLogError || errorLogLevel === "info") {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else if (errorLogLevel === "warn") {
this.logger.warn(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
Expand Down
Loading