<fix>[ceph]: cap & parallelize host conn check#4145
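Walkthrough
Adds configurable message-level timeout propagation to the KVM HTTP layer. KVMHostFactory aggregates the path->timeout mappings supplied by extension points. CephKvmExtension registers a timeout for the primary storage check path, and the ChainTask sync level is set in the Ceph primary storage check task. New integration tests verify the concurrency and timeout behavior.
Changes: primary storage connection check timeout and concurrency verification
Sequence diagram
sequenceDiagram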
participant Client
participant CephStorage
participant KVMHost
participant Http
participant REST
Client->>CephStorage: CheckHostStorageConnectionMsg
CephStorage->>CephStorage: ChainTask (getSyncLevel()=10)
CephStorage->>KVMHost: executeAsyncHttpCall(timeout from CephKvmExtension)
KVMHost->>Http: new Http().timeout(configured timeout)
Http->>REST: asyncJsonPost(url, effective timeout)
REST-->>Http: response or timeout
Http-->>KVMHost: callback
KVMHost-->>CephStorage: MessageReply
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Actionable comments posted: 2
🧹 Nitpick comments (1)
plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java (1)
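4904-4907: ⚡ Quick win: extract the concurrency limit into a named constant.
The 10 here is the key tuning value of this fix; a bare literal makes later tuning and troubleshooting less obvious. Consider extracting it into a constant, as was done for the timeout, for example CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL. As per coding guidelines: "Avoid magic values: numbers or strings used directly without a definition should be replaced with enums or constants."
Suggested change:
    +    private static final int CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL = 10;
    +
         @Override
         protected int getSyncLevel() {
    -        return 10;
    +        return CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL;
         }
🤖 Prompt for AI Agents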
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java` around lines 4904 - 4907, Replace the magic number 10 in CephPrimaryStorageBase.getSyncLevel() with a named constant (e.g., CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL) to make the concurrency limit explicit and configurable; add a static final int CHECK_HOST_STORAGE_CONNECTION_SYNC_LEVEL = 10 (near other class constants or top of the class) and return that constant from getSyncLevel() instead of the literal 10.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)
Review profile: CHILL
Plan: Pro
Run ID: bd3d4d00-3965-4bc8-937c-6df87b34e69a
📒 Files selected for processing (3)
- plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
- plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
- test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
        CountDownLatch reply1Done = new CountDownLatch(1)
        sendCheckMsg(ps.uuid, host1.uuid, { MessageReply r -> reply1Done.countDown() })
        assert host1Entered.await(10, TimeUnit.SECONDS)

        AtomicReference<MessageReply> reply2 = new AtomicReference<>()
        CountDownLatch reply2Done = new CountDownLatch(1)
        sendCheckMsg(ps.uuid, host2.uuid, { MessageReply r -> reply2.set(r); reply2Done.countDown() })

        assert reply2Done.await(15, TimeUnit.SECONDS)
        assert reply2.get().isSuccess()
        assert reply1Done.getCount() == 1

        release.countDown()
        assert reply1Done.await(15, TimeUnit.SECONDS)
    }
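Add an assertion on host1's final result.
This currently proves only that host2 was not serialized; it does not verify that host1 completes successfully once it is released. If the implementation became "concurrent but ultimately failing", this case would still pass. Consider storing reply1 the same way as reply2 and asserting success.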
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy`
around lines 111 - 125, The test currently only checks that host1 was blocked
(host1Entered) and that host2 succeeded, but never asserts host1's final reply;
modify the host1 sendCheckMsg call to capture its MessageReply into an
AtomicReference (e.g., reply1), use reply1Done as the latch callback to set the
reference and count down, then after release.countDown() await reply1Done and
assert reply1.get().isSuccess() (mirroring the reply2 pattern for sendCheckMsg,
reply2, reply2Done).
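The capture pattern this comment asks for (store the reply, then count down the latch) can be sketched in plain Java as below. This is only an illustration: `ReplyCapture` and its methods are hypothetical names, not part of ZStack or the test case, and the reply type is left generic instead of using `MessageReply`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical helper: captures one asynchronous reply so a test can assert on it later. */
final class ReplyCapture<T> implements Consumer<T> {
    private final AtomicReference<T> reply = new AtomicReference<>();
    private final CountDownLatch done = new CountDownLatch(1);

    @Override
    public void accept(T r) {
        reply.set(r);      // store first, so a successful await() always sees the reply
        done.countDown();
    }

    /** True while no reply has arrived yet. */
    boolean isPending() {
        return done.getCount() == 1;
    }

    /** Returns the reply, or null if none arrived within the timeout. */
    T await(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit) ? reply.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

With such a helper, the host1 side of the test could check `isPending()` while the check is blocked and then assert on the captured reply after `release.countDown()`, mirroring what is already done for reply2.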
        assert done.await(20, TimeUnit.SECONDS)
        long elapsed = System.currentTimeMillis() - start
        assert !reply.get().isSuccess()
        assert elapsed >= 2000
        assert elapsed < 20000
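Tighten the failure type to timeout.
The current assertions check only "failure + elapsed-time window", so non-timeout failures such as routing errors or simulator exceptions would also pass; this does not directly prove that setTimeout(3s) is actually honored. Consider additionally asserting that the returned error is a timeout error.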
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In
`@test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy`
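around lines 159 - 163, the current assertions only verify "failure + elapsed-time
window" and should be tightened to a timeout error: in the assertion block of
CephHostStorageCheckCase (the one using the reply, done and start variables), in
addition to assert !reply.get().isSuccess(), also assert that the returned error
type is timeout (for example, check that reply.get().getError() represents a
timeout, or that error.code or a recursive check such as isTimeout() is true), to
ensure setTimeout(3s) is actually triggered and returns a timeout error.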
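To illustrate why "failed within a time window" is weaker than "failed with a timeout", here is a minimal, self-contained sketch that classifies the outcome of an asynchronous call. It uses a `CompletableFuture` as a stand-in for the agent call; `FailureKind` and `classify` are hypothetical names and do not reflect how ZStack reports errors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical sketch: tells a timeout apart from any other failure. */
final class FailureKind {
    enum Kind { SUCCESS, TIMEOUT, OTHER_FAILURE }

    static Kind classify(CompletableFuture<?> call, long timeoutMs) {
        try {
            call.get(timeoutMs, TimeUnit.MILLISECONDS);
            return Kind.SUCCESS;
        } catch (TimeoutException e) {
            return Kind.TIMEOUT;            // nothing came back within the limit
        } catch (ExecutionException e) {
            return Kind.OTHER_FAILURE;      // e.g. a routing error or simulator exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Kind.OTHER_FAILURE;
        }
    }
}
```

A test that asserts only "not SUCCESS" accepts both failure kinds; asserting `Kind.TIMEOUT` specifically is the analogue of the tightening requested above.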
Per-host ceph connectivity check shared one serial chain per primary storage. A stuck in-flight check (riding the default 1800s http timeout) blocked every other host's check on the same PS for ~30min.
- raise check chain syncLevel 1 -> 10 so hosts run concurrently
- add KVMAgentHttpTimeoutExtensionPoint: modules register a per-path timeout, KVMHostFactory collects it, KVMHost resolves it (explicit msg timeout > registered path > default 1800s). ceph registers the check path at 5min, no per-call setTimeout.
Resolves: ZSTAC-85421
Change-Id: I717a746f676c6f776168646f6f776c64636e6870
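The resolution order stated in the commit message (explicit message timeout > registered path timeout > default 1800s) can be sketched as follows. This is an assumption-laden illustration, not the code in KVMHost: `AgentTimeoutResolver`, `register`, and `resolve` are hypothetical names, and a non-positive message timeout is treated as "not set", matching the "-1 = default" convention described in the PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the three-level timeout precedence. */
final class AgentTimeoutResolver {
    // Default of 1800s, as named in the commit message.
    static final long DEFAULT_TIMEOUT_MS = 1_800_000L;

    private final Map<String, Long> pathTimeouts = new HashMap<>();

    /** Registers a per-path timeout, as an extension point would. */
    void register(String path, long timeoutMs) {
        pathTimeouts.put(path, timeoutMs);
    }

    /** msgTimeoutMs <= 0 means the caller did not set an explicit timeout. */
    long resolve(String path, long msgTimeoutMs) {
        if (msgTimeoutMs > 0) {
            return msgTimeoutMs;                 // 1. explicit message timeout wins
        }
        Long registered = pathTimeouts.get(path);
        if (registered != null && registered > 0) {
            return registered;                   // 2. then the registered path timeout
        }
        return DEFAULT_TIMEOUT_MS;               // 3. otherwise the global default
    }
}
```

For example, with a made-up path registered at 300000 ms, `resolve(path, 3000)` yields 3000, `resolve(path, -1)` yields 300000, and an unregistered path with `-1` falls back to 1800000.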
147ad24 to 9fa96e2
🧹 Nitpick comments (1)
plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java (1)
380-386: ⚡ Quick win: consider adding duplicate-path detection
When multiple extension points return different timeout values for the same path, putAll silently overwrites, which may lead to unexpected behavior. Consider detecting and logging duplicate path configurations before merging, to help troubleshoot timeout configuration conflicts.
♻️ Suggested duplicate-detection implementation
     for (KVMAgentHttpTimeoutExtensionPoint ext : pluginRgty.getExtensionList(KVMAgentHttpTimeoutExtensionPoint.class)) {
         Map<String, Long> timeouts = ext.kvmAgentHttpTimeouts();
         if (timeouts != null) {
    +        for (Map.Entry<String, Long> entry : timeouts.entrySet()) {
    +            if (agentHttpTimeouts.containsKey(entry.getKey())) {
    +                logger.warn(String.format("KVM agent HTTP timeout path [%s] already configured with %dms by another extension, will be overwritten to %dms by %s",
    +                        entry.getKey(), agentHttpTimeouts.get(entry.getKey()), entry.getValue(), ext.getClass().getName()));
    +            }
    +        }
             agentHttpTimeouts.putAll(timeouts);
         }
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java` around lines 380 - 386, The loop in KVMHostFactory that merges KVMAgentHttpTimeoutExtensionPoint.kvmAgentHttpTimeouts() into agentHttpTimeouts currently uses agentHttpTimeouts.putAll(timeouts) which silently overwrites duplicate keys; modify the merge to iterate each entry from ext.kvmAgentHttpTimeouts(), check if agentHttpTimeouts.containsKey(key) and if the existing value != new value, call the class logger (e.g., logger.warn) recording the key, existing value, new value and ext.getClass().getName() to surface conflicts, then decide consistently whether to overwrite or keep the existing value (choose and implement one policy and document it in a comment).
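A self-contained variant of the merge described in this comment might look like the sketch below. It records a conflict only when the new value differs from the existing one, and it picks "last writer wins" as the policy; both choices are assumptions made for illustration, and `TimeoutMerger` is a hypothetical class rather than anything in KVMHostFactory. Conflicts are collected in a list instead of being logged so the example has no logging dependency.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: merges per-path timeouts and surfaces conflicting entries. */
final class TimeoutMerger {
    final Map<String, Long> merged = new LinkedHashMap<>();
    final List<String> conflicts = new ArrayList<>();

    /** Policy in this sketch: the last writer wins, but every real conflict is recorded. */
    void merge(String source, Map<String, Long> timeouts) {
        if (timeouts == null) {
            return;
        }
        for (Map.Entry<String, Long> e : timeouts.entrySet()) {
            Long existing = merged.put(e.getKey(), e.getValue());
            // Identical re-registrations are harmless and are not reported.
            if (existing != null && !existing.equals(e.getValue())) {
                conflicts.add(String.format("path [%s]: %dms overwritten with %dms by %s",
                        e.getKey(), existing, e.getValue(), source));
            }
        }
    }
}
```

Keeping the existing value instead would be an equally defensible policy; the point is only that the choice is explicit and that conflicts become visible.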
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: http://open.zstack.ai:20001/code-reviews/zstack-cloud.yaml (via .coderabbit.yaml)
Review profile: CHILL
Plan: Pro
Run ID: aa936b51-7d36-4fd1-9af4-412b7b80ce53
⛔ Files ignored due to path filters (1)
- conf/springConfigXml/ceph.xml is excluded by !**/*.xml
📒 Files selected for processing (6)
- plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java
- plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
- plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java
- plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
- plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java
- test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
✅ Files skipped from review due to trivial changes (1)
- plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java
🚧 Files skipped from review as they are similar to previous changes (3)
- plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
- plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
- test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
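Problem
Under the same ceph primary storage, every host's storage connectivity check (CheckHostStorageConnectionMsg) shares a single chain that is serialized per PS (the sync signature contains only psUuid, not the host). When one host's in-flight check gets stuck (riding the default 1800s http timeout), the checks of all other hosts on the same PS are blocked behind it at the head of the queue for ~30min. Observed in production: after an MN restart, hosts reconnect concurrently; one host's ceph check is orphaned (kvmagent restart, slipping past the 60s drain cushion) and waits out the full 1800s, dragging down the recovery of the other hosts on the same PS.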
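Fix (two changes, orthogonal)
- getSyncLevel() is raised from the default 1 to 10, so checks for different hosts on the same PS can run concurrently; a single stuck host occupies only 1 of 10 slots and no longer monopolizes the head of the queue.
- KVMHostAsyncHttpCallMsg gets its own 5min timeout (CHECK_HOST_STORAGE_CONNECTION_TIMEOUT), replacing the default 1800s. This is implemented through the newly added per-message timeout pass-through: KVMHost.executeAsyncHttpCall reads msg.getTimeout() (reusing NeedReplyMessage.timeout, -1 = default) and passes it to the internal Http; when it is > 0, it overrides timeoutManager.getTimeout().
Combined: the worst case shrinks from "N hosts × 30min" to "5min for a single host, with no impact on the others".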
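The effect of the sync level can be illustrated with a plain thread pool standing in for the chain: with one slot, a stuck task blocks everything queued behind it; with ten, a second task still completes. This is only an analogy under stated assumptions. ZStack's ChainTask is its own mechanism, and `SyncLevelDemo` and its method are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical analogy: a fixed-size pool plays the role of a chain with a given sync level. */
final class SyncLevelDemo {
    /**
     * Submits one task that blocks until released, then a second quick task.
     * Returns true if the second task finished while the first was still stuck.
     */
    static boolean secondFinishesWhileFirstStuck(int syncLevel) {
        ExecutorService chain = Executors.newFixedThreadPool(syncLevel);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch firstEntered = new CountDownLatch(1);
        CountDownLatch secondDone = new CountDownLatch(1);
        try {
            chain.execute(() -> {
                firstEntered.countDown();
                try {
                    release.await();             // simulates the stuck host check
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            firstEntered.await();                // make sure the first task holds a slot
            chain.execute(secondDone::countDown);
            return secondDone.await(500, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            release.countDown();                 // unblock the first task
            chain.shutdown();
        }
    }
}
```

Calling the method with 1 models the old behavior (the second check waits behind the stuck one), while 10 models the new one. Raising the level bounds the damage rather than removing it, which is why the separate timeout change is still needed.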
Tests
New IT CephHostStorageCheckCase (ceph PS + 2 hosts):
- testCheckNotSerializedAcrossHosts: blocks host1's check and asserts that host2's check still returns concurrently while host1 remains pending
- testPerMessageTimeoutHonored: sends a KVMHostAsyncHttpCallMsg with setTimeout(3s) to a stuck simulator and asserts that it fails with a timeout after ~3s rather than the default 1800s
Run locally, serially, with an isolated .m2:
- Tests run: 1, Failures: 0, Errors: 0, BUILD SUCCESS
- Async Http Timeout ... timeout after 3000 MILLISECONDS
- reply2Done.await(15s) fails (host2 blocked by serialization) → BUILD FAILURE, proving the test guards the fix
Impact
NeedReplyMessage.timeout field)
Resolves: ZSTAC-85421
sync from gitlab !10049