Source code for grunnur.adapter_base

from abc import ABC, abstractmethod
from enum import Enum
from typing import (
    Any,
    Tuple,
    Mapping,
    Iterable,
    Sequence,
    List,
    Mapping,
    TypeVar,
    Union,
    Optional,
)

import numpy

from .array_metadata import ArrayMetadataLike


[docs]class DeviceType(Enum): """ An enum representing a device's type. """ CPU = 1 "CPU type" GPU = 2 "GPU type"
[docs]class APIID: """ An ID of an :py:class:`~grunnur.API` object. """ shortcut: str """This API's shortcut.""" def __init__(self, shortcut: str): self.shortcut = shortcut def __eq__(self, other: Any) -> bool: return type(self) == type(other) and self.shortcut == other.shortcut def __hash__(self) -> int: return hash((type(self), self.shortcut)) def __str__(self) -> str: return f"id({self.shortcut})"
class APIAdapterFactory(ABC): """ A helper class that allows handling cases when an API's backend is unavailable or temporarily replaced by a mock object. """ @property @abstractmethod def api_id(self) -> "APIID": pass @property @abstractmethod def available(self) -> bool: pass @abstractmethod def make_api_adapter(self) -> "APIAdapter": pass class APIAdapter(ABC): @property @abstractmethod def id(self) -> APIID: pass @property @abstractmethod def platform_count(self) -> int: pass @abstractmethod def get_platform_adapters(self) -> Tuple["PlatformAdapter", ...]: pass @abstractmethod def isa_backend_device(self, obj: Any) -> bool: pass @abstractmethod def isa_backend_platform(self, obj: Any) -> bool: pass @abstractmethod def isa_backend_context(self, obj: Any) -> bool: pass @abstractmethod def make_device_adapter(self, backend_device: Any) -> "DeviceAdapter": pass @abstractmethod def make_platform_adapter(self, backend_platform: Any) -> "PlatformAdapter": pass @abstractmethod def make_context_adapter_from_device_adapters( self, device_adapters: Sequence["DeviceAdapter"] ) -> "ContextAdapter": pass @abstractmethod def make_context_adapter_from_backend_contexts( self, backend_contexts: Sequence[Any], take_ownership: bool ) -> "ContextAdapter": pass def __eq__(self, other: Any) -> bool: return type(self) == type(other) and self.id == other.id def __hash__(self) -> int: return hash((type(self), self.id)) class PlatformAdapter(ABC): @property @abstractmethod def api_adapter(self) -> APIAdapter: pass @property @abstractmethod def platform_idx(self) -> int: pass @property @abstractmethod def name(self) -> str: pass @property @abstractmethod def vendor(self) -> str: pass @property @abstractmethod def version(self) -> str: pass @property @abstractmethod def device_count(self) -> int: pass @abstractmethod def get_device_adapters(self) -> Tuple["DeviceAdapter", ...]: pass class DeviceAdapter(ABC): @property @abstractmethod def platform_adapter(self) -> PlatformAdapter: pass @property @abstractmethod def device_idx(self) -> int: pass @property @abstractmethod def name(self) -> str: pass @property @abstractmethod def params(self) -> "DeviceParameters": pass
[docs]class DeviceParameters(ABC): """ An object containing device's specifications. """ @property @abstractmethod def type(self) -> DeviceType: """ Device type. """ pass @property @abstractmethod def max_total_local_size(self) -> int: """ The maximum total number of threads in one block (CUDA), or work items in one work group (OpenCL). """ pass @property @abstractmethod def max_local_sizes(self) -> Tuple[int, ...]: """ The maximum number of threads in one block (CUDA), or work items in one work group (OpenCL) for each of the available dimensions. """ pass @property @abstractmethod def warp_size(self) -> int: """ The number of threads (CUDA)/work items (OpenCL) that are executed synchronously (within one multiprocessor/compute unit). """ pass @property @abstractmethod def max_num_groups(self) -> Tuple[int, ...]: """ The maximum number of blocks (CUDA)/work groups (OpenCL) for each of the available dimensions. """ pass @property @abstractmethod def local_mem_size(self) -> int: """ The size of shared (CUDA)/local (OpenCL) memory (in bytes). """ pass @property @abstractmethod def local_mem_banks(self) -> int: """ The number of independent channels for shared (CUDA)/local (OpenCL) memory, which can be used from one warp without request serialization. """ pass @property @abstractmethod def compute_units(self) -> int: """ The number of multiprocessors (CUDA)/compute units (OpenCL) for the device. """ pass
class ContextAdapter(ABC): @property @abstractmethod def device_adapters(self) -> Mapping[int, DeviceAdapter]: pass @property @abstractmethod def device_order(self) -> List[int]: pass @abstractmethod def make_queue_adapter(self, device_adapter: DeviceAdapter) -> "QueueAdapter": pass @abstractmethod def allocate(self, device_adapter: DeviceAdapter, size: int) -> "BufferAdapter": pass @staticmethod @abstractmethod def render_prelude(fast_math: bool = False) -> str: """ Renders the prelude allowing one to write kernels compiling both in CUDA and OpenCL. :param fast_math: whether the compilation with fast math is requested. """ pass @abstractmethod def compile_single_device( self, device_adapter: DeviceAdapter, prelude: str, src: str, keep: bool = False, fast_math: bool = False, compiler_options: Optional[Sequence[str]] = None, constant_arrays: Optional[Mapping[str, ArrayMetadataLike]] = None, ) -> "ProgramAdapter": """ Compiles the given source with the given prelude on a single device. :param device_idx: the number of the device to compile on. :param prelude: the source of the prelude to prepend to the main source. :param src: the source of the kernels to be compiled. :param keep: see :py:meth:`compile`. :param fast_math: see :py:meth:`compile`. :param compiler_options: see :py:meth:`compile`. :param constant_arrays: (**CUDA only**) see :py:meth:`compile`. """ pass def deactivate(self) -> None: """ For CUDA API: deactivates this context, popping all the CUDA context objects from the stack. Other APIs: no effect. """ pass class BufferAdapter(ABC): """ A memory buffer on the device. """ @property @abstractmethod def kernel_arg(self) -> Any: pass @property @abstractmethod def size(self) -> int: """ This buffer's size (in bytes). """ pass @property @abstractmethod def offset(self) -> int: """ This buffer's offset from the start of the physical memory allocation (will be non-zero for buffers created using :py:meth:`get_sub_region`). """ pass @abstractmethod def get_sub_region(self, origin: int, size: int) -> "BufferAdapter": """ Returns a buffer sub-region starting at ``origin`` and of length ``size`` (in bytes). """ pass @abstractmethod def set( self, queue_adapter: "QueueAdapter", source: Union["numpy.ndarray[Any, numpy.dtype[Any]]", "BufferAdapter"], no_async: bool = False, ) -> None: pass @abstractmethod def get( self, queue_adapter: "QueueAdapter", host_array: "numpy.ndarray[Any, numpy.dtype[Any]]", async_: bool = False, ) -> None: pass class QueueAdapter(ABC): @abstractmethod def synchronize(self) -> None: pass class AdapterCompilationError(RuntimeError): def __init__(self, backend_exception: Exception, source: str): super().__init__(str(backend_exception)) self.backend_exception = backend_exception self.source = source class ProgramAdapter(ABC): @abstractmethod def __getattr__(self, kernel_name: str) -> "KernelAdapter": pass @abstractmethod def set_constant_buffer( self, queue_adapter: QueueAdapter, name: str, arr: Union[BufferAdapter, "numpy.ndarray[Any, numpy.dtype[Any]]"], ) -> None: pass @property @abstractmethod def source(self) -> str: pass class KernelAdapter(ABC): @property @abstractmethod def program_adapter(self) -> ProgramAdapter: pass @property @abstractmethod def max_total_local_size(self) -> int: pass @abstractmethod def prepare( self, global_size: Sequence[int], local_size: Optional[Sequence[int]] = None ) -> "PreparedKernelAdapter": pass class PreparedKernelAdapter(ABC): @abstractmethod def __call__( self, queue_adapter: QueueAdapter, *args: Union[BufferAdapter, numpy.generic], local_mem: int = 0, ) -> Any: pass