異常検知手法追加メモ

異常検知の問題点

  • 正常と異常のデータが不均衡
  • Swamping : 正常データが異常データに近い時にFalse Negativeが発生
  • Masking : 異常なデータがたくさんあり、それらが密な集合になり検知できない

手法

Isolation Forest

正常データとの距離を見る手法と異なり、異常データを分類する手法です。異常データは決定木の根の部分に近い(必要な分割数が少ない)と考えた手法です。scikit-kearnで実装あり。元のデータからサンプリングを行うことで、大量なデータがあるときに生じやすいSwampingとMaskingの影響を抑える。通常なデータセットだけを用いた異常検知も可能だが、サンプリング数を大きくする必要あり。 Isolation forest

時系列手法

スライド窓による時系列データの変換

スライド窓により部分時系列を生成する。あとはkNNなど上記の手法を適用する。スライド窓が小さいとノイズが多くなり、大きくなると軽微な変化を検知できなくなる。チューニング必須。また、ある程度滑らかな時系列の場合は隣り合う部分時系列の要素の値がほとんど同じになるので、時間的に近接した部分時系列同士の距離(類似度)は計算しないなど工夫が必要になるときも。

特異スペクトル変換法

区間の部分時系列と別区間の部分時系列を抽出し、それぞれに対して特異値分解しそれらを比較することで異常検知を行う。

AR、状態空間モデルによる異常検知

時系列データにモデルを当てはめ残差の大きさで異常度を判定する方法。予測できるモデルを用意すればいいので、LSTMなどDeepな手法も使われる。

ここまでのまとめ

一応、ここまででこの表のDeep以外の手法を理解することができました。

f:id:yamayou_1:20210327154134p:plain

(Ruff, Lukas, et al. "A Unifying Review of Deep and Shallow Anomaly Detection." arXiv preprint arXiv:2009.11732 (2020).からの引用)

異常検知7:不要な次元を含むデータからの異常検知 - PCA, Kernel PCA

不要な次元を含むデータからの異常検知

多変量のデータがある場合にそのデータセットの変数間で多重共線性があるとホテリング理論を使ってもうまくいきません。これは、共分散行列の逆行列を計算できないためです。このような場合に使われる手法がPCA(/Kernel PCA)を用いた異常検知です。

PCAとは

正常データもしくは正常データが圧倒的多数なデータからPCAなどから主部分空間(正常部分空間)を求めます。この時、新しいデータ x'が来たときに、異常度は正常部分空間から新しいデータ点がどれだけ離れているかで求めることができます。 異常度は以下のように表され、主成分のみを使用してデータを表した際にどれだけの長さが失われたのかを表しています。

