Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions .server-changes/parallel-batch-item-ingest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
area: webapp
type: improvement
---

Phase 2 streaming batch ingest (`POST /api/v3/batches/:batchId/items`) now processes
items with bounded concurrency instead of strictly sequentially. Previously each item's
payload offload + enqueue ran one at a time, so batches of many large payloads (each
offloaded to object storage) could take minutes and blow past Node's default 300s
`server.requestTimeout`, surfacing to the SDK as `408 terminated` and burning ~26 min
across the SDK's 5 retries.

Ingestion now runs `p-map` over the NDJSON stream with a configurable concurrency
(`STREAMING_BATCH_INGEST_CONCURRENCY`, default 10). `p-map` pulls from the stream
lazily, so at most `concurrency` items are in flight at once, bounding peak memory to
roughly `concurrency × STREAMING_BATCH_ITEM_MAXIMUM_SIZE` per request. Set it to 1 to
fall back to fully sequential ingestion. Ordering and idempotency are unaffected: run
order derives from each item's index, and `enqueueBatchItem` deduplicates atomically
per index. When an oversized item's index cannot be extracted from its (truncated) raw
bytes, the NDJSON parser now stamps the marker with its emit position, so the consumer
no longer depends on processing order. Sealing behaviour is unchanged.
4 changes: 4 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,10 @@ const EnvironmentSchema = z
// 2-phase batch API settings
STREAMING_BATCH_MAX_ITEMS: z.coerce.number().int().default(1_000), // Max items in streaming batch
STREAMING_BATCH_ITEM_MAXIMUM_SIZE: z.coerce.number().int().default(3_145_728),
// Number of streamed batch items ingested concurrently in Phase 2. Peak
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
// so raise with care. Set to 1 for fully sequential ingestion.
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),
Expand Down
1 change: 1 addition & 0 deletions apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
const service = new StreamBatchItemsService();
const result = await service.call(authResult.environment, batchId, itemsIterator, {
maxItemBytes: env.STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
concurrency: env.STREAMING_BATCH_INGEST_CONCURRENCY,
});

