Source code for pyhf.experimental.modifiers

from __future__ import annotations

import logging
from typing import Any, Callable, Sequence

import pyhf
from pyhf import events, get_backend
from pyhf.parameters import ParamViewer

log = logging.getLogger(__name__)

__all__ = ["add_custom_modifier"]


def __dir__():
    return __all__


try:
    import numexpr as ne
except ModuleNotFoundError:
    log.error(
        "\nInstallation of the experimental extra is required to use pyhf.experimental.modifiers"
        + "\nPlease install with: python -m pip install 'pyhf[experimental]'\n",
        exc_info=True,
    )
    raise


class BaseApplier:
    ...


class BaseBuilder:
    ...


def _allocate_new_param(
    p: dict[str, Sequence[float]]
) -> dict[str, str | bool | int | Sequence[float]]:
    return {
        "paramset_type": "unconstrained",
        "n_parameters": 1,
        "is_shared": True,
        "inits": p["inits"],
        "bounds": p["bounds"],
        "is_scalar": True,
        "fixed": False,
    }


def make_func(expression: str, deps: list[str]) -> Callable[[Sequence[float]], Any]:
    def func(d: Sequence[float]) -> Any:
        return ne.evaluate(expression, local_dict=dict(zip(deps, d)))

    return func


def make_builder(
    func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseBuilder:
    class _builder(BaseBuilder):
        is_shared = False

        def __init__(self, config):
            self.builder_data = {"funcs": {}}
            self.config = config

        def collect(self, thismod, nom):
            maskval = bool(thismod)
            mask = [maskval] * len(nom)
            return {"mask": mask}

        def append(self, key, channel, sample, thismod, defined_sample):
            self.builder_data.setdefault(key, {}).setdefault(sample, {}).setdefault(
                "data", {"mask": []}
            )
            nom = (
                defined_sample["data"]
                if defined_sample
                else [0.0] * self.config.channel_nbins[channel]
            )
            mod_data = self.collect(thismod, nom)
            self.builder_data[key][sample]["data"]["mask"] += mod_data["mask"]
            if thismod:
                if thismod["name"] != func_name:
                    self.builder_data["funcs"].setdefault(
                        thismod["name"], thismod["data"]["expr"]
                    )
                self.required_parsets = {
                    k: [_allocate_new_param(v)] for k, v in new_params.items()
                }

        def finalize(self):
            return self.builder_data

    return _builder


def make_applier(
    func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]]
) -> BaseApplier:
    class _applier(BaseApplier):
        name = func_name
        op_code = "multiplication"

        def __init__(self, modifiers, pdfconfig, builder_data, batch_size=None):
            self.funcs = [make_func(v, deps) for v in builder_data["funcs"].values()]

            self.batch_size = batch_size
            pars_for_applier = deps
            _mod_names = [f"{mtype}/{m}" for m, mtype in modifiers]

            parfield_shape = (
                (self.batch_size, pdfconfig.npars)
                if self.batch_size
                else (pdfconfig.npars,)
            )
            self.param_viewer = ParamViewer(
                parfield_shape, pdfconfig.par_map, pars_for_applier
            )
            self._custom_mod_mask = [
                [[builder_data[mod_name][s]["data"]["mask"]] for s in pdfconfig.samples]
                for mod_name in _mod_names
            ]
            self._precompute()
            events.subscribe("tensorlib_changed")(self._precompute)

        def _precompute(self):
            tensorlib, _ = get_backend()
            if not self.param_viewer.index_selection:
                return
            self.custom_mod_mask = tensorlib.tile(
                tensorlib.astensor(self._custom_mod_mask),
                (1, 1, self.batch_size or 1, 1),
            )
            self.custom_mod_mask_bool = tensorlib.astensor(
                self.custom_mod_mask, dtype="bool"
            )
            self.custom_mod_default = tensorlib.ones(self.custom_mod_mask.shape)

        def apply(self, pars):
            """
            Returns:
                modification tensor: Shape (n_modifiers, n_global_samples, n_alphas, n_global_bin)
            """
            if not self.param_viewer.index_selection:
                return
            tensorlib, _ = get_backend()
            if self.batch_size is None:
                deps = self.param_viewer.get(pars)
                results = tensorlib.astensor([f(deps) for f in self.funcs])
                results = tensorlib.einsum(
                    "msab,m->msab", self.custom_mod_mask, results
                )
            else:
                deps = self.param_viewer.get(pars)
                results = tensorlib.astensor([f(deps) for f in self.funcs])
                results = tensorlib.einsum(
                    "msab,ma->msab", self.custom_mod_mask, results
                )
            results = tensorlib.where(
                self.custom_mod_mask_bool, results, self.custom_mod_default
            )
            return results

    return _applier


[docs] def add_custom_modifier( func_name: str, deps: list[str], new_params: dict[str, dict[str, Sequence[float]]] ) -> dict[str, tuple[BaseBuilder, BaseApplier]]: r""" Add a custom modifier type with the modifier data defined through a custom numexpr string expression. Example: >>> import pyhf >>> import pyhf.experimental.modifiers >>> pyhf.set_backend("numpy") >>> new_params = { ... "m1": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)}, ... "m2": {"inits": (1.0,), "bounds": ((-5.0, 5.0),)}, ... } >>> expanded_pyhf = pyhf.experimental.modifiers.add_custom_modifier( ... "custom", ["m1", "m2"], new_params ... ) >>> model = pyhf.Model( ... { ... "channels": [ ... { ... "name": "singlechannel", ... "samples": [ ... { ... "name": "signal", ... "data": [10, 20], ... "modifiers": [ ... { ... "name": "f2", ... "type": "custom", ... "data": {"expr": "m1"}, ... }, ... ], ... }, ... { ... "name": "background", ... "data": [100, 150], ... "modifiers": [ ... { ... "name": "f1", ... "type": "custom", ... "data": {"expr": "m1+(m2**2)"}, ... }, ... ], ... }, ... ], ... } ... ] ... }, ... modifier_set=expanded_pyhf, ... poi_name="m1", ... validate=False, ... batch_size=1, ... ) >>> model.config.modifiers [('f1', 'custom'), ('f2', 'custom')] Args: func_name (:obj:`str`): The name of the custom modifier type. deps (:obj:`list`): The names of the new parameters of the modifier function. new_params (:obj:`dict`): The new parameters. Returns: :obj:`dict`: The updated ``pyhf.modifiers.histfactory_set`` with the added custom modifier type. .. versionadded:: 0.8.0 """ _builder = make_builder(func_name, deps, new_params) _applier = make_applier(func_name, deps, new_params) modifier_set = {_applier.name: (_builder, _applier)} modifier_set.update(**pyhf.modifiers.histfactory_set) return modifier_set