#! /usr/bin/python # -*- coding: utf-8 -*- # # lsdrv - Report on a system's disk interfaces and how they are used. # # Copyright (C) 2011 Philip J. Turmel # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 2. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. import os, io, re from subprocess import Popen, PIPE #------------------- # Handy base for objects as "bags of properties" # inspired by Peter Norvig http://norvig.com/python-iaq.html # Unlike the original, this one supplies 'None' instead of an attribute # error when an explicitly named property has not yet been set. class Struct(object): def __init__(self, **entries): self.__dict__.update(entries) def __repr__(self, recurse=[]): if self in recurse: return type(self) args = [] for (k,v) in vars(self).items(): if isinstance(v, Struct): args.append('%s=%s' % (k, v.__repr__(recurse+[self]))) else: args.append('%s=%s' % (k, repr(v))) return '%s(%s)' % (type(self), ', '.join(args)) def clone(self): return type(self)(**self.__dict__) def __getattr__(self, attr): return None #------------------- # Spawn an executable and collect its output. Equivalent to the # check_output convenience function of the subprocess module introduced # in python 2.7. def runx(*args, **kwargs): kwargs['stdout'] = PIPE kwargs['stderr'] = PIPE sub = Popen(*args, **kwargs) out, err = sub.communicate() return out #------------------- # Extract a matched expression from a string buffer # If a match is found, return the given replace expression. re1 = re.compile(r'([/:][a-zA-Z]*)0+([0-9])') re2 = re.compile(r'Serial.+\'(.+)\'') re3 = re.compile(r'Serial.+:(.+)') def extractre(regexp, buffer, retexp=r'\1'): mo = re.search(regexp, buffer) if mo: return mo.expand(retexp) return None #------------------- # Extract shell variable assignments from a multiline string buffer # This simple implementation returns everything after the equals sign # as the value, including any quotes. varsre = re.compile(r'^\s*([a-zA-Z][a-zA-Z0-9_]*)\s*=(.+)$', re.MULTILINE) def extractvars(buffer): vars=dict() for mo in varsre.finditer(buffer): vars[mo.group(1)] = mo.group(2) return vars #------------------- # By Seo Sanghyeon. Some changes by Connelly Barnes and Phil Turmel. def try_int(s): try: return int(s) except: return s natsortre = re.compile(r'(\d+|\D+)') def natsort_key(s): if isinstance(s, str): return map(try_int, natsortre.findall(s)) else: try: return tuple([natsort_keys(x) for x in s]) except: return s def natcmp(a, b): return cmp(natsort_key(a), natsort_key(b)) #------------------- # Convert device sizes expressed in kibibytes into human-readable # sizes with a reasonable power-of-two suffix. def k2size(k): if k<1000: return "%4.2fk" % k m=k/1024.0 if m<1000: return "%4.2fm" % m g=m/1024.0 if g<1000: return "%4.2fg" % g t=g/1024.0 if t<1000: return "%4.2ft" % t p=t/1024.0 return "%4.2fp" % p #------------------- # Convert device sizes expressed as 512-byte sectors into human-readable # sizes with a reasonable power-of-two suffix. def sect2size(sectors): return k2size(int(sectors)/2.0) #------------------- # Given a sysfs path to the parent of a physical block device, resolve the # controller path, look it up in the list of known controllers, and return # the corresponding struct object. If it's not present in the list, create # the struct object w/ filled in details. controllers=dict() def probe_controller(cpathlink): cpath = os.path.realpath(cpathlink) if cpath in controllers: return controllers[cpath] while cpath and not os.path.exists(cpath+'/driver'): cpath = os.path.dirname(cpath) if cpath in controllers: return controllers[cpath] if not cpath: return None cntrlr = Struct(cpath=cpath, units=dict(), abbrev=re1.sub(r'\1\2', cpath[12:]), driver = os.path.realpath(cpath+'/driver').rsplit('/',1)[-1], modpre = io.FileIO(cpath+'/modalias').read().split("\n",1)[0].split(':',1)[0]) if cntrlr.modpre == 'pci': cntrlr.description = runx(['lspci', '-s', cntrlr.abbrev.rsplit('/',1)[-1]]).split("\n",1)[0] cntrlr.descriptors = ['PCI', '[%s]' % cntrlr.driver, cntrlr.description] elif cntrlr.modpre == 'usb': if os.path.exists(cpath+'/busnum'): cntrlr.busnum = io.FileIO(cpath+'/busnum').read().split("\n",1)[0] cntrlr.devnum = io.FileIO(cpath+'/devnum').read().split("\n",1)[0] cntrlr.serial = io.FileIO(cpath+'/serial').read().split("\n",1)[0] else: parentpath = os.path.dirname(cpath) cntrlr.busnum = io.FileIO(parentpath+'/busnum').read().split("\n",1)[0] cntrlr.devnum = io.FileIO(parentpath+'/devnum').read().split("\n",1)[0] cntrlr.serial = io.FileIO(parentpath+'/serial').read().split("\n",1)[0] cntrlr.description = runx(['lsusb', '-s', cntrlr.busnum+':'+cntrlr.devnum]).split("\n",1)[0] cntrlr.descriptors = ['USB', '[%s]' % cntrlr.driver, cntrlr.description, '{%s}' % cntrlr.serial] else: cntrlr.descriptors = ['Controller %s' % cntrlr.abbrev[1:], '[%s]' % cntrlr.driver, cntrlr.description] controllers[cpath] = cntrlr return cntrlr #------------------- # Given a link to a physical block device syspath, resolve the real device # path, look it up in the list of known physical devices, and return # the corresponding struct object. If it's not present in the list, # create the struct object w/ filled in details, and probe its # controller. phydevs=dict() def probe_device(devpathlink, nodestr): devpath = os.path.realpath(devpathlink) if devpath in phydevs: return phydevs[devpath] phy = Struct(dpath=devpath, node=nodestr, vendor=io.FileIO(devpath+'/vendor').read().split("\n",1)[0].strip(), model=io.FileIO(devpath+'/model').read().split("\n",1)[0].strip()) if os.path.exists(devpath+'/unique_id'): phy.serial = io.FileIO(devpath+'/unique_id').read().split("\n",1)[0].strip() if not phy.serial: phy.serial = extractre(re2, runx(['sginfo', '-s', '/dev/block/'+nodestr])) if not phy.serial: phy.serial = extractre(re3, runx(['smartctl', '-i', '/dev/block/'+nodestr])) phy.name = "%s %s" % (os.path.realpath(devpath+'/subsystem').rsplit('/',1)[-1], devpath.rsplit('/',1)[-1]) phy.controller = probe_controller(os.path.dirname(devpath)) if phy.controller: phy.controller.units[phy.name] = phy phydevs[devpath] = phy return phy #------------------- # Collect block device information and create dictionaries by kernel # name and by device major:minor. Probe each block device and try to # describe the filesystem or other usage. blockbyname=dict() blockbynode=dict() sysclassblock="/sys/class/block/" for x in os.listdir(sysclassblock): nodestr=io.FileIO(sysclassblock+x+'/dev').read().split("\n")[0] sizestr=sect2size(io.FileIO(sysclassblock+x+'/size').read().split("\n")[0]) node = nodestr.split(':',1) dev=Struct(name=x, node=nodestr, size=sizestr, major=int(node[0]), minor=int(node[1]), shown=False) if os.path.exists(sysclassblock+x+'/device'): dev.phy = probe_device(sysclassblock+x+'/device', nodestr) if dev.phy: dev.phy.block = dev if os.path.exists(sysclassblock+x+'/holders'): dev.holders = os.listdir(sysclassblock+x+'/holders') else: dev.holders = [] if os.path.exists(sysclassblock+x+'/slaves'): dev.slaves = os.listdir(sysclassblock+x+'/slaves') else: dev.slaves = [] dev.partitions = [y for y in os.listdir(sysclassblock+x) if os.path.exists(sysclassblock+x+'/'+y+'/dev')] dev.__dict__.update(extractvars(runx(['blkid', '-p', '-o', 'udev', '/dev/block/'+nodestr]))) if os.path.exists(sysclassblock+x+'/md'): dev.isMD = True dev.__dict__.update(extractvars(runx(['mdadm', '--export', '--detail', '/dev/block/'+nodestr]))) if dev.ID_FS_TYPE == 'linux_raid_member': dev.hasMD = True dev.__dict__.update(extractvars(runx(['mdadm', '--export', '--examine', '/dev/block/'+nodestr]))) if dev.holders: mddir=sysclassblock+x+'/holders/'+dev.holders[0]+'/md/' dev.MD_array_state = io.FileIO(mddir+'array_state').read().split("\n")[0] dev.MD_array_size = io.FileIO(mddir+'array_size').read().split("\n")[0] dev.MD_slot = io.FileIO(mddir+'dev-'+x+'/slot').read().split("\n")[0] dev.MD_state = io.FileIO(mddir+'dev-'+x+'/state').read().split("\n")[0] dev.FS = "MD %s (%s/%s) %s %s %s %s" % (dev.MD_LEVEL, dev.MD_slot, int(dev.MD_DEVICES), dev.size, dev.holders[0], dev.MD_array_state, dev.MD_state) else: dev.FS = "MD %s (%s) %s inactive" % (dev.MD_LEVEL, dev.MD_DEVICES, dev.size) elif dev.ID_FS_TYPE and dev.ID_FS_TYPE[0:3] == 'LVM': # Placeholder string for inactive physical volumes. It'll be # overwritten when active PVs are scanned. dev.FS = "PV %s (inactive)" % dev.ID_FS_TYPE elif dev.ID_PART_TABLE_TYPE: dev.FS = "Partitioned (%s) %s" % (dev.ID_PART_TABLE_TYPE, dev.size) elif dev.ID_FS_TYPE: dev.FS = "(%s) %s" % (dev.ID_FS_TYPE, dev.size) else: dev.FS = "Empty/Unknown %s" % dev.size if dev.ID_FS_LABEL: dev.FS += " '%s'" % dev.ID_FS_LABEL if dev.ID_FS_UUID: dev.FS += " {%s}" % dev.ID_FS_UUID blockbyname[x] = dev blockbynode[nodestr] = dev #------------------- # Collect information on mounted file systems and annotate the # corresponding block device. Use the block device's major:minor node # numbers, as the mount list often shows symlinks. for x in io.FileIO('/proc/mounts').readlines(): if x[0:5] == '/dev/': mdev, mnt = tuple(x.split(' ', 2)[0:2]) devstat = os.stat(mdev) nodestr="%d:%d" % (os.major(devstat.st_rdev), os.minor(devstat.st_rdev)) if nodestr in blockbynode: mntstat = os.statvfs(mnt) dev = blockbynode[nodestr] dev.mountdev = mdev dev.mountpoint = mnt dev.mountinfo = mntstat #------------------- # Collect information on LVM volumes and groups and annotate the # corresponding block device. Use the block device's major:minor node # numbers, as the mount list often shows symlinks. vgroups = dict() for x in runx(['pvs', '-o', 'pv_name,pv_used,pv_size,pv_uuid,vg_name,vg_size,vg_free,vg_uuid', '--noheadings', '--separator', ' ']).split("\n"): if x: pv_name, pv_used, pv_size, pv_uuid, vg_name, vg_size, vg_free, vg_uuid = tuple(x.strip().split(' ',7)) devstat = os.stat(pv_name) nodestr="%d:%d" % (os.major(devstat.st_rdev), os.minor(devstat.st_rdev)) if nodestr in blockbynode: dev = blockbynode[nodestr] dev.vg_name = vg_name if not dev.hasLVM: dev.hasLVM = True dev.pv_used = pv_used dev.pv_size = pv_size dev.pv_uuid = pv_uuid dev.FS = "PV %s %s/%s VG %s %s {%s}" % (dev.ID_FS_TYPE, pv_used, pv_size, vg_name, vg_size, pv_uuid) if vg_name in vgroups: vgroups[vg_name].PVs += [dev] else: vgroups[vg_name] = Struct(name=vg_name, size=vg_size, free=vg_free, uuid=vg_uuid, LVs=[], PVs=[dev]) for x in runx(['lvs', '-o', 'vg_name,lv_name,lv_path', '--noheadings', '--separator', ' ']).split("\n"): if x: vg_name, lv_name, lv_path = tuple(x.strip().split(' ',2)) devstat = os.stat(lv_path) nodestr="%d:%d" % (os.major(devstat.st_rdev), os.minor(devstat.st_rdev)) if nodestr in blockbynode: dev = blockbynode[nodestr] dev.isLVM = True dev.vg_name = vg_name dev.__dict__.update(extractvars(runx(['lvs', '--rows', '-o', 'all', '--nameprefixes', '--noheadings', '--unquoted', lv_path]))) if vg_name in vgroups: vgroups[vg_name].LVs += [dev] else: vgroups[vg_name] = Struct(name=vg_name, LVs=[dev], PVs=[]) def show_vgroup(indent, vg): if vg.shown: return print "%s └─Volume Group %s (%s) %s free {%s}" % (indent, vg.name, ','.join([dev.name for dev in vg.PVs]), vg.free, vg.uuid) show_blocks(indent+" ", vg.LVs) vg.shown = True #------------------- # Given an indent level and list of block device names, recursively describe # them. continuation = ('│', '├') corner = (' ', '└') def show_blocks(indent, blocks): blocks = [x for x in blocks if not x.shown] for blk in blocks: if blk == blocks[-1]: branch=corner else: branch=continuation print "%s %s─%s: %s" % (indent, branch[1], blk.name, blk.FS) if blk.mountpoint: print "%s %s └─Mounted as %s @ %s" % (indent, branch[0], blk.mountdev, blk.mountpoint) elif blk.hasLVM: show_vgroup(indent+" ", vgroups[blk.vg_name]) else: subs = blk.partitions + blk.holders subs.sort(natcmp) if subs: show_blocks("%s %s " % (indent, branch[0]), [blockbyname[x] for x in subs]) blk.shown = True #------------------- # Collect SCSI host / controller pairs from sysfs and create an ordered tree. Skip # hosts that have targets, as they will already be in the list. Add empty physical # device entries for hosts without targets. scsidir = "/sys/bus/scsi/devices/" scsilist = os.listdir(scsidir) hosts = dict([(int(x[4:]), Struct(n=int(x[4:]), cpath=os.path.dirname(os.path.realpath(scsidir+x)), hpath='/'+x)) for x in scsilist if x[0:4]=='host']) for n, host in hosts.items(): cntrlr = probe_controller(host.cpath) if cntrlr : targets = [x for x in os.listdir(host.cpath+host.hpath) if x[0:6]=='target'] if not targets: phy = Struct(name='scsi %d:x:x:x [Empty]' % host.n) cntrlr.units[phy.name] = phy for cntrlr in controllers.values(): cntrlr.unitlist = cntrlr.units.keys() if cntrlr.unitlist: cntrlr.unitlist.sort(natcmp) cntrlr.first = cntrlr.unitlist[0] else: cntrlr.first = '' tree=[(cntrlr.first, cntrlr) for cntrlr in controllers.values()] tree.sort(natcmp) for f, cntrlr in tree: print " ".join(cntrlr.descriptors) if cntrlr.unitlist: cntrlr.units[cntrlr.unitlist[-1]].last = True branch = continuation for key in cntrlr.unitlist: phy = cntrlr.units[key] if phy.last: branch = corner unitdetail = phy.name if phy.vendor: unitdetail += ' '+phy.vendor if phy.model: unitdetail += ' '+phy.model if phy.serial: unitdetail += " {%s}" % phy.serial.strip() print ' %s─%s' % (branch[1], unitdetail) if phy.block: show_blocks(" %s " % branch[0], [phy.block]) unshown = [z.name for z in blockbynode.values() if z.size != '0.00k' and not z.shown] unshown.sort(natcmp) if unshown: print "Other Block Devices" show_blocks("", [blockbyname[x] for x in unshown])