Skip to content

feat(phase-19): track H distributed training lessons 76-81#215

Open
rohitg00 wants to merge 20 commits into
mainfrom
feat/phase-19-track-h
Open

feat(phase-19): track H distributed training lessons 76-81#215
rohitg00 wants to merge 20 commits into
mainfrom
feat/phase-19-track-h

Conversation

@rohitg00
Copy link
Copy Markdown
Owner

Summary

  • Six new capstone lessons under phases/19-capstone-projects/ covering distributed-training internals from collectives up to an end-to-end demo.
  • Every lesson ships docs (en.md with mermaid diagram), code/main.py, tests/test_*.py, and quiz.json.
  • All demos are self-terminating and exit 0; tests use unittest and pass against torch.distributed gloo on CPU.
# Lesson What it builds
76 76-collective-ops-from-scratch Ring allreduce, tree broadcast, allgather, reduce_scatter on a multiprocessing.Queue mesh; verified byte-equal against gloo
77 77-data-parallel-ddp DDP wrapper: broadcast at init, allreduce-mean grads after backward; reference single-process loop proves per-step equivalence
78 78-zero-parameter-sharding ZeRO-1 with sharded Adam, reduce_scatter on grads, allgather on updated params, memory table
79 79-pipeline-parallel GPipe scheduler with bubble Gantt chart; closed-form (N-1)/(M+N-1) matches measured; 2-stage real pipeline over gloo
80 80-checkpoint-sharded-resume Per-rank shard files + sha256 manifest + atomic write + rotation; three failure modes exercised
81 81-end-to-end-distributed-train 4-rank tiny GPT training composing DDP + ZeRO-1 + sharded checkpoint; 20 steps; resume verified byte-equal

Implementation notes

  • Python stdlib multiprocessing plus torch (no DeepSpeed, FSDP, fairscale, accelerate).
  • gloo backend with file rendezvous; sets GLOO_SOCKET_IFNAME=lo0/lo so macOS does not hit the libuv accept bug.
  • Lessons 76, 77, 78, 81 spawn workers; lessons 79 (scheduler) and 80 (checkpoint) run single-process.

Test plan

  • python3 phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py exits 0; all 4 primitives match gloo reference within float epsilon.
  • python3 -m unittest discover -s phases/19-capstone-projects/76-collective-ops-from-scratch/tests -v passes 7 tests.
  • python3 phases/19-capstone-projects/77-data-parallel-ddp/code/main.py exits 0; DDP rank-0 loss matches single-process reference.
  • python3 -m unittest discover -s phases/19-capstone-projects/77-data-parallel-ddp/tests -v passes 6 tests.
  • python3 phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py exits 0; all ranks end with identical param norm; ZeRO-1 shard bytes match formula.
  • python3 -m unittest discover -s phases/19-capstone-projects/78-zero-parameter-sharding/tests -v passes 7 tests.
  • python3 phases/19-capstone-projects/79-pipeline-parallel/code/main.py exits 0; closed-form bubble matches measured.
  • python3 -m unittest discover -s phases/19-capstone-projects/79-pipeline-parallel/tests -v passes 6 tests.
  • python3 phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py exits 0; round-trip byte-equal; tamper and world-size mismatch rejected.
  • python3 -m unittest discover -s phases/19-capstone-projects/80-checkpoint-sharded-resume/tests -v passes 7 tests.
  • python3 phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py exits 0; prints RESUME VERIFIED.
  • python3 -m unittest discover -s phases/19-capstone-projects/81-end-to-end-distributed-train/tests -v passes 6 tests.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 26, 2026

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: ed44376a-3913-4511-abe1-9dc8e0c86e55

📥 Commits

Reviewing files that changed from the base of the PR and between fe176d1 and fbe907a.

📒 Files selected for processing (8)
  • phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py
  • phases/19-capstone-projects/77-data-parallel-ddp/code/main.py
  • phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py
  • phases/19-capstone-projects/79-pipeline-parallel/code/main.py
  • phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py
  • phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py
  • phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md
  • phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py
✅ Files skipped from review due to trivial changes (1)
  • phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md

📝 Walkthrough

<review_stack_artifact>

</review_stack_artifact>

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 26.09% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly and concisely describes the main change: addition of six new distributed training capstone lessons (76-81) to phase 19, directly matching the substantial additions in the changeset.
Description check ✅ Passed The description provides detailed context for the PR changes, including a summary table of six new lessons with their content, implementation notes about using multiprocessing and gloo backend, and a comprehensive test plan for each lesson.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/phase-19-track-h
⚔️ Resolve merge conflicts
  • Resolve merge conflict in branch feat/phase-19-track-h

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 12

