diff --git a/apps/sim/app/api/copilot/chat/stop/route.test.ts b/apps/sim/app/api/copilot/chat/stop/route.test.ts index bab5465507..452131f21e 100644 --- a/apps/sim/app/api/copilot/chat/stop/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stop/route.test.ts @@ -1,79 +1,19 @@ /** * @vitest-environment node */ -import { authMockFns } from '@sim/testing' +import { authMockFns, dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' import { NextRequest } from 'next/server' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { - mockSelect, - mockFrom, - mockWhereSelect, - mockLimit, - mockForUpdate, - mockUpdate, - mockSet, - mockWhereUpdate, - mockReturning, - mockPublishStatusChanged, - mockSql, - mockTransaction, -} = vi.hoisted(() => { - const mockSelect = vi.fn() - const mockFrom = vi.fn() - const mockWhereSelect = vi.fn() - const mockLimit = vi.fn() - const mockForUpdate = vi.fn() - const mockUpdate = vi.fn() - const mockSet = vi.fn() - const mockWhereUpdate = vi.fn() - const mockReturning = vi.fn() - const mockPublishStatusChanged = vi.fn() - const mockSql = vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ - strings, - values, - })) - const mockTransaction = vi.fn( - (callback: (tx: { select: typeof mockSelect; update: typeof mockUpdate }) => unknown) => - callback({ select: mockSelect, update: mockUpdate }) - ) - - return { - mockSelect, - mockFrom, - mockWhereSelect, - mockLimit, - mockForUpdate, - mockUpdate, - mockSet, - mockWhereUpdate, - mockReturning, - mockPublishStatusChanged, - mockSql, - mockTransaction, - } -}) +vi.mock('@sim/db', () => dbChainMock) -vi.mock('@sim/db/schema', () => ({ - copilotChats: { - id: 'copilotChats.id', - userId: 'copilotChats.userId', - workspaceId: 'copilotChats.workspaceId', - messages: 'copilotChats.messages', - conversationId: 'copilotChats.conversationId', - }, -})) - -vi.mock('@sim/db', () => ({ - db: { - transaction: mockTransaction, - }, +const { mockAppendCopilotChatMessages, mockPublishStatusChanged } = vi.hoisted(() => ({ + mockAppendCopilotChatMessages: vi.fn(), + mockPublishStatusChanged: vi.fn(), })) -vi.mock('drizzle-orm', () => ({ - and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), - eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), - sql: mockSql, +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages: mockAppendCopilotChatMessages, })) vi.mock('@/lib/copilot/tasks', () => ({ @@ -92,39 +32,33 @@ function createRequest(body: Record) { }) } +/** + * Sequence the two in-tx reads `finalizeAssistantTurn` issues: the chat row + * (`FOR UPDATE ... LIMIT 1`) and the last-message lookup that drives dedup + * (both terminate on `.limit(1)`). + */ +function mockReads(opts: { + chat: Record | null + last?: { messageId: string; role: string } +}) { + dbChainMockFns.limit.mockResolvedValueOnce(opts.chat ? [opts.chat] : []) + dbChainMockFns.limit.mockResolvedValueOnce(opts.last ? [opts.last] : []) +} + describe('copilot chat stop route', () => { beforeEach(() => { vi.clearAllMocks() - + // Drain the once-queue (clearAllMocks/resetDbChainMock don't), then restore defaults. + dbChainMockFns.limit.mockReset() + resetDbChainMock() authMockFns.mockGetSession.mockResolvedValue({ user: { id: 'user-1' } }) - - mockLimit.mockResolvedValue([ - { - workspaceId: 'ws-1', - messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], - conversationId: 'stream-1', - }, - ]) - mockForUpdate.mockReturnValue({ limit: mockLimit }) - mockWhereSelect.mockReturnValue({ for: mockForUpdate }) - mockFrom.mockReturnValue({ where: mockWhereSelect }) - mockSelect.mockReturnValue({ from: mockFrom }) - - mockReturning.mockResolvedValue([{ workspaceId: 'ws-1' }]) - mockWhereUpdate.mockReturnValue({ returning: mockReturning }) - mockSet.mockReturnValue({ where: mockWhereUpdate }) - mockUpdate.mockReturnValue({ set: mockSet }) }) it('returns 401 when unauthenticated', async () => { authMockFns.mockGetSession.mockResolvedValueOnce(null) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(401) @@ -132,41 +66,37 @@ describe('copilot chat stop route', () => { }) it('is a no-op when the chat is missing', async () => { - mockLimit.mockResolvedValueOnce([]) + mockReads({ chat: null }) const response = await POST( - createRequest({ - chatId: 'missing-chat', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'missing-chat', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - expect(mockUpdate).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('appends a stopped assistant message even with no content', async () => { + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: 'stream-1', model: null }, + last: { messageId: 'stream-1', role: 'user' }, + }) + const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: '', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: '' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - const setArg = mockSet.mock.calls[0]?.[0] - expect(setArg).toBeTruthy() + const setArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record expect(setArg.conversationId).toBeNull() - expect(setArg.messages).toBeTruthy() + expect(Object.hasOwn(setArg, 'messages')).toBe(false) - const appendedPayload = JSON.parse(setArg.messages.values[1] as string) - expect(appendedPayload).toHaveLength(1) - expect(appendedPayload[0]).toMatchObject({ + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + const [, appended] = mockAppendCopilotChatMessages.mock.calls[0] + expect(appended[0]).toMatchObject({ role: 'assistant', content: '', contentBlocks: [{ type: 'complete', status: 'cancelled' }], @@ -181,32 +111,21 @@ describe('copilot chat stop route', () => { }) it('appends a stopped assistant message if the stream marker was already cleared', async () => { - mockLimit.mockResolvedValueOnce([ - { - workspaceId: 'ws-1', - messages: [{ id: 'stream-1', role: 'user', content: 'hello' }], - conversationId: null, - }, - ]) + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: null, model: null }, + last: { messageId: 'stream-1', role: 'user' }, + }) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: 'partial', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: 'partial' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - const setArg = mockSet.mock.calls[0]?.[0] - expect(setArg.messages).toBeTruthy() - const appendedPayload = JSON.parse(setArg.messages.values[1] as string) - expect(appendedPayload[0]).toMatchObject({ - role: 'assistant', - content: 'partial', - }) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + const [, appended] = mockAppendCopilotChatMessages.mock.calls[0] + expect(appended[0]).toMatchObject({ role: 'assistant', content: 'partial' }) expect(mockPublishStatusChanged).toHaveBeenCalledWith({ workspaceId: 'ws-1', @@ -217,28 +136,19 @@ describe('copilot chat stop route', () => { }) it('republishes completed status when the assistant was already persisted', async () => { - mockLimit.mockResolvedValueOnce([ - { - workspaceId: 'ws-1', - messages: [ - { id: 'stream-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: null, - }, - ]) + mockReads({ + chat: { workspaceId: 'ws-1', conversationId: null, model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) const response = await POST( - createRequest({ - chatId: 'chat-1', - streamId: 'stream-1', - content: 'partial', - }) + createRequest({ chatId: 'chat-1', streamId: 'stream-1', content: 'partial' }) ) expect(response.status).toBe(200) expect(await response.json()).toEqual({ success: true }) - expect(mockUpdate).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() expect(mockPublishStatusChanged).toHaveBeenCalledWith({ workspaceId: 'ws-1', chatId: 'chat-1', diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts index c56415116a..2e7dfa134c 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.test.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.test.ts @@ -16,6 +16,7 @@ const { mockSet, mockUpdateWhere, mockReturning, + mockReplaceCopilotChatMessages, } = vi.hoisted(() => ({ mockSelect: vi.fn(), mockFrom: vi.fn(), @@ -25,15 +26,23 @@ const { mockSet: vi.fn(), mockUpdateWhere: vi.fn(), mockReturning: vi.fn(), + mockReplaceCopilotChatMessages: vi.fn(), })) vi.mock('@sim/db', () => ({ db: { select: mockSelect, update: mockUpdate, + transaction: async ( + cb: (tx: { update: typeof mockUpdate; select: typeof mockSelect }) => unknown + ) => cb({ update: mockUpdate, select: mockSelect }), }, })) +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + replaceCopilotChatMessages: mockReplaceCopilotChatMessages, +})) + vi.mock('drizzle-orm', () => ({ and: vi.fn((...conditions: unknown[]) => ({ conditions, type: 'and' })), eq: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'eq' })), @@ -257,10 +266,13 @@ describe('Copilot Chat Update Messages API Route', () => { expect(mockSelect).toHaveBeenCalled() expect(mockUpdate).toHaveBeenCalled() - expect(mockSet).toHaveBeenCalledWith({ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-123', messages, - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should successfully update chat messages with optional fields', async () => { @@ -315,8 +327,10 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 2, }) - expect(mockSet).toHaveBeenCalledWith({ - messages: [ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-456', + [ { id: 'msg-1', role: 'user', @@ -345,8 +359,9 @@ describe('Copilot Chat Update Messages API Route', () => { ], }, ], - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle empty messages array', async () => { @@ -373,10 +388,13 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 0, }) - expect(mockSet).toHaveBeenCalledWith({ - messages: [], - updatedAt: expect.any(Date), - }) + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-789', + [], + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle database errors during chat lookup', async () => { @@ -485,10 +503,13 @@ describe('Copilot Chat Update Messages API Route', () => { messageCount: 100, }) - expect(mockSet).toHaveBeenCalledWith({ + expect(mockSet).toHaveBeenCalledWith({ updatedAt: expect.any(Date) }) + expect(mockReplaceCopilotChatMessages).toHaveBeenCalledWith( + 'chat-large', messages, - updatedAt: expect.any(Date), - }) + { chatModel: 'gpt-4' }, + expect.anything() + ) }) it('should handle messages with both user and assistant roles', async () => { diff --git a/apps/sim/app/api/copilot/chat/update-messages/route.ts b/apps/sim/app/api/copilot/chat/update-messages/route.ts index 1b654c4930..7c7792e3f2 100644 --- a/apps/sim/app/api/copilot/chat/update-messages/route.ts +++ b/apps/sim/app/api/copilot/chat/update-messages/route.ts @@ -6,7 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server' import { updateCopilotMessagesContract } from '@/lib/api/contracts/copilot' import { parseRequest } from '@/lib/api/server' import { getAccessibleCopilotChatAuth } from '@/lib/copilot/chat/lifecycle' -import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { replaceCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { normalizeMessage, type PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { authenticateCopilotRequestSessionOnly, @@ -73,9 +73,7 @@ export const POST = withRouteHandler(async (req: NextRequest) => { return createNotFoundResponse('Chat not found or unauthorized') } - // Update chat with new messages, plan artifact, and config const updateData: Record = { - messages: normalizedMessages, updatedAt: new Date(), } @@ -87,16 +85,20 @@ export const POST = withRouteHandler(async (req: NextRequest) => { updateData.config = config } - const [updated] = await db - .update(copilotChats) - .set(updateData) - .where(eq(copilotChats.id, chatId)) - .returning({ model: copilotChats.model }) - if (updated) { - await replaceCopilotChatMessages(chatId, normalizedMessages, { - chatModel: updated.model ?? null, - }) - } + await db.transaction(async (tx) => { + const [updated] = await tx + .update(copilotChats) + .set(updateData) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) + if (!updated) return + await replaceCopilotChatMessages( + chatId, + normalizedMessages, + { chatModel: updated.model ?? null }, + tx + ) + }) logger.info(`[${tracker.requestId}] Successfully updated chat`, { chatId, diff --git a/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts index 54f4cc588e..fec6bc6c19 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/fork/route.ts @@ -6,8 +6,8 @@ import { eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { forkMothershipChatContract } from '@/lib/api/contracts/mothership-tasks' import { parseRequest } from '@/lib/api/server' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { loadCopilotChatMessages } from '@/lib/copilot/chat/lifecycle' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { fetchGo } from '@/lib/copilot/request/go/fetch' import { authenticateCopilotRequestSessionOnly, @@ -50,9 +50,19 @@ export const POST = withRouteHandler( const { chatId } = parsed.data.params const { upToMessageId } = parsed.data.body - // Load parent chat and verify ownership. const [parent] = await db - .select() + .select({ + id: copilotChats.id, + userId: copilotChats.userId, + type: copilotChats.type, + workspaceId: copilotChats.workspaceId, + title: copilotChats.title, + model: copilotChats.model, + resources: copilotChats.resources, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + }) .from(copilotChats) .where(eq(copilotChats.id, chatId)) .limit(1) @@ -65,8 +75,7 @@ export const POST = withRouteHandler( await assertActiveWorkspaceAccess(parent.workspaceId, userId) } - // Find the fork point in the Sim-side messages array. - const messages = Array.isArray(parent.messages) ? (parent.messages as PersistedMessage[]) : [] + const messages = await loadCopilotChatMessages(chatId) const forkIdx = messages.findIndex((m) => m.id === upToMessageId) if (forkIdx < 0) { return createBadRequestResponse('Message not found in chat') @@ -83,32 +92,36 @@ export const POST = withRouteHandler( const title = `Fork | ${baseTitle}` const now = new Date() - const [newChat] = await db - .insert(copilotChats) - .values({ - id: newId, - userId, - workspaceId: parent.workspaceId, - type: parent.type, - title, - model: parent.model, - messages: forkedMessages, - resources: parentResources, - previewYaml: parent.previewYaml, - planArtifact: parent.planArtifact, - config: parent.config, - conversationId: null, - updatedAt: now, - lastSeenAt: now, - }) - .returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId }) + const newChat = await db.transaction(async (tx) => { + const [row] = await tx + .insert(copilotChats) + .values({ + id: newId, + userId, + workspaceId: parent.workspaceId, + type: parent.type, + title, + model: parent.model, + resources: parentResources, + previewYaml: parent.previewYaml, + planArtifact: parent.planArtifact, + config: parent.config, + conversationId: null, + updatedAt: now, + lastSeenAt: now, + }) + .returning({ id: copilotChats.id, workspaceId: copilotChats.workspaceId }) + + if (!row) return null + + await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model }, tx) + return row + }) if (!newChat) { return createInternalServerErrorResponse('Failed to create forked chat') } - await appendCopilotChatMessages(newId, forkedMessages, { chatModel: parent.model }) - // Clone copilot-service conversation state (messages, active_messages, memory files). // Best-effort: if the copilot service doesn't have a row for the source chat yet, skip. try { diff --git a/apps/sim/app/api/mothership/chats/route.ts b/apps/sim/app/api/mothership/chats/route.ts index 1b7157fdde..c5610da215 100644 --- a/apps/sim/app/api/mothership/chats/route.ts +++ b/apps/sim/app/api/mothership/chats/route.ts @@ -106,7 +106,6 @@ export const POST = withRouteHandler(async (request: NextRequest) => { type: 'mothership', title: null, model: 'claude-opus-4-6', - messages: [], updatedAt: now, lastSeenAt: now, }) diff --git a/apps/sim/app/api/superuser/import-workflow/route.ts b/apps/sim/app/api/superuser/import-workflow/route.ts index 912278040b..07ab98932b 100644 --- a/apps/sim/app/api/superuser/import-workflow/route.ts +++ b/apps/sim/app/api/superuser/import-workflow/route.ts @@ -7,8 +7,8 @@ import { type NextRequest, NextResponse } from 'next/server' import { importWorkflowAsSuperuserContract } from '@/lib/api/contracts/workflows' import { parseRequest } from '@/lib/api/server' import { getSession } from '@/lib/auth' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import { loadCopilotChatMessages } from '@/lib/copilot/chat/lifecycle' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' import { verifyEffectiveSuperUser } from '@/lib/templates/permissions' import { parseWorkflowJson } from '@/lib/workflows/operations/import-export' @@ -167,34 +167,46 @@ export const POST = withRouteHandler(async (request: NextRequest) => { // Copy copilot chats associated with the source workflow const sourceCopilotChats = await db - .select() + .select({ + id: copilotChats.id, + title: copilotChats.title, + model: copilotChats.model, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + }) .from(copilotChats) .where(eq(copilotChats.workflowId, workflowId)) let copilotChatsImported = 0 for (const chat of sourceCopilotChats) { - const [imported] = await db - .insert(copilotChats) - .values({ - userId: session.user.id, - workflowId: newWorkflowId, - title: chat.title ? `[Import] ${chat.title}` : null, - messages: chat.messages, - model: chat.model, - conversationId: null, // Don't copy conversation ID - previewYaml: chat.previewYaml, - planArtifact: chat.planArtifact, - config: chat.config, - createdAt: new Date(), - updatedAt: new Date(), - }) - .returning({ id: copilotChats.id }) - if (imported && Array.isArray(chat.messages) && chat.messages.length > 0) { - await appendCopilotChatMessages(imported.id, chat.messages as PersistedMessage[], { - chatModel: chat.model, - }) - } + const sourceMessages = await loadCopilotChatMessages(chat.id) + await db.transaction(async (tx) => { + const [imported] = await tx + .insert(copilotChats) + .values({ + userId: session.user.id, + workflowId: newWorkflowId, + title: chat.title ? `[Import] ${chat.title}` : null, + model: chat.model, + conversationId: null, // Don't copy conversation ID + previewYaml: chat.previewYaml, + planArtifact: chat.planArtifact, + config: chat.config, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning({ id: copilotChats.id }) + if (imported && sourceMessages.length > 0) { + await appendCopilotChatMessages( + imported.id, + sourceMessages, + { chatModel: chat.model }, + tx + ) + } + }) copilotChatsImported++ } diff --git a/apps/sim/lib/cleanup/chat-cleanup.ts b/apps/sim/lib/cleanup/chat-cleanup.ts index b154687bc3..c5dafdf9c2 100644 --- a/apps/sim/lib/cleanup/chat-cleanup.ts +++ b/apps/sim/lib/cleanup/chat-cleanup.ts @@ -1,5 +1,5 @@ import { db } from '@sim/db' -import { copilotChats, workspaceFiles } from '@sim/db/schema' +import { copilotMessages, workspaceFiles } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, inArray, isNull } from 'drizzle-orm' import { chunkArray } from '@/lib/cleanup/batch-delete' @@ -11,7 +11,7 @@ import { isUsingCloudStorage, StorageService } from '@/lib/uploads' const logger = createLogger('ChatCleanup') const COPILOT_CLEANUP_BATCH_SIZE = 1000 -/** Bounds JSONB detoast memory: `messages` can be MBs per row. */ +/** Bounds how many chats' `copilot_messages` rows are scanned per query. */ const CHAT_FILE_COLLECT_CHUNK_SIZE = 500 /** @@ -31,7 +31,7 @@ interface FileRef { /** * Collect all file storage keys for the given chat IDs from two sources: * 1. workspaceFiles rows with chatId FK (chat-scoped contexts only) - * 2. fileAttachments[].key inside copilotChats.messages JSONB + * 2. fileAttachments[].key inside each copilot_messages.content */ export async function collectChatFiles(chatIds: string[]): Promise { const files: FileRef[] = [] @@ -40,7 +40,7 @@ export async function collectChatFiles(chatIds: string[]): Promise { const seen = new Set() for (const chunk of chunkArray(chatIds, CHAT_FILE_COLLECT_CHUNK_SIZE)) { - const [linkedFiles, chatsWithMessages] = await Promise.all([ + const [linkedFiles, messageRows] = await Promise.all([ db .select({ key: workspaceFiles.key, context: workspaceFiles.context }) .from(workspaceFiles) @@ -51,10 +51,12 @@ export async function collectChatFiles(chatIds: string[]): Promise { inArray(workspaceFiles.context, [...CHAT_SCOPED_CONTEXTS]) ) ), + // Scan every message row for the chat (no deleted_at filter): this is a + // deletion path collecting blob keys, so attachments on any row count. db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(inArray(copilotChats.id, chunk)), + .select({ content: copilotMessages.content }) + .from(copilotMessages) + .where(inArray(copilotMessages.chatId, chunk)), ]) for (const f of linkedFiles) { @@ -64,24 +66,21 @@ export async function collectChatFiles(chatIds: string[]): Promise { } } - for (const chat of chatsWithMessages) { - const messages = chat.messages as unknown[] - if (!Array.isArray(messages)) continue - for (const msg of messages) { - if (!msg || typeof msg !== 'object') continue - const attachments = (msg as Record).fileAttachments - if (!Array.isArray(attachments)) continue - for (const attachment of attachments) { - if ( - attachment && - typeof attachment === 'object' && - (attachment as Record).key - ) { - const key = (attachment as Record).key as string - if (!seen.has(key)) { - seen.add(key) - files.push({ key, context: 'copilot' }) - } + for (const row of messageRows) { + const msg = row.content + if (!msg || typeof msg !== 'object') continue + const attachments = (msg as Record).fileAttachments + if (!Array.isArray(attachments)) continue + for (const attachment of attachments) { + if ( + attachment && + typeof attachment === 'object' && + (attachment as Record).key + ) { + const key = (attachment as Record).key as string + if (!seen.has(key)) { + seen.add(key) + files.push({ key, context: 'copilot' }) } } } diff --git a/apps/sim/lib/copilot/chat/lifecycle.test.ts b/apps/sim/lib/copilot/chat/lifecycle.test.ts index df1689cc68..3ced9edfae 100644 --- a/apps/sim/lib/copilot/chat/lifecycle.test.ts +++ b/apps/sim/lib/copilot/chat/lifecycle.test.ts @@ -122,13 +122,15 @@ describe('lifecycle copilot chat reads (cutover to copilot_messages)', () => { }) it('resolveOrCreateChat creates a new chat with an empty transcript', async () => { - // insert().values().returning() -> fresh chat with empty messages - dbChainMockFns.returning.mockResolvedValueOnce([{ ...chatRow, messages: [] }]) + dbChainMockFns.returning.mockResolvedValueOnce([chatRow]) const result = await resolveOrCreateChat({ userId: USER_ID, model: 'm' }) expect(result.isNew).toBe(true) expect(result.conversationHistory).toEqual([]) + expect(result.chat?.messages).toEqual([]) + const insertValues = dbChainMockFns.values.mock.calls[0]?.[0] as Record + expect(Object.hasOwn(insertValues, 'messages')).toBe(false) // a brand-new chat must not trigger a messages read expect(dbChainMockFns.orderBy).not.toHaveBeenCalled() }) diff --git a/apps/sim/lib/copilot/chat/lifecycle.ts b/apps/sim/lib/copilot/chat/lifecycle.ts index e86c127c10..1fcb5e51af 100644 --- a/apps/sim/lib/copilot/chat/lifecycle.ts +++ b/apps/sim/lib/copilot/chat/lifecycle.ts @@ -6,6 +6,7 @@ import { getActiveWorkflowRecord, } from '@sim/workflow-authz' import { and, asc, eq, isNull, sql } from 'drizzle-orm' +import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { assertActiveWorkspaceAccess, checkWorkspaceAccess, @@ -52,16 +53,6 @@ const copilotChatDetailColumns = { updatedAt: copilotChats.updatedAt, } as const -/** - * Returning column set for newly-inserted chats. A fresh chat has no - * `copilot_messages` rows yet, so the transcript is the just-inserted empty - * JSONB array — return it directly rather than issuing a second query. - */ -const copilotChatDetailReturningColumns = { - ...copilotChatDetailColumns, - messages: copilotChats.messages, -} as const - /** * Column set for the legacy copilot chat detail endpoint. Extends * `copilotChatDetailColumns` with `model`, `planArtifact`, and `config` — the @@ -83,7 +74,7 @@ const copilotChatLegacyDetailColumns = { * to a legacy JSONB array element — so the downstream normalize/transcript * pipeline is unchanged. */ -async function loadCopilotChatMessages(chatId: string): Promise[]> { +export async function loadCopilotChatMessages(chatId: string): Promise { const rows = await db .select({ content: copilotMessages.content }) .from(copilotMessages) @@ -93,7 +84,7 @@ async function loadCopilotChatMessages(chatId: string): Promise row.content as Record) + return rows.map((row) => row.content as PersistedMessage) } type CopilotChatAuthRow = Pick< @@ -298,10 +289,9 @@ export async function resolveOrCreateChat(params: { type: type ?? 'copilot', title: null, model, - messages: [], lastSeenAt: now, }) - .returning(copilotChatDetailReturningColumns) + .returning(copilotChatDetailColumns) if (!newChat) { logger.warn('Failed to create new copilot chat row', { userId, workflowId, workspaceId }) @@ -315,7 +305,7 @@ export async function resolveOrCreateChat(params: { return { chatId: newChat.id, - chat: newChat, + chat: { ...newChat, messages: [] }, conversationHistory: [], isNew: true, } diff --git a/apps/sim/lib/copilot/chat/messages-dual-write.ts b/apps/sim/lib/copilot/chat/messages-dual-write.ts deleted file mode 100644 index 54e98afe3a..0000000000 --- a/apps/sim/lib/copilot/chat/messages-dual-write.ts +++ /dev/null @@ -1,143 +0,0 @@ -import { db } from '@sim/db' -import { copilotMessages } from '@sim/db/schema' -import { createLogger } from '@sim/logger' -import { getErrorMessage } from '@sim/utils/errors' -import { and, eq, notInArray, sql } from 'drizzle-orm' -import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' - -const logger = createLogger('CopilotMessagesDualWrite') - -/** - * Keep the first occurrence of each message id. A single `INSERT ... ON - * CONFLICT` cannot touch the same conflict target twice, so a repeated id - * would otherwise throw. - */ -function dedupeById(messages: PersistedMessage[]): PersistedMessage[] { - const seen = new Set() - const out: PersistedMessage[] = [] - for (const m of messages) { - if (seen.has(m.id)) continue - seen.add(m.id) - out.push(m) - } - return out -} - -function toRow( - chatId: string, - message: PersistedMessage, - seq: number, - options?: { chatModel?: string | null; streamId?: string | null } -): typeof copilotMessages.$inferInsert { - const ts = new Date(message.timestamp) - return { - chatId, - messageId: message.id, - role: message.role, - content: message, - seq, - model: options?.chatModel ?? null, - streamId: options?.streamId ?? null, - createdAt: ts, - updatedAt: ts, - } -} - -/** - * Append messages to the new `copilot_messages` table. Best-effort — errors - * are logged but never thrown; the legacy `copilot_chats.messages` JSONB - * column stays the source of truth during the dual-write rollout. - * - * `seq` is `MAX(seq) + index`, computed in JS (not in SQL, where every row of - * a multi-row INSERT would read the same pre-insert MAX and collide). The - * read-then-insert is non-atomic, so interleaved appends to one chat can tie - * `seq`; that window is bounded by the cutover read order (`seq, created_at, - * id`) and `replaceCopilotChatMessages`, which re-densifies `seq` from the - * authoritative JSONB order on the next snapshot save. - */ -export async function appendCopilotChatMessages( - chatId: string, - messages: PersistedMessage[], - options?: { chatModel?: string | null; streamId?: string | null } -): Promise { - if (messages.length === 0) return - try { - const deduped = dedupeById(messages) - const [maxRow] = await db - .select({ maxSeq: sql`max(${copilotMessages.seq})` }) - .from(copilotMessages) - .where(eq(copilotMessages.chatId, chatId)) - const base = (maxRow?.maxSeq ?? -1) + 1 - await db - .insert(copilotMessages) - .values(deduped.map((m, i) => toRow(chatId, m, base + i, options))) - .onConflictDoUpdate({ - target: [copilotMessages.chatId, copilotMessages.messageId], - set: { - content: sql`excluded.content`, - role: sql`excluded.role`, - model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, - streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, - seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`, - updatedAt: sql`now()`, - }, - }) - } catch (err) { - logger.warn('Failed to append copilot chat messages', { - chatId, - messageCount: messages.length, - error: getErrorMessage(err), - }) - } -} - -/** - * Replace all messages for a chat. Used by the update-messages endpoint that - * receives a full snapshot of the conversation state. Best-effort. - */ -export async function replaceCopilotChatMessages( - chatId: string, - messages: PersistedMessage[], - options?: { chatModel?: string | null } -): Promise { - try { - const deduped = dedupeById(messages) - const newMessageIds = deduped.map((m) => m.id) - await db.transaction(async (tx) => { - // Drop rows for messages not in the new snapshot. - await tx - .delete(copilotMessages) - .where( - newMessageIds.length > 0 - ? and( - eq(copilotMessages.chatId, chatId), - notInArray(copilotMessages.messageId, newMessageIds) - ) - : eq(copilotMessages.chatId, chatId) - ) - if (deduped.length === 0) return - // Snapshot is authoritative on order, so seq = array index is overwritten - // on conflict; stream_id / model are preserved via COALESCE. - await tx - .insert(copilotMessages) - .values(deduped.map((m, i) => toRow(chatId, m, i, options))) - .onConflictDoUpdate({ - target: [copilotMessages.chatId, copilotMessages.messageId], - set: { - content: sql`excluded.content`, - role: sql`excluded.role`, - model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, - streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, - seq: sql`excluded.seq`, - updatedAt: sql`now()`, - }, - }) - }) - } catch (err) { - logger.warn('Failed to replace copilot chat messages', { - chatId, - messageCount: messages.length, - error: getErrorMessage(err), - }) - } -} diff --git a/apps/sim/lib/copilot/chat/messages-dual-write.test.ts b/apps/sim/lib/copilot/chat/messages-store.test.ts similarity index 95% rename from apps/sim/lib/copilot/chat/messages-dual-write.test.ts rename to apps/sim/lib/copilot/chat/messages-store.test.ts index 17d3e1666c..a96cff1250 100644 --- a/apps/sim/lib/copilot/chat/messages-dual-write.test.ts +++ b/apps/sim/lib/copilot/chat/messages-store.test.ts @@ -9,7 +9,7 @@ vi.mock('@sim/db', () => dbChainMock) import { appendCopilotChatMessages, replaceCopilotChatMessages, -} from '@/lib/copilot/chat/messages-dual-write' +} from '@/lib/copilot/chat/messages-store' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' const userMsg: PersistedMessage = { @@ -32,7 +32,7 @@ function lastValuesRows() { return calls[calls.length - 1][0] as Array> } -describe('messages-dual-write', () => { +describe('messages-store', () => { beforeEach(() => { vi.clearAllMocks() resetDbChainMock() @@ -124,10 +124,12 @@ describe('messages-dual-write', () => { expect(rows[0].messageId).toBe('msg-user-1') }) - it('swallows DB errors so the legacy JSONB write stays canonical', async () => { + it('propagates DB errors — copilot_messages is the sole store', async () => { dbChainMockFns.onConflictDoUpdate.mockRejectedValueOnce(new Error('connection lost')) - await expect(appendCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined() + await expect(appendCopilotChatMessages('chat-1', [userMsg])).rejects.toThrow( + 'connection lost' + ) }) }) @@ -185,10 +187,10 @@ describe('messages-dual-write', () => { expect(rows[0].model).toBe('gpt-4o-mini') }) - it('swallows DB errors so the legacy JSONB write stays canonical', async () => { + it('propagates DB errors — the snapshot is authoritative', async () => { dbChainMockFns.transaction.mockRejectedValueOnce(new Error('tx aborted')) - await expect(replaceCopilotChatMessages('chat-1', [userMsg])).resolves.toBeUndefined() + await expect(replaceCopilotChatMessages('chat-1', [userMsg])).rejects.toThrow('tx aborted') }) }) }) diff --git a/apps/sim/lib/copilot/chat/messages-store.ts b/apps/sim/lib/copilot/chat/messages-store.ts new file mode 100644 index 0000000000..485a76ead3 --- /dev/null +++ b/apps/sim/lib/copilot/chat/messages-store.ts @@ -0,0 +1,122 @@ +import { db } from '@sim/db' +import { copilotMessages } from '@sim/db/schema' +import { and, eq, notInArray, sql } from 'drizzle-orm' +import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' +import type { DbOrTx } from '@/lib/db/types' + +/** + * Keep the first occurrence of each message id. A single `INSERT ... ON + * CONFLICT` cannot touch the same conflict target twice, so a repeated id + * would otherwise throw. + */ +function dedupeById(messages: PersistedMessage[]): PersistedMessage[] { + const seen = new Set() + const out: PersistedMessage[] = [] + for (const m of messages) { + if (seen.has(m.id)) continue + seen.add(m.id) + out.push(m) + } + return out +} + +function toRow( + chatId: string, + message: PersistedMessage, + seq: number, + options?: { chatModel?: string | null; streamId?: string | null } +): typeof copilotMessages.$inferInsert { + const ts = new Date(message.timestamp) + return { + chatId, + messageId: message.id, + role: message.role, + content: message, + seq, + model: options?.chatModel ?? null, + streamId: options?.streamId ?? null, + createdAt: ts, + updatedAt: ts, + } +} + +/** + * Append messages to the `copilot_messages` table — the sole store for chat + * transcripts. Throws on failure (a swallowed write would lose messages). + * Pass `executor` to enlist the write in an existing transaction. + * + * `seq` is `MAX(seq) + index`, computed in JS. The read-then-insert is + * non-atomic, but per-chat appends are serialized by the pending-stream lock + * and the `seq, created_at, id` read order breaks any residual tie. + */ +export async function appendCopilotChatMessages( + chatId: string, + messages: PersistedMessage[], + options?: { chatModel?: string | null; streamId?: string | null }, + executor: DbOrTx = db +): Promise { + if (messages.length === 0) return + const deduped = dedupeById(messages) + const [maxRow] = await executor + .select({ maxSeq: sql`max(${copilotMessages.seq})` }) + .from(copilotMessages) + .where(eq(copilotMessages.chatId, chatId)) + const base = (maxRow?.maxSeq ?? -1) + 1 + await executor + .insert(copilotMessages) + .values(deduped.map((m, i) => toRow(chatId, m, base + i, options))) + .onConflictDoUpdate({ + target: [copilotMessages.chatId, copilotMessages.messageId], + set: { + content: sql`excluded.content`, + role: sql`excluded.role`, + model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, + streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, + seq: sql`COALESCE(${copilotMessages.seq}, excluded.seq)`, + updatedAt: sql`now()`, + }, + }) +} + +/** + * Replace all messages for a chat from a full snapshot (used by update-messages). + * Throws on failure. Pass `executor` to enlist the delete+insert in an existing + * transaction; otherwise it runs in its own. + */ +export async function replaceCopilotChatMessages( + chatId: string, + messages: PersistedMessage[], + options?: { chatModel?: string | null }, + executor?: DbOrTx +): Promise { + const deduped = dedupeById(messages) + const newMessageIds = deduped.map((m) => m.id) + const run = async (tx: DbOrTx) => { + await tx + .delete(copilotMessages) + .where( + newMessageIds.length > 0 + ? and( + eq(copilotMessages.chatId, chatId), + notInArray(copilotMessages.messageId, newMessageIds) + ) + : eq(copilotMessages.chatId, chatId) + ) + if (deduped.length === 0) return + await tx + .insert(copilotMessages) + .values(deduped.map((m, i) => toRow(chatId, m, i, options))) + .onConflictDoUpdate({ + target: [copilotMessages.chatId, copilotMessages.messageId], + set: { + content: sql`excluded.content`, + role: sql`excluded.role`, + model: sql`COALESCE(excluded.model, ${copilotMessages.model})`, + streamId: sql`COALESCE(excluded.stream_id, ${copilotMessages.streamId})`, + seq: sql`excluded.seq`, + updatedAt: sql`now()`, + }, + }) + } + await (executor ? run(executor) : db.transaction(run)) +} diff --git a/apps/sim/lib/copilot/chat/post.test.ts b/apps/sim/lib/copilot/chat/post.test.ts index 8b937704ac..c2271ece05 100644 --- a/apps/sim/lib/copilot/chat/post.test.ts +++ b/apps/sim/lib/copilot/chat/post.test.ts @@ -28,6 +28,7 @@ const { releasePendingChatStream, resolveOrCreateChat, finalizeAssistantTurn, + appendCopilotChatMessages, mockPublishStatusChanged, } = vi.hoisted(() => ({ getEffectiveDecryptedEnv: vi.fn(), @@ -41,6 +42,7 @@ const { releasePendingChatStream: vi.fn(), resolveOrCreateChat: vi.fn(), finalizeAssistantTurn: vi.fn(), + appendCopilotChatMessages: vi.fn(), mockPublishStatusChanged: vi.fn(), })) @@ -86,30 +88,40 @@ vi.mock('@/lib/copilot/chat/terminal-state', () => ({ finalizeAssistantTurn, })) +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages, +})) + vi.mock('@/lib/copilot/tasks', () => ({ taskPubSub: { publishStatusChanged: mockPublishStatusChanged, }, })) -vi.mock('@sim/db', () => ({ - db: { - update: vi.fn(() => ({ - set: vi.fn(() => ({ - where: vi.fn(() => ({ - returning: vi.fn().mockResolvedValue([]), - })), +vi.mock('@sim/db', () => { + const update = vi.fn(() => ({ + set: vi.fn(() => ({ + where: vi.fn(() => ({ + returning: vi.fn().mockResolvedValue([]), })), })), - select: vi.fn(() => ({ - from: vi.fn(() => ({ - where: vi.fn(() => ({ - limit: vi.fn().mockResolvedValue([{ permissionType: 'write' }]), - })), + })) + const select = vi.fn(() => ({ + from: vi.fn(() => ({ + where: vi.fn(() => ({ + limit: vi.fn().mockResolvedValue([{ permissionType: 'write' }]), })), })), - }, -})) + })) + return { + db: { + update, + select, + transaction: async (cb: (tx: { update: typeof update; select: typeof select }) => unknown) => + cb({ update, select }), + }, + } +}) vi.mock('drizzle-orm', () => ({ and: vi.fn(() => ({})), diff --git a/apps/sim/lib/copilot/chat/post.ts b/apps/sim/lib/copilot/chat/post.ts index 9c1d5950d0..31a0c5fce9 100644 --- a/apps/sim/lib/copilot/chat/post.ts +++ b/apps/sim/lib/copilot/chat/post.ts @@ -4,13 +4,13 @@ import { copilotChats, permissions } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { getErrorMessage } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' -import { and, eq, sql } from 'drizzle-orm' +import { and, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' import { isZodError, validationErrorResponse } from '@/lib/api/server' import { getSession } from '@/lib/auth' import { type ChatLoadResult, resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { buildCopilotRequestPayload } from '@/lib/copilot/chat/payload' import { buildPersistedAssistantMessage, @@ -293,7 +293,7 @@ async function persistUserMessage(params: { * span parented to the about-to-be-dropped Next.js HTTP span. */ parentOtelContext?: OtelContext -}): Promise { +}): Promise { const { chatId, userMessageId, @@ -304,7 +304,7 @@ async function persistUserMessage(params: { notifyWorkspaceStatus, parentOtelContext, } = params - if (!chatId) return undefined + if (!chatId) return return withCopilotSpan( TraceSpan.CopilotChatPersistUserMessage, @@ -326,31 +326,32 @@ async function persistUserMessage(params: { contexts, }) - const [updated] = await db - .update(copilotChats) - .set({ - messages: sql`${copilotChats.messages} || ${JSON.stringify([userMsg])}::jsonb`, - conversationId: userMessageId, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, chatId)) - .returning({ messages: copilotChats.messages, model: copilotChats.model }) + const updated = await db.transaction(async (tx) => { + const [row] = await tx + .update(copilotChats) + .set({ + conversationId: userMessageId, + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) - if (updated) { - await appendCopilotChatMessages(chatId, [userMsg], { - streamId: userMessageId, - chatModel: updated.model ?? null, - }) - } + if (!row) return null - const messagesAfter = Array.isArray(updated?.messages) ? updated.messages : undefined - span.setAttributes({ - [TraceAttr.ChatPersistOutcome]: updated - ? CopilotChatPersistOutcome.Appended - : CopilotChatPersistOutcome.ChatNotFound, - [TraceAttr.ChatMessagesAfter]: messagesAfter?.length ?? 0, + await appendCopilotChatMessages( + chatId, + [userMsg], + { streamId: userMessageId, chatModel: row.model ?? null }, + tx + ) + return row }) + span.setAttribute( + TraceAttr.ChatPersistOutcome, + updated ? CopilotChatPersistOutcome.Appended : CopilotChatPersistOutcome.ChatNotFound + ) + if (notifyWorkspaceStatus && updated && workspaceId) { taskPubSub?.publishStatusChanged({ workspaceId, @@ -359,8 +360,6 @@ async function persistUserMessage(params: { streamId: userMessageId, }) } - - return messagesAfter }, parentOtelContext ) @@ -885,7 +884,7 @@ export async function handleUnifiedChatPost(req: NextRequest) { }), activeOtelRoot.context ) - const persistedMessagesPromise = persistUserMessage({ + const persistUserMessagePromise = persistUserMessage({ chatId: actualChatId, userMessageId, message: body.message, @@ -908,24 +907,17 @@ export async function handleUnifiedChatPost(req: NextRequest) { activeOtelRoot.context ) - const [agentContexts, userPermission, workspaceContext, persistedMessages, executionContext] = + const [agentContexts, userPermission, workspaceContext, , executionContext] = await Promise.all([ agentContextsPromise, userPermissionPromise, workspaceContextPromise, - persistedMessagesPromise, + persistUserMessagePromise, executionContextPromise, ]) executionContext.userPermission = userPermission ?? undefined - if (persistedMessages) { - conversationHistory = persistedMessages.filter((message) => { - const record = message as Record - return record.id !== userMessageId - }) - } - // buildPayload is the last synchronous step before the outbound // Sim → Go HTTP call. It runs per-tool schema generation (subscription // lookup + registry iteration, cached 30s) and file upload tracking diff --git a/apps/sim/lib/copilot/chat/terminal-state.test.ts b/apps/sim/lib/copilot/chat/terminal-state.test.ts index cf4a230bf3..7adb8e42ae 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.test.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.test.ts @@ -3,212 +3,145 @@ */ import { copilotChats } from '@sim/db/schema' +import { dbChainMock, dbChainMockFns, resetDbChainMock } from '@sim/testing' import { eq } from 'drizzle-orm' import { beforeEach, describe, expect, it, vi } from 'vitest' -const { - selectForUpdate, - selectLimit, - selectWhere, - selectFrom, - select, - updateWhere, - updateSet, - update, - transaction, -} = vi.hoisted(() => { - const selectLimit = vi.fn() - const selectForUpdate = vi.fn(() => ({ limit: selectLimit })) - const selectWhere = vi.fn(() => ({ for: selectForUpdate })) - const selectFrom = vi.fn(() => ({ where: selectWhere })) - const select = vi.fn(() => ({ from: selectFrom })) - - const updateWhere = vi.fn() - const updateSet = vi.fn(() => ({ where: updateWhere })) - const update = vi.fn(() => ({ set: updateSet })) - - const transaction = vi.fn( - (callback: (tx: { select: typeof select; update: typeof update }) => unknown) => - callback({ select, update }) - ) - - return { - selectForUpdate, - selectLimit, - selectWhere, - selectFrom, - select, - updateWhere, - updateSet, - update, - transaction, - } -}) +vi.mock('@sim/db', () => dbChainMock) + +const { mockAppendCopilotChatMessages } = vi.hoisted(() => ({ + mockAppendCopilotChatMessages: vi.fn(), +})) -vi.mock('@sim/db', () => ({ - db: { - transaction, - }, +vi.mock('@/lib/copilot/chat/messages-store', () => ({ + appendCopilotChatMessages: mockAppendCopilotChatMessages, })) import { finalizeAssistantTurn } from './terminal-state' +const assistantMessage = { + id: 'assistant-1', + role: 'assistant' as const, + content: 'hi', + timestamp: '2024-01-01T00:00:00.000Z', +} + +/** + * Sequence the two in-tx reads: the chat row (`FOR UPDATE ... LIMIT 1`) and the + * last-message lookup that drives dedup — both terminate on `.limit(1)`. + */ +function mockReads(opts: { + chat: Record | null + last?: { messageId: string; role: string } +}) { + dbChainMockFns.limit.mockResolvedValueOnce(opts.chat ? [opts.chat] : []) + dbChainMockFns.limit.mockResolvedValueOnce(opts.last ? [opts.last] : []) +} + describe('finalizeAssistantTurn', () => { beforeEach(() => { vi.clearAllMocks() - updateWhere.mockResolvedValue(undefined) + // Drain the once-queue (clearAllMocks/resetDbChainMock don't), then restore defaults. + dbChainMockFns.limit.mockReset() + resetDbChainMock() }) - it('appends the assistant message when the user turn is still last', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: 'user-1', - workspaceId: 'ws-1', - }, - ]) + it('appends the assistant message when the user turn has no reply yet', async () => { + mockReads({ + chat: { conversationId: 'user-1', workspaceId: 'ws-1', model: null }, + last: { messageId: 'user-1', role: 'user' }, + }) - await finalizeAssistantTurn({ + const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'hi', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) - expect(updateSet).toHaveBeenCalledWith( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - messages: expect.anything(), - }) + expect(result.appendedAssistant).toBe(true) + const updateArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record + expect(updateArg).toEqual( + expect.objectContaining({ updatedAt: expect.any(Date), conversationId: null }) + ) + expect(Object.hasOwn(updateArg, 'messages')).toBe(false) + expect(dbChainMockFns.where).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledWith( + 'chat-1', + [assistantMessage], + { streamId: 'user-1', chatModel: null }, + expect.anything() ) - expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) }) it('only clears the active stream marker when a response is already persisted', async () => { - selectLimit.mockResolvedValue([ - { - messages: [ - { id: 'user-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: 'user-1', - workspaceId: 'ws-1', - }, - ]) - - await finalizeAssistantTurn({ + mockReads({ + chat: { conversationId: 'user-1', workspaceId: 'ws-1', model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) + + const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-2', - role: 'assistant', - content: 'final', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage: { ...assistantMessage, id: 'assistant-2' }, }) - const updateCalls = updateSet.mock.calls as unknown as Array<[Record]> - const updateArg = updateCalls[0]?.[0] - expect(updateArg).toBeDefined() - if (!updateArg) { - throw new Error('Expected updateSet to be called') - } + expect(result.outcome).toBe('assistant_already_persisted') + const updateArg = dbChainMockFns.set.mock.calls[0]?.[0] as Record expect(updateArg).toEqual( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - }) + expect.objectContaining({ updatedAt: expect.any(Date), conversationId: null }) ) expect(Object.hasOwn(updateArg, 'messages')).toBe(false) - expect(updateWhere).toHaveBeenCalledWith(eq(copilotChats.id, 'chat-1')) + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('appends a stopped assistant when the stream marker was already cleared', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ + chat: { conversationId: null, workspaceId: 'ws-1', model: null }, + last: { messageId: 'user-1', role: 'user' }, + }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', streamMarkerPolicy: 'active-or-cleared', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) expect(result.appendedAssistant).toBe(true) - expect(updateSet).toHaveBeenCalledWith( - expect.objectContaining({ - updatedAt: expect.any(Date), - conversationId: null, - messages: expect.anything(), - }) - ) + expect(mockAppendCopilotChatMessages).toHaveBeenCalledTimes(1) }) it('does not append on a cleared marker unless the policy allows it', async () => { - selectLimit.mockResolvedValue([ - { - messages: [{ id: 'user-1', role: 'user', content: 'hello' }], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ chat: { conversationId: null, workspaceId: 'ws-1', model: null } }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', - assistantMessage: { - id: 'assistant-1', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage, }) expect(result.updated).toBe(false) - expect(updateSet).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) it('reports already persisted when a cleared marker races with a duplicate stop', async () => { - selectLimit.mockResolvedValue([ - { - messages: [ - { id: 'user-1', role: 'user', content: 'hello' }, - { id: 'assistant-1', role: 'assistant', content: 'partial' }, - ], - conversationId: null, - workspaceId: 'ws-1', - }, - ]) + mockReads({ + chat: { conversationId: null, workspaceId: 'ws-1', model: null }, + last: { messageId: 'assistant-1', role: 'assistant' }, + }) const result = await finalizeAssistantTurn({ chatId: 'chat-1', userMessageId: 'user-1', streamMarkerPolicy: 'active-or-cleared', - assistantMessage: { - id: 'assistant-2', - role: 'assistant', - content: 'partial', - timestamp: '2024-01-01T00:00:00.000Z', - }, + assistantMessage: { ...assistantMessage, id: 'assistant-2' }, }) expect(result.updated).toBe(false) expect(result.outcome).toBe('assistant_already_persisted') - expect(updateSet).not.toHaveBeenCalled() + expect(dbChainMockFns.set).not.toHaveBeenCalled() + expect(mockAppendCopilotChatMessages).not.toHaveBeenCalled() }) }) diff --git a/apps/sim/lib/copilot/chat/terminal-state.ts b/apps/sim/lib/copilot/chat/terminal-state.ts index 8b4fdf11fe..5ff7886b42 100644 --- a/apps/sim/lib/copilot/chat/terminal-state.ts +++ b/apps/sim/lib/copilot/chat/terminal-state.ts @@ -1,7 +1,7 @@ import { db } from '@sim/db' -import { copilotChats } from '@sim/db/schema' -import { and, eq, sql } from 'drizzle-orm' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { copilotChats, copilotMessages } from '@sim/db/schema' +import { and, desc, eq, isNull, sql } from 'drizzle-orm' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { CopilotChatFinalizeOutcome } from '@/lib/copilot/generated/trace-attribute-values-v1' import { TraceAttr } from '@/lib/copilot/generated/trace-attributes-v1' @@ -48,15 +48,12 @@ export async function finalizeAssistantTurn({ [TraceAttr.ChatHasAssistantMessage]: !!assistantMessage, }, async (span) => { - let appendedAssistantMessage: PersistedMessage | undefined - let chatModel: string | null = null const result = await db.transaction(async (tx) => { const where = userId ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) : eq(copilotChats.id, chatId) const [row] = await tx .select({ - messages: copilotChats.messages, conversationId: copilotChats.conversationId, workspaceId: copilotChats.workspaceId, model: copilotChats.model, @@ -65,10 +62,6 @@ export async function finalizeAssistantTurn({ .where(where) .for('update') .limit(1) - chatModel = row?.model ?? null - - const messages: Record[] = Array.isArray(row?.messages) ? row.messages : [] - span.setAttribute(TraceAttr.ChatExistingMessageCount, messages.length) if (!row) { return { @@ -80,6 +73,8 @@ export async function finalizeAssistantTurn({ } } + const chatModel = row.model ?? null + const markerMatches = row.conversationId === userMessageId const markerAlreadyCleared = row.conversationId === null const ownsTurn = @@ -94,13 +89,20 @@ export async function finalizeAssistantTurn({ } } - const userIdx = messages.findIndex((message) => message.id === userMessageId) - const alreadyHasResponse = - userIdx >= 0 && - userIdx + 1 < messages.length && - (messages[userIdx + 1] as Record)?.role === 'assistant' - const canAppendAssistant = - userIdx >= 0 && userIdx === messages.length - 1 && !alreadyHasResponse + // Append only when the user message is still the last row: anything + // after it means the turn already has a response (dedup under the lock). + const [lastMessage] = await tx + .select({ messageId: copilotMessages.messageId, role: copilotMessages.role }) + .from(copilotMessages) + .where(and(eq(copilotMessages.chatId, chatId), isNull(copilotMessages.deletedAt))) + .orderBy( + sql`${copilotMessages.seq} desc nulls last`, + desc(copilotMessages.createdAt), + desc(copilotMessages.id) + ) + .limit(1) + const canAppendAssistant = lastMessage?.messageId === userMessageId + const alreadyHasResponse = lastMessage?.role === 'assistant' const updateWhere = userId ? and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)) @@ -111,14 +113,13 @@ export async function finalizeAssistantTurn({ } if (assistantMessage && canAppendAssistant) { - await tx - .update(copilotChats) - .set({ - ...baseUpdate, - messages: sql`${copilotChats.messages} || ${JSON.stringify([assistantMessage])}::jsonb`, - }) - .where(updateWhere) - appendedAssistantMessage = assistantMessage + await tx.update(copilotChats).set(baseUpdate).where(updateWhere) + await appendCopilotChatMessages( + chatId, + [assistantMessage], + { streamId: userMessageId, chatModel }, + tx + ) return { found: true, updated: true, @@ -136,9 +137,7 @@ export async function finalizeAssistantTurn({ appendedAssistant: false, workspaceId: row.workspaceId, outcome: assistantMessage - ? alreadyHasResponse - ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted - : CopilotChatFinalizeOutcome.StaleUserMessage + ? CopilotChatFinalizeOutcome.AssistantAlreadyPersisted : CopilotChatFinalizeOutcome.ClearedStreamMarkerOnly, } } @@ -154,13 +153,6 @@ export async function finalizeAssistantTurn({ } }) - if (appendedAssistantMessage) { - await appendCopilotChatMessages(chatId, [appendedAssistantMessage], { - streamId: userMessageId, - chatModel, - }) - } - span.setAttribute(TraceAttr.ChatFinalizeOutcome, result.outcome) return result } diff --git a/apps/sim/lib/copilot/vfs/workspace-vfs.ts b/apps/sim/lib/copilot/vfs/workspace-vfs.ts index 74fb0735a5..5823e1b644 100644 --- a/apps/sim/lib/copilot/vfs/workspace-vfs.ts +++ b/apps/sim/lib/copilot/vfs/workspace-vfs.ts @@ -1175,27 +1175,32 @@ export class WorkspaceVFS { .select({ id: copilotChats.id, title: copilotChats.title, - messageCount: sql`COALESCE(jsonb_array_length(${copilotChats.messages}), 0)`, + messageCount: sql`COALESCE(( + SELECT COUNT(*) FROM copilot_messages cm + WHERE cm.chat_id = ${copilotChats.id} AND cm.deleted_at IS NULL + ), 0)`, messages: sql`COALESCE(( SELECT jsonb_agg( jsonb_build_object( - 'role', m.value->>'role', - 'content', m.value->'content', + 'role', cm.content->>'role', + 'content', cm.content->'content', 'contentBlocks', COALESCE(( SELECT jsonb_agg(jsonb_build_object('type', 'text', 'content', b.value->'content') ORDER BY b.ord) FROM jsonb_array_elements( - CASE WHEN jsonb_typeof(m.value->'contentBlocks') = 'array' - THEN m.value->'contentBlocks' + CASE WHEN jsonb_typeof(cm.content->'contentBlocks') = 'array' + THEN cm.content->'contentBlocks' ELSE '[]'::jsonb END ) WITH ORDINALITY AS b(value, ord) WHERE b.value->>'type' = 'text' ), '[]'::jsonb) ) - ORDER BY m.ord + ORDER BY cm.seq ASC NULLS LAST, cm.created_at ASC, cm.id ASC ) - FROM jsonb_array_elements(${copilotChats.messages}) WITH ORDINALITY AS m(value, ord) - WHERE m.value->>'role' IN ('user', 'assistant') + FROM copilot_messages cm + WHERE cm.chat_id = ${copilotChats.id} + AND cm.deleted_at IS NULL + AND cm.content->>'role' IN ('user', 'assistant') ), '[]'::jsonb)`, createdAt: copilotChats.createdAt, updatedAt: copilotChats.updatedAt, diff --git a/apps/sim/lib/data-drains/sources/copilot-chats.ts b/apps/sim/lib/data-drains/sources/copilot-chats.ts index d1d25c2aaf..6bde4632dc 100644 --- a/apps/sim/lib/data-drains/sources/copilot-chats.ts +++ b/apps/sim/lib/data-drains/sources/copilot-chats.ts @@ -1,6 +1,6 @@ import { db } from '@sim/db' -import { copilotChats } from '@sim/db/schema' -import { and, inArray } from 'drizzle-orm' +import { copilotChats, copilotMessages } from '@sim/db/schema' +import { and, asc, inArray, isNull, sql } from 'drizzle-orm' import { decodeTimeCursor, encodeTimeCursor, @@ -10,7 +10,34 @@ import { import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers' import type { Cursor, DrainSource, SourcePageInput } from '@/lib/data-drains/types' -type CopilotChatRow = typeof copilotChats.$inferSelect +/** + * The transcript no longer lives on `copilot_chats.messages` — it is assembled + * per page from the normalized `copilot_messages` table, so `messages` is the + * ordered list of message `content` objects rather than the DB column. + */ +type CopilotChatRow = Omit & { + messages: unknown[] +} + +/** Chat metadata columns, excluding the legacy `messages` JSONB. */ +const chatColumns = { + id: copilotChats.id, + userId: copilotChats.userId, + workflowId: copilotChats.workflowId, + workspaceId: copilotChats.workspaceId, + type: copilotChats.type, + title: copilotChats.title, + model: copilotChats.model, + conversationId: copilotChats.conversationId, + previewYaml: copilotChats.previewYaml, + planArtifact: copilotChats.planArtifact, + config: copilotChats.config, + resources: copilotChats.resources, + lastSeenAt: copilotChats.lastSeenAt, + pinned: copilotChats.pinned, + createdAt: copilotChats.createdAt, + updatedAt: copilotChats.updatedAt, +} as const /** * Cursor is `createdAt` (immutable) but rows themselves are mutable — @@ -28,18 +55,42 @@ async function* pages(input: SourcePageInput): AsyncIterable { while (!input.signal.aborted) { const cursorClause = timeCursorPredicate(copilotChats.createdAt, copilotChats.id, cursor) - const rows = await db - .select() + const metaRows = await db + .select(chatColumns) .from(copilotChats) .where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause)) .orderBy(...timeCursorOrderBy(copilotChats.createdAt, copilotChats.id)) .limit(input.chunkSize) - if (rows.length === 0) return + if (metaRows.length === 0) return + + const chatIds = metaRows.map((r) => r.id) + const messageRows = await db + .select({ chatId: copilotMessages.chatId, content: copilotMessages.content }) + .from(copilotMessages) + .where(and(inArray(copilotMessages.chatId, chatIds), isNull(copilotMessages.deletedAt))) + .orderBy( + asc(copilotMessages.chatId), + sql`${copilotMessages.seq} asc nulls last`, + asc(copilotMessages.createdAt), + asc(copilotMessages.id) + ) + const messagesByChat = new Map() + for (const m of messageRows) { + const existing = messagesByChat.get(m.chatId) + if (existing) existing.push(m.content) + else messagesByChat.set(m.chatId, [m.content]) + } + + const rows: CopilotChatRow[] = metaRows.map((r) => ({ + ...r, + messages: messagesByChat.get(r.id) ?? [], + })) + yield rows - const last = rows[rows.length - 1] + const last = metaRows[metaRows.length - 1] cursor = { ts: last.createdAt.toISOString(), id: last.id } - if (rows.length < input.chunkSize) return + if (metaRows.length < input.chunkSize) return } } diff --git a/apps/sim/lib/mothership/inbox/executor.ts b/apps/sim/lib/mothership/inbox/executor.ts index d677e13124..67ac529663 100644 --- a/apps/sim/lib/mothership/inbox/executor.ts +++ b/apps/sim/lib/mothership/inbox/executor.ts @@ -4,7 +4,7 @@ import { getErrorMessage } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { and, eq, sql } from 'drizzle-orm' import { resolveOrCreateChat } from '@/lib/copilot/chat/lifecycle' -import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-dual-write' +import { appendCopilotChatMessages } from '@/lib/copilot/chat/messages-store' import { buildIntegrationToolSchemas } from '@/lib/copilot/chat/payload' import { buildPersistedAssistantMessage, @@ -227,7 +227,6 @@ export async function executeInboxTask(taskId: string): Promise { if (chatId) { await persistChatMessages( chatId, - userId, userMessageId, messageContent, { @@ -328,7 +327,6 @@ async function resolveUserId( */ async function persistChatMessages( chatId: string, - userId: string, userMessageId: string, userContent: string, result: OrchestratorResult, @@ -343,22 +341,24 @@ async function persistChatMessages( const assistantMessage = buildPersistedAssistantMessage(result) - const newMessages = JSON.stringify([userMessage, assistantMessage]) - const [updated] = await db - .update(copilotChats) - .set({ - messages: sql`COALESCE(${copilotChats.messages}, '[]'::jsonb) || ${newMessages}::jsonb`, - updatedAt: new Date(), - }) - .where(eq(copilotChats.id, chatId)) - .returning({ model: copilotChats.model }) - if (updated) { - await appendCopilotChatMessages(chatId, [userMessage, assistantMessage], { - chatModel: updated.model ?? null, - }) - } + // Best-effort: the email response is the primary deliverable, so a failure + // here is logged (in the catch below) rather than failing the task. + await db.transaction(async (tx) => { + const [updated] = await tx + .update(copilotChats) + .set({ updatedAt: new Date() }) + .where(eq(copilotChats.id, chatId)) + .returning({ model: copilotChats.model }) + if (!updated) return + await appendCopilotChatMessages( + chatId, + [userMessage, assistantMessage], + { chatModel: updated.model ?? null }, + tx + ) + }) } catch (err) { - logger.warn('Failed to persist chat messages', { + logger.error('Failed to persist chat messages', { chatId, error: getErrorMessage(err, 'Unknown error'), })