Module response

Your handy frequency and impulse response processing object.

travis-ci codecov

This module supplies the Response class: an abstraction of frequency and impulse responses and a set of handy methods for their processing. It implements a fluent interface for chaining the processing commands.

Find the documentation here and the source code on GitHub.

import numpy as np
from response import Response

fs = 48000  # sampling rate
T = 0.5     # length of signal
# a sine at 100 Hz
t = np.arange(int(T * fs)) / fs
x = np.sin(2 * np.pi * 100 * t)
# Do chain of processing
r = (
    Response.from_time(fs, x)
    # time window at the end and beginning
    .time_window((0, 0.1), (-0.1, None), window="hann")  # equivalent to Tukey window
    # zeropad to one second length
    .zeropad_to_length(fs * 1)
    # circular shift to center
    .circdelay(T / 2)
    # resample with polyphase filter, keep gain of filter
    .resample_poly(500, window=("kaiser", 0.5), normalize="same_amplitude")
    # cut 0.2s at beginning and end
    .timecrop(0.2, -0.2)
    # apply frequency domain window
    .freq_window((0, 90), (110, 500))
)
# plot magnitude, phase and time response
r.plot(show=True)
# real impulse response
r.in_time
# complex frequency response
r.in_freq
# and much more ...
Expand source code
"""Your handy frequency and impulse response processing object.

[![](https://img.shields.io/pypi/l/response.svg?style=flat)](https://pypi.org/project/response/)
[![](https://img.shields.io/pypi/v/response.svg?style=flat)](https://pypi.org/project/response/)
[![travis-ci](https://travis-ci.org/fhchl/Response.svg?branch=master)](https://travis-ci.org/fhchl/Response)
[![codecov](https://codecov.io/gh/fhchl/Response/branch/master/graph/badge.svg)](https://codecov.io/gh/fhchl/Response)

This module supplies the `Response` class: an abstraction of frequency and
impulse responses and a set of handy methods for their processing. It implements a
[fluent interface][1] for chaining the processing commands.

Find the documentation [here][2] and the source code on [GitHub][3].

```python
import numpy as np
from response import Response

fs = 48000  # sampling rate
T = 0.5     # length of signal
# a sine at 100 Hz
t = np.arange(int(T * fs)) / fs
x = np.sin(2 * np.pi * 100 * t)
# Do chain of processing
r = (
    Response.from_time(fs, x)
    # time window at the end and beginning
    .time_window((0, 0.1), (-0.1, None), window="hann")  # equivalent to Tukey window
    # zeropad to one second length
    .zeropad_to_length(fs * 1)
    # circular shift to center
    .circdelay(T / 2)
    # resample with polyphase filter, keep gain of filter
    .resample_poly(500, window=("kaiser", 0.5), normalize="same_amplitude")
    # cut 0.2s at beginning and end
    .timecrop(0.2, -0.2)
    # apply frequency domain window
    .freq_window((0, 90), (110, 500))
)
# plot magnitude, phase and time response
r.plot(show=True)
# real impulse response
r.in_time
# complex frequency response
r.in_freq
# and much more ...
```

[1]: https://en.wikipedia.org/wiki/Fluent_interface
[2]: https://fhchl.github.io/Response/
[3]: https://github.com/fhchl/Response

"""

import warnings
from fractions import Fraction
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
from scipy.signal import get_window, lfilter, resample, resample_poly, tukey, welch


