Source code for pymedphys._mudensity.mudensity

# Copyright (C) 2019 Simon Biggs

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version (the "AGPL-3.0+").

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License and the additional terms for more
# details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# ADDITIONAL TERMS are also included as allowed by Section 7 of the GNU
# Affero General Public License. These additional terms are Sections 1, 5,
# 6, 7, 8, and 9 from the Apache License, Version 2.0 (the "Apache-2.0")
# where all references to the definition "License" are instead defined to
# mean the AGPL-3.0+.

# You should have received a copy of the Apache-2.0 along with this
# program. If not, see <http://www.apache.org/licenses/LICENSE-2.0>.

# pylint: disable=C0103,C1801

import numpy as np

import matplotlib.pyplot as plt

from pymedphys._utilities.constants import AGILITY
from pymedphys._utilities.controlpoints import remove_irrelevant_control_points

from .plt import pcolormesh_grid

__DEFAULT_LEAF_PAIR_WIDTHS = AGILITY
__DEFAULT_GRID_RESOLUTION = 1
__DEFAULT_MAX_LEAF_GAP = 400
__DEFAULT_MIN_STEP_PER_PIXEL = 10


def calc_mu_density(
    mu,
    mlc,
    jaw,
    grid_resolution=__DEFAULT_GRID_RESOLUTION,
    max_leaf_gap=__DEFAULT_MAX_LEAF_GAP,
    leaf_pair_widths=__DEFAULT_LEAF_PAIR_WIDTHS,
    min_step_per_pixel=__DEFAULT_MIN_STEP_PER_PIXEL,
):
    """Determine the MU Density.

    Both jaw and mlc positions are defined in bipolar format for each control
    point. A negative value indicates travel over the isocentre. All positional
    arguments are defined at the isocentre projection with the units of mm.

    Parameters
    ----------
    mu : numpy.ndarray
        1-D array containing an MU value for each control point.
    mlc : numpy.ndarray
        3-D array containing the MLC positions

            | axis 0: control point
            | axis 1: mlc pair
            | axis 2: leaf bank

    jaw : numpy.ndarray
        2-D array containing the jaw positions.

            | axis 0: control point
            | axis 1: diaphragm

    grid_resolution : float, optional
        The calc grid resolution. Defaults to 1 mm.

    max_leaf_gap : float, optional
        The maximum possible distance between opposing leaves. Defaults to
        400 mm.

    leaf_pair_widths : tuple, optional
        The widths of each leaf pair in the
        MLC limiting device. The number of entries in the tuples defines
        the number of leaf pairs. Each entry itself defines that particular
        leaf pair width. Defaults to 80 leaf pairs each 5 mm wide.

    min_step_per_pixel : int, optional
        The minimum number of time steps
        used per pixel for each control point. Defaults to 10.

    Returns
    -------
    mu_density : numpy.ndarray
        2-D array containing the calculated mu density.

            | axis 0: jaw direction
            | axis 1: mlc direction

    Examples
    --------
    >>> import numpy as np
    >>> import pymedphys
    >>>
    >>> leaf_pair_widths = (5, 5, 5)
    >>> max_leaf_gap = 10
    >>> mu = np.array([0, 2, 5, 10])
    >>> mlc = np.array([
    ...     [
    ...         [1, 1],
    ...         [2, 2],
    ...         [3, 3]
    ...     ],
    ...     [
    ...         [2, 2],
    ...         [3, 3],
    ...         [4, 4]
    ...     ],
    ...     [
    ...         [-2, 3],
    ...         [-2, 4],
    ...         [-2, 5]
    ...     ],
    ...     [
    ...         [0, 0],
    ...         [0, 0],
    ...         [0, 0]
    ...     ]
    ... ])
    >>> jaw = np.array([
    ...     [7.5, 7.5],
    ...     [7.5, 7.5],
    ...     [-2, 7.5],
    ...     [0, 0]
    ... ])
    >>>
    >>> grid = pymedphys.mudensity.grid(
    ...    max_leaf_gap=max_leaf_gap, leaf_pair_widths=leaf_pair_widths)
    >>> grid['mlc']
    array([-5., -4., -3., -2., -1.,  0.,  1.,  2.,  3.,  4.,  5.])
    >>>
    >>> grid['jaw']
    array([-8., -7., -6., -5., -4., -3., -2., -1.,  0.,  1.,  2.,  3.,  4.,
            5.,  6.,  7.,  8.])
    >>>
    >>> mu_density = pymedphys.mudensity.calculate(
    ...    mu, mlc, jaw, max_leaf_gap=max_leaf_gap,
    ...    leaf_pair_widths=leaf_pair_widths)
    >>> pymedphys.mudensity.display(grid, mu_density)
    >>>
    >>> np.round(mu_density, 1)
    array([[0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ],
           [0. , 0. , 0. , 0.3, 1.9, 2.2, 1.9, 0.4, 0. , 0. , 0. ],
           [0. , 0. , 0. , 0.4, 2.2, 2.5, 2.2, 0.6, 0. , 0. , 0. ],
           [0. , 0. , 0. , 0.4, 2.4, 2.8, 2.5, 0.8, 0. , 0. , 0. ],
           [0. , 0. , 0. , 0.4, 2.5, 3.1, 2.8, 1. , 0. , 0. , 0. ],
           [0. , 0. , 0. , 0.4, 2.5, 3.4, 3.1, 1.3, 0. , 0. , 0. ],
           [0. , 0. , 0.4, 2.3, 3.2, 3.7, 3.7, 3.5, 1.6, 0. , 0. ],
           [0. , 0. , 0.4, 2.3, 3.2, 3.8, 4. , 3.8, 1.9, 0.1, 0. ],
           [0. , 0. , 0.4, 2.3, 3.2, 3.8, 4.3, 4.1, 2.3, 0.1, 0. ],
           [0. , 0. , 0.4, 2.3, 3.2, 3.9, 5.2, 4.7, 2.6, 0.2, 0. ],
           [0. , 0. , 0.4, 2.3, 3.2, 3.8, 5.4, 6.6, 3.8, 0.5, 0. ],
           [0. , 0.3, 2.2, 3. , 3.5, 4. , 5.1, 7.5, 6.7, 3.9, 0.5],
           [0. , 0.3, 2.2, 3. , 3.5, 4. , 4.7, 6.9, 6.7, 3.9, 0.5],
           [0. , 0.3, 2.2, 3. , 3.5, 4. , 4.5, 6.3, 6.4, 3.9, 0.5],
           [0. , 0.3, 2.2, 3. , 3.5, 4. , 4.5, 5.6, 5.7, 3.8, 0.5],
           [0. , 0.3, 2.2, 3. , 3.5, 4. , 4.5, 5.1, 5.1, 3.3, 0.5],
           [0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. , 0. ]])


    MU Density from a Mosaiq record

    >>> import pymedphys
    >>>
    >>> def mu_density_from_mosaiq(msq_server_name, field_id):
    ...     with pymedphys.mosaiq.connect(msq_server_name) as cursor:
    ...         delivery = pymedphys.Delivery.from_mosaiq(cursor, field_id)
    ...
    ...     grid = pymedphys.mudensity.grid()
    ...     mu_density = delivery.mudensity()
    ...     pymedphys.mudensity.display(grid, mu_density)
    >>>
    >>> mu_density_from_mosaiq('a_server_name', 11111) # doctest: +SKIP


    MU Density from a logfile at a given filepath

    >>> import pymedphys
    >>>
    >>> def mu_density_from_logfile(filepath):
    ...     delivery_data = Delivery.from_logfile(filepath)
    ...     mu_density = Delivery.mudensity()
    ...
    ...     grid = pymedphys.mudensity.grid()
    ...     pymedphys.mudensity.display(grid, mu_density)
    >>>
    >>> mu_density_from_logfile(r"a/path/goes/here") # doctest: +SKIP

    """

    divisibility_of_max_leaf_gap = np.array(max_leaf_gap / 2 / grid_resolution)
    max_leaf_gap_is_divisible = (
        divisibility_of_max_leaf_gap.astype(int) == divisibility_of_max_leaf_gap
    )

    if not max_leaf_gap_is_divisible:
        raise ValueError(
            "The grid resolution needs to exaclty divide half of the max leaf " "gap"
        )

    leaf_pair_widths = np.array(leaf_pair_widths)

    if not np.max(np.abs(mlc)) <= max_leaf_gap / 2:  # pylint: disable = unneeded-not
        raise ValueError(
            "The mlc should not travel further out than half the maximum leaf " "gap."
        )

    mu, mlc, jaw = remove_irrelevant_control_points(mu, mlc, jaw)

    full_grid = get_grid(max_leaf_gap, grid_resolution, leaf_pair_widths)

    mu_density = np.zeros((len(full_grid["jaw"]), len(full_grid["mlc"])))

    for i in range(len(mu) - 1):
        control_point_slice = slice(i, i + 2, 1)
        current_mlc = mlc[control_point_slice, :, :]
        current_jaw = jaw[control_point_slice, :]
        delivered_mu = np.diff(mu[control_point_slice])

        grid, mu_density_of_slice = calc_single_control_point(
            current_mlc,
            current_jaw,
            delivered_mu,
            leaf_pair_widths=leaf_pair_widths,
            grid_resolution=grid_resolution,
            min_step_per_pixel=min_step_per_pixel,
        )
        full_grid_mu_density_of_slice = _convert_to_full_grid(
            grid, full_grid, mu_density_of_slice
        )

        mu_density += full_grid_mu_density_of_slice

    return mu_density