Note

Due to the large number of review comments, Critical, Major severity comments were prioritized as inline comments.

🟡 Minor comments (9)
phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md-61-67 (1)

61-67: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update API descriptions to match the actual code surface.

Lines 61–67 describe outdated signatures and topology details (e.g., explicit rank/world_size args and ring-only wiring). In code, Mesh carries rank/world metadata, and function signatures are ring_allreduce(mesh, tensor), broadcast(mesh, tensor, src), etc. Please align this section to avoid copy/paste misuse.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md` around
lines 61 - 67, The API docs list outdated signatures and topology details;
update the text to match the actual code by removing explicit rank/world_size
parameters and noting that Mesh carries rank/world metadata and wiring may not
be ring-only. Specifically, change descriptions to reference the actual
function/class names and signatures: Mesh (which contains rank/world_size and
queue wiring), ring_allreduce(mesh, tensor), broadcast(mesh, tensor, src),
allgather(mesh, tensor), reduce_scatter(mesh, tensor), and _gloo_reference(op,
world_size, tensor) so the prose matches the implemented API surface and avoids
suggesting incorrect call patterns.
phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py-336-336 (1)

336-336: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Fix Ruff B905 and RUF010 in main.py

  • Line 336: zip(mesh_out, gloo_out) needs an explicit strict=... argument (B905).
  • Line 358: replace str(match) inside the f-string with an explicit conversion flag (e.g., {match!s:<12}) (RUF010).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py` at
line 336, The zip call over mesh_out and gloo_out uses implicit truncation;
change the loop header "for m, g in zip(mesh_out, gloo_out)" to include an
explicit strict argument (e.g., strict=True or strict=False) to satisfy Ruff
B905 and ensure mismatch behavior is intentional. Also update the f-string that
currently uses str(match) to use an explicit conversion/format specifier such as
{match!s:<12} (or another alignment/width as needed) to replace str(match) and
satisfy RUF010; locate the f-string referencing match and apply the conversion
flag.
phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md-68-70 (1)

68-70: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Align doc symbol names and verification claim with the actual lesson code.

Line 68/69 mention worker and _reference_single_process_loop, but the implemented functions are _ddp_worker and reference_single_process. Line 77 also claims per-step parameter checksum comparison, while the script prints per-step losses plus final parameter norm.

Also applies to: 77-77

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md` around lines 68
- 70, Update the docs to match the actual function names and verification
behavior: replace mentions of worker and _reference_single_process_loop with the
actual symbols _ddp_worker and reference_single_process, and change the
verification claim from "per-step parameter checksum comparison" to describe
what the script actually does (prints per-step losses and a final parameter
norm) or alternatively implement per-step checksum logic in functions
_ddp_worker/reference_single_process if you intend the original claim to be
true.
phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py-27-29 (1)

27-29: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Guard against silent truncation when comparing loss curves.

On Line 27, zip(ref_losses, ddp_rank0_losses) can hide a length mismatch by truncating to the shorter list. Add an explicit length assertion before iterating.

Proposed fix
         ref_losses, _ = reference_single_process(world_size=4, steps=10)
         ddp_rank0_losses, _ = ddp[0]
+        self.assertEqual(
+            len(ref_losses),
+            len(ddp_rank0_losses),
+            "loss history length mismatch between reference and DDP rank0",
+        )
         for s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)):
             self.assertAlmostEqual(a, b, places=4,
                                    msg=f"divergence at step {s}: ref={a}, ddp={b}")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py` around
lines 27 - 29, Before iterating with the for loop that zips ref_losses and
ddp_rank0_losses, add an explicit length check (e.g.,
assertEqual(len(ref_losses), len(ddp_rank0_losses), msg=...)) so the test fails
clearly if one list is shorter; place this assertion immediately before the for
s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)) loop in test_ddp.py to
prevent silent truncation when comparing loss curves.
phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md-70-73 (1)

70-73: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Align documented helper names with the implemented API.

At Line 70, docs reference flatten_params / unflatten_into, but the implementation exposes gather_flat_params / scatter_flat_to_params. Please sync names to avoid confusion.

