[ray] Ray merge into#8028
Conversation
|
API Design Recommendations for Paimon Ray merge_into from AI: Based on the design patterns from Delta Lake, Lance, Iceberg/Ray, and Paimon Spark's MergeIntoPaimonDataEvolutionTable: |
|
Thanks for working on this. Since this PR is still marked as draft/WIP and several checks are still running, I am leaving it unapproved for now. Please request review again once the implementation is ready and CI is green. |
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for working on Ray merge_into; the overall decomposition (prepare -> matched/anti joins -> transform -> distributed row-id update/insert -> single commit) is clear, and the API/docs are easy to follow.
However, I do not think this PR is ready to merge in its current state because the core behavior is intentionally disabled by skipped tests and the PR body says it still depends on prerequisite fixes.
Blocking points:
- The new public API is exported from
pypaimon.ray.__init__and documented indocs/docs/pypaimon/ray-data.md, but the main E2E coverage that proves update/insert/combined merge semantics is skipped with@unittest.skip(_SKIP_UPDATE_STAR). The only unskipped tests are validation/projection helpers; they do not exercise successfulmerge_intowrites. - The implementation still has a TODO for the global-index column-update-action check after #8045. Since this path commits row-id updates, merging it before the prerequisite check is wired in may expose an API that accepts tables/configurations we do not actually support safely yet.
- Because the PR itself states it sits behind those prerequisites, I think it should either stay draft/unmerged until the prerequisites land, or remove the public export/docs and keep this as an internal preparatory patch.
Once the prerequisite fixes land, please re-enable the E2E tests (at least matched update, not-matched insert, combined update+insert, renamed ON columns, duplicate source match error, and single-snapshot combined commit) and wire the global-index update-action check before exposing this as a public Ray API. After that I am happy to review again; the structure looks like a good direction.
MERGE INTO TopologyKey Design Decisions
|
29b47b6 to
a8f8f01
Compare
|
@JingsongLi Ready now |
b4bc2cf to
a8d39fc
Compare
leaves12138
left a comment
There was a problem hiding this comment.
Thanks for the update. The API is much closer to the direction discussed earlier: merge_into(..., on=..., when_matched=[WhenMatched(update="*")], when_not_matched=[WhenNotMatched(insert="*")]), two pinned joins, and a single commit are all good choices. Also good to see the successful E2E tests enabled now.
I still have a few API-design concerns before exposing this from pypaimon.ray.__all__ and documenting it as a public API:
-
Please stabilize the public clause shape now. The PR body/docs say partial SET and conditions are deferred, but the exported
WhenMatched/WhenNotMatcheddataclasses currently only containupdate: str/insert: str, andSetSpec = str. If we know the intended final API iscondition="..."plus mapping-based SET/INSERT expressions, I think the public class should already reserve that shape, e.g.WhenMatched(update="*", condition=None)/WhenNotMatched(insert="*", condition=None)and aSetSpectype that includes mappings, while validation can still reject non-"*"or non-Noneconditions for this MVP. Otherwise the first public release of this API does not reflect the planned API surface. -
Do not leave Python callables in the API path.
_needed_target_colsstill has acallable(value)branch. Even though the current_normalize_set_speconly allows"*", this makes it look like Python callables are an intended future SET representation. For Ray merge semantics we should keep expressions analyzable/serializable, preferably string expressions or a small expression DSL, not callables. -
Clarify the
source: strcontract. Accepting a string assourceis convenient, but it currently means “read a Paimon table from the same catalog options at latest snapshot”. The public docs should state that limitation explicitly, or the API should expose source-specific catalog/snapshot options. Otherwise users may assume they can merge from a different catalog or a pinned source snapshot. -
Fix the documented default for
num_partitions. The docs say it defaults tomax(16, cluster_cpus * 2), but_resolve_num_partitionsreturnsmax(1, cpus * 2)and falls back to4. Please make the docs and implementation match before publishing the API. -
Avoid accidentally publishing
TableUpdateByRowIdinternals.FilesInfo,precomputed_files_info, andsnapshot_files_info()are useful for this implementation, but the docstring callssnapshot_files_info()a “Public accessor”. If these are only for Ray merge internals, please mark them private/internal (or at least avoid describing them as public API), because exposing file-index internals will be hard to change later.
So the implementation direction looks fine to me, but I would not expose it as a stable Ray API until these API-surface details are tightened.
164fdb9 to
18c41a2
Compare
|
I took another pass over the latest commit and I still think this should stay unmerged for now because of two semantic issues in the new Ray merge path:
I would fix these before exposing |
leaves12138
left a comment
There was a problem hiding this comment.
API-design follow-up only: the latest version looks much better to me. Reserving condition: Optional[str] on WhenMatched / WhenNotMatched, widening SetSpec to allow mapping-style SET/INSERT specs, documenting the source: str limitation, fixing the num_partitions default in docs, and making FilesInfo internal address the main API-shape concerns I raised earlier.
From the public API perspective, I do not think we need to block this direction anymore. A few non-blocking nits before/after merge:
- Please document that future mapping values are intended to be literals and analyzable expressions/column references such as
"s.col"/"t.col", not Python callables.Mapping[str, Any]is flexible, but without docs users may assume arbitrary Python objects/functions are supported. - Since
conditionis already in the dataclasses, it would be helpful to mention in the docs that it is reserved for a follow-up and currently raisesNotImplementedError. source: strcurrently means a Paimon table identifier from the same catalog at the latest snapshot. That is acceptable for the MVP; users who need a pinned source snapshot can pass aDatasetproduced byread_paimon(..., snapshot_id=...).
So, API-wise, this is now acceptable to me for the MVP.
Implements SQL MERGE INTO semantics on Paimon data-evolution tables using Ray Datasets. UPDATE: inner join → vectorized transform → groupby(_FIRST_ROW_ID) → TableUpdateByRowId partial write. INSERT: left_anti join → PaimonDatasink. Two-phase commit: workers return commit messages, driver does a single atomic BatchTableCommit. MVP scope (per Jingsong review): only update='*'/insert='*', no conditions, no partial SET. E2e tests skipped pending the fix-update-all-columns PR.
18c41a2 to
9162090
Compare
|
Updated after latest comment @JingsongLi |
|
+1 |
Purpose
Add
merge_intofor data-evolution tables on Ray, UPDATE path: inner join →repartition by
_FIRST_ROW_ID→ partial write. INSERT path: left_antijoin → PaimonDatasink. Single atomic commit.
MVP scope : only
update="*"/insert="*".Conditions and partial SET deferred to follow-up.
Tests
14 E2E tests covering matched update, not-matched insert, combined
commit, renamed ON columns, duplicate match error, blob exclusion,
source/parameter validation.