diff --git a/conf/springConfigXml/ceph.xml b/conf/springConfigXml/ceph.xml
index c4489de344c..9b3f658033f 100755
--- a/conf/springConfigXml/ceph.xml
+++ b/conf/springConfigXml/ceph.xml
@@ -64,6 +64,7 @@
+
diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java
index 461e841f69f..7cf59dd3e8e 100755
--- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java
+++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java
@@ -20,6 +20,7 @@
import org.zstack.header.host.HypervisorType;
import org.zstack.header.message.MessageReply;
import org.zstack.header.storage.primary.PrimaryStorageConstant;
+import org.zstack.kvm.KVMAgentHttpTimeoutExtensionPoint;
import org.zstack.kvm.KVMConstant;
import org.zstack.kvm.KVMHostConnectExtensionPoint;
import org.zstack.kvm.KVMHostConnectedContext;
@@ -30,15 +31,27 @@
import org.zstack.utils.function.Function;
import javax.persistence.TypedQuery;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import static org.zstack.utils.CollectionDSL.list;
/**
* Created by frank on 8/17/2015.
*/
-public class CephKvmExtension implements KVMHostConnectExtensionPoint, HostConnectionReestablishExtensionPoint {
+public class CephKvmExtension implements KVMHostConnectExtensionPoint, HostConnectionReestablishExtensionPoint,
+ KVMAgentHttpTimeoutExtensionPoint {
+ static final long CHECK_HOST_STORAGE_CONNECTION_TIMEOUT = TimeUnit.MINUTES.toMillis(5); // 5 minutes in milliseconds; returned by kvmAgentHttpTimeouts() as the timeout for CHECK_HOST_STORAGE_CONNECTION_PATH
+
+ @Override
+ public Map<String, Long> kvmAgentHttpTimeouts() { // agent path -> HTTP timeout in milliseconds
+     return Collections.singletonMap(
+             CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH, // the only path this plugin overrides
+             CHECK_HOST_STORAGE_CONNECTION_TIMEOUT); // primitive long is boxed to Long; the returned map is immutable
+ }
+
@Autowired
private CloudBus bus;
@Autowired
diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
index fab8dd0b583..969db15411c 100755
--- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
+++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java
@@ -4900,6 +4900,11 @@ public String getSyncSignature() {
return String.format("check-storage-%s-host-connection", msg.getPrimaryStorageUuid());
}
+ @Override
+ protected int getSyncLevel() { // sync level for tasks sharing the "check-storage-%s-host-connection" signature built above
+ return 10; // NOTE(review): the test case in this change says this was raised from 1 so a stuck per-host check does not queue other hosts' checks — confirm exact semantics
+ }
+
@Override
public void run(SyncTaskChain chain) {
CheckHostStorageConnectionReply reply = new CheckHostStorageConnectionReply();
diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java
new file mode 100644
index 00000000000..04db3715429
--- /dev/null
+++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java
@@ -0,0 +1,7 @@
+package org.zstack.kvm;
+
+import java.util.Map;
+
+public interface KVMAgentHttpTimeoutExtensionPoint { // extension point: lets a plugin register HTTP timeouts for specific KVM agent paths
+    Map<String, Long> kvmAgentHttpTimeouts(); // key: agent path; value: timeout in milliseconds; a null result is skipped by KVMHostFactory
+}
diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
index 0c3ec6d2d23..c698ca8758e 100755
--- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
+++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java
@@ -488,6 +488,12 @@ class Http {
Class responseClass;
String commandStr;
String commandName;
+ long timeout = -1;
+
+ Http timeout(long timeout) { // fluent setter for a per-call timeout, passed on with TimeUnit.MILLISECONDS when the request is sent
+ this.timeout = timeout; // values <= 0 (the field default is -1) make the send fall back to timeoutManager.getTimeout()
+ return this; // returns this Http so the call can be chained, e.g. new Http<>(...).timeout(...).call(...)
+ }
public Http(String path, String cmd, String commandName, Class rspClz) {
this.path = path;
@@ -537,7 +543,7 @@ public void success(T ret) {
public Class getReturnClass() {
return responseClass;
}
- }, TimeUnit.MILLISECONDS, timeoutManager.getTimeout());
+ }, TimeUnit.MILLISECONDS, timeout > 0 ? timeout : timeoutManager.getTimeout());
} else {
restf.asyncJsonPost(path, cmd, header, new JsonAsyncRESTCallback(completion) {
@Override
@@ -2782,6 +2788,7 @@ private void executeAsyncHttpCall(final KVMHostAsyncHttpCallMsg msg, final NoErr
String url = buildUrl(msg.getPath());
MessageCommandRecorder.record(msg.getCommandClassName());
new Http<>(url, msg.getCommand(), msg.getCommandClassName(), LinkedHashMap.class)
+ .timeout(msg.getTimeout() > 0 ? msg.getTimeout() : factory.getAgentHttpTimeout(msg.getPath()))
.call(new ReturnValueCompletion(msg, completion) {
@Override
public void success(LinkedHashMap ret) {
diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java
index 513add98a83..f039a04a0e4 100755
--- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java
+++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java
@@ -138,6 +138,7 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory
public static final VolumeFormat VMDK_FORMAT = new VolumeFormat(VolumeConstant.VOLUME_FORMAT_VMDK, hypervisorType);
private List connectExtensions = new ArrayList<>();
private final Map completeNicInfoExtensions = new HashMap<>();
+ private final Map<String, Long> agentHttpTimeouts = new HashMap<>(); // agent path -> HTTP timeout in ms, collected from KVMAgentHttpTimeoutExtensionPoint extensions
private int maxDataVolumeNum;
private final Map socketTimeoutMap = new ConcurrentHashMap<>();
@@ -376,6 +377,17 @@ protected void populateExtensions() {
}
completeNicInfoExtensions.put(ext.getL2NetworkTypeVmNicOn(), ext);
}
+ for (KVMAgentHttpTimeoutExtensionPoint ext : pluginRgty.getExtensionList(KVMAgentHttpTimeoutExtensionPoint.class)) {
+ Map timeouts = ext.kvmAgentHttpTimeouts();
+ if (timeouts != null) {
+ agentHttpTimeouts.putAll(timeouts);
+ }
+ }
+ }
+
+ public long getAgentHttpTimeout(String path) { // timeout registered for this agent path, or -1 when no extension registered one
+     final Long registered = agentHttpTimeouts.get(path);
+     return registered == null ? -1 : registered;
}
public KVMCompleteNicInformationExtensionPoint getCompleteNicInfoExtension(L2NetworkType type) {
diff --git a/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy b/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
new file mode 100644
index 00000000000..49cb25c1903
--- /dev/null
+++ b/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy
@@ -0,0 +1,187 @@
+package org.zstack.test.integration.storage.ceph
+
+import org.springframework.http.HttpEntity
+import org.zstack.core.cloudbus.CloudBus
+import org.zstack.core.cloudbus.CloudBusCallBack
+import org.zstack.header.host.HostConstant
+import org.zstack.header.message.MessageReply
+import org.zstack.header.storage.primary.PrimaryStorageConstant
+import org.zstack.kvm.KVMAgentCommands
+import org.zstack.kvm.KVMHostAsyncHttpCallMsg
+import org.zstack.kvm.KVMHostFactory
+import org.zstack.sdk.HostInventory
+import org.zstack.sdk.PrimaryStorageInventory
+import org.zstack.storage.ceph.primary.CephPrimaryStorageBase
+import org.zstack.storage.primary.CheckHostStorageConnectionMsg
+import org.zstack.test.integration.storage.StorageTest
+import org.zstack.testlib.EnvSpec
+import org.zstack.testlib.SubCase
+import org.zstack.utils.data.SizeUnit
+import org.zstack.utils.gson.JSONObjectUtil
+
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
+class CephHostStorageCheckCase extends SubCase { // integration case: ceph per-host storage connection checks must not serialize across hosts, and per-message agent timeouts must be honored
+ EnvSpec env
+ CloudBus bus
+
+ @Override
+ void setup() {
+ useSpring(StorageTest.springSpec)
+ }
+
+ @Override
+ void environment() { // one zone, one KVM cluster with two hosts, one ceph primary storage attached to the cluster
+ env = makeEnv {
+ zone {
+ name = "zone"
+ cluster {
+ name = "test-cluster"
+ hypervisorType = "KVM"
+
+ kvm {
+ name = "host1"
+ managementIp = "127.0.0.1"
+ username = "root"
+ password = "password"
+ usedMem = 1000
+ totalCpu = 10
+ }
+
+ kvm {
+ name = "host2"
+ managementIp = "127.0.0.2"
+ username = "root"
+ password = "password"
+ usedMem = 1000
+ totalCpu = 10
+ }
+
+ attachPrimaryStorage("ceph-pri")
+ }
+
+ cephPrimaryStorage {
+ name = "ceph-pri"
+ description = "Test"
+ totalCapacity = SizeUnit.GIGABYTE.toByte(100)
+ availableCapacity = SizeUnit.GIGABYTE.toByte(100)
+ url = "ceph://pri"
+ fsid = "7ff218d9-f525-435f-8a40-3618d1772a64"
+ monUrls = ["root:password@localhost/?monPort=7777"]
+ }
+ }
+ }
+ }
+
+ @Override
+ void test() {
+ env.create {
+ bus = bean(CloudBus.class) // both sub-tests send messages directly on the cloud bus
+ testCheckNotSerializedAcrossHosts()
+ testPerMessageTimeoutHonored()
+ }
+ }
+
+ @Override
+ void clean() {
+ env.delete()
+ }
+
+ // ZSTAC-85421: a stuck per-host check must not block other hosts' check on the
+ // same primary storage. The chain syncLevel is raised from 1 to 10 so the second
+ // host's check runs concurrently instead of queueing behind the stuck one.
+ void testCheckNotSerializedAcrossHosts() {
+ def ps = env.inventoryByName("ceph-pri") as PrimaryStorageInventory
+ def host1 = env.inventoryByName("host1") as HostInventory
+ def host2 = env.inventoryByName("host2") as HostInventory
+
+ CountDownLatch host1Entered = new CountDownLatch(1) // counted down once host1's check reaches the simulated agent
+ CountDownLatch release = new CountDownLatch(1) // counted down by the test to let host1's check finish
+
+ // The ceph check path registers a 5min timeout via KVMAgentHttpTimeoutExtensionPoint;
+ // KVMHostFactory collects it so the check no longer rides the 1800s default.
+ assert bean(KVMHostFactory.class).getAgentHttpTimeout(CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH) == TimeUnit.MINUTES.toMillis(5)
+
+ env.simulator(CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH) { HttpEntity e ->
+ def cmd = JSONObjectUtil.toObject(e.body, CephPrimaryStorageBase.CheckHostStorageConnectionCmd)
+ if (cmd.hostUuid == host1.uuid) { // only host1's check is held; host2's returns immediately
+ host1Entered.countDown()
+ release.await(60, TimeUnit.SECONDS) // hold the handler until released, or at most 60s
+ }
+ return new KVMAgentCommands.AgentResponse()
+ }
+
+ CountDownLatch reply1Done = new CountDownLatch(1)
+ sendCheckMsg(ps.uuid, host1.uuid, { MessageReply r -> reply1Done.countDown() })
+ assert host1Entered.await(10, TimeUnit.SECONDS) // make sure host1's check is in flight before sending host2's
+
+ AtomicReference reply2 = new AtomicReference<>()
+ CountDownLatch reply2Done = new CountDownLatch(1)
+ sendCheckMsg(ps.uuid, host2.uuid, { MessageReply r -> reply2.set(r); reply2Done.countDown() })
+
+ assert reply2Done.await(15, TimeUnit.SECONDS) // host2 must be answered while host1 is still held
+ assert reply2.get().isSuccess()
+ assert reply1Done.getCount() == 1 // host1's reply has not arrived yet
+
+ release.countDown() // let host1's held check complete
+ assert reply1Done.await(15, TimeUnit.SECONDS)
+ }
+
+ // ZSTAC-85421: a KVMHostAsyncHttpCallMsg carrying an explicit timeout must fail at
+ // that timeout instead of riding the default 1800s. The ceph check relies on this to
+ // cap its blast radius at 5 minutes.
+ void testPerMessageTimeoutHonored() {
+ def host1 = env.inventoryByName("host1") as HostInventory
+ def stuckPath = "/test/zstac85421/stuck"
+ CountDownLatch release = new CountDownLatch(1)
+
+ env.simulator(stuckPath) { HttpEntity e ->
+ release.await(60, TimeUnit.SECONDS) // handler stays blocked far longer than the 3s message timeout set below
+ return new KVMAgentCommands.AgentResponse()
+ }
+
+ KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg()
+ msg.setHostUuid(host1.uuid)
+ msg.setPath(stuckPath)
+ msg.setCommand(new KVMAgentCommands.AgentCommand())
+ msg.setNoStatusCheck(true)
+ msg.setTimeout(TimeUnit.SECONDS.toMillis(3)) // explicit per-message timeout under test
+ bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host1.uuid)
+
+ AtomicReference reply = new AtomicReference<>()
+ CountDownLatch done = new CountDownLatch(1)
+ long start = System.currentTimeMillis()
+ bus.send(msg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply r) {
+ reply.set(r)
+ done.countDown()
+ }
+ })
+
+ assert done.await(20, TimeUnit.SECONDS)
+ long elapsed = System.currentTimeMillis() - start
+ assert !reply.get().isSuccess() // the call is expected to fail rather than succeed
+ assert elapsed >= 2000 // NOTE(review): lower bound under the 3s timeout presumably rules out an immediate, non-timeout failure — confirm
+ assert elapsed < 20000
+
+ release.countDown() // unblock the simulator handler so it does not wait out its 60s
+ }
+
+ private void sendCheckMsg(String psUuid, String hostUuid, Closure cb) { // sends a CheckHostStorageConnectionMsg for one host; cb (optional) receives the reply
+ CheckHostStorageConnectionMsg msg = new CheckHostStorageConnectionMsg()
+ msg.setPrimaryStorageUuid(psUuid)
+ msg.setHostUuids([hostUuid])
+ bus.makeTargetServiceIdByResourceUuid(msg, PrimaryStorageConstant.SERVICE_ID, psUuid)
+ bus.send(msg, new CloudBusCallBack(null) {
+ @Override
+ void run(MessageReply r) {
+ if (cb != null) {
+ cb(r)
+ }
+ }
+ })
+ }
+}