Skip to content

[ray] Ray merge into#8028

Merged
JingsongLi merged 1 commit into
apache:masterfrom
XiaoHongbo-Hope:ray_merge_into
Jun 1, 2026
Merged

[ray] Ray merge into#8028
JingsongLi merged 1 commit into
apache:masterfrom
XiaoHongbo-Hope:ray_merge_into

Conversation

@XiaoHongbo-Hope
Copy link
Copy Markdown
Contributor

@XiaoHongbo-Hope XiaoHongbo-Hope commented May 28, 2026

Purpose

Add merge_into for data-evolution tables on Ray, UPDATE path: inner join →
repartition by _FIRST_ROW_ID → partial write. INSERT path: left_anti
join → PaimonDatasink. Single atomic commit.

MVP scope : only update="*" / insert="*".
Conditions and partial SET deferred to follow-up.

Tests

14 E2E tests covering matched update, not-matched insert, combined
commit, renamed ON columns, duplicate match error, blob exclusion,
source/parameter validation.

@XiaoHongbo-Hope XiaoHongbo-Hope changed the title [python][ray] Ray merge into [WIP][ray] Ray merge into May 28, 2026
@JingsongLi
Copy link
Copy Markdown
Contributor

API Design Recommendations for Paimon Ray merge_into from AI:

Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and Paimon Spark's MergeIntoPaimonDataEvolutionTable:

  ---
  1. Use string expressions for predicates, not Python callables

  Delta/Lance both use SQL-like strings that can be analyzed, optimized, and pushed down:

  # Delta
  dt.merge(source, predicate="target.id = source.id")
    .when_matched_update(updates={"name": "source.name"}, predicate="source.ts > target.ts")

  # Lance
  dataset.merge_insert("id").when_matched_update_all(condition="source.ts > target.ts")

  PR #8028 uses Python lambdas — these cannot be inspected for optimization, cannot participate in predicate pushdown, and are fragile for serialization across Ray workers:

  # PR #8028 (problematic)
  when_matched_update_condition=lambda r: r['s.age'] > r['t.age']

  Recommendation: Use string expressions or a simple expression DSL. If full SQL parsing is too heavy, at minimum support column-reference strings like "s.col" for SET values (which the PR already does) and simple comparison expressions for conditions.

  ---
  2. Align with Paimon Spark's execution model

  Paimon Spark's MergeIntoPaimonDataEvolutionTable uses:

  UPDATE path:  Target LEFT_OUTER JOIN Source → MergeRows → repartition(_FIRST_ROW_ID) → writePartialFields
  INSERT path:  Source LEFT_ANTI JOIN Target → MergeRows → write (full rows)

  The Ray implementation should mirror this:

  # UPDATE: inner join → extract (_ROW_ID, changed_cols) → partition by _FIRST_ROW_ID → partial write
  # INSERT: left_anti join → apply insert expressions → append write
  # COMMIT: atomic (update_msgs + insert_msgs), with snapshot conflict detection

  ---
  3. Recommended API shape

  from pypaimon.ray import merge_into, WhenMatched, WhenNotMatched

  merge_into(
      target="db.table",
      source=ray_dataset,  # or pa.Table, pd.DataFrame, str (table identifier)
      catalog_options={...},
      on=["id"],  # or {"target_col": "source_col"} for renamed keys
      when_matched=[
          WhenMatched(update="*"),                          # update all cols from source
          WhenMatched(update={"name": "s.name"}, condition="s.ts > t.ts"),  # conditional
      ],
      when_not_matched=[
          WhenNotMatched(insert="*"),                       # insert all cols
          WhenNotMatched(insert={"id": "s.id", "status": "'new'"}, condition="s.age > 18"),
      ],
  )

  Key differences from PR #8028:
  - condition is a string expression, not a callable
  - Drop merge_condition (non-standard semantics that routes unmatched rows to INSERT — confusing and diverges from SQL MERGE)
  - Return a metrics dict: {"num_matched": ..., "num_inserted": ..., "num_unchanged": ...}

  ---
  4. Do not invent custom commit conflict detection

  PR #8028 adds commit.strict-mode.last-safe-snapshot to file_store_commit.py — a custom mechanism that modifies the core commit path for all table operations.

  Paimon already has conflict detection via writer.rowIdCheckConflict(snapshotId) (used in the Spark implementation). The Ray connector should reuse the same mechanism:

  # Capture snapshot before read
  plan = table.newSnapshotReader().read()
  # ... do merge work ...
  # Conflict check at commit time (existing Paimon mechanism)
  writer.rowIdCheckConflict(plan.snapshotId())
  writer.commit(update_msgs + insert_msgs)

  ---
  5. Follow Iceberg/Ray's two-phase separation

  Iceberg's Ray connector cleanly separates:
  - Phase 1 (distributed workers): write files, return metadata
  - Phase 2 (driver): collect metadata, resolve conflicts, atomic commit

  For Paimon:
  - Phase 1 (distributed):
    - UPDATE: inner_join → map_batches(apply_clauses) → groupby(_FIRST_ROW_ID).map_groups(partial_write) → return commit messages
    - INSERT: left_anti_join → map_batches(apply_clauses) → write_datasink → return commit messages
  - Phase 2 (driver): collect all commit messages → single atomic commit with conflict check

  ---
  6. Avoid driver-side materialization of large datasets

  PR #8028 collects target keys or matched indices into driver-side Python dicts/sets — this will OOM on large tables.

  Better alternatives (from Iceberg/Ray):
  - Use ray.put(keys_table) to broadcast key sets via object store (supports spill-to-disk)
  - Use Ray Data join() for distributed matching instead of collecting to driver
  - Use coarse range filters to prune unrelated target files before reading

  ---
  7. Minimum viable scope for a first PR

  Given the complexity, a reasonable first PR should:
  1. Support only when_matched_update("*") + when_not_matched_insert("*") (no conditions, no partial SET)
  2. Use Ray Data join() for both paths (inner + left_anti)
  3. Reuse existing TableUpdateByRowId for the update path
  4. Reuse existing write_paimon for the insert path
  5. Atomic commit with existing Paimon snapshot conflict detection
  6. No merge_condition, no self-merge optimization, no vectorized fast paths

  Then iterate with follow-up PRs for conditions, partial SET, optimizations, etc.

