File halmount.py of Package ivman
#!/usr/bin/python
# hal mount script
# GPL, by Ludwig Nussel <lnussel@suse.de>
# TODO:
# - use volume.crypto_luks.clear.backing_volume to detach luks partition on
# umount or at least display it somehow
import sys
import subprocess
from os import getuid, getpid
from optparse import OptionParser
def err(arg):
    """Write the message *arg* to stderr, terminated by exactly one newline.

    A newline is appended only when *arg* does not already end with one.
    An empty message is written as a bare newline; indexing ``arg[-1]``
    would raise IndexError for it.
    """
    if not arg.endswith("\n"):
        arg += "\n"
    sys.stderr.write(arg)
# The dbus bindings are needed for every operation of this script, so give
# a readable hint instead of a traceback when they are not installed.
# Only ImportError is handled: a bare except would also swallow
# KeyboardInterrupt and SystemExit.
try:
    import dbus
except ImportError:
    err("you need to install dbus-1-python")
    sys.exit(1)
class Dev:
    """Snapshot of the HAL properties of one device, addressed by its UDI.

    Attributes (all of them are (re)assigned by update()):
      device     -- value of "block.device", "" when the property is absent
      mountpoint -- value of "volume.mount_point", "" when absent
      label      -- value of "volume.label", "" when absent
      fstype     -- value of "volume.fstype", "" when absent
      is_mounted -- value of "volume.is_mounted", False when absent
      has_uid    -- True when "uid=" is listed in
                    "volume.mount.valid_options"
      options    -- the entries of "volume.mount.valid_options" joined
                    with ",", "" when the property is absent
    """

    def __init__(self, bus, udi):
        """Remember *bus* and *udi* and fetch the properties right away."""
        self.bus = bus
        self.udi = udi
        self.update()

    def update(self):
        """Re-read every cached property of this device from HAL."""
        obj = self.bus.get_object("org.freedesktop.Hal", self.udi)
        props = obj.GetAllProperties(dbus_interface="org.freedesktop.Hal.Device")

        # Missing properties fall back to a neutral default instead of
        # being detected through catch-all exception handlers.
        self.device = props.get("block.device", "")
        self.mountpoint = props.get("volume.mount_point", "")
        self.label = props.get("volume.label", "")
        self.fstype = props.get("volume.fstype", "")
        self.is_mounted = props.get("volume.is_mounted", False)

        valid_options = props.get("volume.mount.valid_options", [])
        self.has_uid = "uid=" in valid_options
        # join() keeps a separator after every entry, including an empty
        # leading one, which the manual concatenation silently dropped.
        self.options = ",".join(valid_options)
