"""This module implements the Manager class."""
from __future__ import annotations
import argparse
import logging
import selectors
import time
from multiprocessing.connection import Connection
from typing import Any
from typing import cast
from typing import List
from typing import Sequence
from bqskit.runtime import default_manager_port
from bqskit.runtime import default_worker_port
from bqskit.runtime.address import RuntimeAddress
from bqskit.runtime.base import import_tests_package
from bqskit.runtime.base import parse_ipports
from bqskit.runtime.base import ServerBase
from bqskit.runtime.direction import MessageDirection
from bqskit.runtime.message import RuntimeMessage
from bqskit.runtime.result import RuntimeResult
from bqskit.runtime.task import RuntimeTask
class Manager(ServerBase):
    """
    BQSKit Runtime Manager.

    A Manager is a middle node in the process hierarchy and is responsible for
    managing workers or other managers. The manager is part of the detached
    architecture. Here managers are started individually as separate processes,
    which in turn start their own workers. Then, if necessary, more managers can
    be started to manage the level-1 managers, and so on, until finally, a
    detached server is started and clients can connect.
    """

    def __init__(
        self,
        port: int = default_manager_port,
        num_workers: int = -1,
        ipports: list[tuple[str, int]] | None = None,
        worker_port: int = default_worker_port,
        only_connect: bool = False,
    ) -> None:
        """
        Create a manager instance in one of two ways:

        1) Leave all options default and start a manager which spawns and
           manages as many worker processes as there are CPUs on the
           system. You can also specify the number of workers to spawn via
           the `num_workers` parameter. In this mode, the manager is a
           level-1 manager and manages workers.

        2) Specify ip and port pairs, then assume at each endpoint there
           is a listening manager and attempt to establish a connection.
           In this mode, the manager will not spawn any workers and just
           manage the specified managers.

        In either case, if any problems arise during startup, no recovery
        is attempted and the manager terminates.

        Args:
            port (int): The port this manager listens for server connections.
                Default can be found in the
                :obj:`~bqskit.runtime.default_manager_port` global variable.

            num_workers (int): The number of workers to spawn. If -1,
                then spawn as many workers as CPUs on the system.
                (Default: -1). Ignored if `ipports` is not None.

            ipports (list[tuple[str, int]] | None): If not None, then all
                the addresses and ports of running managers to connect to.

            worker_port (int): The port this manager will listen for workers
                on. Default can be found in the
                :obj:`~bqskit.runtime.default_worker_port` global variable.

            only_connect (bool): If true, do not spawn workers, only connect
                to already spawned workers. Ignored if `ipports` is not None.

        Raises:
            RuntimeError: If the first message received from upstream is not
                a CONNECT handshake.
        """
        super().__init__()

        # Accept the single upstream connection on `port`.
        self.upstream = self.listen_once('0.0.0.0', port)

        # Handshake with upstream. An explicit check (rather than `assert`)
        # keeps this validation active even when Python runs with -O.
        handshake, payload = self.upstream.recv()
        if handshake != RuntimeMessage.CONNECT:
            raise RuntimeError(
                f'Expected a CONNECT handshake from upstream, got {handshake!r}.',
            )

        # The CONNECT payload's first two entries are this manager's
        # lower and upper id bounds.
        self.lower_id_bound = payload[0]
        self.upper_id_bound = payload[1]

        self.sel.register(
            self.upstream,
            selectors.EVENT_READ,
            MessageDirection.ABOVE,
        )

        if ipports is None:
            # Case 1: manage workers directly (spawned here, or pre-existing).
            if only_connect:
                self.connect_to_workers(num_workers, worker_port)
            else:
                self.spawn_workers(num_workers, worker_port)
        else:
            # Case 2: manage the lower-level managers listening at ipports.
            self.connect_to_managers(ipports)

        # Last idle-worker count reported upstream; used by
        # update_upstream_idle_workers to suppress redundant WAITING messages.
        self.last_num_idle_sent_up = self.total_workers

        # Inform upstream we are starting
        started = (self.upstream, RuntimeMessage.STARTED, self.total_workers)
        self.outgoing.put(started)
        self.logger.info('Sent start message upstream.')

    def handle_message(
        self,
        msg: RuntimeMessage,
        direction: MessageDirection,
        conn: Connection,
        payload: Any,
    ) -> None:
        """
        Process the message coming from `direction`.

        From ABOVE, only SUBMIT, SUBMIT_BATCH, RESULT, CANCEL and SHUTDOWN
        are accepted. From BELOW, SUBMIT, SUBMIT_BATCH, RESULT, WAITING and
        UPDATE are handled locally, and every other message is forwarded
        upstream unchanged.

        Raises:
            RuntimeError: On an unexpected message type from above, or on a
                direction other than ABOVE or BELOW.
        """
        if direction == MessageDirection.ABOVE:
            if msg == RuntimeMessage.SUBMIT:
                rtask = cast(RuntimeTask, payload)
                self.schedule_tasks([rtask])
                self.update_upstream_idle_workers()

            elif msg == RuntimeMessage.SUBMIT_BATCH:
                rtasks = cast(List[RuntimeTask], payload)
                self.schedule_tasks(rtasks)
                self.update_upstream_idle_workers()

            elif msg == RuntimeMessage.RESULT:
                result = cast(RuntimeResult, payload)
                self.send_result_down(result)

            elif msg == RuntimeMessage.CANCEL:
                addr = cast(RuntimeAddress, payload)
                self.broadcast_cancel(addr)

            elif msg == RuntimeMessage.SHUTDOWN:
                self.handle_shutdown()

            else:
                raise RuntimeError(f'Unexpected message type: {msg.name}')

        elif direction == MessageDirection.BELOW:
            if msg == RuntimeMessage.SUBMIT:
                rtask = cast(RuntimeTask, payload)
                self.send_up_or_schedule_tasks([rtask])
                self.update_upstream_idle_workers()

            elif msg == RuntimeMessage.SUBMIT_BATCH:
                rtasks = cast(List[RuntimeTask], payload)
                self.send_up_or_schedule_tasks(rtasks)
                self.update_upstream_idle_workers()

            elif msg == RuntimeMessage.RESULT:
                result = cast(RuntimeResult, payload)
                self.handle_result_from_below(result)

            elif msg == RuntimeMessage.WAITING:
                num_idle = cast(int, payload)
                self.handle_waiting(conn, num_idle)
                self.update_upstream_idle_workers()

            elif msg == RuntimeMessage.UPDATE:
                task_diff = cast(int, payload)
                self.handle_update(conn, task_diff)

            else:
                # Forward all other messages up
                self.outgoing.put((self.upstream, msg, payload))

        else:
            raise RuntimeError(f'Unexpected message from {direction.name}.')

    def handle_system_error(self, error_str: str) -> None:
        """
        Handle an error in runtime code as opposed to client code.

        This is called when an error arises in runtime code not in a
        RuntimeTask's coroutine code. The error is reported upstream on a
        best-effort basis: any failure to send is deliberately ignored.
        """
        try:
            self.upstream.send((RuntimeMessage.ERROR, error_str))

            # Sleep to ensure server receives error message before shutdown
            time.sleep(1)

        except Exception:
            # If server has crashed then just exit
            pass

    def handle_shutdown(self) -> None:
        """
        Shutdown the manager and clean up spawned processes.

        After the base-class shutdown, a SHUTDOWN message is forwarded
        upstream and the upstream connection is closed. Both steps are best
        effort. The close is attempted even if the send fails, so the
        connection handle is not leaked when upstream has already gone away.
        """
        super().handle_shutdown()

        # Forward shutdown message upwards if possible
        try:
            self.upstream.send((RuntimeMessage.SHUTDOWN, None))
        except Exception:
            # If server has already shutdown or crashed, still close below.
            pass

        try:
            self.upstream.close()
        except Exception:
            # Nothing more can be done about a failed close during shutdown.
            pass

    def send_up_or_schedule_tasks(self, tasks: Sequence[RuntimeTask]) -> None:
        """
        Either send the tasks upstream or schedule them downstream.

        Up to `num_idle_workers` tasks are scheduled locally, and upstream is
        told (via UPDATE) exactly how many tasks this subtree gained. Any
        remaining tasks are sent upstream as a single SUBMIT_BATCH.
        """
        # Never claim more tasks than were actually scheduled. Reporting
        # num_idle when len(tasks) < num_idle would over-count upstream.
        num_local = min(max(self.num_idle_workers, 0), len(tasks))

        if num_local > 0:
            self.outgoing.put((self.upstream, RuntimeMessage.UPDATE, num_local))
            self.schedule_tasks(tasks[:num_local])

        if len(tasks) > num_local:
            self.outgoing.put((
                self.upstream,
                RuntimeMessage.SUBMIT_BATCH,
                tasks[num_local:],
            ))

    def handle_result_from_below(self, result: RuntimeResult) -> None:
        """
        Forward the result to its destination and track the completion.

        The task count of the employee responsible for `result.completed_by`
        is decremented. If the result's return address is one of this
        manager's workers, the result is sent down and upstream receives an
        UPDATE of -1. Otherwise the result itself is forwarded upstream.
        """
        # Record a task has been completed
        self.get_employee_responsible_for(result.completed_by).num_tasks -= 1

        # Forward result to final destination
        if self.is_my_worker(result.return_address.worker_id):
            self.send_result_down(result)
            self.outgoing.put((self.upstream, RuntimeMessage.UPDATE, -1))

        else:
            # If its destination worker is not an employee of mine,
            # then my boss will know where to send this result.
            # NOTE(review): no UPDATE is sent here; presumably upstream
            # accounts for the completion when it handles the RESULT.
            self.outgoing.put((self.upstream, RuntimeMessage.RESULT, result))

    def update_upstream_idle_workers(self) -> None:
        """
        Update the total number of idle workers upstream.

        A WAITING message is only sent when the idle count differs from the
        last value reported, to avoid redundant traffic.
        """
        num_idle = self.num_idle_workers
        if num_idle != self.last_num_idle_sent_up:
            self.last_num_idle_sent_up = num_idle
            self.outgoing.put((self.upstream, RuntimeMessage.WAITING, num_idle))

    def handle_update(self, conn: Connection, task_diff: int) -> None:
        """
        Handle a task count update from a lower level manager or worker.

        The employee on `conn` has its task count adjusted by `task_diff`,
        and the same diff is propagated upstream.
        """
        self.conn_to_employee_dict[conn].num_tasks += task_diff
        self.outgoing.put((self.upstream, RuntimeMessage.UPDATE, task_diff))