class Response(object):
    """Representation of a linear response in time and frequency domain."""

    def __init__(self, fs, fdata=None, tdata=None, isEvenSampled=True):
        """Create Response from time or frequency data.

        Use `from_time` or `from_freq methods` to create objects of this class!

        Parameters
        ----------
        fs : int
            Sampling frequency in Hertz
        fdata : (..., nt) complex ndarray, optional
            Single sided frequency spectra with nt from ns to nr points.
        tdata : (..., nf) real ndarray, optional
            Time responses with nt from ns to nr points.
        isEvenSampled : bool or None, optional
            If fdata is given, this tells us if the last entry of fdata is the
            Nyquist frequency or not. Must be `None` if tdata is given.

        Raises
        ------
        ValueError
            if neither fdata or tdata are given.

        """
        assert float(fs).is_integer()

        if fdata is not None and tdata is None:
            fdata = np.atleast_1d(fdata)
            self._nf = fdata.shape[-1]

            if isEvenSampled:
                self._nt = 2 * (self._nf - 1)
            else:
                self._nt = 2 * self._nf - 1
            self._isEvenSampled = isEvenSampled

            self.__set_frequency_data(fdata)
        elif tdata is not None and fdata is None:
            assert np.all(np.imag(tdata) == 0), "Time data must be real."
            tdata = np.atleast_1d(tdata)
            self._nt = tdata.shape[-1]
            self._nf = self._nt // 2 + 1
            self._isEvenSampled = self._nt % 2 == 0

            self.__set_time_data(tdata)
        else:
            raise ValueError("One and only one of fdata and tdata must be given.")

        self._fs = int(fs)
        self._freqs = freq_vector(self._nt, fs)
        self._times = time_vector(self._nt, fs)
        self._time_length = self._nt * 1 / fs
        self.df = self._freqs[1]  # frequency resolution
        self.dt = self._times[1]  # time resolution

    @classmethod
    def from_time(cls, fs, tdata, **kwargs):
        """Generate Response obj from time response data."""
        tf = cls(fs, tdata=tdata, **kwargs)
        return tf

    @classmethod
    def from_freq(cls, fs, fdata, **kwargs):
        """Generate Response obj from frequency response data."""
        tf = cls(fs, fdata=fdata, **kwargs)
        return tf

    @classmethod
    def from_wav(cls, fps):
        """Import responses from wav files.

        Parameters
        ----------
        fps : list
            File paths of all wav files.

        Returns
        -------
        Response
            New Response object with imported time responses.

        """
        fpi = iter(fps)
        fs, data = wavfile.read(next(fpi))
        hlist = [data] + [wavfile.read(fp)[1] for fp in fpi]

        h = np.array(hlist)
        if data.dtype in [np.uint8, np.int16, np.int32]:
            lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max)
            lim_new = (-1.0, 1.0)
            h = _rescale(h, lim_orig, lim_new).astype(np.double)

        return cls.from_time(fs, h)

    @classmethod
    def new_dirac(cls, fs, T=None, n=None, nch=(1,)):
        """Generate new allpass / dirac response."""
        nch = np.atleast_1d(nch)
        if T is not None:
            nt = round(fs * T)
        else:
            nt = n
        h = np.zeros((*nch, nt))
        h[..., 0] = 1
        return cls.from_time(fs, h)

    @classmethod
    def join(cls, tfs, axis=0, newaxis=True):
        """Concat or stack a set of Responses along a given axis.

        Parameters
        ----------
        tfs : array_like
            List of Responses
        axis : int, optional
            Indice of axis along wich to concatenate / stack TFs.
        newaxis : bool, optional
            If True, do not concatenate but stack arrays along a new axis.

        Returns
        -------
        Response

        Note
        ----
        Transfer functions need to have same sampling rate, length etc.

        """
        joinfunc = np.stack if newaxis else np.concatenate
        tdata = joinfunc([tf.in_time for tf in tfs], axis=axis)
        return cls.from_time(tfs[0].fs, tdata)

    @property
    def time_length(self):
        """Length of time response in seconds."""
        return self._time_length

    @property
    def nf(self):  # noqa: D401
        """Number of frequencies in frequency representation."""
        return len(self._freqs)

    @property
    def nt(self):  # noqa: D401
        """Number of taps."""
        return len(self._times)

    @property
    def fs(self):  # noqa: D401
        """Sampling frequency."""
        return self._fs

    @property
    def freqs(self):  # noqa: D401
        """Frequencies."""
        return self._freqs

    @property
    def times(self):  # noqa: D401
        """Times."""
        return self._times

    @property
    def in_time(self):
        """Time domain response.

        Returns
        -------
        (... , n) ndarray
            Real FIR filters.

        """
        if self._in_time is None:
            self._in_time = np.fft.irfft(self._in_freq, n=self._times.size)
        return self._in_time

    @property
    def in_freq(self):
        """Single sided frequency spectrum.

        Returns
        -------
        (... , n) ndarray
            Complex frequency response.

        """
        if self._in_freq is None:
            self._in_freq = np.fft.rfft(self._in_time)
        return self._in_freq

    @property
    def amplitude_spectrum(self):
        """Amplitude spectrum."""
        X = self.in_freq / self.nt

        if self.nt % 2 == 0:
            # zero and nyquist element only appear once in complex spectrum
            X[..., 1:-1] *= 2
        else:
            # there is no nyquist element
            X[..., 1:] *= 2

        return X

    def __set_time_data(self, tdata):
        """Set time data without creating new object."""
        assert tdata.shape[-1] == self._nt
        self._in_time = tdata
        self._in_freq = None

    def __set_frequency_data(self, fdata):
        """Set frequency data without creating new object."""
        assert fdata.shape[-1] == self._nf
        self._in_freq = fdata
        self._in_time = None

    def plot(
        self,
        group_delay=False,
        slce=None,
        flim=None,
        dblim=None,
        tlim=None,
        grpdlim=None,
        dbref=1,
        show=False,
        use_fig=None,
        label=None,
        unwrap_phase=False,
        logf=True,
        third_oct_f=True,
        plot_kw={},
        **fig_kw,
    ):
        """Plot the response in both domains.

        Parameters
        ----------
        group_delay : bool, optional
            Display group delay instead of phase.
        slce : numpy.lib.index_tricks.IndexExpression
            only plot subset of responses defined by a slice. Last
            dimension (frequency or time) is always completely taken.
        flim : tuple or None, optional
            Frequency axis limits as tuple `(lower, upper)`
        dblim : tuple or None, optional
            Magnitude axis limits as tuple `(lower, upper)`
        tlim : tuple or None, optional
            Time axis limits as tuple `(lower, upper)`
        grpdlim: tuple or None, optional
            Group delay axis limit as tuple `(lower, upper)`
        dbref : float
            dB reference in magnitude plot
        show : bool, optional
            Run `matplotlib.pyplot.show()`
        use_fig : matplotlib.pyplot.Figure
            Reuse an existing figure.
        label : None, optional
            Description
        unwrap_phase : bool, optional
            unwrap phase in phase plot
        logf : bool, optional
            If `True`, use logarithmic frequency axis.
        third_oct_f: bool, optional
            Label frequency axis with third octave bands.
        plot_kw : dictionary, optional
            Keyword arguments passed to the `plt.plot` commands.
        **fig_kw
            Additional options passe to figure creation.

        """
        if use_fig is None:
            fig_kw = {**{"figsize": (10, 10)}, **fig_kw}
            fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw)
        else:
            fig = use_fig
            axes = fig.axes

        self.plot_magnitude(
            use_ax=axes[0],
            slce=slce,
            dblim=dblim,
            flim=flim,
            dbref=dbref,
            label=label,
            plot_kw=plot_kw,
            logf=logf,
            third_oct_f=third_oct_f,
        )
        if group_delay:
            self.plot_group_delay(
                use_ax=axes[1],
                slce=slce,
                flim=flim,
                ylim=grpdlim,
                plot_kw=plot_kw,
                logf=logf,
                third_oct_f=third_oct_f,
            )
        else:
            self.plot_phase(
                use_ax=axes[1],
                slce=slce,
                flim=flim,
                plot_kw=plot_kw,
                unwrap=unwrap_phase,
                logf=logf,
                third_oct_f=third_oct_f,
            )
        self.plot_time(
            use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw
        )

        if show:
            plt.show()

        return fig

    def plot_magnitude(
        self,
        use_ax=None,
        slce=None,
        dblim=None,
        flim=None,
        dbref=1,
        label=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot magnitude response."""
        # TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )

        plotf = ax.semilogx if logf else ax.plot
        plotf(
            self.freqs,
            20 * np.log10(np.abs(freq_plotready / dbref)),
            label=label,
            **plot_kw,
        )
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Magnitude [dB]")
        ax.set_title("Frequency response")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)

        if dblim is not None:
            ax.set_ylim(dblim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_phase(
        self,
        use_ax=None,
        slce=None,
        flim=None,
        label=None,
        unwrap=False,
        ylim=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot phase response."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )
        phase = (
            np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready)
        )

        plotf = ax.semilogx if logf else ax.plot
        plotf(self.freqs, phase, label=label, **plot_kw)
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Phase [rad]")
        ax.set_title("Phase response")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)
        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_time(
        self,
        use_ax=None,
        slce=None,
        tlim=None,
        ylim=None,
        label=None,
        plot_kw={},
        **fig_kw,
    ):
        """Plot time response."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape(
            (self.nt, -1)
        )

        ax.plot(self.times, time_plotready, label=label, **plot_kw)
        ax.set_xlabel("Time [s]")
        ax.set_ylabel("")
        ax.set_title("Time response")
        ax.grid(True)

        if tlim:
            ax.set_xlim(tlim)
        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        return fig

    def plot_group_delay(
        self,
        use_ax=None,
        slce=None,
        flim=None,
        label=None,
        ylim=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot group delay."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )

        df = self.freqs[1] - self.freqs[0]
        # TODO: use scipy.signal.group_delay here as below has problem at larger delays
        grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0)

        plotf = ax.semilogx if logf else ax.plot
        plotf(self.freqs, grpd, label=label, **plot_kw)
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Delay [s]")
        ax.set_title("Group Delay")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)

        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_power_in_bands(
        self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs
    ):
        """Plot signal's power in bands.

        Parameters
        ----------
        bands : list or None, optional
            List of tuples (f_center, f_lower, f_upper). If `None`, use third octave
            bands.
        use_ax : matplotlib.axis.Axis or None, optional
            Plot into this axis.
        barkwargs : dict
            Keyword arguments to `axis.bar`
        avgaxis : int, tuple or None
            Average power over these axes.
        dbref : float
            dB reference.
        **figkwargs
            Keyword arguments passed to plt.subplots

        Returns
        -------
        P : ndarray
            Power in bands
        fc : ndarray
            Band frequencies
        fig : matplotlib.figure.Figure
            Figure

        """
        P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis)

        nbands = P.shape[-1]
        P = np.atleast_2d(P).reshape((-1, nbands))

        if use_ax is None:
            fig, ax = plt.subplots(**figkwargs)
        else:
            ax = use_ax
            fig = ax.get_figure()

        xticks = range(1, nbands + 1)
        for i in range(P.shape[0]):
            ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs)
        ax.set_xticks(xticks)
        ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical")
        ax.grid(True)
        ax.set_xlabel("Band's center frequencies [Hz]")
        ax.set_ylabel("Power [dB]")

        return (P, fc, fig)

    def time_window(self, startwindow, stopwindow, window="hann"):
        """Apply time domain windows.

        Parameters
        ----------
        startwindow : None or tuple
            Tuple (t1, t2) with beginning and end times of window opening.
        stopwindow : None or tuple
            Tuple (t1, t2) with beginning and end times of window closing.
        window : string or tuple of string and parameter values, optional
            Desired window to use. See scipy.signal.get_window for a list of
            windows and required parameters.

        Returns
        -------
        Response
            Time windowed response object

        """
        n = self.times.size
        twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window)
        new_response = self.from_time(self.fs, self.in_time * twindow)

        return new_response

    def freq_window(self, startwindow, stopwindow, window="hann"):
        """Apply frequency domain window.

        Parameters
        ----------
        startwindow : None or tuple
            Tuple (t1, t2) with beginning and end frequencies of window opening.
        stopwindow : None or tuple
            Tuple (t1, t2) with beginning and end frequencies of window closing.
        window : string or tuple of string and parameter values, optional
            Desired window to use. See scipy.signal.get_window for a list of
            windows and required parameters.

        Returns
        -------
        Response
            Frequency windowed response object

        """
        n = self.times.size
        fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window)
        new_response = self.from_freq(self.fs, self.in_freq * fwindow)

        return new_response

    def window_around_peak(self, tleft, tright, alpha, return_window=False):
        """Time window each impulse response around its peak value.

        Parameters
        ----------
        tleft, tright : float
            Window starts `tleft` seconds before and ends `tright` seconds after maximum
            of impulse response.
        alpha : float
            `alpha` parameter of `scipy.signal.tukey` window.
        return_window : bool, optional
            Also return used time window

        Returns
        -------
        Response
            Time windowed response object.
        ndarray
            Time window, if `return_window` is `True`.

        """
        window = _construct_window_around_peak(
            self.fs, self.in_time, tleft, tright, alpha=alpha
        )

        if return_window:
            return self.from_time(self.fs, self.in_time * window), window

        return self.from_time(self.fs, self.in_time * window)

    def delay(self, dt, keep_length=True):
        """Delay time response by dt seconds.

        Rounds of to closest integer delay.
        """
        x = delay(self.fs, self.in_time, dt, keep_length=keep_length)
        return self.from_time(self.fs, x)

    def circdelay(self, dt):
        """Delay by circular shift.

        Rounds of to closest integer delay.
        """
        x = self.in_time
        n = int(round(dt * self.fs))
        shifted = np.roll(x, n, axis=-1)

        return self.from_time(self.fs, shifted)

    def timecrop(self, start, end):
        """Crop time response.

        Parameters
        ----------
        start, end : float
            Start and end times in seconds. Does not include sample at t=end. Use
            end=None to force inclusion of last sample.

        Returns
        -------
        Response
            New Response object with cropped time.

        Notes
        -----
        Creates new Response object.

        Examples
        --------
        >>> import numpy as np
        >>> from response import Response
        >>> r = Response.from_time(100, np.random.normal(size=100))
        >>> split = 0.2

        The following holds:

        >>> np.all(np.concatenate(
        ...     (
        ...         r.timecrop(0, split).in_time,
        ...         r.timecrop(split, None).in_time,
        ...     ),
        ...     axis=-1,
        ... ) == r.in_time)
        True

        """
        if start < 0:
            start += self.time_length
        if end is not None and end < 0:
            end += self.time_length
        assert 0 <= start < self.time_length
        assert end is None or (0 < end <= self.time_length)

        _, i_start = _find_nearest(self.times, start)
        if end is None:
            i_end = None
        else:
            _, i_end = _find_nearest(self.times, end)

        h = self.in_time[..., i_start:i_end]

        new_response = self.from_time(self.fs, h)

        return new_response

    def non_causal_timecrop(self, length):
        """Cut length of non-causal impulse response.

        "FFT shift, cropping on both ends, iFFT shift"

        Parameters
        ----------
        length : float
            final length in seconds

        Returns
        -------
        Response
            New Response object new length.

        Note
        ----
        Can introduce delay pre-delay by a sample.

        """
        assert length < self.time_length

        cut = (self.time_length - length) / 2

        _, i_start = _find_nearest(self.times, cut)
        _, i_end = _find_nearest(self.times, self.time_length - cut)

        h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end])

        new_response = self.from_time(self.fs, h)

        if new_response.time_length != length:
            w = f"Could not precisely shrink to {length}s with fs = {self.fs}"
            warnings.warn(w)

        return new_response

    def zeropad(self, before, after):
        """Zeropad time response.

        Parameters
        ----------
        before, after : int
            Number of zero samples inserted before and after response.

        Returns
        -------
        Response
            Zeropadded response

        """
        assert before % 1 == 0
        assert after % 1 == 0
        dims = self.in_time.ndim

        pad_width = [(0, 0) for n in range(dims)]
        pad_width[-1] = (int(before), int(after))

        h = np.pad(self.in_time, pad_width, "constant")

        return self.from_time(self.fs, h)

    def zeropad_to_power_of_2(self):
        """Pad time response for length of power of 2.

        Returns
        -------
        Response
            New response object with larger, power of 2 length.

        """
        # https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python
        n = 2 ** (self.nt - 1).bit_length()
        return self.zeropad(0, n - self.nt)

    def zeropad_to_length(self, n):
        """Zeropad time response to specific length.

        Returns
        -------
        Response
            New response object with new length n.

        """
        oldn = self.nt
        assert n >= oldn
        return self.zeropad(0, n - oldn)

    def resample(self, fs_new, normalize="same_gain", window=None):
        """Resample using Fourier method.

        Parameters
        ----------
        fs_new : int
            New sample rate
        normalize : str, optional
            If 'same_gain', normalize such that the gain is the same
            as the original signal. If 'same_amplitude', amplitudes will be preserved.
        window : None, optional
            Passed to scipy.signal.resample.

        Returns
        -------
        Response
            New resampled response object.

        Raises
        ------
        ValueError
            If resulting number of samples would be a non-integer.

        """
        if fs_new == self.fs:
            return self

        nt_new = fs_new * self.time_length

        if nt_new % 1 != 0:
            raise ValueError(
                "New number of samples must be integer, but is {}".format(nt_new)
            )

        nt_new = int(nt_new)

        h_new = resample(self.in_time, nt_new, axis=-1, window=window)

        if normalize == "same_gain":
            h_new *= self.nt / nt_new
        elif normalize == "same_amplitude":
            pass
        else:
            raise ValueError(
                "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
            )

        return self.from_time(fs_new, h_new)

    def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)):
        """Resample using polyphase filtering.

        Parameters
        ----------
        fs_new : int
            New sample rate
        normalize : str, optional
            If 'same_gain', normalize such that the gain is the same
            as the original signal. If 'same_amplitude', amplitudes will be preserved.
        window : None, optional
            Passed to scipy.signal.resample_poly.

        Returns
        -------
        Response
            New resampled response object.

        """
        if fs_new == self.fs:
            return self

        ratio = Fraction(fs_new, self.fs)
        up = ratio.numerator
        down = ratio.denominator

        if up > 1000 or down > 1000:
            print("Warning: resampling with high ratio {}/{}".format(up, down))

        h_new = resample_poly(self.in_time, up, down, axis=-1, window=window)

        if normalize == "same_gain":
            h_new *= down / up
        elif normalize == "same_amplitude":
            pass
        else:
            raise ValueError(
                "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
            )

        return self.from_time(fs_new, h_new)

    def normalize(self, maxval=1):
        """Normalize time response.

        Parameters
        ----------
        maxval: float, optional
            Maximum amplitude in resulting time response.

        Returns
        -------
        Response

        """
        h = self.in_time
        h /= np.abs(self.in_time).max()
        h *= maxval
        return self.from_time(self.fs, h)

    def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16):
        """Export impulse response to wave files.

        Dimension of data must 2.

        Parameters
        ----------
        folder : file path
            Save in this folder
        name_fmt : str, optional
            Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
        dtype : one of np.float32, np.int32, np.int16, np.uint8
            Data is converted to this type.

        """
        data = np.atleast_2d(self.in_time)

        assert data.ndim == 2
        assert np.all(np.abs(data) <= 1.0)

        # convert and scale to new output datatype
        if dtype in [np.uint8, np.int16, np.int32]:
            lim_orig = (-1.0, 1.0)
            lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max)
            data = _rescale(data, lim_orig, lim_new).astype(dtype)
        elif dtype != np.float32:
            raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.")

        path = Path(folder)
        if not path.is_dir():
            path.mkdir(parents=True, exist_ok=False)

        for i in range(data.shape[0]):
            wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i])

    def export_npz(self, filename, dtype=np.float32):
        """Export impulse response as npz file.

        Parameters
        ----------
        filename: str or Path
            File path
        dtype: numpy dtype
            Convert to this type before saving

        """
        np.savez(
            filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs
        )

    def power_in_bands(self, bands=None, avgaxis=None):
        """Compute power of signal in third octave bands.

        Power(band) =   1/T  integral  |X(f)| ** 2 df
                            f in band

        Parameters
        ----------
        bands : list of tuples, length nbands optional
            Center, lower and upper frequencies of bands.
        avgaxis: int, tuple or None
            Average result over these axis

        Returns
        -------
        P: ndarray, shape (..., nbands)
            Power in bands
        fcs: list, length nbands
            Center frequencies of bands

        """
        if bands is None:
            bands = _third_octave_bands

        # center frequencies
        fcs = np.asarray([b[0] for b in bands])
        Npow2 = 2 ** (self.nt - 1).bit_length()
        f = np.fft.fftfreq(Npow2, d=1 / self.fs)

        shape = list(self.in_freq.shape)
        shape[-1] = len(bands)
        P = np.zeros(shape)
        for i, (fc, fl, fu) in enumerate(bands):
            if fu < self.fs / 2:  # include only bands in frequency range
                iband = np.logical_and(fl <= f, f < fu)
                P[..., i] = np.sum(
                    np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2
                    * 2  # energy from negative and positive frequencies
                    * self.dt
                    / self.nt
                    / self.time_length,
                    axis=-1,
                )
            else:
                P[..., i] = 0

        if avgaxis is not None:
            P = P.mean(axis=avgaxis)

        return P, fcs

    @classmethod
    def time_vector(cls, n, fs):
        """Time values of filter with n taps sampled at fs.

        Parameters
        ----------
        n : int
            number of taps in FIR filter
        fs : int
            sampling frequency in Hertz

        Returns
        -------
        (n) ndarray
            times in seconds

        """
        return time_vector(n, fs)

    @classmethod
    def freq_vector(cls, n, fs):
        """Frequency values of filter with n taps sampled at fs up to Nyquist.

        Parameters
        ----------
        n : int
            Number of taps in FIR filter
        fs : int
            Sampling frequency in Hertz

        Returns
        -------
        (n // 2 + 1) ndarray
            Frequencies in Hz

        """
        return freq_vector(n, fs, sided='single')

    def filter(self, b, a=[1]):
        """Filter response along one-dimension with an IIR or FIR filter.

        Parameters
        ----------
        b : array_like
            The numerator coefficient vector in a 1-D sequence.
        a : array_like, optional
            The denominator coefficient vector in a 1-D sequence.  If ``a[0]``
            is not 1, then both `a` and `b` are normalized by ``a[0]``.

        """
        return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1))

    def add_noise(self, snr, unit=None):
        """Add noise to x with relative noise level SNR.

        Parameters
        ----------
        snr : float
            relative magnitude of noise, i.e. snr = Ex/En
        unit : None or str, optional
            if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).

        Returns
        -------
        Response

        """
        return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit))

    def psd(self, **kwargs):
        """Compute the power spectral density of the signal.

        Parameters
        ----------
        kwargs
            keword arguments passed to scipy.signal.welch

        Returns
        -------
        f : ndarray
            Array of sample frequencies.
        Pxx : ndarray
            Power spectral density of time signal.

        Notes
        -----
        Use scaling='density' for power per bin bandwidth and scaling='spectrum' for
        power per bin.

        """
        return welch(self.in_time, fs=self.fs, **kwargs)


####################
# Module functions #
####################


