* [PATCH 2/4] colo: Introduce resource agent
2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
2019-11-21 17:49 ` [PATCH 3/4] colo: Introduce high-level test Lukas Straub
` (2 subsequent siblings)
4 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
To: qemu-devel
Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert
Introduce a resource agent which can be used in a
Pacemaker cluster to manage qemu COLO.
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
scripts/colo-resource-agent/colo | 1026 ++++++++++++++++++++++++++++++
1 file changed, 1026 insertions(+)
create mode 100755 scripts/colo-resource-agent/colo
diff --git a/scripts/colo-resource-agent/colo b/scripts/colo-resource-agent/colo
new file mode 100755
index 0000000000..5fd9cfc0b5
--- /dev/null
+++ b/scripts/colo-resource-agent/colo
@@ -0,0 +1,1026 @@
+#!/usr/bin/env python
+
+# Resource agent for qemu COLO for use with Pacemaker CRM
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+from __future__ import print_function
+import subprocess
+import sys
+import os
+import os.path
+import signal
+import socket
+import json
+import re
+import time
+import logging
+import logging.handlers
+
# Constants

# OCF exit/status codes as defined by the OCF resource agent API
OCF_SUCCESS = 0
OCF_ERR_GENERIC = 1
OCF_ERR_ARGS = 2
OCF_ERR_UNIMPLEMENTED = 3
OCF_ERR_PERM = 4
OCF_ERR_INSTALLED = 5
OCF_ERR_CONFIGURED = 6
OCF_NOT_RUNNING = 7
OCF_RUNNING_MASTER = 8
OCF_FAILED_MASTER = 9

# Get environment variables
# Hostname lists passed by Pacemaker for notify events; these default to
# "" so the string operations on them below are always safe
OCF_RESKEY_CRM_meta_notify_type \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_type")
OCF_RESKEY_CRM_meta_notify_operation \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_operation")
OCF_RESKEY_CRM_meta_notify_start_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_start_uname", "")
OCF_RESKEY_CRM_meta_notify_stop_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_stop_uname", "")
OCF_RESKEY_CRM_meta_notify_active_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_active_uname", "")
OCF_RESKEY_CRM_meta_notify_promote_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_promote_uname", "")
OCF_RESKEY_CRM_meta_notify_demote_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_demote_uname", "")
OCF_RESKEY_CRM_meta_notify_master_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_master_uname", "")
OCF_RESKEY_CRM_meta_notify_slave_uname \
    = os.getenv("OCF_RESKEY_CRM_meta_notify_slave_uname", "")

# Paths and logging destinations provided by the cluster environment
HA_RSCTMP = os.getenv("HA_RSCTMP", "/run/resource-agents")
HA_LOGFACILITY = os.getenv("HA_LOGFACILITY")
HA_LOGFILE = os.getenv("HA_LOGFILE")
HA_DEBUGLOG = os.getenv("HA_DEBUGLOG")
OCF_RESOURCE_INSTANCE = os.getenv("OCF_RESOURCE_INSTANCE", "default-instance")
# Meta attributes of the master/slave resource (timeout is milliseconds)
OCF_RESKEY_CRM_meta_timeout \
    = int(os.getenv("OCF_RESKEY_CRM_meta_timeout", "60000"))
OCF_RESKEY_CRM_meta_interval \
    = int(os.getenv("OCF_RESKEY_CRM_meta_interval", "1"))
OCF_RESKEY_CRM_meta_clone_max \
    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_max", "1"))
OCF_RESKEY_CRM_meta_clone_node_max \
    = int(os.getenv("OCF_RESKEY_CRM_meta_clone_node_max", "1"))
OCF_RESKEY_CRM_meta_master_max \
    = int(os.getenv("OCF_RESKEY_CRM_meta_master_max", "1"))
OCF_RESKEY_CRM_meta_master_node_max \
    = int(os.getenv("OCF_RESKEY_CRM_meta_master_node_max", "1"))
OCF_RESKEY_CRM_meta_notify \
    = os.getenv("OCF_RESKEY_CRM_meta_notify")
OCF_RESKEY_CRM_meta_globally_unique \
    = os.getenv("OCF_RESKEY_CRM_meta_globally_unique")

HOSTNAME = os.getenv("OCF_RESKEY_CRM_meta_on_node", socket.gethostname())

# The requested action comes either from the environment or as the single
# command line argument
OCF_ACTION = os.getenv("__OCF_ACTION")
if not OCF_ACTION and len(sys.argv) == 2:
    OCF_ACTION = sys.argv[1]

# Resource parameters
# Defaults for the instance parameters declared in the meta-data XML
OCF_RESKEY_binary_default = "qemu-system-x86_64"
OCF_RESKEY_log_dir_default = HA_RSCTMP
OCF_RESKEY_options_default = ""
OCF_RESKEY_disk_size_default = ""
OCF_RESKEY_active_hidden_dir_default = ""
OCF_RESKEY_listen_address_default = "0.0.0.0"
OCF_RESKEY_base_port_default = "9000"
OCF_RESKEY_checkpoint_interval_default = "20000"
OCF_RESKEY_debug_default = "false"

OCF_RESKEY_binary = os.getenv("OCF_RESKEY_binary", OCF_RESKEY_binary_default)
OCF_RESKEY_log_dir = os.getenv("OCF_RESKEY_log_dir", OCF_RESKEY_log_dir_default)
OCF_RESKEY_options = os.getenv("OCF_RESKEY_options", OCF_RESKEY_options_default)
OCF_RESKEY_disk_size \
    = os.getenv("OCF_RESKEY_disk_size", OCF_RESKEY_disk_size_default)
OCF_RESKEY_active_hidden_dir \
    = os.getenv("OCF_RESKEY_active_hidden_dir", \
                OCF_RESKEY_active_hidden_dir_default)
OCF_RESKEY_listen_address \
    = os.getenv("OCF_RESKEY_listen_address", OCF_RESKEY_listen_address_default)
OCF_RESKEY_base_port \
    = os.getenv("OCF_RESKEY_base_port", OCF_RESKEY_base_port_default)
OCF_RESKEY_checkpoint_interval \
    = os.getenv("OCF_RESKEY_checkpoint_interval", \
                OCF_RESKEY_checkpoint_interval_default)
OCF_RESKEY_debug = os.getenv("OCF_RESKEY_debug", OCF_RESKEY_debug_default)

# Per-instance file locations derived from the parameters above
ACTIVE_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
                            OCF_RESOURCE_INSTANCE + "-active.qcow2")
HIDDEN_IMAGE = os.path.join(OCF_RESKEY_active_hidden_dir, \
                            OCF_RESOURCE_INSTANCE + "-hidden.qcow2")

QMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qmp.sock")
COMP_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-compare.sock")
COMP_OUT_SOCK = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE \
                             + "-comp_out.sock")

PID_FILE = os.path.join(HA_RSCTMP, OCF_RESOURCE_INSTANCE + "-qemu.pid")

QMP_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qmp.log")
QEMU_LOG = os.path.join(OCF_RESKEY_log_dir, OCF_RESOURCE_INSTANCE + "-qemu.log")
+
# Exception only raised by ourself
class Error(Exception):
    """Internal failure marker.

    Raised after the problem has already been logged, so it carries no
    message of its own.
    """
    pass
+
def setup_constants():
    """Derive the per-instance ports and qemu command lines.

    Sets module-level globals; must only run after the parameters have
    been validated (base_port is converted with int() unchecked here).
    """
    # This function is called after the parameters were validated
    global MIGRATE_PORT, MIROR_PORT, COMPARE_IN_PORT, NBD_PORT
    # NOTE(review): "MIROR" (sic) is kept as-is; renaming the constant
    # would have to touch every user of it at the same time
    MIGRATE_PORT = int(OCF_RESKEY_base_port)
    MIROR_PORT = int(OCF_RESKEY_base_port) + 1
    COMPARE_IN_PORT = int(OCF_RESKEY_base_port) + 2
    NBD_PORT = int(OCF_RESKEY_base_port) + 3

    # Primary command line: quorum disk with a single child (parent0) and
    # the chardev pairs used later when colo-compare is plugged in
    global QEMU_PRIMARY_CMDLINE
    QEMU_PRIMARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
        " -chardev socket,id=comp0,path='%(COMP_SOCK)s',server,nowait"
        " -chardev socket,id=comp0-0,path='%(COMP_SOCK)s'"
        " -chardev socket,id=comp_out,path='%(COMP_OUT_SOCK)s',server,nowait"
        " -chardev socket,id=comp_out0,path='%(COMP_OUT_SOCK)s'"
        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
        "vote-threshold=1,children.0=parent0"
        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()

    # Secondary command line: replication driver on top of the
    # active/hidden images, filter-redirectors/-rewriter for the COLO
    # network channels and an incoming migration port
    global QEMU_SECONDARY_CMDLINE
    QEMU_SECONDARY_CMDLINE = ("'%(OCF_RESKEY_binary)s' %(OCF_RESKEY_options)s"
        " -chardev socket,id=red0,host='%(OCF_RESKEY_listen_address)s',"
        "port=%(MIROR_PORT)s,server,nowait"
        " -chardev socket,id=red1,host='%(OCF_RESKEY_listen_address)s',"
        "port=%(COMPARE_IN_PORT)s,server,nowait"
        " -object filter-redirector,id=f1,netdev=hn0,queue=tx,indev=red0"
        " -object filter-redirector,id=f2,netdev=hn0,queue=rx,outdev=red1"
        " -object filter-rewriter,id=rew0,netdev=hn0,queue=all"
        " -drive if=none,node-name=childs0,top-id=colo-disk0,"
        "driver=replication,mode=secondary,file.driver=qcow2,"
        "file.file.filename='%(ACTIVE_IMAGE)s',file.backing.driver=qcow2,"
        "file.backing.file.filename='%(HIDDEN_IMAGE)s',"
        "file.backing.backing=parent0"
        " -drive if=none,node-name=colo-disk0,driver=quorum,read-pattern=fifo,"
        "vote-threshold=1,children.0=childs0"
        " -incoming tcp:'%(OCF_RESKEY_listen_address)s':%(MIGRATE_PORT)s"
        " -qmp unix:'%(QMP_SOCK)s',server,nowait"
        " -daemonize -D '%(QEMU_LOG)s' -pidfile '%(PID_FILE)s'") % globals()