Proposed fix
-- `flatten_params(module)` and `unflatten_into(module, flat)` that pack a model's parameters into one contiguous tensor and unpack back. The flat layout is what makes sharding by rank a simple slice.
+- `gather_flat_params(module)` and `scatter_flat_to_params(module, flat)` that pack a model's parameters into one contiguous tensor and unpack back. The flat layout is what makes sharding by rank a simple slice.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md` around
lines 70 - 73, Docs mention flatten_params/unflatten_into but the code exposes
gather_flat_params/scatter_flat_to_params; update the documentation to use the
implemented API names (replace flatten_params -> gather_flat_params and
unflatten_into -> scatter_flat_to_params) and adjust any example calls or prose
(e.g., where step(), ZeroOptimizer, and the demo use the flat pack/unpack
helpers) so the documented helper names match the actual functions.
phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py-253-253 (1)

253-253: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use ceil-based shard sizing in the memory table.

Line 253 uses floor division, which underreports ZeRO-1 optimizer-state bytes when p_params % world_size != 0. This should mirror padded shard sizing used by shard_bounds.

Proposed fix
 def memory_table(p_params: int, world_size: int) -> str:
@@
-    zero1 = (fp16 + fp16) * p_params + (fp32 * 3 * p_params) // world_size
+    shard_params = (p_params + world_size - 1) // world_size
+    zero1 = (fp16 + fp16) * p_params + (fp32 * 3 * shard_params)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py` at line
253, The memory calculation for ZeRO-1 shards underreports when p_params %
world_size != 0 because zero1 uses floor division; update the computation in the
memory table to use ceil-based shard sizing to mirror shard_bounds (i.e.,
compute per-shard param count as ceil(p_params / world_size) instead of p_params
// world_size) so zero1 = (fp16 + fp16) * p_params + (fp32 * 3 *
ceil_shard_params) / use the same ceil_shard_params variable wherever shard
sizing is used (referencing zero1, p_params, world_size, fp16, fp32 and
shard_bounds).
phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json-34-34 (1)

34-34: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Correct the LayerNorm statement in the quiz option.

“LayerNorm running statistics” is technically incorrect; LayerNorm does not maintain running mean/variance buffers like BatchNorm.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json` at
line 34, Update the quiz option string that currently reads "GPT adds LayerNorm
running statistics, embedding gradient shape, and softmax+cross-entropy edge
cases that an MLP would hide" to remove the incorrect claim about LayerNorm
maintaining running statistics; instead state that LayerNorm uses per-sample
mean/variance or mention its learnable affine parameters (gamma/beta). Locate
the offending string (the option text in the quiz entry) and replace "LayerNorm
running statistics" with a corrected phrase such as "LayerNorm per-sample
mean/variance" or "LayerNorm affine parameters (gamma/beta)" so the option is
factually accurate.
phases/19-capstone-projects/79-pipeline-parallel/code/main.py-37-47 (1)

37-47: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Validate pipeline dimensions before computing schedules.

num_stages <= 0 or num_microbatches <= 0 currently lead to unclear failures (e.g., empty schedule then max() crash). Add explicit argument checks and fail fast with ValueError.

Proposed patch
+def _validate_dims(num_stages: int, num_microbatches: int) -> None:
+    if num_stages <= 0:
+        raise ValueError("num_stages must be > 0")
+    if num_microbatches <= 0:
+        raise ValueError("num_microbatches must be > 0")
+
+
 def bubble_fraction(num_stages: int, num_microbatches: int) -> float:
@@
+    _validate_dims(num_stages, num_microbatches)
     n = num_stages
     m = num_microbatches
     return (n - 1) / (m + n - 1)
@@
 def gpipe_schedule(num_stages: int, num_microbatches: int) -> list:
@@
+    _validate_dims(num_stages, num_microbatches)
     n = num_stages
     m = num_microbatches
@@
 def measure_bubble(num_stages: int, num_microbatches: int) -> float:
@@
+    _validate_dims(num_stages, num_microbatches)
     schedule = gpipe_schedule(num_stages, num_microbatches)

