from __future__ import annotations
from abc import abstractmethod, ABC
from collections import Counter
from collections.abc import Iterable as IterableBase
from typing import (
Protocol,
Any,
Union,
NamedTuple,
Set,
Dict,
Optional,
Iterable,
List,
Sequence,
cast,
runtime_checkable,
)
import weakref
from weakref import ReferenceType
import numpy
from .sorted_list import SortedList
from .adapter_base import BufferAdapter, QueueAdapter
from .buffer import Buffer
from .array import Array
from .context import BoundDevice
from .queue import Queue
@runtime_checkable
class VirtualAllocations(Protocol):
    """
    Structural type for objects that expose nested virtual allocations.

    The protocol is runtime-checkable, so ``isinstance(obj, VirtualAllocations)``
    succeeds for any object that has a ``__virtual_allocations__`` member.
    """

    def __virtual_allocations__(self) -> Sequence[Union[Array, Buffer]]:  # pragma: no cover
        """Returns the arrays and/or buffers held by this object."""
def extract_dependencies(dependencies: Any) -> Set[int]:
    """
    Recursively extracts virtual allocation identifiers from ``dependencies``.

    Accepted values:

    * a ``VirtualBufferAdapter`` (contributes its identifier);
    * a ``Buffer`` or an ``Array`` (unwrapped down to the buffer adapter;
      anything that is not backed by a virtual buffer adapter contributes nothing);
    * an iterable of accepted values (processed element by element);
    * an object implementing ``__virtual_allocations__()`` (its result is processed).

    Any other object, including ``None`` and strings, yields an empty set.

    :returns: the set of identifiers of all virtual allocations found.
    """
    # Strings are iterables of strings: iterating a one-character string yields
    # that same string again, so recursing into them would never terminate.
    # They cannot contain allocations, so treat them as leaves.
    if isinstance(dependencies, (str, bytes)):
        return set()

    if isinstance(dependencies, VirtualBufferAdapter):
        return {dependencies._id}

    if isinstance(dependencies, Buffer):
        return extract_dependencies(dependencies._buffer_adapter)

    if isinstance(dependencies, Array):
        return extract_dependencies(dependencies.data)

    if isinstance(dependencies, IterableBase):
        results: Set[int] = set()
        for dep in dependencies:
            results.update(extract_dependencies(dep))
        return results

    if isinstance(dependencies, VirtualAllocations):
        # A hook for exposing nested virtual allocations in arbitrary classes.
        # The hook is a method and must be *called*: passing the bound method itself
        # would match none of the branches above and silently drop the dependencies.
        return extract_dependencies(dependencies.__virtual_allocations__())

    return set()
class VirtualAllocator:
    """
    A callable bound to a virtual manager, intended to be passed as the allocator
    when creating a :py:class:`~grunnur.Array`.

    Only the identifiers of the dependencies are stored,
    so no references to the actual objects are held.
    """

    def __init__(self, manager: "VirtualManager", dependencies: Set[int]):
        self.manager = manager
        self.dependencies = dependencies

    def __call__(self, device: BoundDevice, size: int) -> Buffer:
        """
        Requests a virtual allocation of ``size`` from the manager.

        :raises ValueError: if ``device`` is not the manager's device.
        """
        # TODO: seems redundant to pass a device here,
        # but it must mimic the API of ``Buffer.allocate()``.
        manager = self.manager
        if device != manager.device:
            message = (
                f"This allocator is attached to device {manager.device}, "
                f"but was asked to allocate on {device}"
            )
            raise ValueError(message)

        return manager._allocate_virtual(size, self.dependencies)