[tex: \displaystyle a(x') = ||x'||^{2} = (x' - \hat{\mu})^{T}(I{M} - U{M}^{T})(x' - \hat{\mu}) ]

コード

データ準備

import pyper
import pandas as pd
from datetime import datetime

r = pyper.R(use_pandas='True')

print(r("""
library(MASS)

data(Cars93)

cc<-c('Min.Price', 'Price', 'Max.Price', 'MPG.city', 'MPG.highway', 'EngineSize', 'Horsepower', 'RPM', 'Rev.per.mile', 'Fuel.tank.capacity',
      'Length', 'Wheelbase', 'Width', 'Turn.circle', 'Weight', 'Make')
mask <- is.element(colnames(Cars93),cc)
X <- Cars93[,mask]
"""))

X = pd.DataFrame(r.get("X"))
X = X.set_index(['Make'])
X = X.astype(float)
X.head()

訓練とテストで分割

from sklearn.model_selection import train_test_split

X_train, X_test = train_test_split(X, test_size=0.1, random_state=42)

PCA

# Standard Scaler
from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
X_train_sc = pd.DataFrame(scaler.fit_transform(X_train))
X_test_sc = pd.DataFrame(scaler.transform(X_test))

# PCA
from sklearn.decomposition import PCA

pca = PCA()
pca.fit(X_train_sc)

# 累積寄与率を図示する
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
plt.gca().get_xaxis().set_major_locator(ticker.MaxNLocator(integer=True))
plt.plot([0] + list( np.cumsum(pca.explained_variance_ratio_)), "-o")
plt.xlabel("Number of principal components")
plt.ylabel("Cumulative contribution rate")
plt.grid()
plt.show()

f:id:yamayou_1:20210318163446p:plain

# calc anomaly scores
Um = pd.DataFrame(pca.components_[0:6, :]).T
S2 = np.identity(X.shape[1]) - Um.dot(Um.T)

def CalcAnomaly(x, S2):
    return x.T.dot(S2).dot(x)

X_train_sc.columns = S2.columns
a_train = [CalcAnomaly(X_train_sc.iloc[i, :], S2) for i in range(X_train_sc.shape[0])] 

# threshold
threshold = pd.Series(a_train).quantile(0.9)
print(threshold)

# test
X_test_sc.columns = S2.columns
a_test = [CalcAnomaly(X_test_sc.iloc[i, :], S2) for i in range(X_test_sc.shape[0])] 

# plot
plt.scatter(X_test_sc.index, a_test)
plt.plot([min(X_test_sc.index), max(X_test_sc.index)], [threshold, threshold], ls = "dashed")
plt.ylim([0, 10])
plt.show()

f:id:yamayou_1:20210318163553p:plain

振り返り

線形の関係でない場合カーネル主成分分析を用いる必要があります。その際、異常度をデータ1点だけでみるのではなく、ある一定期間のデータ集合に対して、正常データと比べる方法が用いられる場合もあります。その場合の異常度の式はデータセット D, D'でそれぞれ主成分分析を行い、行列2ノルムの最大値の大きさをみます。これは、2つの主成分空間の比較に基づくもので、相互部分空間法という名前で呼ばれています。カーネルPCAのように表現力が高く、パラメータに結果が左右されやすい時には同じような対処法を考えることができそうです。

異常検知6:分布が一山にならない場合 - One-Class SVM

One-Class SVM

One-Class SVMではデータの主要部分を含む球を考え、その球の外側の点を異常とします。 異常度は以下のように与えられます。ここで、 Rは球の半径です。SVMと同様にカーネルトリックを行うことで非線形にも対応可能なので、非線形の関係があるデータに対して使われる古典的な手法になっています。

 \displaystyle
a(x') = ||x' - b^{*}||^{2} - R^{* 2}

コード

データ準備

# import packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy import stats
from scipy.spatial import distance
from scipy.stats import chi2
from sklearn.model_selection import train_test_split
import pyper
import pandas as pd
from datetime import datetime

r = pyper.R(use_pandas='True')

print(r("""
x <- rbind(matrix(rnorm(120),ncol=2),matrix(rnorm(120,mean=3),ncol=2))
"""))

X = pd.DataFrame(r.get("x"))

One-Class SVM

from sklearn.preprocessing import StandardScaler
from sklearn.svm import OneClassSVM

scaler = StandardScaler()
X_sc = scaler.fit_transform(X)

# 3sigma=99.7% -> nu=0.003
# if gammma="auto", 1 / n_featuresになる。そのため適当なので、適宜チューニング必要あり
model = OneClassSVM(nu=0.003, kernel="rbf", gamma="auto")
model.fit(X_sc)

#Signed distance to the separating hyperplane.
#Signed distance is positive for an inlier and negative for an outlier.
#-> 大きいほど密、小さいほど疎
y_pred = model.decision_function(X_sc)

# calc is_anomaly
is_anomaly = np.where(y_pred < 0, True, False)

# 可視化
plt.scatter(X_sc[:, 0], X_sc[:, 1], c=is_anomaly)
plt.show()

f:id:yamayou_1:20210318145638p:plain

nu = 0.003では少なすぎる感があるので今回はnu = 0.1で試してみます。

# nu = 0.1でやり直し
# 3sigma=99.7% -> nu=0.003
# if gammma="auto", 1 / n_featuresになる。そのため適当なので、適宜チューニング必要あり
model = OneClassSVM(nu=0.1, kernel="rbf", gamma="auto")
model.fit(X_sc)

#Signed distance to the separating hyperplane.
#Signed distance is positive for an inlier and negative for an outlier.
#-> 大きいほど密、小さいほど疎
y_pred = model.decision_function(X_sc)

is_anomaly = np.where(y_pred < 0, True, False)

plt.scatter(X_sc[:, 0], X_sc[:, 1], c=is_anomaly)
plt.show()

f:id:yamayou_1:20210318145653p:plain

データの分布に偏りがある場合

ある資料でデータに偏りがあるとうまくいかないと見たので試してみます。データセットはLOFを試した時のデータセットを使用してます。

データ準備

# input Davis 
input_name = "./input/Davis.csv"
df = pd.read_csv(input_name, index_col=0)
df = df[["weight", "height"]]
df.head()


# mean 
means = df[["weight", "height"]].mean() - [40, 100]
# cov matrix 
cov_matrix = np.cov(df[["weight", "height"]].T, ddof=0) + 30
print(means, cov_matrix)

# 元のデータの平均ベクトルと分散共分散行列を編集したものを参考にしてサンプリング
from scipy.stats import multivariate_normal
data1 = np.random.multivariate_normal(means, cov_matrix, size=10)
df = df.append(pd.DataFrame(data1, columns=["weight", "height"]))

# visualization
plt.scatter(df["weight"], df["height"])
plt.show()

OC SVM

左下のデータがかなり異常データと判断されています。苦手なのがはっきり分かりました。

X = df

scaler = StandardScaler()
X_sc = scaler.fit_transform(X)

# 3sigma=99.7% -> nu=0.003
# if gammma="auto", 1 / n_featuresになる。そのため適当なので、適宜チューニング必要あり
model = OneClassSVM(nu=0.1, kernel="rbf", gamma="auto")
model.fit(X_sc)

#Signed distance to the separating hyperplane.
#Signed distance is positive for an inlier and negative for an outlier.
#-> 大きいほど密、小さいほど疎
y_pred = model.decision_function(X_sc)

# calc is_anomaly
is_anomaly = np.where(y_pred < 0, True, False)

# 可視化
plt.scatter(X_sc[:, 0], X_sc[:, 1], c=is_anomaly)
plt.show()

f:id:yamayou_1:20210318150906p:plain

振り返り

データの分布に偏りがある場合、それらの異常値に対応するのは難しいかもしれません。そのような場合が観れた際にはLOFの検討や併用を検討したほうが良さそうです。

異常検知5:分布が一山にならない場合 -GMM

混合正規分布モデル(GMM)とは

KNNではクラスリングの結果をどれだけ信じれば良いのか不明であり、また一般的にクラスター同士が重なっていない場合ばかりではない。そのため、GMMを用いることでクラスター同士の重なりを表現できます。よく使われている方法であり、かつDLベースの手法でも使われたりします。 異常度はこれまでの議論と同じように以下のように定義されています。

 \displaystyle
a(x') = -\ln \sum_{k=1}^{K} \hat{\pi_k} N(x'|\hat{\mu_k}, \hat{\Sigma_k})

コード

データ準備

Davisデータセットを使用します。

# import packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy import stats
from scipy.spatial import distance
from scipy.stats import chi2
from sklearn.model_selection import train_test_split
input_name = "./input/Davis.csv"
df = pd.read_csv(input_name, index_col=0)
df = df[["weight", "height"]]
df_test = df
df = df.drop(12)
df.head()

GMM

sklearn実装を使用します。異常度が-model.score_samples(X)だけで計算できるので便利です。今回は単純なデータなのでn_components=2を採用してます。複雑な多次元のデータではハイパーパラメータチューニングが難しいのは想像できます。BIC基準で決めるやり方が一般的なようですが、現実的にBICのみでは難しいと思われるため、アンサンブルモデルなどを考える法が良さそうです。

from sklearn.mixture import GaussianMixture

model = GaussianMixture(n_components=2)
model.fit(df)

# BICを計算可能
print(model.bic(df))

# clusterを可視化
clusters = model.predict(df)
plt.scatter(df["weight"], df["height"], c=clusters)
plt.show()

f:id:yamayou_1:20210318133553p:plain

異常度を計算

# Compute the weighted log probabilities for each sample.
score = -model.score_samples(df_test)

# 閾値を決定
threshold = pd.Series(score).quantile(0.8)

# 異常かどうかを決定
y_pred = np.where(score > threshold, -1, 1)

# scoreをプロット
plt.scatter(df_test.index, score)
plt.plot([min(df_test.index), max(df_test.index)], [threshold, threshold], ls = "dashed")
plt.show()

f:id:yamayou_1:20210318133714p:plain

振り返り

GMMのハイパーパラメタチューニングは難しいため、たくさんモデルを訓練して平均値を最終出力とする場合もあるようです。

異常検知4:分布が一山にならない場合 - 局所外れ値度

局所外れ値度

以前のKNN法の弱点としてはデータ密度が空間の場所によりばらつきがあると最適なkを決めるのが難しい点です。この課題は以下のような例を見ると分かりやすいです。右上と左下にクラスターのデータ密度に差があるので、右上のクラスターの半径の基準で考えると、左下のクラスターは外れ値のように扱われてしまいます。

コード

データ準備

# import packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy import stats
from scipy.spatial import distance
from scipy.stats import chi2
from sklearn.model_selection import train_test_split
# input Davis 
input_name = "./input/Davis.csv"
df = pd.read_csv(input_name, index_col=0)
df = df[["weight", "height"]]
df.head()


# mean 
means = df_train[["weight", "height"]].mean() - [40, 100]
# cov matrix 
cov_matrix = np.cov(df_train[["weight", "height"]].T, ddof=0) + 30
print(means, cov_matrix)

# 元のデータの平均ベクトルと分散共分散行列を編集したものを参考にしてサンプリング
from scipy.stats import multivariate_normal
data1 = np.random.multivariate_normal(means, cov_matrix, size=10)
df = df.append(pd.DataFrame(data1, columns=["weight", "height"]))

# visualization
plt.scatter(df["weight"], df["height"])
plt.show()

f:id:yamayou_1:20210317174240p:plain

KNNを試す

左下のクラスターのほとんどが外れ値(黄色の点)と認識されています。これは以前説明したKNNの判定方法を考慮すると妥当な結果です。

from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.pipeline import Pipeline

# 変数間でスケールの違いの影響を防ぐため
scaler = StandardScaler()
df_sc = scaler.fit_transform(df)

# k:ハイパーパラメータ
k = 1
model = NearestNeighbors(n_neighbors=k)
model.fit(df_sc)

# 訓練データの半径の95%の値を異常とみなす閾値として決定
distances, ids = model.kneighbors(df_sc, n_neighbors=k+1, return_distance=True)
threshold = pd.Series(distances[:, k]).quantile(0.95)
print(threshold)

# 検証用データの訓練データへの距離を算出
distances, ids = model.kneighbors(df_sc, n_neighbors=k+1, return_distance=True)
is_anomaly = distances[:, k] > threshold

# 可視化
plt.scatter(df["weight"], df["height"], c=is_anomaly)
plt.show()

f:id:yamayou_1:20210317175352p:plain

局所外れ値度

上の問題の対処方法として考えられたのが局所外れ値度です。この手法は、近傍のk点のデータに対してその点の近傍k点との距離を見ることでデータ空間の疎密を考慮して異常度を定義します。式としては以下の通りです。分子の x'と周りの近傍k点との距離 d_k(x')が大きい値を示しても、分母も同様に大きければ異常ではないことが分かります。実装はskleanを使用してます。

デフォルトでは外れ値の割合を決めるcontaminationautoになってますが、うまく行かない場合があるようです。今回はうまくいくのですが、適当に0.05で決めうちしてます。

 \displaystyle
a_{LOF}(x') = \frac{1}{k}\sum_x \frac{d_k(x')}{d_k(x)}

