File 0001-rbd-simplify-configuration-and-use-librbd-and-librad.patch of Package openstack-cinder

From 3767ac32a5d511231a24b01e365a6da0fb9333dd Mon Sep 17 00:00:00 2001
From: leseb <sebastien.han@enovance.com>
Date: Wed, 15 May 2013 13:45:45 +0200
Subject: [PATCH] rbd: simplify configuration and use librbd and librados

Add an rbd_ceph_conf options to mirror glance configuration, and use
the existing rbd_user option to choose how to connect to the cluster
instead of relying on an environment variable.  Use these settings
when running command line programs and when connecting via librados.

Use absolute imports so that importing the python librbd bindings
via 'import rbd' does not try to import cinder.drivers.rbd again.

Create some convenience wrappers to simplify librbd and librados
error handling and cleanup. Using these everywhere also simplifies
testing. Mock out all the librados and librbd calls in the tests
so these libraries don't need to be installed.

Remove the local_path() method since it's never used. It was
left over from nova-volume.

There are only three things still relying on the command line:
- importing an image
- exporting to an image
- getting monitor addresses

Importing and exporting on the command line include zero-detection
that would be little benefit to replicate here. librados and librbd
don't have a simple interface to obtain the monitor addresses, so
leave that to a command line tool as well.

Fixes: bug 1083540
Signed-off-by: Josh Durgin <josh.durgin@inktank.com>

Change-Id: I32d059c5e460c2dd8423119b3dbe4a9921f5e907
---
 cinder/tests/test_rbd.py     | 335 +++++++++++++++++++++++++++++++++++--------
 cinder/volume/drivers/rbd.py | 317 +++++++++++++++++++++++++++-------------
 2 files changed, 499 insertions(+), 153 deletions(-)

diff --git a/cinder/tests/test_rbd.py b/cinder/tests/test_rbd.py
index 54b6fff..41f118d 100644
--- a/cinder/tests/test_rbd.py
+++ b/cinder/tests/test_rbd.py
@@ -29,8 +29,7 @@ from cinder import test
 from cinder.tests.image import fake as fake_image
 from cinder.tests.test_volume import DriverTestCase
 from cinder.volume import configuration as conf
-from cinder.volume.drivers.rbd import RBDDriver
-from cinder.volume.drivers.rbd import VERSION as DRIVER_VERSION
+import cinder.volume.drivers.rbd as driver
 
 LOG = logging.getLogger(__name__)
 
@@ -39,37 +38,14 @@ class FakeImageService:
     def download(self, context, image_id, path):
         pass
 
-RADOS_DF_OUT = """
-{
-   "total_space" : "958931232",
-   "total_used" : "123906196",
-   "total_objects" : "4221",
-   "total_avail" : "787024012",
-   "pools" : [
-      {
-         "name" : "volumes",
-         "categories" : [
-            {
-               "write_bytes" : "226833",
-               "size_kb" : "17038386",
-               "read_bytes" : "221865",
-               "num_objects" : "4186",
-               "name" : "",
-               "size_bytes" : "17447306589",
-               "write_kb" : "20302730",
-               "num_object_copies" : "8372",
-               "read_kb" : "30",
-               "num_objects_unfound" : "0",
-               "num_object_clones" : "9",
-               "num_objects_missing_on_primary" : "0",
-               "num_objects_degraded" : "0"
-            }
-         ],
-         "id" : "4"
-      }
-   ]
-}
-"""
+
+class TestUtil(test.TestCase):
+    def test_ascii_str(self):
+        self.assertEqual(None, driver.ascii_str(None))
+        self.assertEqual('foo', driver.ascii_str('foo'))
+        self.assertEqual('foo', driver.ascii_str(u'foo'))
+        self.assertRaises(UnicodeEncodeError,
+                          driver.ascii_str, 'foo' + unichr(300))
 
 
 class RBDTestCase(test.TestCase):
@@ -79,17 +55,114 @@ class RBDTestCase(test.TestCase):
 
         def fake_execute(*args, **kwargs):
             return '', ''
-        self._mox = mox.Mox()
         self.configuration = mox.MockObject(conf.Configuration)
         self.configuration.volume_tmp_dir = None
         self.configuration.rbd_pool = 'rbd'
