from__future__importannotationsimportweakreffromabcimportABCfromtypingimportTYPE_CHECKINGfrom._adapter_baseimportQueueAdapterfrom._contextimportBoundDevice,BoundMultiDevicefrom._deviceimportDeviceifTYPE_CHECKING:# pragma: no coverfromcollections.abcimportIterable,Sequence
[docs]classMultiQueue:"""A queue on multiple devices."""devices:BoundMultiDevice"""Multi-device on which this queue operates."""queues:dict[BoundDevice,Queue]"""Single-device queues associated with device indices."""
[docs]@classmethoddefon_devices(cls,devices:Iterable[BoundDevice])->MultiQueue:"""Creates a queue from provided devices (belonging to the same context)."""returncls([Queue(device)fordeviceindevices])
def__init__(self,queues:Sequence[Queue]):""" :param queues: single-device queues (must belong to distinct devices and the same context). """self.devices=BoundMultiDevice.from_bound_devices([queue.deviceforqueueinqueues])self.queues={queue.device:queueforqueueinqueues}
[docs]defsynchronize(self)->None:"""Blocks until queues on all devices are empty."""forqueueinself.queues.values():queue.synchronize()
[docs]classQueue:"""A queue on a single device."""device:BoundDevice"""Device on which this queue operates."""def__init__(self,device:BoundDevice):""":param device: a device on which to create a queue."""self.device=deviceself._queue_adapter=device.context._context_adapter.make_queue_adapter(# noqa: SLF001device._device_adapter# noqa: SLF001)
[docs]defsynchronize(self)->None:"""Blocks until sub-queues on all devices are empty."""self._queue_adapter.synchronize()