def start_manager() -> None:
    """
    Entry point for runtime manager processes.

    This function:

    - parses the command line;
    - configures the 'bqskit-runtime' logger's verbosity from the -v count;
    - optionally imports the bqskit tests package;
    - constructs a Manager and runs it.

    Any negative worker count is normalized to -1, meaning one worker per
    available CPU.
    """
    parser = argparse.ArgumentParser(
        prog='bqskit-manager',
        description='Launch a BQSKit runtime manager process.',
    )
    parser.add_argument(
        '-n', '--num-workers',
        default=-1,
        type=int,
        help='The number of workers to spawn. If negative, will spawn'
        ' one worker for each available CPU. Defaults to -1.',
    )
    parser.add_argument(
        '-m', '--managers',
        nargs='+',
        help='The ip and port pairs where managers are expected to be waiting.',
    )
    parser.add_argument(
        '-p', '--port',
        type=int,
        default=default_manager_port,
        help='The port this manager will listen for servers on.',
    )
    parser.add_argument(
        '-w', '--worker-port',
        type=int,
        default=default_worker_port,
        help='The port this manager will listen for workers on.',
    )
    parser.add_argument(
        '-x', '--only-connect',
        action='store_true',
        help='Do not spawn workers, only connect to them.',
    )
    parser.add_argument(
        '-v', '--verbose',
        action='count',
        default=0,
        help='Enable logging of increasing verbosity, either -v, -vv, or -vvv.',
    )
    parser.add_argument(
        '-i', '--import-tests',
        action='store_true',
        help='Import the bqskit tests package; used during testing.',
    )
    args = parser.parse_args()

    # If ips and ports were provided parse them
    ipports = None if args.managers is None else parse_ipports(args.managers)

    # Set up logging: no -v -> WARNING, -v -> INFO, -vv -> DEBUG,
    # -vvv (or more) -> level 1, which is below DEBUG.
    levels = [logging.WARNING, logging.INFO, logging.DEBUG, 1]
    _logger = logging.getLogger('bqskit-runtime')
    _logger.setLevel(levels[min(args.verbose, 3)])
    _logger.addHandler(logging.StreamHandler())

    # Import tests package recursively
    if args.import_tests:
        import_tests_package()

    # Every negative value means "one worker per CPU"; Manager expects -1.
    num_workers = max(args.num_workers, -1)

    # Create the manager
    manager = Manager(
        args.port,
        num_workers,
        ipports,
        args.worker_port,
        args.only_connect,
    )

    # Start the manager
    manager.run()