def noisify(x, snr, unit=None):
    """Add noise to x with relative noise level SNR.

    Parameters
    ----------
    x : ndarray
        data
    snr : float
        relative energy of noise, snr = Energy(x)/Energy(n).
    unit : None or str, optional
        if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En).

    Returns
    -------
    ndarray
        data with noise

    Examples
    --------
    Create signal

    >>> import numpy as np
    >>> t = np.linspace(0, 1, 1000000, endpoint=False)
    >>> x = np.sin(2*np.pi*10*t)  # signal

    Add noise with 6 dB SNR to a sinusoidal signal:

    >>> snrdB = 6
    >>> xn = noisify(x, snrdB, "dB")  # signal plus noise

    >>> energy_x = np.linalg.norm(x)**2
    >>> energy_xn = np.linalg.norm(xn)**2
    >>> snr = 10 ** (snrdB / 20)
    >>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2)
    True

    """
    if unit == "dB":
        snr = 10 ** (snr / 20)

    if np.iscomplexobj(x):
        n = np.random.standard_normal(x.shape) + 1j * np.random.standard_normal(x.shape)
    else:
        n = np.random.standard_normal(x.shape)

    n *= 1 / np.sqrt(snr) * np.linalg.norm(x) / np.linalg.norm(n)

    return x + n


def time_vector(n, fs):
    """Time values of filter with n taps sampled at fs.

    Parameters
    ----------
    n : int
        number of taps in FIR filter
    fs : int
        sampling frequency in Hertz

    Returns
    -------
    (n) ndarray
        times in seconds

    """
    T = 1 / fs
    return np.arange(n, dtype=float) * T  # float against int wrapping


