A Pattern Similarity Metric

When doing pattern analysis with time series, a question that can come up is:

How similar are two patterns?

Obviously there can be multiple answers to this question and in typical Finance fashion we will construct a metric to provide us with an answer.

Let \{a_1, ..., a_n\} and \{b_1, ..., b_n\} be 2 sets of prices.
We can then take their percentage returns, ra_i = (a_i - a_{i-1}) / a_{i-1} and rb_i = (b_i - b_{i-1}) / b_{i-1} for i = 2, ..., n, giving
\{ra_2, ..., ra_n\} and \{rb_2, ..., rb_n\}.
Now we can consider the Euclidean Distance between the 2 series as a metric of similarity for the original time series of prices.
D = \sqrt{ (ra_2-rb_2)^2 + ... + (ra_n-rb_n)^2 }

This metric is quite versatile. Notice that we use percentage returns rather than log returns or simple differences of the time series. This is done to eliminate the difference in price scales that often exists between time series, so that we have a robust tool for analysing many different datasets and comparing them with each other.

Below is the implementation of such an idea in Python code.

import pandas as pd
import multiprocessing
import math
from joblib import Parallel, delayed
import itertools
from tqdm import tqdm
import numpy as np

class SimilarPatterns:
    """Rank historical price windows by similarity to a reference pattern.

    Construction runs the whole pipeline:

    1. a ``returns`` column (percentage change of ``price_col``) is added to
       both ``dataset`` and ``test_pattern`` (both frames are modified in
       place);
    2. ``dataset`` is cut into sliding windows of every length between
       ``min(step_range)`` and ``max(step_range)`` inclusive;
    3. every window is ranked by the Euclidean distance between its returns
       and the returns of ``test_pattern``;
    4. for each ranked window a date-bounded slice of ``dataset`` is built in
       which the rows belonging to the window are flagged.

    Attributes:
        pattern_database: flat list of all sliding windows.
        ranked_patterns: dict mapping distance -> window, in ascending
            order of distance.
        df: list of dataset slices returned by ``top100_rank``.
    """

    def __init__(self, dataset, price_col, date_col, step_range, test_pattern):
        """Store the inputs and run the full ranking pipeline.

        Args:
            dataset: DataFrame holding the historical data to search.
            price_col: name of the price column in both frames.
            date_col: name of the date column; ``marked_pattern`` slices its
                values as strings, so they must be strings whose first four
                characters are the year.
            step_range: iterable of ints; only its minimum and maximum are
                used, as the inclusive range of window lengths.
            test_pattern: DataFrame holding the reference pattern.
        """
        self.dataset = dataset
        self.price_col = price_col
        self.date_col = date_col
        self.step_range = step_range
        self.test_pattern = test_pattern
        # Order matters: windows are sliced from the dataset, so the returns
        # column has to exist before the pattern database is built.
        self.build_returns_col()
        self.pattern_database = self.build_pattern_database()
        self.ranked_patterns = self.pattern_rank()
        self.df = self.top100_rank()

    @staticmethod
    def dist(x, y) -> float:
        """Return the Euclidean distance between two 1-D series.

        NaNs are replaced by 0.0 (``np.nan_to_num`` also maps infinities to
        large finite numbers). When the series differ in length the shorter
        one is padded with trailing zeros so both have the same length.

        Args:
            x: 1-D array-like of numbers.
            y: 1-D array-like of numbers.

        Returns:
            The L2 norm of the element-wise difference.
        """
        x = np.nan_to_num(x, copy=True, nan=0.0)
        y = np.nan_to_num(y, copy=True, nan=0.0)

        # Pad both operands to the common length; a pad width of zero leaves
        # the longer series unchanged.
        size = max(len(x), len(y))
        x = np.pad(x, (0, size - len(x)))
        y = np.pad(y, (0, size - len(y)))

        return np.linalg.norm(x - y)

    @staticmethod
    def sliding_window(df, step: int) -> list:
        """Return every full-length window of ``step`` consecutive rows.

        Windows start at positions ``0 .. len(df) - step``, so the final
        window ends on the last row of ``df``. An empty list is returned when
        ``df`` has fewer than ``step`` rows.

        Args:
            df: sliceable sequence of rows (a DataFrame in this class).
            step: number of rows in each window.

        Returns:
            List of slices ``df[i:i + step]`` in ascending start order.
        """
        # "+ 1" keeps the last full window; without it the final row of df
        # would never be part of any pattern.
        return [df[i:i + step] for i in range(len(df) - step + 1)]

    @staticmethod
    def returns_col(df, price_col):
        """Add a ``returns`` column with the percentage change of a price.

        The first row has no predecessor, so its NaN is back-filled with the
        next available return.

        Args:
            df: DataFrame to modify in place.
            price_col: name of the price column.

        Returns:
            The same ``df`` object, now carrying the ``returns`` column.
        """
        # .bfill() replaces the deprecated fillna(method='bfill').
        df['returns'] = df[price_col].pct_change().bfill()

        return df

    @staticmethod
    def marked_pattern(df, df_pattern, date_col):
        """Flag the rows of ``df`` that belong to ``df_pattern`` and crop.

        Rows are matched on the first 10 characters of their date string.
        A ``marked_pattern`` column holding ``'yes'``/``'no'`` is written to
        ``df`` in place, then the rows whose date lies strictly between
        (first pattern year - 2) and (last pattern year + 2) are returned.
        The bounds are bare year strings and the comparison is done on the
        date column as-is.

        Args:
            df: full DataFrame; receives the ``marked_pattern`` column.
            df_pattern: DataFrame with the rows of the pattern.
            date_col: name of the string date column in both frames.

        Returns:
            The rows of ``df`` inside the year bounds.
        """
        # A set gives O(1) membership tests instead of scanning a list for
        # every row of df.
        pattern_days = {dt[:10] for dt in df_pattern[date_col]}

        df['marked_pattern'] = [
            'yes' if dt[:10] in pattern_days else 'no'
            for dt in df[date_col]
        ]

        start_dt = str(int(df_pattern[date_col].min()[:4]) - 2)
        end_dt = str(int(df_pattern[date_col].max()[:4]) + 2)

        in_range = (df[date_col] > start_dt) & (df[date_col] < end_dt)

        return df.loc[in_range]

    def build_returns_col(self):
        """Add the ``returns`` column to the dataset and the test pattern."""
        SimilarPatterns.returns_col(self.dataset, self.price_col)
        SimilarPatterns.returns_col(self.test_pattern, self.price_col)

    def build_pattern_database(self):
        """Collect the sliding windows of every length in the step range.

        Returns:
            Flat list of windows, grouped by ascending window length and,
            within one length, by ascending start position.
        """
        shortest = min(self.step_range)
        longest = max(self.step_range)

        windows_per_step = (
            SimilarPatterns.sliding_window(self.dataset, step)
            for step in tqdm(range(shortest, longest + 1))
        )

        return list(itertools.chain.from_iterable(windows_per_step))

    def pattern_rank(self):
        """Rank every window by its distance to the test pattern.

        Returns:
            Dict mapping distance -> window, inserted in ascending order of
            distance.

        NOTE: the distance itself is the key, so windows with exactly equal
        distances collide and only the last one encountered is kept.
        """
        # The reference returns do not change between iterations.
        target = self.test_pattern['returns'].values

        by_distance = {}
        for window in tqdm(self.pattern_database):
            by_distance[self.dist(window['returns'].values, target)] = window

        return {d: by_distance[d] for d in sorted(tqdm(by_distance.keys()))}

    def top100_rank(self):
        """Build a marked dataset slice for each ranked pattern.

        Despite the name, no truncation to 100 entries is applied: every
        ranked pattern is processed, in ranking order. A slice is kept only
        when it contains both ``'yes'`` and ``'no'`` rows.

        Returns:
            List of DataFrame slices produced by ``marked_pattern``.
        """
        patterns = []

        for window in tqdm(self.ranked_patterns.values()):
            df_slice = SimilarPatterns.marked_pattern(
                self.dataset, window, self.date_col
            )
            # More than one distinct flag means the slice shows the pattern
            # together with some surrounding context.
            if df_slice['marked_pattern'].nunique() > 1:
                patterns.append(df_slice)

        return patterns