@leaves12138
Copy link
Copy Markdown
Contributor

Thanks for working on this. Since this PR is still marked as draft/WIP and several checks are still running, I am leaving it unapproved for now. Please request review again once the implementation is ready and CI is green.

@XiaoHongbo-Hope XiaoHongbo-Hope marked this pull request as ready for review May 31, 2026 12:53
@XiaoHongbo-Hope XiaoHongbo-Hope changed the title [WIP][ray] Ray merge into [ray] Ray merge into May 31, 2026
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for working on Ray merge_into; the overall decomposition (prepare -> matched/anti joins -> transform -> distributed row-id update/insert -> single commit) is clear, and the API/docs are easy to follow.

However, I do not think this PR is ready to merge in its current state because the core behavior is intentionally disabled by skipped tests and the PR body says it still depends on prerequisite fixes.

Blocking points:

  1. The new public API is exported from pypaimon.ray.__init__ and documented in docs/docs/pypaimon/ray-data.md, but the main E2E coverage that proves update/insert/combined merge semantics is skipped with @unittest.skip(_SKIP_UPDATE_STAR). The only unskipped tests are validation/projection helpers; they do not exercise successful merge_into writes.
  2. The implementation still has a TODO for the global-index column-update-action check after #8045. Since this path commits row-id updates, merging it before the prerequisite check is wired in may expose an API that accepts tables/configurations we do not actually support safely yet.
  3. Because the PR itself states it sits behind those prerequisites, I think it should either stay draft/unmerged until the prerequisites land, or remove the public export/docs and keep this as an internal preparatory patch.