def freq_vector(n, fs, sided="single"):
    """Frequency values of filter with n taps sampled at fs up to Nyquist.

    Parameters
    ----------
    n : int
        Number of taps in FIR filter
    fs : int
        Sampling frequency in Hertz
    sided: str
        Generate frequencies for a "single" or "double" sided spectrum

    Returns
    -------
    (n // 2 + 1) ndarray
        Frequencies in Hz

    """
    # use float against int wrapping
    if sided == "single":
        f = np.arange(n // 2 + 1, dtype=float) * fs / n
    elif sided == "double":
        f = np.arange(n, dtype=float) * fs / n
    else:
        raise ValueError("Invalid value for sided.")

    return f


def delay(fs, x, dt, keep_length=True, axis=-1):
    """Delay time signal by dt seconds by inserting zeros.

    Examples
    --------
    >>> delay(1, [1, 2, 3], 1)
    array([0., 1., 2.])

    >>> delay(1, [1, 2, 3], 1, keep_length=False)
    array([0., 1., 2., 3.])

    >>> delay(1, [1, 0, 0], -1)
    array([0., 0., 0.])

    >>> delay(1, [1, 0, 0], -1, keep_length=False)
    array([0, 0])

    """
    dn = int(round(dt * fs))
    x = np.asarray(x)
    n = x.shape[axis]

    if dn > 0:
        # delay
        zeros_shape = list(x.shape)
        zeros_shape[axis] = dn
        zeros = np.zeros(zeros_shape)

        delayed = np.concatenate((zeros, x), axis=axis)

        if keep_length:
            # slice that takes 0 to ntaps samples along axis
            slc = [slice(None)] * len(x.shape)
            slc[axis] = slice(0, n)
            delayed = delayed[tuple(slc)]

    elif dn < 0:
        # pre-delay
        slc = [slice(None)] * len(x.shape)
        slc[axis] = slice(-dn, n)
        delayed = x[tuple(slc)]

        if keep_length:
            zeros_shape = list(x.shape)
            zeros_shape[axis] = -dn
            zeros = np.zeros(zeros_shape)
            delayed = np.concatenate((delayed, zeros), axis=axis)
    else:
        # no delay
        delayed = x

    return delayed


def delay_between(h1, h2):
    """Estimate delay of h2 relative to h1 using cross correlation.

    Parameters
    ----------
    h1 : ((N,) L) array_like
        Reference signals.
    h2 : ((M,) L) array_like
        Delayed signals.

    Returns
    -------
    delay : (N, M) ndarray
        Delays in samples. `h2[j]` is delayed relative to `h1[i]` by `delay[i, j]`.

    Examples
    --------
    >>> a = [1, 0, 0, 0]
    >>> b = [0, 0, 1, 0]
    >>> delay_between(a, b)
    array(2)

    """
    h1 = np.atleast_2d(h1)
    h2 = np.atleast_2d(h2)
    assert h1.shape[-1] == h2.shape[-1], "h1 and h2 must have same number of samples"

    L = h1.shape[-1]

    delay = np.zeros((h1.shape[0], h2.shape[0]), dtype=int)
    for i in range(h1.shape[0]):
        for j in range(h2.shape[0]):
            xcorrmax = np.argmax(np.correlate(h2[j], h1[i], mode="full"))
            delay[i, j] = xcorrmax - L + 1

    return delay.squeeze()


def align(h, href, upsample=1):
    """Align two impulse responses using cross correlation.

    Parameters
    ----------
    h : array_like
        Response that will be aligned.
    href : array_like
        Response to which will be aligned.
    upsample : int, optional
        Upsample both responses before alignment by this factor.

    Returns
    -------
    ndarray
        Time aligned version of `h`.

    """
    href = resample_poly(href, upsample, 1)
    h = resample_poly(h, upsample, 1)
    delay = delay_between(href, h).squeeze()
    h = np.roll(h, -int(delay))
    h = resample_poly(h, 1, upsample)
    return h


#####################
# Utility functions #
#####################


def _sample_window(n, startwindow, stopwindow, window="hann"):
    """Create a sample domain window."""
    swindow = np.ones(n)

    if startwindow is not None:
        length = startwindow[1] - startwindow[0]
        w = get_window(window, 2 * length, fftbins=False)[:length]
        swindow[: startwindow[0]] = 0
        swindow[startwindow[0] : startwindow[1]] = w

    if stopwindow is not None:
        # stop window
        length = stopwindow[1] - stopwindow[0]
        w = get_window(window, 2 * length, fftbins=False)[length:]
        swindow[stopwindow[0] + 1 : stopwindow[1] + 1] = w
        swindow[stopwindow[1] + 1 :] = 0

    return swindow


def _time_window(fs, n, startwindow_t, stopwindow_t, window="hann"):
    """Create a time domain window.

    Negative times are relative to the end. Short cut for end time is `None`.
    """
    times = time_vector(n, fs)
    T = times[-1] + times[1]  # total time length

    if startwindow_t is None:
        startwindow_n = None
    else:
        startwindow_n = []
        for t in startwindow_t:
            if t < 0:
                t += T
            assert 0 <= t or t <= T
            startwindow_n.append(_find_nearest(times, t)[1])

    if stopwindow_t is None:
        stopwindow_n = None
    else:
        stopwindow_n = []
        for t in stopwindow_t:
            if t is None:
                t = times[-1]
            elif t < 0:
                t += T
            assert 0 <= t or t <= T
            stopwindow_n.append(_find_nearest(times, t)[1])

    twindow = _sample_window(n, startwindow_n, stopwindow_n, window=window)

    return twindow


def _freq_window(fs, n, startwindow_f, stopwindow_f, window="hann"):
    """Create a frequency domain window."""
    freqs = freq_vector(n, fs)

    if startwindow_f is not None:
        startwindow_n = [_find_nearest(freqs, f)[1] for f in startwindow_f]
    else:
        startwindow_n = None

    if stopwindow_f is not None:
        stopwindow_n = [_find_nearest(freqs, f)[1] for f in stopwindow_f]
    else:
        stopwindow_n = None

    fwindow = _sample_window(len(freqs), startwindow_n, stopwindow_n, window=window)

    return fwindow


def _rescale(x, xlim, ylim):
    """Rescale values to new bounds.

    Parameters
    ----------
    x : ndarray
        Values to rescale
    xlim : tuple
        Original value bounds (xmin, xmax)
    ylim : float
        New value bounds (ymin, ymax)

    Returns
    -------
    ndarray
        Rescaled values

    """
    m = (ylim[1] - ylim[0]) / (xlim[1] - xlim[0])
    c = ylim[1] - m * xlim[1]
    y = m * x + c
    return y


def _find_nearest(array, value):
    """Find nearest value in an array and its index.

    Returns
    -------
    value
        Value of nearest entry in array
    idx
        Index of that value

    """
    idx = (np.abs(array - value)).argmin()
    return array[idx], idx


def _construct_window_around_peak(fs, irs, tleft, tright, alpha=0.5):
    """Create time window around maximum of response.

    Parameters
    ----------
    fs : int
        Sample rate.
    irs : array_like
        Input response.
    tleft : float
        Start of time window relative to impulse response peak.
    tright : float
        End of time window relative to impulse response peak.
    alpha : float, optional
        Alpha parameter of Tukey window.

    Returns
    -------
    ndarray
        Time windows.

    """
    orig_shape = irs.shape
    flat_irs = irs.reshape(-1, irs.shape[-1])

    sleft = int(fs * tleft)
    sright = int(fs * tright)

    windows = np.ones(flat_irs.shape)
    for i in range(flat_irs.shape[0]):
        ipeak = np.argmax(np.abs(flat_irs[i]))
        iwstart = max(ipeak - sleft, 0)
        iwend = min(ipeak + sright, flat_irs.shape[-1])

        window = tukey(iwend - iwstart, alpha=alpha)

        windows[i, iwstart:iwend] *= window
        windows[i, :iwstart] = 0
        windows[i, iwend:] = 0

    return windows.reshape(orig_shape)


def _aroll(x, n, circular=False, axis=-1, copy=True):
    """Roll each entry along axis individually.

    Can be used to delay / shift each response by its own shift.

    Parameters
    ----------
    x : ndarray (Ni...,M,Nj...)
        Input array
    n : ndarray (Ni...,Nj...)
        Delay times of each entry along axis.
    circular: bool, optional
        If True, wrap around ends. Else replace with zeros.
    axis : int, optional
        Axis along which is rolled.
    copy : bool, optional
        If True, operate on copy of `x`. Else roll inplace.

    Returns
    -------
    ndarray (Ni...,M,Nj...)
        Array with rolled entries

    """
    n = n.astype(int)

    if copy:
        x = x.copy()

    # move axis to first dim and reshape to 2D
    xview = np.rollaxis(x, axis)
    xview = xview.reshape(xview.shape[0], -1)
    n = n.reshape(-1)

    assert n.shape[0] == xview.shape[1], 'Shapes of x and n do not match.'

    for i in range(n.shape[0]):
        xview[:, i] = np.roll(xview[:, i], n[i])

        if not circular:
            if n[i] > 0:
                xview[: n[i], i] = 0
            elif n[i] < 0:
                xview[n[i] :, i] = 0

    return x


# center, lower, upper frequency of third octave bands
_third_octave_bands = (
    (16, 13.920_292_470_942_801, 17.538_469_504_833_955),
    (20, 17.538_469_504_833_95, 22.097_086_912_079_607),
    (25, 22.097_086_912_079_615, 27.840_584_941_885_613),
    (31, 27.840_584_941_885_602, 35.076_939_009_667_91),
    (40, 35.076_939_009_667_9, 44.194_173_824_159_215),
    (50, 44.194_173_824_159_23, 55.681_169_883_771_226),
    (63, 55.681_169_883_771_204, 70.153_878_019_335_82),
    (80, 70.153_878_019_335_82, 88.388_347_648_318_44),
    (100, 88.388_347_648_318_43, 111.362_339_767_542_41),
    (125, 111.362_339_767_542_41, 140.307_756_038_671_64),
    (160, 140.307_756_038_671_64, 176.776_695_296_636_9),
    (200, 176.776_695_296_636_86, 222.724_679_535_084_82),
    (250, 222.724_679_535_084_82, 280.615_512_077_343_3),
    (315, 280.615_512_077_343_3, 353.553_390_593_273_8),
    (400, 353.553_390_593_273_8, 445.449_359_070_169_75),
    (500, 445.449_359_070_169_63, 561.231_024_154_686_6),
    (630, 561.231_024_154_686_6, 707.106_781_186_547_6),
    (800, 707.106_781_186_547_6, 890.898_718_140_339_5),
    (1000, 890.898_718_140_339_3, 1122.462_048_309_373_1),
    (1260, 1122.462_048_309_373_1, 1414.213_562_373_095),
    (1600, 1414.213_562_373_094_9, 1781.797_436_280_678_5),
    (2000, 1781.797_436_280_678_5, 2244.924_096_618_746_3),
    (2500, 2244.924_096_618_746_3, 2828.427_124_746_19),
    (3200, 2828.427_124_746_19, 3563.594_872_561_358),
    (4000, 3563.594_872_561_357, 4489.848_193_237_492_5),
    (5000, 4489.848_193_237_492_5, 5656.854_249_492_38),
    (6300, 5656.854_249_492_379_5, 7127.189_745_122_714),
    (8000, 7127.189_745_122_714, 8979.696_386_474_985),
    (10000, 8979.696_386_474_985, 11313.708_498_984_76),
    (12600, 11313.708_498_984_759, 14254.379_490_245_428),
    (16000, 14254.379_490_245_428, 17959.392_772_949_97),
    (20000, 17959.392_772_949_966, 22627.416_997_969_518),
)


def _add_octave_band_xticks(ax, bands=np.array(_third_octave_bands)[:, 0]):
    """Add band ticks to axis."""
    left, right = ax.get_xlim()
    b = bands[np.logical_and(left <= bands, bands <= right)]
    ax.set_xticks(b)
    ax.set_xticks([], minor=True)
    ax.set_xticklabels([int(round(bs)) for bs in b], minor=False)

Functions

def align(h, href, upsample=1)

Align two impulse responses using cross correlation.

Parameters

h : array_like
Response that will be aligned.
href : array_like
Response to which will be aligned.
upsample : int, optional
Upsample both responses before alignment by this factor.

Returns

ndarray
Time aligned version of h.
Expand source code
def align(h, href, upsample=1):
    """Align two impulse responses using cross correlation.

    Parameters
    ----------
    h : array_like
        Response that will be aligned.
    href : array_like
        Response to which will be aligned.
    upsample : int, optional
        Upsample both responses before alignment by this factor.

    Returns
    -------
    ndarray
        Time aligned version of `h`.

    """
    href = resample_poly(href, upsample, 1)
    h = resample_poly(h, upsample, 1)
    delay = delay_between(href, h).squeeze()
    h = np.roll(h, -int(delay))
    h = resample_poly(h, 1, upsample)
    return h
def delay(fs, x, dt, keep_length=True, axis=-1)

Delay time signal by dt seconds by inserting zeros.

Examples

>>> delay(1, [1, 2, 3], 1)
array([0., 1., 2.])
>>> delay(1, [1, 2, 3], 1, keep_length=False)
array([0., 1., 2., 3.])
>>> delay(1, [1, 0, 0], -1)
array([0., 0., 0.])
>>> delay(1, [1, 0, 0], -1, keep_length=False)
array([0, 0])
Expand source code
def delay(fs, x, dt, keep_length=True, axis=-1):
    """Delay time signal by dt seconds by inserting zeros.

    Examples
    --------
    >>> delay(1, [1, 2, 3], 1)
    array([0., 1., 2.])

    >>> delay(1, [1, 2, 3], 1, keep_length=False)
    array([0., 1., 2., 3.])

    >>> delay(1, [1, 0, 0], -1)
    array([0., 0., 0.])

    >>> delay(1, [1, 0, 0], -1, keep_length=False)
    array([0, 0])

    """
    dn = int(round(dt * fs))
    x = np.asarray(x)
    n = x.shape[axis]

    if dn > 0:
        # delay
        zeros_shape = list(x.shape)
        zeros_shape[axis] = dn
        zeros = np.zeros(zeros_shape)

        delayed = np.concatenate((zeros, x), axis=axis)

        if keep_length:
            # slice that takes 0 to ntaps samples along axis
            slc = [slice(None)] * len(x.shape)
            slc[axis] = slice(0, n)
            delayed = delayed[tuple(slc)]

    elif dn < 0:
        # pre-delay
        slc = [slice(None)] * len(x.shape)
        slc[axis] = slice(-dn, n)
        delayed = x[tuple(slc)]

        if keep_length:
            zeros_shape = list(x.shape)
            zeros_shape[axis] = -dn
            zeros = np.zeros(zeros_shape)
            delayed = np.concatenate((delayed, zeros), axis=axis)
    else:
        # no delay
        delayed = x

    return delayed
def delay_between(h1, h2)

Estimate delay of h2 relative to h1 using cross correlation.

Parameters

h1 : ((N,) L) array_like
Reference signals.
h2 : ((M,) L) array_like
Delayed signals.

Returns

delay : (N, M) ndarray
Delays in samples. h2[j] is delayed relative to h1[i] by delay()[i, j].

Examples

>>> a = [1, 0, 0, 0]
>>> b = [0, 0, 1, 0]
>>> delay_between(a, b)
array(2)
Expand source code
def delay_between(h1, h2):
    """Estimate delay of h2 relative to h1 using cross correlation.

    Parameters
    ----------
    h1 : ((N,) L) array_like
        Reference signals.
    h2 : ((M,) L) array_like
        Delayed signals.

    Returns
    -------
    delay : (N, M) ndarray
        Delays in samples. `h2[j]` is delayed relative to `h1[i]` by `delay[i, j]`.

    Examples
    --------
    >>> a = [1, 0, 0, 0]
    >>> b = [0, 0, 1, 0]
    >>> delay_between(a, b)
    array(2)

    """
    h1 = np.atleast_2d(h1)
    h2 = np.atleast_2d(h2)
    assert h1.shape[-1] == h2.shape[-1], "h1 and h2 must have same number of samples"

    L = h1.shape[-1]

    delay = np.zeros((h1.shape[0], h2.shape[0]), dtype=int)
    for i in range(h1.shape[0]):
        for j in range(h2.shape[0]):
            xcorrmax = np.argmax(np.correlate(h2[j], h1[i], mode="full"))
            delay[i, j] = xcorrmax - L + 1

    return delay.squeeze()
def freq_vector(n, fs, sided='single')

Frequency values of filter with n taps sampled at fs up to Nyquist.

Parameters

n : int
Number of taps in FIR filter
fs : int
Sampling frequency in Hertz
sided : str
Generate frequencies for a "single" or "double" sided spectrum

Returns

(n // 2 + 1) ndarray Frequencies in Hz

Expand source code
def freq_vector(n, fs, sided="single"):
    """Frequency values of filter with n taps sampled at fs up to Nyquist.

    Parameters
    ----------
    n : int
        Number of taps in FIR filter
    fs : int
        Sampling frequency in Hertz
    sided: str
        Generate frequencies for a "single" or "double" sided spectrum

    Returns
    -------
    (n // 2 + 1) ndarray
        Frequencies in Hz

    """
    # use float against int wrapping
    if sided == "single":
        f = np.arange(n // 2 + 1, dtype=float) * fs / n
    elif sided == "double":
        f = np.arange(n, dtype=float) * fs / n
    else:
        raise ValueError("Invalid value for sided.")

    return f
def noisify(x, snr, unit=None)

Add noise to x with relative noise level SNR.

Parameters

x : ndarray
data
snr : float
relative energy of noise, snr = Energy(x)/Energy(n).
unit : None or str, optional
if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En).

Returns

ndarray
data with noise

Examples

Create signal

>>> import numpy as np
>>> t = np.linspace(0, 1, 1000000, endpoint=False)
>>> x = np.sin(2*np.pi*10*t)  # signal

Add noise with 6 dB SNR to a sinusoidal signal:

>>> snrdB = 6
>>> xn = noisify(x, snrdB, "dB")  # signal plus noise
>>> energy_x = np.linalg.norm(x)**2
>>> energy_xn = np.linalg.norm(xn)**2
>>> snr = 10 ** (snrdB / 20)
>>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2)
True
Expand source code
def noisify(x, snr, unit=None):
    """Add noise to x with relative noise level SNR.

    Parameters
    ----------
    x : ndarray
        data
    snr : float
        relative energy of noise, snr = Energy(x)/Energy(n).
    unit : None or str, optional
        if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En).

    Returns
    -------
    ndarray
        data with noise

    Examples
    --------
    Create signal

    >>> import numpy as np
    >>> t = np.linspace(0, 1, 1000000, endpoint=False)
    >>> x = np.sin(2*np.pi*10*t)  # signal

    Add noise with 6 dB SNR to a sinusoidal signal:

    >>> snrdB = 6
    >>> xn = noisify(x, snrdB, "dB")  # signal plus noise

    >>> energy_x = np.linalg.norm(x)**2
    >>> energy_xn = np.linalg.norm(xn)**2
    >>> snr = 10 ** (snrdB / 20)
    >>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2)
    True

    """
    if unit == "dB":
        snr = 10 ** (snr / 20)

    if np.iscomplexobj(x):
        n = np.random.standard_normal(x.shape) + 1j * np.random.standard_normal(x.shape)
    else:
        n = np.random.standard_normal(x.shape)

    n *= 1 / np.sqrt(snr) * np.linalg.norm(x) / np.linalg.norm(n)

    return x + n
def time_vector(n, fs)

Time values of filter with n taps sampled at fs.

Parameters

n : int
number of taps in FIR filter
fs : int
sampling frequency in Hertz

Returns

(n) ndarray times in seconds

Expand source code
def time_vector(n, fs):
    """Time values of filter with n taps sampled at fs.

    Parameters
    ----------
    n : int
        number of taps in FIR filter
    fs : int
        sampling frequency in Hertz

    Returns
    -------
    (n) ndarray
        times in seconds

    """
    T = 1 / fs
    return np.arange(n, dtype=float) * T  # float against int wrapping

Classes

class Response (fs, fdata=None, tdata=None, isEvenSampled=True)

Representation of a linear response in time and frequency domain.

Create Response from time or frequency data.

Use from_time or from_freq methods to create objects of this class!

Parameters

fs : int
Sampling frequency in Hertz
fdata : (…, nt) complex ndarray, optional
Single sided frequency spectra with nt from ns to nr points.
tdata : (…, nf) real ndarray, optional
Time responses with nt from ns to nr points.
isEvenSampled : bool or None, optional
If fdata is given, this tells us if the last entry of fdata is the Nyquist frequency or not. Must be None if tdata is given.

Raises

ValueError
if neither fdata or tdata are given.
Expand source code
class Response(object):
    """Representation of a linear response in time and frequency domain."""

    def __init__(self, fs, fdata=None, tdata=None, isEvenSampled=True):
        """Create Response from time or frequency data.

        Use `from_time` or `from_freq methods` to create objects of this class!

        Parameters
        ----------
        fs : int
            Sampling frequency in Hertz
        fdata : (..., nt) complex ndarray, optional
            Single sided frequency spectra with nt from ns to nr points.
        tdata : (..., nf) real ndarray, optional
            Time responses with nt from ns to nr points.
        isEvenSampled : bool or None, optional
            If fdata is given, this tells us if the last entry of fdata is the
            Nyquist frequency or not. Must be `None` if tdata is given.

        Raises
        ------
        ValueError
            if neither fdata or tdata are given.

        """
        assert float(fs).is_integer()

        if fdata is not None and tdata is None:
            fdata = np.atleast_1d(fdata)
            self._nf = fdata.shape[-1]

            if isEvenSampled:
                self._nt = 2 * (self._nf - 1)
            else:
                self._nt = 2 * self._nf - 1
            self._isEvenSampled = isEvenSampled

            self.__set_frequency_data(fdata)
        elif tdata is not None and fdata is None:
            assert np.all(np.imag(tdata) == 0), "Time data must be real."
            tdata = np.atleast_1d(tdata)
            self._nt = tdata.shape[-1]
            self._nf = self._nt // 2 + 1
            self._isEvenSampled = self._nt % 2 == 0

            self.__set_time_data(tdata)
        else:
            raise ValueError("One and only one of fdata and tdata must be given.")

        self._fs = int(fs)
        self._freqs = freq_vector(self._nt, fs)
        self._times = time_vector(self._nt, fs)
        self._time_length = self._nt * 1 / fs
        self.df = self._freqs[1]  # frequency resolution
        self.dt = self._times[1]  # time resolution

    @classmethod
    def from_time(cls, fs, tdata, **kwargs):
        """Generate Response obj from time response data."""
        tf = cls(fs, tdata=tdata, **kwargs)
        return tf

    @classmethod
    def from_freq(cls, fs, fdata, **kwargs):
        """Generate Response obj from frequency response data."""
        tf = cls(fs, fdata=fdata, **kwargs)
        return tf

    @classmethod
    def from_wav(cls, fps):
        """Import responses from wav files.

        Parameters
        ----------
        fps : list
            File paths of all wav files.

        Returns
        -------
        Response
            New Response object with imported time responses.

        """
        fpi = iter(fps)
        fs, data = wavfile.read(next(fpi))
        hlist = [data] + [wavfile.read(fp)[1] for fp in fpi]

        h = np.array(hlist)
        if data.dtype in [np.uint8, np.int16, np.int32]:
            lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max)
            lim_new = (-1.0, 1.0)
            h = _rescale(h, lim_orig, lim_new).astype(np.double)

        return cls.from_time(fs, h)

    @classmethod
    def new_dirac(cls, fs, T=None, n=None, nch=(1,)):
        """Generate new allpass / dirac response."""
        nch = np.atleast_1d(nch)
        if T is not None:
            nt = round(fs * T)
        else:
            nt = n
        h = np.zeros((*nch, nt))
        h[..., 0] = 1
        return cls.from_time(fs, h)

    @classmethod
    def join(cls, tfs, axis=0, newaxis=True):
        """Concat or stack a set of Responses along a given axis.

        Parameters
        ----------
        tfs : array_like
            List of Responses
        axis : int, optional
            Indice of axis along wich to concatenate / stack TFs.
        newaxis : bool, optional
            If True, do not concatenate but stack arrays along a new axis.

        Returns
        -------
        Response

        Note
        ----
        Transfer functions need to have same sampling rate, length etc.

        """
        joinfunc = np.stack if newaxis else np.concatenate
        tdata = joinfunc([tf.in_time for tf in tfs], axis=axis)
        return cls.from_time(tfs[0].fs, tdata)

    @property
    def time_length(self):
        """Length of time response in seconds."""
        return self._time_length

    @property
    def nf(self):  # noqa: D401
        """Number of frequencies in frequency representation."""
        return len(self._freqs)

    @property
    def nt(self):  # noqa: D401
        """Number of taps."""
        return len(self._times)

    @property
    def fs(self):  # noqa: D401
        """Sampling frequency."""
        return self._fs

    @property
    def freqs(self):  # noqa: D401
        """Frequencies."""
        return self._freqs

    @property
    def times(self):  # noqa: D401
        """Times."""
        return self._times

    @property
    def in_time(self):
        """Time domain response.

        Returns
        -------
        (... , n) ndarray
            Real FIR filters.

        """
        if self._in_time is None:
            self._in_time = np.fft.irfft(self._in_freq, n=self._times.size)
        return self._in_time

    @property
    def in_freq(self):
        """Single sided frequency spectrum.

        Returns
        -------
        (... , n) ndarray
            Complex frequency response.

        """
        if self._in_freq is None:
            self._in_freq = np.fft.rfft(self._in_time)
        return self._in_freq

    @property
    def amplitude_spectrum(self):
        """Amplitude spectrum."""
        X = self.in_freq / self.nt

        if self.nt % 2 == 0:
            # zero and nyquist element only appear once in complex spectrum
            X[..., 1:-1] *= 2
        else:
            # there is no nyquist element
            X[..., 1:] *= 2

        return X

    def __set_time_data(self, tdata):
        """Set time data without creating new object."""
        assert tdata.shape[-1] == self._nt
        self._in_time = tdata
        self._in_freq = None

    def __set_frequency_data(self, fdata):
        """Set frequency data without creating new object."""
        assert fdata.shape[-1] == self._nf
        self._in_freq = fdata
        self._in_time = None

    def plot(
        self,
        group_delay=False,
        slce=None,
        flim=None,
        dblim=None,
        tlim=None,
        grpdlim=None,
        dbref=1,
        show=False,
        use_fig=None,
        label=None,
        unwrap_phase=False,
        logf=True,
        third_oct_f=True,
        plot_kw={},
        **fig_kw,
    ):
        """Plot the response in both domains.

        Parameters
        ----------
        group_delay : bool, optional
            Display group delay instead of phase.
        slce : numpy.lib.index_tricks.IndexExpression
            only plot subset of responses defined by a slice. Last
            dimension (frequency or time) is always completely taken.
        flim : tuple or None, optional
            Frequency axis limits as tuple `(lower, upper)`
        dblim : tuple or None, optional
            Magnitude axis limits as tuple `(lower, upper)`
        tlim : tuple or None, optional
            Time axis limits as tuple `(lower, upper)`
        grpdlim: tuple or None, optional
            Group delay axis limit as tuple `(lower, upper)`
        dbref : float
            dB reference in magnitude plot
        show : bool, optional
            Run `matplotlib.pyplot.show()`
        use_fig : matplotlib.pyplot.Figure
            Reuse an existing figure.
        label : None, optional
            Description
        unwrap_phase : bool, optional
            unwrap phase in phase plot
        logf : bool, optional
            If `True`, use logarithmic frequency axis.
        third_oct_f: bool, optional
            Label frequency axis with third octave bands.
        plot_kw : dictionary, optional
            Keyword arguments passed to the `plt.plot` commands.
        **fig_kw
            Additional options passe to figure creation.

        """
        if use_fig is None:
            fig_kw = {**{"figsize": (10, 10)}, **fig_kw}
            fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw)
        else:
            fig = use_fig
            axes = fig.axes

        self.plot_magnitude(
            use_ax=axes[0],
            slce=slce,
            dblim=dblim,
            flim=flim,
            dbref=dbref,
            label=label,
            plot_kw=plot_kw,
            logf=logf,
            third_oct_f=third_oct_f,
        )
        if group_delay:
            self.plot_group_delay(
                use_ax=axes[1],
                slce=slce,
                flim=flim,
                ylim=grpdlim,
                plot_kw=plot_kw,
                logf=logf,
                third_oct_f=third_oct_f,
            )
        else:
            self.plot_phase(
                use_ax=axes[1],
                slce=slce,
                flim=flim,
                plot_kw=plot_kw,
                unwrap=unwrap_phase,
                logf=logf,
                third_oct_f=third_oct_f,
            )
        self.plot_time(
            use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw
        )

        if show:
            plt.show()

        return fig

    def plot_magnitude(
        self,
        use_ax=None,
        slce=None,
        dblim=None,
        flim=None,
        dbref=1,
        label=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot magnitude response."""
        # TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )

        plotf = ax.semilogx if logf else ax.plot
        plotf(
            self.freqs,
            20 * np.log10(np.abs(freq_plotready / dbref)),
            label=label,
            **plot_kw,
        )
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Magnitude [dB]")
        ax.set_title("Frequency response")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)

        if dblim is not None:
            ax.set_ylim(dblim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_phase(
        self,
        use_ax=None,
        slce=None,
        flim=None,
        label=None,
        unwrap=False,
        ylim=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot phase response."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )
        phase = (
            np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready)
        )

        plotf = ax.semilogx if logf else ax.plot
        plotf(self.freqs, phase, label=label, **plot_kw)
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Phase [rad]")
        ax.set_title("Phase response")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)
        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_time(
        self,
        use_ax=None,
        slce=None,
        tlim=None,
        ylim=None,
        label=None,
        plot_kw={},
        **fig_kw,
    ):
        """Plot time response."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape(
            (self.nt, -1)
        )

        ax.plot(self.times, time_plotready, label=label, **plot_kw)
        ax.set_xlabel("Time [s]")
        ax.set_ylabel("")
        ax.set_title("Time response")
        ax.grid(True)

        if tlim:
            ax.set_xlim(tlim)
        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        return fig

    def plot_group_delay(
        self,
        use_ax=None,
        slce=None,
        flim=None,
        label=None,
        ylim=None,
        plot_kw={},
        logf=True,
        third_oct_f=True,
        **fig_kw,
    ):
        """Plot group delay."""
        if use_ax is None:
            fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
            fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
        else:
            ax = use_ax
            fig = ax.get_figure()

        # append frequency/time dimension to slice
        if slce is None:
            slce = [np.s_[:] for n in range(len(self.in_time.shape))]
        elif isinstance(slce, tuple):
            slce = slce + (np.s_[:],)
        else:
            slce = (slce, np.s_[:])

        # move time / frequency axis to first dimension
        freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
            (self.nf, -1)
        )

        df = self.freqs[1] - self.freqs[0]
        # TODO: use scipy.signal.group_delay here as below has problem at larger delays
        grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0)

        plotf = ax.semilogx if logf else ax.plot
        plotf(self.freqs, grpd, label=label, **plot_kw)
        ax.set_xlabel("Frequency [Hz]")
        ax.set_ylabel("Delay [s]")
        ax.set_title("Group Delay")
        ax.grid(True)

        if flim is None:
            lowlim = min(10, self.fs / 2 / 100)
            flim = (lowlim, self.fs / 2)
        ax.set_xlim(flim)

        if ylim:
            ax.set_ylim(ylim)

        if label is not None:
            ax.legend()

        if third_oct_f:
            _add_octave_band_xticks(ax)

        return fig

    def plot_power_in_bands(
        self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs
    ):
        """Plot signal's power in bands.

        Parameters
        ----------
        bands : list or None, optional
            List of tuples (f_center, f_lower, f_upper). If `None`, use third octave
            bands.
        use_ax : matplotlib.axis.Axis or None, optional
            Plot into this axis.
        barkwargs : dict
            Keyword arguments to `axis.bar`
        avgaxis : int, tuple or None
            Average power over these axes.
        dbref : float
            dB reference.
        **figkwargs
            Keyword arguments passed to plt.subplots

        Returns
        -------
        P : ndarray
            Power in bands
        fc : ndarray
            Band frequencies
        fig : matplotlib.figure.Figure
            Figure

        """
        P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis)

        nbands = P.shape[-1]
        P = np.atleast_2d(P).reshape((-1, nbands))

        if use_ax is None:
            fig, ax = plt.subplots(**figkwargs)
        else:
            ax = use_ax
            fig = ax.get_figure()

        xticks = range(1, nbands + 1)
        for i in range(P.shape[0]):
            ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs)
        ax.set_xticks(xticks)
        ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical")
        ax.grid(True)
        ax.set_xlabel("Band's center frequencies [Hz]")
        ax.set_ylabel("Power [dB]")

        return (P, fc, fig)

    def time_window(self, startwindow, stopwindow, window="hann"):
        """Apply time domain windows.

        Parameters
        ----------
        startwindow : None or tuple
            Tuple (t1, t2) with beginning and end times of window opening.
        stopwindow : None or tuple
            Tuple (t1, t2) with beginning and end times of window closing.
        window : string or tuple of string and parameter values, optional
            Desired window to use. See scipy.signal.get_window for a list of
            windows and required parameters.

        Returns
        -------
        Response
            Time windowed response object

        """
        n = self.times.size
        twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window)
        new_response = self.from_time(self.fs, self.in_time * twindow)

        return new_response

    def freq_window(self, startwindow, stopwindow, window="hann"):
        """Apply frequency domain window.

        Parameters
        ----------
        startwindow : None or tuple
            Tuple (t1, t2) with beginning and end frequencies of window opening.
        stopwindow : None or tuple
            Tuple (t1, t2) with beginning and end frequencies of window closing.
        window : string or tuple of string and parameter values, optional
            Desired window to use. See scipy.signal.get_window for a list of
            windows and required parameters.

        Returns
        -------
        Response
            Frequency windowed response object

        """
        n = self.times.size
        fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window)
        new_response = self.from_freq(self.fs, self.in_freq * fwindow)

        return new_response

    def window_around_peak(self, tleft, tright, alpha, return_window=False):
        """Time window each impulse response around its peak value.

        Parameters
        ----------
        tleft, tright : float
            Window starts `tleft` seconds before and ends `tright` seconds after maximum
            of impulse response.
        alpha : float
            `alpha` parameter of `scipy.signal.tukey` window.
        return_window : bool, optional
            Also return used time window

        Returns
        -------
        Response
            Time windowed response object.
        ndarray
            Time window, if `return_window` is `True`.

        """
        window = _construct_window_around_peak(
            self.fs, self.in_time, tleft, tright, alpha=alpha
        )

        if return_window:
            return self.from_time(self.fs, self.in_time * window), window

        return self.from_time(self.fs, self.in_time * window)

    def delay(self, dt, keep_length=True):
        """Delay time response by dt seconds.

        Rounds of to closest integer delay.
        """
        x = delay(self.fs, self.in_time, dt, keep_length=keep_length)
        return self.from_time(self.fs, x)

    def circdelay(self, dt):
        """Delay by circular shift.

        Rounds of to closest integer delay.
        """
        x = self.in_time
        n = int(round(dt * self.fs))
        shifted = np.roll(x, n, axis=-1)

        return self.from_time(self.fs, shifted)

    def timecrop(self, start, end):
        """Crop time response.

        Parameters
        ----------
        start, end : float
            Start and end times in seconds. Does not include sample at t=end. Use
            end=None to force inclusion of last sample.

        Returns
        -------
        Response
            New Response object with cropped time.

        Notes
        -----
        Creates new Response object.

        Examples
        --------
        >>> import numpy as np
        >>> from response import Response
        >>> r = Response.from_time(100, np.random.normal(size=100))
        >>> split = 0.2

        The following holds:

        >>> np.all(np.concatenate(
        ...     (
        ...         r.timecrop(0, split).in_time,
        ...         r.timecrop(split, None).in_time,
        ...     ),
        ...     axis=-1,
        ... ) == r.in_time)
        True

        """
        if start < 0:
            start += self.time_length
        if end is not None and end < 0:
            end += self.time_length
        assert 0 <= start < self.time_length
        assert end is None or (0 < end <= self.time_length)

        _, i_start = _find_nearest(self.times, start)
        if end is None:
            i_end = None
        else:
            _, i_end = _find_nearest(self.times, end)

        h = self.in_time[..., i_start:i_end]

        new_response = self.from_time(self.fs, h)

        return new_response

    def non_causal_timecrop(self, length):
        """Cut length of non-causal impulse response.

        "FFT shift, cropping on both ends, iFFT shift"

        Parameters
        ----------
        length : float
            final length in seconds

        Returns
        -------
        Response
            New Response object new length.

        Note
        ----
        Can introduce delay pre-delay by a sample.

        """
        assert length < self.time_length

        cut = (self.time_length - length) / 2

        _, i_start = _find_nearest(self.times, cut)
        _, i_end = _find_nearest(self.times, self.time_length - cut)

        h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end])

        new_response = self.from_time(self.fs, h)

        if new_response.time_length != length:
            w = f"Could not precisely shrink to {length}s with fs = {self.fs}"
            warnings.warn(w)

        return new_response

    def zeropad(self, before, after):
        """Zeropad time response.

        Parameters
        ----------
        before, after : int
            Number of zero samples inserted before and after response.

        Returns
        -------
        Response
            Zeropadded response

        """
        assert before % 1 == 0
        assert after % 1 == 0
        dims = self.in_time.ndim

        pad_width = [(0, 0) for n in range(dims)]
        pad_width[-1] = (int(before), int(after))

        h = np.pad(self.in_time, pad_width, "constant")

        return self.from_time(self.fs, h)

    def zeropad_to_power_of_2(self):
        """Pad time response for length of power of 2.

        Returns
        -------
        Response
            New response object with larger, power of 2 length.

        """
        # https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python
        n = 2 ** (self.nt - 1).bit_length()
        return self.zeropad(0, n - self.nt)

    def zeropad_to_length(self, n):
        """Zeropad time response to specific length.

        Returns
        -------
        Response
            New response object with new length n.

        """
        oldn = self.nt
        assert n >= oldn
        return self.zeropad(0, n - oldn)

    def resample(self, fs_new, normalize="same_gain", window=None):
        """Resample using Fourier method.

        Parameters
        ----------
        fs_new : int
            New sample rate
        normalize : str, optional
            If 'same_gain', normalize such that the gain is the same
            as the original signal. If 'same_amplitude', amplitudes will be preserved.
        window : None, optional
            Passed to scipy.signal.resample.

        Returns
        -------
        Response
            New resampled response object.

        Raises
        ------
        ValueError
            If resulting number of samples would be a non-integer.

        """
        if fs_new == self.fs:
            return self

        nt_new = fs_new * self.time_length

        if nt_new % 1 != 0:
            raise ValueError(
                "New number of samples must be integer, but is {}".format(nt_new)
            )

        nt_new = int(nt_new)

        h_new = resample(self.in_time, nt_new, axis=-1, window=window)

        if normalize == "same_gain":
            h_new *= self.nt / nt_new
        elif normalize == "same_amplitude":
            pass
        else:
            raise ValueError(
                "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
            )

        return self.from_time(fs_new, h_new)

    def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)):
        """Resample using polyphase filtering.

        Parameters
        ----------
        fs_new : int
            New sample rate
        normalize : str, optional
            If 'same_gain', normalize such that the gain is the same
            as the original signal. If 'same_amplitude', amplitudes will be preserved.
        window : None, optional
            Passed to scipy.signal.resample_poly.

        Returns
        -------
        Response
            New resampled response object.

        """
        if fs_new == self.fs:
            return self

        ratio = Fraction(fs_new, self.fs)
        up = ratio.numerator
        down = ratio.denominator

        if up > 1000 or down > 1000:
            print("Warning: resampling with high ratio {}/{}".format(up, down))

        h_new = resample_poly(self.in_time, up, down, axis=-1, window=window)

        if normalize == "same_gain":
            h_new *= down / up
        elif normalize == "same_amplitude":
            pass
        else:
            raise ValueError(
                "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
            )

        return self.from_time(fs_new, h_new)

    def normalize(self, maxval=1):
        """Normalize time response.

        Parameters
        ----------
        maxval: float, optional
            Maximum amplitude in resulting time response.

        Returns
        -------
        Response

        """
        h = self.in_time
        h /= np.abs(self.in_time).max()
        h *= maxval
        return self.from_time(self.fs, h)

    def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16):
        """Export impulse response to wave files.

        Dimension of data must 2.

        Parameters
        ----------
        folder : file path
            Save in this folder
        name_fmt : str, optional
            Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
        dtype : one of np.float32, np.int32, np.int16, np.uint8
            Data is converted to this type.

        """
        data = np.atleast_2d(self.in_time)

        assert data.ndim == 2
        assert np.all(np.abs(data) <= 1.0)

        # convert and scale to new output datatype
        if dtype in [np.uint8, np.int16, np.int32]:
            lim_orig = (-1.0, 1.0)
            lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max)
            data = _rescale(data, lim_orig, lim_new).astype(dtype)
        elif dtype != np.float32:
            raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.")

        path = Path(folder)
        if not path.is_dir():
            path.mkdir(parents=True, exist_ok=False)

        for i in range(data.shape[0]):
            wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i])

    def export_npz(self, filename, dtype=np.float32):
        """Export impulse response as npz file.

        Parameters
        ----------
        filename: str or Path
            File path
        dtype: numpy dtype
            Convert to this type before saving

        """
        np.savez(
            filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs
        )

    def power_in_bands(self, bands=None, avgaxis=None):
        """Compute power of signal in third octave bands.

        Power(band) =   1/T  integral  |X(f)| ** 2 df
                            f in band

        Parameters
        ----------
        bands : list of tuples, length nbands optional
            Center, lower and upper frequencies of bands.
        avgaxis: int, tuple or None
            Average result over these axis

        Returns
        -------
        P: ndarray, shape (..., nbands)
            Power in bands
        fcs: list, length nbands
            Center frequencies of bands

        """
        if bands is None:
            bands = _third_octave_bands

        # center frequencies
        fcs = np.asarray([b[0] for b in bands])
        Npow2 = 2 ** (self.nt - 1).bit_length()
        f = np.fft.fftfreq(Npow2, d=1 / self.fs)

        shape = list(self.in_freq.shape)
        shape[-1] = len(bands)
        P = np.zeros(shape)
        for i, (fc, fl, fu) in enumerate(bands):
            if fu < self.fs / 2:  # include only bands in frequency range
                iband = np.logical_and(fl <= f, f < fu)
                P[..., i] = np.sum(
                    np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2
                    * 2  # energy from negative and positive frequencies
                    * self.dt
                    / self.nt
                    / self.time_length,
                    axis=-1,
                )
            else:
                P[..., i] = 0

        if avgaxis is not None:
            P = P.mean(axis=avgaxis)

        return P, fcs

    @classmethod
    def time_vector(cls, n, fs):
        """Time values of filter with n taps sampled at fs.

        Parameters
        ----------
        n : int
            number of taps in FIR filter
        fs : int
            sampling frequency in Hertz

        Returns
        -------
        (n) ndarray
            times in seconds

        """
        return time_vector(n, fs)

    @classmethod
    def freq_vector(cls, n, fs):
        """Frequency values of filter with n taps sampled at fs up to Nyquist.

        Parameters
        ----------
        n : int
            Number of taps in FIR filter
        fs : int
            Sampling frequency in Hertz

        Returns
        -------
        (n // 2 + 1) ndarray
            Frequencies in Hz

        """
        return freq_vector(n, fs, sided='single')

    def filter(self, b, a=[1]):
        """Filter response along one-dimension with an IIR or FIR filter.

        Parameters
        ----------
        b : array_like
            The numerator coefficient vector in a 1-D sequence.
        a : array_like, optional
            The denominator coefficient vector in a 1-D sequence.  If ``a[0]``
            is not 1, then both `a` and `b` are normalized by ``a[0]``.

        """
        return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1))

    def add_noise(self, snr, unit=None):
        """Add noise to x with relative noise level SNR.

        Parameters
        ----------
        snr : float
            relative magnitude of noise, i.e. snr = Ex/En
        unit : None or str, optional
            if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).

        Returns
        -------
        Response

        """
        return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit))

    def psd(self, **kwargs):
        """Compute the power spectral density of the signal.

        Parameters
        ----------
        kwargs
            keword arguments passed to scipy.signal.welch

        Returns
        -------
        f : ndarray
            Array of sample frequencies.
        Pxx : ndarray
            Power spectral density of time signal.

        Notes
        -----
        Use scaling='density' for power per bin bandwidth and scaling='spectrum' for
        power per bin.

        """
        return welch(self.in_time, fs=self.fs, **kwargs)