class VirtualBufferAdapter(BufferAdapter):
    """
    Buffer adapter representing a virtual allocation.

    Kernel argument access and data transfers are forwarded to a real buffer adapter,
    which can be swapped via ``_set_real_buffer_adapter()``.
    """

    def __init__(
        self, manager: "VirtualManager", size: int, id_: int, buffer_adapter: BufferAdapter
    ):
        self._real_buffer_adapter = buffer_adapter
        self._size = size
        self._id = id_
        self._manager = manager

    @property
    def size(self) -> int:
        """The size requested for this virtual allocation."""
        return self._size

    @property
    def offset(self) -> int:
        """Always zero for a virtual buffer."""
        return 0

    @property
    def kernel_arg(self) -> Any:
        """The kernel argument of the real buffer adapter currently in use."""
        backing = self._real_buffer_adapter
        return backing.kernel_arg

    def get_sub_region(self, origin: int, size: int) -> BufferAdapter:
        """Not supported; always raises ``NotImplementedError``."""
        # FIXME: how to handle it?
        raise NotImplementedError("Virtual buffers do not support subregions")

    def get(
        self,
        queue_adapter: QueueAdapter,
        host_array: "numpy.ndarray[Any, numpy.dtype[Any]]",
        async_: bool = False,
    ) -> None:
        """Forwards the call to ``get()`` of the real buffer adapter."""
        backing = self._real_buffer_adapter
        return backing.get(queue_adapter, host_array, async_=async_)

    def set(
        self,
        queue_adapter: QueueAdapter,
        source: Union["numpy.ndarray[Any, numpy.dtype[Any]]", BufferAdapter],
        no_async: bool = False,
    ) -> None:
        """Forwards the call to ``set()`` of the real buffer adapter."""
        backing = self._real_buffer_adapter
        return backing.set(queue_adapter, source, no_async=no_async)

    def _set_real_buffer_adapter(self, buf: Buffer) -> None:
        """Switches this virtual buffer to the adapter of the given real buffer."""
        self._real_buffer_adapter = buf._buffer_adapter
