feat(phase-19): track H distributed training lessons 76-81#215
feat(phase-19): track H distributed training lessons 76-81#215rohitg00 wants to merge 20 commits into
Conversation
…cast, allgather, reduce_scatter)
…RO-1 + sharded checkpoint
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (8)
✅ Files skipped from review due to trivial changes (1)
📝 Walkthrough<review_stack_artifact> </review_stack_artifact> 🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
⚔️ Resolve merge conflicts
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 12
Note
Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.
🟡 Minor comments (9)
phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md-61-67 (1)
61-67:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUpdate API descriptions to match the actual code surface.
Lines 61–67 describe outdated signatures and topology details (e.g., explicit
rank/world_sizeargs and ring-only wiring). In code,Meshcarries rank/world metadata, and function signatures arering_allreduce(mesh, tensor),broadcast(mesh, tensor, src), etc. Please align this section to avoid copy/paste misuse.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md` around lines 61 - 67, The API docs list outdated signatures and topology details; update the text to match the actual code by removing explicit rank/world_size parameters and noting that Mesh carries rank/world metadata and wiring may not be ring-only. Specifically, change descriptions to reference the actual function/class names and signatures: Mesh (which contains rank/world_size and queue wiring), ring_allreduce(mesh, tensor), broadcast(mesh, tensor, src), allgather(mesh, tensor), reduce_scatter(mesh, tensor), and _gloo_reference(op, world_size, tensor) so the prose matches the implemented API surface and avoids suggesting incorrect call patterns.phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py-336-336 (1)
336-336:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winFix Ruff B905 and RUF010 in
main.py
- Line 336:
zip(mesh_out, gloo_out)needs an explicitstrict=...argument (B905).- Line 358: replace
str(match)inside the f-string with an explicit conversion flag (e.g.,{match!s:<12}) (RUF010).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py` at line 336, The zip call over mesh_out and gloo_out uses implicit truncation; change the loop header "for m, g in zip(mesh_out, gloo_out)" to include an explicit strict argument (e.g., strict=True or strict=False) to satisfy Ruff B905 and ensure mismatch behavior is intentional. Also update the f-string that currently uses str(match) to use an explicit conversion/format specifier such as {match!s:<12} (or another alignment/width as needed) to replace str(match) and satisfy RUF010; locate the f-string referencing match and apply the conversion flag.phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md-68-70 (1)
68-70:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAlign doc symbol names and verification claim with the actual lesson code.
Line 68/69 mention
workerand_reference_single_process_loop, but the implemented functions are_ddp_workerandreference_single_process. Line 77 also claims per-step parameter checksum comparison, while the script prints per-step losses plus final parameter norm.Also applies to: 77-77
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md` around lines 68 - 70, Update the docs to match the actual function names and verification behavior: replace mentions of worker and _reference_single_process_loop with the actual symbols _ddp_worker and reference_single_process, and change the verification claim from "per-step parameter checksum comparison" to describe what the script actually does (prints per-step losses and a final parameter norm) or alternatively implement per-step checksum logic in functions _ddp_worker/reference_single_process if you intend the original claim to be true.phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py-27-29 (1)
27-29:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winGuard against silent truncation when comparing loss curves.
On Line 27,
zip(ref_losses, ddp_rank0_losses)can hide a length mismatch by truncating to the shorter list. Add an explicit length assertion before iterating.Proposed fix
ref_losses, _ = reference_single_process(world_size=4, steps=10) ddp_rank0_losses, _ = ddp[0] + self.assertEqual( + len(ref_losses), + len(ddp_rank0_losses), + "loss history length mismatch between reference and DDP rank0", + ) for s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)): self.assertAlmostEqual(a, b, places=4, msg=f"divergence at step {s}: ref={a}, ddp={b}")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py` around lines 27 - 29, Before iterating with the for loop that zips ref_losses and ddp_rank0_losses, add an explicit length check (e.g., assertEqual(len(ref_losses), len(ddp_rank0_losses), msg=...)) so the test fails clearly if one list is shorter; place this assertion immediately before the for s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)) loop in test_ddp.py to prevent silent truncation when comparing loss curves.phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md-70-73 (1)
70-73:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winAlign documented helper names with the implemented API.
At Line 70, docs reference
flatten_params/unflatten_into, but the implementation exposesgather_flat_params/scatter_flat_to_params. Please sync names to avoid confusion.Proposed fix
-- `flatten_params(module)` and `unflatten_into(module, flat)` that pack a model's parameters into one contiguous tensor and unpack back. The flat layout is what makes sharding by rank a simple slice. +- `gather_flat_params(module)` and `scatter_flat_to_params(module, flat)` that pack a model's parameters into one contiguous tensor and unpack back. The flat layout is what makes sharding by rank a simple slice.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md` around lines 70 - 73, Docs mention flatten_params/unflatten_into but the code exposes gather_flat_params/scatter_flat_to_params; update the documentation to use the implemented API names (replace flatten_params -> gather_flat_params and unflatten_into -> scatter_flat_to_params) and adjust any example calls or prose (e.g., where step(), ZeroOptimizer, and the demo use the flat pack/unpack helpers) so the documented helper names match the actual functions.phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py-253-253 (1)
253-253:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUse ceil-based shard sizing in the memory table.
Line 253 uses floor division, which underreports ZeRO-1 optimizer-state bytes when
p_params % world_size != 0. This should mirror padded shard sizing used byshard_bounds.Proposed fix
def memory_table(p_params: int, world_size: int) -> str: @@ - zero1 = (fp16 + fp16) * p_params + (fp32 * 3 * p_params) // world_size + shard_params = (p_params + world_size - 1) // world_size + zero1 = (fp16 + fp16) * p_params + (fp32 * 3 * shard_params)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py` at line 253, The memory calculation for ZeRO-1 shards underreports when p_params % world_size != 0 because zero1 uses floor division; update the computation in the memory table to use ceil-based shard sizing to mirror shard_bounds (i.e., compute per-shard param count as ceil(p_params / world_size) instead of p_params // world_size) so zero1 = (fp16 + fp16) * p_params + (fp32 * 3 * ceil_shard_params) / use the same ceil_shard_params variable wherever shard sizing is used (referencing zero1, p_params, world_size, fp16, fp32 and shard_bounds).phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json-34-34 (1)
34-34:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winCorrect the LayerNorm statement in the quiz option.
“LayerNorm running statistics” is technically incorrect; LayerNorm does not maintain running mean/variance buffers like BatchNorm.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json` at line 34, Update the quiz option string that currently reads "GPT adds LayerNorm running statistics, embedding gradient shape, and softmax+cross-entropy edge cases that an MLP would hide" to remove the incorrect claim about LayerNorm maintaining running statistics; instead state that LayerNorm uses per-sample mean/variance or mention its learnable affine parameters (gamma/beta). Locate the offending string (the option text in the quiz entry) and replace "LayerNorm running statistics" with a corrected phrase such as "LayerNorm per-sample mean/variance" or "LayerNorm affine parameters (gamma/beta)" so the option is factually accurate.phases/19-capstone-projects/79-pipeline-parallel/code/main.py-37-47 (1)
37-47:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winValidate pipeline dimensions before computing schedules.
num_stages <= 0ornum_microbatches <= 0currently lead to unclear failures (e.g., empty schedule thenmax()crash). Add explicit argument checks and fail fast withValueError.Proposed patch
+def _validate_dims(num_stages: int, num_microbatches: int) -> None: + if num_stages <= 0: + raise ValueError("num_stages must be > 0") + if num_microbatches <= 0: + raise ValueError("num_microbatches must be > 0") + + def bubble_fraction(num_stages: int, num_microbatches: int) -> float: @@ + _validate_dims(num_stages, num_microbatches) n = num_stages m = num_microbatches return (n - 1) / (m + n - 1) @@ def gpipe_schedule(num_stages: int, num_microbatches: int) -> list: @@ + _validate_dims(num_stages, num_microbatches) n = num_stages m = num_microbatches @@ def measure_bubble(num_stages: int, num_microbatches: int) -> float: @@ + _validate_dims(num_stages, num_microbatches) schedule = gpipe_schedule(num_stages, num_microbatches)Also applies to: 50-71, 90-97
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py` around lines 37 - 47, Add explicit argument validation to ensure num_stages > 0 and num_microbatches > 0 and raise ValueError with a clear message if not; specifically, update bubble_fraction to check m and n at the top and raise ValueError("num_stages must be > 0 and num_microbatches must be > 0") (or similar), and apply the same guard to the other schedule-generation functions that accept num_stages and num_microbatches so they fail fast instead of producing empty schedules or triggering later crashes.phases/19-capstone-projects/79-pipeline-parallel/docs/en.md-55-58 (1)
55-58:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winUpdate Build It API names to match the actual code.
Lines 55-58 describe
PipelineStage/Pipeline(...), but the implementation exportsStageMLP, schedule utilities, andrun_pipeline(...). This mismatch is confusing for learners.Proposed patch
-- `PipelineStage`: a small `nn.Module` that holds one stage's parameters and exposes `forward(activation)`. -- `Pipeline(stages, num_microbatches)`: orchestrates the GPipe schedule on simulated stages using simulated wall-clock per stage. +- `StageMLP`: a small `nn.Module` used as each stage in the 2-rank demo. +- `run_pipeline(steps, batch, microbatches)`: spawns the 2-rank gloo pipeline and collects per-rank outputs. - `bubble_fraction(num_stages, num_microbatches)`: closed-form (N-1)/(M+N-1). -- A 4-stage demo that prints the per-microbatch trace and the measured bubble fraction. +- A 4-stage schedule demo that prints a Gantt chart plus measured/closed-form bubble fraction.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/79-pipeline-parallel/docs/en.md` around lines 55 - 58, Update the docs to use the actual exported API names instead of the old ones: replace references to PipelineStage and Pipeline(...) with StageMLP (the module implementing a stage), the schedule utilities (names that implement the GPipe schedule), and run_pipeline(...) (the function that orchestrates execution and measures bubble fraction); ensure the list mentions bubble_fraction(...) unchanged and the 4-stage demo that prints per-microbatch trace and measured bubble fraction so the doc matches the real symbols StageMLP, schedule utils, and run_pipeline.
🧹 Nitpick comments (2)
phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py (1)
28-35: ⚡ Quick winRun the expensive E2E setup once per class, not once per test.
This currently executes the full distributed demo for every test method, which is costly and slows CI significantly.
Proposed refactor
class TestEndToEnd(unittest.TestCase): - def setUp(self): - self.out = run_e2e(world_size=WORLD_SIZE, steps=20) - self.results = self.out["results"] - self.ckpt_dir = self.out["ckpt_dir"] + `@classmethod` + def setUpClass(cls): + cls.out = run_e2e(world_size=WORLD_SIZE, steps=20) + cls.results = cls.out["results"] + cls.ckpt_dir = cls.out["ckpt_dir"] - def tearDown(self): - if "workdir" in self.out: - shutil.rmtree(self.out["workdir"], ignore_errors=True) + `@classmethod` + def tearDownClass(cls): + if "workdir" in cls.out: + shutil.rmtree(cls.out["workdir"], ignore_errors=True)🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py` around lines 28 - 35, The test currently runs the expensive run_e2e in setUp for every test; change it to run once per test class by moving the run_e2e call into a `@classmethod` setUpClass and store results on cls (e.g., cls.out, cls.results, cls.ckpt_dir) and move the cleanup logic into a corresponding `@classmethod` tearDownClass to remove cls.out["workdir"] with shutil.rmtree; keep instance-level tests referencing self.results/self.ckpt_dir by reading from cls in setUp or directly using self.__class__.results if needed.phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py (1)
37-45: ⚡ Quick winAssert backward set coverage too, not only counts.
This test currently checks exact
(stage, microbatch)coverage only for forward. Add the same set-equality assertion for backward to catch duplicate/missing backward placements.Proposed patch
self.assertEqual(len(forwards), n * m) self.assertEqual(len(backwards), n * m) self.assertEqual(set(forwards), {(s, mb) for s in range(n) for mb in range(m)}) + self.assertEqual(set(backwards), {(s, mb) for s in range(n) for mb in range(m)})🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py` around lines 37 - 45, The test test_schedule_covers_every_microbatch_through_every_stage currently verifies forward coverage by asserting set(forwards) equals the expected {(s, mb) for s in range(n) for mb in range(m)} but only checks backwards by count; update the test to also assert set(backwards) equals the same expected set to catch duplicate or missing backward placements produced by gpipe_schedule. Locate the locals forwards and backwards in test_schedule_covers_every_microbatch_through_every_stage and add a set-equality assertion for backwards analogous to the forwards assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`:
- Around line 321-327: The worker join logic in run_mesh currently calls
p.join(timeout=30) but doesn't check if processes are still alive or exited
abnormally; after joining each process in procs, explicitly check p.is_alive()
and p.exitcode (for example iterate procs, call p.join(timeout=30) if still
alive call p.terminate() and p.join(short_timeout), and if exitcode is non-zero
raise or surface an error), ensure any terminated/killed workers do not leave
partial data in out_queue before returning results; if a worker failed, raise a
RuntimeError (including the process pid/exitcode) instead of returning
incomplete results (refer to procs, out_queue, world_size, byte_counter,
results).
- Around line 249-271: The worker processes started via ctx.Process in the block
that builds procs must always be reaped even if out_queue.get(...) raises; wrap
the result-collection and out_queue.get loop in a try/finally and move the
join/terminate/exitcode checks for procs into the finally block so cleanup runs
on success and on error, ensuring each Process in procs is joined, checked for
non-zero exitcode (raise after cleanup), and terminated/joined if still alive;
reference the variables procs, out_queue, and the ctx.Process/_gloo_worker
launch in your changes.
In `@phases/19-capstone-projects/77-data-parallel-ddp/code/main.py`:
- Around line 153-169: The result-collection loop currently sits inside a try
that on exception can skip joining/terminating spawned processes, leaking
children; move the process cleanup into a finally block so procs are always
joined/terminated regardless of out_queue.get() failures. Specifically, after
the block that calls out_queue.get(...) (referencing out_queue and results),
ensure the loop that iterates procs and calls
p.join(...)/p.terminate()/p.join(...) is executed in a finally (or a nested
finally) so it runs on all paths; you can then keep the existing cleanup for
init_file and init_dir (FileNotFoundError/OSError handling) either in the same
outer finally or a subsequent one.
In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py`:
- Around line 216-241: The current try/finally only removes init_file/init_dir,
so if out_queue.get(...) raises the for-loop that joins/terminates child
processes (creating Process objects via ctx.Process targeting _zero_worker and
stored in procs) is skipped and workers can be left running; refactor so the
join/terminate loop that iterates procs and calls
p.join()/p.is_alive()/p.terminate() always runs in the outer finally (or a
nested finally inside the try) before cleaning up init_file/init_dir, ensuring
all processes are reaped even when out_queue.get times out or raises.
In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py`:
- Around line 172-189: The out_queue.get timeout can raise and currently skips
the process cleanup loop, leaking child processes; ensure procs are always
joined/terminated by moving the join/terminate logic into the finally block (or
by wrapping the collection loop in a try/except and performing the same
join/terminate in the existing finally). Specifically, ensure the Process
objects created via ctx.Process (target=_pipe_worker) appended to procs are
iterated in the finally to call p.join(timeout=5), check p.is_alive(), then
p.terminate() and p.join(timeout=2), and preserve collecting results from
out_queue before cleanup when possible.
In `@phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py`:
- Around line 179-188: rotate_checkpoints: avoid unstable mtime ordering and
handle keep_last=0 by deterministically sorting directories by the numeric step
extracted from the directory name (e.g., parse int from c.name after "step_")
and then selecting the oldest entries to delete; implement a stable key like
lambda c: int(c.name.split("_", 1)[1]) (with safe parsing/skip for unexpected
names), sort ascending, and compute to_delete = children[:-keep_last] when
keep_last>0 else children to ensure keep_last=0 deletes all.
- Around line 138-140: The rename loop (for tmp, final in tmp_paths:
os.replace(...)) and final manifest replace are not followed by a directory
fsync, so directory metadata may be lost on crash; in the save_sharded function,
after performing the os.replace calls for tmp_paths and
manifest_tmp→manifest_final, open the target directory
(os.open(os.path.dirname(final) or common parent, os.O_RDONLY | getattr(os,
"O_DIRECTORY", 0))), call os.fsync(fd) to persist directory metadata, then close
the fd; apply this same fsync after the manifest replace to ensure the rename
entries are durable.
- Around line 159-176: load_sharded currently reads and indexes shards without
validating their metadata first; before any file I/O validate all entries in
manifest.shards: ensure each shard.rank is an int in [0, manifest.world_size),
detect duplicate ranks (use a set) and raise CheckpointError on duplicates or
out-of-range ranks, and validate shard.path is safe by rejecting absolute paths
or path traversal (resolve shard_path against src and ensure the resolved path
is inside src; raise CheckpointError if not). Only after these checks pass,
allocate per_rank and proceed to open and verify each shard file (using
shard.rank to index per_rank). Use the same CheckpointError behavior for
validation failures so malformed manifests are rejected before any reads or
IndexError can occur.
In `@phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py`:
- Line 275: The RuntimeError raises use f-strings without interpolation (e.g.,
raise RuntimeError(f"world_size mismatch")) and there are ambiguous
single-letter variable names triggering lint errors (E741/F541); change those
raises to plain string literals (raise RuntimeError("world_size mismatch")) for
each occurrence and rename ambiguous identifiers like l or O to clear names
(e.g., local_rank, global_rank, world_size) wherever they appear to satisfy the
linter and improve clarity while keeping the same behavior.
- Around line 387-389: The except block that currently only calls
shutil.rmtree(workdir, ignore_errors=True) and re-raises can leave spawned
worker processes running; update the failure path to first iterate the list of
worker Process objects (e.g., the variable holding spawned workers such as
workers or worker_procs), call terminate() on each live process and then join()
them (or use multiprocessing.active_children() to find and terminate any
children), handle/ignore errors during termination, then remove the workdir and
re-raise the exception so no worker processes leak across tests/runs.
In `@phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md`:
- Line 3: Docs are out of sync: update the lesson doc text to match the
implementation in code/main.py rather than changing code; specifically, replace
the described model hyperparameters (4 layers, hidden 64, vocab 256, seq 32)
with the actual values used (2 layers, dim 32, vocab 64, seq 16), state that the
head is untied (not tied embeddings), change checkpoint description to "step-10
checkpoint" instead of end+halfway checkpoints, describe resume verification as
in-process via the verify_resume routine (not spawn-based), and remove the
incorrect claim about "LayerNorm running stats"; reference the implementation
symbols build_model/GPT (or whatever model constructor is used), verify_resume,
and the checkpoint save_checkpoint/checkpoint_step logic so the doc aligns with
code.
In
`@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py`:
- Line 69: The loop variable `l` in the enumeration `for s, l in
enumerate(losses):` is ambiguous and triggers E741; rename it to a clear
identifier such as `loss` (e.g., `for s, loss in enumerate(losses):`) and update
all uses inside that loop and any related assertions or log statements
(references to `l`) to `loss` so linting and readability are fixed (look for
this in tests/test_e2e.py around the losses handling).
---
Minor comments:
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`:
- Line 336: The zip call over mesh_out and gloo_out uses implicit truncation;
change the loop header "for m, g in zip(mesh_out, gloo_out)" to include an
explicit strict argument (e.g., strict=True or strict=False) to satisfy Ruff
B905 and ensure mismatch behavior is intentional. Also update the f-string that
currently uses str(match) to use an explicit conversion/format specifier such as
{match!s:<12} (or another alignment/width as needed) to replace str(match) and
satisfy RUF010; locate the f-string referencing match and apply the conversion
flag.
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md`:
- Around line 61-67: The API docs list outdated signatures and topology details;
update the text to match the actual code by removing explicit rank/world_size
parameters and noting that Mesh carries rank/world metadata and wiring may not
be ring-only. Specifically, change descriptions to reference the actual
function/class names and signatures: Mesh (which contains rank/world_size and
queue wiring), ring_allreduce(mesh, tensor), broadcast(mesh, tensor, src),
allgather(mesh, tensor), reduce_scatter(mesh, tensor), and _gloo_reference(op,
world_size, tensor) so the prose matches the implemented API surface and avoids
suggesting incorrect call patterns.
In `@phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md`:
- Around line 68-70: Update the docs to match the actual function names and
verification behavior: replace mentions of worker and
_reference_single_process_loop with the actual symbols _ddp_worker and
reference_single_process, and change the verification claim from "per-step
parameter checksum comparison" to describe what the script actually does (prints
per-step losses and a final parameter norm) or alternatively implement per-step
checksum logic in functions _ddp_worker/reference_single_process if you intend
the original claim to be true.
In `@phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py`:
- Around line 27-29: Before iterating with the for loop that zips ref_losses and
ddp_rank0_losses, add an explicit length check (e.g.,
assertEqual(len(ref_losses), len(ddp_rank0_losses), msg=...)) so the test fails
clearly if one list is shorter; place this assertion immediately before the for
s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)) loop in test_ddp.py to
prevent silent truncation when comparing loss curves.
In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py`:
- Line 253: The memory calculation for ZeRO-1 shards underreports when p_params
% world_size != 0 because zero1 uses floor division; update the computation in
the memory table to use ceil-based shard sizing to mirror shard_bounds (i.e.,
compute per-shard param count as ceil(p_params / world_size) instead of p_params
// world_size) so zero1 = (fp16 + fp16) * p_params + (fp32 * 3 *
ceil_shard_params) / use the same ceil_shard_params variable wherever shard
sizing is used (referencing zero1, p_params, world_size, fp16, fp32 and
shard_bounds).
In `@phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md`:
- Around line 70-73: Docs mention flatten_params/unflatten_into but the code
exposes gather_flat_params/scatter_flat_to_params; update the documentation to
use the implemented API names (replace flatten_params -> gather_flat_params and
unflatten_into -> scatter_flat_to_params) and adjust any example calls or prose
(e.g., where step(), ZeroOptimizer, and the demo use the flat pack/unpack
helpers) so the documented helper names match the actual functions.
In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py`:
- Around line 37-47: Add explicit argument validation to ensure num_stages > 0
and num_microbatches > 0 and raise ValueError with a clear message if not;
specifically, update bubble_fraction to check m and n at the top and raise
ValueError("num_stages must be > 0 and num_microbatches must be > 0") (or
similar), and apply the same guard to the other schedule-generation functions
that accept num_stages and num_microbatches so they fail fast instead of
producing empty schedules or triggering later crashes.
In `@phases/19-capstone-projects/79-pipeline-parallel/docs/en.md`:
- Around line 55-58: Update the docs to use the actual exported API names
instead of the old ones: replace references to PipelineStage and Pipeline(...)
with StageMLP (the module implementing a stage), the schedule utilities (names
that implement the GPipe schedule), and run_pipeline(...) (the function that
orchestrates execution and measures bubble fraction); ensure the list mentions
bubble_fraction(...) unchanged and the 4-stage demo that prints per-microbatch
trace and measured bubble fraction so the doc matches the real symbols StageMLP,
schedule utils, and run_pipeline.
In `@phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json`:
- Line 34: Update the quiz option string that currently reads "GPT adds
LayerNorm running statistics, embedding gradient shape, and
softmax+cross-entropy edge cases that an MLP would hide" to remove the incorrect
claim about LayerNorm maintaining running statistics; instead state that
LayerNorm uses per-sample mean/variance or mention its learnable affine
parameters (gamma/beta). Locate the offending string (the option text in the
quiz entry) and replace "LayerNorm running statistics" with a corrected phrase
such as "LayerNorm per-sample mean/variance" or "LayerNorm affine parameters
(gamma/beta)" so the option is factually accurate.
---
Nitpick comments:
In `@phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py`:
- Around line 37-45: The test
test_schedule_covers_every_microbatch_through_every_stage currently verifies
forward coverage by asserting set(forwards) equals the expected {(s, mb) for s
in range(n) for mb in range(m)} but only checks backwards by count; update the
test to also assert set(backwards) equals the same expected set to catch
duplicate or missing backward placements produced by gpipe_schedule. Locate the
locals forwards and backwards in
test_schedule_covers_every_microbatch_through_every_stage and add a set-equality
assertion for backwards analogous to the forwards assertion.
In
`@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py`:
- Around line 28-35: The test currently runs the expensive run_e2e in setUp for
every test; change it to run once per test class by moving the run_e2e call into
a `@classmethod` setUpClass and store results on cls (e.g., cls.out, cls.results,
cls.ckpt_dir) and move the cleanup logic into a corresponding `@classmethod`
tearDownClass to remove cls.out["workdir"] with shutil.rmtree; keep
instance-level tests referencing self.results/self.ckpt_dir by reading from cls
in setUp or directly using self.__class__.results if needed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f49bb96d-a12e-4f1a-9754-50edb5d56b68
📒 Files selected for processing (25)
catalog.jsonphases/19-capstone-projects/76-collective-ops-from-scratch/code/main.pyphases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.mdphases/19-capstone-projects/76-collective-ops-from-scratch/quiz.jsonphases/19-capstone-projects/76-collective-ops-from-scratch/tests/test_collectives.pyphases/19-capstone-projects/77-data-parallel-ddp/code/main.pyphases/19-capstone-projects/77-data-parallel-ddp/docs/en.mdphases/19-capstone-projects/77-data-parallel-ddp/quiz.jsonphases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.pyphases/19-capstone-projects/78-zero-parameter-sharding/code/main.pyphases/19-capstone-projects/78-zero-parameter-sharding/docs/en.mdphases/19-capstone-projects/78-zero-parameter-sharding/quiz.jsonphases/19-capstone-projects/78-zero-parameter-sharding/tests/test_zero.pyphases/19-capstone-projects/79-pipeline-parallel/code/main.pyphases/19-capstone-projects/79-pipeline-parallel/docs/en.mdphases/19-capstone-projects/79-pipeline-parallel/quiz.jsonphases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.pyphases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.pyphases/19-capstone-projects/80-checkpoint-sharded-resume/docs/en.mdphases/19-capstone-projects/80-checkpoint-sharded-resume/quiz.jsonphases/19-capstone-projects/80-checkpoint-sharded-resume/tests/test_checkpoint.pyphases/19-capstone-projects/81-end-to-end-distributed-train/code/main.pyphases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.mdphases/19-capstone-projects/81-end-to-end-distributed-train/quiz.jsonphases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py
| results = {} | ||
| for _ in range(world_size): | ||
| rank, tensor = out_queue.get(timeout=60) | ||
| results[rank] = tensor | ||
| for p in procs: | ||
| p.join(timeout=30) | ||
| return [results[r] for r in range(world_size)], byte_counter.value |
There was a problem hiding this comment.
Harden mesh worker lifecycle checks before returning results.
run_mesh joins with a timeout (Line 326) but does not verify is_alive()/exitcode. A stuck or crashed worker can be missed after queue collection. Add explicit post-join checks and terminate lingering processes.
Suggested fix
for p in procs:
- p.join(timeout=30)
+ p.join(timeout=30)
+ if p.is_alive():
+ p.terminate()
+ p.join(timeout=2)
+ raise RuntimeError("mesh worker did not exit cleanly")
+ if p.exitcode != 0:
+ raise RuntimeError(f"mesh worker exited with {p.exitcode}")
return [results[r] for r in range(world_size)], byte_counter.value🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`
around lines 321 - 327, The worker join logic in run_mesh currently calls
p.join(timeout=30) but doesn't check if processes are still alive or exited
abnormally; after joining each process in procs, explicitly check p.is_alive()
and p.exitcode (for example iterate procs, call p.join(timeout=30) if still
alive call p.terminate() and p.join(short_timeout), and if exitcode is non-zero
raise or surface an error), ensure any terminated/killed workers do not leave
partial data in out_queue before returning results; if a worker failed, raise a
RuntimeError (including the process pid/exitcode) instead of returning
incomplete results (refer to procs, out_queue, world_size, byte_counter,
results).
| def rotate_checkpoints(parent_dir: str, keep_last: int = 5) -> list: | ||
| """Delete oldest checkpoint directories so only the most recent keep_last remain.""" | ||
| parent = Path(parent_dir) | ||
| if not parent.exists(): | ||
| return [] | ||
| children = sorted( | ||
| [c for c in parent.iterdir() if c.is_dir() and c.name.startswith("step_")], | ||
| key=lambda c: c.stat().st_mtime, | ||
| ) | ||
| to_delete = children[:-keep_last] if len(children) > keep_last else [] |
There was a problem hiding this comment.
Rotation ordering is unstable and keep_last=0 currently keeps everything.
Sorting by mtime can be nondeterministic for rapidly-created dirs; and with keep_last=0, children[:-0] becomes empty, so nothing is deleted. This can silently break disk-budget enforcement.
🔧 Suggested fix
def rotate_checkpoints(parent_dir: str, keep_last: int = 5) -> list:
@@
+ if keep_last < 1:
+ raise ValueError("keep_last must be >= 1")
+
+ def _step_num(p: Path) -> int:
+ # step_0007 -> 7
+ try:
+ return int(p.name.split("_", 1)[1])
+ except (IndexError, ValueError):
+ return -1
+
children = sorted(
[c for c in parent.iterdir() if c.is_dir() and c.name.startswith("step_")],
- key=lambda c: c.stat().st_mtime,
+ key=_step_num,
)
- to_delete = children[:-keep_last] if len(children) > keep_last else []
+ to_delete = children[:-keep_last] if len(children) > keep_last else []🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py` around
lines 179 - 188, rotate_checkpoints: avoid unstable mtime ordering and handle
keep_last=0 by deterministically sorting directories by the numeric step
extracted from the directory name (e.g., parse int from c.name after "step_")
and then selecting the oldest entries to delete; implement a stable key like
lambda c: int(c.name.split("_", 1)[1]) (with safe parsing/skip for unexpected
names), sort ascending, and compute to_delete = children[:-keep_last] when
keep_last>0 else children to ensure keep_last=0 deletes all.
…ot leak processes
…cannot leak processes
… fix keep_last=0 rotation
…eap workers in finally, sync docs with current model dims
# Conflicts: # catalog.json
Summary
phases/19-capstone-projects/covering distributed-training internals from collectives up to an end-to-end demo.code/main.py,tests/test_*.py, andquiz.json.unittestand pass againsttorch.distributedgloo on CPU.76-collective-ops-from-scratchmultiprocessing.Queuemesh; verified byte-equal against gloo77-data-parallel-ddp78-zero-parameter-sharding79-pipeline-parallel(N-1)/(M+N-1)matches measured; 2-stage real pipeline over gloo80-checkpoint-sharded-resume81-end-to-end-distributed-trainImplementation notes
multiprocessingplustorch(no DeepSpeed, FSDP, fairscale, accelerate).GLOO_SOCKET_IFNAME=lo0/loso macOS does not hit the libuv accept bug.Test plan
python3 phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.pyexits 0; all 4 primitives match gloo reference within float epsilon.python3 -m unittest discover -s phases/19-capstone-projects/76-collective-ops-from-scratch/tests -vpasses 7 tests.python3 phases/19-capstone-projects/77-data-parallel-ddp/code/main.pyexits 0; DDP rank-0 loss matches single-process reference.python3 -m unittest discover -s phases/19-capstone-projects/77-data-parallel-ddp/tests -vpasses 6 tests.python3 phases/19-capstone-projects/78-zero-parameter-sharding/code/main.pyexits 0; all ranks end with identical param norm; ZeRO-1 shard bytes match formula.python3 -m unittest discover -s phases/19-capstone-projects/78-zero-parameter-sharding/tests -vpasses 7 tests.python3 phases/19-capstone-projects/79-pipeline-parallel/code/main.pyexits 0; closed-form bubble matches measured.python3 -m unittest discover -s phases/19-capstone-projects/79-pipeline-parallel/tests -vpasses 6 tests.python3 phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.pyexits 0; round-trip byte-equal; tamper and world-size mismatch rejected.python3 -m unittest discover -s phases/19-capstone-projects/80-checkpoint-sharded-resume/tests -vpasses 7 tests.python3 phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.pyexits 0; prints RESUME VERIFIED.python3 -m unittest discover -s phases/19-capstone-projects/81-end-to-end-distributed-train/tests -vpasses 6 tests.