def calc_single_control_point(
    mlc,
    jaw,
    delivered_mu=1,
    leaf_pair_widths=__DEFAULT_LEAF_PAIR_WIDTHS,
    grid_resolution=__DEFAULT_GRID_RESOLUTION,
    min_step_per_pixel=__DEFAULT_MIN_STEP_PER_PIXEL,
):
    """Calculate the MU Density for a single control point.

    Examples
    --------
    >>> import numpy as np
    >>> from pymedphys._mudensity.mudensity import (
    ...     calc_single_control_point, display_mu_density)
    >>>
    >>> leaf_pair_widths = (2, 2)
    >>> mlc = np.array([
    ...     [
    ...         [1, 1],
    ...         [2, 2],
    ...     ],
    ...     [
    ...         [2, 2],
    ...         [3, 3],
    ...     ]
    ... ])
    >>> jaw = np.array([
    ...     [1.5, 1.2],
    ...     [1.5, 1.2]
    ... ])
    >>> grid, mu_density = calc_single_control_point(
    ...     mlc, jaw, leaf_pair_widths=leaf_pair_widths)
    >>> display_mu_density(grid, mu_density)
    >>>
    >>> grid['mlc']
    array([-3., -2., -1.,  0.,  1.,  2.,  3.])
    >>>
    >>> grid['jaw']
    array([-1.5, -0.5,  0.5,  1.5])
    >>>
    >>> np.round(mu_density, 2)
    array([[0.  , 0.07, 0.43, 0.5 , 0.43, 0.07, 0.  ],
           [0.  , 0.14, 0.86, 1.  , 0.86, 0.14, 0.  ],
           [0.14, 0.86, 1.  , 1.  , 1.  , 0.86, 0.14],
           [0.03, 0.17, 0.2 , 0.2 , 0.2 , 0.17, 0.03]])
    """

    leaf_pair_widths = np.array(leaf_pair_widths)
    leaf_division = leaf_pair_widths / grid_resolution

    if not np.all(leaf_division.astype(int) == leaf_division):
        raise ValueError(
            "The grid resolution needs to exactly divide every leaf pair " "width."
        )

    if (
        not np.max(np.abs(jaw))  # pylint: disable = unneeded-not
        <= np.sum(leaf_pair_widths) / 2
    ):
        raise ValueError(
            "The jaw should not travel further out than the maximum leaf " "limits."
        )

    (grid, grid_leaf_map, mlc) = _determine_calc_grid_and_adjustments(
        mlc, jaw, leaf_pair_widths, grid_resolution
    )

    positions = {
        "mlc": {
            1: (-mlc[0, :, 0], -mlc[1, :, 0]),  # left
            -1: (mlc[0, :, 1], mlc[1, :, 1]),  # right
        },
        "jaw": {
            1: (-jaw[0::-1, 0], -jaw[1::, 0]),  # bot
            -1: (jaw[0::-1, 1], jaw[1::, 1]),  # top
        },
    }

    time_steps = _calc_time_steps(positions, grid_resolution, min_step_per_pixel)
    blocked_by_device = _calc_blocked_by_device(
        grid, positions, grid_resolution, time_steps
    )
    device_open = _calc_device_open(blocked_by_device)
    mlc_open, jaw_open = _remap_mlc_and_jaw(device_open, grid_leaf_map)
    open_fraction = _calc_open_fraction(mlc_open, jaw_open)

    mu_density = open_fraction * delivered_mu

    return grid, mu_density


