diff --git a/lib/sea/SeaNativeLoader.ts b/lib/sea/SeaNativeLoader.ts index 42f6ceb7..5d1d5ce6 100644 --- a/lib/sea/SeaNativeLoader.ts +++ b/lib/sea/SeaNativeLoader.ts @@ -64,6 +64,49 @@ export interface SeaNativeStatement { close(): Promise; } +/** + * Server-side execution status returned by `AsyncStatement.status()`. + * Mirrors the kernel `StatementStatus` enum collapsed to its variant + * name. `'Unknown'` is the forward-compat arm for kernel variants the + * binding doesn't recognise. + */ +export type SeaNativeStatementStatus = + | 'Pending' + | 'Running' + | 'Succeeded' + | 'Failed' + | 'Cancelled' + | 'Closed' + | 'Unknown'; + +/** + * Typed surface for the opaque napi `AsyncResultHandle`. Returned by + * `AsyncStatement.awaitResult()`; same fetch-side surface as + * `SeaNativeStatement` but without `cancel()` / `close()` (the parent + * `AsyncStatement` owns server-side lifecycle). + */ +export interface SeaNativeAsyncResultHandle { + readonly statementId: string; + fetchNextBatch(): Promise; + schema(): Promise; +} + +/** + * Typed surface for the opaque napi `AsyncStatement`. Returned by + * `Connection.submitStatement(...)`. The kernel submits with + * `wait_timeout=0s`, so the server returns a `statementId` while the + * query is still Pending/Running; JS drives polling via `status()` + * and materialises results with `awaitResult()`. This is the + * async-execution path the Thrift backend always uses (`runAsync`). + */ +export interface SeaNativeAsyncStatement { + readonly statementId: string; + status(): Promise; + awaitResult(): Promise; + cancel(): Promise; + close(): Promise; +} + /** * Typed surface for the opaque napi `Connection` handle. Signatures * match `native/sea/index.d.ts` exactly as generated by napi-rs from @@ -118,6 +161,18 @@ export interface SeaNativeConnection { */ executeStatement(sql: string, options?: SeaNativeExecuteOptions): Promise; + /** + * Submit a SQL statement asynchronously and return an + * `AsyncStatement` handle without blocking until the query + * finishes. The kernel sends `wait_timeout=0s`, so the server + * responds as soon as it has a `statementId` (Pending/Running); + * drive polling via the handle's `status()` / `awaitResult()`. + * Same option semantics as `executeStatement`; only the + * pending-vs-blocking return contract differs. This is the + * async-execution path the Thrift backend always uses. + */ + submitStatement(sql: string, options?: SeaNativeExecuteOptions): Promise; + // ── Metadata methods ────────────────────────────────────────────────── /** All catalogs visible to the session. */ listCatalogs(): Promise; diff --git a/lib/sea/SeaOperationBackend.ts b/lib/sea/SeaOperationBackend.ts index 005f3170..e8e11ef0 100644 --- a/lib/sea/SeaOperationBackend.ts +++ b/lib/sea/SeaOperationBackend.ts @@ -15,25 +15,39 @@ /** * `IOperationBackend` implementation for the SEA path. * - * Combines: - * - **Fetch pipeline (from sea-results):** - * `napi.Statement.fetchNextBatch()` → `SeaResultsProvider` → - * `ArrowResultConverter` (Phase 1 + Phase 2; reused unchanged) → - * `ResultSlicer` (chunk-size normalisation; reused unchanged). The M0 - * row shape is byte-identical to the thrift path for every M0 - * datatype (parity gate exercised by `tests/integration/sea/results-e2e.test.ts`). + * Two construction shapes, sharing one fetch/lifecycle surface: * - * - **Lifecycle (from sea-operation):** `cancel()` / `close()` / - * `finished()` (alias of `waitUntilReady`) delegate to the helpers - * in `SeaOperationLifecycle.ts`. The helpers handle idempotency, - * flag-set-before-await ordering (so cancel-mid-fetch propagates), - * logging via `IClientContext`, and kernel-error mapping. + * - **Async query path (`asyncStatement`)** — `executeStatement` submits + * with the kernel's `wait_timeout=0s` and hands back a pending + * `AsyncStatement`. `waitUntilReady()` polls `status()` to a terminal + * state (firing the progress callback each tick, exactly like the + * Thrift backend's `getOperationStatus` loop), then materialises the + * result stream via `awaitResult()`. This is true Thrift-parity + * async execution: `status()` reports real Pending/Running/Succeeded + * states and a long-running query can be cancelled mid-flight. * - * The lifecycle helpers route fetch-after-cancel / fetch-after-close - * through `failIfNotActive`, which throws an `OperationStateError` - * matching the Thrift `failIfClosed` semantics. We call it from - * `fetchChunk`/`hasMore`/`getResultMetadata` so the cancel-mid-fetch - * e2e (cancel < 200ms) drives against this backend cleanly. + * The JS-side poll loop (rather than a single blocking `awaitResult()`) + * is what keeps `cancel()` responsive: the kernel `AsyncStatement` + * serialises its methods behind one mutex, so a single in-flight + * `awaitResult()` would hold that mutex for the whole query and queue + * `cancel()` behind it. Polling `status()` releases the mutex between + * ticks, leaving gaps for `cancel()` to land. + * + * - **Blocking metadata path (`statement`)** — the metadata methods + * (`listCatalogs`, `listTypeInfo`, …) return a kernel `Statement` + * that has already run to a terminal state, so there is nothing to + * poll: `waitUntilReady()` resolves immediately (one synthesized + * FINISHED tick) and the handle itself is the result source. + * + * Fetch pipeline (shared): `fetchNextBatch()` → `SeaResultsProvider` → + * `ArrowResultConverter` → `ResultSlicer`, byte-identical to the Thrift + * path for every datatype. + * + * Lifecycle (shared): `cancel()` / `close()` delegate to the helpers in + * `SeaOperationLifecycle.ts` (idempotency, flag-set-before-await + * ordering, kernel-error mapping). `failIfNotActive` routes + * fetch-after-cancel / fetch-after-close through an `OperationStateError` + * matching the Thrift `failIfClosed` semantics. */ import { v4 as uuidv4 } from 'uuid'; @@ -48,11 +62,19 @@ import { import IOperationBackend from '../contracts/IOperationBackend'; import IClientContext from '../contracts/IClientContext'; import Status from '../dto/Status'; +import OperationStateError, { OperationStateErrorCode } from '../errors/OperationStateError'; import ArrowResultConverter from '../result/ArrowResultConverter'; import ResultSlicer from '../result/ResultSlicer'; import SeaResultsProvider from './SeaResultsProvider'; import { arrowSchemaToThriftSchema, decodeIpcSchema } from './SeaArrowIpc'; -import { SeaNativeStatement } from './SeaNativeLoader'; +import { + SeaNativeStatement, + SeaNativeAsyncStatement, + SeaNativeStatementStatus, + SeaArrowSchema, + SeaArrowBatch, +} from './SeaNativeLoader'; +import { decodeNapiKernelError } from './SeaErrorMapping'; import { SeaStatementHandle, SeaOperationLifecycleState, @@ -63,6 +85,20 @@ import { failIfNotActive, } from './SeaOperationLifecycle'; +/** + * Server-status poll cadence for the async path, in milliseconds. + * Matches the Thrift backend's `waitUntilReady` `delay(100)` so the + * two backends place the same GetStatementStatus / getOperationStatus + * load on the server for the same query. + */ +const STATUS_POLL_INTERVAL_MS = 100; + +function delay(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +} + /** * Structural union of the lifecycle surface (cancel/close) and the * fetch surface (fetchNextBatch/schema). The real napi `Statement` @@ -73,23 +109,79 @@ import { export type SeaOperationStatement = SeaStatementHandle & Partial; /** - * Constructor options for `SeaOperationBackend`. + * Minimal result-fetch surface shared by the async `AsyncResultHandle` + * (from `awaitResult()`) and the blocking metadata `Statement`. Both + * expose `schema()` + `fetchNextBatch()`; only this slice is consumed + * by the fetch pipeline. + */ +interface SeaFetchHandle { + schema(): Promise; + fetchNextBatch(): Promise; +} + +/** + * Constructor options for `SeaOperationBackend`. Exactly one of + * `asyncStatement` (query path) or `statement` (metadata path) must be + * provided. */ export interface SeaOperationBackendOptions { - /** The opaque napi `Statement` handle returned by `Connection.executeStatement(...)`. */ - statement: SeaOperationStatement; + /** + * The pending napi `AsyncStatement` returned by + * `Connection.submitStatement(...)`. Async query path. + */ + asyncStatement?: SeaNativeAsyncStatement; + /** + * The terminal napi `Statement` returned by a metadata method + * (`listCatalogs`, `listTypeInfo`, …). Blocking metadata path. + */ + statement?: SeaOperationStatement; context: IClientContext; /** * Optional override for `id`. When not provided a fresh UUIDv4 is - * generated upstream (in `SeaSessionBackend.executeStatement`); the - * kernel does not yet surface its internal statement-id at the napi - * boundary. Once it does, the JS layer can thread it through here. + * generated. For the async path the kernel surfaces a real + * server-issued `statementId`, which is used as the id when no + * explicit override is given. */ id?: string; } +/** Map the kernel's `StatementStatus` variant name to a Thrift `TOperationState`. */ +function statusToOperationState(status: SeaNativeStatementStatus): TOperationState { + switch (status) { + case 'Pending': + return TOperationState.PENDING_STATE; + case 'Running': + return TOperationState.RUNNING_STATE; + case 'Succeeded': + return TOperationState.FINISHED_STATE; + case 'Failed': + return TOperationState.ERROR_STATE; + case 'Cancelled': + return TOperationState.CANCELED_STATE; + case 'Closed': + return TOperationState.CLOSED_STATE; + case 'Unknown': + default: + return TOperationState.UKNOWN_STATE; + } +} + +/** Synthesize the Thrift status response shape from an operation state. */ +function synthesizeStatus(operationState: TOperationState): TGetOperationStatusResp { + return { + status: { statusCode: TStatusCode.SUCCESS_STATUS }, + operationState, + hasResultSet: true, + }; +} + export default class SeaOperationBackend implements IOperationBackend { - private readonly statement: SeaOperationStatement; + private readonly asyncStatement?: SeaNativeAsyncStatement; + + private readonly blockingStatement?: SeaOperationStatement; + + /** cancel/close target — the async handle or the blocking statement. */ + private readonly lifecycleHandle: SeaStatementHandle; private readonly context: IClientContext; @@ -105,10 +197,24 @@ export default class SeaOperationBackend implements IOperationBackend { private metadataPromise?: Promise; - constructor({ statement, context, id }: SeaOperationBackendOptions) { - this.statement = statement; + /** + * Memoised result-fetch handle. For the async path this is the + * `awaitResult()` promise (resolved once the statement is terminal); + * for the metadata path it resolves to the blocking statement itself. + */ + private fetchHandlePromise?: Promise; + + constructor({ asyncStatement, statement, context, id }: SeaOperationBackendOptions) { + if ((asyncStatement === undefined) === (statement === undefined)) { + throw new Error( + 'SeaOperationBackend: exactly one of `asyncStatement` or `statement` must be provided', + ); + } + this.asyncStatement = asyncStatement; + this.blockingStatement = statement; + this.lifecycleHandle = (asyncStatement ?? statement) as SeaStatementHandle; this.context = context; - this._id = id ?? uuidv4(); + this._id = id ?? asyncStatement?.statementId ?? uuidv4(); } public get id(): string { @@ -116,16 +222,14 @@ export default class SeaOperationBackend implements IOperationBackend { } public get hasResultSet(): boolean { - // M0 only routes through SeaOperationBackend for executeStatement - // calls. DDL/DML without a result set is not exercised through SEA - // for M0; the napi Statement still produces a schema (empty) in - // that case, which the converter renders as zero rows. Reporting - // `true` keeps the facade's fetch path enabled for M0 parity. + // The kernel statement always produces a schema (empty for DDL/DML), + // which the converter renders as zero rows. Reporting `true` keeps + // the facade's fetch path enabled for parity with the Thrift backend. return true; } // --------------------------------------------------------------------------- - // Fetch / metadata (owned by the sea-results pipeline). + // Fetch / metadata. // --------------------------------------------------------------------------- public async fetchChunk({ @@ -157,10 +261,8 @@ export default class SeaOperationBackend implements IOperationBackend { return this.metadataPromise; } this.metadataPromise = (async () => { - if (!this.statement.schema) { - throw new Error('SeaOperationBackend: statement.schema() is not available on this handle'); - } - const arrowSchemaIpc = await this.statement.schema(); + const handle = await this.getFetchHandle(); + const arrowSchemaIpc = await handle.schema(); const arrowSchema = decodeIpcSchema(arrowSchemaIpc.ipcBytes); const thriftSchema: TTableSchema = arrowSchemaToThriftSchema(arrowSchema); const meta: TGetResultSetMetadataResp = { @@ -184,72 +286,177 @@ export default class SeaOperationBackend implements IOperationBackend { } // --------------------------------------------------------------------------- - // Status / lifecycle (owned by the sea-operation lifecycle helpers). + // Status / lifecycle. // --------------------------------------------------------------------------- public async status(_progress: boolean): Promise { - // Synthesised — kernel only surfaces terminal-or-running statements - // through its public API; we report CANCELED/CLOSED if the lifecycle - // flag is set, else FINISHED. Matches the Thrift status shape so - // facade-level callers see consistent telemetry across backends. + // JS-initiated lifecycle wins — it may be ahead of the server. if (this.lifecycle.isCancelled) { - return { - status: { statusCode: TStatusCode.SUCCESS_STATUS }, - operationState: TOperationState.CANCELED_STATE, - hasResultSet: true, - }; + return synthesizeStatus(TOperationState.CANCELED_STATE); } if (this.lifecycle.isClosed) { - return { - status: { statusCode: TStatusCode.SUCCESS_STATUS }, - operationState: TOperationState.CLOSED_STATE, - hasResultSet: true, - }; + return synthesizeStatus(TOperationState.CLOSED_STATE); } - return { - status: { statusCode: TStatusCode.SUCCESS_STATUS }, - operationState: TOperationState.FINISHED_STATE, - hasResultSet: true, - }; + if (this.asyncStatement) { + // Real server status — single GetStatementStatus RPC, no polling. + const state = await this.asyncStatement.status(); + return synthesizeStatus(statusToOperationState(state)); + } + // Blocking metadata path: the statement is already terminal. + return synthesizeStatus(TOperationState.FINISHED_STATE); } public async waitUntilReady(options?: { progress?: boolean; callback?: (progress: TGetOperationStatusResp) => unknown; }): Promise { - // Kernel's `Statement::execute().await` has already resolved by the - // time we hold a Statement handle — there is no pending/running - // state to poll for M0. seaFinished fires the progress callback - // once with a synthesised FINISHED response so progress-UI callers - // see the same one-shot completion tick the Thrift path emits at - // the end of its polling loop. + if (this.asyncStatement) { + return this.waitUntilReadyAsync(options); + } + // Blocking metadata path: the kernel statement has already resolved, + // so there is nothing to poll. Fire the progress callback once with + // a synthesized FINISHED tick, matching the Thrift path's final tick. return seaFinished(this.lifecycle, options); } public async cancel(): Promise { - return seaCancel(this.lifecycle, this.statement, this.context, this._id); + return seaCancel(this.lifecycle, this.lifecycleHandle, this.context, this._id); } public async close(): Promise { - return seaClose(this.lifecycle, this.statement, this.context, this._id); + return seaClose(this.lifecycle, this.lifecycleHandle, this.context, this._id); } // --------------------------------------------------------------------------- // Internals. // --------------------------------------------------------------------------- + /** + * Poll the kernel `AsyncStatement` to a terminal state, mirroring the + * Thrift backend's `getOperationStatus` loop. Fires the progress + * callback each tick; on FINISHED, materialises the result stream + * (so the first fetch is free); on a bad terminal state, throws the + * same `OperationStateError` the Thrift path raises. + */ + private async waitUntilReadyAsync(options?: { + progress?: boolean; + callback?: (progress: TGetOperationStatusResp) => unknown; + }): Promise { + // Already materialised → terminal-and-ready, nothing to wait for. + if (this.fetchHandlePromise) { + return; + } + + for (;;) { + // JS-initiated cancel/close short-circuits before the next poll. + failIfNotActive(this.lifecycle); + + // eslint-disable-next-line no-await-in-loop + const state = await this.asyncStatement!.status(); + const operationState = statusToOperationState(state); + + if (options?.callback) { + // eslint-disable-next-line no-await-in-loop + await Promise.resolve(options.callback(synthesizeStatus(operationState))); + } + + switch (operationState) { + case TOperationState.INITIALIZED_STATE: + case TOperationState.PENDING_STATE: + case TOperationState.RUNNING_STATE: + break; + + case TOperationState.FINISHED_STATE: + // Materialise the result stream now so the first fetch/metadata + // call doesn't pay an extra await_result round-trip. + // eslint-disable-next-line no-await-in-loop + await this.getFetchHandle(); + return; + + case TOperationState.CANCELED_STATE: + throw new OperationStateError( + OperationStateErrorCode.Canceled, + synthesizeStatus(operationState), + ); + + case TOperationState.CLOSED_STATE: + throw new OperationStateError( + OperationStateErrorCode.Closed, + synthesizeStatus(operationState), + ); + + case TOperationState.ERROR_STATE: + // `status()` collapses Failed to the variant name only; the + // real SQL-error envelope (sql_state / error_code / query_id) + // rides on `awaitResult()`'s rejection. Surface that. + // eslint-disable-next-line no-await-in-loop + await this.throwAsyncError(synthesizeStatus(operationState)); + break; + + case TOperationState.TIMEDOUT_STATE: + throw new OperationStateError( + OperationStateErrorCode.Timeout, + synthesizeStatus(operationState), + ); + + case TOperationState.UKNOWN_STATE: + default: + throw new OperationStateError( + OperationStateErrorCode.Unknown, + synthesizeStatus(operationState), + ); + } + + // eslint-disable-next-line no-await-in-loop + await delay(STATUS_POLL_INTERVAL_MS); + } + } + + /** + * Drive `awaitResult()` to extract the kernel's typed error envelope + * for a Failed statement and re-throw it decoded. Falls back to a + * generic `OperationStateError` if `awaitResult()` unexpectedly + * resolves. + */ + private async throwAsyncError(response: TGetOperationStatusResp): Promise { + try { + await this.asyncStatement!.awaitResult(); + } catch (err) { + throw decodeNapiKernelError(err); + } + throw new OperationStateError(OperationStateErrorCode.Error, response); + } + + /** + * Resolve (and memoise) the result-fetch handle. Async path: the + * `awaitResult()` stream; metadata path: the blocking statement itself. + */ + private getFetchHandle(): Promise { + if (!this.fetchHandlePromise) { + if (this.asyncStatement) { + this.fetchHandlePromise = this.asyncStatement.awaitResult(); + } else { + const stmt = this.blockingStatement!; + if (!stmt.schema || !stmt.fetchNextBatch) { + return Promise.reject( + new Error('SeaOperationBackend: fetch surface is not available on this handle'), + ); + } + this.fetchHandlePromise = Promise.resolve(stmt as SeaFetchHandle); + } + } + return this.fetchHandlePromise; + } + private async getResultSlicer(): Promise> { if (this.resultSlicer) { return this.resultSlicer; } - if (!this.statement.fetchNextBatch) { - throw new Error('SeaOperationBackend: statement.fetchNextBatch() is not available on this handle'); - } const metadata = await this.getResultMetadata(); - // The lifecycle subset has cancel/close only; fetch methods exist on - // the full napi Statement. Cast is safe here because we've just - // verified `fetchNextBatch` is callable. - this.resultsProvider = new SeaResultsProvider(this.statement as SeaNativeStatement); + const handle = await this.getFetchHandle(); + // SeaResultsProvider consumes only `fetchNextBatch`; the fetch handle + // (async result handle or blocking statement) satisfies that slice. + this.resultsProvider = new SeaResultsProvider(handle as unknown as SeaNativeStatement); const converter = new ArrowResultConverter(this.context, this.resultsProvider, metadata); this.resultSlicer = new ResultSlicer(this.context, converter); return this.resultSlicer; diff --git a/lib/sea/SeaSessionBackend.ts b/lib/sea/SeaSessionBackend.ts index bd4ad59d..f4208054 100644 --- a/lib/sea/SeaSessionBackend.ts +++ b/lib/sea/SeaSessionBackend.ts @@ -181,16 +181,24 @@ export default class SeaSessionBackend implements ISessionBackend { } const hasOptions = Object.keys(nativeOptions).length > 0; - let nativeStatement; + // Submit asynchronously (kernel `wait_timeout=0s`): the server + // returns a pending `AsyncStatement` handle immediately while the + // query runs, exactly like the Thrift backend's always-async + // (`runAsync: true`) path. `SeaOperationBackend` polls `status()` + // to terminal in `waitUntilReady()` and materialises results via + // `awaitResult()`, so a long-running query can be cancelled + // mid-flight and `status()` reports real Pending/Running/Succeeded + // states — parity the blocking `executeStatement()` path can't offer. + let asyncStatement; try { - nativeStatement = hasOptions - ? await this.connection.executeStatement(statement, nativeOptions) - : await this.connection.executeStatement(statement); + asyncStatement = hasOptions + ? await this.connection.submitStatement(statement, nativeOptions) + : await this.connection.submitStatement(statement); } catch (err) { throw decodeNapiKernelError(err); } return new SeaOperationBackend({ - statement: nativeStatement, + asyncStatement, context: this.context, }); } diff --git a/native/sea/index.d.ts b/native/sea/index.d.ts index 85b82edf..076c0f37 100644 --- a/native/sea/index.d.ts +++ b/native/sea/index.d.ts @@ -4,16 +4,138 @@ /* auto-generated by NAPI-RS */ /** - * JS-visible options for opening a Databricks SQL session over PAT. + * Per-statement options for `Connection.executeStatement`. * - * M0 supports PAT only — `token` is required. OAuth M2M / U2M variants - * land in M1 along with a discriminated-union shape on the JS side. + * Mirrors the kernel `StatementSpec` knobs that are safe to thread + * through napi without a kernel-side change. Today this covers: + * - `statementConf` — per-statement Spark conf overlay + * (`StatementSpec.statement_conf` → SEA `parameters` / + * Thrift `confOverlay`) + * - `queryTags` — convenience wrapper over `statementConf` with + * key `query_tags`; serialised to the same comma-separated + * `key:value` wire shape NodeJS Thrift's `serializeQueryTags` + * produces (`lib/utils/queryTags.ts`). Backslashes in keys are + * doubled; backslash/colon/comma in values are backslash-escaped. + * + * `rowLimit` (SEA `row_limit`) and `queryTimeoutSecs` (the per-statement + * server wait timeout) are exposed here and threaded onto the kernel + * `StatementSpec`. `positionalParams` (`?`) and `namedParams` (`:name`) + * carry bound query parameters, decoded via `params::parse_typed_value`. + * + * **Tag-order caveat (M4 parity note).** The napi `queryTags` field + * is a Rust `HashMap` whose iteration order is + * non-deterministic, so the serialised `query_tags` value may have + * a different key order than Thrift's `serializeQueryTags` (which + * iterates `Object.keys(...)` in insertion order) for the same + * input. The SEA server is order-insensitive on conf values, so + * the two are functionally equivalent. If a caller needs + * byte-identical Thrift parity, the JS adapter pre-serialises via + * `serializeQueryTags` and writes the result into + * `statementConf["query_tags"]` directly — see + * `SeaSessionBackend.executeStatement` in the NodeJS driver. This + * path is the one the production code uses. + */ +export interface ExecuteOptions { + /** + * Per-statement Spark conf overlay. Merged on top of the + * session-level `sessionConf` at execute time; this map wins + * on key collisions. Unknown keys are rejected by the server. + */ + statementConf?: Record + /** + * Query tags as key→value pairs. Serialised to a comma- + * separated `key:value` string (backslash-escaping `\`, `:`, + * `,`) and placed into `statementConf["query_tags"]`, matching + * NodeJS Thrift's `serializeQueryTags` wire shape. Passing + * both `queryTags` AND a `query_tags` key in `statementConf` + * raises `InvalidArgument` — the caller's intent is ambiguous + * so we refuse to silently pick one over the other. + * + * See the struct-level "Tag-order caveat" for the + * HashMap-iteration-order vs `Object.keys`-iteration-order + * divergence and the byte-identical-Thrift-parity workaround. + */ + queryTags?: Record + /** + * Server-side cap on the number of rows this statement returns + * (SEA `row_limit`), independent of any SQL `LIMIT`. Maps to + * `StatementSpec.row_limit`. Omitted ⇒ no driver-imposed cap. + */ + rowLimit?: number + /** + * Per-statement server wait timeout in whole seconds. Bounds how + * long the server waits before cancelling the statement + * (`on_wait_timeout = CANCEL`), surfacing as a timeout — the + * server statement timeout (JDBC `setQueryTimeout`). Maps to + * `StatementSpec.query_timeout_secs`. Distinct from the + * connection-level transport timeout. The SEA wire caps it at 50s. + */ + queryTimeoutSecs?: number + /** + * Positional parameters, in 1-based wire order. Index `i` in this + * Vec corresponds to the `i+1`-th `?` placeholder in the SQL. + * Each entry is a `{ sqlType, value }` pair — `value` is the + * string-encoded literal or `null` for SQL NULL. Mirrors + * `StatementSpec::positional_params`; decoded via [`parse_typed_value`]. + */ + positionalParams?: Array + /** + * Named parameters (`:name` placeholders). Each carries its `name` + * alongside the `{ sqlType, value? }` pair. Mapped to a kernel + * `TypedValue` via the same [`parse_typed_value`] codec and bound with + * `StatementSpec::param_named`. Named is the SEA-spec-required public + * param form (`StatementParameter.name` is `openapi_required`); + * positional is the documented-undocumented variant. The two are + * mutually exclusive at the SQL level (`?` vs `:name`). + */ + namedParams?: Array +} +/** + * A named bound parameter — a [`TypedValueInput`] plus its `:name`. Kept a + * distinct napi object (rather than an optional `name` on `TypedValueInput`) + * so the positional surface stays a clean ordered list with no name field. + */ +export interface NamedTypedValueInput { + name: string + sqlType: string + value?: string +} +/** + * Authentication mode selector crossing the napi boundary. The string + * literals are what napi-rs emits from this `#[napi(string_enum)]` — the + * NodeJS SEA adapter (`SeaAuth`) matches them verbatim (`'Pat'`, + * `'OAuthM2m'`, `'OAuthU2m'`). + * + * Mirrors the kernel [`AuthConfig`] variants this binding supports. + * `OAuthFederation` / `External` are intentionally not exposed yet — the + * kernel marks federation as not-yet-implemented and `External` is a + * Rust-trait escape hatch with no JS-callback bridge. + */ +export const enum AuthMode { + /** Personal access token (`token`). */ + Pat = 'Pat', + /** OAuth 2.0 machine-to-machine — `oauthClientId` + `oauthClientSecret`. */ + OAuthM2m = 'OAuthM2m', + /** + * OAuth 2.0 user-to-machine (browser flow) — optional `oauthClientId` + * + `oauthRedirectPort`. + */ + OAuthU2m = 'OAuthU2m' +} +/** + * JS-visible options for opening a Databricks SQL session. + * + * Authentication is selected by `authMode` (default [`AuthMode::Pat`]): + * - `Pat` — `token` required. + * - `OAuthM2m` — `oauthClientId` + `oauthClientSecret` required. + * - `OAuthU2m` — `oauthClientId` / `oauthRedirectPort` optional (kernel + * defaults to the `databricks-cli` client on port 8020). * * Catalog / schema / sessionConf are applied once at session creation * and remain in effect for every statement run on the resulting * `Connection`. The SEA wire protocol carries them on * `CreateSession`, not on `ExecuteStatement` — so there is no - * per-statement override path in either this binding or pyo3. + * per-statement override path on this binding. */ export interface ConnectionOptions { /** @@ -27,10 +149,32 @@ export interface ConnectionOptions { */ httpPath: string /** - * Personal access token. Must be non-empty (the kernel rejects - * empty PATs at session construction). + * Authentication mode. Omitted ⇒ [`AuthMode::Pat`] (back-compat: + * existing PAT callers pass only `token`). */ - token: string + authMode?: AuthMode + /** + * Personal access token. Required (and non-empty) for + * [`AuthMode::Pat`]; ignored otherwise. + */ + token?: string + /** + * OAuth client id. Required for [`AuthMode::OAuthM2m`]; optional for + * [`AuthMode::OAuthU2m`] (kernel defaults to `databricks-cli`). + */ + oauthClientId?: string + /** OAuth client secret. Required for [`AuthMode::OAuthM2m`]. */ + oauthClientSecret?: string + /** + * Localhost callback port for the [`AuthMode::OAuthU2m`] browser + * flow. Omitted ⇒ kernel default (8020). + */ + oauthRedirectPort?: number + /** + * OAuth scopes override (M2M / U2M). Omitted ⇒ kernel defaults + * (`["all-apis"]` for M2M; `["all-apis", "offline_access"]` for U2M). + */ + oauthScopes?: Array /** * Default catalog for statements executed on this session. * Routed through the kernel's `DefaultOpts` and onto the SEA @@ -49,20 +193,99 @@ export interface ConnectionOptions { * `session_confs`. Unknown keys are rejected server-side. */ sessionConf?: Record + /** + * Maximum number of pooled HTTP connections per host. Routes + * through the kernel's [`HttpConfig::pool_max_idle_per_host`]. + * Tunes the underlying `reqwest` connection pool — higher values + * reduce reconnect overhead when many statements run + * concurrently against the same warehouse. + * + * When the JS caller does NOT provide `maxConnections`, the napi + * binding applies a NodeJS-driver-appropriate default of + * [`NAPI_DEFAULT_POOL_MAX_IDLE_PER_HOST`] (100) — chosen to match + * the JDBC driver's `HttpConnectionPoolSize` default and to close + * the throughput gap vs the NodeJS Thrift driver's + * `maxSockets: Infinity` pool for bursty workloads. The kernel + * core's [`HttpConfig::pool_max_idle_per_host`] default remains + * at the conservative kernel value (10); each binding chooses + * its own user-facing default. Mirrors the Python connector's + * `max_connections` kwarg on the SEA backend, which exposes the + * knob but keeps its own urllib3-aligned default of 10. + * + * Napi-rs serialises `u32` as JS `number`; values up to + * `2^32 - 1` round-trip safely (any reasonable pool size fits). + */ + maxConnections?: number + /** + * Render `INTERVAL` / `DURATION` result columns as strings + * (`ResultConfig.intervals_as_string`). The kernel default is + * native Arrow `month_interval` / `duration[us]` types; the NodeJS + * Thrift driver surfaces intervals as strings, so the SEA driver + * sets this `true` for byte-compatible parity. Omitted ⇒ kernel + * default (native Arrow interval types). + */ + intervalsAsString?: boolean + /** + * Render complex (`ARRAY` / `MAP` / `STRUCT` / `VARIANT`) result + * columns as JSON strings (`ResultConfig.complex_types_as_json`) + * instead of native Arrow nested types. Omitted ⇒ kernel default + * (native Arrow nested types, which the NodeJS Arrow decoder + * already renders identically to the Thrift path). + */ + complexTypesAsJson?: boolean } /** - * Open a Databricks SQL session over PAT auth and return an opaque - * `Connection` wrapping the kernel `Session`. + * Open a Databricks SQL session and return an opaque `Connection` + * wrapping the kernel `Session`. Authentication is selected by + * `options.auth_mode` (PAT / OAuth M2M / OAuth U2M) — see + * [`build_auth_config`]. * * The JS-visible name is `openSession` (napi-rs converts snake_case * to camelCase for free functions). */ export declare function openSession(options: ConnectionOptions): Promise +/** + * JS-visible binding for a single positional parameter. + * + * Shape mirrors the `TSparkParameter` wire object the Thrift backend + * already emits via `DBSQLParameter.toSparkParameter()` — `type` is the + * canonical Databricks SQL type name (`"INT"`, `"STRING"`, + * `"DECIMAL(10,2)"`, ...), `value` is the string-encoded literal or + * `None` for SQL NULL. + * + * Why a string for `value` instead of a tagged JS union: round-tripping + * arbitrary JS values across the FFI requires either (a) a custom + * napi `FromNapiValue` per arm, or (b) a `serde_json::Value`-style + * dynamic dispatch on the Rust side. The Node-driver adapter already + * stringifies before calling the binding (see `DBSQLParameter` and the + * existing pyo3 wrapper), so the string-in / string-parsed contract + * adds no JS-side complexity and keeps the kernel-side validation in + * one place. + */ +export interface TypedValueInput { + /** + * Canonical Databricks SQL type name. Case-insensitive for the + * simple variants; for DECIMAL the parenthesised form + * (`"DECIMAL(10,2)"`) is required so the kernel can extract + * precision/scale. + */ + sqlType: string + /** + * String-encoded value. `None` always produces `TypedValue::Null` + * regardless of `sql_type` — matches the connector's + * `VoidParameter` shape and the pyo3 binding's contract. + */ + value?: string +} /** * A single Arrow IPC stream payload encoding one record batch (plus * the schema header so the JS-side reader is stateless). */ export interface ArrowBatch { + /** + * Arrow IPC stream payload (schema header + 1 record-batch + * message). Decode with `apache-arrow`'s `RecordBatchReader`. + */ ipcBytes: Buffer } /** @@ -70,6 +293,11 @@ export interface ArrowBatch { * record-batch messages). Returned by `Statement.schema()`. */ export interface ArrowSchema { + /** + * Arrow IPC stream payload (schema header only, no record-batch + * messages). Decode with `apache-arrow`'s `RecordBatchReader` — + * the reader will expose the schema and immediately end. + */ ipcBytes: Buffer } /** @@ -79,6 +307,122 @@ export interface ArrowSchema { * loaded?" probe for the JS-side loader's structured diagnostics. */ export declare function version(): string +/** + * Opaque async-statement handle. + * + * Returned by `Connection.submitStatement(...)` after the kernel + * `Statement::submit()` returns (server sent `wait_timeout=0s`, so + * the response carries a `statement_id` but the statement is still + * `Pending`/`Running`). JS drives polling via `status()` / + * `awaitResult()`. + * + * Concurrency shape matches the sync `Statement`: every method + * takes `inner.lock()` and holds the guard across the kernel + * `.await`. tokio `Mutex` is FIFO. `cancel()` / `close()` queue + * behind any in-flight `awaitResult()` until it returns naturally. + * The kernel's `AwaitResultCancelGuard` covers the drop-cancel + * case independently — see module docs. + */ +export declare class AsyncStatement { + /** + * Server-issued statement id. Cached at construction; readable + * even after `close()` so JS-side log lines can correlate + * against kernel / server logs which key on the same id. + */ + get statementId(): string + /** + * One-shot status check. Returns a string enum matching the + * kernel `StatementStatus` shape: + * `'Pending' | 'Running' | 'Succeeded' | 'Failed' | + * 'Cancelled' | 'Closed'`. Returns + * `KernelError(InvalidStatementHandle)` if the statement has + * been explicitly `close()`d. + * + * The `Failed` variant collapses to the string `'Failed'` on + * the JS side; the underlying error envelope (sql_state / + * error_code / query_id) is surfaced by `awaitResult()`'s + * rejection, which is where callers actually need the typed + * error. `status()` is intended for polling progress UIs + * that only need the state name. + */ + status(): Promise + /** + * Block until the server reaches a terminal state, then return + * an `AsyncResultHandle` that wraps the materialised result + * stream. The handle exposes `fetchNextBatch()` / `schema()` + * for consuming the result, plus `statementId` for log + * correlation. + * + * Drop-cancel safety: kernel `await_result` installs + * `AwaitResultCancelGuard` which fires a fire-and-forget + * `cancel_statement` if the future is dropped mid-poll + * (timeout, tokio::select! loser, JS-side `Promise.race` + * loser). The `util::guarded` `catch_unwind` here covers the + * V8-panic-across-boundary case on top. Returns + * `KernelError(InvalidStatementHandle)` if the statement has + * been explicitly `close()`d. + */ + awaitResult(): Promise + /** + * Server-side cancel. Returns + * `KernelError(InvalidStatementHandle)` if the statement has + * been explicitly `close()`d. Idempotent against a server + * that already reached a terminal state — the kernel's + * `cancel_statement` is a no-op there. + */ + cancel(): Promise + /** + * Explicit close. Idempotent — a second call on an + * already-closed handle returns `Ok(())`. On `Err`, the napi + * inner is already `None`, so a JS-side retry sees the + * closed-handle short-circuit and returns `Ok(())` without + * re-attempting the wire call. The kernel's own `Drop` + * fire-and-forget retry runs once in the background. + */ + close(): Promise +} +/** + * Opaque result-fetch handle returned by + * `AsyncStatement.awaitResult()`. Wraps a kernel `ResultStream` + * directly; structurally analogous to the sync `Statement`'s + * fetch-side surface (`fetchNextBatch` / `schema` / + * `statementId`). + * + * `cancel()` / `close()` are not exposed: the parent + * `AsyncStatement` owns server-side lifecycle. A `close()` here + * would create dual-ownership of the same statement_id with + * inconsistent close semantics. Callers `close()` the parent + * `AsyncStatement` after they're done fetching. + * + * Schema is cached at construction so it survives the underlying + * stream being drained; mirrors the sync `Statement.schema()` + * post-close contract. + */ +export declare class AsyncResultHandle { + /** + * Server-issued statement id. Cached at construction; readable + * for log correlation. Matches the parent `AsyncStatement`'s + * `statementId`. + */ + get statementId(): string + /** + * Pull the next batch of results. Returns `null` when the + * stream is exhausted. The returned `ArrowBatch.ipcBytes` is a + * complete Arrow IPC stream (schema header + 1 record-batch + * message), suitable for handing to `apache-arrow`'s + * `RecordBatchReader`. Byte-identical to the sync + * `Statement.fetchNextBatch()` payload for the same query. + */ + fetchNextBatch(): Promise + /** + * Result schema as an Arrow IPC payload (schema header only, + * no record-batch message). Available before any batches have + * been fetched. Sync because the body has no `.await` — + * `encode_ipc_stream` is pure CPU work over the cached + * `Arc`. + */ + schema(): ArrowSchema +} /** * Opaque connection handle wrapping a kernel `Session`. * @@ -86,37 +430,76 @@ export declare function version(): string * - the Drop impl can clone the `Arc` and `.take()` the session on a * background tokio task without holding `&mut self` (which Drop is * forbidden from doing across an `await`), - * - `executeStatement` can share immutable access to the session via - * the `Arc` clones the kernel makes internally - * (`Session::statement()` only needs `&self`). + * - `close()` can `.take()` the session to consume it for the kernel's + * move-by-value `Session::close(self)` signature. + * + * **Current concurrency shape** — `executeStatement` holds + * `inner.lock()` across `stmt.execute().await`, so two concurrent + * `Promise.all([executeStatement(q1), executeStatement(q2)])` calls + * on the same Connection serialise even though the kernel transport + * supports concurrent statements per session, and `close()` blocks + * behind any in-flight execute. The kernel's `Session::statement()` + * is `&self`-callable, so the right shape is `Arc` with + * concurrent execute paths; that lands in the follow-up lock-shape + * refactor — see + * `sea-workflow/jira-candidates/2026-05-24-napi-cancel-during-fetch.md`. */ export declare class Connection { + /** + * Server-issued session id. Cached at construction; readable + * even after `close()` so JS-side log lines can correlate + * against kernel / server logs which key on the same id. + */ + get sessionId(): string /** * Execute a SQL statement and return a Statement handle that * streams batches via `fetchNextBatch()`. * - * No per-statement options: catalog / schema / sessionConf are - * session-level (`openSession`). Positional / named parameters - * land in M1 via `Statement::spec().param(…)` on the kernel. - */ - executeStatement(sql: string): Promise - /** - * Explicit close. Marks the connection wrapper as closed so - * subsequent calls on this `Connection` return `InvalidArg`, then - * schedules a fire-and-forget server-side close on the runtime. - * - * **Why fire-and-forget and not `Session::close().await`:** the - * kernel's `Session::close(self).await` body holds a - * `tracing::EnteredSpan` (a `!Send` type) across an `.await`, so - * the future is not `Send`. napi-rs's `execute_tokio_future` glue - * rejects non-`Send` futures, and `Handle::spawn` does too. The - * kernel's `SessionInner::Drop` already spawns the - * `delete_session` RPC on the same runtime handle the napi - * binding captured, so dropping the value is functionally - * equivalent — the difference is that JS callers can't observe a - * `delete_session` failure from `close()`. Tracked as a kernel- - * side follow-up (clone the span rather than entering it) in - * Round 3 findings. + * Catalog / schema / sessionConf are session-level + * (`openSession`). Per-statement options on `ExecuteOptions`: + * - `statementConf` — per-statement Spark conf overlay + * - `queryTags` — serialised to a comma-separated `key:value` + * string and placed in `statement_conf["query_tags"]`, + * matching NodeJS Thrift's `serializeQueryTags` wire shape + * + * `options` is omitted/`None` for the no-options path; passing + * `{ statementConf: {} }` (an empty map) is treated the same as + * omission to keep the wire shape stable for the common case. + */ + executeStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise + /** + * Submit a SQL statement and return immediately with an + * `AsyncStatement` handle, without blocking until the query + * finishes. The kernel's `Statement::submit()` sends + * `wait_timeout=0s`, so the server responds as soon as it has a + * `statement_id` (state `Pending`/`Running`); JS drives polling + * via `AsyncStatement.status()` and materialises results with + * `AsyncStatement.awaitResult()`. + * + * This is the async-execution path the Thrift backend always + * uses (`runAsync: true`): the SEA backend submits, returns a + * pending operation handle, and polls to terminal during + * fetch. Option semantics (statementConf / queryTags / + * rowLimit / queryTimeoutSecs / positional + named params) are + * identical to `executeStatement`; only the blocking-vs-pending + * return contract differs. + */ + submitStatement(sql: string, options?: ExecuteOptions | undefined | null): Promise + /** + * Explicit close. Awaits the server-side `DeleteSession` so the + * JS caller can observe failures (auth revoked mid-session, + * warehouse stopped, network error). Idempotent — a second call + * on an already-closed connection returns `Ok`. + * + * **Errors are terminal from the JS side.** The kernel session + * handle is consumed (`take()`) BEFORE the wire `DeleteSession` + * runs, because `Session::close` takes `self` by value. On `Err`, + * the napi `inner` is already `None`, so a JS-side retry sees a + * closed connection and returns `Ok(())` without re-attempting + * the wire call. The kernel's own `Drop` fire-and-forget retry + * runs once in the background — the JS caller can log the error + * but cannot drive a retry. If you need retry-on-failure + * semantics for `DeleteSession`, layer them above this method. */ close(): Promise /** @@ -184,39 +567,97 @@ export declare class Connection { /** * Opaque executed-statement handle. * - * `inner` is wrapped in `Arc>>` so: - * - `fetch_next_batch` can `await` `ResultStream::next_batch` which - * requires `&mut ExecutedStatement` (via `result_stream_mut`), - * - `cancel` / `close` (which take `&self` on the kernel side via the - * `ExecutedStatementHandle` trait) can run concurrently with each - * other from a JS perspective without panicking, - * - `Drop` can hand the inner handle off to a tokio task without - * touching `&mut self` across an `await`. + * **Current concurrency shape** — every method takes `inner.lock()` + * and holds the guard across the kernel `.await`. tokio `Mutex` is + * FIFO, so cancel/close queue behind any in-flight `fetchNextBatch` + * until it returns naturally. This is a known limitation that exists + * because the napi shape has not yet been split into an + * `Arc` (for cancel/close, which the + * kernel exposes as `&self`-callable) plus a `Mutex>` only + * for the borrowed-mut fetch path. The lock-shape refactor needs a + * small kernel-side accessor and lands in a follow-up PR — see + * `sea-workflow/jira-candidates/2026-05-24-napi-cancel-during-fetch.md`. + * + * `schema` and `statement_id` are cached at construction so they + * survive `close()` — JS callers building error reports against a + * disposed statement can still read them. */ export declare class Statement { /** - * Pull the next batch of results. Returns `None` when the stream + * Server-issued statement id. Cached at construction; readable + * even after `close()` so JS-side log lines can correlate against + * kernel / server logs which key on the same id. + */ + get statementId(): string + /** + * Pull the next batch of results. Returns `null` when the stream * is exhausted. The returned `ArrowBatch.ipcBytes` is a complete * Arrow IPC stream (schema header + 1 record-batch message) * suitable for handing to `apache-arrow`'s `RecordBatchReader`. + * + * On `Err`, the stream is in an unspecified state — call + * `close()` and discard the `Statement`. Subsequent + * `fetchNextBatch()` calls after an error are not guaranteed to + * succeed or fail consistently. */ fetchNextBatch(): Promise /** * Result schema as an Arrow IPC payload (schema header only, no * record-batch message). Available before any batches have been - * fetched. + * fetched, and remains available after `close()` — the kernel + * materialises the schema eagerly so JS callers can build error + * reports against a disposed statement. + * + * Sync because the body has no `.await` — `encode_ipc_stream` is + * pure CPU work over an `Arc` already cached on the + * wrapper. Mirrors `pyo3/src/statement.rs::arrow_schema` (sync). + * napi-rs converts a panic in a sync `#[napi]` entry point into a + * thrown JS error via its own macro-expanded boundary, so the + * `util::guarded` `catch_unwind` wrapper that the `async fn` + * entry points use is not required for this method. */ - schema(): Promise + schema(): ArrowSchema /** - * Server-side cancel. No-op if already finished or if this - * `Statement` wraps a metadata `ResultStream` (metadata calls have - * no in-flight cancellation surface in the kernel today). + * Server-side cancel. + * + * For executed statements: short-circuits to `Ok(())` if + * `fetchNextBatch` has already returned `null` (stream + * naturally exhausted) — matches the JDBC `Statement.cancel()` + * no-op-after-completion contract, so JS callers can fire cancel + * defensively without distinguishing "real cancel" from "raced + * with natural completion." + * + * For metadata streams: no-op (the kernel has no in-flight + * cancellation surface for metadata calls today). + * + * Returns `KernelError(InvalidStatementHandle)` if the statement + * has been explicitly `close()`d. */ cancel(): Promise /** - * Explicit close. Awaits the server-side close so the JS caller - * can observe failures. For metadata streams, drops the stream - * (no server round-trip needed). + * Explicit close. + * + * For executed statements: awaits the server-side `CloseStatement` + * so the JS caller can observe failures (auth revoked mid-session, + * network error, server-side error). Idempotent — a second call + * on an already-closed statement returns `Ok`. + * + * **Errors are terminal from the JS side.** The kernel executed + * handle is taken out of `inner` BEFORE the wire `CloseStatement` + * runs (so `Drop` knows there's nothing left to clean up). On + * `Err`, the napi `inner` is already `None`, so a JS-side retry + * sees a closed statement and returns `Ok(())` without re- + * attempting the wire call. The kernel-level `ExecutedStatement` + * has been consumed at that point and the value is dropped on + * the way out of the closure — the kernel's `ExecutedStatement:: + * Drop` then fires-and-forgets a single retry on the captured + * runtime. The JS caller can log the error but cannot drive a + * further retry. If you need retry-on-failure semantics for + * `CloseStatement`, layer them above this method. + * + * For metadata streams: drops the stream (no server round-trip + * needed — metadata results have no in-flight server-side + * resource to release). */ close(): Promise } diff --git a/tests/unit/sea/SeaOperationBackend.test.ts b/tests/unit/sea/SeaOperationBackend.test.ts index 17f593e3..9e91a340 100644 --- a/tests/unit/sea/SeaOperationBackend.test.ts +++ b/tests/unit/sea/SeaOperationBackend.test.ts @@ -38,6 +38,8 @@ import { import SeaOperationBackend from '../../../lib/sea/SeaOperationBackend'; import ClientContextStub from '../.stubs/ClientContextStub'; +import { TOperationState } from '../../../thrift/TCLIService_types'; +import { SeaNativeStatementStatus } from '../../../lib/sea/SeaNativeLoader'; // Minimal stub of the napi `Statement` surface that emits a precomputed // Arrow IPC payload per `fetchNextBatch()` call. Used to feed @@ -267,3 +269,174 @@ describe('SeaOperationBackend — M0 datatype round-trip via napi → ArrowResul expect(stub.closed).to.equal(true); }); }); + +// Result-fetch surface returned by `AsyncStatement.awaitResult()`. +class AsyncResultHandleStub { + public readonly statementId = 'stmt-async-1'; + + private readonly schemaIpc: Buffer; + + private readonly batches: Buffer[]; + + constructor(schemaIpc: Buffer, batches: Buffer[]) { + this.schemaIpc = schemaIpc; + this.batches = [...batches]; + } + + public async fetchNextBatch(): Promise<{ ipcBytes: Buffer } | null> { + if (this.batches.length === 0) return null; + return { ipcBytes: this.batches.shift() as Buffer }; + } + + public async schema(): Promise<{ ipcBytes: Buffer }> { + return { ipcBytes: this.schemaIpc }; + } +} + +// Pending `AsyncStatement` whose `status()` walks a scripted sequence, +// mirroring the kernel submit+poll path (server returns Pending/Running +// before Succeeded). `awaitResult()` yields the result handle once the +// scripted statuses are exhausted (or immediately for terminal states). +class AsyncStatementStub { + public readonly statementId = 'stmt-async-1'; + + public statusCalls = 0; + + public cancelled = false; + + public closed = false; + + public awaitResultCalls = 0; + + private readonly statuses: SeaNativeStatementStatus[]; + + private readonly resultHandle: AsyncResultHandleStub | null; + + private readonly awaitResultError: Error | null; + + constructor( + statuses: SeaNativeStatementStatus[], + resultHandle: AsyncResultHandleStub | null, + awaitResultError: Error | null = null, + ) { + this.statuses = statuses; + this.resultHandle = resultHandle; + this.awaitResultError = awaitResultError; + } + + public async status(): Promise { + this.statusCalls += 1; + const idx = Math.min(this.statusCalls - 1, this.statuses.length - 1); + return this.statuses[idx]; + } + + public async awaitResult(): Promise { + this.awaitResultCalls += 1; + if (this.awaitResultError) { + throw this.awaitResultError; + } + return this.resultHandle as AsyncResultHandleStub; + } + + public async cancel(): Promise { + this.cancelled = true; + } + + public async close(): Promise { + this.closed = true; + } +} + +describe('SeaOperationBackend — async submit+poll lifecycle (Thrift runAsync parity)', () => { + const schema = new Schema([withTypeName(new Field('x', new Int32(), true), 'INT')]); + const schemaIpc = ipcSchemaOnly(schema); + + it('id defaults to the server-issued statementId', () => { + const stmt = new AsyncStatementStub(['Succeeded'], new AsyncResultHandleStub(schemaIpc, [])); + const backend = new SeaOperationBackend({ + asyncStatement: stmt as any, + context: new ClientContextStub(), + }); + expect(backend.id).to.equal('stmt-async-1'); + }); + + it('status() reports the real server state (RUNNING before terminal)', async () => { + const stmt = new AsyncStatementStub(['Running'], new AsyncResultHandleStub(schemaIpc, [])); + const backend = new SeaOperationBackend({ + asyncStatement: stmt as any, + context: new ClientContextStub(), + }); + const resp = await backend.status(false); + expect(resp.operationState).to.equal(TOperationState.RUNNING_STATE); + }); + + it('waitUntilReady() polls status() to terminal, firing the callback each tick', async () => { + const data = ipcFromColumns(schema, { x: [7] }); + const stmt = new AsyncStatementStub( + ['Pending', 'Running', 'Succeeded'], + new AsyncResultHandleStub(schemaIpc, [data]), + ); + const backend = new SeaOperationBackend({ + asyncStatement: stmt as any, + context: new ClientContextStub(), + }); + + const ticks: number[] = []; + await backend.waitUntilReady({ callback: (r) => ticks.push(r.operationState as number) }); + + // Polled Pending → Running → Succeeded (3 status RPCs, 3 callback ticks). + expect(stmt.statusCalls).to.equal(3); + expect(ticks).to.deep.equal([ + TOperationState.PENDING_STATE, + TOperationState.RUNNING_STATE, + TOperationState.FINISHED_STATE, + ]); + // Result stream was materialised; the first fetch returns the row. + const rows = await backend.fetchChunk({ limit: 10 }); + expect(rows).to.deep.equal([{ x: 7 }]); + }); + + it('waitUntilReady() throws on a Failed statement, surfacing the kernel error', async () => { + const kernelErr = new Error('DIVIDE_BY_ZERO'); + const stmt = new AsyncStatementStub(['Running', 'Failed'], null, kernelErr); + const backend = new SeaOperationBackend({ + asyncStatement: stmt as any, + context: new ClientContextStub(), + }); + + let thrown: unknown; + try { + await backend.waitUntilReady(); + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(Error); + // On ERROR_STATE the backend drives awaitResult() to extract the real + // error envelope rather than a generic state error. + expect(stmt.awaitResultCalls).to.equal(1); + expect((thrown as Error).message).to.match(/DIVIDE_BY_ZERO/); + }); + + it('cancel() forwards to the async statement and aborts an in-flight wait', async () => { + // status stays Running forever; cancel() flips the lifecycle flag so the + // next poll iteration short-circuits with a Canceled OperationStateError. + const stmt = new AsyncStatementStub(['Running'], new AsyncResultHandleStub(schemaIpc, [])); + const backend = new SeaOperationBackend({ + asyncStatement: stmt as any, + context: new ClientContextStub(), + }); + + const waitPromise = backend.waitUntilReady(); + await backend.cancel(); + expect(stmt.cancelled).to.equal(true); + + let thrown: unknown; + try { + await waitPromise; + } catch (err) { + thrown = err; + } + expect(thrown).to.be.instanceOf(Error); + expect((thrown as Error).message).to.match(/cancel/i); + }); +}); diff --git a/tests/unit/sea/execution.test.ts b/tests/unit/sea/execution.test.ts index f155caf2..6adf2714 100644 --- a/tests/unit/sea/execution.test.ts +++ b/tests/unit/sea/execution.test.ts @@ -22,6 +22,9 @@ import { SeaNativeConnection, SeaNativeExecuteOptions, SeaNativeStatement, + SeaNativeAsyncStatement, + SeaNativeAsyncResultHandle, + SeaNativeStatementStatus, } from '../../../lib/sea/SeaNativeLoader'; import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientContext'; import IDBSQLLogger, { LogLevel } from '../../../lib/contracts/IDBSQLLogger'; @@ -56,6 +59,44 @@ class FakeNativeStatement implements SeaNativeStatement { } } +class FakeNativeAsyncResultHandle implements SeaNativeAsyncResultHandle { + public readonly statementId = 'fake-statement-id'; + + public async fetchNextBatch() { + return null; + } + + public async schema() { + return { ipcBytes: Buffer.alloc(0) }; + } +} + +class FakeNativeAsyncStatement implements SeaNativeAsyncStatement { + public readonly statementId = 'fake-statement-id'; + + public cancelled = false; + + public closed = false; + + public statusToReturn: SeaNativeStatementStatus = 'Succeeded'; + + public async status() { + return this.statusToReturn; + } + + public async awaitResult() { + return new FakeNativeAsyncResultHandle(); + } + + public async cancel() { + this.cancelled = true; + } + + public async close() { + this.closed = true; + } +} + class FakeNativeConnection implements SeaNativeConnection { public closed = false; @@ -67,6 +108,8 @@ class FakeNativeConnection implements SeaNativeConnection { public statementToReturn: FakeNativeStatement = new FakeNativeStatement(); + public asyncStatementToReturn: FakeNativeAsyncStatement = new FakeNativeAsyncStatement(); + public async executeStatement( sql: string, options?: SeaNativeExecuteOptions, @@ -79,6 +122,21 @@ class FakeNativeConnection implements SeaNativeConnection { return this.statementToReturn; } + // Async-execution path used by `executeStatement` on the SEA backend + // (kernel submit + poll, mirroring Thrift `runAsync`). Records the same + // `lastSql` / `lastOptions` so option-forwarding assertions are shared. + public async submitStatement( + sql: string, + options?: SeaNativeExecuteOptions, + ): Promise { + if (this.throwOnExecute) { + throw this.throwOnExecute; + } + this.lastSql = sql; + this.lastOptions = options; + return this.asyncStatementToReturn; + } + // Metadata stubs — return a fresh statement so callers can test wrapping. public async listCatalogs() { return new FakeNativeStatement(); } diff --git a/tests/unit/sea/metadata.test.ts b/tests/unit/sea/metadata.test.ts index 8afdfccd..1131df0b 100644 --- a/tests/unit/sea/metadata.test.ts +++ b/tests/unit/sea/metadata.test.ts @@ -19,6 +19,7 @@ import SeaTableTypeFilter from '../../../lib/sea/SeaTableTypeFilter'; import { SeaNativeConnection, SeaNativeStatement, + SeaNativeAsyncStatement, } from '../../../lib/sea/SeaNativeLoader'; import IOperationBackend from '../../../lib/contracts/IOperationBackend'; import IClientContext, { ClientConfig } from '../../../lib/contracts/IClientContext'; @@ -65,6 +66,24 @@ class FakeMetadataConnection implements SeaNativeConnection { return this.record('executeStatement', [_sql]); } + // Metadata tests exercise the dedicated list* methods (blocking + // statements); the async query path isn't used here, but the interface + // requires it. Record the call and return a minimal async handle. + public async submitStatement(_sql: string): Promise { + this.record('submitStatement', [_sql]); + return { + statementId: 'fake-statement-id', + status: async () => 'Succeeded' as const, + awaitResult: async () => ({ + statementId: 'fake-statement-id', + fetchNextBatch: async () => null, + schema: async () => ({ ipcBytes: Buffer.alloc(0) }), + }), + cancel: async () => {}, + close: async () => {}, + }; + } + public async listCatalogs(): Promise { return this.record('listCatalogs', []); }