diff --git a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx index facff746c5e..566bc787daa 100644 --- a/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx +++ b/apps/webapp/app/components/runs/v3/CancelRunDialog.tsx @@ -10,9 +10,18 @@ import { SpinnerWhite } from "~/components/primitives/Spinner"; type CancelRunDialogProps = { runFriendlyId: string; redirectPath: string; + // Fired on submit so the parent can close the Radix Dialog without + // wrapping the submit button in `DialogClose` — that wrapper races + // submit (close fires first, unmounts the form, and the cancel POST + // never lands). Optional so existing call sites still type-check. + onCancelSubmitted?: () => void; }; -export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialogProps) { +export function CancelRunDialog({ + runFriendlyId, + redirectPath, + onCancelSubmitted, +}: CancelRunDialogProps) { const navigation = useNavigation(); const formAction = `/resources/taskruns/${runFriendlyId}/cancel`; @@ -27,7 +36,11 @@ export function CancelRunDialog({ runFriendlyId, redirectPath }: CancelRunDialog +
onCancelSubmitted?.()} + > - - - + )} @@ -587,6 +675,35 @@ function TraceView({ ); } +// Controlled wrapper around the cancel dialog. Owns the Radix open state +// so the dialog closes itself once the cancel action transitions through +// submission. We can't ``-wrap the submit button +// because Radix's onClick handler swallows the button's name=value pair +// that the form action depends on for `redirectUrl`. +function ControlledCancelRunDialog({ + runFriendlyId, + redirectPath, +}: { + runFriendlyId: string; + redirectPath: string; +}) { + const [open, setOpen] = useState(false); + return ( + + + + + setOpen(false)} + /> + + ); +} + function NoLogsView({ run, resizable }: Pick) { const plan = useCurrentPlan(); const organization = useOrganization(); @@ -616,6 +733,11 @@ function NoLogsView({ run, resizable }: Pick) { >
{daysSinceCompleted === undefined ? ( + // NoLogsView only renders when the loader returns no trace. + // Buffered runs always carry a synthetic trace (see + // buildSyntheticTraceForBufferedRun) so they never reach + // this branch — the message here is the pre-mollifier + // copy for runs with no completedAt and no logs. We tidy up older logs to keep things running smoothly. diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index 09f3f33fcb3..7e825fe303d 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -120,6 +120,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { try { const result = await presenter.call({ projectSlug: projectParam, + envSlug: envParam, spanId: spanParam, runFriendlyId: runParam, userId, @@ -1021,6 +1022,10 @@ function RunBody({ Admin only + + Buffered + {run.isBuffered ? "Yes" : "No"} + Worker queue {run.workerQueue} @@ -1096,7 +1101,7 @@ function RunBody({ {run.isCached ? "Jump to original run" : "Focus on run"} )} - + {!run.isBuffered && }
{run.logsDeletedAt === null ? ( diff --git a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts index 5c7725c510b..c2a6fa9590c 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts @@ -9,6 +9,8 @@ import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server"; import { TaskEventKind } from "@trigger.dev/database"; import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { deserialiseMollifierSnapshot } from "~/v3/mollifier/mollifierSnapshot.server"; export async function loader({ params, request }: LoaderFunctionArgs) { const user = await requireUser(request); @@ -30,6 +32,67 @@ export async function loader({ params, request }: LoaderFunctionArgs) { }); if (!run || !run.organizationId) { + // Buffered run? It hasn't executed, so there are no events to + // stream — but a 404 is wrong: the run does exist, the customer's + // "Download logs" button on the run-detail page generates this + // exact URL, and a 404 reads as "your run vanished" rather than + // "no logs yet". Verify the entry exists in the buffer (with the + // user as a member of the entry's org), and if so stream a single + // informational line in the same ` + // ` shape `formatRunEvent` uses below — so a downstream + // log viewer / grep over the downloaded file produces a + // meaningful explanation, not a 0-byte mystery. 
+ const buffer = getMollifierBuffer(); + if (buffer) { + const entry = await buffer.getEntry(parsedParams.runParam); + if (entry) { + const member = await prisma.orgMember.findFirst({ + where: { userId: user.id, organizationId: entry.orgId }, + select: { id: true }, + }); + if (member) { + let taskIdentifier: string | undefined; + try { + // Use the shared webapp wrapper rather than raw JSON.parse so + // every read-side module shares a single deserialisation path + // (see contract comment in `mollifierSnapshot.server.ts` and + // `syntheticRedirectInfo.server.ts`). Keeps behaviour + // consistent if the snapshot encoding ever changes. + const snapshot = deserialiseMollifierSnapshot(entry.payload) as { + taskIdentifier?: unknown; + }; + if (typeof snapshot.taskIdentifier === "string") { + taskIdentifier = snapshot.taskIdentifier; + } + } catch { + // Fall through — taskIdentifier stays undefined. + } + const placeholderParts = [ + entry.createdAt.toISOString(), + ...(taskIdentifier ? [taskIdentifier] : []), + "INFO", + "Run is queued, has not started executing yet — no logs to download.", + ]; + const placeholder = placeholderParts.join(" ") + "\n"; + const placeholderReadable = new Readable({ + read() { + this.push(placeholder); + this.push(null); + }, + }); + const gzipStream = createGzip(); + const compressed = placeholderReadable.pipe(gzipStream); + return new Response(compressed as any, { + status: 200, + headers: { + "Content-Type": "application/octet-stream", + "Content-Disposition": `attachment; filename="${parsedParams.runParam}.log"`, + "Content-Encoding": "gzip", + }, + }); + } + } + } return new Response("Not found", { status: 404 }); } diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts index 240d7d3d8ed..fa6ee29f3db 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.cancel.ts @@ -6,6 
+6,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { logger } from "~/services/logger.server"; import { requireUserId } from "~/services/session.server"; import { CancelTaskRunService } from "~/v3/services/cancelTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; export const cancelSchema = z.object({ redirectUrl: z.string(), @@ -42,15 +43,56 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); - if (!taskRun) { + if (taskRun) { + const cancelRunService = new CancelTaskRunService(); + await cancelRunService.call(taskRun); + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + + // PG miss — try the mollifier buffer. The customer can hit cancel + // on a buffered run from the dashboard during the burst window. + // Snapshot a `mark_cancelled` patch; the drainer's + // bifurcation routes the run to `engine.createCancelledRun` on + // next pop. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) { submission.error = { runParam: ["Run not found"] }; return json(submission); } - const cancelRunService = new CancelTaskRunService(); - await cancelRunService.call(taskRun); + // Dashboard auth: verify the requesting user is a member of the + // buffered run's org. The API path scopes by env id from the + // authenticated request; the dashboard route uses org-membership + // because the URL doesn't carry an envId. 
+ const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + submission.error = { runParam: ["Run not found"] }; + return json(submission); + } - return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + const result = await buffer!.mutateSnapshot(runParam, { + type: "mark_cancelled", + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }); + if (result === "applied_to_snapshot") { + return redirectWithSuccessMessage(submission.value.redirectUrl, request, `Canceled run`); + } + // "not_found" or "busy" — both indicate the drainer raced us between + // the getEntry check above and mutateSnapshot. On "not_found" the + // entry was just popped and the PG row is in flight; on "busy" the + // drainer is mid-materialisation. Either way the customer should + // retry — by then the PG row exists and the regular cancel path at + // the top of this action takes over. 
+ return redirectWithErrorMessage( + submission.value.redirectUrl, + request, + "Run is materialising — retry in a moment" + ); } catch (error) { if (error instanceof Error) { logger.error("Failed to cancel run", { diff --git a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts index 8a22822d06b..507d3cc706f 100644 --- a/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts +++ b/apps/webapp/app/routes/resources.taskruns.$runParam.replay.ts @@ -11,6 +11,12 @@ import { requireUser } from "~/services/session.server"; import { sortEnvironments } from "~/utils/environmentSort"; import { v3RunSpanPath } from "~/utils/pathBuilder"; import { ReplayTaskRunService } from "~/v3/services/replayTaskRun.server"; +import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server"; +import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server"; +import { + buildSyntheticReplayTaskRun, + type SyntheticReplayTaskRun, +} from "~/v3/mollifier/syntheticReplayTaskRun.server"; import parseDuration from "parse-duration"; import { findCurrentWorkerDeployment } from "~/v3/models/workerDeployment.server"; import { queueTypeFromType } from "~/presenters/v3/QueueRetrievePresenter.server"; @@ -33,7 +39,7 @@ export async function loader({ request, params }: LoaderFunctionArgs) { Object.fromEntries(new URL(request.url).searchParams) ); - const run = await $replica.taskRun.findFirst({ + let run = await $replica.taskRun.findFirst({ select: { payload: true, payloadType: true, @@ -88,6 +94,83 @@ export async function loader({ request, params }: LoaderFunctionArgs) { where: { friendlyId: runParam, project: { organization: { members: { some: { userId } } } } }, }); + let synthetic: + | (Awaited> & { __synth: true }) + | undefined; + if (!run) { + // Buffered fallback: read the snapshot and look up the env list via + // the snapshot's organizationId. 
Without this the Replay dialog + // 404s for runs queued in the mollifier buffer, which dumps the + // user back to the task list. + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (!entry) throw new Response("Not Found", { status: 404 }); + const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) throw new Response("Not Found", { status: 404 }); + const buffered = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (!buffered) throw new Response("Not Found", { status: 404 }); + synthetic = Object.assign(buffered, { __synth: true as const }); + // Scope the project lookup to the buffer entry's org as well as the + // env id. The prior `orgMember.findFirst` above confirms the user + // belongs to `entry.orgId`; pinning `organizationId` here means a + // malformed entry whose envId resolves to a different org can't leak + // that project's data through this loader. Mirrors the PG path's + // `project.organization.members.some.userId` scoping (lines 42-95) + // — the env filter and select shape are kept identical so the Replay + // dialog renders the same dropdown either way. + const orgProject = await $replica.project.findFirst({ + where: { + organizationId: entry.orgId, + environments: { some: { id: entry.envId } }, + }, + select: { + slug: true, + environments: { + select: { + id: true, + type: true, + slug: true, + branchName: true, + orgMember: { select: { user: true } }, + }, + where: { + archivedAt: null, + OR: [ + { type: { in: ["PREVIEW", "STAGING", "PRODUCTION"] } }, + { type: "DEVELOPMENT", orgMember: { userId } }, + ], + }, + }, + }, + }); + if (!orgProject) throw new Response("Not Found", { status: 404 }); + run = { + payload: buffered.payload, + payloadType: buffered.payloadType ?? 
"application/json", + seedMetadata: buffered.seedMetadata ?? null, + seedMetadataType: buffered.seedMetadataType ?? null, + runtimeEnvironmentId: entry.envId, + concurrencyKey: buffered.concurrencyKey ?? null, + maxAttempts: buffered.maxAttempts ?? null, + maxDurationInSeconds: buffered.maxDurationInSeconds ?? null, + machinePreset: buffered.machinePreset ?? null, + workerQueue: buffered.workerQueue ?? null, + ttl: buffered.ttl ?? null, + idempotencyKey: buffered.idempotencyKey ?? null, + runTags: buffered.runTags, + queue: buffered.queue ?? "task/", + taskIdentifier: buffered.taskIdentifier ?? "", + project: orgProject, + } as unknown as typeof run; + } + if (!run) { throw new Response("Not Found", { status: 404 }); } @@ -164,6 +247,15 @@ export async function loader({ request, params }: LoaderFunctionArgs) { } export const action: ActionFunction = async ({ request, params }) => { + // Dashboard auth: identical pattern to resources.taskruns.$runParam.cancel.ts. + // The loader above this action already gates with `requireUser`, but + // Remix's action runs independently — without this call any request + // with a valid runParam could submit a replay. The PG findFirst below + // also adds the org-membership filter so a PAT can't replay another + // org's run, and the buffered fallback verifies org membership via + // orgMember.findFirst against the snapshot's orgId. 
+ const user = await requireUser(request); + const userId = user.id; const { runParam } = ParamSchema.parse(params); const formData = await request.formData(); @@ -174,9 +266,18 @@ export const action: ActionFunction = async ({ request, params }) => { } try { - const taskRun = await prisma.taskRun.findFirst({ + const pgRun = await prisma.taskRun.findFirst({ where: { friendlyId: runParam, + project: { + organization: { + members: { + some: { + userId, + }, + }, + }, + }, }, include: { runtimeEnvironment: { @@ -192,6 +293,50 @@ export const action: ActionFunction = async ({ request, params }) => { }, }); + // Mollifier read-fallback: if the original isn't in PG yet, + // synthesise a TaskRun from the buffered snapshot. The B4-extended + // SyntheticRun carries every field ReplayTaskRunService reads. We + // also need projectSlug + orgSlug + envSlug for the redirect path, + // so look those up via the snapshot's runtimeEnvironmentId. + let taskRun: SyntheticReplayTaskRun | null = pgRun ?? null; + if (!taskRun) { + const buffer = getMollifierBuffer(); + const entry = buffer ? await buffer.getEntry(runParam) : null; + if (entry) { + // Same org-membership gate as the PG path above. Without this + // any authenticated user who knows a runId could replay the + // buffered run across orgs. 
+ const member = await prisma.orgMember.findFirst({ + where: { userId, organizationId: entry.orgId }, + select: { id: true }, + }); + if (!member) { + return redirectWithErrorMessage( + submission.value.failedRedirect, + request, + "Run not found" + ); + } + const synthetic = await findRunByIdWithMollifierFallback({ + runId: runParam, + environmentId: entry.envId, + organizationId: entry.orgId, + }); + if (synthetic) { + const envRow = await prisma.runtimeEnvironment.findFirst({ + where: { id: entry.envId }, + select: { + slug: true, + project: { select: { slug: true, organization: { select: { slug: true } } } }, + }, + }); + if (envRow) { + taskRun = buildSyntheticReplayTaskRun({ synthetic, envRow }); + } + } + } + } + if (!taskRun) { return redirectWithErrorMessage(submission.value.failedRedirect, request, "Run not found"); } diff --git a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts index 28739b00fe6..3fe13d572a2 100644 --- a/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts +++ b/apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts @@ -7,6 +7,7 @@ import type { MollifierDrainerTerminalFailureHandler, } from "@trigger.dev/redis-worker"; import { logger } from "~/services/logger.server"; +import { recordRunDebugLog } from "~/v3/eventRepository/index.server"; import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server"; import { startSpan } from "~/v3/tracing.server"; import type { MollifierSnapshot } from "./mollifierSnapshot.server"; @@ -129,8 +130,10 @@ export function createDrainerHandler(deps: { span.setAttribute("mollifier.run_friendly_id", input.runId); span.setAttribute("taskRunId", input.runId); + let triggerSucceeded = false; try { await deps.engine.trigger(input.payload as any, deps.prisma); + triggerSucceeded = true; } catch (err) { // The retryable-PG class re-throws so the drainer's outer // worker loop can 
`buffer.requeue` (handled in @@ -179,6 +182,54 @@ export function createDrainerHandler(deps: { throw err; } } + + // Admin-only audit trail emitted once engine.trigger has + // landed a PG row. `recordRunDebugLog` flips this to the + // admin-gated debug kind (TaskEventKind.LOG in the PG store / + // DEBUG_EVENT in the ClickHouse store) which the trace view + + // logs download already strip for non-admins + // (`eventRepository.server.ts:108`, + // `resources.runs.$runParam.logs.download.ts:118`). + // + // Placement: emit as a zero-duration marker AT materialisation + // time, not as a back-dated bar spanning the buffered window. + // `engine.trigger` rewrites the run's root span at + // materialisation (it adopts the synth root via traceId/spanId + // carryover but updates start_time to "now"), so the trace + // renderer treats materialisation time as t=0. A back-dated + // event with startTime = bufferedAt would land before that t=0 + // and get clipped from the tree. Same pattern as the + // `[engine] QUEUED` markers. The window itself is preserved + // in metadata so admins can read it off the span detail pane. + // + // Best-effort: `recordRunDebugLog` has its own try/catch and + // returns a result, so it never throws into the materialisation + // path. Failures are logged but not surfaced because the + // customer-visible run has already landed. 
+ if (triggerSucceeded) { + const debugResult = await recordRunDebugLog( + RunId.fromFriendlyId(input.runId), + `Mollifier buffered ${dwellMs}ms before materialising`, + { + attributes: { + runId: input.runId, + metadata: { + "mollifier.bufferedAt": input.createdAt.toISOString(), + "mollifier.materialisedAt": new Date().toISOString(), + "mollifier.dwellMs": dwellMs, + "mollifier.attempts": input.attempts, + }, + }, + parentId: snapshotSpanId, + } + ); + if (!debugResult.success && debugResult.code !== "RUN_NOT_FOUND") { + logger.warn("mollifier drainer: failed to record admin debug log", { + runId: input.runId, + code: debugResult.code, + }); + } + } }); }); }; diff --git a/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts new file mode 100644 index 00000000000..01962cf7890 --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticReplayTaskRun.server.ts @@ -0,0 +1,51 @@ +import type { TaskRun } from "@trigger.dev/database"; +import type { SyntheticRun } from "./readFallback.server"; + +export type SyntheticReplayTaskRun = TaskRun & { + project: { slug: string; organization: { slug: string } }; + runtimeEnvironment: { slug: string }; +}; + +// Adapt a buffered-run snapshot into the TaskRun-shaped input that +// `ReplayTaskRunService.call` expects. ReplayTaskRunService builds the +// new run's traceparent as `00-${existingTaskRun.traceId}-${existingTaskRun.spanId}-01` +// without guarding for undefined, so a synthetic with missing traceId +// or spanId (older snapshots — both fields are documented optional on +// `SyntheticRun`) would produce `00-undefined-undefined-01`, an invalid +// W3C traceparent that OTel silently drops, severing the replay's trace +// link to the original run. 
+// +// Returns null when those fields are missing — the caller surfaces this +// as "Run not found" so the customer retries once the drainer has +// materialised the PG row, where traceId/spanId are guaranteed present. +export function buildSyntheticReplayTaskRun(args: { + synthetic: SyntheticRun; + envRow: { + slug: string; + project: { slug: string; organization: { slug: string } }; + }; +}): SyntheticReplayTaskRun | null { + const { synthetic, envRow } = args; + if (!synthetic.traceId || !synthetic.spanId) return null; + return { + // The double `as unknown as TaskRun` cast is load-bearing — a direct + // `synthetic as TaskRun` won't compile. `SyntheticRun` carries the + // subset of fields that `ReplayTaskRunService.call` actually reads + // (the contract is enumerated on the SyntheticRun type comment in + // readFallback.server.ts), but its shape is not structurally + // assignable to the full Prisma `TaskRun` row: optional vs required + // fields diverge, several PG columns (number, batchId variants, + // status enum widening) are deliberately absent or narrower on the + // synthetic. Routing it through `unknown` is the explicit "we know + // this is a subset, we've audited which fields are read" signal, + // and the traceId/spanId guard above prevents the only field + // ReplayTaskRunService consumes that would corrupt downstream + // behaviour (the OTel traceparent) when undefined. 
+ ...(synthetic as unknown as TaskRun), + project: { + slug: envRow.project.slug, + organization: { slug: envRow.project.organization.slug }, + }, + runtimeEnvironment: { slug: envRow.slug }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts b/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts new file mode 100644 index 00000000000..6b5420f3d8e --- /dev/null +++ b/apps/webapp/app/v3/mollifier/syntheticRunHeader.server.ts @@ -0,0 +1,66 @@ +import type { SyntheticRun } from "./readFallback.server"; + +// Synthesise the run-detail page's `run` header shape (the NavBar + +// status badge + Cancel-button gate) from a buffered run snapshot. The +// shape matches `RunPresenter.getRun`'s `runData` — keep this in sync +// when fields are added there. +// +// CANCELED and FAILED state is reflected back from +// `SyntheticRun.cancelledAt` / `status` so terminal buffered runs show +// the correct status in the NavBar + isFinished:true (which collapses +// the Cancel button on the page header) before the drainer materialises +// the PG row. This mirrors what `buildSyntheticSpanRun` does for the +// right-side details panel — the SyntheticRun.cancelledAt contract +// comment in readFallback.server.ts names this exact UI surface. +// +// FAILED status maps to `SYSTEM_FAILURE` to match the drainer's +// non-retryable terminal path, which is what `buildSyntheticSpanRun` +// uses too. Symmetric across the header + span-detail panel so an +// admin doesn't see "Pending" + "FAILED" simultaneously on the same +// run. 
+export function buildSyntheticRunHeader(args: { + run: SyntheticRun; + environment: { + id: string; + organizationId: string; + type: "PRODUCTION" | "DEVELOPMENT" | "STAGING" | "PREVIEW"; + slug: string; + }; +}) { + const { run, environment } = args; + const isCancelled = run.status === "CANCELED"; + const isFailed = run.status === "FAILED"; + + return { + // `id` mirrors RunPresenter.getRun's runData (the PG path), which + // is the internal cuid — not the friendlyId. SyntheticRun.id is + // already the cuid (RunId.fromFriendlyId(entry.runId) in + // readFallback.server.ts) so the admin debug tooltip on the run + // detail page shows the same format for buffered + materialised + // runs. + id: run.id, + number: 1, + friendlyId: run.friendlyId, + traceId: run.traceId ?? "", + spanId: run.spanId ?? "", + status: isCancelled + ? ("CANCELED" as const) + : isFailed + ? ("SYSTEM_FAILURE" as const) + : ("PENDING" as const), + isFinished: isCancelled || isFailed, + startedAt: null, + completedAt: run.cancelledAt ?? null, + logsDeletedAt: null, + rootTaskRun: null, + parentTaskRun: null, + environment: { + id: environment.id, + organizationId: environment.organizationId, + type: environment.type, + slug: environment.slug, + userId: undefined, + userName: undefined, + }, + }; +} diff --git a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts index bdc7873304f..334f9e5db80 100644 --- a/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts +++ b/apps/webapp/app/v3/mollifier/syntheticSpanRun.server.ts @@ -177,6 +177,7 @@ export async function buildSyntheticSpanRun(args: { traceId: run.traceId ?? "", spanId: run.spanId ?? 
"", isCached: false, + isBuffered: true, machinePreset: narrowMachinePreset(run.machinePreset), taskEventStore: "taskEvent", externalTraceId: undefined, diff --git a/apps/webapp/test/mollifierDrainerHandler.test.ts b/apps/webapp/test/mollifierDrainerHandler.test.ts index 9aea04f014b..71eb501974b 100644 --- a/apps/webapp/test/mollifierDrainerHandler.test.ts +++ b/apps/webapp/test/mollifierDrainerHandler.test.ts @@ -19,6 +19,22 @@ vi.mock("~/v3/services/alerts/performTaskRunAlerts.server", () => ({ }, })); +// The drainer calls `recordRunDebugLog` after a successful engine.trigger +// to emit an admin-only LOG-kind event encoding the buffered window. +// The real implementation imports the configured event repository (prisma +// + clickhouse + env), which has heavy side-effects on first import. +// Stub it to a vi.fn so the unit tests can assert call shape without +// dragging the whole eventRepository graph into webapp test setup. +// `vi.hoisted` is required because `vi.mock` factories are hoisted above +// regular `const`s — referencing a top-level variable from inside the +// factory otherwise fires `Cannot access 'X' before initialization`. +const { recordRunDebugLogMock } = vi.hoisted(() => ({ + recordRunDebugLogMock: vi.fn(async () => ({ success: true as const })), +})); +vi.mock("~/v3/eventRepository/index.server", () => ({ + recordRunDebugLog: recordRunDebugLogMock, +})); + import { createDrainerHandler, isRetryablePgError, @@ -371,4 +387,109 @@ describe("createDrainerHandler", () => { ).rejects.toThrow("engine rejected the snapshot"); expect(createFailedTaskRun).not.toHaveBeenCalled(); }); + + it("emits an admin-only LOG-kind event with the buffered window after engine.trigger succeeds", async () => { + // The drainer's audit trail rides the existing TaskEventKind.LOG + // filter pattern (`eventRepository.server.ts:108` + `logs.download.ts:118`) + // — admins see the buffered window in the trace; non-admins don't. 
+ recordRunDebugLogMock.mockClear(); + const trigger = vi.fn(async () => ({ friendlyId: "run_z" })); + const handler = createDrainerHandler({ + engine: { trigger } as any, + prisma: {} as any, + }); + + const bufferedAt = new Date(Date.now() - 4_000); + await handler({ + runId: "run_z", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", spanId: "snapspan", traceId: "snaptrace" }, + attempts: 2, + createdAt: bufferedAt, + } as any); + + expect(recordRunDebugLogMock).toHaveBeenCalledOnce(); + const [callRunId, message, options] = recordRunDebugLogMock.mock.calls[0] as [ + string, + string, + any, + ]; + // Internal cuid derived from the friendlyId, mirroring what + // `findRunForEventCreation` queries on. + expect(callRunId).toBe("z"); + expect(message).toMatch(/Mollifier buffered \d+ms before materialising/); + // Emitted as a marker at materialisation time (no `startTime` / + // `duration` overrides) — engine.trigger has just rewritten the + // root span's start_time to "now", so back-dating the event would + // clip it off-screen in the trace renderer. The historical window + // is preserved in metadata so admins can still read it. + expect(options.startTime).toBeUndefined(); + expect(options.duration).toBeUndefined(); + expect(options.parentId).toBe("snapspan"); + expect(options.attributes.metadata["mollifier.bufferedAt"]).toBe(bufferedAt.toISOString()); + expect(options.attributes.metadata["mollifier.attempts"]).toBe(2); + expect(options.attributes.metadata["mollifier.dwellMs"]).toBeGreaterThan(0); + }); + + it("does NOT emit the admin LOG event when engine.trigger fails non-retryably", async () => { + // The audit trail is for runs that actually materialised. On a + // terminal SYSTEM_FAILURE path the customer-visible outcome is the + // failure row; emitting a "buffered for Xms" event next to it would + // imply the buffered window completed normally. 
+ recordRunDebugLogMock.mockClear(); + const trigger = vi.fn(async () => { + throw new Error("engine rejected the snapshot"); + }); + const createFailedTaskRun = vi.fn(async () => ({ id: "internal" })); + const handler = createDrainerHandler({ + engine: { trigger, createFailedTaskRun } as any, + prisma: {} as any, + }); + + await handler({ + runId: "run_z", + envId: "env_a", + orgId: "org_1", + payload: { taskIdentifier: "t", environment: envFixture }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(recordRunDebugLogMock).not.toHaveBeenCalled(); + }); + + it("does NOT emit the admin LOG event on the cancel-bifurcation path", async () => { + // Cancel-bifurcation writes a CANCELED row directly without calling + // engine.trigger. There's no buffered-then-materialised window to + // describe — the run never ran. + recordRunDebugLogMock.mockClear(); + const friendlyId = RunId.generate().friendlyId; + const createCancelledRun = vi.fn(async () => ({ + id: "internal", + friendlyId, + status: "CANCELED", + })); + const handler = createDrainerHandler({ + engine: { createCancelledRun } as any, + prisma: {} as any, + }); + + await handler({ + runId: friendlyId, + envId: "env_a", + orgId: "org_1", + payload: { + friendlyId, + taskIdentifier: "t", + environment: envFixture, + cancelledAt: new Date().toISOString(), + cancelReason: "Canceled by user", + }, + attempts: 0, + createdAt: new Date(), + } as any); + + expect(recordRunDebugLogMock).not.toHaveBeenCalled(); + }); }); diff --git a/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts b/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts new file mode 100644 index 00000000000..6df2d92dde4 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticReplayTaskRun.test.ts @@ -0,0 +1,106 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticReplayTaskRun } from "~/v3/mollifier/syntheticReplayTaskRun.server"; +import 
type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); + +function makeSyntheticRun(overrides: Partial<SyntheticRun> = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV_ROW = { + slug: "dev", + project: { slug: "hello-world", organization: { slug: "references" } }, +}; + +describe("buildSyntheticReplayTaskRun", () => { + it("returns the adapted TaskRun shape when traceId and spanId are present", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun(), + envRow: ENV_ROW, + }); + expect(taskRun).not.toBeNull(); + expect(taskRun!.traceId).toBe("trace_1"); + expect(taskRun!.spanId).toBe("span_1"); + expect(taskRun!.project.slug).toBe("hello-world"); + expect(taskRun!.project.organization.slug).toBe("references"); + 
expect(taskRun!.runtimeEnvironment.slug).toBe("dev"); + }); + + it("returns null when the snapshot has no traceId", () => { + // ReplayTaskRunService builds `00-${traceId}-${spanId}-01` without + // guarding for undefined. Falling through with a missing traceId + // would emit `00-undefined-...-01`, an invalid W3C traceparent that + // OTel silently drops, breaking the replayed run's trace linkage to + // the original. The helper must refuse rather than degrade silently. + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ traceId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); + + it("returns null when the snapshot has no spanId", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ spanId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); + + it("returns null when both traceId and spanId are missing", () => { + const taskRun = buildSyntheticReplayTaskRun({ + synthetic: makeSyntheticRun({ traceId: undefined, spanId: undefined }), + envRow: ENV_ROW, + }); + expect(taskRun).toBeNull(); + }); +}); diff --git a/apps/webapp/test/mollifierSyntheticRunHeader.test.ts b/apps/webapp/test/mollifierSyntheticRunHeader.test.ts new file mode 100644 index 00000000000..5d1a3d1a748 --- /dev/null +++ b/apps/webapp/test/mollifierSyntheticRunHeader.test.ts @@ -0,0 +1,114 @@ +import { describe, expect, it, vi } from "vitest"; + +vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} })); + +import { buildSyntheticRunHeader } from "~/v3/mollifier/syntheticRunHeader.server"; +import type { SyntheticRun } from "~/v3/mollifier/readFallback.server"; + +const NOW = new Date("2026-05-21T10:00:00Z"); +const CANCELLED_AT = new Date("2026-05-21T10:00:30Z"); + +function makeSyntheticRun(overrides: Partial<SyntheticRun> = {}): SyntheticRun { + return { + id: "run_internal_1", + friendlyId: "run_friendly_1", + status: "QUEUED", + cancelledAt: undefined, + cancelReason: undefined, + delayUntil: 
undefined, + taskIdentifier: "hello-world", + createdAt: NOW, + payload: { message: "hi" }, + payloadType: "application/json", + metadata: undefined, + metadataType: undefined, + seedMetadata: undefined, + seedMetadataType: undefined, + idempotencyKey: undefined, + idempotencyKeyOptions: undefined, + isTest: false, + depth: 0, + ttl: "10m", + tags: [], + runTags: [], + lockedToVersion: undefined, + resumeParentOnCompletion: false, + parentTaskRunId: undefined, + traceId: "trace_1", + spanId: "span_1", + parentSpanId: undefined, + runtimeEnvironmentId: "env_a", + engine: "V2", + workerQueue: "worker-queue-1", + queue: "task/hello-world", + concurrencyKey: undefined, + machinePreset: "small-1x", + realtimeStreamsVersion: "v1", + maxAttempts: 3, + maxDurationInSeconds: 3600, + replayedFromTaskRunFriendlyId: undefined, + annotations: undefined, + traceContext: undefined, + scheduleId: undefined, + batchId: undefined, + parentTaskRunFriendlyId: undefined, + rootTaskRunFriendlyId: undefined, + ...overrides, + }; +} + +const ENV = { + id: "env_a", + organizationId: "org_a", + type: "DEVELOPMENT" as const, + slug: "dev", +}; + +describe("buildSyntheticRunHeader", () => { + it("returns PENDING / non-final state for a queued buffered run", () => { + const header = buildSyntheticRunHeader({ run: makeSyntheticRun(), environment: ENV }); + expect(header.status).toBe("PENDING"); + expect(header.isFinished).toBe(false); + expect(header.completedAt).toBeNull(); + }); + + it("reflects CANCELED state from the snapshot so the NavBar and Cancel-button gate update before the drainer materialises", () => { + const header = buildSyntheticRunHeader({ + run: makeSyntheticRun({ status: "CANCELED", cancelledAt: CANCELLED_AT }), + environment: ENV, + }); + // The Cancel button in route.tsx is gated on `!run.isFinished` and the + // status badge reads `run.status`. 
Both must flip on buffered-cancel + // or the user sees a "Pending" badge with a Cancel button on a run + // that's already cancelled in the snapshot. + expect(header.status).toBe("CANCELED"); + expect(header.isFinished).toBe(true); + expect(header.completedAt).toEqual(CANCELLED_AT); + }); + + it("forwards identity and environment fields from the snapshot", () => { + const header = buildSyntheticRunHeader({ run: makeSyntheticRun(), environment: ENV }); + expect(header.friendlyId).toBe("run_friendly_1"); + // `id` mirrors RunPresenter.getRun (the PG path) which puts the + // internal cuid in this field. SyntheticRun.id is the cuid; the + // header must surface it (not the friendlyId). + expect(header.id).toBe("run_internal_1"); + expect(header.traceId).toBe("trace_1"); + expect(header.spanId).toBe("span_1"); + expect(header.environment).toMatchObject({ + id: "env_a", + organizationId: "org_a", + type: "DEVELOPMENT", + slug: "dev", + }); + }); + + it("falls back to empty strings when the snapshot has no trace/span ids", () => { + const header = buildSyntheticRunHeader({ + run: makeSyntheticRun({ traceId: undefined, spanId: undefined }), + environment: ENV, + }); + expect(header.traceId).toBe(""); + expect(header.spanId).toBe(""); + }); +});