class VirtualManager(ABC):
    """
    Base class for a manager of virtual allocations.

    Subclasses implement the ``_*_specific()`` hooks and the real buffer accessors;
    this class handles identifiers, dependency validation and the tracking
    of live virtual buffers.

    :param device: the device this manager allocates on.
        Allocators created by the manager reject requests for any other device.
    """

    def __init__(self, device: BoundDevice):
        self.device = device
        self._id_counter = 0
        # Weak references only: a virtual allocation is released
        # as soon as its buffer adapter is garbage collected.
        self._virtual_buffers: Dict[int, ReferenceType[VirtualBufferAdapter]] = {}

    def allocator(self, dependencies: Optional[Any] = None) -> VirtualAllocator:
        """
        Create a callable to use for :py:class:`~grunnur.Array` creation.

        :param dependencies: can be a :py:class:`~grunnur.Array` instance
            (the ones containing persistent allocations will be ignored),
            an iterable with valid values,
            or an object with the attribute ``__virtual_allocations__`` which is a valid value
            (the last two will be processed recursively).
        """
        dependency_ids = extract_dependencies(dependencies)
        return VirtualAllocator(self, dependency_ids)

    def _allocate_virtual(self, size: int, dependencies: Set[int]) -> Buffer:
        """
        Creates a new virtual buffer of the given size.

        :param dependencies: identifiers of existing virtual allocations
            declared as dependencies of the new one.
        :raises ValueError: if some identifiers in ``dependencies``
            do not correspond to live virtual allocations.
        """
        new_id = self._id_counter
        self._id_counter += 1

        missing_deps = dependencies.difference(self._virtual_buffers)
        if missing_deps:
            # Sorted so that the message is deterministic.
            missing_deps_str = ", ".join(str(dep) for dep in sorted(missing_deps))
            raise ValueError(
                f"Some of the declared dependencies (with IDs {missing_deps_str}) do not exist"
            )

        rbuf = self._allocate_specific(new_id, size, dependencies)
        vbuf = VirtualBufferAdapter(self, size, new_id, rbuf._buffer_adapter)
        # The callback releases the allocation when the adapter is garbage collected.
        self._virtual_buffers[new_id] = weakref.ref(vbuf, lambda _: self._free(new_id))

        return Buffer(self.device, vbuf)

    def _update_buffer(self, id_: int) -> None:
        """Re-attaches the virtual buffer ``id_`` to its current real buffer."""
        vbuf = self._virtual_buffers[id_]()
        # Entries are removed by the weakref callback, so a registered reference is alive.
        assert vbuf is not None
        buf = self._get_real_buffer(id_)
        vbuf._set_real_buffer_adapter(buf)

    def _update_all(self) -> None:
        """Re-attaches every live virtual buffer to its current real buffer."""
        for id_ in self._virtual_buffers:
            self._update_buffer(id_)

    def _free(self, id_: int) -> None:
        """Forgets the virtual buffer ``id_`` and lets the subclass release its resources."""
        del self._virtual_buffers[id_]
        self._free_specific(id_)

    def pack(self, queue: Queue) -> None:
        """
        Packs the real allocations possibly reducing total memory usage.
        This process can be slow and may synchronize the base queue.
        """
        self._pack_specific(queue)
        # Real buffers may have been reassigned, so refresh all the virtual buffers.
        self._update_all()

    def statistics(self) -> "VirtualAllocationStatistics":
        """
        Returns allocation statistics.
        """
        return VirtualAllocationStatistics(
            self._real_buffers(),
            # cast() to override inference here - we know that vb() will not return `None`
            [cast(VirtualBufferAdapter, vb()) for vb in self._virtual_buffers.values()],
        )

    @abstractmethod
    def _allocate_specific(self, new_id: int, size: int, dependencies: Set[int]) -> Buffer:
        """Returns the real buffer that will back the new virtual allocation ``new_id``."""

    @abstractmethod
    def _get_real_buffer(self, id_: int) -> Buffer:
        """Returns the real buffer currently backing the virtual allocation ``id_``."""

    @abstractmethod
    def _real_buffers(self) -> List[Buffer]:
        """Returns the real buffers currently held by the manager."""

    @abstractmethod
    def _free_specific(self, id_: int) -> None:
        """Releases manager-specific data associated with the virtual allocation ``id_``."""

    @abstractmethod
    def _pack_specific(self, queue: Queue) -> None:
        """Performs the manager-specific part of :py:meth:`pack`."""
class VirtualAllocationStatistics:
    """
    Summary of the sizes and counts of real and virtual allocations.
    """

    real_size_total: int
    """The total size of physical allocations (in bytes)."""

    real_num: int
    """The number of physical allocations."""

    real_sizes: Dict[int, int]
    """A dictionary ``size: count`` with the counts for physical allocations of each size."""

    virtual_size_total: int
    """The total size of virtual allocations (in bytes)."""

    virtual_num: int
    """The number of virtual allocations."""

    virtual_sizes: Dict[int, int]
    """A dictionary ``size: count`` with the counts for virtual allocations of each size."""

    def __init__(
        self, real_buffers: Iterable[Buffer], virtual_buffers: Iterable[VirtualBufferAdapter]
    ):
        # Each iterable is traversed exactly once, real buffers first.
        self.real_size_total, self.real_num, self.real_sizes = self._summarize(real_buffers)
        self.virtual_size_total, self.virtual_num, self.virtual_sizes = self._summarize(
            virtual_buffers
        )

    @staticmethod
    def _summarize(buffers):  # type: ignore[no-untyped-def]
        """Returns ``(total size, count, {size: count})`` for objects with a ``size``."""
        sizes = [buf.size for buf in buffers]
        return sum(sizes), len(sizes), dict(Counter(sizes))

    @staticmethod
    def _histogram(size_counts: Dict[int, int]) -> str:
        """Formats a ``size: count`` dictionary as ``<count>x<size>b`` entries ordered by size."""
        entries = [f"{count}x{size}b" for size, count in sorted(size_counts.items())]
        return ", ".join(entries)

    def __str__(self) -> str:
        real_part = (
            f"real: {self.real_num} allocs, "
            f"total size {self.real_size_total}b, "
            f"buffers: {self._histogram(self.real_sizes)}"
        )
        virtual_part = (
            f"virtual: {self.virtual_num} allocs, "
            f"total size {self.virtual_size_total}b, "
            f"buffers: {self._histogram(self.virtual_sizes)}"
        )
        return f"VirtualAllocationStatistics({real_part}; {virtual_part})"
