mirror of
https://github.com/bigchaindb/bigchaindb.git
synced 2024-06-26 11:16:44 +02:00
Problem: processes don't have an intelligible name (#2078)
Solution: when a process starts, update the "process title" as well.
This commit is contained in:
parent
d5f45d4094
commit
14d523efd5
|
@ -7,7 +7,6 @@ import pickle
|
||||||
from socketserver import StreamRequestHandler, ThreadingTCPServer
|
from socketserver import StreamRequestHandler, ThreadingTCPServer
|
||||||
import struct
|
import struct
|
||||||
import sys
|
import sys
|
||||||
from multiprocessing import Process
|
|
||||||
|
|
||||||
from .configs import (
|
from .configs import (
|
||||||
DEFAULT_SOCKET_LOGGING_HOST,
|
DEFAULT_SOCKET_LOGGING_HOST,
|
||||||
|
@ -15,6 +14,7 @@ from .configs import (
|
||||||
PUBLISHER_LOGGING_CONFIG,
|
PUBLISHER_LOGGING_CONFIG,
|
||||||
SUBSCRIBER_LOGGING_CONFIG,
|
SUBSCRIBER_LOGGING_CONFIG,
|
||||||
)
|
)
|
||||||
|
from bigchaindb.utils import Process
|
||||||
from bigchaindb.common.exceptions import ConfigurationError
|
from bigchaindb.common.exceptions import ConfigurationError
|
||||||
|
|
||||||
|
|
||||||
|
@ -46,6 +46,7 @@ def setup_sub_logger(*, user_log_config=None):
|
||||||
server = LogRecordSocketServer(**kwargs)
|
server = LogRecordSocketServer(**kwargs)
|
||||||
with server:
|
with server:
|
||||||
server_proc = Process(
|
server_proc = Process(
|
||||||
|
name='logging_server',
|
||||||
target=server.serve_forever,
|
target=server.serve_forever,
|
||||||
kwargs={'log_config': user_log_config},
|
kwargs={'log_config': user_log_config},
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,14 +1,16 @@
|
||||||
import logging
|
import logging
|
||||||
import subprocess
|
import subprocess
|
||||||
import multiprocessing as mp
|
|
||||||
from os import getenv
|
from os import getenv
|
||||||
|
|
||||||
|
import setproctitle
|
||||||
|
|
||||||
import bigchaindb
|
import bigchaindb
|
||||||
from bigchaindb.tendermint.lib import BigchainDB
|
from bigchaindb.tendermint.lib import BigchainDB
|
||||||
from bigchaindb.tendermint.core import App
|
from bigchaindb.tendermint.core import App
|
||||||
from bigchaindb.web import server, websocket_server
|
from bigchaindb.web import server, websocket_server
|
||||||
from bigchaindb.tendermint import event_stream
|
from bigchaindb.tendermint import event_stream
|
||||||
from bigchaindb.events import Exchange, EventTypes
|
from bigchaindb.events import Exchange, EventTypes
|
||||||
|
from bigchaindb.utils import Process
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -28,7 +30,6 @@ BANNER = """
|
||||||
|
|
||||||
|
|
||||||
def start():
|
def start():
|
||||||
|
|
||||||
# Exchange object for event stream api
|
# Exchange object for event stream api
|
||||||
exchange = Exchange()
|
exchange = Exchange()
|
||||||
|
|
||||||
|
@ -37,7 +38,7 @@ def start():
|
||||||
settings=bigchaindb.config['server'],
|
settings=bigchaindb.config['server'],
|
||||||
log_config=bigchaindb.config['log'],
|
log_config=bigchaindb.config['log'],
|
||||||
bigchaindb_factory=BigchainDB)
|
bigchaindb_factory=BigchainDB)
|
||||||
p_webapi = mp.Process(name='webapi', target=app_server.run)
|
p_webapi = Process(name='webapi', target=app_server.run)
|
||||||
p_webapi.start()
|
p_webapi.start()
|
||||||
|
|
||||||
# start message
|
# start message
|
||||||
|
@ -51,15 +52,15 @@ def start():
|
||||||
])
|
])
|
||||||
|
|
||||||
# start websocket server
|
# start websocket server
|
||||||
p_websocket_server = mp.Process(name='ws',
|
p_websocket_server = Process(name='ws',
|
||||||
target=websocket_server.start,
|
target=websocket_server.start,
|
||||||
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
|
args=(exchange.get_subscriber_queue(EventTypes.BLOCK_VALID),))
|
||||||
p_websocket_server.start()
|
p_websocket_server.start()
|
||||||
|
|
||||||
# connect to tendermint event stream
|
# connect to tendermint event stream
|
||||||
p_websocket_client = mp.Process(name='ws_to_tendermint',
|
p_websocket_client = Process(name='ws_to_tendermint',
|
||||||
target=event_stream.start,
|
target=event_stream.start,
|
||||||
args=(exchange.get_publisher_queue(),))
|
args=(exchange.get_publisher_queue(),))
|
||||||
p_websocket_client.start()
|
p_websocket_client.start()
|
||||||
|
|
||||||
# We need to import this after spawning the web server
|
# We need to import this after spawning the web server
|
||||||
|
@ -67,6 +68,8 @@ def start():
|
||||||
# for gevent.
|
# for gevent.
|
||||||
from abci import ABCIServer
|
from abci import ABCIServer
|
||||||
|
|
||||||
|
setproctitle.setproctitle('bigchaindb')
|
||||||
|
|
||||||
app = ABCIServer(app=App())
|
app = ABCIServer(app=App())
|
||||||
app.run()
|
app.run()
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,8 @@ import threading
|
||||||
import queue
|
import queue
|
||||||
import multiprocessing as mp
|
import multiprocessing as mp
|
||||||
|
|
||||||
|
import setproctitle
|
||||||
|
|
||||||
|
|
||||||
class ProcessGroup(object):
|
class ProcessGroup(object):
|
||||||
|
|
||||||
|
@ -26,6 +28,16 @@ class ProcessGroup(object):
|
||||||
self.processes.append(proc)
|
self.processes.append(proc)
|
||||||
|
|
||||||
|
|
||||||
|
class Process(mp.Process):
|
||||||
|
"""Wrapper around multiprocessing.Process that uses
|
||||||
|
setproctitle to set the name of the process when running
|
||||||
|
the target task."""
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
setproctitle.setproctitle(self.name)
|
||||||
|
super().run()
|
||||||
|
|
||||||
|
|
||||||
# Inspired by:
|
# Inspired by:
|
||||||
# - http://stackoverflow.com/a/24741694/597097
|
# - http://stackoverflow.com/a/24741694/597097
|
||||||
def pool(builder, size, timeout=None):
|
def pool(builder, size, timeout=None):
|
||||||
|
|
1
setup.py
1
setup.py
|
@ -84,6 +84,7 @@ install_requires = [
|
||||||
'aiohttp~=2.3',
|
'aiohttp~=2.3',
|
||||||
'python-rapidjson-schema==0.1.1',
|
'python-rapidjson-schema==0.1.1',
|
||||||
'abci~=0.3.0',
|
'abci~=0.3.0',
|
||||||
|
'setproctitle~=1.1.0',
|
||||||
]
|
]
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
|
|
|
@ -3,6 +3,8 @@ from unittest.mock import patch, call
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
pytestmark = pytest.mark.tendermint
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_queue(monkeypatch):
|
def mock_queue(monkeypatch):
|
||||||
|
@ -157,3 +159,18 @@ def test_lazy_execution():
|
||||||
lz.name.upper()
|
lz.name.upper()
|
||||||
result = lz.run(cat)
|
result = lz.run(cat)
|
||||||
assert result == 'SHMUI'
|
assert result == 'SHMUI'
|
||||||
|
|
||||||
|
|
||||||
|
def test_process_set_title():
|
||||||
|
from uuid import uuid4
|
||||||
|
from multiprocessing import Queue
|
||||||
|
from setproctitle import getproctitle
|
||||||
|
from bigchaindb.utils import Process
|
||||||
|
|
||||||
|
queue = Queue()
|
||||||
|
uuid = str(uuid4())
|
||||||
|
|
||||||
|
process = Process(target=lambda: queue.put(getproctitle()),
|
||||||
|
name=uuid)
|
||||||
|
process.start()
|
||||||
|
assert queue.get() == uuid
|
||||||
|
|
Loading…
Reference in New Issue
Block a user