# File halmount.py of Package ivman

#!/usr/bin/python
# hal mount script
# GPL, by Ludwig Nussel <lnussel@suse.de>

# TODO:
# - use volume.crypto_luks.clear.backing_volume to detach luks partition on
#   umount or at least display it somehow

import sys
import subprocess
from os import getuid, getpid
from optparse import OptionParser

def err(arg):
	"""Write arg to stderr, making sure the output ends with a newline.

	A newline is appended only when arg does not already end with one.
	An empty arg yields an empty line.
	"""
	# endswith() is safe on "", whereas indexing arg[-1] raises IndexError.
	if not arg.endswith("\n"):
		arg += "\n"
	sys.stderr.write(arg)

# dbus is required by everything below; print a readable hint instead of a
# traceback when the module cannot be imported.
try:
	import dbus
except ImportError:
	# Only a failed import is handled here; anything else (Ctrl-C, a bug
	# inside the module) is left to surface normally.
	err("you need to install dbus-1-python")
	sys.exit(1)

class Dev:
	"""Snapshot of the HAL properties of one device that matter for mounting.

	Attributes (all of them are (re)loaded by update()):
	  bus         bus connection handed to the constructor
	  udi         HAL unique device identifier handed to the constructor
	  device      value of "block.device", "" when the property is missing
	  mountpoint  value of "volume.mount_point", "" when missing
	  label       value of "volume.label", "" when missing
	  fstype      value of "volume.fstype", "" when missing
	  is_mounted  value of "volume.is_mounted", False when missing
	  has_uid     True when "uid=" is listed in "volume.mount.valid_options"
	  options     the valid mount options joined with ","
	"""

	def __init__(self, bus, udi):
		"""Remember bus and udi, then load the properties once."""
		self.bus = bus
		self.udi = udi

		self.update()

	def update(self):
		"""Fetch all properties of the device from HAL and refresh the attributes.

		Properties that HAL does not report fall back to the defaults listed
		in the class docstring.
		"""
		obj = self.bus.get_object("org.freedesktop.Hal", self.udi)
		props = obj.GetAllProperties(dbus_interface="org.freedesktop.Hal.Device")

		# .get() replaces the former try/bare-except around each lookup: a
		# missing key still yields the default, but unrelated errors are no
		# longer swallowed.
		self.device = props.get("block.device", "")
		self.mountpoint = props.get("volume.mount_point", "")
		self.label = props.get("volume.label", "")
		self.fstype = props.get("volume.fstype", "")
		self.is_mounted = props.get("volume.is_mounted", False)

		valid_options = props.get("volume.mount.valid_options", [])
		self.has_uid = "uid=" in valid_options
		self.options = ",".join(valid_options)

