Source code for grunnur.program

from __future__ import annotations

from math import log10
from typing import (
    Any,
    Tuple,
    Union,
    List,
    Dict,
    Optional,
    Iterable,
    Mapping,
    Generic,
    TypeVar,
    Callable,
    Sequence,
    cast,
)
import weakref

import numpy

from .device import Device
from .adapter_base import (
    AdapterCompilationError,
    KernelAdapter,
    BufferAdapter,
    ProgramAdapter,
)
from .modules import render_with_modules
from .utils import update_dict
from .array_metadata import ArrayMetadataLike
from .array import Array, MultiArray
from .buffer import Buffer
from .queue import Queue, MultiQueue
from .context import Context, BoundDevice, BoundMultiDevice
from .api import cuda_api_id
from .template import DefTemplate
from .modules import Snippet


class CompilationError(RuntimeError):
    def __init__(self, backend_exception: Exception):
        super().__init__(str(backend_exception))
        self.backend_exception = backend_exception


def _check_set_constant_array(queue: Queue, program_devices: BoundMultiDevice) -> None:
    if queue.device.context != program_devices.context:
        raise ValueError("The provided queue must belong to the same context as this program uses")
    if queue.device not in program_devices:
        raise ValueError(
            f"The program was not compiled for the device this queue uses ({queue.device})"
        )


def _set_constant_array(
    queue: Queue,
    program_adapter: ProgramAdapter,
    name: str,
    arr: Union[Array, Buffer, "numpy.ndarray[Any, numpy.dtype[Any]]"],
) -> None:
    """
    Uploads a constant array ``arr`` corresponding to the symbol ``name`` to the context.
    """
    queue_adapter = queue._queue_adapter

    constant_data: Union[BufferAdapter, "numpy.ndarray[Any, numpy.dtype[Any]]"]

    if isinstance(arr, Array):
        constant_data = arr.data._buffer_adapter
    elif isinstance(arr, Buffer):
        constant_data = arr._buffer_adapter
    elif isinstance(arr, numpy.ndarray):
        constant_data = arr
    else:
        raise TypeError(f"Unsupported array type: {type(arr)}")

    program_adapter.set_constant_buffer(queue_adapter, name, constant_data)


class SingleDeviceProgram:
    """
    A program compiled for a single device.
    """

    device: BoundDevice

    source: str

    def __init__(
        self,
        device: BoundDevice,
        template_src: Union[str, Callable[..., str], DefTemplate, Snippet],
        no_prelude: bool = False,
        fast_math: bool = False,
        render_args: Sequence[Any] = [],
        render_globals: Mapping[str, Any] = {},
        constant_arrays: Optional[Mapping[str, ArrayMetadataLike]] = None,
        keep: bool = False,
        compiler_options: Optional[Sequence[str]] = None,
    ):
        """
        Renders and compiles the given template on a single device.

        :param device:
        :param template_src: see :py:meth:`compile`.
        :param no_prelude: see :py:meth:`compile`.
        :param fast_math: see :py:meth:`compile`.
        :param render_args: see :py:meth:`compile`.
        :param render_globals: see :py:meth:`compile`.
        :param constant_arrays: see :py:meth:`compile`.
        :param keep: see :py:meth:`compile`.
        :param compiler_options: see :py:meth:`compile`.
        """
        if device.context.api.id != cuda_api_id() and constant_arrays and len(constant_arrays) > 0:
            raise ValueError("Compile-time constant arrays are only supported for CUDA API")

        self.device = device

        render_globals = update_dict(
            render_globals,
            dict(device_params=device.params),
            error_msg="'device_params' is a reserved global name and cannot be used",
        )

        src = render_with_modules(
            template_src, render_args=render_args, render_globals=render_globals
        )

        context_adapter = device.context._context_adapter

        if no_prelude:
            prelude = ""
        else:
            prelude = context_adapter.render_prelude(fast_math=fast_math)

        try:
            self._sd_program_adapter = context_adapter.compile_single_device(
                device._device_adapter,
                prelude,
                src,
                fast_math=fast_math,
                constant_arrays=constant_arrays,
                keep=keep,
                compiler_options=compiler_options,
            )
        except AdapterCompilationError as e:
            print(f"Failed to compile on {device}")

            lines = e.source.split("\n")
            max_num_len = int(log10(len(lines))) + 1
            for i, l in enumerate(lines):
                print(str(i + 1).rjust(max_num_len) + ": " + l)

            raise CompilationError(e.backend_exception)

        self.source = self._sd_program_adapter.source

    def get_kernel_adapter(self, kernel_name: str) -> KernelAdapter:
        """
        Returns a :py:class:`~grunnur.adapter_base.KernelAdapter` object
        for a function (CUDA)/kernel (OpenCL) with the name ``kernel_name``.
        """
        return cast(KernelAdapter, getattr(self._sd_program_adapter, kernel_name))

    def set_constant_array(
        self,
        queue: Queue,
        name: str,
        arr: Union[Array, Buffer, "numpy.ndarray[Any, numpy.dtype[Any]]"],
    ) -> None:
        """
        Uploads a constant array ``arr`` corresponding to the symbol ``name`` to the context.
        """
        _set_constant_array(queue, self._sd_program_adapter, name, arr)
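
# Note (hedged sketch): `SingleDeviceProgram` is an internal building block; `Program` below
# compiles one instance per device and exposes the kernels through `KernelHub`. The flow looks
# roughly like this (the `device` object and the kernel source, including the KERNEL/GLOBAL_MEM
# prelude macros, are illustrative assumptions, not defined in this module):
#
#     sd_program = SingleDeviceProgram(device, "KERNEL void f(GLOBAL_MEM int *x) { ... }")
#     kernel_adapter = sd_program.get_kernel_adapter("f")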


class Program:
    """
    A compiled program on device(s).
    """

    devices: BoundMultiDevice
    """The devices on which this program was compiled."""

    sources: Dict[BoundDevice, str]
    """Source files used for each device."""

    kernel: "KernelHub"
    """
    An object whose attributes are :py:class:`~grunnur.program.Kernel` objects
    with the corresponding names.
    """

    def __init__(
        self,
        devices: Sequence[BoundDevice],
        template_src: Union[str, Callable[..., str], DefTemplate, Snippet],
        no_prelude: bool = False,
        fast_math: bool = False,
        render_args: Sequence[Any] = (),
        render_globals: Mapping[str, Any] = {},
        compiler_options: Optional[Sequence[str]] = None,
        keep: bool = False,
        constant_arrays: Optional[Mapping[str, ArrayMetadataLike]] = None,
    ):
        """
        :param devices: a single- or a multi-device object on which to compile this program.
        :param template_src: a string with the source code, or a Mako template source to render.
        :param no_prelude: do not add the prelude to the rendered source.
        :param fast_math: compile using fast (but less accurate) math functions.
        :param render_args: a list of positional args to pass to the template.
        :param render_globals: a dictionary of globals to pass to the template.
        :param compiler_options: a list of options to pass to the backend compiler.
        :param keep: keep the intermediate files in a temporary directory.
        :param constant_arrays: (**CUDA only**) a dictionary ``name: (size, dtype)``
            of global constant arrays to be declared in the program.
        """
        sd_programs = {}
        sources = {}
        multi_device = BoundMultiDevice.from_bound_devices(devices)
        for device in multi_device:
            sd_program = SingleDeviceProgram(
                device,
                template_src,
                no_prelude=no_prelude,
                fast_math=fast_math,
                render_args=render_args,
                render_globals=render_globals,
                compiler_options=compiler_options,
                keep=keep,
                constant_arrays=constant_arrays,
            )
            sd_programs[device] = sd_program
            sources[device] = sd_program.source

        self._sd_programs = sd_programs
        self.sources = sources
        self.devices = multi_device

        # TODO: create dynamically, in case someone wants to hold a reference to it and
        # discard this Program object
        self.kernel = KernelHub(self)

    def set_constant_array(
        self,
        queue: Queue,
        name: str,
        arr: Union[Array, "numpy.ndarray[Any, numpy.dtype[Any]]"],
    ) -> None:
        """
        Uploads a constant array to the context's devices (**CUDA only**).

        :param queue: the queue to use for the transfer.
        :param name: the name of the constant array symbol in the code.
        :param arr: either a device or a host array.
        """
        _check_set_constant_array(queue, self.devices)
        self._sd_programs[queue.device].set_constant_array(queue, name, arr)
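
# Example (hedged sketch): compiling a `Program` and using its kernels and constant arrays.
# The `device`, `queue`, `host_coeffs` and `coeffs_meta` objects, and the kernel source
# (with the prelude's KERNEL/GLOBAL_MEM macros), are assumptions for illustration only:
#
#     src = """
#     KERNEL void multiply(GLOBAL_MEM int *dest, GLOBAL_MEM int *a, GLOBAL_MEM int *b)
#     {
#         const SIZE_T i = get_global_id(0);
#         dest[i] = a[i] * b[i];
#     }
#     """
#     program = Program([device], src, constant_arrays=dict(coeffs=coeffs_meta))
#     program.set_constant_array(queue, "coeffs", host_coeffs)  # CUDA only
#     program.kernel.multiply(queue, [1024], None, dest_dev, a_dev, b_dev)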


class KernelHub:
    """
    An object providing access to the host program's kernels.
    """

    def __init__(self, program: Program):
        self._program_ref = weakref.proxy(program)

    def __getattr__(self, kernel_name: str) -> "Kernel":
        """
        Returns a :py:class:`~grunnur.program.Kernel` object for a function (CUDA)/kernel (OpenCL)
        with the name ``kernel_name``.
        """
        program = self._program_ref
        sd_kernel_adapters = {
            device: sd_program.get_kernel_adapter(kernel_name)
            for device, sd_program in program._sd_programs.items()
        }
        return Kernel(program, sd_kernel_adapters)


def extract_arg(
    arg: Union[
        Mapping[BoundDevice, Union[Array, Buffer, numpy.generic]],
        MultiArray,
        Array,
        Buffer,
        numpy.generic,
    ],
    device: BoundDevice,
) -> Union[BufferAdapter, numpy.generic]:
    single_device_arg: Union[Array, Buffer, numpy.generic]

    if isinstance(arg, Mapping):
        single_device_arg = arg[device]
    elif isinstance(arg, MultiArray):
        single_device_arg = arg.subarrays[device]
    else:
        single_device_arg = arg

    if isinstance(single_device_arg, Array):
        return single_device_arg.data._buffer_adapter
    elif isinstance(single_device_arg, Buffer):
        return single_device_arg._buffer_adapter
    else:
        return single_device_arg


class PreparedKernel:
    """
    A kernel specialized for execution on a set of devices
    with all possible preparations and checks performed.
    """

    def __init__(
        self,
        devices: BoundMultiDevice,
        sd_kernel_adapters: Mapping[BoundDevice, KernelAdapter],
        global_sizes: Mapping[BoundDevice, Sequence[int]],
        local_sizes: Mapping[BoundDevice, Optional[Sequence[int]]],
        hold_reference: Optional["Kernel"] = None,
    ):
        # If this object can be used by itself (e.g. when created from `Kernel.prepare()`),
        # this attribute will hold the reference to the original `Kernel`.
        # On the other hand, in `StaticKernel` the object is used internally,
        # and holding a reference to the parent `StaticKernel` here
        # would result in a reference cycle.
        # So `StaticKernel` will just pass `None`.
        self._hold_reference = hold_reference

        self._prepared_kernel_adapters = {}
        for device in sd_kernel_adapters:
            kernel_ls = local_sizes[device]
            kernel_gs = global_sizes[device]

            pkernel = sd_kernel_adapters[device].prepare(kernel_gs, kernel_ls)
            self._prepared_kernel_adapters[device] = pkernel

        self._devices = devices

    def __call__(
        self,
        queue: Union[Queue, MultiQueue],
        *args: Union[MultiArray, Array, Buffer, numpy.generic],
        local_mem: int = 0,
    ) -> Any:
        """
        Enqueues the kernel on the devices in the given queue.
        The kernel must have been prepared for all of these devices.

        If an argument is a :py:class:`~grunnur.Array` or :py:class:`~grunnur.Buffer` object,
        it must belong to the device on which the kernel is being executed
        (so ``queue`` must only have one device).
        If an argument is a :py:class:`~grunnur.MultiArray`, it should have subarrays
        on all the devices from the given ``queue``.
        If an argument is a ``numpy`` scalar, it will be passed to the kernel directly.
        If an argument is a ``dict`` keyed by devices, the values corresponding to the devices
        the kernel is executed on will be passed as kernel arguments.

        :param args: kernel arguments.
        :param local_mem: the size of the dynamically allocated local (shared) memory
            to pass to the backend, in bytes.
        :returns: a list of ``Event`` objects for enqueued kernels in case of PyOpenCL.
        """
        if isinstance(queue, Queue):
            queue = MultiQueue([queue])

        # Technically this would be caught by `issubset()`, but it helps to provide
        # a more specific error to the user.
        if queue.devices.context != self._devices.context:
            raise ValueError(
                "The provided queue must belong to the same context this program uses"
            )

        if not queue.devices.issubset(self._devices):
            raise ValueError(
                f"Requested execution on devices {queue.devices}; "
                f"only compiled for {self._devices}"
            )

        ret_vals = []
        for device in queue.devices:
            kernel_args = [extract_arg(arg, device) for arg in args]

            single_queue = queue.queues[device]
            pkernel = self._prepared_kernel_adapters[device]
            ret_val = pkernel(single_queue._queue_adapter, *kernel_args, local_mem=local_mem)
            ret_vals.append(ret_val)

        return ret_vals
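
# Example (hedged sketch): calling a `PreparedKernel` on two devices, passing a `MultiArray`
# and a per-device `dict` of scalar arguments. The `dev0`, `dev1`, `multi_queue` and
# `multi_array` objects are assumed to exist and are not defined in this module:
#
#     pkernel = program.kernel.scale.prepare({dev0: [1024], dev1: [2048]})
#     pkernel(multi_queue, multi_array, {dev0: numpy.float32(2), dev1: numpy.float32(3)})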


def normalize_sizes(
    devices: Sequence[BoundDevice],
    global_size: Union[Sequence[int], Mapping[BoundDevice, Sequence[int]]],
    local_size: Union[Sequence[int], None, Mapping[BoundDevice, Optional[Sequence[int]]]] = None,
) -> Tuple[
    BoundMultiDevice,
    Dict[BoundDevice, Tuple[int, ...]],
    Dict[BoundDevice, Optional[Tuple[int, ...]]],
]:
    if not isinstance(global_size, Mapping):
        global_size = {device: global_size for device in devices}
    if not isinstance(local_size, Mapping):
        local_size = {device: local_size for device in devices}

    normalized_global_size = {device: tuple(gs) for device, gs in global_size.items()}
    normalized_local_size = {
        device: tuple(ls) if ls is not None else None for device, ls in local_size.items()
    }

    if normalized_global_size.keys() != normalized_local_size.keys():
        raise ValueError(
            "Mismatched device sets for global and local sizes: "
            f"local sizes have {list(normalized_local_size.keys())}, "
            f"global sizes have {list(normalized_global_size.keys())}"
        )

    devices_subset = BoundMultiDevice.from_bound_devices(
        [device for device in devices if device in normalized_global_size]
    )

    return devices_subset, normalized_global_size, normalized_local_size
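
# For reference, `normalize_sizes` turns plain sequences into per-device mappings of tuples.
# A sketch, assuming `dev0` and `dev1` are bound devices (hypothetical names):
#
#     normalize_sizes([dev0, dev1], [1024, 16], None)
#     # -> (<multi-device with dev0, dev1>,
#     #     {dev0: (1024, 16), dev1: (1024, 16)},
#     #     {dev0: None, dev1: None})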


class Kernel:
    """
    A kernel compiled for multiple devices.
    """

    def __init__(self, program: Program, sd_kernel_adapters: Dict[BoundDevice, KernelAdapter]):
        self._program = program
        self._sd_kernel_adapters = sd_kernel_adapters

    @property
    def max_total_local_sizes(self) -> Dict[BoundDevice, int]:
        """
        The maximum possible number of threads in a block (CUDA)/work items in a
        work group (OpenCL) for this kernel.
        """
        return {
            device: sd_kernel_adapter.max_total_local_size
            for device, sd_kernel_adapter in self._sd_kernel_adapters.items()
        }

    def prepare(
        self,
        global_size: Union[Sequence[int], Mapping[BoundDevice, Sequence[int]]],
        local_size: Union[
            Sequence[int], None, Mapping[BoundDevice, Optional[Sequence[int]]]
        ] = None,
    ) -> "PreparedKernel":
        """
        Prepares the kernel for execution.

        One can pass specific global and local sizes for each device
        using dictionaries keyed by devices.
        This also achieves another purpose: the kernel will only be prepared for those devices,
        and not for all devices available in the context.

        :param global_size: the total number of threads (CUDA)/work items (OpenCL)
            in each dimension (column-major).
            Note that there may be a maximum size in each dimension as well as
            the maximum number of dimensions.
            See :py:class:`~grunnur.adapter_base.DeviceParameters` for details.
        :param local_size: the number of threads in a block (CUDA)/work items in a
            work group (OpenCL) in each dimension (column-major).
            If ``None``, it will be chosen automatically.
        """
        multi_device, n_global_size, n_local_size = normalize_sizes(
            self._program.devices, global_size, local_size
        )

        # Filter out only the kernel adapters mentioned in global/local sizes
        sd_kernel_adapters = {device: self._sd_kernel_adapters[device] for device in multi_device}

        return PreparedKernel(
            multi_device, sd_kernel_adapters, n_global_size, n_local_size, hold_reference=self
        )

    def __call__(
        self,
        queue: Union[Queue, MultiQueue],
        global_size: Union[Sequence[int], Mapping[BoundDevice, Sequence[int]]],
        local_size: Union[
            Sequence[int], None, Mapping[BoundDevice, Optional[Sequence[int]]]
        ] = None,
        *args: Union[MultiArray, Array, Buffer, numpy.generic],
        local_mem: int = 0,
    ) -> Any:
        """
        A shortcut for :py:meth:`Kernel.prepare` and subsequent :py:meth:`PreparedKernel.__call__`.
        See their doc entries for details.
        """
        pkernel = self.prepare(global_size, local_size)
        return pkernel(queue, *args, local_mem=local_mem)
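
# Example (hedged sketch): the `Kernel.__call__` shortcut versus the explicit two-step flow
# (queue/array names are hypothetical):
#
#     kernel = program.kernel.multiply
#     kernel(queue, [1024], None, dest_dev, a_dev, b_dev)
#     # ...is equivalent to:
#     pkernel = kernel.prepare([1024], None)
#     pkernel(queue, dest_dev, a_dev, b_dev)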