File glance-dont-test-qpid.patch of Package openstack-glance
diff -ruN a/glance/tests/unit/test_notifier.py b/glance/tests/unit/test_notifier.py
--- a/glance/tests/unit/test_notifier.py 2013-10-01 17:32:17.000000000 +0200
+++ b/glance/tests/unit/test_notifier.py 2013-10-02 10:44:51.371564341 +0200
@@ -18,8 +18,6 @@
import datetime
import kombu.entity
import mox
-import qpid
-import qpid.messaging
import stubout
import webob
@@ -314,114 +312,6 @@
self.assertEquals(info['conn_called'], 2)
-class TestQpidNotifier(utils.BaseTestCase):
- """Test Qpid notifier."""
-
- def setUp(self):
- super(TestQpidNotifier, self).setUp()
-
- self.mocker = mox.Mox()
-
- self.mock_connection = None
- self.mock_session = None
- self.mock_sender = None
- self.mock_receiver = None
-
- self.orig_connection = qpid.messaging.Connection
- self.orig_session = qpid.messaging.Session
- self.orig_sender = qpid.messaging.Sender
- self.orig_receiver = qpid.messaging.Receiver
- qpid.messaging.Connection = lambda *_x, **_y: self.mock_connection
- qpid.messaging.Session = lambda *_x, **_y: self.mock_session
- qpid.messaging.Sender = lambda *_x, **_y: self.mock_sender
- qpid.messaging.Receiver = lambda *_x, **_y: self.mock_receiver
-
- self.notify_qpid = importutils.import_module("glance.notifier."
- "notify_qpid")
- self.addCleanup(self.reset_qpid)
- self.addCleanup(self.mocker.ResetAll)
-
- def reset_qpid(self):
-
- qpid.messaging.Connection = self.orig_connection
- qpid.messaging.Session = self.orig_session
- qpid.messaging.Sender = self.orig_sender
- qpid.messaging.Receiver = self.orig_receiver
-
- def _test_notify(self, priority, exception=False, exception_send=False):
- test_msg = {'a': 'b'}
-
- self.mock_connection = self.mocker.CreateMock(self.orig_connection)
- self.mock_session = self.mocker.CreateMock(self.orig_session)
- self.mock_sender = self.mocker.CreateMock(self.orig_sender)
-
- self.mock_connection.username = ""
- if exception:
- self.mock_connection.open().AndRaise(
- Exception('Test open Exception'))
- else:
- self.mock_connection.open()
- self.mock_connection.session().AndReturn(self.mock_session)
- expected_address = ('glance/notifications.%s ; '
- '{"node": {"x-declare": {"auto-delete": true, '
- '"durable": false}, "type": "topic"}, '
- '"create": "always"}' % priority)
- self.mock_session.sender(expected_address).AndReturn(
- self.mock_sender)
- if exception_send:
- self.mock_sender.send(mox.IgnoreArg()).AndRaise(
- Exception('Test send Exception'))
- # NOTE(afazekas): the opened and close call is expected
- # in this case, but not expected if the open fails
- else:
- self.mock_sender.send(mox.IgnoreArg())
- self.mock_connection.opened().AndReturn(True)
- self.mock_connection.close()
-
- self.mocker.ReplayAll()
-
- self.config(notifier_strategy="qpid")
- notifier = self.notify_qpid.QpidStrategy()
- if priority == 'info':
- if exception or exception_send:
- self.assertRaises(Exception, notifier.info, test_msg)
- else:
- notifier.info(test_msg)
- elif priority == 'warn':
- if exception or exception_send:
- self.assertRaises(Exception, notifier.warn, test_msg)
- else:
- notifier.warn(test_msg)
- elif priority == 'error':
- if exception or exception_send:
- self.assertRaises(Exception, notifier.error, test_msg)
- else:
- notifier.error(test_msg)
-
- self.mocker.VerifyAll()
-
- def test_info(self):
- self._test_notify('info')
-
- def test_warn(self):
- self._test_notify('warn')
-
- def test_error(self):
- self._test_notify('error')
-
- def test_exception_open_successful(self):
- self._test_notify('info', exception=True)
-
- def test_info_fail(self):
- self._test_notify('info', exception_send=True)
-
- def test_warn_fail(self):
- self._test_notify('warn', exception_send=True)
-
- def test_error_fail(self):
- self._test_notify('error', exception_send=True)
-
-
class TestRabbitContentType(utils.BaseTestCase):
"""Test AMQP/Rabbit notifier works."""