{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%pylab inline\n", "import sk_dsp_comm.sigsys as ss\n", "#from sk_dsp_comm.pyaudio_helper import pyaudio_helper as pah\n", "import sk_dsp_comm.fir_design_helper as fir_d\n", "import scipy.signal as signal\n", "import scipy.io as io\n", "from ipywidgets import interact, interactive, fixed, interact_manual\n", "import ipywidgets as widgets\n", "from IPython.display import Audio, display\n", "from IPython.display import Image, SVG" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pylab.rcParams['savefig.dpi'] = 100 # default 72\n", "%config InlineBackend.figure_formats=['svg'] # SVG inline viewing" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Introduction\n", "A simplified block diagram of PyAudio *streaming-based* (nonblocking) signal processing when using `pyaudio_helper` and `ipython` widgets." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/pyaudio_dsp_IO.png\", width=\"90%\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Available Audio I/O Devices\n", "If you add or delete devices by plugging or unplugging USB audio ibterface, this list becomdes invalid. Restart the kernel and run again to get the correct device index list. For two channel apps both the input and output devices must support two channels. For the Sabrent USB audio devices, which has one input and two outputs, Windows for example may improperly list the devices as having two inputs. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "pah.available_devices()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Index 0 device name = Built-in Microph, inputs = 2, outputs = 0\n", "\n", "Index 1 device name = Built-in Output, inputs = 0, outputs = 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Real-Time Loop Through\n", "Here we set up a simple `callback` function that passes the input samples directly to the output. The module `pyaudio_helper` provides a class for managing a `pyaudio` stream object, capturing the samples processed by the `callback` function, and collection of performance metrics. Once the `callback` function is written/declared a `DSPIOStream` object can be created and then the `stream(Tsec)` method can be executed to start the input/output processing, e.g.,\n", "\n", "```python\n", "import pyaudio_helper as pah\n", "\n", "DSP_IO = pah.DSPIOStream(callback,in_idx, out_idx)\n", "DSP_IO.interactive_stream(Tsec = 2, numChan = 1)\n", "```\n", "where `in_idx` is the index of the chosen input device found using `available_devices()` and similarly `out_idx` is the index of the chosen output device.\n", "\n", "* The `callback` function must be written first as the function name used by the object to call the *callback*." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A Minimal Callback\n", "No globals required here as there is no instrumentation configured, externally defined algorithm coefficients, and no widgets being used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define a pass through, y = x, callback\n", "def callback(in_data, frame_count, time_info, status):\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " #***********************************************\n", " # DSP operations here\n", " # Here we simply pass the input to the output, i.e.\n", " # y[n] = x[n]\n", " x = in_data_nda.astype(float32)\n", " y = x\n", " # Typically more DSP code here \n", " #\n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " # Convert ndarray back to bytes\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### A Basic Callback\n", "This callback makes use of the instrumentation capabilities of the `DSPIOStream` and also has a simple lowpass filter waiting *in-the-wings* if a line of code in commented out and a following line is uncomments, e.g.,\n", "```python\n", " #y = x\n", " # Typically more DSP code here\n", " y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR\n", "```\n", "Notice that `globals` are now used for the `DSP_IO` object, the filter coefficients in arrays, `a` and `b`, and also the filter states held in the array `zi`. In its present form te filtering is commented out, but can be uncommented to allow a simple 1st-order IIR lowpass filter to operate on one channel of audio streaming through the system." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add a simple IIR LPF\n", "fs = 48000 # Assummed sampling rate\n", "f3 = 1000 # Hz\n", "a = [1, -exp(-2*pi*f3/fs)]\n", "b = [1 - exp(-2*pi*f3/fs)]\n", "zi = signal.lfiltic(b,a,[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define a pass through, y = x, callback\n", "def callback(in_data, frame_length, time_info, status):\n", " global DSP_IO, b, a, zi\n", " DSP_IO.DSP_callback_tic()\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " #***********************************************\n", " # DSP operations here\n", " # Here we apply a linear filter to the input\n", " x = in_data_nda.astype(float32)\n", " y = x\n", " # Typically more DSP code here\n", " #y, zi = signal.lfilter(b,a,x,zi=zi) # for FIR or simple IIR\n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples(y) \n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " # Convert ndarray back to bytes\n", " #return (in_data_nda.tobytes(), pyaudio.paContinue)\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "DSP_IO = pah.DSPIOStream(callback,in_idx=0,out_idx=1,fs=48000,Tcapture=0)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Index 0 device name = Built-in Microph, inputs = 2, outputs = 0\n", "\n", "Index 1 device name = Built-in Output, inputs = 0, outputs = 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python \n", "DSP_IO.interactive_stream(Tsec=0,numChan=1) \n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/start_stop_stream.png\", width='55%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### iMic Noise Capture Gain Flatness\n", "With the iMic plugged in the input/output device indices can be reconfigured to use the iMic index for both the input output streams. The [Analog Discovery (AD2)](https://store.digilentinc.com/analog-discovery-2-100msps-usb-oscilloscope-logic-analyzer-and-variable-power-supply/) is then used to drive a white noise test signal into the ADC and capture the output from the DAC. This allows us to measure the ADC-DAC frequency response using a long-term time average spectral estimate capabilities of the AD2. A second test capture is to use `DSP_IO.DSP_capture_add_samples(y)` to capture the response of the ADC alone, and perform spectral analysis here in the Jupyter notebook. For this capture we set `Tcapture=20`s two cells above and `Tsec=20` one cell above. A comparison of the ADC-alone and ADC-DAC spectrum normalized to look like the frequency response is done in the cell below." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "f_AD,Mag_AD = loadtxt('audio_files/Loop_through_noise_SA_iMic.csv',\n", " delimiter=',',skiprows=6,unpack=True)\n", "plot(f_AD,Mag_AD-Mag_AD[100])\n", "ylim([-10,5])\n", "xlim([0,20e3])\n", "ylabel(r'ADC Gain Flatness (dB)')\n", "xlabel(r'Frequency (Hz)')\n", "legend((r'ADC-DAC from AD2 SA dB Avg',))\n", "title(r'Loop Through Gain Flatness using iMic at $f_s = 48$ kHz')\n", "grid();" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The callback stats when capturing data using `DSP_IO.DSP_capture_add_samples(y)` and a plot of the time domain samples." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "Nstop = 1000\n", "plot(arange(0,len(DSP_IO.data_capture[:Nstop]))/48000,DSP_IO.data_capture[:Nstop])\n", "DSP_IO.stream_stats()\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note for a attributes used in the above examples the `frame_length` is always 1024 samples and the sampling rate $f_s = 48$ ksps. The ideal callback period is this\n", "$$\n", " T_{cb} = \\frac{1024}{480100} = 21.33\\ \\text{(ms)}\n", "$$" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next consider what the captures `tic` and `toc` data revels about the processing. Calling the method `cb_active_plot()` produces a plot similar to what an electrical engineer would see what using a logic analyzer to show the time spent in an *interrupt service routine* of an *embedded system*. The latency is also evident. You expect to see a minimum latency of two frame lengths (input buffer fill and output buffer fill),e.g.,\n", "\n", "$$\n", " T_\\text{latency} >= 2\\times \\frac{1024}{48000} \\times 1000 = 42.6\\ \\text{(ms)}\n", "$$\n", "\n", "The host processor is multitasking, so the latency can be even greater. A true real-time DSP system would give the signal processing high priority and hence much lower is expected, particularly if the `frame_length` can be made small." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Real-Time Filtering\n", "Here we set up a `callback` function that filters the input samples and then sends them to the output. \n", "\n", "```python\n", "import pyaudio_helper as pah\n", "\n", "DSP_IO = pah.DSPIOStream(callback,in_idx, out_idx)\n", "DSP_IO.interactive_stream(2,1)\n", "```\n", "where `in_idx` is the index of the chosen input device found using `available_devices()` and similarly `out_idx` is the index of the chosen output device.\n", "\n", "* The `callback` function must be written first as the function name is used by the object to call the *callback*\n", "* To demonstrate this we first design some filters that can be used in testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "b = fir_d.fir_remez_bpf(2700,3200,4800,5300,.5,50,48000,18)\n", "a = [1]\n", "fir_d.freqz_resp_list([b],[1],'dB',48000)\n", "ylim([-60,5])\n", "grid();\n", "zi = signal.lfiltic(b,a,[0])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "f_AD,Mag_AD = loadtxt('audio_files/FIR_BPF_2700_3200_4800_5300_p5dB_50dB_48k.csv',\n", " delimiter=',',skiprows=6,unpack=True)\n", "plot(f_AD,Mag_AD-max(Mag_AD)+1)\n", "f = arange(0,20000,10)\n", "w,H_BPF = signal.freqz(b,1,2*pi*f/48000)\n", "plot(f,20*log10(abs(H_BPF)))\n", "ylabel(r'Gain (dB)')\n", "xlabel(r'Frequency (Hz)')\n", "legend((r'AD2 Noise Measured',r'Design Theory'))\n", "title(r'4 kHz 182-Tap FIR Bandpass Design at $f_s = 48$ kHz')\n", "ylim([-60,5])\n", "xlim([2000,8000])\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Design an IIR Notch\n", "b, a = ss.fir_iir_notch(2000,48000,r= 0.9)\n", "fir_d.freqz_resp_list([b],[a],'dB',48000,4096)\n", "ylim([-60,5])\n", "grid();\n", "zi = signal.lfiltic(b,a,[0])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create some global variables for the filter coefficients and the filter state array (recall that a filter has memory)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define callback (#2)\n", "def callback2(in_data, frame_count, time_info, status):\n", " global DSP_IO, b, a, zi\n", " DSP_IO.DSP_callback_tic()\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " #***********************************************\n", " # DSP operations here\n", " # Here we apply a linear filter to the input\n", " x = 5*in_data_nda.astype(float32)\n", " #y = x\n", " # The filter state/(memory), zi, must be maintained from frame-to-frame \n", " # for FIR or simple IIR\n", " y, zi = signal.lfilter(b,a,x,zi=zi) \n", " # for IIR use second-order sections\n", " #y, zi = signal.sosfilt(sos,x,zi=zi) \n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples(y) \n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "DSP_IO = pah.DSPIOStream(callback2,2,2,fs=48000,Tcapture=0)\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "DSP_IO.interactive_stream(Tsec=0,numChan=1)\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/start_stop_stream.png\", width='55%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Playback Only Using an Audio Loop\n", "A playback audio loop is created using the `pah.loop_audio` class filled with samples input from a `wav` file. In the example below we take a two-channel (stereo) `wav` file and convert to one channel." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define callback (3)\n", "# Here we configure the callback to play back a wav file \n", "def callback3(in_data, frame_count, time_info, status):\n", " global DSP_IO, x\n", " DSP_IO.DSP_callback_tic()\n", " \n", " # Ignore in_data when generating output only\n", " #***********************************************\n", " global x\n", " # Note wav is scaled to [-1,1] so need to rescale to int16\n", " y = 32767*x.get_samples(frame_count)\n", " # Perform real-time DSP here if desired\n", " #\n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples(y)\n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "fs, x_wav2 = ss.from_wav('Music_Test.wav')\n", "x_wav = (x_wav2[:,0] + x_wav2[:,1])/2\n", "x = pah.loop_audio(x_wav)\n", "DSP_IO = pah.DSPIOStream(callback3,0,1,fs=44100,Tcapture=2)\n", "DSP_IO.interactive_stream(20) # play for 20s but capture only the last 2s\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/start_stop_stream.png\", width='55%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "Npts = 96000\n", "Nstart = 0\n", "plot(arange(len(DSP_IO.data_capture[Nstart:Nstart+Npts]))*1000/44100,\n", " DSP_IO.data_capture[Nstart:Nstart+Npts]/2**(16-1))\n", "title(r'A Portion of the capture buffer')\n", "ylabel(r'Normalized Amplitude')\n", "xlabel(r'Time (ms)')\n", "grid();\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/music_buffer_plot.png\", width=\"75%\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, the spectrum of the output signal. To apply custom scaling we use a variation of `psd()` found in the `sigsys` module. If we are plotting the spectrum of *white* noise sent through a filter, the output PSD will be of the form $\\sigma_w^2|H(e^{j2\\pi f/f_s})|^2$, where $\\sigma_w^2$ is the variance of the noise driving the filter. You may choose to overlay a plot of" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Widgets Examples" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Stereo Gain Sliders" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "L_gain = widgets.FloatSlider(description = 'L Gain', \n", " continuous_update = True,\n", " value = 1.0,\n", " min = 0.0, \n", " max = 2.0, \n", " step = 0.01, \n", " orientation = 'vertical')\n", "R_gain = widgets.FloatSlider(description = 'R Gain', \n", " continuous_update = True,\n", " value = 1.0,\n", " min = 0.0, \n", " max = 2.0, \n", " step = 0.01, \n", " orientation = 'vertical')\n", "\n", "#widgets.HBox([L_gain, R_gain])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# L and Right Gain Sliders\n", "def callback(in_data, frame_count, time_info, status): \n", " global DSP_IO, L_gain, R_gain\n", " DSP_IO.DSP_callback_tic()\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " # separate left and right data\n", " x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))\n", " #***********************************************\n", " # DSP operations here\n", " y_left = x_left*L_gain.value\n", " y_right = x_right*R_gain.value\n", " \n", " #***********************************************\n", " # Pack left and right data together\n", " y = DSP_IO.pack_LR(y_left,y_right)\n", " # Typically more DSP code here \n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)\n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " # Convert ndarray back to bytes\n", " #return (in_data_nda.tobytes(), pyaudio.paContinue)\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "DSP_IO = pah.DSPIOStream(callback,0,1,fs=48000,t_capture=0)\n", "start_stop_buttons = DSP_IO.interactive_stream(0,2)\n", "widgets.VBox([start_stop_buttons, widgets.HBox([L_gain, R_gain])])\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/left_right_gain.png\", width=\"65%\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cross Panning" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "panning = widgets.FloatSlider(description = 'Panning (%)', \n", " continuous_update = True, # Continuous updates\n", " value = 50.0,\n", " min = 0.0, \n", " max = 100.0, \n", " step = 0.1, \n", " orientation = 'horizontal')\n", "#display(panning)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Panning\n", "def callback(in_data, frame_count, time_info, status):\n", " global DSP_IO, panning\n", " DSP_IO.DSP_callback_tic()\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " # separate left and right data\n", " x_left,x_right = DSP_IO.get_LR(in_data_nda.astype(float32))\n", " #***********************************************\n", " # DSP operations here\n", " y_left = (100-panning.value)/100*x_left \\\n", " + panning.value/100*x_right\n", " y_right = panning.value/100*x_left \\\n", " + (100-panning.value)/100*x_right\n", " \n", " #***********************************************\n", " # Pack left and right data together\n", " y = DSP_IO.pack_LR(y_left,y_right)\n", " # Typically more DSP code here \n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples_stereo(y_left,y_right)\n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " # Convert ndarray back to bytes\n", " #return (in_data_nda.tobytes(), pyaudio.paContinue)\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "FRAMES = 512\n", "# Create streaming object\n", "DSP_IO = pah.DSPIOStream(callback,0,1,\n", " fs=48000,\n", " frame_length = FRAMES,\n", " t_capture=0) \n", "\n", "# interactive_stream runs in a thread \n", "#so widget can be used\n", "start_stop_buttons = DSP_IO.interactive_stream(0,2)\n", "\n", "# display panning widget\n", "widgets.VBox([start_stop_buttons, panning])\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/cross_panning.png\", width='55%')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Three Band Equalizer\n", "Here we consider a three-band equalizer operating on a music loop. Each peaking filter has system function in the $z$-domain defined by\n", "$$\n", "H_{pk}(z) = C_\\text{pk}\\frac{1 + b_1 z^{-1} + b_2 z^{-2}}{1 + a_1 z^{-1} + a_2 z^{-2}}\n", "$$\n", "\n", "where the filter coefficients are given by\n", "$$\\begin{align}\n", "C_\\text{pk} &= \\frac{1+k_q\\mu}{1+k_q}\\\\\n", " k_q &= \\frac{4}{1+\\mu} \\tan\\left(\\frac{2\\pi f_c/f_s}{2Q}\\right) \\\\\n", " b_1 &= \\frac{-2\\cos(2\\pi f_c/f_s)}{1+k_q\\mu} \\\\\n", " b_2 &= \\frac{1-k_q\\mu}{1+k_q\\mu} \\\\\n", " a_1 &= \\frac{-2\\cos(2\\pi f_c/f_s)}{1+k_q} \\\\\n", " a_2 &= \\frac{1 - k_q}{1+k_q}\n", "\\end{align}$$\n", "\n", "where\n", "$$\n", "\\mu = 10^{G_\\text{dB}/20},\\ \\ Q \\in [2, 10]\n", "$$\n", "\n", "and and $f_c$ is the center frequency in Hz relative to sampling rate $f_s$ in Hz, and $G_\\text{dB}$ is the peaking filter gain in dB. Conveniently, the function `peaking` is available in the module `sk_dsp_comm.sigsys`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "band1 = widgets.FloatSlider(description = '100 Hz', \n", " continuous_update = True, # Continuous updates\n", " value = 20.0,\n", " min = -20.0, \n", " max = 20.0, \n", " step = 1, \n", " orientation = 'vertical')\n", "band2 = widgets.FloatSlider(description = '1000 Hz', \n", " continuous_update = True, # Continuous updates\n", " value = 10.0,\n", " min = -20.0, \n", " max = 20.0, \n", " step = 1, \n", " orientation = 'vertical')\n", "band3 = widgets.FloatSlider(description = '8000 Hz', \n", " continuous_update = True, # Continuous updates\n", " value = -10.0,\n", " min = -20.0, \n", " max = 20.0, \n", " step = 1, \n", " orientation = 'vertical')\n", "\n", "Gain = widgets.FloatSlider(description = 'Gain', \n", " continuous_update = True,\n", " value = 0.2,\n", " min = 0.0, \n", " max = 2.0, \n", " step = 0.01, \n", " orientation = 'vertical')\n", "\n", "#widgets.HBox([Gain,band1,band2,band3])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)\n", "zi_b1 = signal.lfiltic(b_b1,a_b1,[0])\n", "b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)\n", "zi_b2 = signal.lfiltic(b_b2,a_b2,[0])\n", "b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)\n", "zi_b3 = signal.lfiltic(b_b3,a_b3,[0])\n", "b_12,a_12 = ss.cascade_filters(b_b1,a_b1,b_b2,a_b2)\n", "b_123,a_123 = ss.cascade_filters(b_12,a_12,b_b3,a_b3)\n", "f = logspace(log10(50),log10(10000),100)\n", "w,H_123 = signal.freqz(b_123,a_123,2*pi*f/48000)\n", "semilogx(f,20*log10(abs(H_123)))\n", "ylim([-20,20])\n", "ylabel(r'Gain (dB)')\n", "xlabel(r'Frequency (Hz)')\n", "grid();" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# define a pass through, y = x, callback\n", "def callback(in_data, frame_count, time_info, status):\n", " global DSP_IO, zi_b1,zi_b2,zi_b3, x\n", " global Gain, band1, band2, band3\n", " DSP_IO.DSP_callback_tic()\n", " # convert byte data to ndarray\n", " in_data_nda = np.frombuffer(in_data, dtype=np.int16)\n", " #***********************************************\n", " # DSP operations here\n", " # Here we apply a linear filter to the input\n", " #x = in_data_nda.astype(float32)\n", " x = Gain.value*20000*x_loop.get_samples(frame_count)\n", " # DSP code here\n", " b_b1,a_b1 = ss.peaking(band1.value,100,Q=3.5,fs=48000)\n", " z1, zi_b1 = signal.lfilter(b_b1,a_b1,x,zi=zi_b1) \n", " b_b2,a_b2 = ss.peaking(band2.value,1000,Q=3.5,fs=48000)\n", " z2, zi_b2 = signal.lfilter(b_b2,a_b2,z1,zi=zi_b2)\n", " b_b3,a_b3 = ss.peaking(band3.value,8000,Q=3.5,fs=48000)\n", " y, zi_b3 = signal.lfilter(b_b3,a_b3,z2,zi=zi_b3)\n", " #***********************************************\n", " # Save data for later analysis\n", " # accumulate a new frame of samples\n", " DSP_IO.DSP_capture_add_samples(y) \n", " #***********************************************\n", " # Convert from float back to int16\n", " y = y.astype(int16)\n", " DSP_IO.DSP_callback_toc()\n", " # Convert ndarray back to bytes\n", " #return (in_data_nda.tobytes(), pyaudio.paContinue)\n", " return y.tobytes(), pah.pyaudio.paContinue" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```python\n", "fs, x_wav2 = ss.from_wav('audio_files/Music_Test.wav')\n", "x_wav = (x_wav2[:,0] + x_wav2[:,1])/2\n", "x_loop = pah.loop_audio(x_wav)\n", "DSP_IO = pah.DSPIOStream(callback,0,1,fs=44100,t_capture=0)\n", "start_stop_buttons = DSP_IO.interactive_stream(0,1)\n", "widgets.VBox([start_stop_buttons, widgets.HBox([Gain,band1,band2,band3])])\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "Image(\"audio_files/three_band_widgets.png\", width=\"55%\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "f_AD,Mag_AD = loadtxt('audio_files/ThreeBand_Peak_100_p20_1k_p10_8k_m10_fs_48k.csv',\n", " delimiter=',',skiprows=6,unpack=True)\n", "semilogx(f_AD,Mag_AD+55)\n", "semilogx(f,20*log10(abs(H_123)))\n", "ylabel(r'Gain (dB)')\n", "xlabel(r'Frequency (Hz)')\n", "legend((r'AD2 Noise Measured',r'Design Theory'))\n", "title(r'Three Band Equalizer: $f_{center} = [100,1000,800]$, $Q = 3.5$')\n", "ylim([-20,20])\n", "xlim([50,10000])\n", "grid();" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" } }, "nbformat": 4, "nbformat_minor": 1 }