From: Brendan Cully <brendan@cs.ubc.ca>
To: xen-devel@lists.xensource.com
Subject: [PATCH 3 of 7] Remus: move device handling into its own module
Date: Mon, 03 May 2010 11:53:50 -0700
Message-ID: <35a9de6bca43769acf76.1272912830@kremvax.cs.ubc.ca>
In-Reply-To: <patchbomb.1272912827@kremvax.cs.ubc.ca>
[-- Attachment #1: Type: text/plain, Size: 51 bytes --]
Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
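The patch below moves the Remus device classes into xen.remus.device, where every device derives from CheckpointedDevice and exposes postsuspend, preresume and commit hooks that the remus script calls once per checkpoint epoch. What follows is a minimal, self-contained sketch of that calling pattern, assuming hooks are invoked on every device in list order as the run() loop does. LoggingDevice and run_epoch are hypothetical names used only for illustration. The uninstall no-op on the base class is an addition of this sketch: in the patch, only the concrete devices define uninstall.

```python
# Hypothetical sketch of the CheckpointedDevice hook pattern; not the Remus
# code itself. LoggingDevice only records which hook ran, in which order.

class CheckpointedDevice(object):
    'Base class: every hook is a no-op unless a subclass overrides it'

    def postsuspend(self):
        'called after guest has suspended'
        pass

    def preresume(self):
        'called before guest resumes'
        pass

    def commit(self):
        'called when backup has acknowledged checkpoint reception'
        pass

    def uninstall(self):
        # added in this sketch; the patch's base class has no uninstall()
        pass


class LoggingDevice(CheckpointedDevice):
    'Hypothetical stand-in for ReplicatedDisk or BufferedNIC'

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def postsuspend(self):
        self.log.append((self.name, 'postsuspend'))

    def commit(self):
        self.log.append((self.name, 'commit'))


def run_epoch(devices):
    'Drive one checkpoint epoch across all devices, in list order'
    for dev in devices:
        dev.postsuspend()
    # (domain state would be sent to the backup here)
    for dev in devices:
        dev.preresume()
    # (domain resumes; the backup acknowledges the checkpoint)
    for dev in devices:
        dev.commit()


log = []
# disks are listed before NICs, so each hook reaches every disk first
devices = [LoggingDevice('disk0', log), LoggingDevice('nic0', log)]
run_epoch(devices)
for dev in devices:
    dev.uninstall()
print(log)
```

Because run() builds its device list with replicated disks first and network buffers after, each hook reaches every disk before any NIC; that ordering is what the comment "disks must commit before network can be released" relies on.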
[-- Attachment #2: xen-4.0-3.patch --]
[-- Type: text/x-patch, Size: 13377 bytes --]
# HG changeset patch
# User Brendan Cully <brendan@cs.ubc.ca>
# Date 1272694699 25200
# Node ID 35a9de6bca43769acf76cbd3a4a1f8c4f245da4b
# Parent f0786e8c7fd7b56f0c5712649fe52870d84488de
Remus: move device handling into its own module
Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
diff --git a/tools/remus/remus b/tools/python/xen/remus/device.py
copy from tools/remus/remus
copy to tools/python/xen/remus/device.py
--- a/tools/remus/remus
+++ b/tools/python/xen/remus/device.py
@@ -1,66 +1,12 @@
-#!/usr/bin/env python
+# Remus device interface
 #
-# This is a save process which also buffers outgoing I/O between
-# rounds, so that external viewers never see anything that hasn't
-# been committed at the backup
-#
-# TODO: fencing.
+# Coordinates with devices at suspend, resume, and commit hooks
 
-import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
-from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
+import os
 
-class CfgException(Exception): pass
+import netlink, qdisc, util
 
-class Cfg(object):
-    def __init__(self):
-        # must be set
-        self.domid = 0
-
-        self.host = 'localhost'
-        self.port = XendOptions.instance().get_xend_relocation_port()
-        self.interval = 200
-        self.netbuffer = True
-        self.timer = False
-
-        parser = optparse.OptionParser()
-        parser.usage = '%prog [options] domain [destination]'
-        parser.add_option('-i', '--interval', dest='interval', type='int',
-                          metavar='MS',
-                          help='checkpoint every MS milliseconds')
-        parser.add_option('-p', '--port', dest='port', type='int',
-                          help='send stream to port PORT', metavar='PORT')
-        parser.add_option('', '--no-net', dest='nonet', action='store_true',
-                          help='run without net buffering (benchmark option)')
-        parser.add_option('', '--timer', dest='timer', action='store_true',
-                          help='force pause at checkpoint interval (experimental)')
-        self.parser = parser
-
-    def usage(self):
-        self.parser.print_help()
-
-    def getargs(self):
-        opts, args = self.parser.parse_args()
-
-        if opts.interval:
-            self.interval = opts.interval
-        if opts.port:
-            self.port = opts.port
-        if opts.nonet:
-            self.netbuffer = False
-        if opts.timer:
-            self.timer = True
-
-        if not args:
-            raise CfgException('Missing domain')
-        self.domid = args[0]
-        if (len(args) > 1):
-            self.host = args[1]
-
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
+class CheckpointedDevice(object):
     'Base class for buffered devices'
 
     def postsuspend(self):
@@ -75,7 +21,9 @@
         'called when backup has acknowledged checkpoint reception'
         pass
 
-class ReplicatedDisk(BufferedDevice):
+class ReplicatedDiskException(Exception): pass
+
+class ReplicatedDisk(CheckpointedDevice):
     """
     Send a checkpoint message to a replicated disk while the domain
     is paused between epochs.
@@ -114,9 +62,9 @@
         if msg != 'done':
             print 'Unknown message: %s' % msg
 
-class NetbufferException(Exception): pass
+class BufferedNICException(Exception): pass
 
-class Netbuffer(BufferedDevice):
+class BufferedNIC(CheckpointedDevice):
     """
     Buffer a protected domain's network output between rounds so that
     nothing is issued that a failover might not know about.
@@ -133,7 +81,7 @@
         self.devname = self._startimq(domid)
         dev = self.rth.getlink(self.devname)
         if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
+            raise BufferedNICException('could not find device %s' % self.devname)
         self.dev = dev['index']
         self.handle = qdisc.TC_H_ROOT
         self.q = qdisc.QueueQdisc()
@@ -164,8 +112,8 @@
                 self.installed = True
                 return
             if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
+                raise BufferedNICException('there is already a queueing '
+                                           'discipline on %s' % self.devname)
 
         print 'installing buffer on %s' % self.devname
         req = qdisc.addrequest(self.dev, self.handle, self.q)
@@ -190,155 +138,3 @@
         util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
 
         return imqdev
-
-class SignalException(Exception): pass
-
-def run(cfg):
-    closure = lambda: None
-    closure.cmd = None
-
-    def sigexception(signo, frame):
-        raise SignalException(signo)
-
-    def die():
-        # I am not sure what the best way to die is. xm destroy is another option,
-        # or we could attempt to trigger some instant reboot.
-        print "dying..."
-        print util.runcmd(['sudo', 'ifdown', 'eth2'])
-        # dangling imq0 handle on vif locks up the system
-        for buf in bufs:
-            buf.uninstall()
-        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
-        print util.runcmd(['sudo', 'ifup', 'eth2'])
-
-    def getcommand():
-        """Get a command to execute while running.
-        Commands include:
-        s: die prior to postsuspend hook
-        s2: die after postsuspend hook
-        r: die prior to preresume hook
-        r2: die after preresume hook
-        c: die prior to commit hook
-        c2: die after commit hook
-        """
-        r, w, x = select.select([sys.stdin], [], [], 0)
-        if sys.stdin not in r:
-            return
-
-        cmd = sys.stdin.readline().strip()
-        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
-            print "unknown command: %s" % cmd
-        closure.cmd = cmd
-
-    signal.signal(signal.SIGTERM, sigexception)
-
-    dom = vm.VM(cfg.domid)
-
-    # set up I/O buffers
-    bufs = []
-
-    # disks must commit before network can be released
-    for disk in dom.disks:
-        try:
-            bufs.append(ReplicatedDisk(disk))
-        except ReplicatedDiskException, e:
-            print e
-            continue
-
-    if cfg.netbuffer:
-        for vif in dom.vifs:
-            bufs.append(Netbuffer(dom.domid))
-
-    fd = save.MigrationSocket((cfg.host, cfg.port))
-
-    def postsuspend():
-        'Begin external checkpointing after domain has paused'
-        if not cfg.timer:
-            # when not using a timer thread, sleep until now + interval
-            closure.starttime = time.time()
-
-        if closure.cmd == 's':
-            die()
-
-        for buf in bufs:
-            buf.postsuspend()
-
-        if closure.cmd == 's2':
-            die()
-
-    def preresume():
-        'Complete external checkpointing before domain resumes'
-        if closure.cmd == 'r':
-            die()
-
-        for buf in bufs:
-            buf.preresume()
-
-        if closure.cmd == 'r2':
-            die()
-
-    def commit():
-        'commit network buffer'
-        if closure.cmd == 'c':
-            die()
-
-        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
-
-        for buf in bufs:
-            buf.commit()
-
-        if closure.cmd == 'c2':
-            die()
-
-        # Since the domain is running at this point, it's a good time to
-        # check for control channel commands
-        getcommand()
-
-        if not cfg.timer:
-            endtime = time.time()
-            elapsed = (endtime - closure.starttime) * 1000
-
-            if elapsed < cfg.interval:
-                time.sleep((cfg.interval - elapsed) / 1000.0)
-
-        # False ends checkpointing
-        return True
-
-    if cfg.timer:
-        interval = cfg.interval
-    else:
-        interval = 0
-
-    rc = 0
-
-    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
-                              interval)
-
-    try:
-        checkpointer.start()
-    except save.CheckpointError, e:
-        print e
-        rc = 1
-    except KeyboardInterrupt:
-        pass
-    except SignalException:
-        print '*** signalled ***'
-
-    for buf in bufs:
-        buf.uninstall()
-
-    sys.exit(rc)
-
-cfg = Cfg()
-try:
-    cfg.getargs()
-except CfgException, inst:
-    print str(inst)
-    cfg.usage()
-    sys.exit(1)
-
-try:
-    run(cfg)
-except vm.VMException, inst:
-    print str(inst)
-    sys.exit(1)
diff --git a/tools/remus/remus b/tools/remus/remus
--- a/tools/remus/remus
+++ b/tools/remus/remus
@@ -7,9 +7,10 @@
 # TODO: fencing.
 
 import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
+
+from xen.remus import save, util, vm
+from xen.remus.device import ReplicatedDisk, BufferedNIC
 from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
 
 class CfgException(Exception): pass
 
@@ -58,139 +59,6 @@
         if (len(args) > 1):
             self.host = args[1]
 
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
-    'Base class for buffered devices'
-
-    def postsuspend(self):
-        'called after guest has suspended'
-        pass
-
-    def preresume(self):
-        'called before guest resumes'
-        pass
-
-    def commit(self):
-        'called when backup has acknowledged checkpoint reception'
-        pass
-
-class ReplicatedDisk(BufferedDevice):
-    """
-    Send a checkpoint message to a replicated disk while the domain
-    is paused between epochs.
-    """
-    FIFODIR = '/var/run/tap'
-
-    def __init__(self, disk):
-        # look up disk, make sure it is tap:buffer, and set up socket
-        # to request commits.
-        self.ctlfd = None
-
-        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
-            raise ReplicatedDiskException('Disk is not replicated: %s' %
-                                          str(disk))
-        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
-        absfifo = os.path.join(self.FIFODIR, fifo)
-        absmsgfifo = absfifo + '.msg'
-
-        self.installed = False
-        self.ctlfd = open(absfifo, 'w+b')
-        self.msgfd = open(absmsgfifo, 'r+b')
-
-    def __del__(self):
-        self.uninstall()
-
-    def uninstall(self):
-        if self.ctlfd:
-            self.ctlfd.close()
-            self.ctlfd = None
-
-    def postsuspend(self):
-        os.write(self.ctlfd.fileno(), 'flush')
-
-    def commit(self):
-        msg = os.read(self.msgfd.fileno(), 4)
-        if msg != 'done':
-            print 'Unknown message: %s' % msg
-
-class NetbufferException(Exception): pass
-
-class Netbuffer(BufferedDevice):
-    """
-    Buffer a protected domain's network output between rounds so that
-    nothing is issued that a failover might not know about.
-    """
-    # shared rtnetlink handle
-    rth = None
-
-    def __init__(self, domid):
-        self.installed = False
-
-        if not self.rth:
-            self.rth = netlink.rtnl()
-
-        self.devname = self._startimq(domid)
-        dev = self.rth.getlink(self.devname)
-        if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
-        self.dev = dev['index']
-        self.handle = qdisc.TC_H_ROOT
-        self.q = qdisc.QueueQdisc()
-
-    def __del__(self):
-        self.uninstall()
-
-    def postsuspend(self):
-        if not self.installed:
-            self._setup()
-
-        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
-
-    def commit(self):
-        '''Called when checkpoint has been acknowledged by
-        the backup'''
-        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
-
-    def _sendqmsg(self, action):
-        self.q.action = action
-        req = qdisc.changerequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-
-    def _setup(self):
-        q = self.rth.getqdisc(self.dev)
-        if q:
-            if q['kind'] == 'queue':
-                self.installed = True
-                return
-            if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
-
-        print 'installing buffer on %s' % self.devname
-        req = qdisc.addrequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-        self.installed = True
-
-    def uninstall(self):
-        if self.installed:
-            req = qdisc.delrequest(self.dev, self.handle)
-            self.rth.talk(req.pack())
-            self.installed = False
-
-    def _startimq(self, domid):
-        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
-        imqebt = '/usr/lib/xen/bin/imqebt'
-        imqdev = 'imq0'
-        vid = 'vif%d.0' % domid
-        for mod in ['sch_queue', 'imq', 'ebt_imq']:
-            util.runcmd(['modprobe', mod])
-        util.runcmd("ip link set %s up" % (imqdev))
-        util.runcmd("%s -F FORWARD" % (imqebt))
-        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
-
-        return imqdev
-
 class SignalException(Exception): pass
 
 def run(cfg):
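ReplicatedDisk, now in device.py, locates its tapdisk control FIFOs by extracting the `remus...` part of the disk's `uname`, replacing colons with underscores, and joining the result onto FIFODIR (`/var/run/tap`), with a `.msg` suffix for the reply FIFO. The sketch below isolates that derivation as a standalone function so it can be checked without Xen. The function name `control_fifos` is hypothetical; it raises `ValueError` in place of ReplicatedDiskException, adds an explicit check for a missing `|` separator (the original would fail with AttributeError on `None.group`), and the sample uname `tap:remus:backup:9002|aio:/images/disk.img` is an assumed example of the `tap:remus:` format rather than one taken from the patch.

```python
import os
import re

FIFODIR = '/var/run/tap'

def control_fifos(uname):
    '''Return (control fifo, message fifo) paths for a replicated disk,
    following the logic of ReplicatedDisk.__init__ (hypothetical helper)'''
    if not (uname.startswith('tap:remus:') or
            uname.startswith('tap:tapdisk:remus:')):
        raise ValueError('Disk is not replicated: %s' % uname)
    # raw string: same pattern as "tap:.*(remus.*)\|" without an escape warning
    m = re.match(r'tap:.*(remus.*)\|', uname)
    if m is None:
        raise ValueError('no "|" separator in %s' % uname)
    fifo = m.group(1).replace(':', '_')
    absfifo = os.path.join(FIFODIR, fifo)
    return absfifo, absfifo + '.msg'

print(control_fifos('tap:remus:backup:9002|aio:/images/disk.img'))
```

For the sample uname, the captured group is `remus:backup:9002`, so the control FIFO is `/var/run/tap/remus_backup_9002` and the reply FIFO is the same path with `.msg` appended; the driver's `flush` request is written to the first and its `done` reply is read from the second.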
[-- Attachment #3: Type: text/plain, Size: 138 bytes --]
_______________________________________________
Xen-devel mailing list
Xen-devel@lists.xensource.com
http://lists.xensource.com/xen-devel