Static methods

def freq_vector(n, fs)

Frequency values of filter with n taps sampled at fs up to Nyquist.

Parameters

n : int
Number of taps in FIR filter
fs : int
Sampling frequency in Hertz

Returns

(n // 2 + 1) ndarray Frequencies in Hz

Expand source code
@classmethod
def freq_vector(cls, n, fs):
    """Frequency values of filter with n taps sampled at fs up to Nyquist.

    Parameters
    ----------
    n : int
        Number of taps in FIR filter
    fs : int
        Sampling frequency in Hertz

    Returns
    -------
    (n // 2 + 1) ndarray
        Frequencies in Hz

    """
    return freq_vector(n, fs, sided='single')
def from_freq(fs, fdata, **kwargs)

Generate Response obj from frequency response data.

Expand source code
@classmethod
def from_freq(cls, fs, fdata, **kwargs):
    """Generate Response obj from frequency response data."""
    tf = cls(fs, fdata=fdata, **kwargs)
    return tf
def from_time(fs, tdata, **kwargs)

Generate Response obj from time response data.

Expand source code
@classmethod
def from_time(cls, fs, tdata, **kwargs):
    """Generate Response obj from time response data."""
    tf = cls(fs, tdata=tdata, **kwargs)
    return tf
def from_wav(fps)

Import responses from wav files.

Parameters

fps : list
File paths of all wav files.

Returns

Response
New Response object with imported time responses.
Expand source code
@classmethod
def from_wav(cls, fps):
    """Import responses from wav files.

    Parameters
    ----------
    fps : list
        File paths of all wav files.

    Returns
    -------
    Response
        New Response object with imported time responses.

    """
    fpi = iter(fps)
    fs, data = wavfile.read(next(fpi))
    hlist = [data] + [wavfile.read(fp)[1] for fp in fpi]

    h = np.array(hlist)
    if data.dtype in [np.uint8, np.int16, np.int32]:
        lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max)
        lim_new = (-1.0, 1.0)
        h = _rescale(h, lim_orig, lim_new).astype(np.double)

    return cls.from_time(fs, h)