class Mount:
	"""List, mount, unmount and eject the volumes known to HAL.

	The constructor asks hald for all devices and keeps a Dev for every one
	whose "volume.fsusage" property is "filesystem" or "crypto".

	Attributes:
	  mountable  list of Dev objects collected by the constructor
	  verbose    when true, list() and mount() print additional detail
	  all        when true, mount(), umount() and eject() act on every
	             entry of mountable instead of selecting by name
	  bus        the D-Bus system bus connection
	  hal_obj    proxy for /org/freedesktop/Hal/Manager
	  hal        org.freedesktop.Hal.Manager interface of hal_obj

	mount(), umount() and eject() return the number of errors they hit, so
	0 means success. Diagnostics go to stderr through the module-level err().

	The code avoids print statements and "except X, e" so that the class
	parses on both Python 2.6+ and Python 3.
	"""

	_HAL = "org.freedesktop.Hal"
	_DEVICE = "org.freedesktop.Hal.Device"
	_VOLUME = "org.freedesktop.Hal.Device.Volume"
	_CRYPTO = "org.freedesktop.Hal.Device.Volume.Crypto"
	_DENIED = "org.freedesktop.Hal.Device.PermissionDeniedByPolicy"

	def __init__(self):
		"""Connect to the system bus and collect the mountable volumes.

		D-Bus errors raised while talking to hald are not handled here;
		they propagate to the caller.
		"""
		self.mountable = []
		self.verbose = False
		# mount()/umount()/eject() read self.all, so give it a default
		# rather than relying on the caller to assign it first.
		self.all = False

		self.bus = dbus.SystemBus()

		self.hal_obj = self.bus.get_object(self._HAL,
				"/org/freedesktop/Hal/Manager")
		self.hal = dbus.Interface(self.hal_obj,
				"org.freedesktop.Hal.Manager")

		for udi in self.hal.GetAllDevices():
			obj = self.bus.get_object(self._HAL, udi)

			if not obj.PropertyExists("volume.fsusage", dbus_interface=self._DEVICE):
				continue
			fsusage = obj.GetProperty("volume.fsusage", dbus_interface=self._DEVICE)
			if fsusage in ("filesystem", "crypto"):
				self.mountable.append(Dev(self.bus, udi))

	def _emit(self, *words):
		"""Write the given strings to stdout on one line, separated by spaces."""
		sys.stdout.write(" ".join(words) + "\n")

	def _printable_label(self, label):
		"""Return label in backslash-escaped form.

		unicode_escape is tried first, then string_escape (which only
		exists on Python 2); if both fail the label is returned untouched.
		"""
		try:
			escaped = label.encode('unicode_escape')
		except Exception:
			try:
				escaped = label.encode('string_escape')
			except Exception:
				return label
		# On Python 3 encode() returns bytes, which would show up as
		# b'...' in the listing; on Python 2 this branch is never taken.
		if not isinstance(escaped, str):
			escaped = escaped.decode('ascii')
		return escaped

	def _privilege_from(self, message):
		"""Extract the privilege name from a PermissionDeniedByPolicy message.

		The name is the first word following the
		"org.freedesktop.Hal.Device.PermissionDeniedByPolicy: " prefix.
		"""
		rest = message[len(self._DENIED + ": "):]
		# split() keeps the whole remainder when there is no space; slicing
		# with find() would cut off the last character in that case.
		return rest.split(" ", 1)[0]

	def _obtain_privilege(self, message):
		"""Ask the PolicyKit authentication agent for the denied privilege.

		message is the text of the PermissionDeniedByPolicy error. Returns
		True when the agent granted the privilege. Otherwise an explanation
		is written to stderr and False is returned.
		"""
		privilege = self._privilege_from(message)
		try:
			bus = dbus.SessionBus()
			agent = bus.get_object("org.freedesktop.PolicyKit.AuthenticationAgent", "/")
			ok = agent.ObtainAuthorization(privilege, dbus.UInt32(0), dbus.UInt32(getpid()))
		except Exception:
			err('authentication agent unreachable.\ntry running "polkit-auth --obtain %(p)s"' % { 'p': privilege})
			return False
		if ok == True:
			return True
		err("Missing Privilege: " + privilege)
		return False

	def _report_missing(self, name):
		"""Tell the user that no device was selected by name."""
		if name != "":
			err(name + " not found")
		else:
			err("no device found")

	def listudi(self):
		"""Print the device node and the UDI of every known volume."""
		for dev in self.mountable:
			self._emit(dev.device, dev.udi)

	def list(self):
		"""Print one line per known volume.

		LUKS volumes are shown as "<device> encrypted". Mounted volumes are
		shown as "<device> on <mountpoint> type <fstype>", unmounted ones as
		"<device> -> <label or ?> type <fstype>". In verbose mode the raw
		label and the valid mount options are appended in parentheses.
		"""
		for dev in self.mountable:

			if dev.fstype == "crypto_LUKS":
				self._emit(dev.device, "encrypted")
				continue

			if dev.is_mounted:
				words = [dev.device, "on", dev.mountpoint]
			else:
				# Show the label escaped so that non-ASCII or control
				# characters cannot break the listing.
				words = [dev.device, "->", self._printable_label(dev.label) or "?"]

			words += ["type", dev.fstype]

			if self.verbose:
				detail = []
				if dev.label != "":
					detail.append("(label=\"" + dev.label + "\"")
				if dev.options != "":
					if not detail:
						detail.append("(")
					detail.append("options=\"" + dev.options + "\"")
				if detail:
					detail.append(")")
				words += detail

			self._emit(*words)

	def cryptosetup(self, dev):
		"""Prompt for a passphrase and set up the LUKS volume dev.

		Returns 0 on success and 1 when the prompt was aborted, the
		passphrase was empty or HAL reported a failure.
		"""
		import getpass
		try:
			pw = getpass.getpass()
		except (Exception, KeyboardInterrupt):
			# Ctrl-C or EOF at the prompt means "give up on this device".
			return 1

		if pw == "":
			return 1

		obj = self.bus.get_object(self._HAL, dev.udi)
		try:
			if obj.Setup(pw, dbus_interface=self._CRYPTO):
				return 1
		except dbus.DBusException as exc:
			err(dev.device + ": " + str(exc))
			return 1
		return 0

	def cryptoteardown(self, dev):
		"""Tear down the LUKS mapping of dev. Returns 0 on success, 1 on failure."""
		obj = self.bus.get_object(self._HAL, dev.udi)
		try:
			if obj.Teardown(dbus_interface=self._CRYPTO):
				return 1
		except dbus.DBusException as exc:
			err(dev.device + ": " + str(exc))
			return 1
		return 0

	def mount(self, name, dest, fs, opt):
		"""Mount the volume(s) selected by name.

		name  device node (must start with "/dev/") or volume label; a
		      leading "/media/" and trailing slashes are stripped first
		dest  mount point, treated like name; "" means "use the label"
		fs    file system type passed on to HAL
		opt   list of mount options; it is never modified

		With self.all set every known volume is selected. LUKS volumes are
		set up instead of mounted. Returns the number of errors, where "no
		device matched" counts as one.
		"""
		ret = 0
		found = False
		if name[0:7] == "/media/":
			name = name[7:]
		if dest[0:7] == "/media/":
			dest = dest[7:]
		if name != "/":
			name = name.rstrip("/")
		if dest != "/":
			dest = dest.rstrip("/")

		uid = getuid()
		# Work on a private queue so that re-queueing a device after a
		# successful authorization does not leave duplicates in
		# self.mountable.
		queue = list(self.mountable)
		reauthorized = set()

		while queue:
			dev = queue.pop(0)
			if not (self.all or (name[0:5] == "/dev/" and dev.device == name)
					or dev.label == name):
				continue

			found = True
			if dev.is_mounted:
				err(dev.device + " is already mounted on " + dev.mountpoint)
				continue

			if dev.fstype == "crypto_LUKS":
				if self.cryptosetup(dev):
					ret += 1
				continue

			d = dest
			if d == "":
				d = dev.label

			if self.verbose:
				self._emit("mounting", dev.device, "on", d)

			# Build the option list per device: a "uid=" added for one
			# volume must not leak to the next volume or to the caller.
			opts = list(opt)
			if uid != 0 and dev.has_uid:
				if not any(o[0:4] == "uid=" for o in opts):
					opts.append("uid=" + str(uid))
			if not opts:
				# HAL is always handed a non-empty list (presumably because
				# dbus-python cannot guess the type of an empty one).
				opts = [""]

			obj = self.bus.get_object(self._HAL, dev.udi)
			failed = False
			try:
				if obj.Mount(d, fs, opts, dbus_interface=self._VOLUME):
					failed = True
			except dbus.DBusException as exc:
				text = str(exc)
				# XXX: looks like we have no way to get the error name
				# org.freedesktop.Hal.Device.Volume.InvalidMountpoint
				if text.find("mountpoint") != -1 and text.find("invalid") != -1:
					# Retry and let HAL choose the mount point; clearing d
					# makes the final report show where it ended up.
					d = ""
					try:
						if obj.Mount("", fs, opts, dbus_interface=self._VOLUME):
							failed = True
					except dbus.DBusException as retry_exc:
						err(dev.device + ": " + str(retry_exc))
						failed = True
				elif exc.get_dbus_name() == self._DENIED:
					if dev.udi in reauthorized:
						# Already retried once after authorization; stop
						# here instead of looping forever.
						err(dev.device + ": " + text)
						failed = True
					elif self._obtain_privilege(text):
						reauthorized.add(dev.udi)
						queue.append(dev)
						continue
					else:
						failed = True
				else:
					err(dev.device + ": " + text)
					failed = True

			if failed:
				ret += 1
				continue

			# Judge each device on its own result, not on the running error
			# count, so one failure does not silence later successes.
			dev.update()
			if dev.is_mounted:
				if d == "" or d != dest:
					self._emit(dev.device, "mounted on", dev.mountpoint)
			else:
				err("failed to mount " + dev.device)

		if not found:
			self._report_missing(name)
			ret += 1

		return ret

	def umount(self, name):
		"""Unmount the volume(s) selected by name.

		name is compared with the device node (when it starts with
		"/dev/"), the mount point and the label; trailing slashes are
		stripped. With self.all set every known volume is selected. LUKS
		volumes are torn down instead. Returns the number of errors.
		"""
		ret = 0
		found = False
		if name != "/":
			name = name.rstrip("/")

		for dev in self.mountable:
			if not (self.all or (name[0:5] == "/dev/" and dev.device == name)
					or dev.mountpoint == name or dev.label == name):
				continue

			found = True

			if dev.fstype == "crypto_LUKS":
				if self.cryptoteardown(dev):
					ret += 1
				continue

			if not dev.is_mounted:
				err(dev.device + " is not mounted")
				continue

			obj = self.bus.get_object(self._HAL, dev.udi)
			try:
				if obj.Unmount([""], dbus_interface=self._VOLUME):
					ret += 1
			except dbus.DBusException as exc:
				err(dev.device + ": " + str(exc))
				ret += 1

		if not found:
			self._report_missing(name)
			ret += 1

		return ret

	def eject(self, name):
		"""Eject the volume(s) selected by name.

		name is compared with the device node (when it starts with
		"/dev/") and the mount point; labels are not considered. With
		self.all set every known volume is selected. Returns the number of
		errors.
		"""
		ret = 0
		found = False
		if name != "/":
			name = name.rstrip("/")

		for dev in self.mountable:
			if not (self.all or (name[0:5] == "/dev/" and dev.device == name)
					or dev.mountpoint == name):
				continue

			found = True
			obj = self.bus.get_object(self._HAL, dev.udi)
			try:
				if obj.Eject([""], dbus_interface=self._VOLUME):
					ret += 1
			except dbus.DBusException as exc:
				err(dev.device + ": " + str(exc))
				ret += 1

		if not found:
			self._report_missing(name)
			ret += 1

		return ret