Also applies to: 50-71, 90-97

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py` around lines
37 - 47, Add explicit argument validation to ensure num_stages > 0 and
num_microbatches > 0 and raise ValueError with a clear message if not;
specifically, update bubble_fraction to check m and n at the top and raise
ValueError("num_stages must be > 0 and num_microbatches must be > 0") (or
similar), and apply the same guard to the other schedule-generation functions
that accept num_stages and num_microbatches so they fail fast instead of
producing empty schedules or triggering later crashes.
phases/19-capstone-projects/79-pipeline-parallel/docs/en.md-55-58 (1)

55-58: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Update Build It API names to match the actual code.

Lines 55-58 describe PipelineStage/Pipeline(...), but the implementation exports StageMLP, schedule utilities, and run_pipeline(...). This mismatch is confusing for learners.

Proposed patch
-- `PipelineStage`: a small `nn.Module` that holds one stage's parameters and exposes `forward(activation)`.
-- `Pipeline(stages, num_microbatches)`: orchestrates the GPipe schedule on simulated stages using simulated wall-clock per stage.
+- `StageMLP`: a small `nn.Module` used as each stage in the 2-rank demo.
+- `run_pipeline(steps, batch, microbatches)`: spawns the 2-rank gloo pipeline and collects per-rank outputs.
 - `bubble_fraction(num_stages, num_microbatches)`: closed-form (N-1)/(M+N-1).
-- A 4-stage demo that prints the per-microbatch trace and the measured bubble fraction.
+- A 4-stage schedule demo that prints a Gantt chart plus measured/closed-form bubble fraction.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/79-pipeline-parallel/docs/en.md` around lines 55
- 58, Update the docs to use the actual exported API names instead of the old
ones: replace references to PipelineStage and Pipeline(...) with StageMLP (the
module implementing a stage), the schedule utilities (names that implement the
GPipe schedule), and run_pipeline(...) (the function that orchestrates execution
and measures bubble fraction); ensure the list mentions bubble_fraction(...)
unchanged and the 4-stage demo that prints per-microbatch trace and measured
bubble fraction so the doc matches the real symbols StageMLP, schedule utils,
and run_pipeline.
🧹 Nitpick comments (2)
phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py (1)

28-35: ⚡ Quick win

Run the expensive E2E setup once per class, not once per test.

This currently executes the full distributed demo for every test method, which is costly and slows CI significantly.

Proposed refactor
 class TestEndToEnd(unittest.TestCase):
-    def setUp(self):
-        self.out = run_e2e(world_size=WORLD_SIZE, steps=20)
-        self.results = self.out["results"]
-        self.ckpt_dir = self.out["ckpt_dir"]
+    `@classmethod`
+    def setUpClass(cls):
+        cls.out = run_e2e(world_size=WORLD_SIZE, steps=20)
+        cls.results = cls.out["results"]
+        cls.ckpt_dir = cls.out["ckpt_dir"]
 
-    def tearDown(self):
-        if "workdir" in self.out:
-            shutil.rmtree(self.out["workdir"], ignore_errors=True)
+    `@classmethod`
+    def tearDownClass(cls):
+        if "workdir" in cls.out:
+            shutil.rmtree(cls.out["workdir"], ignore_errors=True)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py`
around lines 28 - 35, The test currently runs the expensive run_e2e in setUp for
every test; change it to run once per test class by moving the run_e2e call into
a `@classmethod` setUpClass and store results on cls (e.g., cls.out, cls.results,
cls.ckpt_dir) and move the cleanup logic into a corresponding `@classmethod`
tearDownClass to remove cls.out["workdir"] with shutil.rmtree; keep
instance-level tests referencing self.results/self.ckpt_dir by reading from cls
in setUp or directly using self.__class__.results if needed.
phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py (1)

37-45: ⚡ Quick win

Assert backward set coverage too, not only counts.

This test currently checks exact (stage, microbatch) coverage only for forward. Add the same set-equality assertion for backward to catch duplicate/missing backward placements.

Proposed patch
         self.assertEqual(len(forwards), n * m)
         self.assertEqual(len(backwards), n * m)
         self.assertEqual(set(forwards), {(s, mb) for s in range(n) for mb in range(m)})
+        self.assertEqual(set(backwards), {(s, mb) for s in range(n) for mb in range(m)})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py`
around lines 37 - 45, The test
test_schedule_covers_every_microbatch_through_every_stage currently verifies
forward coverage by asserting set(forwards) equals the expected {(s, mb) for s
in range(n) for mb in range(m)} but only checks backwards by count; update the
test to also assert set(backwards) equals the same expected set to catch
duplicate or missing backward placements produced by gpipe_schedule. Locate the
locals forwards and backwards in
test_schedule_covers_every_microbatch_through_every_stage and add a set-equality
assertion for backwards analogous to the forwards assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`:
- Around line 321-327: The worker join logic in run_mesh currently calls
p.join(timeout=30) but doesn't check if processes are still alive or exited
abnormally; after joining each process in procs, explicitly check p.is_alive()
and p.exitcode (for example iterate procs, call p.join(timeout=30) if still
alive call p.terminate() and p.join(short_timeout), and if exitcode is non-zero
raise or surface an error), ensure any terminated/killed workers do not leave
partial data in out_queue before returning results; if a worker failed, raise a
RuntimeError (including the process pid/exitcode) instead of returning
incomplete results (refer to procs, out_queue, world_size, byte_counter,
results).
- Around line 249-271: The worker processes started via ctx.Process in the block
that builds procs must always be reaped even if out_queue.get(...) raises; wrap
the result-collection and out_queue.get loop in a try/finally and move the
join/terminate/exitcode checks for procs into the finally block so cleanup runs
on success and on error, ensuring each Process in procs is joined, checked for
non-zero exitcode (raise after cleanup), and terminated/joined if still alive;
reference the variables procs, out_queue, and the ctx.Process/_gloo_worker
launch in your changes.

In `@phases/19-capstone-projects/77-data-parallel-ddp/code/main.py`:
- Around line 153-169: The result-collection loop currently sits inside a try
that on exception can skip joining/terminating spawned processes, leaking
children; move the process cleanup into a finally block so procs are always
joined/terminated regardless of out_queue.get() failures. Specifically, after
the block that calls out_queue.get(...) (referencing out_queue and results),
ensure the loop that iterates procs and calls
p.join(...)/p.terminate()/p.join(...) is executed in a finally (or a nested
finally) so it runs on all paths; you can then keep the existing cleanup for
init_file and init_dir (FileNotFoundError/OSError handling) either in the same
outer finally or a subsequent one.

In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py`:
- Around line 216-241: The current try/finally only removes init_file/init_dir,
so if out_queue.get(...) raises the for-loop that joins/terminates child
processes (creating Process objects via ctx.Process targeting _zero_worker and
stored in procs) is skipped and workers can be left running; refactor so the
join/terminate loop that iterates procs and calls
p.join()/p.is_alive()/p.terminate() always runs in the outer finally (or a
nested finally inside the try) before cleaning up init_file/init_dir, ensuring
all processes are reaped even when out_queue.get times out or raises.