def single_mlc_pair(
    left_mlc,
    right_mlc,
    grid_resolution=__DEFAULT_GRID_RESOLUTION,
    min_step_per_pixel=__DEFAULT_MIN_STEP_PER_PIXEL,
):
    """Calculate the MU Density of a single leaf pair.

    Examples
    --------
    >>> import numpy as np
    >>> import matplotlib.pyplot as plt
    >>>
    >>> from pymedphys._mudensity.mudensity import single_mlc_pair
    >>>
    >>> mlc_left = (-2.3, 3.1)  # (start position, end position)
    >>> mlc_right = (0, 7.7)
    >>>
    >>> x, mu_density = single_mlc_pair(mlc_left, mlc_right)
    >>> fig = plt.plot(x, mu_density, '-o')
    >>>
    >>> x
    array([-2., -1.,  0.,  1.,  2.,  3.,  4.,  5.,  6.,  7.,  8.])
    >>>
    >>> np.round(mu_density, 3)
    array([0.064, 0.244, 0.408, 0.475, 0.53 , 0.572, 0.481, 0.352, 0.224,
           0.096, 0.004])
    """
    leaf_pair_widths = [grid_resolution]
    jaw = np.array(
        [
            [grid_resolution / 2, grid_resolution / 2],
            [grid_resolution / 2, grid_resolution / 2],
        ]
    )
    mlc = np.array([[[-left_mlc[0], right_mlc[0]]], [[-left_mlc[1], right_mlc[1]]]])

    grid, mu_density = calc_single_control_point(
        mlc,
        jaw,
        leaf_pair_widths=leaf_pair_widths,
        grid_resolution=grid_resolution,
        min_step_per_pixel=min_step_per_pixel,
    )

    return grid["mlc"], mu_density[0, :]