def join(tfs, axis=0, newaxis=True)

Concat or stack a set of Responses along a given axis.

Parameters

tfs : array_like
List of Responses
axis : int, optional
Indice of axis along wich to concatenate / stack TFs.
newaxis : bool, optional
If True, do not concatenate but stack arrays along a new axis.

Returns

Response
 

Note

Transfer functions need to have same sampling rate, length etc.

Expand source code
@classmethod
def join(cls, tfs, axis=0, newaxis=True):
    """Concat or stack a set of Responses along a given axis.

    Parameters
    ----------
    tfs : array_like
        List of Responses
    axis : int, optional
        Indice of axis along wich to concatenate / stack TFs.
    newaxis : bool, optional
        If True, do not concatenate but stack arrays along a new axis.

    Returns
    -------
    Response

    Note
    ----
    Transfer functions need to have same sampling rate, length etc.

    """
    joinfunc = np.stack if newaxis else np.concatenate
    tdata = joinfunc([tf.in_time for tf in tfs], axis=axis)
    return cls.from_time(tfs[0].fs, tdata)
def new_dirac(fs, T=None, n=None, nch=(1,))

Generate new allpass / dirac response.

Expand source code
@classmethod
def new_dirac(cls, fs, T=None, n=None, nch=(1,)):
    """Generate new allpass / dirac response."""
    nch = np.atleast_1d(nch)
    if T is not None:
        nt = round(fs * T)
    else:
        nt = n
    h = np.zeros((*nch, nt))
    h[..., 0] = 1
    return cls.from_time(fs, h)
def time_vector(n, fs)

Time values of filter with n taps sampled at fs.

Parameters

n : int
number of taps in FIR filter
fs : int
sampling frequency in Hertz

Returns

(n) ndarray times in seconds

Expand source code
@classmethod
def time_vector(cls, n, fs):
    """Time values of filter with n taps sampled at fs.

    Parameters
    ----------
    n : int
        number of taps in FIR filter
    fs : int
        sampling frequency in Hertz

    Returns
    -------
    (n) ndarray
        times in seconds

    """
    return time_vector(n, fs)

Instance variables

var amplitude_spectrum

Amplitude spectrum.

Expand source code
@property
def amplitude_spectrum(self):
    """Amplitude spectrum."""
    X = self.in_freq / self.nt

    if self.nt % 2 == 0:
        # zero and nyquist element only appear once in complex spectrum
        X[..., 1:-1] *= 2
    else:
        # there is no nyquist element
        X[..., 1:] *= 2

    return X
var freqs

Frequencies.

Expand source code
@property
def freqs(self):  # noqa: D401
    """Frequencies."""
    return self._freqs
var fs

Sampling frequency.

Expand source code
@property
def fs(self):  # noqa: D401
    """Sampling frequency."""
    return self._fs
var in_freq

Single sided frequency spectrum.

Returns

(… , n) ndarray Complex frequency response.

Expand source code
@property
def in_freq(self):
    """Single sided frequency spectrum.

    Returns
    -------
    (... , n) ndarray
        Complex frequency response.

    """
    if self._in_freq is None:
        self._in_freq = np.fft.rfft(self._in_time)
    return self._in_freq
var in_time

Time domain response.

Returns

(… , n) ndarray Real FIR filters.

Expand source code
@property
def in_time(self):
    """Time domain response.

    Returns
    -------
    (... , n) ndarray
        Real FIR filters.

    """
    if self._in_time is None:
        self._in_time = np.fft.irfft(self._in_freq, n=self._times.size)
    return self._in_time
var nf

Number of frequencies in frequency representation.

Expand source code
@property
def nf(self):  # noqa: D401
    """Number of frequencies in frequency representation."""
    return len(self._freqs)
var nt

Number of taps.

Expand source code
@property
def nt(self):  # noqa: D401
    """Number of taps."""
    return len(self._times)
var time_length

Length of time response in seconds.

Expand source code
@property
def time_length(self):
    """Length of time response in seconds."""
    return self._time_length
var times

Times.

Expand source code
@property
def times(self):  # noqa: D401
    """Times."""
    return self._times

Methods

def add_noise(self, snr, unit=None)

Add noise to x with relative noise level SNR.

Parameters

snr : float
relative magnitude of noise, i.e. snr = Ex/En
unit : None or str, optional
if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).

Returns

Response
 
Expand source code
def add_noise(self, snr, unit=None):
    """Add noise to x with relative noise level SNR.

    Parameters
    ----------
    snr : float
        relative magnitude of noise, i.e. snr = Ex/En
    unit : None or str, optional
        if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).

    Returns
    -------
    Response

    """
    return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit))
def circdelay(self, dt)

Delay by circular shift.

Rounds of to closest integer delay.

Expand source code
def circdelay(self, dt):
    """Delay by circular shift.

    Rounds of to closest integer delay.
    """
    x = self.in_time
    n = int(round(dt * self.fs))
    shifted = np.roll(x, n, axis=-1)

    return self.from_time(self.fs, shifted)
def delay(self, dt, keep_length=True)

Delay time response by dt seconds.

Rounds of to closest integer delay.

Expand source code
def delay(self, dt, keep_length=True):
    """Delay time response by dt seconds.

    Rounds of to closest integer delay.
    """
    x = delay(self.fs, self.in_time, dt, keep_length=keep_length)
    return self.from_time(self.fs, x)
def export_npz(self, filename, dtype=numpy.float32)

Export impulse response as npz file.

Parameters

