Moving Average using Discrete Convolution
The overview (not so mathsy)
The objective for me started off very much looking at a problem of anomaly detection, where I was looking for anomalous data points in a time series. I stumbled across a blog giving a nice implementation using moving averages , the blog is really nice and the code is pretty. My two niggles was the implementation of Discrete Convolution was not clearly explained . In addition, the implementation of convolution within numpy is not so clear.
Basically what we are doing is computing a moving average so for example taking the first 5 points of a data set averaging them and moving onto the next part of the data set and averaging them. Now you could achieve this in part with a windowing function in SQL or Pandas (I have put this at the bottom). But these are pretty computationally heavy and really not the cleanest implementation. In essence what you are wanting is an output of a curve which is clean and free of noise, hence, it is good to look to signal processing, especially for time series, moving averages often provide a nice filtering method. You want a filtered output curve, over the same time series and you want to be able to capture break aways from the trend.
So, how do we get this filtered curve, well we need to multiply it by another curve and that is what we are doing in convolution, we are multiplying two curves together then adding the result. What we have is an averaging curve (a square curve), where we have an area of 1 and an amplitude of 1/window_size. This is a simple Low Pass filter in signal processing. What is nice is here we have a square function so each point in the window has the same impact on the average for that day. so in simpler terms if we wanted to create a moving average for Monday in our case if we set a window size of 7 we would take each point from the previous week add it together and divide it by the window size (a normal arithmetic average). This is also the equivalent of taking each point and multiplying it by the amplitude of our input curve 1/window_size (point-wise multiplication). Now this means that each of the last 7 days will have an equal weighting on the average (the amplitude is the same). Though we don’t need to do this we could put any distribution or wave in, for example a Gaussian will give the highest weighting to the points very close to the day we are averaging, and diminishing weights to points further away. There could be a case for having a much larger window size which still has influence from points further away from the middle ‘actual’ point. Essentially the way I think about it is we want to create a curve which is representative of our data but we know that there is noise (anomalous data) which does not adhere to the expected values. Now we are saying what should today’s value be well depending on our window size we would say the estimated value for today would have some impact from the previous days and possibly from the forward day. A moving averages objective is to hopefully capture the underlying trend in the data. Now what I did was to estimate the anomalous point I used my average ‘filtered’ curve as the predicted curve, and then I took the difference between the actual and the average curved. This give me a residual, now in my case the residuals were approximately normally distributed. The idea here is if I had a very large window size my moving average would just tend to the overall mean for the data, by using a smaller window, you are able to pick up more transient trends, such as more bookings in one week vs the next or general trend of increasing or decreasing. Now what I wanted to do is I have a deviation for each point in my data set, now if there is a small residual it means that I have a pretty good confidence regarding that point being representative. However, as my residual increases there is a greater chance that my actual point could be anomalous. Now what I mean by anomalous does not mean it is noise or that it is wrong (it could be) but it could also be temporal events, such as imagine cinema bookings could peak for the opening weekend of the latest Star Wars movie and could be a one off event every two years, and well given the box office success could easily be a good number of standard deviations from the mean. So at the end the point of picking up anomalous points is to create a slightly smarter filter, to narrow down points to investigate possibly manually or to develop another model to treat. This saves computation resources narrowing down the data set you need to analyse. Now we have our residuals, and like I said above my errors were pretty normally distributed, though this should be checked, now I liked two methods, one used was simply plot a histogram of the residuals the second is using the probplot. Now this basically creates a normal distribution for your data and basically for each of your points plots it against it’s corresponding value on a normal distribution. Therefore, if you have a perfectly normal distribution then you should have a straight line. What is great especially with the second method is that you can pretty easily see anomalous points. For example on my data set we see that we have about 6 points clearly off the curve. Now this plot will very much vary depending on the window size.
Testing Points - Anomalous or Not!
So now I see the points that I know my points are clearly off a certain normal trend, then how do I test if they are anomalous. Well we need to devise the test, now there are a bunch of tests out there but I followed the same suit as the Twitter guys using the Extreme student deviation (ESD) test. Now I will go into a bit more detail regarding why you should and can use them. This is definitely not complete and there are plenty of other methods that can be used. I have also given reference to some other great resources.
Now the origin of ESD is the Grubbs’ test. Now the Grubbs’ test is often used in experimental science to identify and remove anomalous data points. What it is essentially the same as a T-Test but successively testing each point, calculating a T-score and removing the point with the highest score, so long as it is above the critical T-score.
Now something unfortunately that was not obvious to me straight away when I first implemented the Grubbs test, was why not use 2 standard deviations as the cut off. Seeing as we are dealing with an approximately normal distribution, then I know that 95% of values is within 2 standard deviations fo the mean, therefore any points more than 2 standard deviations above the mean, could be removed. Well this isn’t quite the case the point is that we are approximately normal, though we have anomalous points which means that they don’t necessarily sit within what we would say is a normal deviation and if they are anomalous well then they should be removed which will in turn affect the mean and standard deviation of the whole data set. Although in many examples you will find by putting 2 standard deviations will generally act as a quick cutoff for points which are anomalous, it could be inaccurate as the mean is heavily affected by outliers and the larger their magnitude the greater the affect.
Now what we are going to do is we are going to calculate a T-score for each of the
A bit of theory
So this is the slightly tricky bit, when you look at the source code, you basically stumble upon lots of notes but actually only two lines of code convolve runs the correlation function which then runs a C library to actually achieve the result.
Now there are three possible options for the mode. This depends on how the windowing function is achieved, and actually quite important when it comes to dealing with boundary conditions.
‘full’ : This is as per a normal moving average.
‘same’ : This only begins to compute the average after more than half the sliding window intersects the array.
‘valid’ : This only begins to compute once the whole window is intersecting with the data.
import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy import stats %matplotlib inline def moving_average(f_t,lag): '''A moving average calculation (low pass filter) A moving average is calculated using discrete linear convolution The 'same' option is used for convolution method, this means that the window of averaging must intersect with data points with a length of >len(lag)/2. This improves dealing with boundary issues. Parameters ---------- f_t : np.array Data to calculate moving average lag : int Window size to average data points over ''' g_t = np.ones(int(lag))/float(lag) # Deal with boundaries with atleast lag/2 day window mode = 'same' average_y = np.convolve(f_t,g_t,mode) return average_y def esd_test(df_in,max_outliers=10,alpha=0.05,show_plot=True): '''Implementation of Generalized ESD test for Outliers An extension to Grubbs test to k unknown outliers, all that requires specified is the maximum number of outliers and the confidence interval. The data must be approximately normal to apply the test. From Rosner, 1983 . Parameters ---------- x : list Data to be tested for outliers, must be approximately normal max_outliers : int Maximum number of outliers to search for alpha : float Significance level show_plot : bool This shows the plot of the critical value and test statistic against number of anomalies removed, showing based on an alpha the number of anomalies to be removed to be confident of the data. Attributes ---------- ESD_stats : Pandas.DataFrame Dataframe containing the ESD test statistic and Critical value. outliers : list List containing tuple of dataframe index of outlier & the x value found to be anomolous. References ----------  http://www.itl.nist.gov/div898/handbook/eda/section3/eda35h3.htm ''' ind = list(df_in.index) x = list(df_in.values) outliers =  res_lst =  # ESD Test Statistic for each k anomaly lam_lst =  # Critical Value for each k anomaly n = len(x) for i in range(1,max_outliers+1): x_mean = np.mean(x) x_std = np.std(x,ddof=1) res = abs((x - x_mean) / x_std) max_res = np.max(res) max_ind = np.argmax(res) p = 1 - alpha / (2*(n-i+1)) t_v = stats.t.ppf(p,(n-i-1)) # Get critical values from t-distribution based on p and n lam_i = ((n-i)*t_v)/ np.sqrt((n-i-1+t_v**2)*(n-i+1)) # Calculate critical region (lambdas) res_lst.append(max_res) lam_lst.append(lam_i) if max_res > lam_i: outliers.append((ind.pop(max_ind),x.pop(max_ind))) ESD_stats = pd.DataFrame() ESD_stats['ESD Test Statistic'] = res_lst ESD_stats['Critical Value'] = lam_lst # Plot will show the point of intersection between critical value # and ESD test statistic if show_plot: df.plot() return [ESD_stats,outliers]
https://www.datascience.com/blog/python-anomaly-detection http://www.analog.com/media/en/technical-documentation/dsp-book/dsp_book_Ch15.pdf https://en.wikipedia.org/wiki/Convolution https://en.wikipedia.org/wiki/Mandelbrot_set https://docs.scipy.org/doc/numpy/reference/generated/numpy.convolve.html https://github.com/numpy/numpy/blob/v1.13.0/numpy/core/numeric.py#L978-L1073