class TrivialManager(VirtualManager):
    """
    Trivial manager --- every allocation request gets its own real buffer,
    so nothing is ever shared between virtual allocations.
    """

    def __init__(self, device: BoundDevice):
        super().__init__(device)
        # Real buffers, keyed by the identifier of the virtual allocation they back
        self._rbuffers: Dict[int, Buffer] = {}

    def _allocate_specific(self, new_id: int, size: int, _dependencies: Set[int]) -> Buffer:
        # Dependencies are irrelevant here since buffers are never shared.
        real_buffer = self._rbuffers[new_id] = Buffer.allocate(self.device, size)
        return real_buffer

    def _free_specific(self, id_: int) -> None:
        self._rbuffers.pop(id_)

    def _get_real_buffer(self, id_: int) -> Buffer:
        return self._rbuffers[id_]

    def _real_buffers(self) -> List[Buffer]:
        return [*self._rbuffers.values()]

    def _pack_specific(self, queue: Queue) -> None:
        # Nothing to pack: the mapping is always one-to-one.
        return None
class ZeroOffsetManager(VirtualManager):
    """
    Tries to assign several allocation requests to a single real allocation,
    if dependencies allow that.
    All virtual allocations start from the beginning of real allocations.
    """

    class VirtualAllocation(NamedTuple):
        # Requested size of the virtual allocation
        size: int
        # Identifiers of the virtual allocations that must not share a real buffer with
        # this one (kept symmetric; the set is mutated in place)
        dependencies: Set[int]

    class RealAllocation(NamedTuple):
        # The real buffer
        buffer: Buffer
        # Identifiers of the virtual allocations placed in this buffer (mutated in place)
        virtual_ids: Set[int]

    class RealSize(NamedTuple):
        # Size of a real allocation (the sorting key of ``_real_sizes``)
        size: int
        # Identifier of that real allocation
        real_id: int

    class VirtualMapping(NamedTuple):
        # Identifier of the real allocation backing a virtual one
        real_id: int
        # The buffer handed out for the virtual allocation
        # (currently the whole real buffer, see the note in ``_fast_add()``)
        sub_region: Buffer

    def __init__(self, device: BoundDevice):
        super().__init__(device)
        self._virtual_allocations: Dict[int, ZeroOffsetManager.VirtualAllocation] = {}
        self._real_sizes = SortedList[ZeroOffsetManager.RealSize]((), key=lambda x: x.size)
        self._virtual_to_real: Dict[int, ZeroOffsetManager.VirtualMapping] = {}
        self._real_allocations: Dict[int, ZeroOffsetManager.RealAllocation] = {}
        self._real_id_counter = 0

    def _allocate_specific(self, new_id: int, size: int, dependencies: Set[int]) -> Buffer:
        # The dependency sets stored by this manager are mutated in place,
        # while the set passed in belongs to the caller (it is the allocator's own set,
        # shared by every allocation made through that allocator).
        # Keep a private copy: otherwise later allocations would leak into the allocator
        # and into sibling allocations, making the dependency graph asymmetric
        # and causing ``KeyError`` in ``_free_specific()``.
        own_dependencies = set(dependencies)

        # Dependencies should be bidirectional.
        # So if some new allocation says it depends on earlier ones,
        # we need to update their dependency lists.
        for dep in own_dependencies:
            self._virtual_allocations[dep].dependencies.add(new_id)

        # Save virtual allocation parameters
        self._virtual_allocations[new_id] = self.VirtualAllocation(size, own_dependencies)

        # Find a real allocation using the greedy algorithm.
        return self._fast_add(new_id, size, own_dependencies)

    def _fast_add(self, new_id: int, size: int, dependencies: Set[int]) -> Buffer:
        """
        Greedy algorithm to find a real allocation for a given virtual allocation.
        """

        # Find the smallest real allocation which can hold the requested virtual allocation.
        try:
            idx_start = self._real_sizes.argfind_ge(size)
        except ValueError:
            # No suitable real allocation: make the search loop below empty
            idx_start = len(self._real_sizes)

        # Check all real allocations with suitable sizes, starting from the smallest one.
        # Use the first real allocation which does not contain ``new_id``'s dependencies.
        for idx in range(idx_start, len(self._real_sizes)):
            real_id = self._real_sizes[idx].real_id
            buf = self._real_allocations[real_id].buffer
            virtual_ids = self._real_allocations[real_id].virtual_ids
            if virtual_ids.isdisjoint(dependencies):
                virtual_ids.add(new_id)
                break
        else:
            # If no suitable real allocation is found, create a new one.
            buf = Buffer.allocate(self.device, size)
            real_id = self._real_id_counter
            self._real_id_counter += 1

            self._real_allocations[real_id] = self.RealAllocation(buf, {new_id})
            self._real_sizes.insert(self.RealSize(size, real_id))

        # TODO: Here it would be more appropriate to use buffer.get_sub_region(0, size),
        # but OpenCL does not allow several overlapping subregions to be used in a single kernel
        # for both read and write, which ruins the whole idea.
        # So we are passing full buffers and hope that the Array class takes care of sizes.
        self._virtual_to_real[new_id] = self.VirtualMapping(
            real_id, self._real_allocations[real_id].buffer
        )

        return buf

    def _get_real_buffer(self, id_: int) -> Buffer:
        return self._virtual_to_real[id_].sub_region

    def _free_specific(self, id_: int) -> None:
        # Remove the allocation from the dependency lists of its dependencies
        dependencies = self._virtual_allocations[id_].dependencies
        for dep in dependencies:
            self._virtual_allocations[dep].dependencies.remove(id_)

        vtr = self._virtual_to_real[id_]

        # Clear virtual allocation data
        del self._virtual_allocations[id_]
        del self._virtual_to_real[id_]

        # Fast and non-optimal free.
        # Remove the virtual allocation from the real allocation,
        # and delete the real allocation if it's no longer used by other virtual allocations.
        ra = self._real_allocations[vtr.real_id]
        ra.virtual_ids.remove(id_)
        if not ra.virtual_ids:
            del self._real_allocations[vtr.real_id]
            self._real_sizes.remove(self.RealSize(ra.buffer.size, vtr.real_id))

    def _pack_specific(self, queue: Queue) -> None:
        """
        Full memory re-pack.
        In theory, should find the optimal (with the minimal real allocation size) distribution
        of virtual allocations.
        """

        # Need to synchronize, because we are going to change allocation addresses,
        # and we do not want to free the memory some kernel is reading from.
        queue.synchronize()

        # Clear all real allocation data.
        self._real_sizes = SortedList[ZeroOffsetManager.RealSize]((), key=lambda x: x.size)
        self._real_allocations = {}
        self._real_id_counter = 0

        # Sort all virtual allocations by size
        virtual_sizes = sorted(
            ((alloc.size, id_) for id_, alloc in self._virtual_allocations.items()),
            key=lambda pair: pair[0],
        )

        # Application of greedy algorithm for virtual allocations starting from the largest one
        # should give the optimal distribution.
        for size, id_ in reversed(virtual_sizes):
            self._fast_add(id_, size, self._virtual_allocations[id_].dependencies)

    def _real_buffers(self) -> List[Buffer]:
        return [ra.buffer for ra in self._real_allocations.values()]