Once the prerequisite fixes land, please re-enable the E2E tests (at least matched update, not-matched insert, combined update+insert, renamed ON columns, duplicate source match error, and single-snapshot combined commit) and wire the global-index update-action check before exposing this as a public Ray API. After that I am happy to review again; the structure looks like a good direction.

@JingsongLi
Copy link
Copy Markdown
Contributor

MERGE INTO Topology

                          ┌─────────────────────────┐
                          │     merge_into() entry    │
                          │  _prepare(): validate &   │
                          │  normalize parameters     │
                          └────────────┬────────────┘
                                       │
                          ┌────────────▼────────────┐
                          │   Pin base_snapshot_id   │
                          │  All branches read the   │
                          │  same snapshot for        │
                          │  consistency              │
                          └────────────┬────────────┘
                                       │
                     ┌─────────────────┴─────────────────┐
                     │                                   │
         ┌───────────▼───────────┐           ┌───────────▼───────────┐
         │   MATCHED branch      │           │ NOT MATCHED branch    │
         │   (update path)       │           │ (insert path)         │
         └───────────┬───────────┘           └───────────┬───────────┘
                     │                                   │
    ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
    │  read_paimon(target, projection)│     │  read_paimon(target, on_cols)│
    │  Read only _ROW_ID + needed cols│     │  Read only join-key cols     │
    │  rename cols → t.<col>          │     │  rename cols → t.<col>       │
    └────────────────┬────────────────┘     └────────────┬────────────────┘
                     │                                   │
    ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
    │  source_ds rename → s.<col>     │     │  source_ds rename → s.<col>  │
    └────────────────┬────────────────┘     └────────────┬────────────────┘
                     │                                   │
    ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
    │        INNER JOIN               │     │       LEFT ANTI JOIN        │
    │  on: t.<key> = s.<key>          │     │  on: s.<key> = t.<key>      │
    │  num_partitions = N             │     │  num_partitions = N         │
    │  → rows that matched            │     │  → source rows with no match│
    └────────────────┬────────────────┘     └────────────┬────────────────┘
                     │                                   │
    ┌────────────────▼────────────────┐     ┌────────────▼────────────────┐
    │  map_batches(matched_transform) │     │ map_batches(insert_transform)│
    │  Build output per SET spec      │     │  Build output per SET spec   │
    │  out: [_ROW_ID, update_cols...] │     │  out: [all target columns]   │
    │  → update_ds                    │     │  blob cols filled with null  │
    └────────────────┬────────────────┘     │  → insert_ds                │
                     │                      └────────────┬────────────────┘
    ┌────────────────▼────────────────┐                   │
    │  distributed_update_apply()     │                   │
    │                                 │                   │
    │  1. Build planner, get FilesInfo │                   │
    │     (first_row_ids index)        │                   │
    │  2. ray.put(FilesInfo) broadcast │                   │
    │  3. map_batches(_assign_frid)    │                   │
    │     searchsorted to assign each  │                   │
    │     row to its owning data file  │                   │
    │  4. groupby(_FIRST_ROW_ID)       │                   │
    │     .map_groups(_apply_group)     │                   │
    │     One TableUpdateByRowId per    │                   │
    │     group → produces CommitMsgs  │                   │
    │  5. iter_batches to collect       │                   │
    │     pickled CommitMessages        │                   │
    │  → update_msgs, num_updated      │                   │
    └────────────────┬────────────────┘                   │
                     │                      ┌────────────▼────────────────┐
                     │                      │ distributed_write_collect   │
                     │                      │  _CollectingDatasink        │
                     │                      │  (subclass of PaimonDatasink│
                     │                      │  write_datasink() to write  │
                     │                      │  → insert_msgs              │
                     │                      └────────────┬────────────────┘
                     │                                   │
                     └─────────────┬─────────────────────┘
                                   │
                     ┌─────────────▼─────────────────────┐
                     │  Merge update_msgs + insert_msgs   │
                     │  Single atomic commit              │
                     │  → produces exactly 1 snapshot     │
                     └─────────────┬─────────────────────┘
                                   │
                     ┌─────────────▼─────────────────────┐
                     │  Return metrics:                   │
                     │  {num_matched, num_inserted,       │
                     │   num_unchanged}                   │
                     └───────────────────────────────────┘

Key Design Decisions

  1. Two independent JOINs instead of one LEFT OUTER JOIN — Following Spark's approach: a unified left_outer join would require materialize() to feed both branches, which can OOM on large merges. Instead, the matched path uses inner join and the not-matched path uses left_anti join, each running independently.

  2. Distributed grouping for the update path — The most complex part:

    • _assign_frid: uses np.searchsorted to map each row's _ROW_ID to the first_row_id of its owning data file
    • groupby(_FIRST_ROW_ID).map_groups: groups by data file, each group spawns a TableUpdateByRowId worker for local updates
    • FilesInfo is broadcast via ray.put() to all workers, avoiding per-task manifest re-scans
  3. Snapshot pinning — All reads are pinned to base_snapshot_id, preventing concurrent commits from causing the matched/not-matched branches to see different versions of the data.

  4. Atomic commitCommitMessages from both update and insert are merged into a single commit() call, guaranteeing exactly one snapshot is produced.

  5. Duplicate match detection_apply_group checks count_distinct(_ROW_ID) != num_rows; if multiple source rows match the same target row, it raises an error rather than silently picking a winner.

@XiaoHongbo-Hope XiaoHongbo-Hope force-pushed the ray_merge_into branch 3 times, most recently from 29b47b6 to a8f8f01 Compare June 1, 2026 05:51
@XiaoHongbo-Hope
Copy link
Copy Markdown
Contributor Author

@JingsongLi Ready now

@XiaoHongbo-Hope XiaoHongbo-Hope force-pushed the ray_merge_into branch 2 times, most recently from b4bc2cf to a8d39fc Compare June 1, 2026 09:49
Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update. The API is much closer to the direction discussed earlier: merge_into(..., on=..., when_matched=[WhenMatched(update="*")], when_not_matched=[WhenNotMatched(insert="*")]), two pinned joins, and a single commit are all good choices. Also good to see the successful E2E tests enabled now.

I still have a few API-design concerns before exposing this from pypaimon.ray.__all__ and documenting it as a public API:

  1. Please stabilize the public clause shape now. The PR body/docs say partial SET and conditions are deferred, but the exported WhenMatched / WhenNotMatched dataclasses currently only contain update: str / insert: str, and SetSpec = str. If we know the intended final API is condition="..." plus mapping-based SET/INSERT expressions, I think the public class should already reserve that shape, e.g. WhenMatched(update="*", condition=None) / WhenNotMatched(insert="*", condition=None) and a SetSpec type that includes mappings, while validation can still reject non-"*" or non-None conditions for this MVP. Otherwise the first public release of this API does not reflect the planned API surface.

  2. Do not leave Python callables in the API path. _needed_target_cols still has a callable(value) branch. Even though the current _normalize_set_spec only allows "*", this makes it look like Python callables are an intended future SET representation. For Ray merge semantics we should keep expressions analyzable/serializable, preferably string expressions or a small expression DSL, not callables.

  3. Clarify the source: str contract. Accepting a string as source is convenient, but it currently means “read a Paimon table from the same catalog options at latest snapshot”. The public docs should state that limitation explicitly, or the API should expose source-specific catalog/snapshot options. Otherwise users may assume they can merge from a different catalog or a pinned source snapshot.

  4. Fix the documented default for num_partitions. The docs say it defaults to max(16, cluster_cpus * 2), but _resolve_num_partitions returns max(1, cpus * 2) and falls back to 4. Please make the docs and implementation match before publishing the API.

  5. Avoid accidentally publishing TableUpdateByRowId internals. FilesInfo, precomputed_files_info, and snapshot_files_info() are useful for this implementation, but the docstring calls snapshot_files_info() a “Public accessor”. If these are only for Ray merge internals, please mark them private/internal (or at least avoid describing them as public API), because exposing file-index internals will be hard to change later.

So the implementation direction looks fine to me, but I would not expose it as a stable Ray API until these API-surface details are tightened.

@XiaoHongbo-Hope XiaoHongbo-Hope force-pushed the ray_merge_into branch 2 times, most recently from 164fdb9 to 18c41a2 Compare June 1, 2026 11:45
@JingsongLi
Copy link
Copy Markdown
Contributor

I took another pass over the latest commit and I still think this should stay unmerged for now because of two semantic issues in the new Ray merge path:

  1. Insert-only merge does not carry the base snapshot into conflict detection. The update path passes base_snapshot_id into distributed_update_apply, and the update messages get check_from_snapshot through TableUpdateByRowId. The insert path, however, collects messages from distributed_write_collect_msgs without setting check_from_snapshot. Since FileStoreCommit only enables row-id conflict detection when at least one commit message has check_from_snapshot != -1, a WHEN NOT MATCHED THEN INSERT * merge can anti-join against snapshot N, then commit after a concurrent writer has inserted a matching key, and still append the stale unmatched source row. Spark's data-evolution merge calls rowIdCheckConflict(plan.snapshotId()) for the whole commit; the Ray path should make insert-only and combined commits participate in the same base-snapshot conflict check.

  2. UPDATE SET * includes partition columns, but partial row-id writes are emitted back to the original partition. _prepare builds settable_field_names from all non-blob target fields, _normalize_set_spec("*") maps every one of them to s.<col>, and then TableUpdateByRowId._write_group writes the partial file using the original split partition. If a matched source row changes a partition key, this produces a partial column file under the old partition carrying the new partition value, rather than moving the row or rejecting the change. That can make partition pruning / row contents inconsistent. The new tests only cover unpartitioned tables, so this case is not guarded.

I would fix these before exposing merge_into as a public Ray API: set the base snapshot/conflict check for insert messages too, and either reject partition-key changes in matched updates or implement a real cross-partition rewrite, with a partitioned data-evolution merge test covering it.

Copy link
Copy Markdown
Contributor

@leaves12138 leaves12138 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

API-design follow-up only: the latest version looks much better to me. Reserving condition: Optional[str] on WhenMatched / WhenNotMatched, widening SetSpec to allow mapping-style SET/INSERT specs, documenting the source: str limitation, fixing the num_partitions default in docs, and making FilesInfo internal address the main API-shape concerns I raised earlier.

From the public API perspective, I do not think we need to block this direction anymore. A few non-blocking nits before/after merge:

  1. Please document that future mapping values are intended to be literals and analyzable expressions/column references such as "s.col" / "t.col", not Python callables. Mapping[str, Any] is flexible, but without docs users may assume arbitrary Python objects/functions are supported.
  2. Since condition is already in the dataclasses, it would be helpful to mention in the docs that it is reserved for a follow-up and currently raises NotImplementedError.
  3. source: str currently means a Paimon table identifier from the same catalog at the latest snapshot. That is acceptable for the MVP; users who need a pinned source snapshot can pass a Dataset produced by read_paimon(..., snapshot_id=...).

So, API-wise, this is now acceptable to me for the MVP.

Implements SQL MERGE INTO semantics on Paimon data-evolution tables
using Ray Datasets. UPDATE: inner join → vectorized transform →
groupby(_FIRST_ROW_ID) → TableUpdateByRowId partial write. INSERT:
left_anti join → PaimonDatasink. Two-phase commit: workers return
commit messages, driver does a single atomic BatchTableCommit.

MVP scope (per Jingsong review): only update='*'/insert='*', no
conditions, no partial SET. E2e tests skipped pending the
fix-update-all-columns PR.
@XiaoHongbo-Hope
Copy link
Copy Markdown
Contributor Author

Updated after latest comment @JingsongLi

@JingsongLi
Copy link
Copy Markdown
Contributor

+1

@JingsongLi JingsongLi merged commit 0978e4c into apache:master Jun 1, 2026
8 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants