Apache Spark MLlibのレコメンドアルゴリズムを使う

1年くらい前にレコメンドロジックを実装するにあたってSpark MLlibのmllibパッケージ内のRDD版を使っていたのですが、
データの整形など不便だったため、その後リリースされたmlパッケージ以下のDataFrame版を使ってみたい。
きっと楽なはずということでサンプル的なものを実装してみました。


RDDとDataFrame

  • RDD(Resilient Distributed Datasets)は、Spark上で分散処理可能なimmutableなデータセット
  • DataFrameは、RDDを構造化したもので、RDBのテーブル的に処理が可能

DataFrameがより抽象化されたレイヤーで扱えるので、使う側からは楽なのとDataFrameのAPIがジョブを最適化してくれるというメリットもあります。

レコメンドアルゴリズム

Spark MLlibに用意されているALSというクラスを使います。
ALS(Alternative Least Squares=交互最小二乗法)とは行列分解の手法の1つ。
通常の協調フィルタリングでは、ユーザ×アイテムの行列の中で類似したユーザやアイテムを見つけようとしますが、
ユーザの特徴を表す行列とアイテムの特徴を表す行列に分解し、それぞれの行列内での類似度から推薦するアイテムを見つけようというアプローチが行列分解です。
すべてのユーザ×アイテムのペアの中で、購入したりコンバージョンしたという履歴のある組み合わせは非常に少ないため、ユーザ×アイテムの行列にはほとんど値が入らない=疎(スパース)な行列となってしまいます。
このような疎な行列では、履歴が少ないため類似するユーザやアイテムを見つけられないという問題があります。
これを解決する方法の1つが行列分解で、行列分解した後の2つの行列は、元の行列に比べるとスパースではなくなっているため、推薦するアイテムが発見しやすくなります。
下の図のように、ユーザ数やアイテム数よりも小さい値kでユーザとアイテムの特徴を表現する行列2つに分解します。
f:id:kzzzz:20160209094316p:plain:w600

元の行列と分解後の行列の誤差が最小となるように、分解後の行列を近似で求めます。
求め方としては、アイテムの特徴行列を初期値ランダムなどで固定して誤差が最小となるユーザの特徴行列を求める。次にユーザの特徴行列を固定してアイテムの特徴行列を求める。これを繰り返し実行して最適な行列を求めるのが、交互最小二乗法=ALSです。

使ったデータ

MovieLensのこちらMovieLens Latest Datasets | GroupLens のデータを使いました。
ユーザ数: 668
映画数: 10325
ユーザがランク付けした映画の総数(ユーザ×映画×レーティング): 105339
レーティングは0から5の0.5刻みです。

実装

RDD版とDataFrame版で実装してみました。

準備
val data = ContextManager.sc.textFile(path).map(line => {
    val words = line.split(",")
    new Rating(words(0).toInt, words(1).toInt, words(2).toDouble)
})

元データがcsvなのでsplitして、ユーザID・アイテムID・レーティングをRatingクラスに入れてます。

val array = Random.shuffle(data.collect().toSeq)
val train = ContextManager.sc.parallelize(array.slice(0, array.length - 500)).map(row => {
    new Rating(row.getAs[Int]("user"), row.getAs[Int]("product"), row.getAs[Double]("rating"))
})
val test = ContextManager.sc.parallelize(array.slice(array.length - 500, array.length)).map(row => {
    new Rating(row.getAs[Int]("user"), row.getAs[Int]("product"), row.getAs[Double]("rating"))
})

訓練データとテストデータに分割したいのですが、ランダムな分割方法がよくわからなかったので、上でRDDにしたのを配列にしてシャッフルして分割するというやり方で、、
一応これで訓練データとテストデータができました。
500件だけテストデータとして使います。
trainとtestはRDD[Rating]型にしています。

RDD版実装

val rank = 10
val numIterations = 10
val model = ALS.train(train, rank, numIterations)

rank は上の図のkに対応するもので、特徴を表す次元数。
numIterationsALSの繰り返し数。どちらも決めで。。
ALS.train でモデル生成。

model.predict(test.map(rating => {
    (rating.user, rating.product)
}))

テストデータのユーザIDrating.userとアイテムIDrating.productでレーティング値を予測する。

結果の一部がこちら↓

