xen-devel.lists.xenproject.org archive mirror
From: Brendan Cully <brendan@cs.ubc.ca>
To: xen-devel@lists.xensource.com
Subject: [PATCH 3 of 7] Remus: move device handling into its own module
Date: Mon, 03 May 2010 11:53:50 -0700
Message-ID: <35a9de6bca43769acf76.1272912830@kremvax.cs.ubc.ca>
In-Reply-To: <patchbomb.1272912827@kremvax.cs.ubc.ca>

[-- Attachment #1: Type: text/plain, Size: 51 bytes --]

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>
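The new xen/remus/device.py module exposes devices to the checkpointing loop through three hooks on CheckpointedDevice (renamed from BufferedDevice). Below is a minimal Python 3 sketch of that contract. It assumes one epoch fires postsuspend, then preresume, then commit, which is the order the closures in run() follow. In the real script, save.Saver drives those closures. RecordingDevice and run_epoch are hypothetical names used only for illustration and are not part of the patch:

```python
# Sketch of the CheckpointedDevice hook contract (Python 3).
# RecordingDevice and run_epoch are hypothetical, for illustration only.

class CheckpointedDevice(object):
    'Base class: every hook is a no-op unless a subclass overrides it'

    def postsuspend(self):
        'called after guest has suspended'
        pass

    def preresume(self):
        'called before guest resumes'
        pass

    def commit(self):
        'called when backup has acknowledged checkpoint reception'
        pass


class RecordingDevice(CheckpointedDevice):
    'Hypothetical device that logs which hooks fired (inherits preresume)'

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def postsuspend(self):
        self.log.append((self.name, 'postsuspend'))

    def commit(self):
        self.log.append((self.name, 'commit'))


def run_epoch(devices):
    'Simplified stand-in for one epoch as driven by save.Saver'
    for dev in devices:
        dev.postsuspend()
    for dev in devices:
        dev.preresume()
    for dev in devices:
        dev.commit()


log = []
# disks are listed first so they commit before network output is released
devices = [RecordingDevice('disk', log), RecordingDevice('nic', log)]
run_epoch(devices)
print(log)
```

The disk is listed ahead of the NIC to match the comment in run(): "disks must commit before network can be released". Each hook loops over the device list in order, so that ordering alone enforces the constraint.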



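ReplicatedDisk only accepts disks whose uname starts with tap:remus: or tap:tapdisk:remus:. It derives its control FIFO under /var/run/tap from the "remus..." part of the uname before the "|", and uses the same path with a .msg suffix as the reply FIFO. The Python 3 sketch below isolates that naming step. fifo_paths and the sample uname are hypothetical. The prefixes, directory, and regex come from the patch, with the regex written as a raw string:

```python
import os
import re

FIFODIR = '/var/run/tap'
REMUS_PREFIXES = ('tap:remus:', 'tap:tapdisk:remus:')


class ReplicatedDiskException(Exception):
    pass


def fifo_paths(uname):
    'Return (control FIFO, message FIFO) paths for a replicated disk uname'
    if not uname.startswith(REMUS_PREFIXES):
        raise ReplicatedDiskException('Disk is not replicated: %s' % uname)
    m = re.match(r'tap:.*(remus.*)\|', uname)
    if m is None:
        # the original code would raise AttributeError on .group() here
        raise ReplicatedDiskException('Cannot parse disk uname: %s' % uname)
    # colons are not wanted in the FIFO name, so they become underscores
    fifo = m.group(1).replace(':', '_')
    absfifo = os.path.join(FIFODIR, fifo)
    return absfifo, absfifo + '.msg'


# hypothetical uname, for illustration only
print(fifo_paths('tap:remus:backup-host:9000|aio:/dev/vg/guest-disk'))
```

At runtime, postsuspend writes 'flush' to the control FIFO, and commit reads a 4-byte reply from the .msg FIFO and expects 'done'.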
[-- Attachment #2: xen-4.0-3.patch --]
[-- Type: text/x-patch, Size: 13377 bytes --]

# HG changeset patch
# User Brendan Cully <brendan@cs.ubc.ca>
# Date 1272694699 25200
# Node ID 35a9de6bca43769acf76cbd3a4a1f8c4f245da4b
# Parent  f0786e8c7fd7b56f0c5712649fe52870d84488de
Remus: move device handling into its own module

Signed-off-by: Brendan Cully <brendan@cs.ubc.ca>

diff --git a/tools/remus/remus b/tools/python/xen/remus/device.py
copy from tools/remus/remus
copy to tools/python/xen/remus/device.py
--- a/tools/remus/remus
+++ b/tools/python/xen/remus/device.py
@@ -1,66 +1,12 @@
-#!/usr/bin/env python
+# Remus device interface
 #
-# This is a save process which also buffers outgoing I/O between
-# rounds, so that external viewers never see anything that hasn't
-# been committed at the backup
-#
-# TODO: fencing.
+# Coordinates with devices at suspend, resume, and commit hooks
 
-import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
-from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
+import os
 
-class CfgException(Exception): pass
+import netlink, qdisc, util
 
-class Cfg(object):
-    def __init__(self):
-        # must be set
-        self.domid = 0
-
-        self.host = 'localhost'
-        self.port = XendOptions.instance().get_xend_relocation_port()
-        self.interval = 200
-        self.netbuffer = True
-        self.timer = False
-
-        parser = optparse.OptionParser()
-        parser.usage = '%prog [options] domain [destination]'
-        parser.add_option('-i', '--interval', dest='interval', type='int',
-                          metavar='MS',
-                          help='checkpoint every MS milliseconds')
-        parser.add_option('-p', '--port', dest='port', type='int',
-                          help='send stream to port PORT', metavar='PORT')
-        parser.add_option('', '--no-net', dest='nonet', action='store_true',
-                          help='run without net buffering (benchmark option)')
-        parser.add_option('', '--timer', dest='timer', action='store_true',
-                          help='force pause at checkpoint interval (experimental)')
-        self.parser = parser
-
-    def usage(self):
-        self.parser.print_help()
-
-    def getargs(self):
-        opts, args = self.parser.parse_args()
-
-        if opts.interval:
-            self.interval = opts.interval
-        if opts.port:
-            self.port = opts.port
-        if opts.nonet:
-            self.netbuffer = False
-        if opts.timer:
-            self.timer = True
-
-        if not args:
-            raise CfgException('Missing domain')
-        self.domid = args[0]
-        if (len(args) > 1):
-            self.host = args[1]
-
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
+class CheckpointedDevice(object):
     'Base class for buffered devices'
 
     def postsuspend(self):
@@ -75,7 +21,9 @@
         'called when backup has acknowledged checkpoint reception'
         pass
 
-class ReplicatedDisk(BufferedDevice):
+class ReplicatedDiskException(Exception): pass
+
+class ReplicatedDisk(CheckpointedDevice):
     """
     Send a checkpoint message to a replicated disk while the domain
     is paused between epochs.
@@ -114,9 +62,9 @@
         if msg != 'done':
             print 'Unknown message: %s' % msg
 
-class NetbufferException(Exception): pass
+class BufferedNICException(Exception): pass
 
-class Netbuffer(BufferedDevice):
+class BufferedNIC(CheckpointedDevice):
     """
     Buffer a protected domain's network output between rounds so that
     nothing is issued that a failover might not know about.
@@ -133,7 +81,7 @@
         self.devname = self._startimq(domid)
         dev = self.rth.getlink(self.devname)
         if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
+            raise BufferedNICException('could not find device %s' % self.devname)
         self.dev = dev['index']
         self.handle = qdisc.TC_H_ROOT
         self.q = qdisc.QueueQdisc()
@@ -164,8 +112,8 @@
                 self.installed = True
                 return
             if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
+                raise BufferedNICException('there is already a queueing '
+                                           'discipline on %s' % self.devname)
 
         print 'installing buffer on %s' % self.devname
         req = qdisc.addrequest(self.dev, self.handle, self.q)
@@ -190,155 +138,3 @@
         util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
 
         return imqdev
-
-class SignalException(Exception): pass
-
-def run(cfg):
-    closure = lambda: None
-    closure.cmd = None
-
-    def sigexception(signo, frame):
-        raise SignalException(signo)
-
-    def die():
-        # I am not sure what the best way to die is. xm destroy is another option,
-        # or we could attempt to trigger some instant reboot.
-        print "dying..."
-        print util.runcmd(['sudo', 'ifdown', 'eth2'])
-        # dangling imq0 handle on vif locks up the system
-        for buf in bufs:
-            buf.uninstall()
-        print util.runcmd(['sudo', 'xm', 'destroy', cfg.domid])
-        print util.runcmd(['sudo', 'ifup', 'eth2'])
-
-    def getcommand():
-        """Get a command to execute while running.
-        Commands include:
-          s: die prior to postsuspend hook
-          s2: die after postsuspend hook
-          r: die prior to preresume hook
-          r2: die after preresume hook
-          c: die prior to commit hook
-          c2: die after commit hook
-          """
-        r, w, x = select.select([sys.stdin], [], [], 0)
-        if sys.stdin not in r:
-            return
-
-        cmd = sys.stdin.readline().strip()
-        if cmd not in ('s', 's2', 'r', 'r2', 'c', 'c2'):
-            print "unknown command: %s" % cmd
-        closure.cmd = cmd
-
-    signal.signal(signal.SIGTERM, sigexception)
-
-    dom = vm.VM(cfg.domid)
-
-    # set up I/O buffers
-    bufs = []
-
-    # disks must commit before network can be released
-    for disk in dom.disks:
-        try:
-            bufs.append(ReplicatedDisk(disk))
-        except ReplicatedDiskException, e:
-            print e
-            continue
-
-    if cfg.netbuffer:
-        for vif in dom.vifs:
-            bufs.append(Netbuffer(dom.domid))
-
-    fd = save.MigrationSocket((cfg.host, cfg.port))
-
-    def postsuspend():
-        'Begin external checkpointing after domain has paused'
-        if not cfg.timer:
-            # when not using a timer thread, sleep until now + interval
-            closure.starttime = time.time()
-
-        if closure.cmd == 's':
-            die()
-
-        for buf in bufs:
-            buf.postsuspend()
-
-        if closure.cmd == 's2':
-            die()
-
-    def preresume():
-        'Complete external checkpointing before domain resumes'
-        if closure.cmd == 'r':
-            die()
-
-        for buf in bufs:
-            buf.preresume()
-
-        if closure.cmd == 'r2':
-            die()
-
-    def commit():
-        'commit network buffer'
-        if closure.cmd == 'c':
-            die()
-
-        print >> sys.stderr, "PROF: flushed memory at %0.6f" % (time.time())
-
-        for buf in bufs:
-            buf.commit()
-
-        if closure.cmd == 'c2':
-            die()
-
-        # Since the domain is running at this point, it's a good time to
-        # check for control channel commands
-        getcommand()
-
-        if not cfg.timer:
-            endtime = time.time()
-            elapsed = (endtime - closure.starttime) * 1000
-
-            if elapsed < cfg.interval:
-                time.sleep((cfg.interval - elapsed) / 1000.0)
-
-        # False ends checkpointing
-        return True
-
-    if cfg.timer:
-        interval = cfg.interval
-    else:
-        interval = 0
-
-    rc = 0
-
-    checkpointer = save.Saver(cfg.domid, fd, postsuspend, preresume, commit,
-                              interval)
-
-    try:
-        checkpointer.start()
-    except save.CheckpointError, e:
-        print e
-        rc = 1
-    except KeyboardInterrupt:
-        pass
-    except SignalException:
-        print '*** signalled ***'
-
-    for buf in bufs:
-        buf.uninstall()
-
-    sys.exit(rc)
-
-cfg = Cfg()
-try:
-    cfg.getargs()
-except CfgException, inst:
-    print str(inst)
-    cfg.usage()
-    sys.exit(1)
-
-try:
-    run(cfg)
-except vm.VMException, inst:
-    print str(inst)
-    sys.exit(1)
diff --git a/tools/remus/remus b/tools/remus/remus
--- a/tools/remus/remus
+++ b/tools/remus/remus
@@ -7,9 +7,10 @@
 # TODO: fencing.
 
 import optparse, os, re, select, signal, sys, time
-from xen.remus import save, vm
+
+from xen.remus import save, util, vm
+from xen.remus.device import ReplicatedDisk, BufferedNIC
 from xen.xend import XendOptions
-from xen.remus import netlink, qdisc, util
 
 class CfgException(Exception): pass
 
@@ -58,139 +59,6 @@
         if (len(args) > 1):
             self.host = args[1]
 
-class ReplicatedDiskException(Exception): pass
-
-class BufferedDevice(object):
-    'Base class for buffered devices'
-
-    def postsuspend(self):
-        'called after guest has suspended'
-        pass
-
-    def preresume(self):
-        'called before guest resumes'
-        pass
-
-    def commit(self):
-        'called when backup has acknowledged checkpoint reception'
-        pass
-
-class ReplicatedDisk(BufferedDevice):
-    """
-    Send a checkpoint message to a replicated disk while the domain
-    is paused between epochs.
-    """
-    FIFODIR = '/var/run/tap'
-
-    def __init__(self, disk):
-        # look up disk, make sure it is tap:buffer, and set up socket
-        # to request commits.
-        self.ctlfd = None
-
-        if not disk.uname.startswith('tap:remus:') and not disk.uname.startswith('tap:tapdisk:remus:'):
-            raise ReplicatedDiskException('Disk is not replicated: %s' %
-                                        str(disk))
-        fifo = re.match("tap:.*(remus.*)\|", disk.uname).group(1).replace(':', '_')
-        absfifo = os.path.join(self.FIFODIR, fifo)
-        absmsgfifo = absfifo + '.msg'
-
-        self.installed = False
-        self.ctlfd = open(absfifo, 'w+b')
-        self.msgfd = open(absmsgfifo, 'r+b')
-
-    def __del__(self):
-        self.uninstall()
-
-    def uninstall(self):
-        if self.ctlfd:
-            self.ctlfd.close()
-            self.ctlfd = None
-
-    def postsuspend(self):
-        os.write(self.ctlfd.fileno(), 'flush')
-
-    def commit(self):
-        msg = os.read(self.msgfd.fileno(), 4)
-        if msg != 'done':
-            print 'Unknown message: %s' % msg
-
-class NetbufferException(Exception): pass
-
-class Netbuffer(BufferedDevice):
-    """
-    Buffer a protected domain's network output between rounds so that
-    nothing is issued that a failover might not know about.
-    """
-    # shared rtnetlink handle
-    rth = None
-
-    def __init__(self, domid):
-        self.installed = False
-
-        if not self.rth:
-            self.rth = netlink.rtnl()
-
-        self.devname = self._startimq(domid)
-        dev = self.rth.getlink(self.devname)
-        if not dev:
-            raise NetbufferException('could not find device %s' % self.devname)
-        self.dev = dev['index']
-        self.handle = qdisc.TC_H_ROOT
-        self.q = qdisc.QueueQdisc()
-
-    def __del__(self):
-        self.uninstall()
-
-    def postsuspend(self):
-        if not self.installed:
-            self._setup()
-
-        self._sendqmsg(qdisc.TC_QUEUE_CHECKPOINT)
-
-    def commit(self):
-        '''Called when checkpoint has been acknowledged by
-        the backup'''
-        self._sendqmsg(qdisc.TC_QUEUE_RELEASE)
-
-    def _sendqmsg(self, action):
-        self.q.action = action
-        req = qdisc.changerequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-
-    def _setup(self):
-        q = self.rth.getqdisc(self.dev)
-        if q:
-            if q['kind'] == 'queue':
-                self.installed = True
-                return
-            if q['kind'] != 'pfifo_fast':
-                raise NetbufferException('there is already a queueing '
-                                         'discipline on %s' % self.devname)
-
-        print 'installing buffer on %s' % self.devname
-        req = qdisc.addrequest(self.dev, self.handle, self.q)
-        self.rth.talk(req.pack())
-        self.installed = True
-
-    def uninstall(self):
-        if self.installed:
-            req = qdisc.delrequest(self.dev, self.handle)
-            self.rth.talk(req.pack())
-            self.installed = False
-
-    def _startimq(self, domid):
-        # stopgap hack to set up IMQ for an interface. Wrong in many ways.
-        imqebt = '/usr/lib/xen/bin/imqebt'
-        imqdev = 'imq0'
-        vid = 'vif%d.0' % domid
-        for mod in ['sch_queue', 'imq', 'ebt_imq']:
-            util.runcmd(['modprobe', mod])
-        util.runcmd("ip link set %s up" % (imqdev))
-        util.runcmd("%s -F FORWARD" % (imqebt))
-        util.runcmd("%s -A FORWARD -i %s -j imq --todev %s" % (imqebt, vid, imqdev))
-
-        return imqdev
-
 class SignalException(Exception): pass
 
 def run(cfg):

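The checkpoint pacing stays in tools/remus/remus, since this patch only moves the device classes. When --timer is not given, the postsuspend closure in run() records the epoch start time. After the backup acknowledges, the commit closure sleeps out whatever remains of cfg.interval milliseconds. The Python 3 sketch below reproduces that pacing. EpochPacer is a hypothetical class, not part of the patch. It takes an injectable clock and sleep function so the logic can be exercised without real delays:

```python
import time


class EpochPacer(object):
    'Hypothetical helper mirroring the sleep-until-interval logic in run()'

    def __init__(self, interval_ms, clock=time.time, sleep=time.sleep):
        self.interval_ms = interval_ms
        self.clock = clock
        self.sleep = sleep
        self.starttime = None

    def postsuspend(self):
        # the epoch is timed from the moment the domain is paused
        self.starttime = self.clock()

    def commit(self):
        # after the backup acknowledges, wait out the rest of the interval
        elapsed_ms = (self.clock() - self.starttime) * 1000
        if elapsed_ms < self.interval_ms:
            self.sleep((self.interval_ms - elapsed_ms) / 1000.0)


# usage with a fake clock: a 200 ms interval, 62.5 ms spent checkpointing
fake_now = [0.0]
slept = []
pacer = EpochPacer(200, clock=lambda: fake_now[0], sleep=slept.append)
pacer.postsuspend()
fake_now[0] = 0.0625
pacer.commit()
print(slept)
```

An epoch that already ran past the interval does not sleep at all. Checkpointing then proceeds as fast as the save loop allows instead of trying to catch up.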
[-- Attachment #3: Type: text/plain, Size: 138 bytes --]

_______________________________________________
Xen-devel mailing list
Xen-devel@lists.xensource.com
http://lists.xensource.com/xen-devel


Thread overview: 11+ messages
2010-05-03 18:53 [PATCH 0 of 7] Remus: pvops dom0 support Brendan Cully
2010-05-03 18:53 ` [PATCH 1 of 7] Remus: python netlink fixes Brendan Cully
2010-05-03 18:53 ` [PATCH 2 of 7] Remus: remove obsolete code Brendan Cully
2010-05-03 18:53 ` [PATCH 3 of 7] Remus: move device handling into its own module Brendan Cully [this message]
2010-05-03 18:53 ` [PATCH 4 of 7] Remus: fix VM stringification Brendan Cully
2010-05-03 19:21   ` Measuring the amount of memory read/written (basically touched) by a domU Yuvraj Agarwal
2010-05-03 19:32     ` Keir Fraser
2010-05-03 18:53 ` [PATCH 5 of 7] Remus: include device name in vif objects Brendan Cully
2010-05-03 18:53 ` [PATCH 6 of 7] Remus: add file locking and modprobe utility functions Brendan Cully
2010-05-03 18:53 ` [PATCH 7 of 7] Remus: use IFB for net buffer on newer kernels Brendan Cully
2010-05-03 20:05 ` [PATCH 0 of 7] Remus: pvops dom0 support Gilberto Nunes