def calc_mu_density_return_grid(
    mu,
    mlc,
    jaw,
    grid_resolution=__DEFAULT_GRID_RESOLUTION,
    max_leaf_gap=__DEFAULT_MAX_LEAF_GAP,
    leaf_pair_widths=__DEFAULT_LEAF_PAIR_WIDTHS,
    min_step_per_pixel=__DEFAULT_MIN_STEP_PER_PIXEL,
):
    """DEPRECATED. This is a temporary helper function to provide the old
    api.
    """

    leaf_pair_widths = np.array(leaf_pair_widths)
    mu_density = calc_mu_density(
        mu,
        mlc,
        jaw,
        grid_resolution=grid_resolution,
        max_leaf_gap=max_leaf_gap,
        leaf_pair_widths=leaf_pair_widths,
        min_step_per_pixel=min_step_per_pixel,
    )

    full_grid = get_grid(max_leaf_gap, grid_resolution, leaf_pair_widths)

    grid_xx, grid_yy = np.meshgrid(full_grid["mlc"], full_grid["jaw"])

    return grid_xx, grid_yy, mu_density


def get_grid(
    max_leaf_gap=__DEFAULT_MAX_LEAF_GAP,
    grid_resolution=__DEFAULT_GRID_RESOLUTION,
    leaf_pair_widths=__DEFAULT_LEAF_PAIR_WIDTHS,
):
    """Get the MU Density grid for plotting purposes.

    Examples
    --------
    See `pymedphys.mudensity.calculate`_.
    """

    leaf_pair_widths = np.array(leaf_pair_widths)

    grid = dict()

    grid["mlc"] = np.arange(
        -max_leaf_gap / 2, max_leaf_gap / 2 + grid_resolution, grid_resolution
    ).astype("float")

    _, top_of_reference_leaf = _determine_leaf_centres(leaf_pair_widths)
    grid_reference_position = _determine_reference_grid_position(
        top_of_reference_leaf, grid_resolution
    )

    # It might be better to use round instead of ceil here.
    total_leaf_widths = np.sum(leaf_pair_widths)
    top_grid_pos = (
        np.ceil((total_leaf_widths / 2 - grid_reference_position) / grid_resolution)
        * grid_resolution
        + grid_reference_position
    )

    bot_grid_pos = (
        grid_reference_position
        - np.ceil((total_leaf_widths / 2 + grid_reference_position) / grid_resolution)
        * grid_resolution
    )

    grid["jaw"] = np.arange(
        bot_grid_pos, top_grid_pos + grid_resolution, grid_resolution
    )

    return grid


