Module response
Your handy frequency and impulse response processing object.
This module supplies the Response
class: an abstraction of frequency and
impulse responses and a set of handy methods for their processing. It implements a
fluent interface for chaining the processing commands.
Find the documentation here and the source code on GitHub.
import numpy as np
from response import Response
fs = 48000 # sampling rate
T = 0.5 # length of signal
# a sine at 100 Hz
t = np.arange(int(T * fs)) / fs
x = np.sin(2 * np.pi * 100 * t)
# Do chain of processing
r = (
Response.from_time(fs, x)
# time window at the end and beginning
.time_window((0, 0.1), (-0.1, None), window="hann") # equivalent to Tukey window
# zeropad to one second length
.zeropad_to_length(fs * 1)
# circular shift to center
.circdelay(T / 2)
# resample with polyphase filter, keep gain of filter
.resample_poly(500, window=("kaiser", 0.5), normalize="same_amplitude")
# cut 0.2s at beginning and end
.timecrop(0.2, -0.2)
# apply frequency domain window
.freq_window((0, 90), (110, 500))
)
# plot magnitude, phase and time response
r.plot(show=True)
# real impulse response
r.in_time
# complex frequency response
r.in_freq
# and much more ...
Expand source code
"""Your handy frequency and impulse response processing object.
[](https://pypi.org/project/response/)
[](https://pypi.org/project/response/)
[](https://travis-ci.org/fhchl/Response)
[](https://codecov.io/gh/fhchl/Response)
This module supplies the `Response` class: an abstraction of frequency and
impulse responses and a set of handy methods for their processing. It implements a
[fluent interface][1] for chaining the processing commands.
Find the documentation [here][2] and the source code on [GitHub][3].
```python
import numpy as np
from response import Response
fs = 48000 # sampling rate
T = 0.5 # length of signal
# a sine at 100 Hz
t = np.arange(int(T * fs)) / fs
x = np.sin(2 * np.pi * 100 * t)
# Do chain of processing
r = (
Response.from_time(fs, x)
# time window at the end and beginning
.time_window((0, 0.1), (-0.1, None), window="hann") # equivalent to Tukey window
# zeropad to one second length
.zeropad_to_length(fs * 1)
# circular shift to center
.circdelay(T / 2)
# resample with polyphase filter, keep gain of filter
.resample_poly(500, window=("kaiser", 0.5), normalize="same_amplitude")
# cut 0.2s at beginning and end
.timecrop(0.2, -0.2)
# apply frequency domain window
.freq_window((0, 90), (110, 500))
)
# plot magnitude, phase and time response
r.plot(show=True)
# real impulse response
r.in_time
# complex frequency response
r.in_freq
# and much more ...
```
[1]: https://en.wikipedia.org/wiki/Fluent_interface
[2]: https://fhchl.github.io/Response/
[3]: https://github.com/fhchl/Response
"""
import warnings
from fractions import Fraction
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import wavfile
from scipy.signal import get_window, lfilter, resample, resample_poly, tukey, welch
class Response(object):
"""Representation of a linear response in time and frequency domain."""
def __init__(self, fs, fdata=None, tdata=None, isEvenSampled=True):
"""Create Response from time or frequency data.
Use `from_time` or `from_freq methods` to create objects of this class!
Parameters
----------
fs : int
Sampling frequency in Hertz
fdata : (..., nt) complex ndarray, optional
Single sided frequency spectra with nt from ns to nr points.
tdata : (..., nf) real ndarray, optional
Time responses with nt from ns to nr points.
isEvenSampled : bool or None, optional
If fdata is given, this tells us if the last entry of fdata is the
Nyquist frequency or not. Must be `None` if tdata is given.
Raises
------
ValueError
if neither fdata or tdata are given.
"""
assert float(fs).is_integer()
if fdata is not None and tdata is None:
fdata = np.atleast_1d(fdata)
self._nf = fdata.shape[-1]
if isEvenSampled:
self._nt = 2 * (self._nf - 1)
else:
self._nt = 2 * self._nf - 1
self._isEvenSampled = isEvenSampled
self.__set_frequency_data(fdata)
elif tdata is not None and fdata is None:
assert np.all(np.imag(tdata) == 0), "Time data must be real."
tdata = np.atleast_1d(tdata)
self._nt = tdata.shape[-1]
self._nf = self._nt // 2 + 1
self._isEvenSampled = self._nt % 2 == 0
self.__set_time_data(tdata)
else:
raise ValueError("One and only one of fdata and tdata must be given.")
self._fs = int(fs)
self._freqs = freq_vector(self._nt, fs)
self._times = time_vector(self._nt, fs)
self._time_length = self._nt * 1 / fs
self.df = self._freqs[1] # frequency resolution
self.dt = self._times[1] # time resolution
@classmethod
def from_time(cls, fs, tdata, **kwargs):
"""Generate Response obj from time response data."""
tf = cls(fs, tdata=tdata, **kwargs)
return tf
@classmethod
def from_freq(cls, fs, fdata, **kwargs):
"""Generate Response obj from frequency response data."""
tf = cls(fs, fdata=fdata, **kwargs)
return tf
@classmethod
def from_wav(cls, fps):
"""Import responses from wav files.
Parameters
----------
fps : list
File paths of all wav files.
Returns
-------
Response
New Response object with imported time responses.
"""
fpi = iter(fps)
fs, data = wavfile.read(next(fpi))
hlist = [data] + [wavfile.read(fp)[1] for fp in fpi]
h = np.array(hlist)
if data.dtype in [np.uint8, np.int16, np.int32]:
lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max)
lim_new = (-1.0, 1.0)
h = _rescale(h, lim_orig, lim_new).astype(np.double)
return cls.from_time(fs, h)
@classmethod
def new_dirac(cls, fs, T=None, n=None, nch=(1,)):
"""Generate new allpass / dirac response."""
nch = np.atleast_1d(nch)
if T is not None:
nt = round(fs * T)
else:
nt = n
h = np.zeros((*nch, nt))
h[..., 0] = 1
return cls.from_time(fs, h)
@classmethod
def join(cls, tfs, axis=0, newaxis=True):
"""Concat or stack a set of Responses along a given axis.
Parameters
----------
tfs : array_like
List of Responses
axis : int, optional
Indice of axis along wich to concatenate / stack TFs.
newaxis : bool, optional
If True, do not concatenate but stack arrays along a new axis.
Returns
-------
Response
Note
----
Transfer functions need to have same sampling rate, length etc.
"""
joinfunc = np.stack if newaxis else np.concatenate
tdata = joinfunc([tf.in_time for tf in tfs], axis=axis)
return cls.from_time(tfs[0].fs, tdata)
@property
def time_length(self):
"""Length of time response in seconds."""
return self._time_length
@property
def nf(self): # noqa: D401
"""Number of frequencies in frequency representation."""
return len(self._freqs)
@property
def nt(self): # noqa: D401
"""Number of taps."""
return len(self._times)
@property
def fs(self): # noqa: D401
"""Sampling frequency."""
return self._fs
@property
def freqs(self): # noqa: D401
"""Frequencies."""
return self._freqs
@property
def times(self): # noqa: D401
"""Times."""
return self._times
@property
def in_time(self):
"""Time domain response.
Returns
-------
(... , n) ndarray
Real FIR filters.
"""
if self._in_time is None:
self._in_time = np.fft.irfft(self._in_freq, n=self._times.size)
return self._in_time
@property
def in_freq(self):
"""Single sided frequency spectrum.
Returns
-------
(... , n) ndarray
Complex frequency response.
"""
if self._in_freq is None:
self._in_freq = np.fft.rfft(self._in_time)
return self._in_freq
@property
def amplitude_spectrum(self):
"""Amplitude spectrum."""
X = self.in_freq / self.nt
if self.nt % 2 == 0:
# zero and nyquist element only appear once in complex spectrum
X[..., 1:-1] *= 2
else:
# there is no nyquist element
X[..., 1:] *= 2
return X
def __set_time_data(self, tdata):
"""Set time data without creating new object."""
assert tdata.shape[-1] == self._nt
self._in_time = tdata
self._in_freq = None
def __set_frequency_data(self, fdata):
"""Set frequency data without creating new object."""
assert fdata.shape[-1] == self._nf
self._in_freq = fdata
self._in_time = None
def plot(
self,
group_delay=False,
slce=None,
flim=None,
dblim=None,
tlim=None,
grpdlim=None,
dbref=1,
show=False,
use_fig=None,
label=None,
unwrap_phase=False,
logf=True,
third_oct_f=True,
plot_kw={},
**fig_kw,
):
"""Plot the response in both domains.
Parameters
----------
group_delay : bool, optional
Display group delay instead of phase.
slce : numpy.lib.index_tricks.IndexExpression
only plot subset of responses defined by a slice. Last
dimension (frequency or time) is always completely taken.
flim : tuple or None, optional
Frequency axis limits as tuple `(lower, upper)`
dblim : tuple or None, optional
Magnitude axis limits as tuple `(lower, upper)`
tlim : tuple or None, optional
Time axis limits as tuple `(lower, upper)`
grpdlim: tuple or None, optional
Group delay axis limit as tuple `(lower, upper)`
dbref : float
dB reference in magnitude plot
show : bool, optional
Run `matplotlib.pyplot.show()`
use_fig : matplotlib.pyplot.Figure
Reuse an existing figure.
label : None, optional
Description
unwrap_phase : bool, optional
unwrap phase in phase plot
logf : bool, optional
If `True`, use logarithmic frequency axis.
third_oct_f: bool, optional
Label frequency axis with third octave bands.
plot_kw : dictionary, optional
Keyword arguments passed to the `plt.plot` commands.
**fig_kw
Additional options passe to figure creation.
"""
if use_fig is None:
fig_kw = {**{"figsize": (10, 10)}, **fig_kw}
fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw)
else:
fig = use_fig
axes = fig.axes
self.plot_magnitude(
use_ax=axes[0],
slce=slce,
dblim=dblim,
flim=flim,
dbref=dbref,
label=label,
plot_kw=plot_kw,
logf=logf,
third_oct_f=third_oct_f,
)
if group_delay:
self.plot_group_delay(
use_ax=axes[1],
slce=slce,
flim=flim,
ylim=grpdlim,
plot_kw=plot_kw,
logf=logf,
third_oct_f=third_oct_f,
)
else:
self.plot_phase(
use_ax=axes[1],
slce=slce,
flim=flim,
plot_kw=plot_kw,
unwrap=unwrap_phase,
logf=logf,
third_oct_f=third_oct_f,
)
self.plot_time(
use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw
)
if show:
plt.show()
return fig
def plot_magnitude(
self,
use_ax=None,
slce=None,
dblim=None,
flim=None,
dbref=1,
label=None,
plot_kw={},
logf=True,
third_oct_f=True,
**fig_kw,
):
"""Plot magnitude response."""
# TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db
if use_ax is None:
fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
else:
ax = use_ax
fig = ax.get_figure()
# append frequency/time dimension to slice
if slce is None:
slce = [np.s_[:] for n in range(len(self.in_time.shape))]
elif isinstance(slce, tuple):
slce = slce + (np.s_[:],)
else:
slce = (slce, np.s_[:])
# move time / frequency axis to first dimension
freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
(self.nf, -1)
)
plotf = ax.semilogx if logf else ax.plot
plotf(
self.freqs,
20 * np.log10(np.abs(freq_plotready / dbref)),
label=label,
**plot_kw,
)
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Magnitude [dB]")
ax.set_title("Frequency response")
ax.grid(True)
if flim is None:
lowlim = min(10, self.fs / 2 / 100)
flim = (lowlim, self.fs / 2)
ax.set_xlim(flim)
if dblim is not None:
ax.set_ylim(dblim)
if label is not None:
ax.legend()
if third_oct_f:
_add_octave_band_xticks(ax)
return fig
def plot_phase(
self,
use_ax=None,
slce=None,
flim=None,
label=None,
unwrap=False,
ylim=None,
plot_kw={},
logf=True,
third_oct_f=True,
**fig_kw,
):
"""Plot phase response."""
if use_ax is None:
fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
else:
ax = use_ax
fig = ax.get_figure()
# append frequency/time dimension to slice
if slce is None:
slce = [np.s_[:] for n in range(len(self.in_time.shape))]
elif isinstance(slce, tuple):
slce = slce + (np.s_[:],)
else:
slce = (slce, np.s_[:])
# move time / frequency axis to first dimension
freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
(self.nf, -1)
)
phase = (
np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready)
)
plotf = ax.semilogx if logf else ax.plot
plotf(self.freqs, phase, label=label, **plot_kw)
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Phase [rad]")
ax.set_title("Phase response")
ax.grid(True)
if flim is None:
lowlim = min(10, self.fs / 2 / 100)
flim = (lowlim, self.fs / 2)
ax.set_xlim(flim)
if ylim:
ax.set_ylim(ylim)
if label is not None:
ax.legend()
if third_oct_f:
_add_octave_band_xticks(ax)
return fig
def plot_time(
self,
use_ax=None,
slce=None,
tlim=None,
ylim=None,
label=None,
plot_kw={},
**fig_kw,
):
"""Plot time response."""
if use_ax is None:
fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
else:
ax = use_ax
fig = ax.get_figure()
# append frequency/time dimension to slice
if slce is None:
slce = [np.s_[:] for n in range(len(self.in_time.shape))]
elif isinstance(slce, tuple):
slce = slce + (np.s_[:],)
else:
slce = (slce, np.s_[:])
time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape(
(self.nt, -1)
)
ax.plot(self.times, time_plotready, label=label, **plot_kw)
ax.set_xlabel("Time [s]")
ax.set_ylabel("")
ax.set_title("Time response")
ax.grid(True)
if tlim:
ax.set_xlim(tlim)
if ylim:
ax.set_ylim(ylim)
if label is not None:
ax.legend()
return fig
def plot_group_delay(
self,
use_ax=None,
slce=None,
flim=None,
label=None,
ylim=None,
plot_kw={},
logf=True,
third_oct_f=True,
**fig_kw,
):
"""Plot group delay."""
if use_ax is None:
fig_kw = {**{"figsize": (10, 5)}, **fig_kw}
fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw)
else:
ax = use_ax
fig = ax.get_figure()
# append frequency/time dimension to slice
if slce is None:
slce = [np.s_[:] for n in range(len(self.in_time.shape))]
elif isinstance(slce, tuple):
slce = slce + (np.s_[:],)
else:
slce = (slce, np.s_[:])
# move time / frequency axis to first dimension
freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape(
(self.nf, -1)
)
df = self.freqs[1] - self.freqs[0]
# TODO: use scipy.signal.group_delay here as below has problem at larger delays
grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0)
plotf = ax.semilogx if logf else ax.plot
plotf(self.freqs, grpd, label=label, **plot_kw)
ax.set_xlabel("Frequency [Hz]")
ax.set_ylabel("Delay [s]")
ax.set_title("Group Delay")
ax.grid(True)
if flim is None:
lowlim = min(10, self.fs / 2 / 100)
flim = (lowlim, self.fs / 2)
ax.set_xlim(flim)
if ylim:
ax.set_ylim(ylim)
if label is not None:
ax.legend()
if third_oct_f:
_add_octave_band_xticks(ax)
return fig
def plot_power_in_bands(
self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs
):
"""Plot signal's power in bands.
Parameters
----------
bands : list or None, optional
List of tuples (f_center, f_lower, f_upper). If `None`, use third octave
bands.
use_ax : matplotlib.axis.Axis or None, optional
Plot into this axis.
barkwargs : dict
Keyword arguments to `axis.bar`
avgaxis : int, tuple or None
Average power over these axes.
dbref : float
dB reference.
**figkwargs
Keyword arguments passed to plt.subplots
Returns
-------
P : ndarray
Power in bands
fc : ndarray
Band frequencies
fig : matplotlib.figure.Figure
Figure
"""
P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis)
nbands = P.shape[-1]
P = np.atleast_2d(P).reshape((-1, nbands))
if use_ax is None:
fig, ax = plt.subplots(**figkwargs)
else:
ax = use_ax
fig = ax.get_figure()
xticks = range(1, nbands + 1)
for i in range(P.shape[0]):
ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs)
ax.set_xticks(xticks)
ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical")
ax.grid(True)
ax.set_xlabel("Band's center frequencies [Hz]")
ax.set_ylabel("Power [dB]")
return (P, fc, fig)
def time_window(self, startwindow, stopwindow, window="hann"):
"""Apply time domain windows.
Parameters
----------
startwindow : None or tuple
Tuple (t1, t2) with beginning and end times of window opening.
stopwindow : None or tuple
Tuple (t1, t2) with beginning and end times of window closing.
window : string or tuple of string and parameter values, optional
Desired window to use. See scipy.signal.get_window for a list of
windows and required parameters.
Returns
-------
Response
Time windowed response object
"""
n = self.times.size
twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window)
new_response = self.from_time(self.fs, self.in_time * twindow)
return new_response
def freq_window(self, startwindow, stopwindow, window="hann"):
"""Apply frequency domain window.
Parameters
----------
startwindow : None or tuple
Tuple (t1, t2) with beginning and end frequencies of window opening.
stopwindow : None or tuple
Tuple (t1, t2) with beginning and end frequencies of window closing.
window : string or tuple of string and parameter values, optional
Desired window to use. See scipy.signal.get_window for a list of
windows and required parameters.
Returns
-------
Response
Frequency windowed response object
"""
n = self.times.size
fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window)
new_response = self.from_freq(self.fs, self.in_freq * fwindow)
return new_response
def window_around_peak(self, tleft, tright, alpha, return_window=False):
"""Time window each impulse response around its peak value.
Parameters
----------
tleft, tright : float
Window starts `tleft` seconds before and ends `tright` seconds after maximum
of impulse response.
alpha : float
`alpha` parameter of `scipy.signal.tukey` window.
return_window : bool, optional
Also return used time window
Returns
-------
Response
Time windowed response object.
ndarray
Time window, if `return_window` is `True`.
"""
window = _construct_window_around_peak(
self.fs, self.in_time, tleft, tright, alpha=alpha
)
if return_window:
return self.from_time(self.fs, self.in_time * window), window
return self.from_time(self.fs, self.in_time * window)
def delay(self, dt, keep_length=True):
"""Delay time response by dt seconds.
Rounds of to closest integer delay.
"""
x = delay(self.fs, self.in_time, dt, keep_length=keep_length)
return self.from_time(self.fs, x)
def circdelay(self, dt):
"""Delay by circular shift.
Rounds of to closest integer delay.
"""
x = self.in_time
n = int(round(dt * self.fs))
shifted = np.roll(x, n, axis=-1)
return self.from_time(self.fs, shifted)
def timecrop(self, start, end):
"""Crop time response.
Parameters
----------
start, end : float
Start and end times in seconds. Does not include sample at t=end. Use
end=None to force inclusion of last sample.
Returns
-------
Response
New Response object with cropped time.
Notes
-----
Creates new Response object.
Examples
--------
>>> import numpy as np
>>> from response import Response
>>> r = Response.from_time(100, np.random.normal(size=100))
>>> split = 0.2
The following holds:
>>> np.all(np.concatenate(
... (
... r.timecrop(0, split).in_time,
... r.timecrop(split, None).in_time,
... ),
... axis=-1,
... ) == r.in_time)
True
"""
if start < 0:
start += self.time_length
if end is not None and end < 0:
end += self.time_length
assert 0 <= start < self.time_length
assert end is None or (0 < end <= self.time_length)
_, i_start = _find_nearest(self.times, start)
if end is None:
i_end = None
else:
_, i_end = _find_nearest(self.times, end)
h = self.in_time[..., i_start:i_end]
new_response = self.from_time(self.fs, h)
return new_response
def non_causal_timecrop(self, length):
"""Cut length of non-causal impulse response.
"FFT shift, cropping on both ends, iFFT shift"
Parameters
----------
length : float
final length in seconds
Returns
-------
Response
New Response object new length.
Note
----
Can introduce delay pre-delay by a sample.
"""
assert length < self.time_length
cut = (self.time_length - length) / 2
_, i_start = _find_nearest(self.times, cut)
_, i_end = _find_nearest(self.times, self.time_length - cut)
h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end])
new_response = self.from_time(self.fs, h)
if new_response.time_length != length:
w = f"Could not precisely shrink to {length}s with fs = {self.fs}"
warnings.warn(w)
return new_response
def zeropad(self, before, after):
"""Zeropad time response.
Parameters
----------
before, after : int
Number of zero samples inserted before and after response.
Returns
-------
Response
Zeropadded response
"""
assert before % 1 == 0
assert after % 1 == 0
dims = self.in_time.ndim
pad_width = [(0, 0) for n in range(dims)]
pad_width[-1] = (int(before), int(after))
h = np.pad(self.in_time, pad_width, "constant")
return self.from_time(self.fs, h)
def zeropad_to_power_of_2(self):
"""Pad time response for length of power of 2.
Returns
-------
Response
New response object with larger, power of 2 length.
"""
# https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python
n = 2 ** (self.nt - 1).bit_length()
return self.zeropad(0, n - self.nt)
def zeropad_to_length(self, n):
"""Zeropad time response to specific length.
Returns
-------
Response
New response object with new length n.
"""
oldn = self.nt
assert n >= oldn
return self.zeropad(0, n - oldn)
def resample(self, fs_new, normalize="same_gain", window=None):
"""Resample using Fourier method.
Parameters
----------
fs_new : int
New sample rate
normalize : str, optional
If 'same_gain', normalize such that the gain is the same
as the original signal. If 'same_amplitude', amplitudes will be preserved.
window : None, optional
Passed to scipy.signal.resample.
Returns
-------
Response
New resampled response object.
Raises
------
ValueError
If resulting number of samples would be a non-integer.
"""
if fs_new == self.fs:
return self
nt_new = fs_new * self.time_length
if nt_new % 1 != 0:
raise ValueError(
"New number of samples must be integer, but is {}".format(nt_new)
)
nt_new = int(nt_new)
h_new = resample(self.in_time, nt_new, axis=-1, window=window)
if normalize == "same_gain":
h_new *= self.nt / nt_new
elif normalize == "same_amplitude":
pass
else:
raise ValueError(
"Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
)
return self.from_time(fs_new, h_new)
def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)):
"""Resample using polyphase filtering.
Parameters
----------
fs_new : int
New sample rate
normalize : str, optional
If 'same_gain', normalize such that the gain is the same
as the original signal. If 'same_amplitude', amplitudes will be preserved.
window : None, optional
Passed to scipy.signal.resample_poly.
Returns
-------
Response
New resampled response object.
"""
if fs_new == self.fs:
return self
ratio = Fraction(fs_new, self.fs)
up = ratio.numerator
down = ratio.denominator
if up > 1000 or down > 1000:
print("Warning: resampling with high ratio {}/{}".format(up, down))
h_new = resample_poly(self.in_time, up, down, axis=-1, window=window)
if normalize == "same_gain":
h_new *= down / up
elif normalize == "same_amplitude":
pass
else:
raise ValueError(
"Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,)
)
return self.from_time(fs_new, h_new)
def normalize(self, maxval=1):
"""Normalize time response.
Parameters
----------
maxval: float, optional
Maximum amplitude in resulting time response.
Returns
-------
Response
"""
h = self.in_time
h /= np.abs(self.in_time).max()
h *= maxval
return self.from_time(self.fs, h)
def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16):
"""Export impulse response to wave files.
Dimension of data must 2.
Parameters
----------
folder : file path
Save in this folder
name_fmt : str, optional
Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
dtype : one of np.float32, np.int32, np.int16, np.uint8
Data is converted to this type.
"""
data = np.atleast_2d(self.in_time)
assert data.ndim == 2
assert np.all(np.abs(data) <= 1.0)
# convert and scale to new output datatype
if dtype in [np.uint8, np.int16, np.int32]:
lim_orig = (-1.0, 1.0)
lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max)
data = _rescale(data, lim_orig, lim_new).astype(dtype)
elif dtype != np.float32:
raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.")
path = Path(folder)
if not path.is_dir():
path.mkdir(parents=True, exist_ok=False)
for i in range(data.shape[0]):
wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i])
def export_npz(self, filename, dtype=np.float32):
"""Export impulse response as npz file.
Parameters
----------
filename: str or Path
File path
dtype: numpy dtype
Convert to this type before saving
"""
np.savez(
filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs
)
def power_in_bands(self, bands=None, avgaxis=None):
"""Compute power of signal in third octave bands.
Power(band) = 1/T integral |X(f)| ** 2 df
f in band
Parameters
----------
bands : list of tuples, length nbands optional
Center, lower and upper frequencies of bands.
avgaxis: int, tuple or None
Average result over these axis
Returns
-------
P: ndarray, shape (..., nbands)
Power in bands
fcs: list, length nbands
Center frequencies of bands
"""
if bands is None:
bands = _third_octave_bands
# center frequencies
fcs = np.asarray([b[0] for b in bands])
Npow2 = 2 ** (self.nt - 1).bit_length()
f = np.fft.fftfreq(Npow2, d=1 / self.fs)
shape = list(self.in_freq.shape)
shape[-1] = len(bands)
P = np.zeros(shape)
for i, (fc, fl, fu) in enumerate(bands):
if fu < self.fs / 2: # include only bands in frequency range
iband = np.logical_and(fl <= f, f < fu)
P[..., i] = np.sum(
np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2
* 2 # energy from negative and positive frequencies
* self.dt
/ self.nt
/ self.time_length,
axis=-1,
)
else:
P[..., i] = 0
if avgaxis is not None:
P = P.mean(axis=avgaxis)
return P, fcs
@classmethod
def time_vector(cls, n, fs):
"""Time values of filter with n taps sampled at fs.
Parameters
----------
n : int
number of taps in FIR filter
fs : int
sampling frequency in Hertz
Returns
-------
(n) ndarray
times in seconds
"""
return time_vector(n, fs)
@classmethod
def freq_vector(cls, n, fs):
"""Frequency values of filter with n taps sampled at fs up to Nyquist.
Parameters
----------
n : int
Number of taps in FIR filter
fs : int
Sampling frequency in Hertz
Returns
-------
(n // 2 + 1) ndarray
Frequencies in Hz
"""
return freq_vector(n, fs, sided='single')
def filter(self, b, a=[1]):
"""Filter response along one-dimension with an IIR or FIR filter.
Parameters
----------
b : array_like
The numerator coefficient vector in a 1-D sequence.
a : array_like, optional
The denominator coefficient vector in a 1-D sequence. If ``a[0]``
is not 1, then both `a` and `b` are normalized by ``a[0]``.
"""
return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1))
def add_noise(self, snr, unit=None):
"""Add noise to x with relative noise level SNR.
Parameters
----------
snr : float
relative magnitude of noise, i.e. snr = Ex/En
unit : None or str, optional
if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).
Returns
-------
Response
"""
return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit))
def psd(self, **kwargs):
"""Compute the power spectral density of the signal.
Parameters
----------
kwargs
keword arguments passed to scipy.signal.welch
Returns
-------
f : ndarray
Array of sample frequencies.
Pxx : ndarray
Power spectral density of time signal.
Notes
-----
Use scaling='density' for power per bin bandwidth and scaling='spectrum' for
power per bin.
"""
return welch(self.in_time, fs=self.fs, **kwargs)
####################
# Module functions #
####################
def noisify(x, snr, unit=None):
"""Add noise to x with relative noise level SNR.
Parameters
----------
x : ndarray
data
snr : float
relative energy of noise, snr = Energy(x)/Energy(n).
unit : None or str, optional
if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En).
Returns
-------
ndarray
data with noise
Examples
--------
Create signal
>>> import numpy as np
>>> t = np.linspace(0, 1, 1000000, endpoint=False)
>>> x = np.sin(2*np.pi*10*t) # signal
Add noise with 6 dB SNR to a sinusoidal signal:
>>> snrdB = 6
>>> xn = noisify(x, snrdB, "dB") # signal plus noise
>>> energy_x = np.linalg.norm(x)**2
>>> energy_xn = np.linalg.norm(xn)**2
>>> snr = 10 ** (snrdB / 20)
>>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2)
True
"""
if unit == "dB":
snr = 10 ** (snr / 20)
if np.iscomplexobj(x):
n = np.random.standard_normal(x.shape) + 1j * np.random.standard_normal(x.shape)
else:
n = np.random.standard_normal(x.shape)
n *= 1 / np.sqrt(snr) * np.linalg.norm(x) / np.linalg.norm(n)
return x + n
def time_vector(n, fs):
"""Time values of filter with n taps sampled at fs.
Parameters
----------
n : int
number of taps in FIR filter
fs : int
sampling frequency in Hertz
Returns
-------
(n) ndarray
times in seconds
"""
T = 1 / fs
return np.arange(n, dtype=float) * T # float against int wrapping
def freq_vector(n, fs, sided="single"):
"""Frequency values of filter with n taps sampled at fs up to Nyquist.
Parameters
----------
n : int
Number of taps in FIR filter
fs : int
Sampling frequency in Hertz
sided: str
Generate frequencies for a "single" or "double" sided spectrum
Returns
-------
(n // 2 + 1) ndarray
Frequencies in Hz
"""
# use float against int wrapping
if sided == "single":
f = np.arange(n // 2 + 1, dtype=float) * fs / n
elif sided == "double":
f = np.arange(n, dtype=float) * fs / n
else:
raise ValueError("Invalid value for sided.")
return f
def delay(fs, x, dt, keep_length=True, axis=-1):
"""Delay time signal by dt seconds by inserting zeros.
Examples
--------
>>> delay(1, [1, 2, 3], 1)
array([0., 1., 2.])
>>> delay(1, [1, 2, 3], 1, keep_length=False)
array([0., 1., 2., 3.])
>>> delay(1, [1, 0, 0], -1)
array([0., 0., 0.])
>>> delay(1, [1, 0, 0], -1, keep_length=False)
array([0, 0])
"""
dn = int(round(dt * fs))
x = np.asarray(x)
n = x.shape[axis]
if dn > 0:
# delay
zeros_shape = list(x.shape)
zeros_shape[axis] = dn
zeros = np.zeros(zeros_shape)
delayed = np.concatenate((zeros, x), axis=axis)
if keep_length:
# slice that takes 0 to ntaps samples along axis
slc = [slice(None)] * len(x.shape)
slc[axis] = slice(0, n)
delayed = delayed[tuple(slc)]
elif dn < 0:
# pre-delay
slc = [slice(None)] * len(x.shape)
slc[axis] = slice(-dn, n)
delayed = x[tuple(slc)]
if keep_length:
zeros_shape = list(x.shape)
zeros_shape[axis] = -dn
zeros = np.zeros(zeros_shape)
delayed = np.concatenate((delayed, zeros), axis=axis)
else:
# no delay
delayed = x
return delayed
def delay_between(h1, h2):
"""Estimate delay of h2 relative to h1 using cross correlation.
Parameters
----------
h1 : ((N,) L) array_like
Reference signals.
h2 : ((M,) L) array_like
Delayed signals.
Returns
-------
delay : (N, M) ndarray
Delays in samples. `h2[j]` is delayed relative to `h1[i]` by `delay[i, j]`.
Examples
--------
>>> a = [1, 0, 0, 0]
>>> b = [0, 0, 1, 0]
>>> delay_between(a, b)
array(2)
"""
h1 = np.atleast_2d(h1)
h2 = np.atleast_2d(h2)
assert h1.shape[-1] == h2.shape[-1], "h1 and h2 must have same number of samples"
L = h1.shape[-1]
delay = np.zeros((h1.shape[0], h2.shape[0]), dtype=int)
for i in range(h1.shape[0]):
for j in range(h2.shape[0]):
xcorrmax = np.argmax(np.correlate(h2[j], h1[i], mode="full"))
delay[i, j] = xcorrmax - L + 1
return delay.squeeze()
def align(h, href, upsample=1):
"""Align two impulse responses using cross correlation.
Parameters
----------
h : array_like
Response that will be aligned.
href : array_like
Response to which will be aligned.
upsample : int, optional
Upsample both responses before alignment by this factor.
Returns
-------
ndarray
Time aligned version of `h`.
"""
href = resample_poly(href, upsample, 1)
h = resample_poly(h, upsample, 1)
delay = delay_between(href, h).squeeze()
h = np.roll(h, -int(delay))
h = resample_poly(h, 1, upsample)
return h
#####################
# Utility functions #
#####################
def _sample_window(n, startwindow, stopwindow, window="hann"):
"""Create a sample domain window."""
swindow = np.ones(n)
if startwindow is not None:
length = startwindow[1] - startwindow[0]
w = get_window(window, 2 * length, fftbins=False)[:length]
swindow[: startwindow[0]] = 0
swindow[startwindow[0] : startwindow[1]] = w
if stopwindow is not None:
# stop window
length = stopwindow[1] - stopwindow[0]
w = get_window(window, 2 * length, fftbins=False)[length:]
swindow[stopwindow[0] + 1 : stopwindow[1] + 1] = w
swindow[stopwindow[1] + 1 :] = 0
return swindow
def _time_window(fs, n, startwindow_t, stopwindow_t, window="hann"):
"""Create a time domain window.
Negative times are relative to the end. Short cut for end time is `None`.
"""
times = time_vector(n, fs)
T = times[-1] + times[1] # total time length
if startwindow_t is None:
startwindow_n = None
else:
startwindow_n = []
for t in startwindow_t:
if t < 0:
t += T
assert 0 <= t or t <= T
startwindow_n.append(_find_nearest(times, t)[1])
if stopwindow_t is None:
stopwindow_n = None
else:
stopwindow_n = []
for t in stopwindow_t:
if t is None:
t = times[-1]
elif t < 0:
t += T
assert 0 <= t or t <= T
stopwindow_n.append(_find_nearest(times, t)[1])
twindow = _sample_window(n, startwindow_n, stopwindow_n, window=window)
return twindow
def _freq_window(fs, n, startwindow_f, stopwindow_f, window="hann"):
"""Create a frequency domain window."""
freqs = freq_vector(n, fs)
if startwindow_f is not None:
startwindow_n = [_find_nearest(freqs, f)[1] for f in startwindow_f]
else:
startwindow_n = None
if stopwindow_f is not None:
stopwindow_n = [_find_nearest(freqs, f)[1] for f in stopwindow_f]
else:
stopwindow_n = None
fwindow = _sample_window(len(freqs), startwindow_n, stopwindow_n, window=window)
return fwindow
def _rescale(x, xlim, ylim):
"""Rescale values to new bounds.
Parameters
----------
x : ndarray
Values to rescale
xlim : tuple
Original value bounds (xmin, xmax)
ylim : float
New value bounds (ymin, ymax)
Returns
-------
ndarray
Rescaled values
"""
m = (ylim[1] - ylim[0]) / (xlim[1] - xlim[0])
c = ylim[1] - m * xlim[1]
y = m * x + c
return y
def _find_nearest(array, value):
"""Find nearest value in an array and its index.
Returns
-------
value
Value of nearest entry in array
idx
Index of that value
"""
idx = (np.abs(array - value)).argmin()
return array[idx], idx
def _construct_window_around_peak(fs, irs, tleft, tright, alpha=0.5):
"""Create time window around maximum of response.
Parameters
----------
fs : int
Sample rate.
irs : array_like
Input response.
tleft : float
Start of time window relative to impulse response peak.
tright : float
End of time window relative to impulse response peak.
alpha : float, optional
Alpha parameter of Tukey window.
Returns
-------
ndarray
Time windows.
"""
orig_shape = irs.shape
flat_irs = irs.reshape(-1, irs.shape[-1])
sleft = int(fs * tleft)
sright = int(fs * tright)
windows = np.ones(flat_irs.shape)
for i in range(flat_irs.shape[0]):
ipeak = np.argmax(np.abs(flat_irs[i]))
iwstart = max(ipeak - sleft, 0)
iwend = min(ipeak + sright, flat_irs.shape[-1])
window = tukey(iwend - iwstart, alpha=alpha)
windows[i, iwstart:iwend] *= window
windows[i, :iwstart] = 0
windows[i, iwend:] = 0
return windows.reshape(orig_shape)
def _aroll(x, n, circular=False, axis=-1, copy=True):
"""Roll each entry along axis individually.
Can be used to delay / shift each response by its own shift.
Parameters
----------
x : ndarray (Ni...,M,Nj...)
Input array
n : ndarray (Ni...,Nj...)
Delay times of each entry along axis.
circular: bool, optional
If True, wrap around ends. Else replace with zeros.
axis : int, optional
Axis along which is rolled.
copy : bool, optional
If True, operate on copy of `x`. Else roll inplace.
Returns
-------
ndarray (Ni...,M,Nj...)
Array with rolled entries
"""
n = n.astype(int)
if copy:
x = x.copy()
# move axis to first dim and reshape to 2D
xview = np.rollaxis(x, axis)
xview = xview.reshape(xview.shape[0], -1)
n = n.reshape(-1)
assert n.shape[0] == xview.shape[1], 'Shapes of x and n do not match.'
for i in range(n.shape[0]):
xview[:, i] = np.roll(xview[:, i], n[i])
if not circular:
if n[i] > 0:
xview[: n[i], i] = 0
elif n[i] < 0:
xview[n[i] :, i] = 0
return x
# center, lower, upper frequency of third octave bands
_third_octave_bands = (
(16, 13.920_292_470_942_801, 17.538_469_504_833_955),
(20, 17.538_469_504_833_95, 22.097_086_912_079_607),
(25, 22.097_086_912_079_615, 27.840_584_941_885_613),
(31, 27.840_584_941_885_602, 35.076_939_009_667_91),
(40, 35.076_939_009_667_9, 44.194_173_824_159_215),
(50, 44.194_173_824_159_23, 55.681_169_883_771_226),
(63, 55.681_169_883_771_204, 70.153_878_019_335_82),
(80, 70.153_878_019_335_82, 88.388_347_648_318_44),
(100, 88.388_347_648_318_43, 111.362_339_767_542_41),
(125, 111.362_339_767_542_41, 140.307_756_038_671_64),
(160, 140.307_756_038_671_64, 176.776_695_296_636_9),
(200, 176.776_695_296_636_86, 222.724_679_535_084_82),
(250, 222.724_679_535_084_82, 280.615_512_077_343_3),
(315, 280.615_512_077_343_3, 353.553_390_593_273_8),
(400, 353.553_390_593_273_8, 445.449_359_070_169_75),
(500, 445.449_359_070_169_63, 561.231_024_154_686_6),
(630, 561.231_024_154_686_6, 707.106_781_186_547_6),
(800, 707.106_781_186_547_6, 890.898_718_140_339_5),
(1000, 890.898_718_140_339_3, 1122.462_048_309_373_1),
(1260, 1122.462_048_309_373_1, 1414.213_562_373_095),
(1600, 1414.213_562_373_094_9, 1781.797_436_280_678_5),
(2000, 1781.797_436_280_678_5, 2244.924_096_618_746_3),
(2500, 2244.924_096_618_746_3, 2828.427_124_746_19),
(3200, 2828.427_124_746_19, 3563.594_872_561_358),
(4000, 3563.594_872_561_357, 4489.848_193_237_492_5),
(5000, 4489.848_193_237_492_5, 5656.854_249_492_38),
(6300, 5656.854_249_492_379_5, 7127.189_745_122_714),
(8000, 7127.189_745_122_714, 8979.696_386_474_985),
(10000, 8979.696_386_474_985, 11313.708_498_984_76),
(12600, 11313.708_498_984_759, 14254.379_490_245_428),
(16000, 14254.379_490_245_428, 17959.392_772_949_97),
(20000, 17959.392_772_949_966, 22627.416_997_969_518),
)
def _add_octave_band_xticks(ax, bands=np.array(_third_octave_bands)[:, 0]):
"""Add band ticks to axis."""
left, right = ax.get_xlim()
b = bands[np.logical_and(left <= bands, bands <= right)]
ax.set_xticks(b)
ax.set_xticks([], minor=True)
ax.set_xticklabels([int(round(bs)) for bs in b], minor=False)
Functions
def align(h, href, upsample=1)
-
Align two impulse responses using cross correlation.
Parameters
h
:array_like
- Response that will be aligned.
href
:array_like
- Response to which will be aligned.
upsample
:int
, optional- Upsample both responses before alignment by this factor.
Returns
ndarray
- Time aligned version of
h
.
Expand source code
def align(h, href, upsample=1): """Align two impulse responses using cross correlation. Parameters ---------- h : array_like Response that will be aligned. href : array_like Response to which will be aligned. upsample : int, optional Upsample both responses before alignment by this factor. Returns ------- ndarray Time aligned version of `h`. """ href = resample_poly(href, upsample, 1) h = resample_poly(h, upsample, 1) delay = delay_between(href, h).squeeze() h = np.roll(h, -int(delay)) h = resample_poly(h, 1, upsample) return h
def delay(fs, x, dt, keep_length=True, axis=-1)
-
Delay time signal by dt seconds by inserting zeros.
Examples
>>> delay(1, [1, 2, 3], 1) array([0., 1., 2.])
>>> delay(1, [1, 2, 3], 1, keep_length=False) array([0., 1., 2., 3.])
>>> delay(1, [1, 0, 0], -1) array([0., 0., 0.])
>>> delay(1, [1, 0, 0], -1, keep_length=False) array([0, 0])
Expand source code
def delay(fs, x, dt, keep_length=True, axis=-1): """Delay time signal by dt seconds by inserting zeros. Examples -------- >>> delay(1, [1, 2, 3], 1) array([0., 1., 2.]) >>> delay(1, [1, 2, 3], 1, keep_length=False) array([0., 1., 2., 3.]) >>> delay(1, [1, 0, 0], -1) array([0., 0., 0.]) >>> delay(1, [1, 0, 0], -1, keep_length=False) array([0, 0]) """ dn = int(round(dt * fs)) x = np.asarray(x) n = x.shape[axis] if dn > 0: # delay zeros_shape = list(x.shape) zeros_shape[axis] = dn zeros = np.zeros(zeros_shape) delayed = np.concatenate((zeros, x), axis=axis) if keep_length: # slice that takes 0 to ntaps samples along axis slc = [slice(None)] * len(x.shape) slc[axis] = slice(0, n) delayed = delayed[tuple(slc)] elif dn < 0: # pre-delay slc = [slice(None)] * len(x.shape) slc[axis] = slice(-dn, n) delayed = x[tuple(slc)] if keep_length: zeros_shape = list(x.shape) zeros_shape[axis] = -dn zeros = np.zeros(zeros_shape) delayed = np.concatenate((delayed, zeros), axis=axis) else: # no delay delayed = x return delayed
def delay_between(h1, h2)
-
Estimate delay of h2 relative to h1 using cross correlation.
Parameters
h1
:((N,) L) array_like
- Reference signals.
h2
:((M,) L) array_like
- Delayed signals.
Returns
delay
:(N, M) ndarray
- Delays in samples.
h2[j]
is delayed relative toh1[i]
bydelay()[i, j]
.
Examples
>>> a = [1, 0, 0, 0] >>> b = [0, 0, 1, 0] >>> delay_between(a, b) array(2)
Expand source code
def delay_between(h1, h2): """Estimate delay of h2 relative to h1 using cross correlation. Parameters ---------- h1 : ((N,) L) array_like Reference signals. h2 : ((M,) L) array_like Delayed signals. Returns ------- delay : (N, M) ndarray Delays in samples. `h2[j]` is delayed relative to `h1[i]` by `delay[i, j]`. Examples -------- >>> a = [1, 0, 0, 0] >>> b = [0, 0, 1, 0] >>> delay_between(a, b) array(2) """ h1 = np.atleast_2d(h1) h2 = np.atleast_2d(h2) assert h1.shape[-1] == h2.shape[-1], "h1 and h2 must have same number of samples" L = h1.shape[-1] delay = np.zeros((h1.shape[0], h2.shape[0]), dtype=int) for i in range(h1.shape[0]): for j in range(h2.shape[0]): xcorrmax = np.argmax(np.correlate(h2[j], h1[i], mode="full")) delay[i, j] = xcorrmax - L + 1 return delay.squeeze()
def freq_vector(n, fs, sided='single')
-
Frequency values of filter with n taps sampled at fs up to Nyquist.
Parameters
n
:int
- Number of taps in FIR filter
fs
:int
- Sampling frequency in Hertz
sided
:str
- Generate frequencies for a "single" or "double" sided spectrum
Returns
(n // 2 + 1) ndarray Frequencies in Hz
Expand source code
def freq_vector(n, fs, sided="single"): """Frequency values of filter with n taps sampled at fs up to Nyquist. Parameters ---------- n : int Number of taps in FIR filter fs : int Sampling frequency in Hertz sided: str Generate frequencies for a "single" or "double" sided spectrum Returns ------- (n // 2 + 1) ndarray Frequencies in Hz """ # use float against int wrapping if sided == "single": f = np.arange(n // 2 + 1, dtype=float) * fs / n elif sided == "double": f = np.arange(n, dtype=float) * fs / n else: raise ValueError("Invalid value for sided.") return f
def noisify(x, snr, unit=None)
-
Add noise to x with relative noise level SNR.
Parameters
x
:ndarray
- data
snr
:float
- relative energy of noise, snr = Energy(x)/Energy(n).
unit
:None
orstr
, optional- if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En).
Returns
ndarray
- data with noise
Examples
Create signal
>>> import numpy as np >>> t = np.linspace(0, 1, 1000000, endpoint=False) >>> x = np.sin(2*np.pi*10*t) # signal
Add noise with 6 dB SNR to a sinusoidal signal:
>>> snrdB = 6 >>> xn = noisify(x, snrdB, "dB") # signal plus noise
>>> energy_x = np.linalg.norm(x)**2 >>> energy_xn = np.linalg.norm(xn)**2 >>> snr = 10 ** (snrdB / 20) >>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2) True
Expand source code
def noisify(x, snr, unit=None): """Add noise to x with relative noise level SNR. Parameters ---------- x : ndarray data snr : float relative energy of noise, snr = Energy(x)/Energy(n). unit : None or str, optional if "dB", snr is specified in dB, i.e. snr = 10*log(Ex/En). Returns ------- ndarray data with noise Examples -------- Create signal >>> import numpy as np >>> t = np.linspace(0, 1, 1000000, endpoint=False) >>> x = np.sin(2*np.pi*10*t) # signal Add noise with 6 dB SNR to a sinusoidal signal: >>> snrdB = 6 >>> xn = noisify(x, snrdB, "dB") # signal plus noise >>> energy_x = np.linalg.norm(x)**2 >>> energy_xn = np.linalg.norm(xn)**2 >>> snr = 10 ** (snrdB / 20) >>> np.allclose((1 + 1/snr) * energy_x, energy_xn, rtol=1e-2) True """ if unit == "dB": snr = 10 ** (snr / 20) if np.iscomplexobj(x): n = np.random.standard_normal(x.shape) + 1j * np.random.standard_normal(x.shape) else: n = np.random.standard_normal(x.shape) n *= 1 / np.sqrt(snr) * np.linalg.norm(x) / np.linalg.norm(n) return x + n
def time_vector(n, fs)
-
Time values of filter with n taps sampled at fs.
Parameters
n
:int
- number of taps in FIR filter
fs
:int
- sampling frequency in Hertz
Returns
(n) ndarray times in seconds
Expand source code
def time_vector(n, fs): """Time values of filter with n taps sampled at fs. Parameters ---------- n : int number of taps in FIR filter fs : int sampling frequency in Hertz Returns ------- (n) ndarray times in seconds """ T = 1 / fs return np.arange(n, dtype=float) * T # float against int wrapping
Classes
class Response (fs, fdata=None, tdata=None, isEvenSampled=True)
-
Representation of a linear response in time and frequency domain.
Create Response from time or frequency data.
Use
from_time
orfrom_freq methods
to create objects of this class!Parameters
fs
:int
- Sampling frequency in Hertz
fdata
:(…, nt) complex ndarray
, optional- Single sided frequency spectra with nt from ns to nr points.
tdata
:(…, nf) real ndarray
, optional- Time responses with nt from ns to nr points.
isEvenSampled
:bool
orNone
, optional- If fdata is given, this tells us if the last entry of fdata is the
Nyquist frequency or not. Must be
None
if tdata is given.
Raises
ValueError
- if neither fdata or tdata are given.
Expand source code
class Response(object): """Representation of a linear response in time and frequency domain.""" def __init__(self, fs, fdata=None, tdata=None, isEvenSampled=True): """Create Response from time or frequency data. Use `from_time` or `from_freq methods` to create objects of this class! Parameters ---------- fs : int Sampling frequency in Hertz fdata : (..., nt) complex ndarray, optional Single sided frequency spectra with nt from ns to nr points. tdata : (..., nf) real ndarray, optional Time responses with nt from ns to nr points. isEvenSampled : bool or None, optional If fdata is given, this tells us if the last entry of fdata is the Nyquist frequency or not. Must be `None` if tdata is given. Raises ------ ValueError if neither fdata or tdata are given. """ assert float(fs).is_integer() if fdata is not None and tdata is None: fdata = np.atleast_1d(fdata) self._nf = fdata.shape[-1] if isEvenSampled: self._nt = 2 * (self._nf - 1) else: self._nt = 2 * self._nf - 1 self._isEvenSampled = isEvenSampled self.__set_frequency_data(fdata) elif tdata is not None and fdata is None: assert np.all(np.imag(tdata) == 0), "Time data must be real." tdata = np.atleast_1d(tdata) self._nt = tdata.shape[-1] self._nf = self._nt // 2 + 1 self._isEvenSampled = self._nt % 2 == 0 self.__set_time_data(tdata) else: raise ValueError("One and only one of fdata and tdata must be given.") self._fs = int(fs) self._freqs = freq_vector(self._nt, fs) self._times = time_vector(self._nt, fs) self._time_length = self._nt * 1 / fs self.df = self._freqs[1] # frequency resolution self.dt = self._times[1] # time resolution @classmethod def from_time(cls, fs, tdata, **kwargs): """Generate Response obj from time response data.""" tf = cls(fs, tdata=tdata, **kwargs) return tf @classmethod def from_freq(cls, fs, fdata, **kwargs): """Generate Response obj from frequency response data.""" tf = cls(fs, fdata=fdata, **kwargs) return tf @classmethod def from_wav(cls, fps): """Import responses from wav files. Parameters ---------- fps : list File paths of all wav files. Returns ------- Response New Response object with imported time responses. """ fpi = iter(fps) fs, data = wavfile.read(next(fpi)) hlist = [data] + [wavfile.read(fp)[1] for fp in fpi] h = np.array(hlist) if data.dtype in [np.uint8, np.int16, np.int32]: lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max) lim_new = (-1.0, 1.0) h = _rescale(h, lim_orig, lim_new).astype(np.double) return cls.from_time(fs, h) @classmethod def new_dirac(cls, fs, T=None, n=None, nch=(1,)): """Generate new allpass / dirac response.""" nch = np.atleast_1d(nch) if T is not None: nt = round(fs * T) else: nt = n h = np.zeros((*nch, nt)) h[..., 0] = 1 return cls.from_time(fs, h) @classmethod def join(cls, tfs, axis=0, newaxis=True): """Concat or stack a set of Responses along a given axis. Parameters ---------- tfs : array_like List of Responses axis : int, optional Indice of axis along wich to concatenate / stack TFs. newaxis : bool, optional If True, do not concatenate but stack arrays along a new axis. Returns ------- Response Note ---- Transfer functions need to have same sampling rate, length etc. """ joinfunc = np.stack if newaxis else np.concatenate tdata = joinfunc([tf.in_time for tf in tfs], axis=axis) return cls.from_time(tfs[0].fs, tdata) @property def time_length(self): """Length of time response in seconds.""" return self._time_length @property def nf(self): # noqa: D401 """Number of frequencies in frequency representation.""" return len(self._freqs) @property def nt(self): # noqa: D401 """Number of taps.""" return len(self._times) @property def fs(self): # noqa: D401 """Sampling frequency.""" return self._fs @property def freqs(self): # noqa: D401 """Frequencies.""" return self._freqs @property def times(self): # noqa: D401 """Times.""" return self._times @property def in_time(self): """Time domain response. Returns ------- (... , n) ndarray Real FIR filters. """ if self._in_time is None: self._in_time = np.fft.irfft(self._in_freq, n=self._times.size) return self._in_time @property def in_freq(self): """Single sided frequency spectrum. Returns ------- (... , n) ndarray Complex frequency response. """ if self._in_freq is None: self._in_freq = np.fft.rfft(self._in_time) return self._in_freq @property def amplitude_spectrum(self): """Amplitude spectrum.""" X = self.in_freq / self.nt if self.nt % 2 == 0: # zero and nyquist element only appear once in complex spectrum X[..., 1:-1] *= 2 else: # there is no nyquist element X[..., 1:] *= 2 return X def __set_time_data(self, tdata): """Set time data without creating new object.""" assert tdata.shape[-1] == self._nt self._in_time = tdata self._in_freq = None def __set_frequency_data(self, fdata): """Set frequency data without creating new object.""" assert fdata.shape[-1] == self._nf self._in_freq = fdata self._in_time = None def plot( self, group_delay=False, slce=None, flim=None, dblim=None, tlim=None, grpdlim=None, dbref=1, show=False, use_fig=None, label=None, unwrap_phase=False, logf=True, third_oct_f=True, plot_kw={}, **fig_kw, ): """Plot the response in both domains. Parameters ---------- group_delay : bool, optional Display group delay instead of phase. slce : numpy.lib.index_tricks.IndexExpression only plot subset of responses defined by a slice. Last dimension (frequency or time) is always completely taken. flim : tuple or None, optional Frequency axis limits as tuple `(lower, upper)` dblim : tuple or None, optional Magnitude axis limits as tuple `(lower, upper)` tlim : tuple or None, optional Time axis limits as tuple `(lower, upper)` grpdlim: tuple or None, optional Group delay axis limit as tuple `(lower, upper)` dbref : float dB reference in magnitude plot show : bool, optional Run `matplotlib.pyplot.show()` use_fig : matplotlib.pyplot.Figure Reuse an existing figure. label : None, optional Description unwrap_phase : bool, optional unwrap phase in phase plot logf : bool, optional If `True`, use logarithmic frequency axis. third_oct_f: bool, optional Label frequency axis with third octave bands. plot_kw : dictionary, optional Keyword arguments passed to the `plt.plot` commands. **fig_kw Additional options passe to figure creation. """ if use_fig is None: fig_kw = {**{"figsize": (10, 10)}, **fig_kw} fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw) else: fig = use_fig axes = fig.axes self.plot_magnitude( use_ax=axes[0], slce=slce, dblim=dblim, flim=flim, dbref=dbref, label=label, plot_kw=plot_kw, logf=logf, third_oct_f=third_oct_f, ) if group_delay: self.plot_group_delay( use_ax=axes[1], slce=slce, flim=flim, ylim=grpdlim, plot_kw=plot_kw, logf=logf, third_oct_f=third_oct_f, ) else: self.plot_phase( use_ax=axes[1], slce=slce, flim=flim, plot_kw=plot_kw, unwrap=unwrap_phase, logf=logf, third_oct_f=third_oct_f, ) self.plot_time( use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw ) if show: plt.show() return fig def plot_magnitude( self, use_ax=None, slce=None, dblim=None, flim=None, dbref=1, label=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot magnitude response.""" # TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) plotf = ax.semilogx if logf else ax.plot plotf( self.freqs, 20 * np.log10(np.abs(freq_plotready / dbref)), label=label, **plot_kw, ) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Magnitude [dB]") ax.set_title("Frequency response") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if dblim is not None: ax.set_ylim(dblim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig def plot_phase( self, use_ax=None, slce=None, flim=None, label=None, unwrap=False, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot phase response.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) phase = ( np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready) ) plotf = ax.semilogx if logf else ax.plot plotf(self.freqs, phase, label=label, **plot_kw) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Phase [rad]") ax.set_title("Phase response") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig def plot_time( self, use_ax=None, slce=None, tlim=None, ylim=None, label=None, plot_kw={}, **fig_kw, ): """Plot time response.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape( (self.nt, -1) ) ax.plot(self.times, time_plotready, label=label, **plot_kw) ax.set_xlabel("Time [s]") ax.set_ylabel("") ax.set_title("Time response") ax.grid(True) if tlim: ax.set_xlim(tlim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() return fig def plot_group_delay( self, use_ax=None, slce=None, flim=None, label=None, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot group delay.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) df = self.freqs[1] - self.freqs[0] # TODO: use scipy.signal.group_delay here as below has problem at larger delays grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0) plotf = ax.semilogx if logf else ax.plot plotf(self.freqs, grpd, label=label, **plot_kw) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Delay [s]") ax.set_title("Group Delay") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig def plot_power_in_bands( self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs ): """Plot signal's power in bands. Parameters ---------- bands : list or None, optional List of tuples (f_center, f_lower, f_upper). If `None`, use third octave bands. use_ax : matplotlib.axis.Axis or None, optional Plot into this axis. barkwargs : dict Keyword arguments to `axis.bar` avgaxis : int, tuple or None Average power over these axes. dbref : float dB reference. **figkwargs Keyword arguments passed to plt.subplots Returns ------- P : ndarray Power in bands fc : ndarray Band frequencies fig : matplotlib.figure.Figure Figure """ P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis) nbands = P.shape[-1] P = np.atleast_2d(P).reshape((-1, nbands)) if use_ax is None: fig, ax = plt.subplots(**figkwargs) else: ax = use_ax fig = ax.get_figure() xticks = range(1, nbands + 1) for i in range(P.shape[0]): ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs) ax.set_xticks(xticks) ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical") ax.grid(True) ax.set_xlabel("Band's center frequencies [Hz]") ax.set_ylabel("Power [dB]") return (P, fc, fig) def time_window(self, startwindow, stopwindow, window="hann"): """Apply time domain windows. Parameters ---------- startwindow : None or tuple Tuple (t1, t2) with beginning and end times of window opening. stopwindow : None or tuple Tuple (t1, t2) with beginning and end times of window closing. window : string or tuple of string and parameter values, optional Desired window to use. See scipy.signal.get_window for a list of windows and required parameters. Returns ------- Response Time windowed response object """ n = self.times.size twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window) new_response = self.from_time(self.fs, self.in_time * twindow) return new_response def freq_window(self, startwindow, stopwindow, window="hann"): """Apply frequency domain window. Parameters ---------- startwindow : None or tuple Tuple (t1, t2) with beginning and end frequencies of window opening. stopwindow : None or tuple Tuple (t1, t2) with beginning and end frequencies of window closing. window : string or tuple of string and parameter values, optional Desired window to use. See scipy.signal.get_window for a list of windows and required parameters. Returns ------- Response Frequency windowed response object """ n = self.times.size fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window) new_response = self.from_freq(self.fs, self.in_freq * fwindow) return new_response def window_around_peak(self, tleft, tright, alpha, return_window=False): """Time window each impulse response around its peak value. Parameters ---------- tleft, tright : float Window starts `tleft` seconds before and ends `tright` seconds after maximum of impulse response. alpha : float `alpha` parameter of `scipy.signal.tukey` window. return_window : bool, optional Also return used time window Returns ------- Response Time windowed response object. ndarray Time window, if `return_window` is `True`. """ window = _construct_window_around_peak( self.fs, self.in_time, tleft, tright, alpha=alpha ) if return_window: return self.from_time(self.fs, self.in_time * window), window return self.from_time(self.fs, self.in_time * window) def delay(self, dt, keep_length=True): """Delay time response by dt seconds. Rounds of to closest integer delay. """ x = delay(self.fs, self.in_time, dt, keep_length=keep_length) return self.from_time(self.fs, x) def circdelay(self, dt): """Delay by circular shift. Rounds of to closest integer delay. """ x = self.in_time n = int(round(dt * self.fs)) shifted = np.roll(x, n, axis=-1) return self.from_time(self.fs, shifted) def timecrop(self, start, end): """Crop time response. Parameters ---------- start, end : float Start and end times in seconds. Does not include sample at t=end. Use end=None to force inclusion of last sample. Returns ------- Response New Response object with cropped time. Notes ----- Creates new Response object. Examples -------- >>> import numpy as np >>> from response import Response >>> r = Response.from_time(100, np.random.normal(size=100)) >>> split = 0.2 The following holds: >>> np.all(np.concatenate( ... ( ... r.timecrop(0, split).in_time, ... r.timecrop(split, None).in_time, ... ), ... axis=-1, ... ) == r.in_time) True """ if start < 0: start += self.time_length if end is not None and end < 0: end += self.time_length assert 0 <= start < self.time_length assert end is None or (0 < end <= self.time_length) _, i_start = _find_nearest(self.times, start) if end is None: i_end = None else: _, i_end = _find_nearest(self.times, end) h = self.in_time[..., i_start:i_end] new_response = self.from_time(self.fs, h) return new_response def non_causal_timecrop(self, length): """Cut length of non-causal impulse response. "FFT shift, cropping on both ends, iFFT shift" Parameters ---------- length : float final length in seconds Returns ------- Response New Response object new length. Note ---- Can introduce delay pre-delay by a sample. """ assert length < self.time_length cut = (self.time_length - length) / 2 _, i_start = _find_nearest(self.times, cut) _, i_end = _find_nearest(self.times, self.time_length - cut) h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end]) new_response = self.from_time(self.fs, h) if new_response.time_length != length: w = f"Could not precisely shrink to {length}s with fs = {self.fs}" warnings.warn(w) return new_response def zeropad(self, before, after): """Zeropad time response. Parameters ---------- before, after : int Number of zero samples inserted before and after response. Returns ------- Response Zeropadded response """ assert before % 1 == 0 assert after % 1 == 0 dims = self.in_time.ndim pad_width = [(0, 0) for n in range(dims)] pad_width[-1] = (int(before), int(after)) h = np.pad(self.in_time, pad_width, "constant") return self.from_time(self.fs, h) def zeropad_to_power_of_2(self): """Pad time response for length of power of 2. Returns ------- Response New response object with larger, power of 2 length. """ # https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python n = 2 ** (self.nt - 1).bit_length() return self.zeropad(0, n - self.nt) def zeropad_to_length(self, n): """Zeropad time response to specific length. Returns ------- Response New response object with new length n. """ oldn = self.nt assert n >= oldn return self.zeropad(0, n - oldn) def resample(self, fs_new, normalize="same_gain", window=None): """Resample using Fourier method. Parameters ---------- fs_new : int New sample rate normalize : str, optional If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved. window : None, optional Passed to scipy.signal.resample. Returns ------- Response New resampled response object. Raises ------ ValueError If resulting number of samples would be a non-integer. """ if fs_new == self.fs: return self nt_new = fs_new * self.time_length if nt_new % 1 != 0: raise ValueError( "New number of samples must be integer, but is {}".format(nt_new) ) nt_new = int(nt_new) h_new = resample(self.in_time, nt_new, axis=-1, window=window) if normalize == "same_gain": h_new *= self.nt / nt_new elif normalize == "same_amplitude": pass else: raise ValueError( "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,) ) return self.from_time(fs_new, h_new) def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)): """Resample using polyphase filtering. Parameters ---------- fs_new : int New sample rate normalize : str, optional If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved. window : None, optional Passed to scipy.signal.resample_poly. Returns ------- Response New resampled response object. """ if fs_new == self.fs: return self ratio = Fraction(fs_new, self.fs) up = ratio.numerator down = ratio.denominator if up > 1000 or down > 1000: print("Warning: resampling with high ratio {}/{}".format(up, down)) h_new = resample_poly(self.in_time, up, down, axis=-1, window=window) if normalize == "same_gain": h_new *= down / up elif normalize == "same_amplitude": pass else: raise ValueError( "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,) ) return self.from_time(fs_new, h_new) def normalize(self, maxval=1): """Normalize time response. Parameters ---------- maxval: float, optional Maximum amplitude in resulting time response. Returns ------- Response """ h = self.in_time h /= np.abs(self.in_time).max() h *= maxval return self.from_time(self.fs, h) def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16): """Export impulse response to wave files. Dimension of data must 2. Parameters ---------- folder : file path Save in this folder name_fmt : str, optional Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'. dtype : one of np.float32, np.int32, np.int16, np.uint8 Data is converted to this type. """ data = np.atleast_2d(self.in_time) assert data.ndim == 2 assert np.all(np.abs(data) <= 1.0) # convert and scale to new output datatype if dtype in [np.uint8, np.int16, np.int32]: lim_orig = (-1.0, 1.0) lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max) data = _rescale(data, lim_orig, lim_new).astype(dtype) elif dtype != np.float32: raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.") path = Path(folder) if not path.is_dir(): path.mkdir(parents=True, exist_ok=False) for i in range(data.shape[0]): wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i]) def export_npz(self, filename, dtype=np.float32): """Export impulse response as npz file. Parameters ---------- filename: str or Path File path dtype: numpy dtype Convert to this type before saving """ np.savez( filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs ) def power_in_bands(self, bands=None, avgaxis=None): """Compute power of signal in third octave bands. Power(band) = 1/T integral |X(f)| ** 2 df f in band Parameters ---------- bands : list of tuples, length nbands optional Center, lower and upper frequencies of bands. avgaxis: int, tuple or None Average result over these axis Returns ------- P: ndarray, shape (..., nbands) Power in bands fcs: list, length nbands Center frequencies of bands """ if bands is None: bands = _third_octave_bands # center frequencies fcs = np.asarray([b[0] for b in bands]) Npow2 = 2 ** (self.nt - 1).bit_length() f = np.fft.fftfreq(Npow2, d=1 / self.fs) shape = list(self.in_freq.shape) shape[-1] = len(bands) P = np.zeros(shape) for i, (fc, fl, fu) in enumerate(bands): if fu < self.fs / 2: # include only bands in frequency range iband = np.logical_and(fl <= f, f < fu) P[..., i] = np.sum( np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2 * 2 # energy from negative and positive frequencies * self.dt / self.nt / self.time_length, axis=-1, ) else: P[..., i] = 0 if avgaxis is not None: P = P.mean(axis=avgaxis) return P, fcs @classmethod def time_vector(cls, n, fs): """Time values of filter with n taps sampled at fs. Parameters ---------- n : int number of taps in FIR filter fs : int sampling frequency in Hertz Returns ------- (n) ndarray times in seconds """ return time_vector(n, fs) @classmethod def freq_vector(cls, n, fs): """Frequency values of filter with n taps sampled at fs up to Nyquist. Parameters ---------- n : int Number of taps in FIR filter fs : int Sampling frequency in Hertz Returns ------- (n // 2 + 1) ndarray Frequencies in Hz """ return freq_vector(n, fs, sided='single') def filter(self, b, a=[1]): """Filter response along one-dimension with an IIR or FIR filter. Parameters ---------- b : array_like The numerator coefficient vector in a 1-D sequence. a : array_like, optional The denominator coefficient vector in a 1-D sequence. If ``a[0]`` is not 1, then both `a` and `b` are normalized by ``a[0]``. """ return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1)) def add_noise(self, snr, unit=None): """Add noise to x with relative noise level SNR. Parameters ---------- snr : float relative magnitude of noise, i.e. snr = Ex/En unit : None or str, optional if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En). Returns ------- Response """ return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit)) def psd(self, **kwargs): """Compute the power spectral density of the signal. Parameters ---------- kwargs keword arguments passed to scipy.signal.welch Returns ------- f : ndarray Array of sample frequencies. Pxx : ndarray Power spectral density of time signal. Notes ----- Use scaling='density' for power per bin bandwidth and scaling='spectrum' for power per bin. """ return welch(self.in_time, fs=self.fs, **kwargs)
Static methods
def freq_vector(n, fs)
-
Frequency values of filter with n taps sampled at fs up to Nyquist.
Parameters
n
:int
- Number of taps in FIR filter
fs
:int
- Sampling frequency in Hertz
Returns
(n // 2 + 1) ndarray Frequencies in Hz
Expand source code
@classmethod def freq_vector(cls, n, fs): """Frequency values of filter with n taps sampled at fs up to Nyquist. Parameters ---------- n : int Number of taps in FIR filter fs : int Sampling frequency in Hertz Returns ------- (n // 2 + 1) ndarray Frequencies in Hz """ return freq_vector(n, fs, sided='single')
def from_freq(fs, fdata, **kwargs)
-
Generate Response obj from frequency response data.
Expand source code
@classmethod def from_freq(cls, fs, fdata, **kwargs): """Generate Response obj from frequency response data.""" tf = cls(fs, fdata=fdata, **kwargs) return tf
def from_time(fs, tdata, **kwargs)
-
Generate Response obj from time response data.
Expand source code
@classmethod def from_time(cls, fs, tdata, **kwargs): """Generate Response obj from time response data.""" tf = cls(fs, tdata=tdata, **kwargs) return tf
def from_wav(fps)
-
Import responses from wav files.
Parameters
fps
:list
- File paths of all wav files.
Returns
Response
- New Response object with imported time responses.
Expand source code
@classmethod def from_wav(cls, fps): """Import responses from wav files. Parameters ---------- fps : list File paths of all wav files. Returns ------- Response New Response object with imported time responses. """ fpi = iter(fps) fs, data = wavfile.read(next(fpi)) hlist = [data] + [wavfile.read(fp)[1] for fp in fpi] h = np.array(hlist) if data.dtype in [np.uint8, np.int16, np.int32]: lim_orig = (np.iinfo(data.dtype).min, np.iinfo(data.dtype).max) lim_new = (-1.0, 1.0) h = _rescale(h, lim_orig, lim_new).astype(np.double) return cls.from_time(fs, h)
def join(tfs, axis=0, newaxis=True)
-
Concat or stack a set of Responses along a given axis.
Parameters
tfs
:array_like
- List of Responses
axis
:int
, optional- Indice of axis along wich to concatenate / stack TFs.
newaxis
:bool
, optional- If True, do not concatenate but stack arrays along a new axis.
Returns
Note
Transfer functions need to have same sampling rate, length etc.
Expand source code
@classmethod def join(cls, tfs, axis=0, newaxis=True): """Concat or stack a set of Responses along a given axis. Parameters ---------- tfs : array_like List of Responses axis : int, optional Indice of axis along wich to concatenate / stack TFs. newaxis : bool, optional If True, do not concatenate but stack arrays along a new axis. Returns ------- Response Note ---- Transfer functions need to have same sampling rate, length etc. """ joinfunc = np.stack if newaxis else np.concatenate tdata = joinfunc([tf.in_time for tf in tfs], axis=axis) return cls.from_time(tfs[0].fs, tdata)
def new_dirac(fs, T=None, n=None, nch=(1,))
-
Generate new allpass / dirac response.
Expand source code
@classmethod def new_dirac(cls, fs, T=None, n=None, nch=(1,)): """Generate new allpass / dirac response.""" nch = np.atleast_1d(nch) if T is not None: nt = round(fs * T) else: nt = n h = np.zeros((*nch, nt)) h[..., 0] = 1 return cls.from_time(fs, h)
def time_vector(n, fs)
-
Time values of filter with n taps sampled at fs.
Parameters
n
:int
- number of taps in FIR filter
fs
:int
- sampling frequency in Hertz
Returns
(n) ndarray times in seconds
Expand source code
@classmethod def time_vector(cls, n, fs): """Time values of filter with n taps sampled at fs. Parameters ---------- n : int number of taps in FIR filter fs : int sampling frequency in Hertz Returns ------- (n) ndarray times in seconds """ return time_vector(n, fs)
Instance variables
var amplitude_spectrum
-
Amplitude spectrum.
Expand source code
@property def amplitude_spectrum(self): """Amplitude spectrum.""" X = self.in_freq / self.nt if self.nt % 2 == 0: # zero and nyquist element only appear once in complex spectrum X[..., 1:-1] *= 2 else: # there is no nyquist element X[..., 1:] *= 2 return X
var freqs
-
Frequencies.
Expand source code
@property def freqs(self): # noqa: D401 """Frequencies.""" return self._freqs
var fs
-
Sampling frequency.
Expand source code
@property def fs(self): # noqa: D401 """Sampling frequency.""" return self._fs
var in_freq
-
Single sided frequency spectrum.
Returns
(… , n) ndarray Complex frequency response.
Expand source code
@property def in_freq(self): """Single sided frequency spectrum. Returns ------- (... , n) ndarray Complex frequency response. """ if self._in_freq is None: self._in_freq = np.fft.rfft(self._in_time) return self._in_freq
var in_time
-
Time domain response.
Returns
(… , n) ndarray Real FIR filters.
Expand source code
@property def in_time(self): """Time domain response. Returns ------- (... , n) ndarray Real FIR filters. """ if self._in_time is None: self._in_time = np.fft.irfft(self._in_freq, n=self._times.size) return self._in_time
var nf
-
Number of frequencies in frequency representation.
Expand source code
@property def nf(self): # noqa: D401 """Number of frequencies in frequency representation.""" return len(self._freqs)
var nt
-
Number of taps.
Expand source code
@property def nt(self): # noqa: D401 """Number of taps.""" return len(self._times)
var time_length
-
Length of time response in seconds.
Expand source code
@property def time_length(self): """Length of time response in seconds.""" return self._time_length
var times
-
Times.
Expand source code
@property def times(self): # noqa: D401 """Times.""" return self._times
Methods
def add_noise(self, snr, unit=None)
-
Add noise to x with relative noise level SNR.
Parameters
snr
:float
- relative magnitude of noise, i.e. snr = Ex/En
unit
:None
orstr
, optional- if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En).
Returns
Expand source code
def add_noise(self, snr, unit=None): """Add noise to x with relative noise level SNR. Parameters ---------- snr : float relative magnitude of noise, i.e. snr = Ex/En unit : None or str, optional if "dB", SNR is specified in dB, i.e. SNR = 10*log(Ex/En). Returns ------- Response """ return self.from_time(self.fs, noisify(self.in_time, snr, unit=unit))
def circdelay(self, dt)
-
Delay by circular shift.
Rounds of to closest integer delay.
Expand source code
def circdelay(self, dt): """Delay by circular shift. Rounds of to closest integer delay. """ x = self.in_time n = int(round(dt * self.fs)) shifted = np.roll(x, n, axis=-1) return self.from_time(self.fs, shifted)
def delay(self, dt, keep_length=True)
-
Delay time response by dt seconds.
Rounds of to closest integer delay.
Expand source code
def delay(self, dt, keep_length=True): """Delay time response by dt seconds. Rounds of to closest integer delay. """ x = delay(self.fs, self.in_time, dt, keep_length=keep_length) return self.from_time(self.fs, x)
def export_npz(self, filename, dtype=numpy.float32)
-
Export impulse response as npz file.
Parameters
filename
:str
orPath
- File path
dtype
:numpy dtype
- Convert to this type before saving
Expand source code
def export_npz(self, filename, dtype=np.float32): """Export impulse response as npz file. Parameters ---------- filename: str or Path File path dtype: numpy dtype Convert to this type before saving """ np.savez( filename, impulse_response=self.in_time.astype(dtype), samplerate=self.fs )
def export_wav(self, folder, name_fmt='{:02d}.wav', dtype=numpy.int16)
-
Export impulse response to wave files.
Dimension of data must 2.
Parameters
folder
:file path
- Save in this folder
name_fmt
:str
, optional- Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'.
dtype
:one
ofnp.float32, np.int32, np.int16, np.uint8
- Data is converted to this type.
Expand source code
def export_wav(self, folder, name_fmt="{:02d}.wav", dtype=np.int16): """Export impulse response to wave files. Dimension of data must 2. Parameters ---------- folder : file path Save in this folder name_fmt : str, optional Format string for file names with one placeholder, e.g. 'filt1{:02d}.wav'. dtype : one of np.float32, np.int32, np.int16, np.uint8 Data is converted to this type. """ data = np.atleast_2d(self.in_time) assert data.ndim == 2 assert np.all(np.abs(data) <= 1.0) # convert and scale to new output datatype if dtype in [np.uint8, np.int16, np.int32]: lim_orig = (-1.0, 1.0) lim_new = (np.iinfo(dtype).min, np.iinfo(dtype).max) data = _rescale(data, lim_orig, lim_new).astype(dtype) elif dtype != np.float32: raise TypeError(f"dtype {dtype} is not supported by scipy.wavfile.write.") path = Path(folder) if not path.is_dir(): path.mkdir(parents=True, exist_ok=False) for i in range(data.shape[0]): wavfile.write(path / name_fmt.format(i + 1), self.fs, data[i])
def filter(self, b, a=[1])
-
Filter response along one-dimension with an IIR or FIR filter.
Parameters
b
:array_like
- The numerator coefficient vector in a 1-D sequence.
a
:array_like
, optional- The denominator coefficient vector in a 1-D sequence.
If
a[0]
is not 1, then botha
andb
are normalized bya[0]
.
Expand source code
def filter(self, b, a=[1]): """Filter response along one-dimension with an IIR or FIR filter. Parameters ---------- b : array_like The numerator coefficient vector in a 1-D sequence. a : array_like, optional The denominator coefficient vector in a 1-D sequence. If ``a[0]`` is not 1, then both `a` and `b` are normalized by ``a[0]``. """ return self.from_time(self.fs, lfilter(b, a, self.in_time, axis=-1))
def freq_window(self, startwindow, stopwindow, window='hann')
-
Apply frequency domain window.
Parameters
startwindow
:None
ortuple
- Tuple (t1, t2) with beginning and end frequencies of window opening.
stopwindow
:None
ortuple
- Tuple (t1, t2) with beginning and end frequencies of window closing.
window
:string
ortuple
ofstring and parameter values
, optional- Desired window to use. See scipy.signal.get_window for a list of windows and required parameters.
Returns
Response
- Frequency windowed response object
Expand source code
def freq_window(self, startwindow, stopwindow, window="hann"): """Apply frequency domain window. Parameters ---------- startwindow : None or tuple Tuple (t1, t2) with beginning and end frequencies of window opening. stopwindow : None or tuple Tuple (t1, t2) with beginning and end frequencies of window closing. window : string or tuple of string and parameter values, optional Desired window to use. See scipy.signal.get_window for a list of windows and required parameters. Returns ------- Response Frequency windowed response object """ n = self.times.size fwindow = _freq_window(self.fs, n, startwindow, stopwindow, window=window) new_response = self.from_freq(self.fs, self.in_freq * fwindow) return new_response
def non_causal_timecrop(self, length)
-
Cut length of non-causal impulse response.
"FFT shift, cropping on both ends, iFFT shift"
Parameters
length
:float
- final length in seconds
Returns
Response
- New Response object new length.
Note
Can introduce delay pre-delay by a sample.
Expand source code
def non_causal_timecrop(self, length): """Cut length of non-causal impulse response. "FFT shift, cropping on both ends, iFFT shift" Parameters ---------- length : float final length in seconds Returns ------- Response New Response object new length. Note ---- Can introduce delay pre-delay by a sample. """ assert length < self.time_length cut = (self.time_length - length) / 2 _, i_start = _find_nearest(self.times, cut) _, i_end = _find_nearest(self.times, self.time_length - cut) h = np.fft.ifftshift(np.fft.fftshift(self.in_time)[..., i_start:i_end]) new_response = self.from_time(self.fs, h) if new_response.time_length != length: w = f"Could not precisely shrink to {length}s with fs = {self.fs}" warnings.warn(w) return new_response
def normalize(self, maxval=1)
-
Normalize time response.
Parameters
maxval
:float
, optional- Maximum amplitude in resulting time response.
Returns
Expand source code
def normalize(self, maxval=1): """Normalize time response. Parameters ---------- maxval: float, optional Maximum amplitude in resulting time response. Returns ------- Response """ h = self.in_time h /= np.abs(self.in_time).max() h *= maxval return self.from_time(self.fs, h)
def plot(self, group_delay=False, slce=None, flim=None, dblim=None, tlim=None, grpdlim=None, dbref=1, show=False, use_fig=None, label=None, unwrap_phase=False, logf=True, third_oct_f=True, plot_kw={}, **fig_kw)
-
Plot the response in both domains.
Parameters
group_delay
:bool
, optional- Display group delay instead of phase.
slce
:numpy.lib.index_tricks.IndexExpression
- only plot subset of responses defined by a slice. Last dimension (frequency or time) is always completely taken.
flim
:tuple
orNone
, optional- Frequency axis limits as tuple
(lower, upper)
dblim
:tuple
orNone
, optional- Magnitude axis limits as tuple
(lower, upper)
tlim
:tuple
orNone
, optional- Time axis limits as tuple
(lower, upper)
grpdlim
:tuple
orNone
, optional- Group delay axis limit as tuple
(lower, upper)
dbref
:float
- dB reference in magnitude plot
show
:bool
, optional- Run
matplotlib.pyplot.show()
use_fig
:matplotlib.pyplot.Figure
- Reuse an existing figure.
label
:None
, optional- Description
unwrap_phase
:bool
, optional- unwrap phase in phase plot
logf
:bool
, optional- If
True
, use logarithmic frequency axis. third_oct_f
:bool
, optional- Label frequency axis with third octave bands.
plot_kw
:dictionary
, optional- Keyword arguments passed to the
plt.plot
commands. **fig_kw
- Additional options passe to figure creation.
Expand source code
def plot( self, group_delay=False, slce=None, flim=None, dblim=None, tlim=None, grpdlim=None, dbref=1, show=False, use_fig=None, label=None, unwrap_phase=False, logf=True, third_oct_f=True, plot_kw={}, **fig_kw, ): """Plot the response in both domains. Parameters ---------- group_delay : bool, optional Display group delay instead of phase. slce : numpy.lib.index_tricks.IndexExpression only plot subset of responses defined by a slice. Last dimension (frequency or time) is always completely taken. flim : tuple or None, optional Frequency axis limits as tuple `(lower, upper)` dblim : tuple or None, optional Magnitude axis limits as tuple `(lower, upper)` tlim : tuple or None, optional Time axis limits as tuple `(lower, upper)` grpdlim: tuple or None, optional Group delay axis limit as tuple `(lower, upper)` dbref : float dB reference in magnitude plot show : bool, optional Run `matplotlib.pyplot.show()` use_fig : matplotlib.pyplot.Figure Reuse an existing figure. label : None, optional Description unwrap_phase : bool, optional unwrap phase in phase plot logf : bool, optional If `True`, use logarithmic frequency axis. third_oct_f: bool, optional Label frequency axis with third octave bands. plot_kw : dictionary, optional Keyword arguments passed to the `plt.plot` commands. **fig_kw Additional options passe to figure creation. """ if use_fig is None: fig_kw = {**{"figsize": (10, 10)}, **fig_kw} fig, axes = plt.subplots(nrows=3, constrained_layout=True, **fig_kw) else: fig = use_fig axes = fig.axes self.plot_magnitude( use_ax=axes[0], slce=slce, dblim=dblim, flim=flim, dbref=dbref, label=label, plot_kw=plot_kw, logf=logf, third_oct_f=third_oct_f, ) if group_delay: self.plot_group_delay( use_ax=axes[1], slce=slce, flim=flim, ylim=grpdlim, plot_kw=plot_kw, logf=logf, third_oct_f=third_oct_f, ) else: self.plot_phase( use_ax=axes[1], slce=slce, flim=flim, plot_kw=plot_kw, unwrap=unwrap_phase, logf=logf, third_oct_f=third_oct_f, ) self.plot_time( use_ax=axes[2], tlim=tlim, slce=slce, plot_kw=plot_kw ) if show: plt.show() return fig
def plot_group_delay(self, use_ax=None, slce=None, flim=None, label=None, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)
-
Plot group delay.
Expand source code
def plot_group_delay( self, use_ax=None, slce=None, flim=None, label=None, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot group delay.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) df = self.freqs[1] - self.freqs[0] # TODO: use scipy.signal.group_delay here as below has problem at larger delays grpd = -np.gradient(np.unwrap(np.angle(freq_plotready)), df, axis=0) plotf = ax.semilogx if logf else ax.plot plotf(self.freqs, grpd, label=label, **plot_kw) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Delay [s]") ax.set_title("Group Delay") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig
def plot_magnitude(self, use_ax=None, slce=None, dblim=None, flim=None, dbref=1, label=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)
-
Plot magnitude response.
Expand source code
def plot_magnitude( self, use_ax=None, slce=None, dblim=None, flim=None, dbref=1, label=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot magnitude response.""" # TODO: compute db limits similar to librosa.amplitude_to_db / power_to_db if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) plotf = ax.semilogx if logf else ax.plot plotf( self.freqs, 20 * np.log10(np.abs(freq_plotready / dbref)), label=label, **plot_kw, ) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Magnitude [dB]") ax.set_title("Frequency response") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if dblim is not None: ax.set_ylim(dblim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig
def plot_phase(self, use_ax=None, slce=None, flim=None, label=None, unwrap=False, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw)
-
Plot phase response.
Expand source code
def plot_phase( self, use_ax=None, slce=None, flim=None, label=None, unwrap=False, ylim=None, plot_kw={}, logf=True, third_oct_f=True, **fig_kw, ): """Plot phase response.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) # move time / frequency axis to first dimension freq_plotready = np.rollaxis(self.in_freq[tuple(slce)], -1).reshape( (self.nf, -1) ) phase = ( np.unwrap(np.angle(freq_plotready)) if unwrap else np.angle(freq_plotready) ) plotf = ax.semilogx if logf else ax.plot plotf(self.freqs, phase, label=label, **plot_kw) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Phase [rad]") ax.set_title("Phase response") ax.grid(True) if flim is None: lowlim = min(10, self.fs / 2 / 100) flim = (lowlim, self.fs / 2) ax.set_xlim(flim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() if third_oct_f: _add_octave_band_xticks(ax) return fig
def plot_power_in_bands(self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs)
-
Plot signal's power in bands.
Parameters
bands
:list
orNone
, optional- List of tuples (f_center, f_lower, f_upper). If
None
, use third octave bands. use_ax
:matplotlib.axis.Axis
orNone
, optional- Plot into this axis.
barkwargs
:dict
- Keyword arguments to
axis.bar
avgaxis
:int, tuple
orNone
- Average power over these axes.
dbref
:float
- dB reference.
**figkwargs
- Keyword arguments passed to plt.subplots
Returns
P
:ndarray
- Power in bands
fc
:ndarray
- Band frequencies
fig
:matplotlib.figure.Figure
- Figure
Expand source code
def plot_power_in_bands( self, bands=None, use_ax=None, barkwargs={}, avgaxis=None, dbref=1, **figkwargs ): """Plot signal's power in bands. Parameters ---------- bands : list or None, optional List of tuples (f_center, f_lower, f_upper). If `None`, use third octave bands. use_ax : matplotlib.axis.Axis or None, optional Plot into this axis. barkwargs : dict Keyword arguments to `axis.bar` avgaxis : int, tuple or None Average power over these axes. dbref : float dB reference. **figkwargs Keyword arguments passed to plt.subplots Returns ------- P : ndarray Power in bands fc : ndarray Band frequencies fig : matplotlib.figure.Figure Figure """ P, fc = self.power_in_bands(bands=bands, avgaxis=avgaxis) nbands = P.shape[-1] P = np.atleast_2d(P).reshape((-1, nbands)) if use_ax is None: fig, ax = plt.subplots(**figkwargs) else: ax = use_ax fig = ax.get_figure() xticks = range(1, nbands + 1) for i in range(P.shape[0]): ax.bar(xticks, 10 * np.log10(P[i] / dbref ** 2), **barkwargs) ax.set_xticks(xticks) ax.set_xticklabels(["{:.0f}".format(f) for f in fc], rotation="vertical") ax.grid(True) ax.set_xlabel("Band's center frequencies [Hz]") ax.set_ylabel("Power [dB]") return (P, fc, fig)
def plot_time(self, use_ax=None, slce=None, tlim=None, ylim=None, label=None, plot_kw={}, **fig_kw)
-
Plot time response.
Expand source code
def plot_time( self, use_ax=None, slce=None, tlim=None, ylim=None, label=None, plot_kw={}, **fig_kw, ): """Plot time response.""" if use_ax is None: fig_kw = {**{"figsize": (10, 5)}, **fig_kw} fig, ax = plt.subplots(nrows=1, constrained_layout=True, **fig_kw) else: ax = use_ax fig = ax.get_figure() # append frequency/time dimension to slice if slce is None: slce = [np.s_[:] for n in range(len(self.in_time.shape))] elif isinstance(slce, tuple): slce = slce + (np.s_[:],) else: slce = (slce, np.s_[:]) time_plotready = np.rollaxis(self.in_time[tuple(slce)], -1).reshape( (self.nt, -1) ) ax.plot(self.times, time_plotready, label=label, **plot_kw) ax.set_xlabel("Time [s]") ax.set_ylabel("") ax.set_title("Time response") ax.grid(True) if tlim: ax.set_xlim(tlim) if ylim: ax.set_ylim(ylim) if label is not None: ax.legend() return fig
def power_in_bands(self, bands=None, avgaxis=None)
-
Compute power of signal in third octave bands.
Power(band) = 1/T integral |X(f)| ** 2 df f in band
Parameters
bands
:list
oftuples, length nbands optional
- Center, lower and upper frequencies of bands.
avgaxis
:int, tuple
orNone
- Average result over these axis
Returns
P
:ndarray, shape (…, nbands)
- Power in bands
fcs
:list, length nbands
- Center frequencies of bands
Expand source code
def power_in_bands(self, bands=None, avgaxis=None): """Compute power of signal in third octave bands. Power(band) = 1/T integral |X(f)| ** 2 df f in band Parameters ---------- bands : list of tuples, length nbands optional Center, lower and upper frequencies of bands. avgaxis: int, tuple or None Average result over these axis Returns ------- P: ndarray, shape (..., nbands) Power in bands fcs: list, length nbands Center frequencies of bands """ if bands is None: bands = _third_octave_bands # center frequencies fcs = np.asarray([b[0] for b in bands]) Npow2 = 2 ** (self.nt - 1).bit_length() f = np.fft.fftfreq(Npow2, d=1 / self.fs) shape = list(self.in_freq.shape) shape[-1] = len(bands) P = np.zeros(shape) for i, (fc, fl, fu) in enumerate(bands): if fu < self.fs / 2: # include only bands in frequency range iband = np.logical_and(fl <= f, f < fu) P[..., i] = np.sum( np.abs(np.fft.fft(self.in_time, n=Npow2, axis=-1)[..., iband]) ** 2 * 2 # energy from negative and positive frequencies * self.dt / self.nt / self.time_length, axis=-1, ) else: P[..., i] = 0 if avgaxis is not None: P = P.mean(axis=avgaxis) return P, fcs
def psd(self, **kwargs)
-
Compute the power spectral density of the signal.
Parameters
kwargs
- keword arguments passed to scipy.signal.welch
Returns
f
:ndarray
- Array of sample frequencies.
Pxx
:ndarray
- Power spectral density of time signal.
Notes
Use scaling='density' for power per bin bandwidth and scaling='spectrum' for power per bin.
Expand source code
def psd(self, **kwargs): """Compute the power spectral density of the signal. Parameters ---------- kwargs keword arguments passed to scipy.signal.welch Returns ------- f : ndarray Array of sample frequencies. Pxx : ndarray Power spectral density of time signal. Notes ----- Use scaling='density' for power per bin bandwidth and scaling='spectrum' for power per bin. """ return welch(self.in_time, fs=self.fs, **kwargs)
def resample(self, fs_new, normalize='same_gain', window=None)
-
Resample using Fourier method.
Parameters
fs_new
:int
- New sample rate
normalize
:str
, optional- If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved.
window
:None
, optional- Passed to scipy.signal.resample.
Returns
Response
- New resampled response object.
Raises
ValueError
- If resulting number of samples would be a non-integer.
Expand source code
def resample(self, fs_new, normalize="same_gain", window=None): """Resample using Fourier method. Parameters ---------- fs_new : int New sample rate normalize : str, optional If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved. window : None, optional Passed to scipy.signal.resample. Returns ------- Response New resampled response object. Raises ------ ValueError If resulting number of samples would be a non-integer. """ if fs_new == self.fs: return self nt_new = fs_new * self.time_length if nt_new % 1 != 0: raise ValueError( "New number of samples must be integer, but is {}".format(nt_new) ) nt_new = int(nt_new) h_new = resample(self.in_time, nt_new, axis=-1, window=window) if normalize == "same_gain": h_new *= self.nt / nt_new elif normalize == "same_amplitude": pass else: raise ValueError( "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,) ) return self.from_time(fs_new, h_new)
def resample_poly(self, fs_new, normalize='same_gain', window=('kaiser', 5.0))
-
Resample using polyphase filtering.
Parameters
fs_new
:int
- New sample rate
normalize
:str
, optional- If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved.
window
:None
, optional- Passed to scipy.signal.resample_poly.
Returns
Response
- New resampled response object.
Expand source code
def resample_poly(self, fs_new, normalize="same_gain", window=("kaiser", 5.0)): """Resample using polyphase filtering. Parameters ---------- fs_new : int New sample rate normalize : str, optional If 'same_gain', normalize such that the gain is the same as the original signal. If 'same_amplitude', amplitudes will be preserved. window : None, optional Passed to scipy.signal.resample_poly. Returns ------- Response New resampled response object. """ if fs_new == self.fs: return self ratio = Fraction(fs_new, self.fs) up = ratio.numerator down = ratio.denominator if up > 1000 or down > 1000: print("Warning: resampling with high ratio {}/{}".format(up, down)) h_new = resample_poly(self.in_time, up, down, axis=-1, window=window) if normalize == "same_gain": h_new *= down / up elif normalize == "same_amplitude": pass else: raise ValueError( "Expected 'same_gain' or 'same_amplitude', got %s" % (normalize,) ) return self.from_time(fs_new, h_new)
def time_window(self, startwindow, stopwindow, window='hann')
-
Apply time domain windows.
Parameters
startwindow
:None
ortuple
- Tuple (t1, t2) with beginning and end times of window opening.
stopwindow
:None
ortuple
- Tuple (t1, t2) with beginning and end times of window closing.
window
:string
ortuple
ofstring and parameter values
, optional- Desired window to use. See scipy.signal.get_window for a list of windows and required parameters.
Returns
Response
- Time windowed response object
Expand source code
def time_window(self, startwindow, stopwindow, window="hann"): """Apply time domain windows. Parameters ---------- startwindow : None or tuple Tuple (t1, t2) with beginning and end times of window opening. stopwindow : None or tuple Tuple (t1, t2) with beginning and end times of window closing. window : string or tuple of string and parameter values, optional Desired window to use. See scipy.signal.get_window for a list of windows and required parameters. Returns ------- Response Time windowed response object """ n = self.times.size twindow = _time_window(self.fs, n, startwindow, stopwindow, window=window) new_response = self.from_time(self.fs, self.in_time * twindow) return new_response
def timecrop(self, start, end)
-
Crop time response.
Parameters
start
,end
:float
- Start and end times in seconds. Does not include sample at t=end. Use end=None to force inclusion of last sample.
Returns
Response
- New Response object with cropped time.
Notes
Creates new Response object.
Examples
>>> import numpy as np >>> from response import Response >>> r = Response.from_time(100, np.random.normal(size=100)) >>> split = 0.2
The following holds:
>>> np.all(np.concatenate( ... ( ... r.timecrop(0, split).in_time, ... r.timecrop(split, None).in_time, ... ), ... axis=-1, ... ) == r.in_time) True
Expand source code
def timecrop(self, start, end): """Crop time response. Parameters ---------- start, end : float Start and end times in seconds. Does not include sample at t=end. Use end=None to force inclusion of last sample. Returns ------- Response New Response object with cropped time. Notes ----- Creates new Response object. Examples -------- >>> import numpy as np >>> from response import Response >>> r = Response.from_time(100, np.random.normal(size=100)) >>> split = 0.2 The following holds: >>> np.all(np.concatenate( ... ( ... r.timecrop(0, split).in_time, ... r.timecrop(split, None).in_time, ... ), ... axis=-1, ... ) == r.in_time) True """ if start < 0: start += self.time_length if end is not None and end < 0: end += self.time_length assert 0 <= start < self.time_length assert end is None or (0 < end <= self.time_length) _, i_start = _find_nearest(self.times, start) if end is None: i_end = None else: _, i_end = _find_nearest(self.times, end) h = self.in_time[..., i_start:i_end] new_response = self.from_time(self.fs, h) return new_response
def window_around_peak(self, tleft, tright, alpha, return_window=False)
-
Time window each impulse response around its peak value.
Parameters
tleft
,tright
:float
- Window starts
tleft
seconds before and endstright
seconds after maximum of impulse response. alpha
:float
alpha
parameter ofscipy.signal.tukey
window.return_window
:bool
, optional- Also return used time window
Returns
Response
- Time windowed response object.
ndarray
- Time window, if
return_window
isTrue
.
Expand source code
def window_around_peak(self, tleft, tright, alpha, return_window=False): """Time window each impulse response around its peak value. Parameters ---------- tleft, tright : float Window starts `tleft` seconds before and ends `tright` seconds after maximum of impulse response. alpha : float `alpha` parameter of `scipy.signal.tukey` window. return_window : bool, optional Also return used time window Returns ------- Response Time windowed response object. ndarray Time window, if `return_window` is `True`. """ window = _construct_window_around_peak( self.fs, self.in_time, tleft, tright, alpha=alpha ) if return_window: return self.from_time(self.fs, self.in_time * window), window return self.from_time(self.fs, self.in_time * window)
def zeropad(self, before, after)
-
Zeropad time response.
Parameters
before
,after
:int
- Number of zero samples inserted before and after response.
Returns
Response
- Zeropadded response
Expand source code
def zeropad(self, before, after): """Zeropad time response. Parameters ---------- before, after : int Number of zero samples inserted before and after response. Returns ------- Response Zeropadded response """ assert before % 1 == 0 assert after % 1 == 0 dims = self.in_time.ndim pad_width = [(0, 0) for n in range(dims)] pad_width[-1] = (int(before), int(after)) h = np.pad(self.in_time, pad_width, "constant") return self.from_time(self.fs, h)
def zeropad_to_length(self, n)
-
Expand source code
def zeropad_to_length(self, n): """Zeropad time response to specific length. Returns ------- Response New response object with new length n. """ oldn = self.nt assert n >= oldn return self.zeropad(0, n - oldn)
def zeropad_to_power_of_2(self)
-
Pad time response for length of power of 2.
Returns
Response
- New response object with larger, power of 2 length.
Expand source code
def zeropad_to_power_of_2(self): """Pad time response for length of power of 2. Returns ------- Response New response object with larger, power of 2 length. """ # https://stackoverflow.com/questions/14267555/find-the-smallest-power-of-2-greater-than-n-in-python n = 2 ** (self.nt - 1).bit_length() return self.zeropad(0, n - self.nt)