class Mount:
    """Mount, unmount, eject and list the volumes known to HAL.

    Attributes:
      mountable -- Dev objects whose "volume.fsusage" is "filesystem" or
                   "crypto"
      verbose   -- print additional information when True
      all       -- when True every device in mountable is selected,
                   regardless of the name passed to mount/umount/eject
      bus       -- the D-Bus system bus connection
    """

    _HAL = "org.freedesktop.Hal"
    _DEVICE_IFACE = "org.freedesktop.Hal.Device"
    _VOLUME_IFACE = "org.freedesktop.Hal.Device.Volume"
    _CRYPTO_IFACE = "org.freedesktop.Hal.Device.Volume.Crypto"
    _DENIED = "org.freedesktop.Hal.Device.PermissionDeniedByPolicy"

    def __init__(self):
        """Connect to HAL on the system bus and collect the usable volumes."""
        self.mountable = []
        self.verbose = False
        # Initialised here so mount/umount/eject work even when the caller
        # never assigns the attribute.
        self.all = False
        self.bus = dbus.SystemBus()
        self.hal_obj = self.bus.get_object(self._HAL,
                                           "/org/freedesktop/Hal/Manager")
        self.hal = dbus.Interface(self.hal_obj,
                                  "org.freedesktop.Hal.Manager")
        for udi in self.hal.GetAllDevices():
            obj = self.bus.get_object(self._HAL, udi)
            if not obj.PropertyExists("volume.fsusage",
                                      dbus_interface=self._DEVICE_IFACE):
                continue
            fsusage = obj.GetProperty("volume.fsusage",
                                      dbus_interface=self._DEVICE_IFACE)
            if fsusage in ("filesystem", "crypto"):
                self.mountable.append(Dev(self.bus, udi))

    # -- helpers ---------------------------------------------------------

    def _selects(self, dev, name, by_mountpoint=False, by_label=False):
        """Return True when *name* (or the "all" flag) selects *dev*.

        A name starting with "/dev/" is compared with the device node;
        mount point and label are compared only when the respective flag
        is set.
        """
        if self.all:
            return True
        if name.startswith("/dev/") and dev.device == name:
            return True
        if by_mountpoint and dev.mountpoint == name:
            return True
        return bool(by_label and dev.label == name)

    def _not_found(self, name):
        """Report that no device matched *name*."""
        if name != "":
            err(name + " not found")
        else:
            err("no device found")

    def _printable_label(self, label):
        """Return *label* with non-ASCII and control characters escaped."""
        try:
            escaped = label.encode('unicode_escape')
        except (UnicodeError, LookupError):
            try:
                escaped = label.encode('string_escape')
            except (UnicodeError, LookupError):
                return label
        if not isinstance(escaped, str):
            # encode() returned a bytes object distinct from str (Python 3);
            # escape codecs only produce ASCII, so this cannot fail.
            escaped = escaped.decode('ascii')
        return escaped

    def _options_for(self, dev, opt):
        """Return a fresh option list for mounting *dev*.

        The caller's list is copied, never modified, so a "uid=" option
        added for one device does not leak into the next one.
        """
        dev_opts = list(opt or [])
        uid = getuid()
        if uid != 0 and dev.has_uid:
            if not any(o.startswith("uid=") for o in dev_opts):
                dev_opts.append("uid=" + str(uid))
        if not dev_opts:
            # An empty option list is passed on as a single empty string.
            dev_opts = [""]
        return dev_opts

    def _authorize(self, privilege):
        """Ask the PolicyKit authentication agent for *privilege*.

        Returns a (failures, retry) tuple: (0, True) when the authorization
        was granted and the mount should be attempted again, (1, False)
        otherwise.
        """
        try:
            session = dbus.SessionBus()
            agent = session.get_object(
                "org.freedesktop.PolicyKit.AuthenticationAgent", "/")
            granted = agent.ObtainAuthorization(privilege, dbus.UInt32(0),
                                                dbus.UInt32(getpid()))
        except Exception:
            # Best effort: whatever goes wrong while talking to the agent
            # is reported as the agent being unreachable.
            err('authentication agent unreachable.\ntry running "polkit-auth --obtain %(p)s"' % { 'p': privilege})
            return 1, False
        if granted:
            return 0, True
        err("Missing Privilege: " + privilege)
        return 1, False

    def _mount_one(self, dev, dest, fs, opt):
        """Mount the single device *dev*.

        Returns a (failures, retry) tuple; retry is True when the device
        should be tried again after a successful authorization.
        """
        target = dest
        if target == "":
            target = dev.label
        if self.verbose:
            print("mounting %s on %s" % (dev.device, target))
        dev_opts = self._options_for(dev, opt)
        obj = self.bus.get_object(self._HAL, dev.udi)
        try:
            if obj.Mount(target, fs, dev_opts,
                         dbus_interface=self._VOLUME_IFACE):
                return 1, False
        except dbus.DBusException as exc:
            text = str(exc)
            # XXX: looks like we have no way to get the error name
            # org.freedesktop.Hal.Device.Volume.InvalidMountpoint
            if "mountpoint" in text and "invalid" in text:
                # Retry with an empty mount point name.
                try:
                    if obj.Mount("", fs, dev_opts,
                                 dbus_interface=self._VOLUME_IFACE):
                        return 1, False
                except dbus.DBusException as retry_exc:
                    err(dev.device + ": " + str(retry_exc))
                    return 1, False
            elif exc.get_dbus_name() == self._DENIED:
                # The privilege name is the first word after the
                # "<error name>: " prefix of the message.
                remainder = text[len(self._DENIED + ": "):]
                return self._authorize(remainder.split(" ", 1)[0])
            else:
                err(dev.device + ": " + text)
                return 1, False

        # Verify the result for this device independently of failures
        # that happened for other devices.
        dev.update()
        if dev.is_mounted:
            if target == "" or target != dest:
                print("%s mounted on %s" % (dev.device, dev.mountpoint))
            return 0, False
        err("failed to mount " + dev.device)
        return 1, False

    # -- public interface ------------------------------------------------

    def listudi(self):
        """Print the device node and UDI of every known volume."""
        for dev in self.mountable:
            print("%s %s" % (dev.device, dev.udi))

    def list(self):
        """Print one line per known volume.

        Encrypted volumes are only flagged as such.  Mounted volumes show
        their mount point, unmounted ones their escaped label ("?" when
        the label is empty).  In verbose mode the raw label and the valid
        mount options are appended in parentheses.
        """
        for dev in self.mountable:
            if dev.fstype == "crypto_LUKS":
                print("%s encrypted" % dev.device)
                continue
            if dev.is_mounted:
                fields = [dev.device, "on", dev.mountpoint]
            else:
                # python throws an exception when trying to mount a label
                # with unicode characters in it anyways. Additionally get
                # rid of control characters.
                shown = self._printable_label(dev.label) or "?"
                fields = [dev.device, "->", shown]
            fields += ["type", dev.fstype]
            if self.verbose:
                have_opts = False
                if dev.label != "":
                    have_opts = True
                    fields.append("(label=\"" + dev.label + "\"")
                if dev.options != "":
                    if not have_opts:
                        fields.append("(")
                    fields.append("options=\"" + dev.options + "\"")
                    have_opts = True
                if have_opts:
                    fields.append(")")
            print(" ".join(fields))

    def cryptosetup(self, dev):
        """Prompt for a passphrase and set up the encrypted volume *dev*.

        Returns 0 on success and 1 on failure, including an aborted or
        empty passphrase prompt.
        """
        import getpass
        try:
            pw = getpass.getpass()
        except (Exception, KeyboardInterrupt):
            # Any failure or interruption of the prompt counts as a failure.
            return 1
        if pw == "":
            return 1
        obj = self.bus.get_object(self._HAL, dev.udi)
        try:
            if obj.Setup(pw, dbus_interface=self._CRYPTO_IFACE):
                return 1
        except dbus.DBusException as exc:
            err(dev.device + ": " + str(exc))
            return 1
        return 0

    def cryptoteardown(self, dev):
        """Tear down the encrypted volume *dev*; return 0 on success, else 1."""
        obj = self.bus.get_object(self._HAL, dev.udi)
        try:
            if obj.Teardown(dbus_interface=self._CRYPTO_IFACE):
                return 1
        except dbus.DBusException as exc:
            err(dev.device + ": " + str(exc))
            return 1
        return 0

    def mount(self, name, dest, fs, opt):
        """Mount the device(s) selected by *name*.

        name -- device node ("/dev/...") or volume label; a leading
                "/media/" and trailing slashes are stripped
        dest -- mount point name, "" to use the volume label
        fs   -- file system type passed to HAL
        opt  -- list of mount options; it is not modified

        Returns the number of failures (0 means success).
        """
        ret = 0
        found = False
        if name.startswith("/media/"):
            name = name[7:]
        if dest.startswith("/media/"):
            dest = dest[7:]
        if name != "/":
            name = name.rstrip("/")
        if dest != "/":
            dest = dest.rstrip("/")

        # Work on a private queue so that a device can be queued again
        # after a successful authorization without adding duplicates to
        # self.mountable.
        pending = list(self.mountable)
        while pending:
            dev = pending.pop(0)
            if not self._selects(dev, name, by_label=True):
                continue
            found = True
            if dev.is_mounted:
                err(dev.device + " is already mounted on " + dev.mountpoint)
                continue
            if dev.fstype == "crypto_LUKS":
                if self.cryptosetup(dev):
                    ret += 1
                continue
            failures, retry = self._mount_one(dev, dest, fs, opt)
            ret += failures
            if retry:
                pending.append(dev)

        if not found:
            self._not_found(name)
            ret += 1
        return ret

    def umount(self, name):
        """Unmount the device(s) selected by *name*.

        *name* may be a device node, a mount point or a label.  Encrypted
        volumes are torn down instead.  Returns the number of failures.
        """
        ret = 0
        found = False
        if name != "/":
            name = name.rstrip("/")
        for dev in self.mountable:
            if not self._selects(dev, name, by_mountpoint=True, by_label=True):
                continue
            found = True
            if dev.fstype == "crypto_LUKS":
                if self.cryptoteardown(dev):
                    ret += 1
                continue
            if not dev.is_mounted:
                err(dev.device + " is not mounted")
                continue
            obj = self.bus.get_object(self._HAL, dev.udi)
            try:
                if obj.Unmount([""], dbus_interface=self._VOLUME_IFACE):
                    ret += 1
            except dbus.DBusException as exc:
                err(dev.device + ": " + str(exc))
                ret += 1
        if not found:
            self._not_found(name)
            ret += 1
        return ret

    def eject(self, name):
        """Eject the device(s) selected by *name*.

        *name* may be a device node or a mount point.  Returns the number
        of failures.
        """
        ret = 0
        found = False
        if name != "/":
            name = name.rstrip("/")
        for dev in self.mountable:
            if not self._selects(dev, name, by_mountpoint=True):
                continue
            found = True
            obj = self.bus.get_object(self._HAL, dev.udi)
            try:
                if obj.Eject([""], dbus_interface=self._VOLUME_IFACE):
                    ret += 1
            except dbus.DBusException as exc:
                err(dev.device + ": " + str(exc))
                ret += 1
        if not found:
            self._not_found(name)
            ret += 1
        return ret