In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py`:
- Around line 172-189: The out_queue.get timeout can raise and currently skips
the process cleanup loop, leaking child processes; ensure procs are always
joined/terminated by moving the join/terminate logic into the finally block (or
by wrapping the collection loop in a try/except and performing the same
join/terminate in the existing finally). Specifically, ensure the Process
objects created via ctx.Process (target=_pipe_worker) appended to procs are
iterated in the finally to call p.join(timeout=5), check p.is_alive(), then
p.terminate() and p.join(timeout=2), and preserve collecting results from
out_queue before cleanup when possible.

In `@phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py`:
- Around line 179-188: rotate_checkpoints: avoid unstable mtime ordering and
handle keep_last=0 by deterministically sorting directories by the numeric step
extracted from the directory name (e.g., parse int from c.name after "step_")
and then selecting the oldest entries to delete; implement a stable key like
lambda c: int(c.name.split("_", 1)[1]) (with safe parsing/skip for unexpected
names), sort ascending, and compute to_delete = children[:-keep_last] when
keep_last>0 else children to ensure keep_last=0 deletes all.
- Around line 138-140: The rename loop (for tmp, final in tmp_paths:
os.replace(...)) and final manifest replace are not followed by a directory
fsync, so directory metadata may be lost on crash; in the save_sharded function,
after performing the os.replace calls for tmp_paths and
manifest_tmp→manifest_final, open the target directory
(os.open(os.path.dirname(final) or common parent, os.O_RDONLY | getattr(os,
"O_DIRECTORY", 0))), call os.fsync(fd) to persist directory metadata, then close
the fd; apply this same fsync after the manifest replace to ensure the rename
entries are durable.
- Around line 159-176: load_sharded currently reads and indexes shards without
validating their metadata first; before any file I/O validate all entries in
manifest.shards: ensure each shard.rank is an int in [0, manifest.world_size),
detect duplicate ranks (use a set) and raise CheckpointError on duplicates or
out-of-range ranks, and validate shard.path is safe by rejecting absolute paths
or path traversal (resolve shard_path against src and ensure the resolved path
is inside src; raise CheckpointError if not). Only after these checks pass,
allocate per_rank and proceed to open and verify each shard file (using
shard.rank to index per_rank). Use the same CheckpointError behavior for
validation failures so malformed manifests are rejected before any reads or
IndexError can occur.

In `@phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py`:
- Line 275: The RuntimeError raises use f-strings without interpolation (e.g.,
raise RuntimeError(f"world_size mismatch")) and there are ambiguous
single-letter variable names triggering lint errors (E741/F541); change those
raises to plain string literals (raise RuntimeError("world_size mismatch")) for
each occurrence and rename ambiguous identifiers like l or O to clear names
(e.g., local_rank, global_rank, world_size) wherever they appear to satisfy the
linter and improve clarity while keeping the same behavior.
- Around line 387-389: The except block that currently only calls
shutil.rmtree(workdir, ignore_errors=True) and re-raises can leave spawned
worker processes running; update the failure path to first iterate the list of
worker Process objects (e.g., the variable holding spawned workers such as
workers or worker_procs), call terminate() on each live process and then join()
them (or use multiprocessing.active_children() to find and terminate any
children), handle/ignore errors during termination, then remove the workdir and
re-raise the exception so no worker processes leak across tests/runs.

In `@phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md`:
- Line 3: Docs are out of sync: update the lesson doc text to match the
implementation in code/main.py rather than changing code; specifically, replace
the described model hyperparameters (4 layers, hidden 64, vocab 256, seq 32)
with the actual values used (2 layers, dim 32, vocab 64, seq 16), state that the
head is untied (not tied embeddings), change checkpoint description to "step-10
checkpoint" instead of end+halfway checkpoints, describe resume verification as
in-process via the verify_resume routine (not spawn-based), and remove the
incorrect claim about "LayerNorm running stats"; reference the implementation
symbols build_model/GPT (or whatever model constructor is used), verify_resume,
and the checkpoint save_checkpoint/checkpoint_step logic so the doc aligns with
code.

In
`@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py`:
- Line 69: The loop variable `l` in the enumeration `for s, l in
enumerate(losses):` is ambiguous and triggers E741; rename it to a clear
identifier such as `loss` (e.g., `for s, loss in enumerate(losses):`) and update
all uses inside that loop and any related assertions or log statements
(references to `l`) to `loss` so linting and readability are fixed (look for
this in tests/test_e2e.py around the losses handling).

---

Minor comments:
In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`:
- Line 336: The zip call over mesh_out and gloo_out uses implicit truncation;
change the loop header "for m, g in zip(mesh_out, gloo_out)" to include an
explicit strict argument (e.g., strict=True or strict=False) to satisfy Ruff
B905 and ensure mismatch behavior is intentional. Also update the f-string that
currently uses str(match) to use an explicit conversion/format specifier such as
{match!s:<12} (or another alignment/width as needed) to replace str(match) and
satisfy RUF010; locate the f-string referencing match and apply the conversion
flag.