return json(result, { status: 200 });
Expand Down
250 changes: 156 additions & 94 deletions apps/webapp/app/runEngine/services/streamBatchItems.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
} from "@trigger.dev/core/v3";
import { BatchId } from "@trigger.dev/core/v3/isomorphic";
import type { BatchItem, RunEngine } from "@internal/run-engine";
import pMap from "p-map";
import type { BatchTaskRunStatus } from "@trigger.dev/database";
import { prisma, type PrismaClientOrTransaction } from "~/db.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
Expand Down Expand Up @@ -55,6 +56,8 @@ export function isIdempotentRetrySuccess(

/** Per-call options for streaming batch item ingestion. */
export type StreamBatchItemsServiceOptions = {
/** Per-item size limit in bytes. The route wires this to STREAMING_BATCH_ITEM_MAXIMUM_SIZE. */
maxItemBytes: number;
/** Max items processed concurrently. The route wires this to STREAMING_BATCH_INGEST_CONCURRENCY. */
concurrency: number;
};

export type OversizedItemMarker = {
Expand All @@ -68,6 +71,8 @@ export type OversizedItemMarker = {
/** Optional dependency overrides accepted by the StreamBatchItemsService constructor. */
export type StreamBatchItemsServiceConstructorOptions = {
/** Database client to use; the constructor falls back to the shared `prisma` client when omitted. */
prisma?: PrismaClientOrTransaction;
/** Run engine instance; passed through to the base class as-is. */
engine?: RunEngine;
/** Override the payload processor (used in tests to observe ingest concurrency). */
payloadProcessor?: BatchPayloadProcessor;
};

/**
Expand All @@ -88,7 +93,7 @@ export class StreamBatchItemsService extends WithRunEngine {

/**
 * Build the service, falling back to default dependencies for anything not
 * supplied in `opts`.
 *
 * @param opts Optional overrides for the database client, run engine and
 *   payload processor.
 */
constructor(opts: StreamBatchItemsServiceConstructorOptions = {}) {
  super({ prisma: opts.prisma ?? prisma, engine: opts.engine });
  // Assign exactly once: honour an injected processor (tests use this to
  // observe ingest concurrency) and only construct the default when none is
  // given, instead of building one unconditionally and then overwriting it.
  this.payloadProcessor = opts.payloadProcessor ?? new BatchPayloadProcessor();
}

/**
Expand Down Expand Up @@ -170,94 +175,28 @@ export class StreamBatchItemsService extends WithRunEngine {
);
}

// Process items from the stream with bounded concurrency.
//
// Ordering and idempotency do NOT depend on processing order:
// - The BatchQueue derives run order from each item's index
// (enqueue timestamp = batch.createdAt + itemIndex), not enqueue order.
// - enqueueBatchItem() dedups atomically per index.
// We cap concurrency to bound peak in-flight memory (≈ concurrency ×
// maxItemBytes) and to keep backpressure on the request body stream.
// p-map pulls lazily from the async iterator — at most `concurrency`
// items are read and in flight at once. stopOnError aborts ingestion on
// the first failure (the batch is left unsealed; the SDK's retry
// re-streams and dedups already-enqueued items).
const outcomes = await pMap(
itemsIterator,
(rawItem) => this.#processItem(rawItem, batchId, environment, batch.runCount),
{ concurrency: options.concurrency, stopOnError: true }
);

let itemsAccepted = 0;
let itemsDeduplicated = 0;
let lastIndex = -1;

// Process items from the stream
for await (const rawItem of itemsIterator) {
// Check for oversized item markers from the NDJSON parser
if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) {
const marker = rawItem as OversizedItemMarker;
const itemIndex = marker.index >= 0 ? marker.index : lastIndex + 1;

const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(1)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(1)} KB. Reduce the payload size or offload large data to external storage.`;

// Enqueue with __error metadata - processItemCallback will detect this
// and use TriggerFailedTaskService to create a pre-failed run
const batchItem: BatchItem = {
task: marker.task,
payload: "{}",
payloadType: "application/json",
options: {
__error: errorMessage,
__errorCode: "PAYLOAD_TOO_LARGE",
},
};

const result = await this._engine.enqueueBatchItem(
batchId,
environment.id,
itemIndex,
batchItem
);

if (result.enqueued) {
itemsAccepted++;
} else {
itemsDeduplicated++;
}
lastIndex = itemIndex;
continue;
}

// Parse and validate the item
const parseResult = BatchItemNDJSONSchema.safeParse(rawItem);
if (!parseResult.success) {
throw new ServiceValidationError(
`Invalid item at index ${lastIndex + 1}: ${parseResult.error.message}`
);
}

const item = parseResult.data;
lastIndex = item.index;

// Validate index is within expected range
if (item.index >= batch.runCount) {
throw new ServiceValidationError(
`Item index ${item.index} exceeds batch runCount ${batch.runCount}`
);
}

// Get the original payload type
const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";

// Process payload - offload to R2 if it exceeds threshold
const processedPayload = await this.payloadProcessor.process(
item.payload,
originalPayloadType,
batchId,
item.index,
environment
);

// Convert to BatchItem format with potentially offloaded payload
const batchItem: BatchItem = {
task: item.task,
payload: processedPayload.payload,
payloadType: processedPayload.payloadType,
options: item.options,
};

// Enqueue the item
const result = await this._engine.enqueueBatchItem(
batchId,
environment.id,
item.index,
batchItem
);

if (result.enqueued) {
for (const outcome of outcomes) {
if (outcome === "accepted") {
itemsAccepted++;
} else {
itemsDeduplicated++;
Expand Down Expand Up @@ -446,6 +385,112 @@ export class StreamBatchItemsService extends WithRunEngine {
}
);
}

/**
 * Process a single streamed batch item: validate it, offload its payload to
 * object storage if oversized, and enqueue it.
 *
 * Safe to run concurrently: enqueueBatchItem() is atomic and order-independent
 * per item index, and each item carries its own index (real items from the
 * SDK; oversized markers are stamped by the NDJSON parser).
 *
 * @param rawItem One object emitted by the NDJSON parser: either a batch item
 *   or an oversized-item marker.
 * @param batchId Batch the item belongs to.
 * @param environment Authenticated environment the batch runs in.
 * @param runCount Expected number of items in the batch; valid indices are
 *   `0 <= index < runCount`.
 * @returns "accepted" when the item was newly enqueued, "deduplicated" when an
 *   item with the same index had already been enqueued.
 * @throws ServiceValidationError for invalid items or out-of-range indices,
 *   which aborts the stream.
 */
async #processItem(
  rawItem: unknown,
  batchId: string,
  environment: AuthenticatedEnvironment,
  runCount: number
): Promise<"accepted" | "deduplicated"> {
  // Oversized item marker emitted by the NDJSON parser
  if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) {
    const marker = rawItem as OversizedItemMarker;

    // The marker is recognised purely by the presence of `__batchItemError`,
    // so any parsed NDJSON object carrying that key lands here. Validate the
    // index fully: a missing, fractional or negative index would otherwise
    // slip past the upper-bound check below (comparisons against
    // undefined/NaN are false) and be handed to enqueueBatchItem().
    if (!Number.isInteger(marker.index) || marker.index < 0) {
      throw new ServiceValidationError(
        `Invalid item index ${String(marker.index)}: expected a non-negative integer`
      );
    }

    // Same out-of-range guard as normal items: an oversized item with an
    // out-of-range index must 4xx rather than create a stray pre-failed run.
    if (marker.index >= runCount) {
      throw new ServiceValidationError(
        `Item index ${marker.index} exceeds batch runCount ${runCount}`
      );
    }

    const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(
      1
    )} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(
      1
    )} KB. Reduce the payload size or offload large data to external storage.`;

    // Enqueue with __error metadata - processItemCallback will detect this
    // and use TriggerFailedTaskService to create a pre-failed run
    const batchItem: BatchItem = {
      task: marker.task,
      payload: "{}",
      payloadType: "application/json",
      options: {
        __error: errorMessage,
        __errorCode: "PAYLOAD_TOO_LARGE",
      },
    };

    const result = await this._engine.enqueueBatchItem(
      batchId,
      environment.id,
      marker.index,
      batchItem
    );

    return result.enqueued ? "accepted" : "deduplicated";
  }

  // Parse and validate the item
  const parseResult = BatchItemNDJSONSchema.safeParse(rawItem);
  if (!parseResult.success) {
    // Items are processed concurrently, so the failing item's position can
    // only be reported from the item itself, never from processing order.
    const rawIndex = (rawItem as { index?: unknown } | null)?.index;
    const where = typeof rawIndex === "number" ? `index ${rawIndex}` : "unknown index";
    throw new ServiceValidationError(
      `Invalid item at ${where}: ${parseResult.error.message}`
    );
  }

  const item = parseResult.data;

  // Validate index is within expected range
  if (item.index >= runCount) {
    throw new ServiceValidationError(
      `Item index ${item.index} exceeds batch runCount ${runCount}`
    );
  }

  // Get the original payload type
  const originalPayloadType = (item.options?.payloadType as string) ?? "application/json";

  // Process payload - offload to object storage if it exceeds threshold
  const processedPayload = await this.payloadProcessor.process(
    item.payload,
    originalPayloadType,
    batchId,
    item.index,
    environment
  );

  // Convert to BatchItem format with potentially offloaded payload
  const batchItem: BatchItem = {
    task: item.task,
    payload: processedPayload.payload,
    payloadType: processedPayload.payloadType,
    options: item.options,
  };

  // Enqueue the item
  const result = await this._engine.enqueueBatchItem(
    batchId,
    environment.id,
    item.index,
    batchItem
  );

  return result.enqueued ? "accepted" : "deduplicated";
}
}

/**
Expand Down Expand Up @@ -587,12 +632,29 @@ export function createNdjsonParserStream(
let chunks: Uint8Array[] = [];
let totalBytes = 0;
let lineNumber = 0;
// 0-based position of the next object we emit (parsed item or oversized
// marker). The parser is the single sequential point in the pipeline, so this
// is the authoritative source of item ordering — downstream consumers can
// process items concurrently and must not rely on processing order to derive
// an item's index. Used to back-fill an oversized marker's index when it
// couldn't be extracted from the (truncated) raw bytes.
let emittedCount = 0;
// When an oversized incomplete line is detected (Case 2), we must discard
// all remaining bytes of that line until the next newline delimiter.
let skipUntilNewline = false;

const NEWLINE_BYTE = 0x0a; // '\n'

/**
 * Single exit point for everything the parser sends downstream (parsed items
 * and oversized markers alike). Enqueues `obj` on the controller, then bumps
 * `emittedCount`, so the counter always equals the number of objects emitted
 * so far. Lines that are empty or skipped never reach this function and
 * therefore do not advance the counter.
 *
 * @param controller Transform stream controller to enqueue onto.
 * @param obj Parsed object or marker to hand to the consumer.
 */
function emit(controller: TransformStreamDefaultController<unknown>, obj: unknown): void {
  controller.enqueue(obj);
  // Count only after a successful enqueue.
  emittedCount += 1;
}

/**
* Concatenate all chunks into a single Uint8Array
*/
Expand Down Expand Up @@ -675,7 +737,7 @@ export function createNdjsonParserStream(

try {
const obj = JSON.parse(trimmed);
controller.enqueue(obj);
emit(controller, obj);
} catch (err) {
throw new Error(`Invalid JSON at line ${lineNumber}: ${(err as Error).message}`);
}
Expand Down Expand Up @@ -715,12 +777,12 @@ export function createNdjsonParserStream(
const extracted = extractIndexAndTask(lineBytes);
const marker: OversizedItemMarker = {
__batchItemError: "OVERSIZED",
index: extracted.index,
index: extracted.index >= 0 ? extracted.index : emittedCount,
task: extracted.task,
actualSize: newlineIndex,
maxSize: maxItemBytes,
};
controller.enqueue(marker);
emit(controller, marker);
lineNumber++;
continue;
}
Expand All @@ -736,12 +798,12 @@ export function createNdjsonParserStream(
const extracted = extractIndexAndTask(concatenateChunks());
const marker: OversizedItemMarker = {
__batchItemError: "OVERSIZED",
index: extracted.index,
index: extracted.index >= 0 ? extracted.index : emittedCount,
task: extracted.task,
actualSize: totalBytes,
maxSize: maxItemBytes,
};
controller.enqueue(marker);
emit(controller, marker);
lineNumber++;
// Clear buffer and skip remaining bytes of this oversized line
// until the next newline delimiter is found in a subsequent chunk
Expand All @@ -768,12 +830,12 @@ export function createNdjsonParserStream(
const extracted = extractIndexAndTask(concatenateChunks());
const marker: OversizedItemMarker = {
__batchItemError: "OVERSIZED",
index: extracted.index,
index: extracted.index >= 0 ? extracted.index : emittedCount,
task: extracted.task,
actualSize: totalBytes,
maxSize: maxItemBytes,
};
controller.enqueue(marker);
emit(controller, marker);
return;
}

Expand Down
Loading