def display_mu_density_diff(
    grid, mudensity_eval, mudensity_ref, grid_resolution=None, colour_range=None
):
    cmap = "bwr"
    diff = mudensity_eval - mudensity_ref
    if colour_range is None:
        colour_range = np.max(np.abs(diff))

    # pylint: disable=invalid-unary-operand-type
    display_mu_density(
        grid,
        diff,
        grid_resolution=grid_resolution,
        cmap=cmap,
        vmin=-colour_range,
        vmax=colour_range,
    )


def display_mu_density(
    grid, mu_density, grid_resolution=None, cmap=None, vmin=None, vmax=None
):
    """Prints a colour plot of the MU Density.

    Examples
    --------
    See `pymedphys.mudensity.calculate`_.
    """
    if grid_resolution is None:
        grid_resolution = grid["mlc"][1] - grid["mlc"][0]

    x, y = pcolormesh_grid(grid["mlc"], grid["jaw"], grid_resolution)
    plt.pcolormesh(x, y, mu_density, cmap=cmap, vmin=vmin, vmax=vmax)
    plt.colorbar()
    plt.title("MU density")
    plt.xlabel("MLC direction (mm)")
    plt.ylabel("Jaw direction (mm)")
    plt.axis("equal")
    plt.gca().invert_yaxis()


def _calc_blocked_t(travel_diff, grid_resolution):
    blocked_t = np.ones_like(travel_diff) * np.nan

    fully_blocked = travel_diff <= -grid_resolution / 2
    fully_open = travel_diff >= grid_resolution / 2
    blocked_t[fully_blocked] = 1
    blocked_t[fully_open] = 0

    transient = ~fully_blocked & ~fully_open

    blocked_t[transient] = (
        -travel_diff[transient] + grid_resolution / 2
    ) / grid_resolution

    assert np.all(~np.isnan(blocked_t))

    return blocked_t


def _calc_time_steps(positions, grid_resolution, min_step_per_pixel):
    maximum_travel = []
    for _, value in positions.items():
        for _, (start, end) in value.items():
            maximum_travel.append(np.max(np.abs(end - start)))

    maximum_travel = np.max(maximum_travel)
    number_of_pixels = np.ceil(maximum_travel / grid_resolution)
    time_steps = number_of_pixels * min_step_per_pixel

    if time_steps < 10:
        time_steps = 10

    return time_steps


def _calc_blocked_by_device(grid, positions, grid_resolution, time_steps):
    blocked_by_device = {}

    for device, value in positions.items():
        blocked_by_device[device] = dict()

        for multiplier, (start, end) in value.items():
            dt = (end - start) / (time_steps - 1)
            travel = start[None, :] + np.arange(0, time_steps)[:, None] * dt[None, :]
            travel_diff = multiplier * (
                grid[device][None, None, :] - travel[:, :, None]
            )

            blocked_by_device[device][multiplier] = _calc_blocked_t(
                travel_diff, grid_resolution
            )

    return blocked_by_device


