Problem: Exchange can fail silently (#2177)
Solution: Raise a RuntimeError if Exchange is running and a new subscriber queue is requested.
This commit is contained in:
parent
cc88b4d286
commit
6181e52dbd
|
@ -1,3 +1,4 @@
|
|||
from queue import Empty
|
||||
from collections import defaultdict
|
||||
from multiprocessing import Queue
|
||||
|
||||
|
@ -41,6 +42,7 @@ class Exchange:
|
|||
|
||||
def __init__(self):
|
||||
self.publisher_queue = Queue()
|
||||
self.started_queue = Queue()
|
||||
|
||||
# Map <event_types -> queues>
|
||||
self.queues = defaultdict(list)
|
||||
|
@ -60,7 +62,16 @@ class Exchange:
|
|||
|
||||
Returns:
|
||||
a :class:`multiprocessing.Queue`.
|
||||
Raises:
|
||||
RuntimeError if called after `run`
|
||||
"""
|
||||
|
||||
try:
|
||||
self.started_queue.get_nowait()
|
||||
raise RuntimeError('Cannot create a new subscriber queue while Exchange is running.')
|
||||
except Empty:
|
||||
pass
|
||||
|
||||
if event_types is None:
|
||||
event_types = EventTypes.ALL
|
||||
|
||||
|
@ -83,6 +94,7 @@ class Exchange:
|
|||
|
||||
def run(self):
|
||||
"""Start the exchange"""
|
||||
self.started_queue.put('STARTED')
|
||||
|
||||
while True:
|
||||
event = self.publisher_queue.get()
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
import pytest
|
||||
pytestmark = pytest.mark.tendermint
|
||||
|
||||
|
||||
def test_event_handler():
|
||||
from bigchaindb.events import EventTypes, Event, Exchange
|
||||
|
||||
|
@ -35,6 +39,18 @@ def test_event_handler():
|
|||
assert sub3.qsize() == 0
|
||||
|
||||
|
||||
def test_event_handler_raises_when_called_after_start():
|
||||
from bigchaindb.events import Exchange, POISON_PILL
|
||||
|
||||
exchange = Exchange()
|
||||
publisher_queue = exchange.get_publisher_queue()
|
||||
publisher_queue.put(POISON_PILL)
|
||||
exchange.run()
|
||||
|
||||
with pytest.raises(RuntimeError):
|
||||
exchange.get_subscriber_queue()
|
||||
|
||||
|
||||
def test_exchange_stops_with_poison_pill():
|
||||
from bigchaindb.events import EventTypes, Event, Exchange, POISON_PILL
|
||||
|
||||
|
|
Loading…
Reference in New Issue