Removing sharp peaks

This example shows how the peak removal function can be used to remove sharp, short peaks from a signal, e.g. from neutron/gamma noise in an APDCAM signal.[1]

Note that the implemented peak removal algorithm assumes that the duration (~ 5x time constant) of the peaks is a few dozen samples, their total duration is at most a few percent of the signal duration and their height is always positive and such that the resulting signal derivative is significantly larger than that of anything important in the underlying signal.

from copy import deepcopy

import numpy as np
import matplotlib.pyplot as plt

import flap
import flap.testdata
Warning: could not read configuration file 'flap_defaults.cfg'.
Default location of configuration file is working directory.
INIT flap storage

Generating the test data

flap.testdata.register()

Pure signals

d = flap.get_data('TESTDATA',
                  name='TEST-[1-3]-1',
                  options={'Scaling':'Volt'},
                  coordinates={'Time':[0,0.005]},
                 )
dt = d.get_coordinate_object('Time').step[0]
flap.list_data_objects(d)
-----------------------------
<1>(data_source:"TESTDATA" exp_id:"") data_title:"Test data" shape:[3,5001][no error]
  Data name:"Signal", unit:"Volt"
  Coords:
Time [Second](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E-06
Sample [a.u.](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E+00
Signal name [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:TEST-1-1, TEST-2-1, TEST-3-1
Column [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 2, 3
Row [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 1, 1
'\n-----------------------------\n<1>(data_source:"TESTDATA" exp_id:"") data_title:"Test data" shape:[3,5001][no error]\n  Data name:"Signal", unit:"Volt"\n  Coords:\nTime [Second](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E-06\nSample [a.u.](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E+00\nSignal name [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:TEST-1-1, TEST-2-1, TEST-3-1\nColumn [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 2, 3\nRow [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 1, 1'

Add noise

Adding some white noise and the peaks that will be filtered

from copy import deepcopy
np.random.seed(12345) # fix the random seed for this example
# Create noisy test dataset
d_noisy = deepcopy(d)
# Add some white noise
white_noise = np.random.random(d.data.shape)*0.025
d_noisy.data += white_noise

In this example, peaks are modelled as an impulse response of a first-order system with randomized amplitude, with additional clipping to model the saturation of a sensor. Their occurrence is also randomized in this example.

def exponential_impulse_response(t, t0, tau, A):
    # t: time array to use for sampling
    # t0: time of impulse (Dirac delta) input
    # tau: time constant
    # A: amplitude
    out = np.zeros_like(t, dtype='float')

    active = (t >= t0)
    out[active] = A * np.exp(-1/tau * (t-t0)[active])

    return out
peaks = np.zeros_like(d.data)
times = d.coordinate('Time')[0]

# Number of peaks to generate
N_peaks = 10

# Time constant of peaks
tau = 1/5e5

for i in range(len(times)):
    for n in range(N_peaks):
        A = np.random.random()*2.0+0.5
        t0 = np.random.random()*times[i].max()
        peaks[i, :] += np.clip(exponential_impulse_response(times[i], t0, tau, A), a_min=-np.inf, a_max=A/2)

# Add the sharp peaks
d_noisy.data += peaks

The signal to be filtered:

plt.figure()
d_noisy.plot(axes=['Time'], plot_type='multi xy')
plt.show()
../../_images/1ae6fbf1baaf990e5491138eb35eb03da2fcf11dcc7cd46030a35e268f04c263.png

Zoom in on a single peak:

plt.figure()
d_noisy.slice_data({'Signal name' : 'TEST-1-1', 'Time' : flap.Intervals(0.00015, 0.00020)}).plot()
plt.show()
../../_images/48cb849d8387c34aea994a464972236ba27b352b64df02401c6bce254dbf59cb.png

Removing the peaks

With infinitely fast sampling, the peak width would be about 5 time constants:

int(5*tau / dt)
10

However, the sampling time and the time constant are close, so max_width_samples has to be increased a bit.

The value of diff_limit (measured in units of data unit / time) depends on both the peaks and the underlying value of the noise and the signal as well.

In a practical situation, these two parameters should be calibrated from measurements.

Options for peak removal follow the standard FLAP option style (flap_defaults.cfg can also be used):

peak_removal_options = {
    'diff_limit' : 25000,
    'max_width_samples' : 12,
    'return_num_detected_peaks' : True,
}

Remove the peaks with the given denoising parameters:

d_removed = d_noisy.remove_sharp_peaks(options=peak_removal_options)

As return_num_detected_peaks is set to True, we can check how many peaks were removed:

for key, value in d_removed.info.items():
    print(f'{key}: {value}')
num_peak_samples_detected: [94 88 89]
num_peaks_detected: [10 10 10]
num_peak_samples_removed: [94 88 89]
num_peaks_removed: [10 10 10]

10 peaks were removed from every signal. (The samples inbetween were interpolated.)

Check the results:

plt.figure()
d_removed.plot(axes=['Time'], plot_type='multi xy')
plt.show()
../../_images/723c457e78c5464aba7692c1398492a937c2f40424776f9e537584cd7c6abe10.png

Advanced functionality

Now, we do not want to remove anything, but would like to analyze the detected peaks.

peak_removal_options2 = {
    'diff_limit' : 25000,
    'max_width_samples' : 12,
    'remove_peaks' : False,
    'return_num_detected_peaks' : True,
    'return_loc_detected_samples' : True,
    'return_peak_shapes' : True,
}
d_removed_2, peak_widths_array, peak_amplitudes_array = d_noisy.remove_sharp_peaks(options=peak_removal_options2)

Peak locations

As return_loc_detected_samples is True, an additional coordinate named ‘Peak detected’ is added to the returned data object, which contains whether the associated sample has been detected to be a part of a sample:

flap.list_data_objects(d_removed_2)
-----------------------------
<1>(data_source:"TESTDATA" exp_id:"") data_title:"Test data" shape:[3,5001][no error]
  Data name:"Signal", unit:"Volt"
  Coords:
Time [Second](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E-06
Sample [a.u.](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E+00
Signal name [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:TEST-1-1, TEST-2-1, TEST-3-1
Column [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 2, 3
Row [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 1, 1
Peak detected [Bool](Dims:0,1), Shape:[3,5001]) [<R. symm.>] Val. range:  0.000E+00 -  1.000E+00
'\n-----------------------------\n<1>(data_source:"TESTDATA" exp_id:"") data_title:"Test data" shape:[3,5001][no error]\n  Data name:"Signal", unit:"Volt"\n  Coords:\nTime [Second](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E-06\nSample [a.u.](Dims:1) [<Equ.><R. symm.>] Start:  0.000E+00, Steps:  1.000E+00\nSignal name [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:TEST-1-1, TEST-2-1, TEST-3-1\nColumn [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 2, 3\nRow [a.u.](Dims:0), Shape:[3]) [<R. symm.>] Val:1, 1, 1\nPeak detected [Bool](Dims:0,1), Shape:[3,5001]) [<R. symm.>] Val. range:  0.000E+00 -  1.000E+00'
peak_detected_0 = np.asarray(d_removed_2.coordinate('Peak detected')[0][0], dtype='bool')

plt.figure()

plt.plot(d_removed_2.coordinate('Time')[0][0], d_removed_2.data[0], label='noisy signal')
plt.scatter(d_removed_2.coordinate('Time')[0][0][peak_detected_0], d_removed_2.data[0][peak_detected_0], s=4, c='r', zorder=100, label='peak samples detected')

plt.legend()
plt.xlabel('Time [Second]')
plt.ylabel('Signal [Volt]')

plt.show()
../../_images/87c722ac082fa9f8ee05e3d4e5941c36df52a3e8f99647bf547de633cda9b53b.png

Peak shapes

The two arrays additional arrays returned due to setting return_peak_shapes have a shape similar to the data in the original data object:

print(peak_amplitudes_array.shape)
(3,)
print(peak_widths_array.shape)
(3,)

They contain separate data objects for each signal, with the time coordinates being the center times of the detected peaks:

for i in range(len(peak_amplitudes_array)):
    flap.list_data_objects(peak_amplitudes_array[i])
    print()
-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Signal", unit:"Volt"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 1.745E-04,  6.445E-04,  1.301E-03,  1.326E-03,  1.896E-03,  2.220E-03,  3.079E-03,  3.593E-03,  4.490E-03,  4.586E-03


-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Signal", unit:"Volt"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 2.935E-04,  9.165E-04,  1.167E-03,  1.396E-03,  1.667E-03,  2.080E-03,  2.535E-03,  2.759E-03,  3.943E-03,  4.784E-03


-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Signal", unit:"Volt"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 1.420E-04,  2.875E-04,  3.995E-04,  2.334E-03,  2.515E-03,  2.873E-03,  3.106E-03,  3.363E-03,  3.559E-03,  4.756E-03
for i in range(len(peak_amplitudes_array)):
    flap.list_data_objects(peak_widths_array[i])
    print()
-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Samples", unit:"[n.a.]"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 1.745E-04,  6.445E-04,  1.301E-03,  1.326E-03,  1.896E-03,  2.220E-03,  3.079E-03,  3.593E-03,  4.490E-03,  4.586E-03


-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Samples", unit:"[n.a.]"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 2.935E-04,  9.165E-04,  1.167E-03,  1.396E-03,  1.667E-03,  2.080E-03,  2.535E-03,  2.759E-03,  3.943E-03,  4.784E-03


-----------------------------
<1>(data_source:"" exp_id:"") data_title:"" shape:[10][no error]
  Data name:"Samples", unit:"[n.a.]"
  Coords:
Time [Second](Dims:0), Shape:[10]) [<R. symm.>] Val: 1.420E-04,  2.875E-04,  3.995E-04,  2.334E-03,  2.515E-03,  2.873E-03,  3.106E-03,  3.363E-03,  3.559E-03,  4.756E-03

The amplitude array contains the height of the peaks relative to the underlying signal:

plt.figure()

d_removed_2.slice_data({'Signal name' : 'TEST-1-1'}).plot()
peak_amplitudes_array[0].plot(plot_type='scatter')

plt.show()
../../_images/28a05c8938b53da0c1c515e95455682e9075722f16226adb62da5a655d388222.png

These arrays can be used to generate various statistics on the peaks:

peak_amplitudes = np.array([])

for d_i in peak_amplitudes_array.flat:
    peak_amplitudes = np.concatenate((peak_amplitudes, d_i.data))

plt.figure()
plt.hist(peak_amplitudes)
plt.xlabel('Peak amplitude size (data unit)')
plt.show()
../../_images/e3ba13011bbdb894d890d3d3c3f75ccd1b2ceaa558127001e1bfafd0c46148a2.png
peak_widths = np.array([])

for d_i in peak_widths_array.flat:
    peak_widths = np.concatenate((peak_widths, d_i.data))

plt.figure()
plt.hist(peak_widths)
plt.xlabel('Peak width size (samples)')
plt.show()
../../_images/0e555aea3df02bf54fa75e9eebb4e098bed9c9b97318320072ba741508a191bb.png

Analyze the effect of diff_limit on the number of samples removed:

diff_limits = np.linspace(5000, 50000, 30)

removed_peak_samples_number = []

for diff_limit_i in diff_limits:
    peak_removal_options_i = {
        'diff_limit' : diff_limit_i,
        'max_width_samples' : 12,
        'remove_peaks' : False,
        'return_num_detected_peaks' : True,
    }
    
    d_removed_i = d_noisy.remove_sharp_peaks(options=peak_removal_options_i)

    removed_peak_samples_number.append(np.sum(d_removed_i.info['num_peak_samples_detected']))

plt.figure()
plt.plot(diff_limits, removed_peak_samples_number, marker='o')
plt.yscale('log')

plt.xlabel('diff_limit [data unit / time]')
plt.ylabel('Number of samples removed as peak [1]')

plt.grid()

plt.show()
../../_images/126f81286d9701c727d1edbcf3a3ebe345f80bb34d516e6c7cf76655008311fe.png

Effect of sampling and phase shift on detected peak duration

If a an impulse occurs with a phase shift relative to the sampling time instants, the sampled response will have a different observed duration depending on the (sampling time) / (time constant) ratio, as well as the phase shift. The following two graphs illustrate this effect.

tau = 1

dt_n = np.linspace(0.005*tau, 2*tau, 50)
phase_shift_over_dt_n = np.linspace(-0.5, 0.5, 50)
A_test = 1

A_limit = A_test * np.exp(-5)  # 5 time constants exponential decay

num_samples = np.full((len(phase_shift_over_dt_n), len(dt_n)), np.nan)

for i in range(len(phase_shift_over_dt_n)):
    for j in range(len(dt_n)):
        dt = dt_n[j]
        times = np.arange(0, 10*tau, step=dt)
        num_samples[i, j] = np.sum(exponential_impulse_response(times, dt + phase_shift_over_dt_n[i] * dt, tau, A_test) >= A_limit) * dt / tau

# remove upper corner plot
fig = plt.figure(figsize=(7,7), constrained_layout=True)

axcorner = plt.subplot2grid((3, 3), (0, 2))
axcorner.axis('off')

axmain = plt.subplot2grid((3, 3), (1, 0), colspan=2, rowspan=2)

psodtn_step = phase_shift_over_dt_n[1] - phase_shift_over_dt_n[0]
dtn_step = dt_n[1] - dt_n[0]

m = axmain.imshow(
    num_samples.T,
    extent=(
        np.min(phase_shift_over_dt_n) - psodtn_step / 2,
        np.max(phase_shift_over_dt_n) + psodtn_step / 2,
        np.min(dt_n) - dtn_step / 2,
        np.max(dt_n) + dtn_step / 2,
    ),
    aspect='auto',
    origin='lower',
    interpolation='none',
)

axmain.set_xlabel('phase shift / sampling time')
axmain.set_ylabel('sampling time / time constant')

fig.colorbar(
    m,
    ax=axmain,
    label='Sampled peak duration\n(# of sampled peak values * sampling time / time constant)',
    orientation='horizontal',
)

vertMean = np.mean(num_samples, axis=1)
vertStd = np.std(num_samples, axis=1)
axtop = plt.subplot2grid((3, 3), (0, 0), colspan=2, sharex=axmain)
axtop.fill_between(phase_shift_over_dt_n, vertMean - vertStd, vertMean + vertStd, alpha=0.5)
axtop.plot(phase_shift_over_dt_n, vertMean)
    
axtop.label_outer()
axtop.set_ylim(4, 6)
axtop.set_yticks(np.arange(4, 6+0.1, 0.5))
axtop.set_ylabel('Sampled peak duration')
axtop.grid()

horzMean = np.mean(num_samples, axis=0)
horzStd = np.std(num_samples, axis=0)
axright = plt.subplot2grid((3, 3), (1, 2), rowspan=2, sharey=axmain)
axright.fill_betweenx(dt_n, horzMean - horzStd, horzMean + horzStd, alpha=0.5)
axright.plot(horzMean, dt_n)

axright.label_outer()
axright.set_xlim(axtop.get_ylim())
axright.set_xticks(axtop.get_yticks())
axright.set_xlabel('Sampled peak duration')
axright.grid()

plt.suptitle('Effect of phase shift and sampling time on sampled peak duration')
plt.show()
../../_images/fb83afb8f78d930fb538c1bdaf76b1247b926d47e1a7b754703411530671d541.png

An illustrative case corresponding to the top right region of the above map:

dt = 2
tau = 1

phase_shift_over_dt = +0.3

times = np.arange(0, 10*tau, step=dt)
pulse_sampled = exponential_impulse_response(times, dt + phase_shift_over_dt * dt, tau, 1)
detected = pulse_sampled >= np.exp(-5)

times_ref = np.arange(0, 10*tau, step=dt/100)
pulse_ref = exponential_impulse_response(times_ref, dt + phase_shift_over_dt * dt, tau, 1)

plt.figure()

plt.plot(
    times_ref,
    pulse_ref,
    zorder=-2,
)

plt.scatter(
    times[detected],
    pulse_sampled[detected],
    c='red',
    label='sampled values over threshold',
)

plt.scatter(
    times[~detected],
    pulse_sampled[~detected],
    c='gray',
    label='sampled values below threshold',
)

plt.hlines(np.exp(-5), times.min(), times.max(), ls='--', color='k')

plt.xlabel('Time')
plt.ylabel('Signal value')

plt.legend()

plt.show()
../../_images/61ab5f250dff016b3a8b1bc173ebad03833d142001a463f3a928503b3cbc6e8d.png