+
def qemu_colo_meta_data():
    """Print the OCF resource agent meta-data XML on stdout."""
    print("""\
<?xml version="1.0"?>
<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
<resource-agent name="colo">

    <version>0.1</version>
    <longdesc lang="en">
Resource agent for qemu COLO. (https://wiki.qemu.org/Features/COLO)

After defining the master/slave instance, the master score has to be
manually set to show which node has up-to-date data. So you copy your
image to one host (and create empty images the other host(s)) and then
run "crm_master -r name_of_your_primitive -v 10" on that host.

Also, you have to set 'notify=true' in the metadata attributes when
defining the master/slave instance.
    </longdesc>
    <shortdesc lang="en">Qemu COLO</shortdesc>

    <parameters>

    <parameter name="binary" unique="0" required="0">
    <longdesc lang="en">Qemu binary to use</longdesc>
    <shortdesc lang="en">Qemu binary</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_binary_default + """\"/>
    </parameter>

    <parameter name="log_dir" unique="0" required="0">
    <longdesc lang="en">Directory to place logs in</longdesc>
    <shortdesc lang="en">Log directory</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_log_dir_default + """\"/>
    </parameter>

    <parameter name="options" unique="0" required="1">
    <longdesc lang="en">
Options to pass to qemu. These will be passed alongside COLO specific
options, so you need to follow these conventions: The netdev should have
id=hn0 and the disk controller drive=colo-disk0. The image node should
have id/node-name=parent0, but should not be connected to the guest.
Example:
-vnc :0 -enable-kvm -cpu qemu64,+kvmclock -m 512 -netdev bridge,id=hn0
-device virtio-net,netdev=hn0 -device virtio-blk,drive=colo-disk0
-drive if=none,id=parent0,format=qcow2,file=/mnt/vms/vm01.qcow2
    </longdesc>
    <shortdesc lang="en">Options to pass to qemu.</shortdesc>
    </parameter>

    <parameter name="disk_size" unique="0" required="1">
    <longdesc lang="en">Disk size of the image</longdesc>
    <shortdesc lang="en">Disk size of the image</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_disk_size_default + """\"/>
    </parameter>

    <parameter name="active_hidden_dir" unique="0" required="1">
    <longdesc lang="en">
Directory where the active and hidden images will be stored. It is
recommended to put this on a ramdisk.
    </longdesc>
    <shortdesc lang="en">Path to active and hidden images</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_active_hidden_dir_default + """\"/>
    </parameter>

    <parameter name="listen_address" unique="0" required="0">
    <longdesc lang="en">Address to listen on.</longdesc>
    <shortdesc lang="en">Listen address</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_listen_address_default + """\"/>
    </parameter>

    <parameter name="base_port" unique="1" required="0">
    <longdesc lang="en">
4 tcp ports that are unique for each instance. (base_port to base_port + 3)
    </longdesc>
    <shortdesc lang="en">Ports to use</shortdesc>
    <content type="integer" default=\"""" \
        + OCF_RESKEY_base_port_default + """\"/>
    </parameter>

    <parameter name="checkpoint_interval" unique="0" required="0">
    <longdesc lang="en">
Interval for regular checkpoints in milliseconds.
    </longdesc>
    <shortdesc lang="en">Interval for regular checkpoints</shortdesc>
    <content type="integer" default=\"""" \
        + OCF_RESKEY_checkpoint_interval_default + """\"/>
    </parameter>

    <parameter name="debug" unique="0" required="0">
    <longdesc lang="en">Enable debuging</longdesc>
    <shortdesc lang="en">Enable debuging</shortdesc>
    <content type="string" default=\"""" \
        + OCF_RESKEY_debug_default + """\"/>
    </parameter>

    </parameters>

    <actions>
    <action name="start" timeout="30s" />
    <action name="stop" timeout="20s" />
    <action name="monitor" timeout="40s" \
        interval="12s" depth="0" />
    <action name="monitor" timeout="40s" \
        interval="10s" depth="0" role="Slave" />
    <action name="monitor" timeout="40s" \
        interval="11s" depth="0" role="Master" />
    <action name="notify" timeout="120s" />
    <action name="promote" timeout="120s" />
    <action name="demote" timeout="120s" />
    <action name="meta-data" timeout="5s" />
    <action name="validate-all" timeout="20s" />
    </actions>

</resource-agent>
""")
+
def logs_open():
    """Set up the module-level loggers.

    Creates the global agent logger ``log`` (tty, syslog, logfile and
    debug logfile, depending on the cluster environment) and the global
    ``qmp_log`` used to trace qmp traffic when debugging is enabled.
    """
    def _attach(logger, handler, fmt):
        # small helper: format and register a handler in one step
        handler.setFormatter(fmt)
        logger.addHandler(handler)

    global log
    log = logging.getLogger(OCF_RESOURCE_INSTANCE)
    log.setLevel(logging.DEBUG)
    agent_fmt = logging.Formatter("(%(name)s) %(levelname)s: %(message)s")

    if sys.stdout.isatty():
        _attach(log, logging.StreamHandler(stream=sys.stderr), agent_fmt)
    if HA_LOGFACILITY:
        _attach(log, logging.handlers.SysLogHandler("/dev/log"), agent_fmt)
    if HA_LOGFILE:
        _attach(log, logging.FileHandler(HA_LOGFILE), agent_fmt)
    if HA_DEBUGLOG and HA_DEBUGLOG != HA_LOGFILE:
        _attach(log, logging.FileHandler(HA_DEBUGLOG), agent_fmt)

    global qmp_log
    qmp_log = logging.getLogger("qmp_log")
    qmp_log.setLevel(logging.DEBUG)

    if is_true(OCF_RESKEY_debug):
        _attach(qmp_log, logging.handlers.WatchedFileHandler(QMP_LOG),
                logging.Formatter("%(message)s"))
    else:
        # swallow qmp traffic when debugging is off
        qmp_log.addHandler(logging.NullHandler())
+
def rotate_logfile(logfile, numlogs):
    """Rotate *logfile*, keeping at most *numlogs* generations.

    The oldest generation (``logfile.<numlogs-1>``) is removed, every
    other existing one is shifted a suffix up and the plain *logfile*
    becomes ``logfile.1``.
    """
    oldest = numlogs - 1
    for gen in reversed(range(oldest + 1)):
        name = logfile if gen == 0 else "%s.%s" % (logfile, gen)
        if not os.path.exists(name):
            continue
        if gen == oldest:
            os.remove(name)
        else:
            os.rename(name, "%s.%s" % (logfile, gen + 1))
+
def is_writable(file):
    """Return True when the current user may write to *file*."""
    writable = os.access(file, os.W_OK)
    return writable
+
def is_executable_file(file):
    """Return True when *file* is a regular file with execute permission."""
    if not os.path.isfile(file):
        return False
    return os.access(file, os.X_OK)
+
def is_true(var):
    """Interpret *var* as a boolean OCF parameter value.

    Accepts the usual truthy spellings ("yes", "true", "1", "ja", "on")
    in any capitalization; anything else - including None - is False.

    The previous implementation used re.match(), which anchors only at
    the start and therefore accepted any *prefix* match ("yesterday",
    "1000", ...) while rejecting mixed-case values such as "On".
    """
    return str(var).lower() in ("yes", "true", "1", "ja", "on")
+
# Check if the binary exists and is executable
def check_binary(binary):
    """Return True when *binary* is executable, either as an absolute/
    relative path or somewhere on PATH; log an error and return False
    otherwise."""
    def _executable(path):
        # inlined is_executable_file(): regular file with the exec bit
        return os.path.isfile(path) and os.access(path, os.X_OK)

    if _executable(binary):
        return True
    search_path = os.getenv("PATH", os.defpath)
    for directory in search_path.split(os.pathsep):
        if _executable(os.path.join(directory, binary)):
            return True
    log.error("binary \"%s\" doesn't exist or not executable" % binary)
    return False
+
def run_command(commandline):
    """Run *commandline* through the shell.

    Raises Error() (after logging the combined stdout/stderr output)
    when the command exits non-zero; returns None on success.
    """
    proc = subprocess.Popen(commandline, shell=True,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    output = proc.communicate()[0]
    if proc.returncode == 0:
        return
    log.error("command \"%s\" failed with code %s:\n%s" \
              % (commandline, proc.returncode, output))
    raise Error()
+
# Functions for setting and getting the master score to tell Pacemaker which
# host has the most recent data
def set_master_score(score):
    """Persistently set our own master score; a score <= 0 deletes it."""
    if score > 0:
        os.system("crm_master -q -l forever -v %s" % score)
    else:
        os.system("crm_master -q -l forever -D")
+
def set_remote_master_score(remote, score):
    """Persistently set the master score of node *remote*; a score <= 0
    deletes it."""
    if score > 0:
        os.system("crm_master -q -l forever -N '%s' -v %s" % (remote, score))
    else:
        os.system("crm_master -q -l forever -N '%s' -D" % remote)
+
def get_master_score():
    """Return our persistent master score, or 0 when none is set (or
    crm_master fails)."""
    proc = subprocess.Popen("crm_master -q -G", shell=True,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    output = proc.communicate()[0]
    if proc.returncode != 0:
        return 0
    return int(output.strip())
+
def get_remote_master_score(remote):
    """Return the persistent master score of node *remote*, or 0 when
    none is set (or crm_master fails)."""
    proc = subprocess.Popen("crm_master -q -N '%s' -G" % remote, shell=True,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    output = proc.communicate()[0]
    if proc.returncode != 0:
        return 0
    return int(output.strip())
+
def recv_line(fd):
    """Read one newline-terminated line from socket *fd* and return it
    as str, without the terminator.

    Fixes two bugs in the original:
    - on Python 3 recv() returns bytes, so the comparison with "\\n"
      never matched and the loop spun forever; bytes are decoded first.
    - a closed connection made recv() return empty data forever; EOF
      now terminates the loop and returns whatever was read so far.
    """
    line = ""
    while True:
        chunk = fd.recv(1)
        if isinstance(chunk, bytes):
            chunk = chunk.decode("utf-8")
        if chunk == "\n" or chunk == "":
            break
        line += chunk
    return line
+
# Filter out events
def read_answer(fd):
    """Read qmp replies from *fd* until a command response arrives.

    Asynchronous event messages are logged and skipped; the first
    object containing a "return" or "error" key is returned.
    """
    while True:
        line = recv_line(fd)
        qmp_log.debug(line)

        answer = json.loads(line)
        if "return" in answer:
            return answer
        if "error" in answer:
            return answer
        # anything else is an event - keep reading
+
# Execute one or more qmp commands
def qmp_execute(fd, commands, ignore_error = False):
    """Send each command in *commands* over the qmp socket *fd* and
    return the answer to the last one (None if no command was sent).

    Falsy entries in *commands* are skipped. Raises Error() on
    transport or JSON failures and - unless *ignore_error* is set -
    when qemu answers with an error response.

    Fixes vs. the original: the payload is encoded to bytes so
    sendall() also works on Python 3, and *answer* is initialized so an
    empty command list no longer raises UnboundLocalError.
    """
    answer = None
    for command in commands:
        if not command:
            continue

        try:
            to_send = json.dumps(command)
            # sockets require bytes on Python 3
            fd.sendall((to_send + "\n").encode("utf-8"))
            qmp_log.debug(to_send)

            answer = read_answer(fd)
        except Exception as e:
            log.error("while executing qmp command: %s\n%s" \
                      % (json.dumps(command), e))
            raise Error()

        if not ignore_error and ("error" in answer):
            log.error("qmp command returned error:\n%s\n%s" \
                      % (json.dumps(command), json.dumps(answer)))
            raise Error()

    return answer
+
# Open qemu qmp connection
def qmp_open(fail_fast = False):
    """Connect to QMP_SOCK and negotiate qmp capabilities.

    fail_fast: use a short (at most 10s) timeout for connect/handshake
    instead of the regular one. Subsequent commands always run with the
    action timeout minus 10s. Returns the connected socket; raises
    Error() (after logging) on any failure.
    """
    # Timeout for commands = our timeout minus 10s
    timeout = max(1, (OCF_RESKEY_CRM_meta_timeout/1000)-10)

    try:
        fd = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        if fail_fast:
            fd.settimeout(min(10, (OCF_RESKEY_CRM_meta_timeout/1000)))
        else:
            fd.settimeout(timeout)
        fd.connect(QMP_SOCK)

        # errors are ignored here and checked manually below, so both
        # failure modes take the same logging path
        answer = qmp_execute(fd, [{"execute": "qmp_capabilities"}], True)
    except Exception as e:
        log.error("while connecting to qmp socket: %s" % e)
        raise Error()

    if "error" in answer:
        log.error("while connecting to qmp socket: %s" % json.dumps(answer))
        raise Error()

    fd.settimeout(timeout)
    return fd
+
# Check qemu health and colo role
def qmp_check_state(fd):
    """Query the vm and COLO status via qmp and map them to an OCF
    (role, replication) status pair."""
    vm_status = qmp_execute(fd, [{"execute": "query-status"}])["return"]
    colo_status = qmp_execute(fd, [{"execute": "query-colo-status"}])["return"]

    running = vm_status["status"] in ("running", "colo", "finish-migrate")

    if vm_status["status"] == "inmigrate":
        # waiting for the incoming migration, replication not yet up
        return OCF_SUCCESS, OCF_NOT_RUNNING

    if running and colo_status["mode"] == "none" \
       and colo_status["reason"] in ("request", "none"):
        # running standalone, no replication
        return OCF_RUNNING_MASTER, OCF_NOT_RUNNING

    if running and colo_status["mode"] == "secondary":
        return OCF_SUCCESS, OCF_SUCCESS

    if running and colo_status["mode"] == "primary":
        return OCF_RUNNING_MASTER, OCF_SUCCESS

    log.error("unknown qemu status: vm status: %s colo mode: %s" \
              % (vm_status["status"], colo_status["mode"]))
    return OCF_ERR_GENERIC, OCF_ERR_GENERIC
+
# Get the host of the nbd node
def qmp_get_nbd_remote(fd):
    """Return the host part of the "nbd0" block node's url, or None if
    no such node exists."""
    answer = qmp_execute(fd, [{"execute": "query-named-block-nodes"}])
    for node in answer["return"]:
        if node["node-name"] != "nbd0":
            continue
        url = str(node["image"]["filename"])
        # e.g. nbd://host:port/export -> host
        hostport = url.split("//")[1].split("/")[0]
        return hostport.split(":")[0]
    return None
+
# Check if we are currently resyncing
def qmp_check_resync(fd):
    """Return the "resync" block job dict, or None when no such job
    exists."""
    answer = qmp_execute(fd, [{"execute": "query-block-jobs"}])
    matches = [job for job in answer["return"] if job["device"] == "resync"]
    return matches[0] if matches else None
+
def qmp_start_resync(fd, remote):
    """Start resynchronizing our disk to the new secondary *remote*.

    Attaches the secondary's exported "parent0" image as block node
    "nbd0" and starts a full blockdev-mirror job named "resync" onto it
    (auto-dismiss off, so the finished job stays visible to the
    monitor).
    """
    qmp_execute(fd, [
        {"execute": "blockdev-add", "arguments": {"driver": "nbd", "node-name": "nbd0", "server": {"type": "inet", "host": str(remote), "port": str(NBD_PORT)}, "export": "parent0"}},
        {"execute": "blockdev-mirror", "arguments": {"device": "colo-disk0", "job-id": "resync", "target": "nbd0", "sync": "full", "auto-dismiss": False}}
    ])
+
def qmp_cancel_resync(fd):
    """Cancel a running resync job and remove the nbd node again.

    NOTE(review): assumes the "resync" job still exists - if it has
    already vanished, qmp_check_resync() returns None and the
    subscription below raises; confirm callers only invoke this while
    the job is present.
    """
    # force-cancel unless the job already concluded on its own
    if qmp_check_resync(fd)["status"] != "concluded":
        qmp_execute(fd, [{"execute": "block-job-cancel", "arguments": {"device": "resync", "force": True}}], True)
    # Wait for the block-job to finish
    while qmp_check_resync(fd)["status"] != "concluded":
        time.sleep(1)

    qmp_execute(fd, [
        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}}
    ])
+
def qmp_start_colo(fd, remote):
    """Switch from resync to full COLO replication with *remote*.

    Expects a ready "resync" mirror job. Pauses the VM, replaces the
    mirror with the replication driver, wires up the COLO network
    filters and starts the COLO migration; returns once the VM has left
    the "paused" state.
    """
    # Check if we have a filter-rewriter
    answer = qmp_execute(fd, [{"execute": "qom-list", "arguments": {"path": "/objects/rew0"}}], True)
    if "error" in answer:
        if answer["error"]["class"] == "DeviceNotFound":
            have_filter_rewriter = False
        else:
            log.error("while checking for filter-rewriter:\n%s" \
                      % json.dumps(answer))
            raise Error()
    else:
        have_filter_rewriter = True

    # Pause VM and cancel resync
    qmp_execute(fd, [
        {"execute": "stop"},
        {"execute": "block-job-cancel", "arguments": {"device": "resync"}}
    ])

    # Wait for the block-job to finish
    while qmp_check_resync(fd)["status"] != "concluded":
        time.sleep(1)

    # Add the replication node
    qmp_execute(fd, [
        {"execute": "block-job-dismiss", "arguments": {"id": "resync"}},
        {"execute": "blockdev-add", "arguments": {"driver": "replication", "node-name": "replication0", "mode": "primary", "file": "nbd0"}},
        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "node": "replication0"}}
    ])

    # Connect mirror and compare_in to secondary
    qmp_execute(fd, [
        {"execute": "chardev-add", "arguments": {"id": "mirror0", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(MIROR_PORT)}}, "server": False, "reconnect": 1}}}},
        {"execute": "chardev-add", "arguments": {"id": "compare1", "backend": {"type": "socket", "data": {"addr": {"type": "inet", "data": {"host": str(remote), "port": str(COMPARE_IN_PORT)}}, "server": False, "reconnect": 1}}}}
    ])

    # Add the COLO filters; with a rewriter present the new filters are
    # inserted before it in the netdev's filter chain
    if have_filter_rewriter:
        qmp_execute(fd, [
            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"insert": "before", "position": "id=rew0", "netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
        ])
    else:
        qmp_execute(fd, [
            {"execute": "object-add", "arguments": {"qom-type": "filter-mirror", "id": "m0", "props": {"netdev": "hn0", "queue": "tx", "outdev": "mirror0"}}},
            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire0", "props": {"netdev": "hn0", "queue": "rx", "indev": "comp_out"}}},
            {"execute": "object-add", "arguments": {"qom-type": "filter-redirector", "id": "redire1", "props": {"netdev": "hn0", "queue": "rx", "outdev": "comp0"}}},
            {"execute": "object-add", "arguments": {"qom-type": "iothread", "id": "iothread1"}},
            {"execute": "object-add", "arguments": {"qom-type": "colo-compare", "id": "comp0", "props": {"primary_in": "comp0-0", "secondary_in": "compare1", "outdev": "comp_out0", "iothread": "iothread1"}}}
        ])

    # Start COLO
    qmp_execute(fd, [
        {"execute": "migrate-set-capabilities", "arguments": {"capabilities": [{"capability": "x-colo", "state": True }] }},
        {"execute": "migrate-set-parameters" , "arguments": {"x-checkpoint-delay": int(OCF_RESKEY_checkpoint_interval)}},
        {"execute": "migrate", "arguments": {"uri": "tcp:%s:%s" % (remote, MIGRATE_PORT)}}
    ])

    # Wait for COLO to start
    while qmp_execute(fd, [{"execute": "query-status"}])["return"]["status"] \
          == "paused":
        time.sleep(1)
+
def qmp_primary_failover(fd):
    """Continue as standalone master after the secondary failed.

    Detaches the replication/nbd nodes from the quorum disk, tears down
    the COLO filters and chardevs, then signals lost heartbeat.
    """
    qmp_execute(fd, [
        {"execute": "x-blockdev-change", "arguments": {"parent": "colo-disk0", "child": "children.1"}},
        {"execute": "blockdev-del", "arguments": {"node-name": "replication0"}},
        {"execute": "blockdev-del", "arguments": {"node-name": "nbd0"}},
        {"execute": "object-del", "arguments": {"id": "comp0"}},
        {"execute": "object-del", "arguments": {"id": "iothread1"}},
        {"execute": "object-del", "arguments": {"id": "m0"}},
        {"execute": "object-del", "arguments": {"id": "redire0"}},
        {"execute": "object-del", "arguments": {"id": "redire1"}},
        {"execute": "chardev-remove", "arguments": {"id": "mirror0"}},
        {"execute": "chardev-remove", "arguments": {"id": "compare1"}},
        {"execute": "x-colo-lost-heartbeat"}
    ])
+
def qmp_secondary_failover(fd):
    """Promote the secondary after the primary failed.

    Stops the nbd server and signals lost heartbeat, then re-creates
    the primary-side filter chardevs so replication with a new
    secondary can be set up later.
    """
    # Stop the NBD server, and resume
    qmp_execute(fd, [
        {"execute": "nbd-server-stop"},
        {"execute": "x-colo-lost-heartbeat"}
    ])

    # Prepare for continuing replication when we have a new secondary
    qmp_execute(fd, [
        {"execute": "object-del", "arguments": {"id": "f2"}},
        {"execute": "object-del", "arguments": {"id": "f1"}},
        {"execute": "chardev-remove", "arguments": {"id": "red1"}},
        {"execute": "chardev-remove", "arguments": {"id": "red0"}},
        {"execute": "chardev-add", "arguments": {"id": "comp0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": True}}}},
        {"execute": "chardev-add", "arguments": {"id": "comp0-0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_SOCK)}}, "server": False}}}},
        {"execute": "chardev-add", "arguments": {"id": "comp_out", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": True}}}},
        {"execute": "chardev-add", "arguments": {"id": "comp_out0", "backend": {"type": "socket", "data": {"addr": {"type": "unix", "data": {"path": str(COMP_OUT_SOCK)}}, "server": False}}}}
    ])
+
# Sanity checks: check parameters, files, binaries, etc.
def qemu_colo_validate_all():
    """Validate the resource parameters, meta attributes, binaries and
    paths. Returns OCF_SUCCESS or the matching OCF_ERR_* code."""
    # Check resource parameters
    if not OCF_RESKEY_base_port.isdigit():
        log.error("base_port needs to be a number")
        return OCF_ERR_CONFIGURED

    if not OCF_RESKEY_checkpoint_interval.isdigit():
        log.error("checkpoint_interval needs to be a number")
        return OCF_ERR_CONFIGURED

    if not OCF_RESKEY_active_hidden_dir:
        log.error("active_hidden_dir needs to be specified")
        return OCF_ERR_CONFIGURED

    if not OCF_RESKEY_disk_size:
        log.error("disk_size needs to be specified")
        return OCF_ERR_CONFIGURED

    # Check resource meta configuration (irrelevant when stopping)
    if OCF_ACTION != "stop":
        if OCF_RESKEY_CRM_meta_master_max != 1:
            log.error("only one master allowed")
            return OCF_ERR_CONFIGURED

        if OCF_RESKEY_CRM_meta_clone_max > 2:
            log.error("maximum 2 clones allowed")
            return OCF_ERR_CONFIGURED

        if OCF_RESKEY_CRM_meta_master_node_max != 1:
            log.error("only one master per node allowed")
            return OCF_ERR_CONFIGURED

        if OCF_RESKEY_CRM_meta_clone_node_max != 1:
            log.error("only one clone per node allowed")
            return OCF_ERR_CONFIGURED

    # Check if notify is enabled
    if OCF_ACTION not in ("stop", "monitor"):
        if not is_true(OCF_RESKEY_CRM_meta_notify) \
           and not OCF_RESKEY_CRM_meta_notify_start_uname:
            log.error("notify needs to be enabled")
            return OCF_ERR_CONFIGURED

    # Check that globally-unique is disabled
    if is_true(OCF_RESKEY_CRM_meta_globally_unique):
        log.error("globally-unique needs to be disabled")
        return OCF_ERR_CONFIGURED

    # Check binaries
    for binary in (OCF_RESKEY_binary, "qemu-img"):
        if not check_binary(binary):
            return OCF_ERR_INSTALLED

    # Check paths and files
    if not os.path.isdir(OCF_RESKEY_active_hidden_dir) \
       or not is_writable(OCF_RESKEY_active_hidden_dir):
        log.error("active and hidden image directory missing or not writable")
        return OCF_ERR_PERM

    return OCF_SUCCESS
+
# Check if qemu is running
def check_pid():
    """Return (status, pid) for the qemu process recorded in PID_FILE.

    status is OCF_NOT_RUNNING (pid is None when there is no pidfile, or
    the stale pid when the process is gone) or OCF_SUCCESS when the
    process exists. Signal 0 only probes for process existence.

    The pidfile is now read with a context manager so the file handle
    is closed even when int() raises on a malformed pidfile (the
    original leaked the handle in that case).
    """
    if not os.path.exists(PID_FILE):
        return OCF_NOT_RUNNING, None

    with open(PID_FILE, "r") as fd:
        pid = int(fd.readline().strip())

    try:
        os.kill(pid, 0)
    except OSError:
        log.info("qemu is not running")
        return OCF_NOT_RUNNING, pid
    else:
        return OCF_SUCCESS, pid
+
def qemu_colo_monitor(fail_fast = False):
    """Check qemu health and return an OCF (role, replication) pair.

    On regular (non-probe) monitor runs this also drives the resync
    state machine: a failed resync is cancelled, a finished one
    triggers the switch to COLO and marks the peer as promotable.
    """
    status, pid = check_pid()
    if status != OCF_SUCCESS:
        return status, OCF_ERR_GENERIC

    fd = qmp_open(fail_fast)

    role, replication = qmp_check_state(fd)
    if role != OCF_SUCCESS and role != OCF_RUNNING_MASTER:
        return role, replication

    if not fail_fast and OCF_RESKEY_CRM_meta_interval != 0:
        # This isn't a probe monitor
        block_job = qmp_check_resync(fd)
        if block_job != None:
            if "error" in block_job:
                log.error("resync error: %s" % block_job["error"])
                qmp_cancel_resync(fd)
                # TODO: notify pacemaker about peer failure
            elif block_job["ready"] == True:
                log.info("resync done, starting colo")
                peer = qmp_get_nbd_remote(fd)
                qmp_start_colo(fd, peer)
                # COLO started, our secondary now can be promoted if the
                # primary fails
                set_remote_master_score(peer, 100)
            else:
                # job still copying; log progress only
                pct_done = (float(block_job["offset"]) \
                            / float(block_job["len"])) * 100
                log.info("resync %.2f%% done" % pct_done)

    fd.close()

    return role, replication
+
def qemu_colo_start():
    """Start qemu as a COLO secondary and export its disk over nbd.

    Creates fresh active/hidden images, launches qemu waiting for an
    incoming migration, then starts the nbd server exporting "parent0"
    writable for the future primary's resync.
    """
    if check_pid()[0] == OCF_SUCCESS:
        log.info("qemu is already running")
        return OCF_SUCCESS

    # NOTE(review): the active/hidden images are recreated empty on
    # every start, so they are assumed to hold only transient
    # replication state - confirm
    run_command("qemu-img create -q -f qcow2 %s %s" \
                % (ACTIVE_IMAGE, OCF_RESKEY_disk_size))
    run_command("qemu-img create -q -f qcow2 %s %s" \
                % (HIDDEN_IMAGE, OCF_RESKEY_disk_size))

    rotate_logfile(QMP_LOG, 4)
    rotate_logfile(QEMU_LOG, 4)
    run_command(QEMU_SECONDARY_CMDLINE)

    fd = qmp_open()
    qmp_execute(fd, [
        {"execute": "nbd-server-start", "arguments": {"addr": {"type": "inet", "data": {"host": str(OCF_RESKEY_listen_address), "port": str(NBD_PORT)}}}},
        {"execute": "nbd-server-add", "arguments": {"device": "parent0", "writable": True}}
    ])
    fd.close()

    return OCF_SUCCESS
+
def env_do_shutdown_guest():
    """Truthy when every active instance is being stopped at once,
    i.e. the whole resource (and with it the guest) is going down."""
    active = OCF_RESKEY_CRM_meta_notify_active_uname
    stopping = OCF_RESKEY_CRM_meta_notify_stop_uname
    return active and stopping and active.strip() == stopping.strip()
+
def env_find_secondary():
    """Pick a secondary host from the notify environment.

    Candidates are the current slaves minus the hosts that are
    stopping, plus the hosts that are starting; our own hostname is
    skipped in the slave list. Returns None when no host qualifies.
    """
    # slave(s) =
    #   OCF_RESKEY_CRM_meta_notify_slave_uname
    #   - OCF_RESKEY_CRM_meta_notify_stop_uname
    #   + OCF_RESKEY_CRM_meta_notify_start_uname
    stopping = OCF_RESKEY_CRM_meta_notify_stop_uname.split(" ")

    # running slaves that are neither stopping nor ourselves
    for host in OCF_RESKEY_CRM_meta_notify_slave_uname.split(" "):
        if host and host not in stopping and host != HOSTNAME:
            return host

    # hosts that are just starting up
    for host in OCF_RESKEY_CRM_meta_notify_start_uname.split(" "):
        if host != HOSTNAME:
            return host

    # we found no secondary
    return None
+
def _qemu_colo_stop(monstatus, shutdown_guest):
    """Stop qemu - cleanly when possible, forcefully otherwise.

    monstatus is the role reported by the monitor; shutdown_guest
    selects a guest OS shutdown (system_powerdown on the primary)
    instead of an immediate quit. Always returns OCF_SUCCESS
    eventually, falling back to SIGTERM/SIGKILL.

    Fix: the except handler used e.message, which no longer exists on
    Python 3 exceptions and itself raised AttributeError.
    """
    # stop action must do everything possible to stop the resource
    try:
        timeout = time.time() + (OCF_RESKEY_CRM_meta_timeout/1000) - 10

        if monstatus == OCF_NOT_RUNNING:
            log.info("resource is already stopped")
            return OCF_SUCCESS

        # only a healthy instance is asked to shut down cleanly
        force_stop = monstatus not in (OCF_RUNNING_MASTER, OCF_SUCCESS)

        if not force_stop:
            fd = qmp_open(True)
            if shutdown_guest:
                # the primary shuts the guest os down; the secondary
                # merely waits for qemu to exit
                if monstatus == OCF_RUNNING_MASTER:
                    qmp_execute(fd, [{"execute": "system_powerdown"}])
            else:
                qmp_execute(fd, [{"execute": "quit"}])
            fd.close()

            # wait for qemu to stop
            while time.time() < timeout:
                status, pid = check_pid()
                if status == OCF_NOT_RUNNING:
                    # qemu stopped
                    return OCF_SUCCESS
                elif status == OCF_SUCCESS:
                    # wait
                    time.sleep(1)
                else:
                    # something went wrong, force stop instead
                    break

            log.warning("clean stop timeout reached")
    except Exception as e:
        log.warning("error while stopping: \"%s\"" % e)

    log.info("force stopping qemu")

    status, pid = check_pid()
    if status == OCF_NOT_RUNNING:
        return OCF_SUCCESS
    try:
        os.kill(pid, signal.SIGTERM)
        time.sleep(2)
        os.kill(pid, signal.SIGKILL)
    except Exception:
        # process may already be gone between the signals
        pass

    while check_pid()[0] != OCF_NOT_RUNNING:
        time.sleep(1)

    return OCF_SUCCESS
+
def qemu_colo_stop():
    """Stop action entry point.

    Stops qemu via _qemu_colo_stop() and then updates the master scores
    so that a later promote picks the node with up-to-date data.
    Returns the OCF status of the stop itself.
    """
    shutdown_guest = env_do_shutdown_guest()
    try:
        role, replication = qemu_colo_monitor(True)
    except Exception:
        # If we can't even monitor, treat the resource as failed and let
        # _qemu_colo_stop() fall through to a forced kill.
        role, replication = OCF_ERR_GENERIC, OCF_ERR_GENERIC

    status = _qemu_colo_stop(role, shutdown_guest)

    if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
        peer = env_find_secondary()
        if peer and (get_remote_master_score(peer) > 10) \
            and not shutdown_guest:
            # We were a healthy primary and had a healthy secondary. We were
            # stopped so outdate ourselves, as the secondary will take over.
            set_master_score(0)
        else:
            if role == OCF_RUNNING_MASTER:
                # We were a healthy primary but had no healthy secondary or it
                # was stopped as well. So we have up-to-date data.
                set_master_score(10)
            else:
                # We were an unhealthy primary but also had no healthy
                # secondary. So we still should have up-to-date data.
                set_master_score(5)
    else:
        if get_master_score() > 10:
            if role == OCF_SUCCESS:
                if shutdown_guest:
                    # We were a healthy secondary and (probably) had a healthy
                    # primary and both were stopped. So we have up-to-date data
                    # too.
                    set_master_score(10)
                else:
                    # We were a healthy secondary and (probably) had a healthy
                    # primary still running. So we are now out of date.
                    set_master_score(0)
            else:
                # We were an unhealthy secondary. So we are now out of date.
                set_master_score(0)

    return status
+
def qemu_colo_notify():
    """Handle Pacemaker clone notifications.

    post-start: if we are the primary and a secondary just started,
    begin resyncing it and mark its data inconsistent until the resync
    completes.
    pre-stop: if our current secondary is about to stop, cancel any
    in-flight resync; otherwise, if a healthy secondary remains, hand
    over by failing over the primary.
    Always returns OCF_SUCCESS.
    """
    # No backslash needed: the expression is already inside parentheses.
    action = "%s-%s" % (OCF_RESKEY_CRM_meta_notify_type,
                        OCF_RESKEY_CRM_meta_notify_operation)

    if action == "post-start":
        if HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
            peer = str.strip(OCF_RESKEY_CRM_meta_notify_start_uname)
            fd = qmp_open()
            qmp_start_resync(fd, peer)
            fd.close()
            # The secondary has inconsistent data until resync is finished
            set_remote_master_score(peer, 0)

    elif action == "pre-stop":
        if not env_do_shutdown_guest() \
           and HOSTNAME == str.strip(OCF_RESKEY_CRM_meta_notify_master_uname):
            fd = qmp_open()
            peer = qmp_get_nbd_remote(fd)
            if peer == str.strip(OCF_RESKEY_CRM_meta_notify_stop_uname):
                # PEP 8: comparison against None uses "is not", and a
                # falsy-but-present resync job must still be cancelled.
                if qmp_check_resync(fd) is not None:
                    qmp_cancel_resync(fd)
            elif peer and get_remote_master_score(peer) > 10:
                qmp_primary_failover(fd)
            fd.close()

    return OCF_SUCCESS
+
def qemu_colo_promote():
    """Promote action: make this node the COLO primary.

    Two valid paths:
    - healthy but replication not running: restart qemu as a fresh
      primary and resync the peer from scratch
    - healthy with replication running: we are the secondary, so fail
      over in place.
    Returns an OCF status code.
    """
    role, replication = qemu_colo_monitor()

    if role == OCF_SUCCESS and replication == OCF_NOT_RUNNING:
        # Stop the currently running (secondary) instance before
        # launching a new primary qemu.
        status = _qemu_colo_stop(role, False)
        if status != OCF_SUCCESS:
            return status

        rotate_logfile(QMP_LOG, 4)
        rotate_logfile(QEMU_LOG, 4)
        run_command(QEMU_PRIMARY_CMDLINE)
        # 101 marks us as the preferred master
        set_master_score(101)

        peer = env_find_secondary()
        if peer:
            fd = qmp_open()
            qmp_start_resync(fd, peer)
            # The secondary has inconsistent data until resync is finished
            set_remote_master_score(peer, 0)
            fd.close()
        return OCF_SUCCESS
    elif role == OCF_SUCCESS and replication == OCF_SUCCESS:
        # We are a healthy secondary: take over from the old primary.
        fd = qmp_open()
        qmp_secondary_failover(fd)
        set_master_score(101)

        peer = env_find_secondary()
        if peer:
            qmp_start_resync(fd, peer)
            # The secondary has inconsistent data until resync is finished
            set_remote_master_score(peer, 0)
        fd.close()
        return OCF_SUCCESS
    else:
        return OCF_ERR_GENERIC
+
def qemu_colo_demote():
    """Demote by fully stopping the resource and restarting it as a
    plain (secondary-capable) instance; propagate a failed stop."""
    rc = qemu_colo_stop()
    return rc if rc != OCF_SUCCESS else qemu_colo_start()
+
+
# meta-data must work even when the environment would fail validation
if OCF_ACTION == "meta-data":
    qemu_colo_meta_data()
    exit(OCF_SUCCESS)

logs_open()

status = qemu_colo_validate_all()
# Exit here if our sanity checks fail, but try to continue if we need to stop
if status != OCF_SUCCESS and OCF_ACTION != "stop":
    exit(status)

setup_constants()

# Bug fix: removed leftover debugging that appended the full agent
# environment (which may contain sensitive cluster data) to a
# predictable, unprotected path (/tmp/ra-env.log) on every invocation:
#   os.system("echo 'Action: %s' >> /tmp/ra-env.log" % OCF_ACTION)
#   os.system("env >> /tmp/ra-env.log")

# Dispatch the requested OCF action; any uncaught agent Error maps to a
# generic OCF failure.
try:
    if OCF_ACTION == "start":
        status = qemu_colo_start()
    elif OCF_ACTION == "stop":
        status = qemu_colo_stop()
    elif OCF_ACTION == "monitor":
        status = qemu_colo_monitor()[0]
    elif OCF_ACTION == "notify":
        status = qemu_colo_notify()
    elif OCF_ACTION == "promote":
        status = qemu_colo_promote()
    elif OCF_ACTION == "demote":
        status = qemu_colo_demote()
    elif OCF_ACTION == "validate-all":
        status = qemu_colo_validate_all()
    else:
        status = OCF_ERR_UNIMPLEMENTED
except Error:
    exit(OCF_ERR_GENERIC)
else:
    exit(status)
--
2.20.1
^ permalink raw reply related [flat|nested] 13+ messages in thread
* [PATCH 3/4] colo: Introduce high-level test
2019-11-21 17:49 [PATCH 0/4] colo: Introduce resource agent and high-level test Lukas Straub
2019-11-21 17:49 ` [PATCH 1/4] block/quorum.c: stable children names Lukas Straub
2019-11-21 17:49 ` [PATCH 2/4] colo: Introduce resource agent Lukas Straub
@ 2019-11-21 17:49 ` Lukas Straub
2019-11-21 17:49 ` [PATCH 4/4] MAINTAINERS: Add myself as maintainer for COLO resource agent Lukas Straub
2019-11-22 9:46 ` [PATCH 0/4] colo: Introduce resource agent and high-level test Dr. David Alan Gilbert
4 siblings, 0 replies; 13+ messages in thread
From: Lukas Straub @ 2019-11-21 17:49 UTC (permalink / raw)
To: qemu-devel
Cc: Zhang, Chen, Jason Wang, Alberto Garcia, Dr. David Alan Gilbert
Add high-level test relying on the colo resource-agent to test
all failover cases while checking guest network connectivity
Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
scripts/colo-resource-agent/crm_master | 44 +++
tests/acceptance/colo.py | 444 +++++++++++++++++++++++++
2 files changed, 488 insertions(+)
create mode 100755 scripts/colo-resource-agent/crm_master
create mode 100644 tests/acceptance/colo.py
diff --git a/scripts/colo-resource-agent/crm_master b/scripts/colo-resource-agent/crm_master
new file mode 100755
index 0000000000..00c386b949
--- /dev/null
+++ b/scripts/colo-resource-agent/crm_master
@@ -0,0 +1,44 @@
#!/bin/bash

# Fake crm_master for COLO testing
#
# Copyright (c) Lukas Straub <lukasstraub2@web.de>
#
# This work is licensed under the terms of the GNU GPL, version 2 or
# later. See the COPYING file in the top-level directory.

# Local node's fake attribute store; -N switches to the remote node's.
TMPDIR="$HA_RSCTMP"
score=0
query=0

OPTIND=1
while getopts 'Qql:Dv:N:G' opt; do
    case "$opt" in
        Q|q)
            # Noop
            ;;
        "l")
            # Noop (lifetime is irrelevant for the fake store)
            ;;
        "D")
            # Delete is modeled as resetting the score to 0
            score=0
            ;;
        "v")
            # Quote the assignment so values are taken verbatim
            score="$OPTARG"
            ;;
        "N")
            # Operate on the other node's attribute store
            TMPDIR="$COLO_SMOKE_REMOTE_TMP"
            ;;
        "G")
            query=1
            ;;
    esac
done

if (( query )); then
    cat "${TMPDIR}/master_score" || exit 1
else
    # Quote "$score" to prevent word splitting / globbing of the value
    echo "$score" > "${TMPDIR}/master_score" || exit 1
fi

exit 0
diff --git a/tests/acceptance/colo.py b/tests/acceptance/colo.py
new file mode 100644
index 0000000000..94a6adabdd
--- /dev/null
+++ b/tests/acceptance/colo.py
@@ -0,0 +1,444 @@
+#!/usr/bin/env python
+
+# High-level test for qemu COLO testing all failover cases while checking
+# guest network connectivity
+#
+# Copyright (c) Lukas Straub <lukasstraub2@web.de>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+
+import select
+import sys
+import subprocess
+import shutil
+import os
+import signal
+import os.path
+import json
+import time
+import tempfile
+
+from avocado import Test
+from avocado.utils.archive import gzip_uncompress
+from avocado.utils import network
+from avocado_qemu import pick_default_qemu_bin, SRC_ROOT_DIR
+
class ColoTest(Test):
    """Drive the COLO resource agent through full failover scenarios on
    two simulated hosts connected by a qemu socket-hub network bridge,
    optionally checking guest reachability over ssh."""

    # Overall avocado timeout per test method, in seconds
    timeout = 120

    # Constants
    # OCF resource agent exit codes, mirrored from the agent under test
    OCF_SUCCESS = 0
    OCF_ERR_GENERIC = 1
    OCF_ERR_ARGS = 2
    OCF_ERR_UNIMPLEMENTED = 3
    OCF_ERR_PERM = 4
    OCF_ERR_INSTALLED = 5
    OCF_ERR_CONFIGURED = 6
    OCF_NOT_RUNNING = 7
    OCF_RUNNING_MASTER = 8
    OCF_FAILED_MASTER = 9

    # Identifiers for the two simulated cluster nodes
    HOSTA = 10
    HOSTB = 11

    # Common qemu guest options; per-host netdev/drive options are added
    # in setup_base_env()
    QEMU_OPTIONS = (" -enable-kvm -cpu qemu64,+kvmclock -m 256"
                    " -device virtio-net,netdev=hn0"
                    " -device virtio-blk,drive=colo-disk0")
    # Resource agent under test, and PATH prefix exposing the fake crm_master
    COLO_RA = "scripts/colo-resource-agent/colo"
    FAKEPATH = ".:scripts/colo-resource-agent"

    # Process handles, filled in lazily during the test
    bridge_proc = None
    ssh_proc = None
+
+ def setUp(self):
+ # Qemu binary
+ default_qemu_bin = pick_default_qemu_bin()
+ self.QEMU_BINARY = self.params.get('qemu_bin', default=default_qemu_bin)
+
+ # Find free port range
+ base_port = 1024
+ while True:
+ base_port = network.find_free_port(start_port=base_port, \
+ address="127.0.0.1")
+ if base_port == None:
+ self.cancel("Failed to find a free port")
+ for n in range(base_port, base_port +6):
+ if not network.is_port_free(n, "127.0.0.1"):
+ base_port = n +1
+ break
+ else:
+ # for loop above didn't break
+ break
+
+ self.BRIDGE_HOSTA_PORT = base_port
+ self.BRIDGE_HOSTB_PORT = base_port + 1
+ self.SSH_PORT = base_port + 2
+ self.COLO_BASE_PORT = base_port + 3
+
+ # Temporary directories
+ self.TMPDIR = tempfile.mkdtemp()
+ self.TMPA = os.path.join(self.TMPDIR, "hosta")
+ self.TMPB = os.path.join(self.TMPDIR, "hostb")
+ os.makedirs(self.TMPA)
+ os.makedirs(self.TMPB)
+
+ # Disk images
+ self.HOSTA_IMAGE = os.path.join(self.TMPA, "image.raw")
+ self.HOSTB_IMAGE = os.path.join(self.TMPB, "image.raw")
+
+ image_url = ("https://downloads.openwrt.org/releases/18.06.5/targets/"
+ "x86/64/openwrt-18.06.5-x86-64-combined-ext4.img.gz")
+ image_hash = ("55589a3a9b943218b1734d196bcaa92a"
+ "3cfad91c07fa6891474b4291ce1b8ec2")
+ self.IMAGE_SIZE = "285736960b"
+ download = self.fetch_asset(image_url, asset_hash=image_hash, \
+ algorithm="sha256")
+ gzip_uncompress(download, self.HOSTA_IMAGE)
+ shutil.copyfile(self.HOSTA_IMAGE, self.HOSTB_IMAGE)
+
+ self.log.info("Will put logs in \"%s\"" % self.outputdir)
+ self.RA_LOG = os.path.join(self.outputdir, "resource-agent.log")
+ self.HOSTA_LOGDIR = os.path.join(self.outputdir, "hosta")
+ self.HOSTB_LOGDIR = os.path.join(self.outputdir, "hostb")
+ os.makedirs(self.HOSTA_LOGDIR)
+ os.makedirs(self.HOSTB_LOGDIR)
+
+ # Network bridge
+ self.BRIDGE_PIDFILE = os.path.join(self.TMPDIR, "bridge.pid")
+ pid = self.read_pidfile(self.BRIDGE_PIDFILE)
+ if not (pid and self.check_pid(pid)):
+ self.run_command(("%s -M none -daemonize -pidfile '%s'"
+ " -netdev socket,id=hosta,listen=127.0.0.1:%s"
+ " -netdev hubport,id=porta,hubid=0,netdev=hosta"
+ " -netdev socket,id=hostb,listen=127.0.0.1:%s"
+ " -netdev hubport,id=portb,hubid=0,netdev=hostb"
+ " -netdev user,net=192.168.1.1/24,host=192.168.1.2,"
+ "hostfwd=tcp:127.0.0.1:%s-192.168.1.1:22,id=host"
+ " -netdev hubport,id=hostport,hubid=0,netdev=host")
+ % (self.QEMU_BINARY, self.BRIDGE_PIDFILE,
+ self.BRIDGE_HOSTA_PORT, self.BRIDGE_HOSTB_PORT,
+ self.SSH_PORT), 0)
+
+ def tearDown(self):
+ try:
+ pid = self.read_pidfile(self.BRIDGE_PIDFILE)
+ if pid and self.check_pid(pid):
+ os.kill(pid, signal.SIGKILL)
+ except Exception():
+ pass
+ try:
+ self.ra_stop(self.HOSTA)
+ except Exception():
+ pass
+ try:
+ self.ra_stop(self.HOSTB)
+ except Exception():
+ pass
+ try:
+ if self.ssh_proc:
+ self.ssh_proc.terminate()
+ except Exception():
+ pass
+
+ shutil.rmtree(self.TMPDIR)
+
+ def run_command(self, cmdline, expected_status, env=None, error_fail=True):
+ proc = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE, \
+ stderr=subprocess.STDOUT, \
+ universal_newlines=True, env=env)
+ stdout, stderr = proc.communicate()
+ if proc.returncode != expected_status:
+ message = "command \"%s\" failed with code %s:\n%s" \
+ % (cmdline, proc.returncode, stdout)
+ if error_fail:
+ self.log.error(message)
+ self.fail("command \"%s\" failed" % cmdline)
+ else:
+ self.log.info(message)
+
+ return proc.returncode
+
+ def cat_line(self, path):
+ line=""
+ try:
+ fd = open(path, "r")
+ line = str.strip(fd.readline())
+ fd.close()
+ except:
+ pass
+ return line
+
+ def read_pidfile(self, pidfile):
+ try:
+ pid = int(self.cat_line(pidfile))
+ except ValueError:
+ return None
+ else:
+ return pid
+
+ def check_pid(self, pid):
+ try:
+ os.kill(pid, 0)
+ except OSError:
+ return False
+ else:
+ return True
+
+ def ssh_ping(self, proc):
+ proc.stdin.write("ping\n")
+ if not select.select([proc.stdout], [], [], 30)[0]:
+ raise self.fail("ssh ping timeout reached")
+ if proc.stdout.readline() != "ping\n":
+ raise self.fail("unexpected ssh ping answer")
+
+ def ssh_open(self):
+ commandline = ("ssh -o \"UserKnownHostsFile /dev/null\""
+ " -o \"StrictHostKeyChecking no\""
+ " -p%s root@127.0.0.1") % self.SSH_PORT
+
+ self.log.info("Connecting via ssh")
+ for i in range(10):
+ if self.run_command(commandline + " exit", 0, error_fail=False) \
+ == 0:
+ proc = subprocess.Popen(commandline + " cat", shell=True, \
+ stdin=subprocess.PIPE, \
+ stdout=subprocess.PIPE, \
+ stderr=0, \
+ universal_newlines=True,
+ bufsize=1)
+ self.ssh_ping(proc)
+ return proc
+ else:
+ time.sleep(5)
+ self.fail("ssh connect timeout reached")
+
+ def ssh_close(self, proc):
+ proc.terminate()
+
    def setup_base_env(self, host):
        """Build the environment dict that Pacemaker would pass to the
        resource agent for *host* (HOSTA or HOSTB).

        Common OCF_RESKEY_* settings are shared; netdev/drive options,
        directories and node addresses differ per host. Returns the dict.
        """
        # Prepend FAKEPATH so the agent finds our fake crm_master first
        PATH = os.getenv("PATH", "")
        env = { "PATH": "%s:%s" % (self.FAKEPATH, PATH),
                "HA_LOGFILE": self.RA_LOG,
                "OCF_RESOURCE_INSTANCE": "colo-test",
                "OCF_RESKEY_CRM_meta_clone_max": "2",
                "OCF_RESKEY_CRM_meta_notify": "true",
                "OCF_RESKEY_CRM_meta_timeout": "30000",
                "OCF_RESKEY_binary": self.QEMU_BINARY,
                "OCF_RESKEY_disk_size": str(self.IMAGE_SIZE),
                "OCF_RESKEY_checkpoint_interval": "1000",
                "OCF_RESKEY_base_port": str(self.COLO_BASE_PORT),
                "OCF_RESKEY_debug": "true"}

        # HOSTA poses as node 127.0.0.1, HOSTB as 127.0.0.2; each sees
        # the other's tmp dir via COLO_SMOKE_REMOTE_TMP (for the fake
        # crm_master's -N option).
        if host == self.HOSTA:
            env.update({"OCF_RESKEY_options":
                            ("%s -netdev socket,id=hn0,connect=127.0.0.1:%s"
                             " -drive if=none,id=parent0,format=raw,file='%s'")
                            % (self.QEMU_OPTIONS, self.BRIDGE_HOSTA_PORT,
                               self.HOSTA_IMAGE),
                        "OCF_RESKEY_active_hidden_dir": self.TMPA,
                        "OCF_RESKEY_listen_address": "127.0.0.1",
                        "OCF_RESKEY_log_dir": self.HOSTA_LOGDIR,
                        "OCF_RESKEY_CRM_meta_on_node": "127.0.0.1",
                        "HA_RSCTMP": self.TMPA,
                        "COLO_SMOKE_REMOTE_TMP": self.TMPB})
        else:
            env.update({"OCF_RESKEY_options":
                            ("%s -netdev socket,id=hn0,connect=127.0.0.1:%s"
                             " -drive if=none,id=parent0,format=raw,file='%s'")
                            % (self.QEMU_OPTIONS, self.BRIDGE_HOSTB_PORT,
                               self.HOSTB_IMAGE),
                        "OCF_RESKEY_active_hidden_dir": self.TMPB,
                        "OCF_RESKEY_listen_address": "127.0.0.2",
                        "OCF_RESKEY_log_dir": self.HOSTB_LOGDIR,
                        "OCF_RESKEY_CRM_meta_on_node": "127.0.0.2",
                        "HA_RSCTMP": self.TMPB,
                        "COLO_SMOKE_REMOTE_TMP": self.TMPA})
        return env
+
+ def ra_start(self, host):
+ env = self.setup_base_env(host)
+ self.run_command(self.COLO_RA + " start", self.OCF_SUCCESS, env)
+
+ def ra_stop(self, host):
+ env = self.setup_base_env(host)
+ self.run_command(self.COLO_RA + " stop", self.OCF_SUCCESS, env)
+
+ def ra_monitor(self, host, expected_status):
+ env = self.setup_base_env(host)
+ self.run_command(self.COLO_RA + " monitor", expected_status, env)
+
+ def ra_promote(self, host):
+ env = self.setup_base_env(host)
+ self.run_command(self.COLO_RA + " promote", self.OCF_SUCCESS, env)
+
+ def ra_notify_start(self, host):
+ env = self.setup_base_env(host)
+
+ env.update({"OCF_RESKEY_CRM_meta_notify_type": "post",
+ "OCF_RESKEY_CRM_meta_notify_operation": "start"})
+
+ if host == self.HOSTA:
+ env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.1",
+ "OCF_RESKEY_CRM_meta_notify_start_uname": "127.0.0.2"})
+ else:
+ env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.2",
+ "OCF_RESKEY_CRM_meta_notify_start_uname": "127.0.0.1"})
+
+ self.run_command(self.COLO_RA + " notify", self.OCF_SUCCESS, env)
+
+ def ra_notify_stop(self, host):
+ env = self.setup_base_env(host)
+
+ env.update({"OCF_RESKEY_CRM_meta_notify_type": "pre",
+ "OCF_RESKEY_CRM_meta_notify_operation": "stop"})
+
+ if host == self.HOSTA:
+ env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.1",
+ "OCF_RESKEY_CRM_meta_notify_stop_uname": "127.0.0.2"})
+ else:
+ env.update({"OCF_RESKEY_CRM_meta_notify_master_uname": "127.0.0.2",
+ "OCF_RESKEY_CRM_meta_notify_stop_uname": "127.0.0.1"})
+
+ self.run_command(self.COLO_RA + " notify", self.OCF_SUCCESS, env)
+
+ def kill_qemu_pre(self, host, hang_qemu=False):
+ if host == self.HOSTA:
+ pid = self.read_pidfile(os.path.join(self.TMPA, \
+ "colo-test-qemu.pid"))
+ else:
+ pid = self.read_pidfile(os.path.join(self.TMPB, \
+ "colo-test-qemu.pid"))
+
+ if pid and self.check_pid(pid):
+ if hang_qemu:
+ os.kill(pid, signal.SIGSTOP)
+ else:
+ os.kill(pid, signal.SIGKILL)
+ while self.check_pid(pid):
+ time.sleep(1)
+
+ def kill_qemu_post(self, host, hang_qemu=False):
+ if host == self.HOSTA:
+ pid = self.read_pidfile(os.path.join(self.TMPA, \
+ "colo-test-qemu.pid"))
+ else:
+ pid = self.read_pidfile(os.path.join(self.TMPB, \
+ "colo-test-qemu.pid"))
+
+ if hang_qemu and pid and self.check_pid(pid):
+ os.kill(pid, signal.SIGKILL)
+ while self.check_pid(pid):
+ time.sleep(1)
+
+ def get_master_score(self, host):
+ if host == self.HOSTA:
+ return int(self.cat_line(os.path.join(self.TMPA, "master_score")))
+ else:
+ return int(self.cat_line(os.path.join(self.TMPB, "master_score")))
+
    def _test_colo(self, hang_qemu=False, loop=False, do_ssh_ping=True):
        """Full COLO scenario: start both nodes, promote HOSTA, then
        exercise a secondary failover followed by a primary failover,
        verifying guest connectivity via ssh ping after every step.

        hang_qemu: simulate a hung (SIGSTOP) instead of crashed qemu.
        loop: repeat the failover cycle until interrupted.
        do_ssh_ping: check guest network reachability between steps.
        """
        # Start from a clean slate
        self.ra_stop(self.HOSTA)
        self.ra_stop(self.HOSTB)

        self.log.info("Startup")
        self.ra_start(self.HOSTA)
        self.ra_start(self.HOSTB)

        self.ra_monitor(self.HOSTA, self.OCF_SUCCESS)
        self.ra_monitor(self.HOSTB, self.OCF_SUCCESS)

        self.log.info("Promoting")
        self.ra_promote(self.HOSTA)
        self.ra_notify_start(self.HOSTA)

        # Wait until the secondary's score reaches 100 (resync complete),
        # monitoring both nodes while we wait
        while self.get_master_score(self.HOSTB) != 100:
            self.ra_monitor(self.HOSTA, self.OCF_RUNNING_MASTER)
            self.ra_monitor(self.HOSTB, self.OCF_SUCCESS)
            time.sleep(1)

        if do_ssh_ping:
            self.ssh_proc = self.ssh_open()

        primary = self.HOSTA
        secondary = self.HOSTB

        while True:
            # Kill/hang the primary; the secondary must take over
            self.log.info("Secondary failover")
            self.kill_qemu_pre(primary, hang_qemu)
            self.ra_notify_stop(secondary)
            self.ra_monitor(secondary, self.OCF_SUCCESS)
            self.ra_promote(secondary)
            self.ra_monitor(secondary, self.OCF_RUNNING_MASTER)
            self.kill_qemu_post(primary, hang_qemu)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)
            # Roles have swapped now
            tmp = primary
            primary = secondary
            secondary = tmp

            self.log.info("Secondary continue replication")
            self.ra_start(secondary)
            self.ra_notify_start(primary)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)

            # Wait for resync
            while self.get_master_score(secondary) != 100:
                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
                self.ra_monitor(secondary, self.OCF_SUCCESS)
                time.sleep(1)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)

            # Kill/hang the secondary; the primary must keep running
            self.log.info("Primary failover")
            self.kill_qemu_pre(secondary, hang_qemu)
            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
            self.ra_notify_stop(primary)
            self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
            self.kill_qemu_post(secondary, hang_qemu)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)

            self.log.info("Primary continue replication")
            self.ra_start(secondary)
            self.ra_notify_start(primary)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)

            # Wait for resync
            while self.get_master_score(secondary) != 100:
                self.ra_monitor(primary, self.OCF_RUNNING_MASTER)
                self.ra_monitor(secondary, self.OCF_SUCCESS)
                time.sleep(1)
            if do_ssh_ping:
                self.ssh_ping(self.ssh_proc)

            if not loop:
                break

        if do_ssh_ping:
            self.ssh_close(self.ssh_proc)

        self.ra_stop(self.HOSTA)
        self.ra_stop(self.HOSTB)

        self.ra_monitor(self.HOSTA, self.OCF_NOT_RUNNING)
        self.ra_monitor(self.HOSTB, self.OCF_NOT_RUNNING)
        self.log.info("all ok")
+
    def test_colo_peer_crashing(self):
        """
        Run the full failover scenario with the peer qemu being killed
        (SIGKILL) to simulate a crash.

        :avocado: tags=colo
        :avocado: tags=arch:x86_64
        """
        self.log.info("Testing with peer qemu crashing")
        self._test_colo()
+
    def test_colo_peer_hanging(self):
        """
        Run the full failover scenario with the peer qemu being stopped
        (SIGSTOP) to simulate a hang.

        :avocado: tags=colo
        :avocado: tags=arch:x86_64
        """
        self.log.info("Testing with peer qemu hanging")
        self._test_colo(hang_qemu=True)
--
2.20.1
^ permalink raw reply related [flat|nested] 13+ messages in thread