# Command line definition: two valued options followed by the boolean
# switches, registered in the order in which they appear in --help.
parser = OptionParser(usage="%prog [options] <device|label> [mountpoint]")
parser.add_option("-t", dest="fstype", default="", help="file system type")
parser.add_option("-o", dest="options", default="", help="mount options")
for _flag, _attr, _text in (
        ("-v", "verbose", "verbose listing"),
        ("-u", "umount", "umount"),
        ("-e", "eject", "eject"),
        ("-a", "all", "(u)mount all"),
        ("--listudi", "listudi", "list UDIs"),
):
    parser.add_option(_flag, dest=_attr, action="store_true", default=False,
                      help=_text)

(options, args) = parser.parse_args()

# Nothing can be done without a connection to the HAL daemon.
try:
    m = Mount()
except dbus.DBusException as msg:
    err("Can't connect to hald: "+str(msg))
    sys.exit(1)

m.verbose = options.verbose
m.all = options.all

ret = 0

if options.umount or options.eject:
    # Exactly one device/mount point is required unless -a was given.
    if len(args) != 1 and not options.all:
        err("specify device name or mountpoint")
        sys.exit(1)
    name = args[0] if args else ""
    if options.umount:
        ret = m.umount(name)
    else:
        ret = m.eject(name)
elif options.listudi:
    m.listudi()
elif not args and not options.all:
    # No device requested: just show what is available.
    m.list()
else:
    opt = options.options.split(",") if options.options != "" else []
    name = args[0] if args else ""
    dest = args[1] if len(args) > 1 else ""
    ret = m.mount(name, dest, options.fstype, opt)

# The exit status is the number of failures reported by the operation.
sys.exit(ret)