In `@phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md`:
- Around line 61-67: The API docs list outdated signatures and topology details;
update the text to match the actual code by removing explicit rank/world_size
parameters and noting that Mesh carries rank/world metadata and wiring may not
be ring-only. Specifically, change descriptions to reference the actual
function/class names and signatures: Mesh (which contains rank/world_size and
queue wiring), ring_allreduce(mesh, tensor), broadcast(mesh, tensor, src),
allgather(mesh, tensor), reduce_scatter(mesh, tensor), and _gloo_reference(op,
world_size, tensor) so the prose matches the implemented API surface and avoids
suggesting incorrect call patterns.

In `@phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md`:
- Around line 68-70: Update the docs to match the actual function names and
verification behavior: replace mentions of worker and
_reference_single_process_loop with the actual symbols _ddp_worker and
reference_single_process, and change the verification claim from "per-step
parameter checksum comparison" to describe what the script actually does (prints
per-step losses and a final parameter norm) or alternatively implement per-step
checksum logic in functions _ddp_worker/reference_single_process if you intend
the original claim to be true.

In `@phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py`:
- Around line 27-29: Before iterating with the for loop that zips ref_losses and
ddp_rank0_losses, add an explicit length check (e.g.,
assertEqual(len(ref_losses), len(ddp_rank0_losses), msg=...)) so the test fails
clearly if one list is shorter; place this assertion immediately before the for
s, (a, b) in enumerate(zip(ref_losses, ddp_rank0_losses)) loop in test_ddp.py to
prevent silent truncation when comparing loss curves.

In `@phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py`:
- Line 253: The memory calculation for ZeRO-1 shards underreports when p_params
% world_size != 0 because zero1 uses floor division; update the computation in
the memory table to use ceil-based shard sizing to mirror shard_bounds (i.e.,
compute per-shard param count as ceil(p_params / world_size) instead of p_params
// world_size) so zero1 = (fp16 + fp16) * p_params + (fp32 * 3 *
ceil_shard_params) / use the same ceil_shard_params variable wherever shard
sizing is used (referencing zero1, p_params, world_size, fp16, fp32 and
shard_bounds).

In `@phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md`:
- Around line 70-73: Docs mention flatten_params/unflatten_into but the code
exposes gather_flat_params/scatter_flat_to_params; update the documentation to
use the implemented API names (replace flatten_params -> gather_flat_params and
unflatten_into -> scatter_flat_to_params) and adjust any example calls or prose
(e.g., where step(), ZeroOptimizer, and the demo use the flat pack/unpack
helpers) so the documented helper names match the actual functions.