filename : str or Path
File path
dtype : numpy dtype
Convert to this type before saving
Expand source code
def export_npz(self, filename, dtype=np.float32):
    """Export impulse response as npz file.

    Parameters
    ----------
    filename: str or Path
        File path
    dtype: numpy dtype
        Convert to this type before saving

    """
    np.savez(
        filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs
    )
def export_wav(self, folder, name_fmt='{:02d}.wav', dtype=numpy.int16)

Export impulse response to wave files.

Dimension of data must 2.

Parameters

folder : file path
Save in this folder
name_fmt : str, optional
Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
dtype : one of np.float32, np.int32, np.int16, np.uint8
Data is converted to this type.
Expand source code
def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16):
    """Export impulse response to wave files.

    Dimension of data must 2.

    Parameters
    ----------
    folder : file path
        Save in this folder
    name_fmt : str, optional
        Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
    dtype : one of np.float32, np.int32, np.int16, np.uint8
        Data is converted to this type.

    """
    data = np.atleast_2d(self.in_time)

    assert data.ndim == 2
    assert np.all(np.abs(data) <= 1.0)

    # convert and scale to new output datatype
    if dtype in [np.uint8, np.int16, np.int32]:
        lim_orig = (-1.0, 1.0)
        lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max)
        data = _rescale(data, lim_orig, lim_new).astype(dtype)
    elif dtype != np.float32:
        raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.")

    path = Path(folder)
    if not path.is_dir():
        path.mkdir(parents=True, exist_ok=False)

    for i in range(data.shape[0]):
        wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i])
def filter(self, b, a=[1])

Filter response along one-dimension with an IIR or FIR filter.

Parameters

b : array_like
The numerator coefficient vector in a 1-D sequence.
a : array_like, optional
The denominator coefficient vector in a 1-D sequence. If a[0] is not 1, then both a and b are normalized by a[0].
Expand source code
def filter(self, b, a=[1]):
    """Filter response along one-dimension with an IIR or FIR filter.

    Parameters
    ----------
    b : array_like
        The numerator coefficient vector in a 1-D sequence.
    a : array_like, optional
        The denominator coefficient vector in a 1-D sequence.  If ``a[0]``
        is not 1, then both `a` and `b` are normalized by ``a[0]``.

    """
    return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1))
def freq_window(self, startwindow, stopwindow, window='hann')

Apply frequency domain window.

Parameters

startwindow : None or tuple
Tuple (t1, t2) with beginning and end frequencies of window opening.
stopwindow : None or tuple
Tuple (t1, t2) with beginning and end frequencies of window closing.
window : string or tuple of string and parameter values, optional
Desired window to use. See scipy.signal.get_window for a list of windows and required parameters.

Returns

Response
Frequency windowed response object
Expand source code
def freq_window(self, startwindow, stopwindow, window="hann"):
    """Apply frequency domain window.

    Parameters
    ----------
    startwindow : None or tuple
        Tuple (t1, t2) with beginning and end frequencies of window opening.
    stopwindow : None or tuple
        Tuple (t1, t2) with beginning and end frequencies of window closing.
    window : string or tuple of string and parameter values, optional
        Desired window to use. See scipy.signal.get_window for a list of
        windows and required parameters.

    Returns
    -------
    Response
        Frequency windowed response object

    """
    n = self.times.size
    fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window)
    new_response = self.from_freq(self.fs, self.in_freq * fwindow)

    return new_response
def non_causal_timecrop(self, length)

Cut length of non-causal impulse response.

"FFT shift, cropping on both ends, iFFT shift"

Parameters

length : float
final length in seconds

Returns

Response
New Response object new length.

Note

Can introduce delay pre-delay by a sample.

Expand source code
def non_causal_timecrop(self, length):
    """Cut length of non-causal impulse response.

    "FFT shift, cropping on both ends, iFFT shift"

    Parameters
    ----------
    length : float
        final length in seconds

    Returns
    -------
    Response
        New Response object new length.

    Note
    ----
    Can introduce delay pre-delay by a sample.

    """
    assert length < self.time_length

    cut = (self.time_length - length) / 2

    _, i_start = _find_nearest(self.times, cut)
    _, i_end = _find_nearest(self.times, self.time_length - cut)

    h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end])

    new_response = self.from_time(self.fs, h)

    if new_response.time_length != length:
        w = f"Could not precisely shrink to {length}s with fs = {self.fs}"
        warnings.warn(w)

    return new_response
def normalize(self, maxval=1)

Normalize time response.

Parameters

maxval : float, optional
Maximum amplitude in resulting time response.

Returns

Response
 
Expand source code
def normalize(self, maxval=1):
    """Normalize time response.

    Parameters
    ----------
    maxval: float, optional
        Maximum amplitude in resulting time response.

    Returns
    -------
    Response

    """
    h = self.in_time
    h /= np.abs(self.in_time).max()
    h *= maxval
    return self.from_time(self.fs, h)
def plot(self, group_delay=False, slce=None, flim=None, dblim=None, tlim=None, grpdlim=None, dbref=1, show=False, use_fig=None, label=None, unwrap_phase=False, logf=True, third_oct_f=True, plot_kw={}, **fig_kw)

Plot the response in both domains.

Parameters

group_delay : bool, optional
Display group delay instead of phase.
slce : numpy.lib.index_tricks.IndexExpression
only plot subset of responses defined by a slice. Last dimension (frequency or time) is always completely taken.
flim : tuple or None, optional
Frequency axis limits as tuple (lower, upper)
dblim : tuple or None, optional
Magnitude axis limits as tuple (lower, upper)
tlim : tuple or None, optional
Time axis limits as tuple (lower, upper)
grpdlim : tuple or None, optional
Group delay axis limit as tuple (lower, upper)
dbref : float
dB reference in magnitude plot
show : bool, optional
Run matplotlib.pyplot.show()
use_fig : matplotlib.pyplot.Figure
Reuse an existing figure.
label : None, optional
Description
unwrap_phase : bool, optional
unwrap phase in phase plot
logf : bool, optional
If True, use logarithmic frequency axis.
third_oct_f : bool, optional
Label frequency axis with third octave bands.
plot_kw : dictionary, optional
Keyword arguments passed to the plt.plot commands.
**fig_kw
Additional options passe to figure creation.
Expand source code
def plot(
    self,
    group_delay=False,
    slce=None,
    flim=None,
    dblim=None,
    tlim=None,
    grpdlim=None,
    dbref=1,
    show=False,
    use_fig=None,
    label=None,
    unwrap_phase=False,
    logf=True,
    third_oct_f=True,
    plot_kw={},
    **fig_kw,
):
    """Plot the response in both domains.

    Parameters
    ----------
    group_delay : bool, optional
        Display group delay instead of phase.
    slce : numpy.lib.index_tricks.IndexExpression
        only plot subset of responses defined by a slice. Last
        dimension (frequency or time) is always completely taken.
    flim : tuple or None, optional
        Frequency axis limits as tuple `(lower, upper)`
    dblim : tuple or None, optional
        Magnitude axis limits as tuple `(lower, upper)`
    tlim : tuple or None, optional
        Time axis limits as tuple `(lower, upper)`
    grpdlim: tuple or None, optional
        Group delay axis limit as tuple `(lower, upper)`
    dbref : float
        dB reference in magnitude plot
    show : bool, optional
        Run `matplotlib.pyplot.show()`
    use_fig : matplotlib.pyplot.Figure
        Reuse an existing figure.
    label : None, optional
        Description
    unwrap_phase : bool, optional
        unwrap phase in phase plot
    logf : bool, optional
        If `True`, use logarithmic frequency axis.
    third_oct_f: bool, optional
        Label frequency axis with third octave bands.
    plot_kw : dictionary, optional
        Keyword arguments passed to the `plt.plot` commands.
    **fig_kw
        Additional options passe to figure creation.

    """
    if use_fig is None:
        fig_kw = {**{"figsize": (10, 10)}, **fig_kw}
        fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw)
    else:
        fig = use_fig
        axes = fig.axes

    self.plot_magnitude(
        use_ax=axes[0],
        slce=slce,
        dblim=dblim,
        flim=flim,
        dbref=dbref,
        label=label,
        plot_kw=plot_kw,
        logf=logf,
        third_oct_f=third_oct_f,
    )
    if group_delay:
        self.plot_group_delay(
            use_ax=axes[1],
            slce=slce,
            flim=flim,
            ylim=grpdlim,
            plot_kw=plot_kw,
            logf=logf,
            third_oct_f=third_oct_f,
        )
    else:
        self.plot_phase(
            use_ax=axes[1],
            slce=slce,
            flim=flim,
            plot_kw=plot_kw,
            unwrap=unwrap_phase,
            logf=logf,
            third_oct_f=third_oct_f,
        )
    self.plot_time(
        use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw
    )

    if show:
        plt.show()

    return fig
def plot_group_delay(self, use_ax=None, slce=None, flim=None, label=None, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)

Plot group delay.

Expand source code
def plot_group_delay(
    self,
    use_ax=None,
    slce=None,
    flim=None,
    label=None,
    ylim=None,
    plot_kw={},
    logf=True,
    third_oct_f=True,
    **fig_kw,
):
    """Plot group delay."""
    if use_ax is None:
        fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
        fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
    else:
        ax = use_ax
        fig = ax.get_figure()

    # append frequency/time dimension to slice
    if slce is None:
        slce = [np.s_[:] for n in range(len(self.in_time.shape))]
    elif isinstance(slce, tuple):
        slce = slce + (np.s_[:],)
    else:
        slce = (slce, np.s_[:])

    # move time / frequency axis to first dimension
    freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
        (self.nf, -1)
    )

    df = self.freqs[1] - self.freqs[0]
    # TODO: use scipy.signal.group_delay here as below has problem at larger delays
    grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0)

    plotf = ax.semilogx if logf else ax.plot
    plotf(self.freqs, grpd, label=label, **plot_kw)
    ax.set_xlabel("Frequency [Hz]")
    ax.set_ylabel("Delay [s]")
    ax.set_title("Group Delay")
    ax.grid(True)

    if flim is None:
        lowlim = min(10, self.fs / 2 / 100)
        flim = (lowlim, self.fs / 2)
    ax.set_xlim(flim)

    if ylim:
        ax.set_ylim(ylim)

    if label is not None:
        ax.legend()

    if third_oct_f:
        _add_octave_band_xticks(ax)

    return fig
def plot_magnitude(self, use_ax=None, slce=None, dblim=None, flim=None, dbref=1, label=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)

Plot magnitude response.

Expand source code
def plot_magnitude(
    self,
    use_ax=None,
    slce=None,
    dblim=None,
    flim=None,
    dbref=1,
    label=None,
    plot_kw={},
    logf=True,
    third_oct_f=True,
    **fig_kw,
):
    """Plot magnitude response."""
    # TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db
    if use_ax is None:
        fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
        fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
    else:
        ax = use_ax
        fig = ax.get_figure()

    # append frequency/time dimension to slice
    if slce is None:
        slce = [np.s_[:] for n in range(len(self.in_time.shape))]
    elif isinstance(slce, tuple):
        slce = slce + (np.s_[:],)
    else:
        slce = (slce, np.s_[:])

    # move time / frequency axis to first dimension
    freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
        (self.nf, -1)
    )

    plotf = ax.semilogx if logf else ax.plot
    plotf(
        self.freqs,
        20 * np.log10(np.abs(freq_plotready / dbref)),
        label=label,
        **plot_kw,
    )
    ax.set_xlabel("Frequency [Hz]")
    ax.set_ylabel("Magnitude [dB]")
    ax.set_title("Frequency response")
    ax.grid(True)

    if flim is None:
        lowlim = min(10, self.fs / 2 / 100)
        flim = (lowlim, self.fs / 2)
    ax.set_xlim(flim)

    if dblim is not None:
        ax.set_ylim(dblim)

    if label is not None:
        ax.legend()

    if third_oct_f:
        _add_octave_band_xticks(ax)

    return fig
def plot_phase(self, use_ax=None, slce=None, flim=None, label=None, unwrap=False, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)

Plot phase response.

Expand source code
def plot_phase(
    self,
    use_ax=None,
    slce=None,
    flim=None,
    label=None,
    unwrap=False,
    ylim=None,
    plot_kw={},
    logf=True,
    third_oct_f=True,
    **fig_kw,
):
    """Plot phase response."""
    if use_ax is None:
        fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
        fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
    else:
        ax = use_ax
        fig = ax.get_figure()

    # append frequency/time dimension to slice
    if slce is None:
        slce = [np.s_[:] for n in range(len(self.in_time.shape))]
    elif isinstance(slce, tuple):
        slce = slce + (np.s_[:],)
    else:
        slce = (slce, np.s_[:])

    # move time / frequency axis to first dimension
    freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
        (self.nf, -1)
    )
    phase = (
        np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready)
    )

    plotf = ax.semilogx if logf else ax.plot
    plotf(self.freqs, phase, label=label, **plot_kw)
    ax.set_xlabel("Frequency [Hz]")
    ax.set_ylabel("Phase [rad]")
    ax.set_title("Phase response")
    ax.grid(True)

    if flim is None:
        lowlim = min(10, self.fs / 2 / 100)
        flim = (lowlim, self.fs / 2)
    ax.set_xlim(flim)
    if ylim:
        ax.set_ylim(ylim)

    if label is not None:
        ax.legend()

    if third_oct_f:
        _add_octave_band_xticks(ax)

    return fig
def plot_power_in_bands(self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs)

Plot signal's power in bands.

Parameters

