When doing pattern analysis with time series, a question that can come up is:
How similar are two patterns?
Obviously there can be multiple answers to this question and in typical Finance fashion we will construct a metric to provide us with an answer.
Let $P = (p_1, \dots, p_n)$ and $Q = (q_1, \dots, q_m)$ be two series of prices. We can then take their percentage returns $r^P_t = \frac{p_t - p_{t-1}}{p_{t-1}}$ and $r^Q_t = \frac{q_t - q_{t-1}}{q_{t-1}}$.
Now we can consider the Euclidean Distance between the 2 series as a metric of similarity for the original time series of prices.
This metric is quite versatile. Notice that we use percentage returns rather than log returns or simple differences of the time series. This is done to eliminate the difference in pricing scales that often occurs between time series, so we have a robust tool with which to analyse many different datasets and compare them to each other.
Below is the implementation of such an idea in Python code.
import pandas as pd
import multiprocessing
import math
from joblib import Parallel, delayed
import itertools
from tqdm import tqdm
import numpy as np
class SimilarPatterns:
    """Rank historical price windows by similarity to a test pattern.

    Similarity is the Euclidean distance between the percentage returns of
    a candidate window and those of the test pattern; the shorter vector is
    zero-padded on the right so both have equal length.

    Parameters
    ----------
    dataset : pandas.DataFrame
        Historical data to mine for similar patterns.  Mutated in place:
        gains a ``'returns'`` column and, later, a ``'marked_pattern'``
        column.
    price_col : str
        Name of the price column in both ``dataset`` and ``test_pattern``.
    date_col : str
        Name of the date column.  Values are compared by string slicing,
        so they are assumed to be ISO-like strings (``'YYYY-MM-DD...'``) —
        verify against the caller's data.
    step_range : iterable of int
        Window lengths to scan; every length from ``min(step_range)`` to
        ``max(step_range)`` inclusive is used.
    test_pattern : pandas.DataFrame
        Reference pattern to compare against.  Also gains a ``'returns'``
        column in place.
    """

    def __init__(self, dataset, price_col, date_col, step_range, test_pattern):
        self.dataset = dataset
        self.price_col = price_col
        self.date_col = date_col
        self.step_range = step_range
        self.test_pattern = test_pattern
        # Pipeline runs eagerly at construction time.
        self.build_returns_col()
        self.pattern_database = self.build_pattern_database()
        self.ranked_patterns = self.pattern_rank()
        self.df = self.top100_rank()

    @staticmethod
    def dist(x, y):
        """Return the Euclidean distance between two 1-D series.

        NaNs are replaced with 0, and the shorter series is zero-padded on
        the right so both vectors have equal length before differencing.
        """
        x = np.nan_to_num(np.asarray(x, dtype=float), nan=0.0)
        y = np.nan_to_num(np.asarray(y, dtype=float), nan=0.0)
        if len(x) != len(y):
            # Pad the shorter vector rather than truncating the longer one.
            n = max(len(x), len(y))
            x = np.pad(x, (0, n - len(x)))
            y = np.pad(y, (0, n - len(y)))
        return np.linalg.norm(x - y)

    @staticmethod
    def sliding_window(df, step):
        """Return contiguous windows of length ``step`` taken from ``df``.

        NOTE(review): the original condition ``i < len(df) - step`` skips
        the final full window (the one starting at ``len(df) - step``).
        That behaviour is preserved here pending confirmation of intent.
        """
        return [df[i:i + step] for i in range(len(df) - step)]

    @staticmethod
    def returns_col(df, price_col):
        """Add a ``'returns'`` column of percentage returns to ``df`` in place.

        The leading NaN produced by ``pct_change`` is back-filled from the
        next valid value.  Returns ``df`` for convenience.
        """
        # .bfill() replaces fillna(method='bfill'), deprecated in pandas 2.x.
        df['returns'] = df[price_col].pct_change().bfill()
        return df

    @staticmethod
    def marked_pattern(df, df_pattern, date_col):
        """Flag rows of ``df`` belonging to ``df_pattern`` and return a
        slice of ``df`` spanning roughly two years either side of it.

        Mutates ``df`` in place by (re)writing a ``'marked_pattern'``
        column of ``'yes'``/``'no'`` values.  Dates are matched on their
        day-level string prefix (first 10 characters), so ``date_col`` is
        assumed to hold ISO-like date strings.
        """
        # Set membership makes the per-row test O(1) instead of O(n).
        pattern_dates = {dt[:10] for dt in df_pattern[date_col]}
        df['marked_pattern'] = [
            'yes' if dt[:10] in pattern_dates else 'no'
            for dt in df[date_col]
        ]
        # Window from two years before the pattern's first date to two
        # years after its last; comparison is lexicographic on the year
        # prefix, which is valid for ISO-formatted date strings.
        start_dt = str(int(df_pattern[date_col].min()[:4]) - 2)
        end_dt = str(int(df_pattern[date_col].max()[:4]) + 2)
        return df.loc[(df[date_col] > start_dt) & (df[date_col] < end_dt)]

    def build_returns_col(self):
        """Attach percentage-return columns to both input frames in place."""
        # Reuse the static helper instead of duplicating its logic.
        SimilarPatterns.returns_col(self.dataset, self.price_col)
        SimilarPatterns.returns_col(self.test_pattern, self.price_col)

    def build_pattern_database(self):
        """Collect candidate windows for every length in ``step_range``."""
        per_length = []
        for step in tqdm(range(min(self.step_range), max(self.step_range) + 1)):
            per_length.append(SimilarPatterns.sliding_window(self.dataset, step))
        # Flatten the per-length lists into a single list of windows.
        return list(itertools.chain.from_iterable(per_length))

    def pattern_rank(self):
        """Return ``{distance: window}`` ordered by ascending distance.

        BUG(known, preserved): the dict is keyed by the float distance, so
        a window whose distance exactly ties an earlier one silently
        overwrites it.  Fixing this would change the type of
        ``self.ranked_patterns`` for external consumers, so it is only
        documented here.
        """
        target = self.test_pattern['returns']  # hoisted out of the loop
        by_distance = {}
        for window in tqdm(self.pattern_database):
            by_distance[self.dist(window['returns'].values, target)] = window
        # Rebuild in ascending-distance order; Python dicts preserve
        # insertion order, so iteration yields nearest patterns first.
        return {d: by_distance[d] for d in sorted(by_distance)}

    def top100_rank(self):
        """Build marked context slices for each ranked window.

        NOTE(review): despite the name, no top-100 truncation is applied —
        every ranked window is processed.  Preserved as-is pending
        confirmation of intent.
        """
        patterns = []
        for _, window in tqdm(list(self.ranked_patterns.items())):
            df_slice = SimilarPatterns.marked_pattern(
                self.dataset, window, self.date_col
            )
            # Keep only slices containing both marked and unmarked rows.
            if len(df_slice['marked_pattern'].value_counts()) > 1:
                patterns.append(df_slice)
        return patterns