In `@phases/19-capstone-projects/79-pipeline-parallel/code/main.py`:
- Around line 37-47: Add explicit argument validation to ensure num_stages > 0
and num_microbatches > 0 and raise ValueError with a clear message if not;
specifically, update bubble_fraction to check m and n at the top and raise
ValueError("num_stages must be > 0 and num_microbatches must be > 0") (or
similar), and apply the same guard to the other schedule-generation functions
that accept num_stages and num_microbatches so they fail fast instead of
producing empty schedules or triggering later crashes.

In `@phases/19-capstone-projects/79-pipeline-parallel/docs/en.md`:
- Around line 55-58: Update the docs to use the actual exported API names
instead of the old ones: replace references to PipelineStage and Pipeline(...)
with StageMLP (the module implementing a stage), the schedule utilities (names
that implement the GPipe schedule), and run_pipeline(...) (the function that
orchestrates execution and measures bubble fraction); ensure the list mentions
bubble_fraction(...) unchanged and the 4-stage demo that prints per-microbatch
trace and measured bubble fraction so the doc matches the real symbols StageMLP,
schedule utils, and run_pipeline.

In `@phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json`:
- Line 34: Update the quiz option string that currently reads "GPT adds
LayerNorm running statistics, embedding gradient shape, and
softmax+cross-entropy edge cases that an MLP would hide" to remove the incorrect
claim about LayerNorm maintaining running statistics; instead state that
LayerNorm uses per-sample mean/variance or mention its learnable affine
parameters (gamma/beta). Locate the offending string (the option text in the
quiz entry) and replace "LayerNorm running statistics" with a corrected phrase
such as "LayerNorm per-sample mean/variance" or "LayerNorm affine parameters
(gamma/beta)" so the option is factually accurate.

---

Nitpick comments:
In `@phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py`:
- Around line 37-45: The test
test_schedule_covers_every_microbatch_through_every_stage currently verifies
forward coverage by asserting set(forwards) equals the expected {(s, mb) for s
in range(n) for mb in range(m)} but only checks backwards by count; update the
test to also assert set(backwards) equals the same expected set to catch
duplicate or missing backward placements produced by gpipe_schedule. Locate the
locals forwards and backwards in
test_schedule_covers_every_microbatch_through_every_stage and add a set-equality
assertion for backwards analogous to the forwards assertion.

In
`@phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py`:
- Around line 28-35: The test currently runs the expensive run_e2e in setUp for
every test; change it to run once per test class by moving the run_e2e call into
a `@classmethod` setUpClass and store results on cls (e.g., cls.out, cls.results,
cls.ckpt_dir) and move the cleanup logic into a corresponding `@classmethod`
tearDownClass to remove cls.out["workdir"] with shutil.rmtree; keep
instance-level tests referencing self.results/self.ckpt_dir by reading from cls
in setUp or directly using self.__class__.results if needed.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

Run ID: f49bb96d-a12e-4f1a-9754-50edb5d56b68

📥 Commits

Reviewing files that changed from the base of the PR and between c1374e1 and fe176d1.

📒 Files selected for processing (25)
  • catalog.json
  • phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py
  • phases/19-capstone-projects/76-collective-ops-from-scratch/docs/en.md
  • phases/19-capstone-projects/76-collective-ops-from-scratch/quiz.json
  • phases/19-capstone-projects/76-collective-ops-from-scratch/tests/test_collectives.py
  • phases/19-capstone-projects/77-data-parallel-ddp/code/main.py
  • phases/19-capstone-projects/77-data-parallel-ddp/docs/en.md
  • phases/19-capstone-projects/77-data-parallel-ddp/quiz.json
  • phases/19-capstone-projects/77-data-parallel-ddp/tests/test_ddp.py
  • phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py
  • phases/19-capstone-projects/78-zero-parameter-sharding/docs/en.md
  • phases/19-capstone-projects/78-zero-parameter-sharding/quiz.json
  • phases/19-capstone-projects/78-zero-parameter-sharding/tests/test_zero.py
  • phases/19-capstone-projects/79-pipeline-parallel/code/main.py
  • phases/19-capstone-projects/79-pipeline-parallel/docs/en.md
  • phases/19-capstone-projects/79-pipeline-parallel/quiz.json
  • phases/19-capstone-projects/79-pipeline-parallel/tests/test_pipeline.py
  • phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py
  • phases/19-capstone-projects/80-checkpoint-sharded-resume/docs/en.md
  • phases/19-capstone-projects/80-checkpoint-sharded-resume/quiz.json
  • phases/19-capstone-projects/80-checkpoint-sharded-resume/tests/test_checkpoint.py
  • phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py
  • phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md
  • phases/19-capstone-projects/81-end-to-end-distributed-train/quiz.json
  • phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py