bands : list or None, optional
List of tuples (f_center, f_lower, f_upper). If None, use third octave bands.
use_ax : matplotlib.axis.Axis or None, optional
Plot into this axis.
barkwargs : dict
Keyword arguments to axis.bar
avgaxis : int, tuple or None
Average power over these axes.
dbref : float
dB reference.
**figkwargs
Keyword arguments passed to plt.subplots

Returns

P : ndarray
Power in bands
fc : ndarray
Band frequencies
fig : matplotlib.figure.Figure
Figure
Expand source code
def plot_power_in_bands(
    self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs
):
    """Plot signal's power in bands.

    Parameters
    ----------
    bands : list or None, optional
        List of tuples (f_center, f_lower, f_upper). If `None`, use third octave
        bands.
    use_ax : matplotlib.axis.Axis or None, optional
        Plot into this axis.
    barkwargs : dict
        Keyword arguments to `axis.bar`
    avgaxis : int, tuple or None
        Average power over these axes.
    dbref : float
        dB reference.
    **figkwargs
        Keyword arguments passed to plt.subplots

    Returns
    -------
    P : ndarray
        Power in bands
    fc : ndarray
        Band frequencies
    fig : matplotlib.figure.Figure
        Figure

    """
    P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis)

    nbands = P.shape[-1]
    P = np.atleast_2d(P).reshape((-1, nbands))

    if use_ax is None:
        fig, ax = plt.subplots(**figkwargs)
    else:
        ax = use_ax
        fig = ax.get_figure()

    xticks = range(1, nbands + 1)
    for i in range(P.shape[0]):
        ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs)
    ax.set_xticks(xticks)
    ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical")
    ax.grid(True)
    ax.set_xlabel("Band's center frequencies [Hz]")
    ax.set_ylabel("Power [dB]")

    return (P, fc, fig)
def plot_time(self, use_ax=None, slce=None, tlim=None, ylim=None, label=None, plot_kw={}, **fig_kw)

Plot time response.

Expand source code
def plot_time(
    self,
    use_ax=None,
    slce=None,
    tlim=None,
    ylim=None,
    label=None,
    plot_kw={},
    **fig_kw,
):
    """Plot time response."""
    if use_ax is None:
        fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
        fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
    else:
        ax = use_ax
        fig = ax.get_figure()

    # append frequency/time dimension to slice
    if slce is None:
        slce = [np.s_[:] for n in range(len(self.in_time.shape))]
    elif isinstance(slce, tuple):
        slce = slce + (np.s_[:],)
    else:
        slce = (slce, np.s_[:])

    time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape(
        (self.nt, -1)
    )

    ax.plot(self.times, time_plotready, label=label, **plot_kw)
    ax.set_xlabel("Time [s]")
    ax.set_ylabel("")
    ax.set_title("Time response")
    ax.grid(True)

    if tlim:
        ax.set_xlim(tlim)
    if ylim:
        ax.set_ylim(ylim)

    if label is not None:
        ax.legend()

    return fig
def power_in_bands(self, bands=None, avgaxis=None)

Compute power of signal in third octave bands.

Power(band) = 1/T integral |X(f)| ** 2 df f in band

Parameters

bands : list of tuples, length nbands optional
Center, lower and upper frequencies of bands.
avgaxis : int, tuple or None
Average result over these axis

Returns

P : ndarray, shape (…, nbands)
Power in bands
fcs : list, length nbands
Center frequencies of bands
Expand source code
def power_in_bands(self, bands=None, avgaxis=None):
    """Compute power of signal in third octave bands.

    Power(band) =   1/T  integral  |X(f)| ** 2 df
                        f in band

    Parameters
    ----------
    bands : list of tuples, length nbands optional
        Center, lower and upper frequencies of bands.
    avgaxis: int, tuple or None
        Average result over these axis

    Returns
    -------
    P: ndarray, shape (..., nbands)
        Power in bands
    fcs: list, length nbands
        Center frequencies of bands

    """
    if bands is None:
        bands = _third_octave_bands

    # center frequencies
    fcs = np.asarray([b[0] for b in bands])
    Npow2 = 2 ** (self.nt - 1).bit_length()
    f = np.fft.fftfreq(Npow2, d=1 / self.fs)

    shape = list(self.in_freq.shape)
    shape[-1] = len(bands)
    P = np.zeros(shape)
    for i, (fc, fl, fu) in enumerate(bands):
        if fu < self.fs / 2:  # include only bands in frequency range
            iband = np.logical_and(fl <= f, f < fu)
            P[..., i] = np.sum(
                np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2
                * 2  # energy from negative and positive frequencies
                * self.dt
                / self.nt
                / self.time_length,
                axis=-1,
            )
        else:
            P[..., i] = 0

    if avgaxis is not None:
        P = P.mean(axis=avgaxis)

    return P, fcs
def psd(self, **kwargs)

Compute the power spectral density of the signal.

Parameters

kwargs
keword arguments passed to scipy.signal.welch

Returns

f : ndarray
Array of sample frequencies.
Pxx : ndarray
Power spectral density of time signal.

Notes

Use scaling='density' for power per bin bandwidth and scaling='spectrum' for power per bin.

Expand source code
def psd(self, **kwargs):
    """Compute the power spectral density of the signal.

    Parameters
    ----------
    kwargs
        keword arguments passed to scipy.signal.welch

    Returns
    -------
    f : ndarray
        Array of sample frequencies.
    Pxx : ndarray
        Power spectral density of time signal.

    Notes
    -----
    Use scaling='density' for power per bin bandwidth and scaling='spectrum' for
    power per bin.

    """
    return welch(self.in_time, fs=self.fs, **kwargs)
def resample(self, fs_new, normalize='same_gain', window=None)

Resample using Fourier method.

Parameters

fs_new : int
New sample rate
normalize : str, optional
If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved.
window : None, optional
Passed to scipy.signal.resample.

Returns

Response
New resampled response object.

Raises

ValueError
If resulting number of samples would be a non-integer.
Expand source code
def resample(self, fs_new, normalize="same_gain", window=None):
    """Resample using Fourier method.

    Parameters
    ----------
    fs_new : int
        New sample rate
    normalize : str, optional
        If 'same_gain', normalize such that the gain is the same
        as the original signal. If 'same_amplitude', amplitudes will be preserved.
    window : None, optional
        Passed to scipy.signal.resample.

    Returns
    -------
    Response
        New resampled response object.

    Raises
    ------
    ValueError
        If resulting number of samples would be a non-integer.

    """
    if fs_new == self.fs:
        return self

    nt_new = fs_new * self.time_length

    if nt_new % 1 != 0:
        raise ValueError(
            "New number of samples must be integer, but is {}".format(nt_new)
        )

    nt_new = int(nt_new)

    h_new = resample(self.in_time, nt_new, axis=-1, window=window)

    if normalize == "same_gain":
        h_new *= self.nt / nt_new
    elif normalize == "same_amplitude":
        pass
    else:
        raise ValueError(
            "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
        )

    return self.from_time(fs_new, h_new)
def resample_poly(self, fs_new, normalize='same_gain', window=('kaiser', 5.0))

Resample using polyphase filtering.

Parameters

fs_new : int
New sample rate
normalize : str, optional
If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved.
window : None, optional
Passed to scipy.signal.resample_poly.

Returns

Response
New resampled response object.
Expand source code
def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)):
    """Resample using polyphase filtering.

    Parameters
    ----------
    fs_new : int
        New sample rate
    normalize : str, optional
        If 'same_gain', normalize such that the gain is the same
        as the original signal. If 'same_amplitude', amplitudes will be preserved.
    window : None, optional
        Passed to scipy.signal.resample_poly.

    Returns
    -------
    Response
        New resampled response object.

    """
    if fs_new == self.fs:
        return self

    ratio = Fraction(fs_new, self.fs)
    up = ratio.numerator
    down = ratio.denominator

    if up > 1000 or down > 1000:
        print("Warning: resampling with high ratio {}/{}".format(up, down))

    h_new = resample_poly(self.in_time, up, down, axis=-1, window=window)

    if normalize == "same_gain":
        h_new *= down / up
    elif normalize == "same_amplitude":
        pass
    else:
        raise ValueError(
            "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
        )

    return self.from_time(fs_new, h_new)
def time_window(self, startwindow, stopwindow, window='hann')

Apply time domain windows.

Parameters

startwindow : None or tuple
Tuple (t1, t2) with beginning and end times of window opening.
stopwindow : None or tuple
Tuple (t1, t2) with beginning and end times of window closing.
window : string or tuple of string and parameter values, optional
Desired window to use. See scipy.signal.get_window for a list of windows and required parameters.

Returns

Response
Time windowed response object
Expand source code
def time_window(self, startwindow, stopwindow, window="hann"):
    """Apply time domain windows.

    Parameters
    ----------
    startwindow : None or tuple
        Tuple (t1, t2) with beginning and end times of window opening.
    stopwindow : None or tuple
        Tuple (t1, t2) with beginning and end times of window closing.
    window : string or tuple of string and parameter values, optional
        Desired window to use. See scipy.signal.get_window for a list of
        windows and required parameters.

    Returns
    -------
    Response
        Time windowed response object

    """
    n = self.times.size
    twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window)
    new_response = self.from_time(self.fs, self.in_time * twindow)

    return new_response
def timecrop(self, start, end)

Crop time response.

Parameters

start, end : float
Start and end times in seconds. Does not include sample at t=end. Use end=None to force inclusion of last sample.

Returns

Response
New Response object with cropped time.

Notes

Creates new Response object.

Examples

>>> import numpy as np
>>> from response import Response
>>> r = Response.from_time(100, np.random.normal(size=100))
>>> split = 0.2

The following holds:

>>> np.all(np.concatenate(
...     (
...         r.timecrop(0, split).in_time,
...         r.timecrop(split, None).in_time,
...     ),
...     axis=-1,
... ) == r.in_time)
True
Expand source code
def timecrop(self, start, end):
    """Crop time response.

    Parameters
    ----------
    start, end : float
        Start and end times in seconds. Does not include sample at t=end. Use
        end=None to force inclusion of last sample.

    Returns
    -------
    Response
        New Response object with cropped time.

    Notes
    -----
    Creates new Response object.

    Examples
    --------
    >>> import numpy as np
    >>> from response import Response
    >>> r = Response.from_time(100, np.random.normal(size=100))
    >>> split = 0.2

    The following holds:

    >>> np.all(np.concatenate(
    ...     (
    ...         r.timecrop(0, split).in_time,
    ...         r.timecrop(split, None).in_time,
    ...     ),
    ...     axis=-1,
    ... ) == r.in_time)
    True

    """
    if start < 0:
        start += self.time_length
    if end is not None and end < 0:
        end += self.time_length
    assert 0 <= start < self.time_length
    assert end is None or (0 < end <= self.time_length)

    _, i_start = _find_nearest(self.times, start)
    if end is None:
        i_end = None
    else:
        _, i_end = _find_nearest(self.times, end)

    h = self.in_time[..., i_start:i_end]

    new_response = self.from_time(self.fs, h)

    return new_response
def window_around_peak(self, tleft, tright, alpha, return_window=False)

Time window each impulse response around its peak value.

Parameters

tleft, tright : float
Window starts tleft seconds before and ends tright seconds after maximum of impulse response.
alpha : float
alpha parameter of scipy.signal.tukey window.
return_window : bool, optional
Also return used time window

Returns

Response
Time windowed response object.
ndarray
Time window, if return_window is True.
Expand source code
def window_around_peak(self, tleft, tright, alpha, return_window=False):
    """Time window each impulse response around its peak value.

    Parameters
    ----------
    tleft, tright : float
        Window starts `tleft` seconds before and ends `tright` seconds after maximum
        of impulse response.
    alpha : float
        `alpha` parameter of `scipy.signal.tukey` window.
    return_window : bool, optional
        Also return used time window

    Returns
    -------
    Response
        Time windowed response object.
    ndarray
        Time window, if `return_window` is `True`.

    """
    window = _construct_window_around_peak(
        self.fs, self.in_time, tleft, tright, alpha=alpha
    )

    if return_window:
        return self.from_time(self.fs, self.in_time * window), window

    return self.from_time(self.fs, self.in_time * window)
def zeropad(self, before, after)

Zeropad time response.

Parameters

before, after : int
Number of zero samples inserted before and after response.

Returns

Response
Zeropadded response
Expand source code
def zeropad(self, before, after):
    """Zeropad time response.

    Parameters
    ----------
    before, after : int
        Number of zero samples inserted before and after response.

    Returns
    -------
    Response
        Zeropadded response

    """
    assert before % 1 == 0
    assert after % 1 == 0
    dims = self.in_time.ndim

    pad_width = [(0, 0) for n in range(dims)]
    pad_width[-1] = (int(before), int(after))

    h = np.pad(self.in_time, pad_width, "constant")

    return self.from_time(self.fs, h)
def zeropad_to_length(self, n)

Zeropad time response to specific length.

Returns

Response
New response object with new length n.
Expand source code
def zeropad_to_length(self, n):
    """Zeropad time response to specific length.

    Returns
    -------
    Response
        New response object with new length n.

    """
    oldn = self.nt
    assert n >= oldn
    return self.zeropad(0, n - oldn)
def zeropad_to_power_of_2(self)

Pad time response for length of power of 2.

Returns

Response
New response object with larger, power of 2 length.
Expand source code
def zeropad_to_power_of_2(self):
    """Pad time response for length of power of 2.

    Returns
    -------
    Response
        New response object with larger, power of 2 length.

    """
    # https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python
    n = 2 ** (self.nt - 1).bit_length()
    return self.zeropad(0, n - self.nt)