from sklearn.neighbors import LocalOutlierFactor

model = LocalOutlierFactor(n_neighbors=5, novelty=True, contamination=0.05)
model.fit(df_sc)
is_anomaly = model.predict(df_sc)

# 可視化
plt.scatter(df["weight"], df["height"], c=~is_anomaly)
plt.show()

f:id:yamayou_1:20210317175643p:plain

振り返り

KNNがうまくいかない例でもうまくいくケースが多いようです。
しかし、contaminationのパラメータが効く例もあるようなので、ハイパラチューニングが必要なのに注意です。また、KNNと同じで距離ベースの手法なので多次元に弱いです。

異常検知3:分布が一山にならない場合 - KNN法

KNNにおける異常検知とは

ホテリング理論やMT法では1つの正規分布を仮定して、そのデータの出現確率が低ければ異常と見なすことができるような異常度を算出しました。
KNN法ではデータ間の距離を考えることで2つの異常判定基準を考えることができます。

  1. k基準:M次元の球を考えて、球の半径の中に入る標本の数kがある基準値以下であれば異常

  2.  \epsilon基準:観測値に近い順にk個の標本を選んだ時、それを囲む球の半径 \epsilon_kがある基準値以上であれば異常

つまり、異常データであれば近傍に正常なデータがないはずなので、半径 \epsilon内のデータ点が少ない/周辺のk個を含む球の半径が大きくなるということです。 この方法は、データ分布がある程度一様の場合や単峯性である場合に有効です。また、時系列データにおける異常パターン検出には主な手法として使われています。

