Source code for mygrad.nnet.layers.gru

import weakref
from numbers import Integral

import numpy as np

from mygrad._utils import SkipGradient
from mygrad.operation_base import Operation
from mygrad.tensor_base import Tensor

try:
    from numba import njit, vectorize
except ImportError:  # pragma: no cover
    raise ImportError(
        "The package `numba` must be installed in order to access the gru."
    )


@vectorize(
    ["float32(float32)", "float64(float64)"],
    nopython=True,
)
def sig(f):  # pragma: no cover
    """
    Calculates the logistic sigmoid function: 1 / (1 + exp(-f))
    """
    return 1 / (1 + np.exp(-f))


@vectorize(
    ["float32(float32)", "float64(float64)"],
    nopython=True,
)
def d_sig(f):  # pragma: no cover
    """
    Calculates the derivative of the sigmoid function, where `f` is sigmoid(x): f * (1 - f)
    """
    return f * (1 - f)


@vectorize(
    ["float32(float32)", "float64(float64)"],
    nopython=True,
)
def d_tanh(f):  # pragma: no cover
    """
    Calculates the derivative of the tanh function, where `f` is tanh(x): 1 - f**2
    """
    return 1 - f**2


@njit
def dot(a, b):
    """
    Calculates the dot product between two arrays
    of shapes (W, X, Y) and (Y, Z), respectively,
    returning an array of shape (W, X, Z)
    """
    return np.dot(a.reshape(-1, a.shape[-1]), b).reshape(*a.shape[:-1], b.shape[-1])

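# Example of the reshape trick used by `dot` above (shapes chosen purely for
# illustration): the leading axes are flattened so that a 2D matrix product can be
# used inside numba, then restored on the result.
#
#   >>> a = np.ones((3, 2, 4))   # e.g. shape-(T, N, D) activations
#   >>> b = np.ones((4, 5))      # e.g. shape-(D, E) weights
#   >>> dot(a, b).shape
#   (3, 2, 5)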

@njit
def _gru_layer(s, z, r, h, Wz, Wr, Wh):
    """Given:
        S(t=0)
        z = X(t) Uz + bz
        r = X(t) Ur + br
        h = X(t) Uh + bh

    Compute Z(t), R(t), H(t), S(t) for all 1 <= t <= T

    Parameters
    ----------
    s : numpy.ndarray, shape=(T+1, N, D)
        Modified in-place
    z : numpy.ndarray, shape=(T, N, D)
        Modified in-place
    r : numpy.ndarray, shape=(T, N, D)
        Modified in-place
    h : numpy.ndarray, shape=(T, N, D)
        Modified in-place
    Wz : numpy.ndarray, shape=(D, D)
    Wr : numpy.ndarray, shape=(D, D)
    Wh : numpy.ndarray, shape=(D, D)"""
    for n in range(len(s) - 1):
        z[n] += np.dot(s[n], Wz)
        z[n] = sig(z[n])

        r[n] += np.dot(s[n], Wr)
        r[n] = sig(r[n])

        h[n] += np.dot(r[n] * s[n], Wh)
        h[n] = np.tanh(h[n])

        s[n + 1] = (1 - z[n]) * h[n] + z[n] * s[n]

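# Plain-NumPy sketch of one recurrence step performed by `_gru_layer` (names here are
# for illustration only; on entry z[n], r[n], h[n] already hold the X(t) U + b terms):
#
#   z_t = sig(xz_t + s_prev @ Wz)                # update gate
#   r_t = sig(xr_t + s_prev @ Wr)                # reset gate
#   h_t = np.tanh(xh_t + (r_t * s_prev) @ Wh)    # candidate hidden state
#   s_t = (1 - z_t) * h_t + z_t * s_prev         # new hidden state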

@njit
def _gru_dLds(s, z, r, dLds, Wz, Wh, Wr, dz, dh, dr, s_h, one_z):
    """
        Z_{t} = sigmoid(X_{t} Uz + S_{t-1} Wz + bz)
        R_{t} = sigmoid(X_{t} Ur + S_{t-1} Wr + br)
        H_{t} = tanh(X_{t} Uh + (R_{t} * S_{t-1}) Wh + bh)
        S_{t} = (1 - Z_{t}) * H_{t} + Z_{t} * S_{t-1}

    Returns
    -------
    numpy.ndarray
        dL / ds(t), accumulated from the four paths through which s(t+1) depends on s(t):

        dL / ds(t) =   dL / ds(t+1) * ds(t+1) / ds(t)
                     + dL / ds(t+1) * ds(t+1) / dz(t) * dz(t) / ds(t)
                     + dL / ds(t+1) * ds(t+1) / dh(t) * dh(t) / ds(t)
                     + dL / ds(t+1) * ds(t+1) / dh(t) * dh(t) / dr(t) * dr(t) / ds(t)

        where each derivative of s(t+1) is a partial derivative taken with the other
        intermediate quantities held fixed.
    """
    dLdh = dot(dLds * one_z * dh, Wh)

    out = z * dLds
    out += dot(dLds * s_h * dz, Wz)
    out += dLdh * r
    out += dot(dLdh * s * dr, Wr)

    return out

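# How the four accumulated terms in `_gru_dLds` line up with the chain-rule expansion in
# its docstring, using the cached quantities passed in from `backward`
# (s_h = s - h, one_z = 1 - z, dz = z*(1 - z), dh = 1 - h**2, dr = r*(1 - r)):
#
#   z * dLds                  ->  dL/ds(t+1) * ds(t+1)/ds(t)
#   dot(dLds * s_h * dz, Wz)  ->  dL/ds(t+1) * ds(t+1)/dz(t) * dz(t)/ds(t)
#   dLdh * r                  ->  dL/ds(t+1) * ds(t+1)/dh(t) * dh(t)/ds(t)
#   dot(dLdh * s * dr, Wr)    ->  dL/ds(t+1) * ds(t+1)/dh(t) * dh(t)/dr(t) * dr(t)/ds(t)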

@njit
def _gru_bptt(
    X, dLds, s, z, r, Wz, Wh, Wr, dz, dh, dr, s_h, one_z, bp_lim, old_dLds=None
):
    Wz, Wh, Wr = Wz.T, Wh.T, Wr.T
    bptt = bp_lim < len(X) - 1
    if bptt:  # pragma: no cover
        old_dLds = np.zeros_like(dLds)

    for i in range(bp_lim):
        #  dL(t) / ds(t) + dL(t+1) / ds(t)
        if bptt:  # pragma: no cover
            source_index = slice(1, len(dLds) - i)
            target_index = slice(None, len(dLds) - (i + 1))
            dt = dLds[source_index] - old_dLds[source_index]
            old_dLds = np.copy(dLds)
        else:  # no backprop truncation
            source_index = slice(len(dLds) - (i + 1), len(dLds) - i)
            target_index = slice(len(dLds) - (i + 2), len(dLds) - (i + 1))
            dt = dLds[source_index]

        dLds[target_index] += _gru_dLds(
            s[source_index],
            z[source_index],
            r[source_index],
            dt,
            Wz,
            Wh,
            Wr,
            dz[source_index],
            dh[source_index],
            dr[source_index],
            s_h[source_index],
            one_z[source_index],
        )

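# Note on `_gru_bptt` (describing the loop above; the truncated branch is flagged as
# experimental in `gru`'s docstring): without truncation, iteration i takes the accumulated
# dL/ds at time-step T - i and adds its contribution to dL/ds at time-step T - i - 1, so the
# gradient walks back one step per iteration.  With truncation (bp_lim < len(X) - 1), all
# time-steps are advanced together and `old_dLds` records the previous pass so that only the
# newly accumulated gradient, `dt`, is propagated a further step back.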

def _backprop(var, grad):  # pragma: no cover
    if not var.constant:
        if var._grad is None:
            var._grad = np.asarray(grad)
        else:
            var._grad += grad


class GRUnit(Operation):
    def __call__(
        self, X, Uz, Wz, bz, Ur, Wr, br, Uh, Wh, bh, s0=None, bp_lim=None, dropout=0.0
    ):
        if bp_lim is not None:
            assert isinstance(bp_lim, Integral) and 0 <= bp_lim < len(X)
        assert 0.0 <= dropout < 1.0
        self._dropout = dropout
        self.bp_lim = bp_lim if bp_lim is not None else len(X) - 1

        self.X = X  # type: Tensor  # shape=(T, N, C)

        self.Uz = Uz  # type: Tensor  # shape=(C, D)
        self.Wz = Wz  # type: Tensor  # shape=(D, D)
        self.bz = bz  # type: Tensor  # shape=(D,)

        self.Ur = Ur  # type: Tensor  # shape=(C, D)
        self.Wr = Wr  # type: Tensor  # shape=(D, D)
        self.br = br  # type: Tensor  # shape=(D,)

        self.Uh = Uh  # type: Tensor  # shape=(C, D)
        self.Wh = Wh  # type: Tensor  # shape=(D, D)
        self.bh = bh  # type: Tensor  # shape=(D,)

        self.variables = (
            self.X,
            self.Uz,
            self.Wz,
            self.bz,
            self.Ur,
            self.Wr,
            self.br,
            self.Uh,
            self.Wh,
            self.bh,
        )

        self.type = max(t.dtype for t in self.variables)

        T, N, C = X.shape
        (D,) = bz.shape

        seq = self.X.data

        # t starts at 0 for S; all other sequences begin at t = 1
        out = np.zeros((T + 1, N, D), dtype=self.type)

        if s0 is not None:
            out[0] = s0.data if isinstance(s0, Tensor) else s0

        # compute all contributions to Z, R, H from the input sequence
        # shape: T, N, D
        z = np.tensordot(seq, self.Uz.data, [[-1], [0]]).astype(self.type, copy=False)
        r = np.tensordot(seq, self.Ur.data, [[-1], [0]]).astype(self.type, copy=False)
        h = np.tensordot(seq, self.Uh.data, [[-1], [0]]).astype(self.type, copy=False)

        if dropout:
            p = 1 - dropout
            # For Uz/Ur/Uh: a dropout mask is generated for each datum and is applied uniformly across T
            self._dropUz, self._dropUr, self._dropUh = (
                np.random.binomial(1, p, size=(3, 1, N, D)) / p
            )
            self._dropWz, self._dropWr, self._dropWh = (
                np.random.binomial(1, p, size=(3, D, D)) / p
            )

            z *= self._dropUz
            r *= self._dropUr
            h *= self._dropUh

            Wz = (self._dropWz * self.Wz.data).astype(self.type, copy=False)
            Wr = (self._dropWr * self.Wr.data).astype(self.type, copy=False)
            Wh = (self._dropWh * self.Wh.data).astype(self.type, copy=False)

        else:
            self._dropUz, self._dropUr, self._dropUh = None, None, None
            self._dropWz, self._dropWr, self._dropWh = None, None, None
            Wz = self.Wz.data.astype(self.type, copy=False)
            Wr = self.Wr.data.astype(self.type, copy=False)
            Wh = self.Wh.data.astype(self.type, copy=False)

        z += bz.data.astype(self.type, copy=False)  # X Uz + bz
        r += br.data.astype(self.type, copy=False)  # X Ur + br
        h += bh.data.astype(self.type, copy=False)  # X Uh + bh

        _gru_layer(out, z, r, h, Wz, Wr, Wh)

        self._z = z
        self._r = r
        self._h = h

        return out

    def backward_var(self, grad, index, **kwargs):
        raise SkipGradient("Gradient computed in GRU.backward()")

    def backward(self, grad, **kwargs):
        hidden_seq = self._hidden_seq()
        if hidden_seq is None:  # pragma: no cover
            assert False, "should be unreachable"

        s = hidden_seq.data[:-1]
        z = self._z
        r = self._r
        h = self._h

        dLds = grad[1:].astype(self.type, copy=False)

        const = {"1 - h**2": d_tanh(h), "z*(1 - z)": d_sig(z), "r*(1 - r)": d_sig(r)}

        if self._dropout:
            Wz = (self._dropWz * self.Wz.data).astype(self.type, copy=False)
            Wr = (self._dropWr * self.Wr.data).astype(self.type, copy=False)
            Wh = (self._dropWh * self.Wh.data).astype(self.type, copy=False)
        else:
            Wz = self.Wz.data.astype(self.type, copy=False)
            Wr = self.Wr.data.astype(self.type, copy=False)
            Wh = self.Wh.data.astype(self.type, copy=False)

        const["s - h"] = s - h
        const["1 - z"] = 1 - z

        _gru_bptt(
            self.X.data,
            dLds,
            s,
            z,
            r,
            Wz,
            Wh,
            Wr,
            const["z*(1 - z)"],
            const["1 - h**2"],
            const["r*(1 - r)"],
            const["s - h"],
            const["1 - z"],
            self.bp_lim,
        )

        zgrad = dLds * const["s - h"]  # dL / dz
        hgrad = dLds * const["1 - z"]  # dL / dh
        rgrad = dot(const["1 - h**2"] * hgrad, Wh.T) * s  # dL / dr

        hidden_seq._grad = dLds

        if not (self.Uz.constant and self.Wz.constant and self.bz.constant):
            dz = zgrad * const["z*(1 - z)"]
        # backprop through Wz
        if not self.Wz.constant:
            dWz = np.tensordot(s, dz, ([0, 1], [0, 1]))
            if self._dropout:
                dWz *= self._dropWz
            _backprop(
                self.Wz, dWz.astype(self.Wz.dtype, copy=False)
            )  # self.Wz.backward(dWz, **kwargs)
        # backprop through bz
        if not self.bz.constant:
            _backprop(self.bz, dz.sum(axis=(0, 1), dtype=self.bz.dtype))
        # backprop through Uz
        if not self.Uz.constant:
            if self._dropout:
                dz *= (
                    self._dropUz
                )  # IMPORTANT augmented update: this must come after Wz and bz backprop
            _backprop(
                self.Uz,
                np.tensordot(self.X.data, dz, ([0, 1], [0, 1])).astype(
                    self.Uz.dtype, copy=False
                ),
            )

        if not (self.Ur.constant and self.Wr.constant and self.br.constant):
            dr = rgrad * const["r*(1 - r)"]
        # backprop through Wr
        if not self.Wr.constant:
            dWr = np.tensordot(s, dr, ([0, 1], [0, 1]))
            if self._dropout:
                dWr *= self._dropWr
            _backprop(self.Wr, dWr.astype(self.Wr.dtype, copy=False))
        # backprop through br
        if not self.br.constant:
            _backprop(
                self.br, dr.sum(axis=(0, 1), dtype=self.br.dtype)
            )  # self.br.backward(dr.sum(axis=(0, 1)), **kwargs)
        # backprop through Ur
        if not self.Ur.constant:
            if self._dropout:
                dr *= (
                    self._dropUr
                )  # IMPORTANT augmented update: this must come after Wr and br backprop
            _backprop(
                self.Ur,
                np.tensordot(self.X.data, dr, ([0, 1], [0, 1])).astype(
                    self.Ur.dtype, copy=False
                ),
            )

        if not (self.Uh.constant and self.Wh.constant and self.bh.constant):
            dh = hgrad * const["1 - h**2"]
        # backprop through Wh
        if not self.Wh.constant:
            dWh = np.tensordot((s * r), dh, ([0, 1], [0, 1]))
            if self._dropout:
                dWh *= self._dropWh
            _backprop(
                self.Wh, dWh.astype(self.Wh.dtype, copy=False)
            )  # self.Wh.backward(dWh, **kwargs)
        # backprop through bh
        if not self.bh.constant:
            _backprop(
                self.bh, dh.sum(axis=(0, 1), dtype=self.bh.dtype)
            )  # self.bh.backward(dh.sum(axis=(0, 1)), **kwargs)
        # backprop through Uh
        if not self.Uh.constant:
            if self._dropout:
                dh *= (
                    self._dropUh
                )  # IMPORTANT augmented update: this must come after Wh and bh backprop
            _backprop(
                self.Uh,
                np.tensordot(self.X.data, dh, ([0, 1], [0, 1])).astype(
                    self.Uh.dtype, copy=False
                ),
            )

        # backprop through X
        if not self.X.constant:
            tmp = dLds * const["1 - z"] * const["1 - h**2"]
            if not self._dropout:
                dLdX = np.dot(
                    (dLds * const["s - h"]) * const["z*(1 - z)"], self.Uz.data.T
                )
                dLdX += np.dot(tmp, self.Uh.data.T)
                dLdX += np.dot(
                    np.dot(tmp, Wh.T) * s * const["r*(1 - r)"], self.Ur.data.T
                )
            else:
                dLdX = np.dot(
                    (self._dropUz * (dLds * const["s - h"]) * const["z*(1 - z)"]),
                    self.Uz.data.T,
                )
                dLdX += np.dot(self._dropUh * tmp, self.Uh.data.T)
                dLdX += np.dot(
                    self._dropUr * (dot(tmp, Wh.T) * s * const["r*(1 - r)"]),
                    self.Ur.data.T,
                )
            _backprop(
                self.X, dLdX.astype(self.X.dtype, copy=False)
            )  # self.X.backward(dLdX, **kwargs)

        del self._z
        del self._r
        del self._h

        super().backward(grad)


def gru(
    X,
    Uz,
    Wz,
    bz,
    Ur,
    Wr,
    br,
    Uh,
    Wh,
    bh,
    s0=None,
    bp_lim=None,
    dropout=0.0,
    constant=None,
):
    r"""Performs a forward pass of sequential data through a Gated Recurrent Unit layer,
    returning the 'hidden-descriptors' arrived at by utilizing the trainable parameters
    as follows::

                Z_{t} = sigmoid(X_{t} Uz + S_{t-1} Wz + bz)
                R_{t} = sigmoid(X_{t} Ur + S_{t-1} Wr + br)
                H_{t} = tanh(X_{t} Uh + (R_{t} * S_{t-1}) Wh + bh)
                S_{t} = (1 - Z_{t}) * H_{t} + Z_{t} * S_{t-1}

    Parameters
    ----------
    X : array_like, shape=(T, N, C)
        The sequential data to be passed forward.

    Uz : array_like, shape=(C, D)
        The weights used to map sequential data to its hidden-descriptor representation

    Wz : array_like, shape=(D, D)
        The weights used to map a hidden-descriptor to a hidden-descriptor.

    bz : array_like, shape=(D,)
        The biases used to scale a hidden-descriptor.

    Ur : array_like, shape=(C, D)
        The weights used to map sequential data to its hidden-descriptor representation

    Wr : array_like, shape=(D, D)
        The weights used to map a hidden-descriptor to a hidden-descriptor.

    br : array_like, shape=(D,)
        The biases used to scale a hidden-descriptor.

    Uh : array_like, shape=(C, D)
        The weights used to map sequential data to its hidden-descriptor representation

    Wh : array_like, shape=(D, D)
        The weights used to map a hidden-descriptor to a hidden-descriptor.

    bh : array_like, shape=(D,)
        The biases used to scale a hidden-descriptor.

    s0 : Optional[array_like], shape=(N, D)
        The 'seed' hidden descriptors to feed into the RNN. If None, a Tensor of
        zeros of shape (N, D) is created.

    bp_lim : Optional[int]
        *This feature is experimental and is currently untested*.
        The (non-zero) limit of the depth of back propagation through time to be
        performed. If `None` back propagation is passed back through the entire
        sequence.

        E.g. `bp_lim=3` will propagate gradients only up to 3 steps backward through
        the recursive sequence.

    dropout : float (default=0.), 0 <= dropout < 1
        If non-zero, the dropout scheme described in [1]_ is applied. See Notes
        for more details.

    constant : bool, optional (default=False)
        If True, the resulting Tensor is a constant.

    Returns
    -------
    mygrad.Tensor, shape=(T+1, N, D)
        The sequence of 'hidden-descriptors' produced by the forward pass of the RNN.

    Notes
    -----
    - :math:`T` : Sequence length
    - :math:`N` : Batch size
    - :math:`C` : Length of single datum
    - :math:`D` : Length of 'hidden' descriptor

    The GRU system of equations is given by:

    .. math::

        Z_{t} = \sigma (X_{t} U_z + S_{t-1} W_z + b_z)

        R_{t} = \sigma (X_{t} U_r + S_{t-1} W_r + b_r)

        H_{t} = tanh(X_{t} U_h + (R_{t} * S_{t-1}) W_h + b_h)

        S_{t} = (1 - Z_{t}) * H_{t} + Z_{t} * S_{t-1}

    Following the dropout scheme specified in [1]_, the hidden-hidden weights (Wz/Wr/Wh)
    randomly have their weights dropped prior to forward/back-prop. The input connections
    (via Uz/Ur/Uh) have variational dropout ([2]_) applied to them with a common dropout
    mask across all t. That is, three static dropout masks, each with shape-(N, D), are
    applied to

    .. math::

        X_{t} U_z

        X_{t} U_r

        X_{t} U_h

    respectively, for all :math:`t`.

    References
    ----------
    .. [1] S. Merity, et. al. "Regularizing and Optimizing LSTM Language Models",
           arXiv:1708.02182v1, 2017.

    .. [2] Y. Gal, Z. Ghahramani "A Theoretically Grounded Application of Dropout
           in Recurrent Neural Networks" arXiv:1512.05287v5, 2016."""
    if s0 is not None:
        if not isinstance(s0, np.ndarray) and not (
            isinstance(s0, Tensor) and (constant or s0.constant)
        ):
            raise ValueError(
                "GRU does not support non-constant tensors for the initial hidden "
                "state value, `s0`"
            )

    s = Tensor._op(
        GRUnit,
        X,
        Uz,
        Wz,
        bz,
        Ur,
        Wr,
        br,
        Uh,
        Wh,
        bh,
        op_kwargs=dict(s0=s0, bp_lim=bp_lim, dropout=dropout),
        constant=constant,
    )

    try:
        s.creator._hidden_seq = weakref.ref(s)
    except AttributeError:  # pragma: no cover
        # `no-autodiff` mode does not record creator
        pass

    return s
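

# Usage sketch for `gru` (shapes chosen purely for illustration; `Tensor` is the same
# class imported from `mygrad.tensor_base` at the top of this module):
#
#   >>> import numpy as np
#   >>> from mygrad.tensor_base import Tensor
#   >>> from mygrad.nnet.layers.gru import gru
#   >>> T, N, C, D = 5, 2, 3, 4
#   >>> X = Tensor(np.random.rand(T, N, C))
#   >>> Uz, Ur, Uh = (Tensor(np.random.rand(C, D)) for _ in range(3))
#   >>> Wz, Wr, Wh = (Tensor(np.random.rand(D, D)) for _ in range(3))
#   >>> bz, br, bh = (Tensor(np.zeros(D)) for _ in range(3))
#   >>> s = gru(X, Uz, Wz, bz, Ur, Wr, br, Uh, Wh, bh)
#   >>> s.shape  # hidden descriptors for t = 0 ... T
#   (6, 2, 4)
#   >>> s.sum().backward()  # back-propagate through the full sequence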