+----+-------+------+------------------+
|user|product|rating|        prediction|
+----+-------+------+------------------+
|   1|   1136|   5.0|  3.84121223321302|
|   3|     21|   5.0| 4.545432438241894|
|   5|  81847|   5.0|2.1582961471937283|
|  33|   2997|   5.0|3.4247050236701773|
|  42|   1353|   5.0| 3.890686456361979|
|  66|   2324|   5.0| 3.892941137800877|
|  68|   1683|   5.0|3.5995781206844573|
|  88|     50|   5.0| 5.044828149527467|
|  88|   1079|   5.0| 4.710543650493504|
|  88|   1276|   5.0|4.4993008731591715|
+----+-------+------+------------------+
| 320|    204|   0.5| 4.178416376461781|
| 130|   2763|   1.0|2.4046661964557794|
| 199|   3937|   1.0| 2.671809457491024|
| 224|  34530|   1.0|2.8963020348986666|
| 354|   5213|   1.0|1.6520255250357625|
| 410|   2815|   1.0|1.2346216278871809|
| 410|   3879|   1.0|1.4271989033641381|
| 463|   4306|   1.0|3.7166309995771063|
| 530|   6731|   1.0| 2.786540674684764|
| 569|    196|   1.0| 2.209090681293506|
+----+-------+------+------------------+

ratingが実際にユーザがつけたrating(正解)で、predictionが予測結果。
正解のratingが高いところと低いところを一部取り出してみたものですが、
一部大きく外しているものの、正解のratingに近い予測結果が出ている感じ。

DataFrame版実装

mlパッケージ内の学習器は基本的にfitメソッドで学習して、transformメソッドで分類・予測するという流れになります。

val model = new ALS()
    .setUserCol("user")
    .setItemCol("product")
    .setRank(10)
    .setMaxIter(10)
    .fit(train.toDF)

DataFrame版では、new ALS().fit(train.toDF)でDataFrameに変換したtrainを入力としてモデルを生成します。
途中でパラメータを設定していますが、RDD版との違いとして、setUserCol("user")setItemCol("product")で訓練データのDataFrame中でユーザIDとアイテムIDが格納されているカラム名を指定しています。

val result=model.transform(test.toDF)

transformメソッドで予測した結果もDataFrameでpredictionというカラムに予測結果が入るようになっています。
結果の一部抜粋がこちら

+----+-------+------+----------+
|user|product|rating|prediction|
+----+-------+------+----------+
|   1|   1136|   5.0|  4.201646|
|   3|     21|   5.0| 3.9351418|
|   5|  81847|   5.0| 3.5261602|
|  33|   2997|   5.0| 4.0439353|
|  42|   1353|   5.0| 4.1179585|
|  66|   2324|   5.0|  4.052952|
|  68|   1683|   5.0| 3.2956479|
|  88|     50|   5.0| 4.8985147|
|  88|   1079|   5.0| 4.4943614|
|  88|   1276|   5.0| 4.5683627|
+----+-------+------+----------+
| 320|    204|   0.5| 1.6473958|
| 130|   2763|   1.0| 2.2569048|
| 199|   3937|   1.0| 3.0569887|
| 224|  34530|   1.0| 1.3953632|
| 354|   5213|   1.0|  2.322424|
| 410|   2815|   1.0| 1.8290529|
| 410|   3879|   1.0| 1.7294786|
| 463|   4306|   1.0|  3.564923|
| 530|   6731|   1.0| 2.4670124|
| 569|    196|   1.0| 2.4912937|
+----+-------+------+----------+

こちらも正解のratingが高いところ、低いところを見ると、ほぼ近い予測結果になっている。

評価

テストデータに対する予測精度の指標であるRMSEを計算してみました。

RMSE(Root Mean Squared Error)

予測値と実測値の差の平均の平方根
0に近いほうが良い
参考 https://weather.gc.ca/verification/scores/rmse_e.html

天気予報の気温予測の検証にも使われてるもよう
www.data.jma.go.jp

計算結果

RDD版で、1.0970
DataFrame版では、0.8564

RMSEは相対的な評価指標なのですが、評価対象がいないので、とりあえず絶対に勝てそうなランダムなケースと比較してみよう
ということで、ランダムにレーティングを予測した場合のRMSEは、1.9248

無事、でたらめな予測には勝てましたd(>_< )Good!!


今回使ったデータはもともと綺麗なcsvだったのでRDD版もDataFrame版もあまり違いがない印象ですが、通常はめんどくさいETLがあったり、グリッドサーチやCrossValidationでチューニング・モデル検証が発生するので、この辺を簡単にできるML Pipelinesが使えるDataFrame版のメリットが大きそうです。