コード

以下のコードでは \epsilon基準で異常かどうかの判断を行なっています。

データ準備

ホテリング理論の時と同様に身長と体重のデータを用います。

# import packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy import stats
from scipy.spatial import distance
from scipy.stats import chi2
from sklearn.model_selection import train_test_split
input_name = "./input/Davis.csv"
df = pd.read_csv(input_name, index_col=0)
df = df[["weight", "height"]]
df.head()

ここで異常データを含む方を検証データにしてます。

df_test = df.iloc[:20, :]
df_train = df.iloc[20:, :]

print(df_train.shape, df_test.shape)

SklearnのNearestNeighbors

簡単にやる方法を探していたらSklearnに実装がありました。コードは以下のように簡単に実行できます。 結果として、訓練データからある一定距離離れている2点が異常と判断されています。

from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import NearestNeighbors
from sklearn.pipeline import Pipeline
# 変数間でスケールの違いの影響を防ぐため
scaler = StandardScaler()
df_train_sc = scaler.fit_transform(df_train)

# k:ハイパーパラメータ
k = 1
model = NearestNeighbors(n_neighbors=k)
model.fit(df_train_sc)

# 訓練データの半径の95%の値を異常とみなす閾値として決定
distances, ids = model.kneighbors(df_train_sc, n_neighbors=k+1, return_distance=True)
threshold = pd.Series(distances[:, k]).quantile(0.95)

