Source code for grunnur.queue
from __future__ import annotations
from abc import ABC
from typing import Optional, Iterable, Sequence, Tuple, Set, Dict
import weakref
from .adapter_base import QueueAdapter
from .context import BoundDevice, BoundMultiDevice
from .device import Device
class MultiQueue:
    """
    A queue on multiple devices.
    """

    devices: BoundMultiDevice
    """Multi-device on which this queue operates."""

    queues: Dict[BoundDevice, "Queue"]
    """Single-device queues, keyed by the bound device each of them operates on."""

    @classmethod
    def on_devices(cls, devices: Iterable[BoundDevice]) -> "MultiQueue":
        """
        Creates a multi-device queue by making a new single-device queue
        on each of the provided devices (belonging to the same context).

        :param devices: bound devices to create the single-device queues on.
        """
        return cls([Queue(device) for device in devices])

    def __init__(self, queues: Sequence["Queue"]):
        """
        :param queues: single-device queues (must belong to distinct devices and the same context).
        """
        # The collection is traversed twice below; materializing it once keeps
        # a one-shot iterable from being exhausted after the first pass.
        queues = list(queues)
        self.devices = BoundMultiDevice.from_bound_devices([queue.device for queue in queues])
        self.queues = {queue.device: queue for queue in queues}

    def synchronize(self) -> None:
        """
        Blocks until queues on all devices are empty.
        """
        # Each single-device queue is waited on in turn.
        for queue in self.queues.values():
            queue.synchronize()
class Queue:
    """
    A queue on a single device.
    """

    device: BoundDevice
    """Device on which this queue operates."""

    def __init__(self, device: BoundDevice):
        """
        :param device: a device on which to create a queue.
        """
        self.device = device
        # The backend queue object is produced by the adapter of the context
        # the device is bound to, from the device's own adapter.
        self._queue_adapter = device.context._context_adapter.make_queue_adapter(
            device._device_adapter
        )

    def synchronize(self) -> None:
        """
        Blocks until this queue is empty.
        """
        self._queue_adapter.synchronize()