def _calc_device_open(blocked_by_device):
    device_open = {}

    for device, value in blocked_by_device.items():
        device_sum = np.sum(
            np.concatenate(
                [np.expand_dims(blocked, axis=0) for _, blocked in value.items()],
                axis=0,
            ),
            axis=0,
        )

        device_open[device] = 1 - device_sum

    return device_open


def _remap_mlc_and_jaw(device_open, grid_leaf_map):
    mlc_open = device_open["mlc"][:, grid_leaf_map, :]
    jaw_open = device_open["jaw"][:, 0, :]

    return mlc_open, jaw_open


def _calc_open_fraction(mlc_open, jaw_open):
    open_t = mlc_open * jaw_open[:, :, None]
    open_fraction = np.mean(open_t, axis=0)

    return open_fraction


def _determine_leaf_centres(leaf_pair_widths):
    total_leaf_widths = np.sum(leaf_pair_widths)
    leaf_centres = (
        np.cumsum(leaf_pair_widths) - leaf_pair_widths / 2 - total_leaf_widths / 2
    )

    reference_leaf_index = len(leaf_centres) // 2

    top_of_reference_leaf = (
        leaf_centres[reference_leaf_index] + leaf_pair_widths[reference_leaf_index] / 2
    )

    return leaf_centres, top_of_reference_leaf


def _determine_reference_grid_position(top_of_reference_leaf, grid_resolution):
    grid_reference_position = top_of_reference_leaf - grid_resolution / 2

    return grid_reference_position


def _determine_calc_grid_and_adjustments(mlc, jaw, leaf_pair_widths, grid_resolution):
    min_y = np.min(-jaw[:, 0])
    max_y = np.max(jaw[:, 1])

    leaf_centres, top_of_reference_leaf = _determine_leaf_centres(leaf_pair_widths)
    grid_reference_position = _determine_reference_grid_position(
        top_of_reference_leaf, grid_resolution
    )

    top_grid_pos = (
        np.round((max_y - grid_reference_position) / grid_resolution)
    ) * grid_resolution + grid_reference_position

    bot_grid_pos = (
        grid_reference_position
        - (np.round((-min_y + grid_reference_position) / grid_resolution))
        * grid_resolution
    )

    grid = dict()
    grid["jaw"] = np.arange(
        bot_grid_pos, top_grid_pos + grid_resolution, grid_resolution
    ).astype("float")

    grid_leaf_map = np.argmin(
        np.abs(grid["jaw"][:, None] - leaf_centres[None, :]), axis=1
    )

    adjusted_grid_leaf_map = grid_leaf_map - np.min(grid_leaf_map)

    leaves_to_be_calced = np.unique(grid_leaf_map)
    adjusted_mlc = mlc[:, leaves_to_be_calced, :]

    min_x = np.round(np.min(-adjusted_mlc[:, :, 0]) / grid_resolution) * grid_resolution
    max_x = np.round(np.max(adjusted_mlc[:, :, 1]) / grid_resolution) * grid_resolution

    grid["mlc"] = np.arange(min_x, max_x + grid_resolution, grid_resolution).astype(
        "float"
    )

    return grid, adjusted_grid_leaf_map, adjusted_mlc


def _convert_to_full_grid(grid, full_grid, mu_density):
    grid_xx, grid_yy = np.meshgrid(grid["mlc"], grid["jaw"])
    full_grid_xx, full_grid_yy = np.meshgrid(full_grid["mlc"], full_grid["jaw"])

    xx_from, xx_to = np.where(
        np.abs(full_grid_xx[None, 0, :] - grid_xx[0, :, None]) < 0.0001
    )
    yy_from, yy_to = np.where(
        np.abs(full_grid_yy[None, :, 0] - grid_yy[:, 0, None]) < 0.0001
    )

    full_grid_mu_density = np.zeros_like(full_grid_xx)
    full_grid_mu_density[  # pylint: disable=unsupported-assignment-operation
        np.ix_(yy_to, xx_to)
    ] = mu_density[np.ix_(yy_from, xx_from)]

    return full_grid_mu_density