# Command line driver: without arguments the known volumes are listed,
# otherwise the named ones are mounted, unmounted (-u) or ejected (-e).
# The exit status is the error count returned by the Mount methods.
parser = OptionParser(usage="%prog [options] <device|label> [mountpoint]")
parser.add_option("-t", dest="fstype", default="",
                  help="file system type")
parser.add_option("-o", dest="options", default="",
                  help="mount options")
parser.add_option("-v", dest="verbose", action="store_true", default=False,
                  help="verbose listing")
parser.add_option("-u", dest="umount", action="store_true", default=False,
                  help="umount")
parser.add_option("-e", dest="eject", action="store_true", default=False,
                  help="eject")
parser.add_option("-a", dest="all", action="store_true", default=False,
                  help="(u)mount all")
parser.add_option("--listudi", dest="listudi", action="store_true", default=False,
                  help="list UDIs")

(options, args) = parser.parse_args()

try:
	m = Mount()
except dbus.DBusException as msg:
	err("Can't connect to hald: "+str(msg))
	sys.exit(1)

m.verbose = options.verbose
m.all = options.all

ret = 0

if options.umount or options.eject:
	if len(args) != 1 and not options.all:
		err("specify device name or mountpoint")
		sys.exit(1)

	# With -a the positional name may be absent.
	if args:
		name = args[0]
	else:
		name = ""

	# -u takes precedence when both -u and -e are given.
	if options.umount:
		ret = m.umount(name)
	else:
		ret = m.eject(name)
elif options.listudi:
	m.listudi()
elif not args and not options.all:
	m.list()
else:
	opt = []
	if options.options != "":
		opt = options.options.split(",")

	# Both positionals are optional here: the name may be absent with -a,
	# and the mount point defaults to "".
	if args:
		name = args[0]
	else:
		name = ""
	if len(args) > 1:
		dest = args[1]
	else:
		dest = ""

	ret = m.mount(name, dest, options.fstype, opt)

sys.exit(ret)
# openSUSE Build Service is sponsored by