+        self.configuration.rbd_ceph_conf = None
         self.configuration.rbd_secret_uuid = None
         self.configuration.rbd_user = None
         self.configuration.append_config_values(mox.IgnoreArg())
 
-        self.driver = RBDDriver(execute=fake_execute,
-                                configuration=self.configuration)
-        self._mox.ReplayAll()
+        self.rados = self.mox.CreateMockAnything()
+        self.rbd = self.mox.CreateMockAnything()
+        self.driver = driver.RBDDriver(execute=fake_execute,
+                                       configuration=self.configuration,
+                                       rados=self.rados,
+                                       rbd=self.rbd)
+
+    def test_create_volume(self):
+        name = u'volume-00000001'
+        size = 1
+        volume = dict(name=name, size=size)
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_rbd = self.mox.CreateMockAnything()
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.create(mox.IgnoreArg(), str(name), size * 1024 ** 3,
+                        old_format=False,
+                        features=self.rbd.RBD_FEATURE_LAYERING)
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_volume(volume)
+
+    def test_delete_volume(self):
+        name = u'volume-00000001'
+        volume = dict(name=name)
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        mock_rbd = self.mox.CreateMockAnything()
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.remove(mox.IgnoreArg(), str(name))
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.delete_volume(volume)
+
+    def test_create_snapshot(self):
+        vol_name = u'volume-00000001'
+        snap_name = u'snapshot-name'
+        snapshot = dict(volume_name=vol_name, name=snap_name)
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, vol_name) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.create_snap(str(snap_name))
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_proxy.protect_snap(str(snap_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_snapshot(snapshot)
+
+    def test_delete_snapshot(self):
+        vol_name = u'volume-00000001'
+        snap_name = u'snapshot-name'
+        snapshot = dict(volume_name=vol_name, name=snap_name)
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, vol_name) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        mock_proxy.unprotect_snap(str(snap_name))
+        mock_proxy.remove_snap(str(snap_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.delete_snapshot(snapshot)
+
+    def test_create_cloned_volume(self):
+        src_name = u'volume-00000001'
+        dst_name = u'volume-00000002'
+        mock_proxy = self.mox.CreateMockAnything()
+        mock_proxy.ioctx = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, src_name, read_only=True) \
+            .AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.copy(mock_proxy.ioctx, str(dst_name))
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver.create_cloned_volume(dict(name=dst_name),
+                                         dict(name=src_name))
 
     def test_good_locations(self):
         locations = ['rbd://fsid/pool/image/snap',
@@ -113,6 +186,18 @@ class RBDTestCase(test.TestCase):
     def test_cloneable(self):
         self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
         location = 'rbd://abc/pool/image/snap'
+        mock_proxy = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, 'image',
+                              pool='pool',
+                              snapshot='snap',
+                              read_only=True).AndReturn(mock_proxy)
+        mock_proxy.__enter__().AndReturn(mock_proxy)
+        mock_proxy.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
         self.assertTrue(self.driver._is_cloneable(location))
 
     def test_uncloneable_different_fsid(self):
@@ -121,11 +206,18 @@ class RBDTestCase(test.TestCase):
         self.assertFalse(self.driver._is_cloneable(location))
 
     def test_uncloneable_unreadable(self):
-        def fake_exc(*args):
-            raise exception.ProcessExecutionError()
         self.stubs.Set(self.driver, '_get_fsid', lambda: 'abc')
-        self.stubs.Set(self.driver, '_execute', fake_exc)
         location = 'rbd://abc/pool/image/snap'
+        self.stubs.Set(self.rbd, 'Error', test.TestingException)
+        self.mox.StubOutWithMock(driver, 'RBDVolumeProxy')
+
+        driver.RBDVolumeProxy(self.driver, 'image',
+                              pool='pool',
+                              snapshot='snap',
+                              read_only=True).AndRaise(test.TestingException)
+
+        self.mox.ReplayAll()
+
         self.assertFalse(self.driver._is_cloneable(location))
 
     def _copy_image(self):
@@ -138,6 +230,8 @@ class RBDTestCase(test.TestCase):
         self.stubs.Set(tempfile, 'NamedTemporaryFile', fake_temp_file)
         self.stubs.Set(os.path, 'exists', lambda x: True)
         self.stubs.Set(image_utils, 'fetch_to_raw', lambda w, x, y, z: None)
+        self.stubs.Set(self.driver, 'delete_volume', lambda x: None)
+        self.stubs.Set(self.driver, '_resize', lambda x: None)
         self.driver.copy_image_to_volume(None, {'name': 'test',
                                                 'size': 1},
                                          FakeImageService(), None)
@@ -151,38 +245,52 @@ class RBDTestCase(test.TestCase):
         self._copy_image()
 
     def test_update_volume_stats(self):
-        def fake_stats(*args):
-            return RADOS_DF_OUT, ''
-
-        def fake_safe_get(*args):
-            return "RBD"
+        self.stubs.Set(self.driver.configuration, 'safe_get', lambda x: 'RBD')
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.mox.StubOutWithMock(mock_client, 'cluster')
+        mock_client.cluster.get_cluster_stats().AndReturn(dict(
+            kb=1234567890,
+            kb_used=4567890,
+            kb_avail=1000000000,
+            num_objects=4683))
+        mock_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
 
-        self.stubs.Set(self.driver, '_execute', fake_stats)
-        self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
         expected = dict(
             volume_backend_name='RBD',
             vendor_name='Open Source',
-            driver_version=DRIVER_VERSION,
+            driver_version=driver.VERSION,
             storage_protocol='ceph',
-            total_capacity_gb=914,
-            free_capacity_gb=750,
+            total_capacity_gb=1177,
+            free_capacity_gb=953,
             reserved_percentage=0)
         actual = self.driver.get_volume_stats(True)
         self.assertDictMatch(expected, actual)
 
     def test_update_volume_stats_error(self):
-        def fake_exc(*args):
-            raise exception.ProcessExecutionError()
+        self.stubs.Set(self.driver.configuration, 'safe_get', lambda x: 'RBD')
+        mock_client = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver).AndReturn(mock_client)
+        mock_client.__enter__().AndReturn(mock_client)
+        self.mox.StubOutWithMock(mock_client, 'cluster')
+        self.stubs.Set(self.rados, 'Error', test.TestingException)
+        mock_client.cluster.get_cluster_stats().AndRaise(test.TestingException)
+        mock_client.__exit__(test.TestingException,
+                             mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(None)
 
-        def fake_safe_get(*args):
-            return "RBD"
+        self.mox.ReplayAll()
 
-        self.stubs.Set(self.driver, '_execute', fake_exc)
-        self.stubs.Set(self.driver.configuration, 'safe_get', fake_safe_get)
         expected = dict(
             volume_backend_name='RBD',
             vendor_name='Open Source',
-            driver_version=DRIVER_VERSION,
+            driver_version=driver.VERSION,
             storage_protocol='ceph',
             total_capacity_gb='unknown',
             free_capacity_gb='unknown',
@@ -190,6 +298,119 @@ class RBDTestCase(test.TestCase):
         actual = self.driver.get_volume_stats(True)
         self.assertDictMatch(expected, actual)
 
+    def test_get_mon_addrs(self):
+        self.stubs.Set(self.driver, '_execute',
+                       lambda *a: (CEPH_MON_DUMP, ''))
+        hosts = ['::1', '::1', '::1', '127.0.0.1', 'example.com']
+        ports = ['6789', '6790', '6791', '6792', '6791']
+        self.assertEqual((hosts, ports), self.driver._get_mon_addrs())
+
+    def test_initialize_connection(self):
+        name = 'volume-00000001'
+        hosts = ['::1', '::1', '::1', '127.0.0.1', 'example.com']
+        ports = ['6789', '6790', '6791', '6792', '6791']
+        self.stubs.Set(self.driver, '_get_mon_addrs', lambda: (hosts, ports))
+        expected = {
+            'driver_volume_type': 'rbd',
+            'data': {
+                'name': '%s/%s' % (self.configuration.rbd_pool,
+                                   name),
+                'hosts': hosts,
+                'ports': ports,
+                'auth_enabled': False,
+                'auth_username': None,
+                'secret_type': 'ceph',
+                'secret_uuid': None,
+                }
+        }
+        actual = self.driver.initialize_connection(dict(name=name), None)
+        self.assertDictMatch(expected, actual)
+
+    def test_clone(self):
+        name = u'volume-00000001'
+        volume = dict(name=name)
+        src_pool = u'images'
+        src_image = u'image-name'
+        src_snap = u'snapshot-name'
+        mock_src_client = self.mox.CreateMockAnything()
+        mock_dst_client = self.mox.CreateMockAnything()
+        mock_rbd = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(driver, 'RADOSClient')
+
+        driver.RADOSClient(self.driver, src_pool).AndReturn(mock_src_client)
+        mock_src_client.__enter__().AndReturn(mock_src_client)
+        driver.RADOSClient(self.driver).AndReturn(mock_dst_client)
+        mock_dst_client.__enter__().AndReturn(mock_dst_client)
+        self.rbd.RBD_FEATURE_LAYERING = 1
+        self.rbd.RBD().AndReturn(mock_rbd)
+        mock_rbd.clone(mox.IgnoreArg(),
+                       str(src_image),
+                       str(src_snap),
+                       mox.IgnoreArg(),
+                       str(name),
+                       features=self.rbd.RBD_FEATURE_LAYERING)
+        mock_dst_client.__exit__(None, None, None).AndReturn(None)
+        mock_src_client.__exit__(None, None, None).AndReturn(None)
+
+        self.mox.ReplayAll()
+
+        self.driver._clone(volume, src_pool, src_image, src_snap)
+
+    def test_rbd_volume_proxy_init(self):
+        name = u'volume-00000001'
+        snap = u'snapshot-name'
+        self.stubs.Set(self.driver, '_connect_to_rados',
+                       lambda x: (None, None))
+        self.mox.StubOutWithMock(self.driver, '_disconnect_from_rados')
+
+        # no snapshot
+        self.rbd.Image(None, str(name), snapshot=None, read_only=False) \
+                .AndReturn(None)
+        # snapshot
+        self.rbd.Image(None, str(name), snapshot=str(snap), read_only=True) \
+                .AndReturn(None)
+        # error causes disconnect
+        self.stubs.Set(self.rbd, 'Error', test.TestingException)
+        self.rbd.Image(None, str(name), snapshot=None, read_only=False) \
+                .AndRaise(test.TestingException)
+        self.driver._disconnect_from_rados(None, None)
+
+        self.mox.ReplayAll()
+
+        driver.RBDVolumeProxy(self.driver, name)
+        driver.RBDVolumeProxy(self.driver, name, snapshot=snap, read_only=True)
+        self.assertRaises(test.TestingException,
+                          driver.RBDVolumeProxy, self.driver, name)
+
+    def test_connect_to_rados(self):
+        mock_client = self.mox.CreateMockAnything()
+        mock_ioctx = self.mox.CreateMockAnything()
+        self.stubs.Set(self.rados, 'Error', test.TestingException)
+
+        # default configured pool
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('rbd').AndReturn(mock_ioctx)
+
+        # different pool
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('images').AndReturn(mock_ioctx)
+
+        # error
+        self.rados.Rados(rados_id=None, conffile=None).AndReturn(mock_client)
+        mock_client.connect()
+        mock_client.open_ioctx('rbd').AndRaise(test.TestingException)
+        mock_client.shutdown()
+
+        self.mox.ReplayAll()
+
+        self.assertEqual((mock_client, mock_ioctx),
+                         self.driver._connect_to_rados())
+        self.assertEqual((mock_client, mock_ioctx),
+                         self.driver._connect_to_rados('images'))
+        self.assertRaises(test.TestingException, self.driver._connect_to_rados)
+
 
 class ManagedRBDTestCase(DriverTestCase):
     driver_name = "cinder.volume.drivers.rbd.RBDDriver"
diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py
index 6a8f5c0..c765ecf 100644
--- a/cinder/volume/drivers/rbd.py
+++ b/cinder/volume/drivers/rbd.py
@@ -15,6 +15,8 @@
 RADOS Block Device Driver
 """
 
+from __future__ import absolute_import
+
 import json
 import os
 import tempfile
@@ -28,6 +30,13 @@ from cinder.openstack.common import log as logging
 from cinder import utils
 from cinder.volume import driver
 
+try:
+    import rados
+    import rbd
+except ImportError:
+    rados = None
+    rbd = None
+
 LOG = logging.getLogger(__name__)
 
 rbd_opts = [
@@ -36,7 +45,11 @@ rbd_opts = [
                help='the RADOS pool in which rbd volumes are stored'),
     cfg.StrOpt('rbd_user',
                default=None,
-               help='the RADOS client name for accessing rbd volumes'),
+               help='the RADOS client name for accessing rbd volumes '
+                    '- only set when using cephx authentication'),
+    cfg.StrOpt('rbd_ceph_conf',
+               default='',  # default determined by librados
+               help='path to the ceph configuration file to use'),
     cfg.StrOpt('rbd_secret_uuid',
                default=None,
                help='the libvirt uuid of the secret for the rbd_user'
@@ -46,7 +59,72 @@ rbd_opts = [
                help='where to store temporary image files if the volume '
                     'driver does not write them directly to the volume'), ]
 
-VERSION = '1.0'
+VERSION = '1.1'
+
+
+def ascii_str(string):
+    """
+    Convert a string to ascii, or return None if the input is None.
+
+    This is useful where a parameter may be None by default, or a
+    string. librbd only accepts ascii, hence the need for conversion.
+    """
+    if string is None:
+        return string
+    return str(string)
+
+
+class RBDVolumeProxy(object):
+    """
+    Context manager for dealing with an existing rbd volume.
+
+    This handles connecting to rados and opening an ioctx automatically,
+    and otherwise acts like a librbd Image object.
+
+    The underlying librados client and ioctx can be accessed as
+    the attributes 'client' and 'ioctx'.
+    """
+    def __init__(self, driver, name, pool=None, snapshot=None,
+                 read_only=False):
+        client, ioctx = driver._connect_to_rados(pool)
+        try:
+            self.volume = driver.rbd.Image(ioctx, str(name),
+                                           snapshot=ascii_str(snapshot),
+                                           read_only=read_only)
+        except driver.rbd.Error:
+            LOG.exception(_("error opening rbd image %s"), name)
+            driver._disconnect_from_rados(client, ioctx)
+            raise
+        self.driver = driver
+        self.client = client
+        self.ioctx = ioctx
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        try:
+            self.volume.close()
+        finally:
+            self.driver._disconnect_from_rados(self.client, self.ioctx)
+
+    def __getattr__(self, attrib):
+        return getattr(self.volume, attrib)
+
+
+class RADOSClient(object):
+    """
+    Context manager to simplify error handling for connecting to ceph
+    """
+    def __init__(self, driver, pool=None):
+        self.driver = driver
+        self.cluster, self.ioctx = driver._connect_to_rados(pool)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_, value, traceback):
+        self.driver._disconnect_from_rados(self.cluster, self.ioctx)
 
 
 class RBDDriver(driver.VolumeDriver):
@@ -55,15 +133,66 @@ class RBDDriver(driver.VolumeDriver):
         super(RBDDriver, self).__init__(*args, **kwargs)
         self.configuration.append_config_values(rbd_opts)
         self._stats = {}
+        # allow overrides for testing
+        self.rados = kwargs.get('rados', rados)
+        self.rbd = kwargs.get('rbd', rbd)
 
     def check_for_setup_error(self):
         """Returns an error if prerequisites aren't met"""
-        (stdout, stderr) = self._execute('rados', 'lspools')
-        pools = stdout.split("\n")
-        if self.configuration.rbd_pool not in pools:
-            exception_message = (_("rbd has no pool %s") %
-                                 self.configuration.rbd_pool)
-            raise exception.VolumeBackendAPIException(data=exception_message)
+        if rados is None:
+            msg = _('rados and rbd python libraries not found')
+            raise exception.VolumeBackendAPIException(data=msg)
+        try:
+            with RADOSClient(self):
+                pass
+        except self.rados.Error:
+            msg = _('error connecting to ceph cluster')
+            LOG.exception(msg)
+            raise exception.VolumeBackendAPIException(data=msg)
+
+    def _ceph_args(self):
+        args = []
+        if self.configuration.rbd_user:
+            args.extend(['--id', self.configuration.rbd_user])
+        if self.configuration.rbd_ceph_conf:
+            args.extend(['--conf', self.configuration.rbd_ceph_conf])
+        return args
+
+    def _connect_to_rados(self, pool=None):
+        ascii_user = ascii_str(self.configuration.rbd_user)
+        ascii_conf = ascii_str(self.configuration.rbd_ceph_conf)
+        client = self.rados.Rados(rados_id=ascii_user, conffile=ascii_conf)
+        try:
+            client.connect()
+            pool_to_open = str(pool or self.configuration.rbd_pool)
+            ioctx = client.open_ioctx(pool_to_open)
+            return client, ioctx
+        except self.rados.Error:
+            # shutdown cannot raise an exception
+            client.shutdown()
+            raise
+
+    def _disconnect_from_rados(self, client, ioctx):
+        # closing an ioctx cannot raise an exception
+        ioctx.close()
+        client.shutdown()
+
+    def _get_mon_addrs(self):
+        args = ['ceph', 'mon', 'dump', '--format=json'] + self._ceph_args()
+        out, _ = self._execute(*args)
+        lines = out.split('\n')
+        if lines[0].startswith('dumped monmap epoch'):
+            lines = lines[1:]
+        monmap = json.loads('\n'.join(lines))
+        addrs = [mon['addr'] for mon in monmap['mons']]
+        hosts = []
+        ports = []
+        for addr in addrs:
+            host_port = addr[:addr.rindex('/')]
+            host, port = host_port.rsplit(':', 1)
+            hosts.append(host.strip('[]'))
+            ports.append(port)
+        return hosts, ports
 
     def _update_volume_stats(self):
         stats = {'vendor_name': 'Open Source',
@@ -76,13 +205,11 @@ class RBDDriver(driver.VolumeDriver):
         stats['volume_backend_name'] = backend_name or 'RBD'
 
         try:
-            stdout, _err = self._execute('rados', 'df', '--format', 'json')
-            new_stats = json.loads(stdout)
-            total = int(new_stats['total_space']) / 1024 ** 2
-            free = int(new_stats['total_avail']) / 1024 ** 2
-            stats['total_capacity_gb'] = total
-            stats['free_capacity_gb'] = free
-        except exception.ProcessExecutionError:
+            with RADOSClient(self) as client:
+                new_stats = client.cluster.get_cluster_stats()
+            stats['total_capacity_gb'] = new_stats['kb'] / 1024 ** 2
+            stats['free_capacity_gb'] = new_stats['kb_avail'] / 1024 ** 2
+        except self.rados.Error, AttributeError:
             # just log and return unknown capacities
             LOG.exception(_('error refreshing volume stats'))
         self._stats = stats
@@ -95,40 +222,50 @@ class RBDDriver(driver.VolumeDriver):
         return self._stats
 
     def _supports_layering(self):
-        stdout, _ = self._execute('rbd', '--help')
-        return 'clone' in stdout
+        return hasattr(self.rbd, 'RBD_FEATURE_LAYERING')
 
     def create_cloned_volume(self, volume, src_vref):
-        raise NotImplementedError()
+        """Clone a logical volume"""
+        with RBDVolumeProxy(self, src_vref['name'], read_only=True) as vol:
+            vol.copy(vol.ioctx, str(volume['name']))
 
     def create_volume(self, volume):
         """Creates a logical volume."""
         if int(volume['size']) == 0:
-            size = 100
+            size = 100 * 1024 ** 2
         else:
-            size = int(volume['size']) * 1024
-        args = ['rbd', 'create',
-                '--pool', self.configuration.rbd_pool,
-                '--size', size,
-                volume['name']]
+            size = int(volume['size']) * 1024 ** 3
+
+        old_format = True
+        features = 0
         if self._supports_layering():
-            args += ['--new-format']
-        self._try_execute(*args)
+            old_format = False
+            features = self.rbd.RBD_FEATURE_LAYERING
+
+        with RADOSClient(self) as client:
+            self.rbd.RBD().create(client.ioctx,
+                                  str(volume['name']),
+                                  size,
+                                  old_format=old_format,
+                                  features=features)
 
     def _clone(self, volume, src_pool, src_image, src_snap):
-        self._try_execute('rbd', 'clone',
-                          '--pool', src_pool,
-                          '--image', src_image,
-                          '--snap', src_snap,
-                          '--dest-pool', self.configuration.rbd_pool,
-                          '--dest', volume['name'])
+        LOG.debug(_('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s') %
+                  dict(pool=src_pool, img=src_image, snap=src_snap,
+                       dst=volume['name']))
+        with RADOSClient(self, src_pool) as src_client:
+            with RADOSClient(self) as dest_client:
+                self.rbd.RBD().clone(src_client.ioctx,
+                                     str(src_image),
+                                     str(src_snap),
+                                     dest_client.ioctx,
+                                     str(volume['name']),
+                                     features=self.rbd.RBD_FEATURE_LAYERING)
 
     def _resize(self, volume):
-        size = int(volume['size']) * 1024
-        self._try_execute('rbd', 'resize',
-                          '--pool', self.configuration.rbd_pool,
-                          '--image', volume['name'],
-                          '--size', size)
+        size = int(volume['size']) * 1024 ** 3
+        with RBDVolumeProxy(self, volume['name']) as vol:
+            vol.resize(size)
 
     def create_volume_from_snapshot(self, volume, snapshot):
         """Creates a volume from a snapshot."""
@@ -139,47 +276,30 @@ class RBDDriver(driver.VolumeDriver):
 
     def delete_volume(self, volume):
         """Deletes a logical volume."""
-        stdout, _ = self._execute('rbd', 'snap', 'ls',
-                                  '--pool', self.configuration.rbd_pool,
-                                  volume['name'])
-        if stdout.count('\n') > 1:
-            raise exception.VolumeIsBusy(volume_name=volume['name'])
-        self._try_execute('rbd', 'rm',
-                          '--pool', self.configuration.rbd_pool,
-                          volume['name'])
+        with RADOSClient(self) as client:
+            try:
+                self.rbd.RBD().remove(client.ioctx, str(volume['name']))
+            except self.rbd.ImageHasSnapshots:
+                raise exception.VolumeIsBusy(volume_name=volume['name'])
 
     def create_snapshot(self, snapshot):
         """Creates an rbd snapshot"""
-        self._try_execute('rbd', 'snap', 'create',
-                          '--pool', self.configuration.rbd_pool,
-                          '--snap', snapshot['name'],
-                          snapshot['volume_name'])
-        if self._supports_layering():
-            self._try_execute('rbd', 'snap', 'protect',
-                              '--pool', self.configuration.rbd_pool,
-                              '--snap', snapshot['name'],
-                              snapshot['volume_name'])
+        with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
+            snap = str(snapshot['name'])
+            volume.create_snap(snap)
+            if self._supports_layering():
+                volume.protect_snap(snap)
 
     def delete_snapshot(self, snapshot):
         """Deletes an rbd snapshot"""
-        if self._supports_layering():
-            try:
-                self._try_execute('rbd', 'snap', 'unprotect',
-                                  '--pool', self.configuration.rbd_pool,
-                                  '--snap', snapshot['name'],
-                                  snapshot['volume_name'])
-            except exception.ProcessExecutionError:
-                raise exception.SnapshotIsBusy(snapshot_name=snapshot['name'])
-        self._try_execute('rbd', 'snap', 'rm',
-                          '--pool', self.configuration.rbd_pool,
-                          '--snap', snapshot['name'],
-                          snapshot['volume_name'])
-
-    def local_path(self, volume):
-        """Returns the path of the rbd volume."""
-        # This is the same as the remote path
-        # since qemu accesses it directly.
-        return "rbd:%s/%s" % (self.configuration.rbd_pool, volume['name'])
+        with RBDVolumeProxy(self, snapshot['volume_name']) as volume:
+            snap = str(snapshot['name'])
+            if self._supports_layering():
+                try:
+                    volume.unprotect_snap(snap)
+                except self.rbd.ImageBusy:
+                    raise exception.SnapshotIsBusy(snapshot_name=snap)
+            volume.remove_snap(snap)
 
     def ensure_export(self, context, volume):
         """Synchronously recreates an export for a logical volume."""
@@ -194,17 +314,21 @@ class RBDDriver(driver.VolumeDriver):
         pass
 
     def initialize_connection(self, volume, connector):
-        return {
+        hosts, ports = self._get_mon_addrs()
+        data = {
             'driver_volume_type': 'rbd',
             'data': {
                 'name': '%s/%s' % (self.configuration.rbd_pool,
                                    volume['name']),
-                'auth_enabled': (self.configuration.rbd_secret_uuid
-                                 is not None),
+                'hosts': hosts,
+                'ports': ports,
+                'auth_enabled': (self.configuration.rbd_user is not None),
                 'auth_username': self.configuration.rbd_user,
                 'secret_type': 'ceph',
                 'secret_uuid': self.configuration.rbd_secret_uuid, }
         }
+        LOG.debug(_('connection data: %s'), data)
+        return data
 
     def terminate_connection(self, volume, connector, **kwargs):
         pass
@@ -224,13 +348,14 @@ class RBDDriver(driver.VolumeDriver):
         return pieces
 
     def _get_fsid(self):
-        stdout, _ = self._execute('ceph', 'fsid')
-        return stdout.rstrip('\n')
+        with RADOSClient(self) as client:
+            return client.cluster.get_fsid()
 
     def _is_cloneable(self, image_location):
         try:
             fsid, pool, image, snapshot = self._parse_location(image_location)
-        except exception.ImageUnacceptable:
+        except exception.ImageUnacceptable as e:
+            LOG.debug(_('not cloneable: %s'), e)
             return False
 
         if self._get_fsid() != fsid:
@@ -240,16 +365,16 @@ class RBDDriver(driver.VolumeDriver):
 
         # check that we can read the image
         try:
-            self._execute('rbd', 'info',
-                          '--pool', pool,
-                          '--image', image,
-                          '--snap', snapshot)
-        except exception.ProcessExecutionError:
-            LOG.debug(_('Unable to read image %s') % image_location)
+            with RBDVolumeProxy(self, image,
+                                pool=pool,
+                                snapshot=snapshot,
+                                read_only=True):
+                return True
+        except self.rbd.Error as e:
+            LOG.debug(_('Unable to open image %(loc)s: %(err)s') %
+                      dict(loc=image_location, err=e))
             return False
 
-        return True
-
     def clone_image(self, volume, image_location):
         if image_location is None or not self._is_cloneable(image_location):
             return False
@@ -264,25 +389,23 @@ class RBDDriver(driver.VolumeDriver):
             os.makedirs(tmp_dir)
 
     def copy_image_to_volume(self, context, volume, image_service, image_id):
-        # TODO(jdurgin): replace with librbd
-        # this is a temporary hack, since rewriting this driver
-        # to use librbd would take too long
         self._ensure_tmp_exists()
         tmp_dir = self.configuration.volume_tmp_dir
 
         with tempfile.NamedTemporaryFile(dir=tmp_dir) as tmp:
             image_utils.fetch_to_raw(context, image_service, image_id,
                                      tmp.name)
-            # import creates the image, so we must remove it first
-            self._try_execute('rbd', 'rm',
-                              '--pool', self.configuration.rbd_pool,
-                              volume['name'])
 
+            self.delete_volume(volume)
+
+            # keep using the command line import instead of librbd since it
+            # detects zeroes to preserve sparseness in the image
             args = ['rbd', 'import',
                     '--pool', self.configuration.rbd_pool,
                     tmp.name, volume['name']]
             if self._supports_layering():
                 args += ['--new-format']
+            args += self._ceph_args()
             self._try_execute(*args)
         self._resize(volume)
 
@@ -293,9 +416,11 @@ class RBDDriver(driver.VolumeDriver):
         tmp_file = os.path.join(tmp_dir,
                                 volume['name'] + '-' + image_meta['id'])
         with utils.remove_path_on_error(tmp_file):
-            self._try_execute('rbd', 'export',
-                              '--pool', self.configuration.rbd_pool,
-                              volume['name'], tmp_file)
+            args = ['rbd', 'export',
+                    '--pool', self.configuration.rbd_pool,
+                    volume['name'], tmp_file]
+            args += self._ceph_args()
+            self._try_execute(*args)
             image_utils.upload_volume(context, image_service,
                                       image_meta, tmp_file)
         os.unlink(tmp_file)
-- 
1.8.4

openSUSE Build Service is sponsored by