# 検証用データの訓練データへの距離を算出
df_test_sc = scaler.transform(df_test)
distances, ids = model.kneighbors(df_test_sc, return_distance=True)
is_anomaly = distances[:, k-1] > threshold

# 可視化
plt.scatter(df_train["weight"], df_train["height"])
plt.scatter(df_test["weight"], df_test["height"], c=is_anomaly)
plt.show()

f:id:yamayou_1:20210316184715p:plain

振り返り

KNN全般に言えることですが、基本的に多次元になってくると弱いです。また、閾値の決め方に左右されるので実用上は試行錯誤が必要になります。

[追加] ラベル付きのデータに対するKNNにおける異常検知とは

次に、正常と異常のラベルがつけられたN個のデータを含む訓練データDを使用したKNN法による異常検知手法をやってみます。一般的に、新たな観測値 x'が異常かどうかを判定するためには対数尤度比を異常度として使用するのがいいことが知られています。これは、ネイマン=ピアソンの補題と呼ばれています。

 \displaystyle
a(x') = \ln \frac{p(x'| y = +1)}{p(x'| y = -1)}

そこで、kNNで上式の異常標本に対するxの確率密度関数を求めれば異常度を計算することができます。この計算は以下のように算出できます。

k近傍のうち異常ラベルの割合:

 \displaystyle
p(y = +1|x', D) = \frac{N^{1}(x')}{k}

 \displaystyle
p(x'|y = -1, D) = \frac{p(y = -1|x', D)p(x')}{p(y = -1)} = \frac{p(x')}{k} \frac{N^{0}(x')}{\pi_0}

同様に y = +1の時も計算でき、最終的に異常度は以下のように計算できることが分かります。

 \displaystyle
a(x') = \ln \frac{\pi^{0}N^{1}(x')}{\pi^{1}N^{0}(x')}

コード

以下、コードです。

データ準備

データセット探しがめんどくさかったので、RのUScrimeデータセットを使用します。ここで、犯罪率が1200以上を異常、それ以下を正常とみなします。

# import packages
import pyper
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime

r = pyper.R(use_pandas='True')

#print(r("""
#install.packages('MASS', repos = 'http://cran.us.r-project.org')
#install.packages('vars', repos = 'http://cran.us.r-project.org')
#"""))

print(r("""
library(MASS)

# 第2, 16変数を削除
X <- UScrime[, -c(2, 16)]; M <- col(X) 

# 16番目がyに対応
y <- UScrime[, 16]; N <- length(y)

road <- data.frame(X)

index_name <- rownames(X)
"""))

df_X = pd.DataFrame(r.get("X"))
df_y = pd.Series(r.get("y"))

df_X.columns = ['M', 'Ed', 'Po1', 'Po2', 'LF', 'M.F', 'Pop', 'NW',
                'U1', 'U2', 'GDP', 'Ineq', 'Prob', 'Time']

df_X = df_X.astype(float)
df_y = df_y.astype(float)

df_X.index = r.get("index_name")
df_y.index = r.get("index_name")

display(df_X.head(), df_y.head())

# 異常だったら1、正常では0
is_anomaly = np.where(df_y > 1200, 1, 0)
print(is_anomaly)

y = is_anomaly

訓練と検証に分割

基本的に異常データは少ないのでleave-one-outが使われるようです。今回は1割を検証用に使用してます。

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(df_X, y, test_size=0.1, random_state=42, stratify=y)

KNNで異常検知

結果自体はうまく異常と識別できています。

from sklearn.neighbors import KNeighborsClassifier

# model
k = 5
model = KNeighborsClassifier(n_neighbors=k)
model.fit(X_train, y_train)

# 訓練データから異常かどうかの閾値を決める
## X_testのk近傍点のインデックスと距離を取得
dist, ind = model.kneighbors(X_train)

## 訓練データの異常度を算出
N_1 = y_train[ind].sum(axis=1)/k
N_0 = 1 - N_1
pi_1 = y_train.sum()/len(y_train)
pi_0 = 1- pi_1
anomaly = np.log((pi_0*N_1)/(pi_1*N_0) + 10**-5)

## 閾値をthreshold = pd.Series(anomaly).quantile(pi_0)を参考にして決定
## そのままの値を採用するとうまくいかない
#threshold = pd.Series(anomaly).quantile(pi_0)
threshold = 0.8

## 検証
# X_testのk近傍点のインデックスと距離を取得
dist, ind = model.kneighbors(X_test)

N_1 = y_train[ind].sum(axis=1)/k
N_0 = 1 - N_1

anomaly = np.log((pi_0*N_1)/(pi_1*N_0) + 10**-5)

print(anomaly > threshold, y_test)

f:id:yamayou_1:20210318111605p:plain

振り返り

異常データ自体が少ないのでデータ分割や閾値決めに注意が必要なのが分かりました。

異常検知2:マハラノビス=タグチ法(MT法)

今回は、マハラノビス=タグチ法(MT法)を説明します。

MT法とは

MT法はホテリング理論に基づく検出方法に、異常変数の選択手法を組み合わせることで、ホテリング理論の課題であった個別の変数の異常度を見ることができない点に対応します。

手順としては、以下の通りです。

1. 準備

正常データのデータセット Dと異常データセット D'を用意する。ここで、変数の数はともにMとする。

2. 分布推定

 Dをもとに標本平均と標本分散共分散行列を求める。

3. 異常度の計算

 Dの中の各標本に対して1変数あたりのマハラノビス距離を求める。

4. 異常判定1

 Dの標本が正常範囲に入るように1変数あたりのマハラノビス距離の閾値を決める。

5. 異常判定2

以下の話は D'の異常度が Dの異常度より大きいことが前提です。

 D'の標本に対して、M変数の中からいくつかの変数を選び、その変数集合の1変数あたりの異常度を計算する。
このステップにより、個々の変数の寄与が数値化されて、どの変数が異常だったのかを示すことが可能です。指標としては、以下のような式が提案されています。 ここで、 qは変数の取捨選択パターンを区別する添字で、 M_qはパターンqにおける変数の数、 a_qはパターンqにおける (M_q, M_q)次元の共分散行列を使った時の異常度です。この時、分子の1は異常度を変数の数で割った時の期待値が1であることを表しています。これは、異常度が自由度M、スケール1のカイ二乗分布の期待値がM、分散が2Mに従うことにより、1変数あたりの異常度 a(x)/Mは期待値が1で標準偏差 (2/M)^{1/2}ことから来ています。そのため、マハラノビス距離では値が1以下の領域を「単位空間」と呼びます。

[tex: \displaystyle SN_q = -10 \log{10} \frac{1}{N'} \sum{n=1}^{N'} \frac{1}{a_q(x'^{n}+M_{q})} ]

ここで、変数を1個ずつ見る場合はパターンqはM通りあり、 M_q = 1です。この時、もし第i成分が D'のデータの異常に大きく寄与している場合は、 SN_qも大きくなるはずです。従って、 SN_qを見ることで各変数ごとの異常どの寄与度を定量化できます。

コードの説明

それでは、以下でコードを確認します。

手順0. データの準備

pyperを使用してRのMASSパッケージからroadデータセットを使用してます。

# import packages
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

from scipy import stats
from scipy.spatial import distance
from scipy.stats import chi2
import pyper
import pandas as pd
from datetime import datetime

r = pyper.R(use_pandas='True')

#print(r("""
#install.packages('MASS', repos = 'http://cran.us.r-project.org')
#"""))

print(r("""
library(MASS)

data(road)

# driversで割る
X <- road / road$drivers 

# 対数変換
X <- as.matrix(log(X[,-2] + 1))

road <- data.frame(X)

index_name <- rownames(X)
"""))

df = pd.DataFrame(r.get("road"))
df.columns = ['deaths', 'popden', 'rural', 'temp', 'fuel']
df = df.astype(float)

df.index = r.get("index_name")

df.head()

手順1. 分布推定

ホテリング理論の時同様に標本平均と分散共分散行列を計算します。

# mean 
means = df.mean()

# cov matrix 
cov_matrix = np.cov(df.T, ddof=0)
print(means, cov_matrix)

手順2. 1変数あたりのマハラノビス距離を計算

# 分散共分散行列の逆行列を計算
cov_matrix_i = np.linalg.pinv(cov_matrix)

# 異常度を計算 (マハラノビス距離を計算)
anomaly_scores = np.array([distance.mahalanobis(x, means, cov_matrix_i)**2 for x in df.values]) / len(df.columns)

手順3. データの可視化

最初に説明した通り期待値が1なので、閾値の値は1にしています。

この時、分子の1は異常度を変数の数で割った時の期待値が1であることを表しています。これは、異常度が自由度M、スケール1のカイ二乗分布の期待値がM、分散が2Mに従うことにより、1変数あたりの異常度 a(x)/Mは期待値が1で標準偏差 (2/M)^{1/2}ことから来ています。そのため、マハラノビス距離では値が1以下の領域を「単位空間」と呼びます。

結果より、Alaska, Calif, DC, Maine, Montの異常度が大きいことがわかります。

threshold = 1

plt.figure(figsize=(16,8))
plt.scatter(df.index, anomaly_scores)
plt.plot([min(df.index), max(df.index)], [threshold, threshold], ls = "dashed")
plt.xlabel("Index")
plt.ylabel("Anomaly scores")
plt.ylim(0, 6)
plt.show()

f:id:yamayou_1:20210315171832p:plain

手順4. SN比解析

 N' = 1, M_q = 1として各変数のSN比を計算します。この場合の SN_qは単純な式になり以下のようになります。
ここで、異常データとして「Calif」のデータを見ます。負の標準偏差は平均からの偏差が標準偏差に比べて小さいときに生じます。この結果、異常度のほとんどが「fuel」が原因であることが分かります。

 \displaystyle
SN_q = 10 log_{10} \frac{(x_q' - \hat{\mu_q})^{2}}{\hat{\sigma_q}^2}

sn_q = 10.*np.log10((df.loc["Calif", :] - means)**2/np.diag(cov_matrix))

plt.bar(df.columns, sn_q)
plt.show()

f:id:yamayou_1:20210315172259p:plain

振り返り

 a(x)/Mの期待値は1だが、標準偏差 (2/M)^{1/2}であり、自由度により標準偏差が異なることには注意が必要。 また、実際にMT法が搭載されているシステム(https://www.toshiba-dme.co.jp/dme/coretec/hin/hin_mt.htm)を見ると、正常システムをマハラノビス距離が2以下の値にしている。これは自由度1の時の標準偏差を考慮してのことだと思われる。そこから2〜4で異常に近く、4以上で異常と判断しているようなので、このような基準を自由度が増えた際には考慮する必要がありそう。