diff --git a/conf/springConfigXml/ceph.xml b/conf/springConfigXml/ceph.xml index c4489de344c..9b3f658033f 100755 --- a/conf/springConfigXml/ceph.xml +++ b/conf/springConfigXml/ceph.xml @@ -64,6 +64,7 @@ + diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java index 461e841f69f..7cf59dd3e8e 100755 --- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java +++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephKvmExtension.java @@ -20,6 +20,7 @@ import org.zstack.header.host.HypervisorType; import org.zstack.header.message.MessageReply; import org.zstack.header.storage.primary.PrimaryStorageConstant; +import org.zstack.kvm.KVMAgentHttpTimeoutExtensionPoint; import org.zstack.kvm.KVMConstant; import org.zstack.kvm.KVMHostConnectExtensionPoint; import org.zstack.kvm.KVMHostConnectedContext; @@ -30,15 +31,27 @@ import org.zstack.utils.function.Function; import javax.persistence.TypedQuery; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.zstack.utils.CollectionDSL.list; /** * Created by frank on 8/17/2015. */ -public class CephKvmExtension implements KVMHostConnectExtensionPoint, HostConnectionReestablishExtensionPoint { +public class CephKvmExtension implements KVMHostConnectExtensionPoint, HostConnectionReestablishExtensionPoint, + KVMAgentHttpTimeoutExtensionPoint { + static final long CHECK_HOST_STORAGE_CONNECTION_TIMEOUT = TimeUnit.MINUTES.toMillis(5); + + @Override + public Map kvmAgentHttpTimeouts() { + return Collections.singletonMap( + CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH, + CHECK_HOST_STORAGE_CONNECTION_TIMEOUT); + } + @Autowired private CloudBus bus; @Autowired diff --git a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java index fab8dd0b583..969db15411c 100755 --- a/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java +++ b/plugin/ceph/src/main/java/org/zstack/storage/ceph/primary/CephPrimaryStorageBase.java @@ -4900,6 +4900,11 @@ public String getSyncSignature() { return String.format("check-storage-%s-host-connection", msg.getPrimaryStorageUuid()); } + @Override + protected int getSyncLevel() { + return 10; + } + @Override public void run(SyncTaskChain chain) { CheckHostStorageConnectionReply reply = new CheckHostStorageConnectionReply(); diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java new file mode 100644 index 00000000000..04db3715429 --- /dev/null +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMAgentHttpTimeoutExtensionPoint.java @@ -0,0 +1,7 @@ +package org.zstack.kvm; + +import java.util.Map; + +public interface KVMAgentHttpTimeoutExtensionPoint { + Map kvmAgentHttpTimeouts(); +} diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java index 0c3ec6d2d23..c698ca8758e 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHost.java @@ -488,6 +488,12 @@ class Http { Class responseClass; String commandStr; String commandName; + long timeout = -1; + + Http timeout(long timeout) { + this.timeout = timeout; + return this; + } public Http(String path, String cmd, String commandName, Class rspClz) { this.path = path; @@ -537,7 +543,7 @@ public void success(T ret) { public Class getReturnClass() { return responseClass; } - }, TimeUnit.MILLISECONDS, timeoutManager.getTimeout()); + }, TimeUnit.MILLISECONDS, timeout > 0 ? timeout : timeoutManager.getTimeout()); } else { restf.asyncJsonPost(path, cmd, header, new JsonAsyncRESTCallback(completion) { @Override @@ -2782,6 +2788,7 @@ private void executeAsyncHttpCall(final KVMHostAsyncHttpCallMsg msg, final NoErr String url = buildUrl(msg.getPath()); MessageCommandRecorder.record(msg.getCommandClassName()); new Http<>(url, msg.getCommand(), msg.getCommandClassName(), LinkedHashMap.class) + .timeout(msg.getTimeout() > 0 ? msg.getTimeout() : factory.getAgentHttpTimeout(msg.getPath())) .call(new ReturnValueCompletion(msg, completion) { @Override public void success(LinkedHashMap ret) { diff --git a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java index 513add98a83..f039a04a0e4 100755 --- a/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java +++ b/plugin/kvm/src/main/java/org/zstack/kvm/KVMHostFactory.java @@ -138,6 +138,7 @@ public class KVMHostFactory extends AbstractService implements HypervisorFactory public static final VolumeFormat VMDK_FORMAT = new VolumeFormat(VolumeConstant.VOLUME_FORMAT_VMDK, hypervisorType); private List connectExtensions = new ArrayList<>(); private final Map completeNicInfoExtensions = new HashMap<>(); + private final Map agentHttpTimeouts = new HashMap<>(); private int maxDataVolumeNum; private final Map socketTimeoutMap = new ConcurrentHashMap<>(); @@ -376,6 +377,17 @@ protected void populateExtensions() { } completeNicInfoExtensions.put(ext.getL2NetworkTypeVmNicOn(), ext); } + for (KVMAgentHttpTimeoutExtensionPoint ext : pluginRgty.getExtensionList(KVMAgentHttpTimeoutExtensionPoint.class)) { + Map timeouts = ext.kvmAgentHttpTimeouts(); + if (timeouts != null) { + agentHttpTimeouts.putAll(timeouts); + } + } + } + + public long getAgentHttpTimeout(String path) { + Long timeout = agentHttpTimeouts.get(path); + return timeout != null ? timeout : -1; } public KVMCompleteNicInformationExtensionPoint getCompleteNicInfoExtension(L2NetworkType type) { diff --git a/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy b/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy new file mode 100644 index 00000000000..49cb25c1903 --- /dev/null +++ b/test/src/test/groovy/org/zstack/test/integration/storage/ceph/CephHostStorageCheckCase.groovy @@ -0,0 +1,187 @@ +package org.zstack.test.integration.storage.ceph + +import org.springframework.http.HttpEntity +import org.zstack.core.cloudbus.CloudBus +import org.zstack.core.cloudbus.CloudBusCallBack +import org.zstack.header.host.HostConstant +import org.zstack.header.message.MessageReply +import org.zstack.header.storage.primary.PrimaryStorageConstant +import org.zstack.kvm.KVMAgentCommands +import org.zstack.kvm.KVMHostAsyncHttpCallMsg +import org.zstack.kvm.KVMHostFactory +import org.zstack.sdk.HostInventory +import org.zstack.sdk.PrimaryStorageInventory +import org.zstack.storage.ceph.primary.CephPrimaryStorageBase +import org.zstack.storage.primary.CheckHostStorageConnectionMsg +import org.zstack.test.integration.storage.StorageTest +import org.zstack.testlib.EnvSpec +import org.zstack.testlib.SubCase +import org.zstack.utils.data.SizeUnit +import org.zstack.utils.gson.JSONObjectUtil + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicReference + +class CephHostStorageCheckCase extends SubCase { + EnvSpec env + CloudBus bus + + @Override + void setup() { + useSpring(StorageTest.springSpec) + } + + @Override + void environment() { + env = makeEnv { + zone { + name = "zone" + cluster { + name = "test-cluster" + hypervisorType = "KVM" + + kvm { + name = "host1" + managementIp = "127.0.0.1" + username = "root" + password = "password" + usedMem = 1000 + totalCpu = 10 + } + + kvm { + name = "host2" + managementIp = "127.0.0.2" + username = "root" + password = "password" + usedMem = 1000 + totalCpu = 10 + } + + attachPrimaryStorage("ceph-pri") + } + + cephPrimaryStorage { + name = "ceph-pri" + description = "Test" + totalCapacity = SizeUnit.GIGABYTE.toByte(100) + availableCapacity = SizeUnit.GIGABYTE.toByte(100) + url = "ceph://pri" + fsid = "7ff218d9-f525-435f-8a40-3618d1772a64" + monUrls = ["root:password@localhost/?monPort=7777"] + } + } + } + } + + @Override + void test() { + env.create { + bus = bean(CloudBus.class) + testCheckNotSerializedAcrossHosts() + testPerMessageTimeoutHonored() + } + } + + @Override + void clean() { + env.delete() + } + + // ZSTAC-85421: a stuck per-host check must not block other hosts' check on the + // same primary storage. The chain syncLevel is raised from 1 to 10 so the second + // host's check runs concurrently instead of queueing behind the stuck one. + void testCheckNotSerializedAcrossHosts() { + def ps = env.inventoryByName("ceph-pri") as PrimaryStorageInventory + def host1 = env.inventoryByName("host1") as HostInventory + def host2 = env.inventoryByName("host2") as HostInventory + + CountDownLatch host1Entered = new CountDownLatch(1) + CountDownLatch release = new CountDownLatch(1) + + // The ceph check path registers a 5min timeout via KVMAgentHttpTimeoutExtensionPoint; + // KVMHostFactory collects it so the check no longer rides the 1800s default. + assert bean(KVMHostFactory.class).getAgentHttpTimeout(CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH) == TimeUnit.MINUTES.toMillis(5) + + env.simulator(CephPrimaryStorageBase.CHECK_HOST_STORAGE_CONNECTION_PATH) { HttpEntity e -> + def cmd = JSONObjectUtil.toObject(e.body, CephPrimaryStorageBase.CheckHostStorageConnectionCmd) + if (cmd.hostUuid == host1.uuid) { + host1Entered.countDown() + release.await(60, TimeUnit.SECONDS) + } + return new KVMAgentCommands.AgentResponse() + } + + CountDownLatch reply1Done = new CountDownLatch(1) + sendCheckMsg(ps.uuid, host1.uuid, { MessageReply r -> reply1Done.countDown() }) + assert host1Entered.await(10, TimeUnit.SECONDS) + + AtomicReference reply2 = new AtomicReference<>() + CountDownLatch reply2Done = new CountDownLatch(1) + sendCheckMsg(ps.uuid, host2.uuid, { MessageReply r -> reply2.set(r); reply2Done.countDown() }) + + assert reply2Done.await(15, TimeUnit.SECONDS) + assert reply2.get().isSuccess() + assert reply1Done.getCount() == 1 + + release.countDown() + assert reply1Done.await(15, TimeUnit.SECONDS) + } + + // ZSTAC-85421: a KVMHostAsyncHttpCallMsg carrying an explicit timeout must fail at + // that timeout instead of riding the default 1800s. The ceph check relies on this to + // cap its blast radius at 5 minutes. + void testPerMessageTimeoutHonored() { + def host1 = env.inventoryByName("host1") as HostInventory + def stuckPath = "/test/zstac85421/stuck" + CountDownLatch release = new CountDownLatch(1) + + env.simulator(stuckPath) { HttpEntity e -> + release.await(60, TimeUnit.SECONDS) + return new KVMAgentCommands.AgentResponse() + } + + KVMHostAsyncHttpCallMsg msg = new KVMHostAsyncHttpCallMsg() + msg.setHostUuid(host1.uuid) + msg.setPath(stuckPath) + msg.setCommand(new KVMAgentCommands.AgentCommand()) + msg.setNoStatusCheck(true) + msg.setTimeout(TimeUnit.SECONDS.toMillis(3)) + bus.makeTargetServiceIdByResourceUuid(msg, HostConstant.SERVICE_ID, host1.uuid) + + AtomicReference reply = new AtomicReference<>() + CountDownLatch done = new CountDownLatch(1) + long start = System.currentTimeMillis() + bus.send(msg, new CloudBusCallBack(null) { + @Override + void run(MessageReply r) { + reply.set(r) + done.countDown() + } + }) + + assert done.await(20, TimeUnit.SECONDS) + long elapsed = System.currentTimeMillis() - start + assert !reply.get().isSuccess() + assert elapsed >= 2000 + assert elapsed < 20000 + + release.countDown() + } + + private void sendCheckMsg(String psUuid, String hostUuid, Closure cb) { + CheckHostStorageConnectionMsg msg = new CheckHostStorageConnectionMsg() + msg.setPrimaryStorageUuid(psUuid) + msg.setHostUuids([hostUuid]) + bus.makeTargetServiceIdByResourceUuid(msg, PrimaryStorageConstant.SERVICE_ID, psUuid) + bus.send(msg, new CloudBusCallBack(null) { + @Override + void run(MessageReply r) { + if (cb != null) { + cb(r) + } + } + }) + } +}