Comment thread phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py Outdated
Comment on lines +321 to +327
results = {}
for _ in range(world_size):
rank, tensor = out_queue.get(timeout=60)
results[rank] = tensor
for p in procs:
p.join(timeout=30)
return [results[r] for r in range(world_size)], byte_counter.value
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Harden mesh worker lifecycle checks before returning results.

run_mesh joins with a timeout (Line 326) but does not verify is_alive()/exitcode. A stuck or crashed worker can be missed after queue collection. Add explicit post-join checks and terminate lingering processes.

Suggested fix
     for p in procs:
-        p.join(timeout=30)
+        p.join(timeout=30)
+        if p.is_alive():
+            p.terminate()
+            p.join(timeout=2)
+            raise RuntimeError("mesh worker did not exit cleanly")
+        if p.exitcode != 0:
+            raise RuntimeError(f"mesh worker exited with {p.exitcode}")
     return [results[r] for r in range(world_size)], byte_counter.value
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/76-collective-ops-from-scratch/code/main.py`
around lines 321 - 327, The worker join logic in run_mesh currently calls
p.join(timeout=30) but doesn't check if processes are still alive or exited
abnormally; after joining each process in procs, explicitly check p.is_alive()
and p.exitcode (for example iterate procs, call p.join(timeout=30) if still
alive call p.terminate() and p.join(short_timeout), and if exitcode is non-zero
raise or surface an error), ensure any terminated/killed workers do not leave
partial data in out_queue before returning results; if a worker failed, raise a
RuntimeError (including the process pid/exitcode) instead of returning
incomplete results (refer to procs, out_queue, world_size, byte_counter,
results).

Comment thread phases/19-capstone-projects/77-data-parallel-ddp/code/main.py
Comment thread phases/19-capstone-projects/78-zero-parameter-sharding/code/main.py
Comment thread phases/19-capstone-projects/79-pipeline-parallel/code/main.py Outdated
Comment on lines +179 to +188
def rotate_checkpoints(parent_dir: str, keep_last: int = 5) -> list:
"""Delete oldest checkpoint directories so only the most recent keep_last remain."""
parent = Path(parent_dir)
if not parent.exists():
return []
children = sorted(
[c for c in parent.iterdir() if c.is_dir() and c.name.startswith("step_")],
key=lambda c: c.stat().st_mtime,
)
to_delete = children[:-keep_last] if len(children) > keep_last else []
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Rotation ordering is unstable and keep_last=0 currently keeps everything.

Sorting by mtime can be nondeterministic for rapidly-created dirs; and with keep_last=0, children[:-0] becomes empty, so nothing is deleted. This can silently break disk-budget enforcement.

🔧 Suggested fix
 def rotate_checkpoints(parent_dir: str, keep_last: int = 5) -> list:
@@
+    if keep_last < 1:
+        raise ValueError("keep_last must be >= 1")
+
+    def _step_num(p: Path) -> int:
+        # step_0007 -> 7
+        try:
+            return int(p.name.split("_", 1)[1])
+        except (IndexError, ValueError):
+            return -1
+
     children = sorted(
         [c for c in parent.iterdir() if c.is_dir() and c.name.startswith("step_")],
-        key=lambda c: c.stat().st_mtime,
+        key=_step_num,
     )
-    to_delete = children[:-keep_last] if len(children) > keep_last else []
+    to_delete = children[:-keep_last] if len(children) > keep_last else []
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@phases/19-capstone-projects/80-checkpoint-sharded-resume/code/main.py` around
lines 179 - 188, rotate_checkpoints: avoid unstable mtime ordering and handle
keep_last=0 by deterministically sorting directories by the numeric step
extracted from the directory name (e.g., parse int from c.name after "step_")
and then selecting the oldest entries to delete; implement a stable key like
lambda c: int(c.name.split("_", 1)[1]) (with safe parsing/skip for unexpected
names), sort ascending, and compute to_delete = children[:-keep_last] when
keep_last>0 else children to ensure keep_last=0 deletes all.

Comment thread phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py Outdated
Comment thread phases/19-capstone-projects/81-end-to-end-distributed-train/code/main.py Outdated
Comment thread phases/19-capstone-projects/81-end-to-end-distributed-train/docs/en.md Outdated
Comment thread phases/19-capstone-projects/81-end-to-end-distributed-train/tests/test_e2e.py Outdated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant