A lift tool was instrumented to measure the acceleration of the lower beam. A Nexus 6 android phone was used as the data acquisition device with the help of the Physics Toolbox app. Multiple lifts were performed. Acceleration in three axes and the total magnitude were recorded and uploaded to a cloud storage provider as a csv file.
We want to extract the spectral information from the acceleration time series. We use the discrete fast fourier transform (DFT or FFT) to compute the acceleration versus frequency.
import numpy as np
import pandas as pd
import datetime
import scipy
from pint import UnitRegistry
u = UnitRegistry()
import matplotlib
import matplotlib.pyplot as plt
import pylab
%matplotlib inline
%config InlineBackend.figure_format = 'svg'
acc = pd.read_csv('guardian_accelerometer_data_20160526.csv',
header=0,
names=['time',
'x',
'y',
'z',
'magnitude'],
dtype={'time':datetime.time,
'x':np.float64,
'y':np.float64,
'z':np.float64,
'magnitude':np.float64})
Fix time column
from dateutil.parser import parse
def munge_time(x):
return parse(':'.join(x.split(':')[:-1]))
acc['time'] = acc['time'].map(munge_time)
print(acc)
ax = acc.plot(x='time',
y='magnitude')
ax.set_ylabel("Acceleration (g)")
ax = acc.plot(x='time',
y='z')
ax.set_ylabel("Acceleration, z (g)")
Calculate sampling rate
print('Duration of test: {}'.format(acc['time'].iloc[-1] - acc['time'].iloc[0]))
print('Average time between samples: {}'.format((acc['time'].iloc[-1] - acc['time'].iloc[0])/len(acc)))
delta_t = (acc['time'].iloc[-1] - acc['time'].iloc[0])/len(acc)
print(len(acc))
time_fine = [acc['time'].iloc[0],]
for i in range(1,len(acc.time)):
time_fine.append(time_fine[-1] + delta_t)
acc['time_fine'] = time_fine
print(delta_t.microseconds/1.0e6)
T = (acc['time_fine'].iloc[-1] - acc['time_fine'].iloc[0]).seconds
print('Duration of test (seconds): {}'.format(T))
Filter to times of actual lifts. We filter the complete time history to extract the time history for each discrete lift which will be processed later.
lifts = {1:{}}
lifts[1]['acc'] = acc[(acc['time'] > '2016-05-30 9:50:00') & (acc['time'] < '2016-05-30 9:52:00')]
lifts[2] = {}
lifts[2]['acc'] = acc[(acc['time'] > '2016-05-30 9:56:00') & (acc['time'] < '2016-05-30 9:58:00')]
lifts[3] = {}
lifts[3]['acc'] = acc[(acc['time'] > '2016-05-30 10:00:00') & (acc['time'] < '2016-05-30 10:02:00')]
lifts[5] = {}
lifts[5]['acc'] = acc[(acc['time'] > '2016-05-30 10:15:00') & (acc['time'] < '2016-05-30 10:17:00')]
lifts[6] = {}
lifts[6]['acc'] = acc[(acc['time'] > '2016-05-30 10:19:00') & (acc['time'] < '2016-05-30 10:21:00')]
from scipy.signal import butter, lfilter, freqz
def butter_lowpass(cutoff, fs, order=5):
nyq = 0.5 * fs
normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='low', analog=False)
return b, a
def butter_lowpass_filter(data, cutoff, fs, order=5):
b, a = butter_lowpass(cutoff, fs, order=order)
y = lfilter(b, a, data)
return y
Filter requirements
order = 2
f_s = 1.0/(delta_t.microseconds/1.0e6) # sample rate, Hz
print('Sampling rate: {:0.2f} Hz'.format(f_s))
cutoff = 10 # desired cutoff frequency of the filter, Hz
Get the filter coefficients so we can check its frequency response.
b, a = butter_lowpass(cutoff, f_s, order)
Plot the frequency response.
w, h = freqz(b, a, worN=8000)
#plt.subplot(2, 1, 1)
plt.plot(0.5*f_s*w/np.pi, np.abs(h), 'b')
plt.plot(cutoff, 0.5*np.sqrt(2), 'ko')
plt.axvline(cutoff, color='k')
plt.xlim(0, 0.5*f_s)
plt.title("Lowpass Filter Frequency Response")
plt.xlabel('Frequency [Hz]')
plt.grid()
Filter the data, and plot both the original and filtered signals
y = butter_lowpass_filter(acc['magnitude'], cutoff, f_s, order)
DC component (~1g)
y.mean()
from matplotlib import dates
hfmt = dates.DateFormatter('%m/%d %H:%M')
ax = plt.subplot(1, 1, 1)
plt.plot(acc['time_fine'], y, 'g-', linewidth=2, label='filtered data')
plt.xlabel('Time')
plt.grid()
plt.legend()
ax.xaxis.set_major_locator(dates.MinuteLocator())
ax.xaxis.set_major_formatter(hfmt)
ax.set_ylim(bottom = 0)
plt.xticks(rotation='vertical')
plt.subplots_adjust(bottom=.3)
plt.subplots_adjust(hspace=0.35)
plt.show()
The following approach is borrowed from https://gist.github.com/jedludlow/3919130.
The FFT and a matching vector of frequencies
print(acc['magnitude'].mean())
Subtract the DC component (~1g)
# test
# Number of samplepoints
#N = 1024
# sampling frequency
#f_s = 750.0
# sample spacing
#T = 1.0 / 750
#x = np.linspace(0.0, N*T, N)
#y = 2*np.sin(75.0 * 2.0*np.pi*x)
#print(y)
#fft_x = np.fft.fft(y)
#actual
fft_x = np.fft.fft(acc['magnitude'] - acc['magnitude'].mean())
n = len(fft_x)
freq = np.fft.fftfreq(n, 1/f_s)
print(n)
print(freq)
print(max(fft_x))
print(min(fft_x))
print(max(np.abs(fft_x)))
print(min(np.abs(fft_x)))
plt.plot(np.abs(fft_x))
Note that frequencies in the FFT and the freq vector go from zero to some larger positive number then from a large negative number back toward zero. We can swap that so that the DC component is in the center of the vector while maintaining a two-sided spectrum.
fft_x_shifted = np.fft.fftshift(fft_x)
freq_shifted = np.fft.fftshift(freq)
plt.plot(freq_shifted, np.abs(fft_x_shifted))
plt.xlabel("Frequency (Hz)")
It's actually more common to look at just the first half of the unshifted FFT and frequency vectors and fold all the amplitude information into the positive frequencies. Furthermore, to get ampltude right, we must normalize by the length of the original FFT. Note the factor of $2/n$ in the following which accomplishes both the folding and scaling.
half_n = n // 2
print(half_n)
fft_x_half = (2 / n) * fft_x[:half_n]
freq_half = freq[:half_n]
print(np.abs(fft_x_half))
data = np.abs(fft_x_half)
plt.plot(freq_half, data)
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
from scipy import signal
len(freq_half)
np.argmax(freq_half > 20)
We can use the peakutils module to find the peaks in the FFT
import peakutils
peakind = peakutils.indexes(data, thres=0.4, min_dist=100)
print('Number of relative peaks: {}'.format(len(peakind)))
print(freq_half[peakind])
print(data[peakind])
The multi-step procedure described above can be compressed into 6 lines of code
fft_x_filt = np.fft.fft((y - y.mean()))
freq_filt = np.fft.fftfreq(n, 1/f_s)
fft_x_shifted_filt = np.fft.fftshift(fft_x_filt)
freq_shifted_filt = np.fft.fftshift(freq_filt)
fft_x_half_filt = (2 / n) * fft_x_filt[:half_n]
freq_half_filt = freq_filt[:half_n]
data_filt = np.abs(fft_x_half_filt)
plt.plot(freq_half_filt, data_filt)
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.xlim([0,10])
len(freq_half_filt)
np.argmax(freq_half_filt > 20)
Find the peaks using the peakutils module
peakind_filt = peakutils.indexes(data_filt, thres=0.4, min_dist=100)
print('Number of relative peaks: {}'.format(len(peakind_filt)))
print(freq_half[peakind_filt])
print(data_filt[peakind_filt])
The above transforms operated on the entire duration of the experiment. We are not too interested in the acceleration between lifts. We thus window the data to extract the acceleration time series for each lift event.
from matplotlib import dates
hfmt = dates.DateFormatter('%m/%d %H:%M')
def plot_lift_acc(lift_dict):
ax = plt.subplot(1, 1, 1)
plt.plot(lift_dict['acc']['time_fine'], lift_dict['acc']['magnitude'], 'b-', label='data')
plt.xlabel('Time')
plt.grid()
plt.legend()
ax.xaxis.set_major_locator(dates.MinuteLocator())
ax.xaxis.set_major_formatter(hfmt)
ax.set_ylim(bottom = 0)
plt.xticks(rotation='vertical')
plt.subplots_adjust(bottom=.3)
plt.subplots_adjust(hspace=0.35)
plt.show()
def plot_lift_acc_filt(lift_dict):
ax = plt.subplot(1, 1, 1)
plt.plot(lift_dict['acc']['time_fine'], lift_dict['y'], 'g-', linewidth=2, label='filtered data')
plt.xlabel('Time')
plt.grid()
plt.legend()
ax.xaxis.set_major_locator(dates.MinuteLocator())
ax.xaxis.set_major_formatter(hfmt)
ax.set_ylim(bottom = 0.8)
plt.xticks(rotation='vertical')
plt.subplots_adjust(bottom=.3)
plt.subplots_adjust(hspace=0.35)
plt.show()
def calc_lift_fft(lift_dict):
y = lift_dict['y'] - lift_dict['y'].mean()
n_lift = len(y)
w = blackman(n_lift)
fft_lift_filt = np.fft.fft(y)
half_n_lift = n_lift // 2
freq_lift_filt = np.fft.fftfreq(n_lift, 1/f_s)
fft_lift_shifted_filt = np.fft.fftshift(fft_lift_filt)
freq_lift_shifted_filt = np.fft.fftshift(freq_lift_filt)
lift_dict['fft_lift_half_filt'] = (2 / n_lift) * fft_lift_filt[:half_n_lift]
lift_dict['freq_lift_half_filt'] = freq_lift_filt[:half_n_lift]
def plot_lift_fft(lift_dict):
plt.plot(lift_dict['freq_lift_half_filt'], np.abs(lift_dict['fft_lift_half_filt']))
plt.xlabel("Frequency (Hz)")
plt.ylabel("Amplitude")
plt.xlim([0,15])
plt.show()
def process_lift(lift_dict):
lift_dict['y'] = butter_lowpass_filter(lift_dict['acc']['magnitude'], cutoff, f_s, order)
print('DC component of acceleration: {:0.2f}'.format(lift_dict['y'].mean()))
plot_lift_acc(lift_dict)
plot_lift_acc_filt(lift_dict)
calc_lift_fft(lift_dict)
plot_lift_fft(lift_dict)
lift_dict['peak_indices'] = peakutils.indexes(np.abs(lift_dict['fft_lift_half_filt']),
thres=0.4,
min_dist=100)
print('Number of relative peaks: {}'.format(len(lift_dict['peak_indices'])))
frequencies = ','.join(['{:0.2f}'.format(f) for f in
lift_dict['freq_lift_half_filt'][lift_dict['peak_indices']]])
print('{}'.format(frequencies))
process_lift(lifts[1])
process_lift(lifts[2])
process_lift(lifts[3])
process_lift(lifts[5])
process_lift(lifts[6])