今更だけどCNN動かしてみる

しばらくサボっていて忘れられそうなので、ブログタイトルも新たに久しぶりに書きました。

chainerでCNN動かしてみた

別件でDNNの中でもFNNとかRNNとかCNNをいろいろ試しみようという何かがありまして、、、
個人的に勉強がてら最早流行でもなくなったCNN(Convolutional Neural Network)をchainerで動かしてみました。

CNNの概要

CNNの全体像は、バリエーションは様々ですがシンプルなもので下のような感じになります。
f:id:kzzzz:20160404220122p:plain

入力画像の部分的な領域に対してフィルターを通すことで、部分的な特徴を抽出します(=特徴マップ)。異なるフィルターを使うことで様々な特徴を捉えることができ、フィルターの数だけ特徴マップができることになります。この処理が畳み込みと呼ばれています。

次に、出来上がった特徴マップに対して、似たような感じで局所領域だけが見えるウインドウを移動させながら、その中の特徴的な点のみを次の層へ伝播させます。この処理をプーリングと言い、代表的な方法ではウインドウ内の最も大きな値を伝播させたりします。このプーリング層を通すことで、対象画像の中の物体の位置のズレや変形を吸収できるので識別精度を上げられるそうです。

このような畳み込み層とプーリング層を繰り返した後で、通常のニューラルネットに入力して画像の判別を行うというのが典型的なCNNの構成になります。

画像判別

本来は、
f:id:kzzzz:20160404223714p:plain
↑こんな文字とか人の顔とかの画像を入れて、
f:id:kzzzz:20160404223719p:plain
↑この辺りの部分的な特徴から、この画像は数字の5だと判別するものですが、
手元に懐かしい写真があったので、いい感じで判別できないものかと入れてみました。

使ったのは次の3種類の画像で、3クラスの分類問題としました。

  f:id:kzzzz:20160404224246j:plain:w250 f:id:kzzzz:20160404224424j:plain:w250 f:id:kzzzz:20160404225006j:plain:w250

  • 街中の風景

 f:id:kzzzz:20160404224601j:plain:w250 f:id:kzzzz:20160404224615j:plain:w250 f:id:kzzzz:20160404225114j:plain:w250

  • F1サーキット

 f:id:kzzzz:20160404224759j:plain:w250 f:id:kzzzz:20160404224810j:plain:w250 f:id:kzzzz:20160404225139j:plain:w250

訓練データが92枚、テストデータが48枚という極小データセット( ;∀;)

画像読み込み

元画像がjpgのため、OpenCVを使って読み込みました。ここで気をつけなければいけないのが、OpenCVで読み込んだデータはchainerが受け付ける構造と微妙に違うところ。
OpenCVで読み込むと(height, width, channel(=rgb)) な構造になっているところを、(channel, height, width)に変換する必要があります。

img = cv2.imread("/data/img/train/0/001.jpg") # jpg読み込み
img = np.transpose(img, (2,0,1)) # データ項目入れ替え。チャンネルを一番前に。

訓練用データとテスト用データに分けて、あらかじめ3種類に分類してラベル付けしました。
汚いコードを抜粋すると以下の様な感じです。実験では、元画像を50×50ピクセルに縮小しました。3種類のラベルごとにフォルダを作って、フォルダ名でラベルを付与しています。

x_train = []
y_train = []
for i in range(0,3):
    files = os.listdir("/data/img/train/"+str(i))
    for img_index in range(len(files)):
        if files[img_index].find('.jpg') > 0:
            img = cv2.imread("/data/img/train/"+str(i)+"/"+files[img_index])
            resized_img = cv2.resize(img, (50, 50)) # 画像を縮小
            x_train.append(np.transpose(resized_img,(2,0,1))/255.0)
            y_train.append(i)
train_data = np.array(x_train).astype(np.float32).reshape(len(x_train), 3, 50, 50)/255 # 255で割って0から1の値になるように。
train_label = np.array(y_train).astype(np.int32)

テストデータもほとんど同様なので省略。

モデル

畳み込み層とプーリング層を2回ずつ通すモデルを作りました。
フィルターのサイズやウインドウのサイズは色々変えながら動かしました。ので、下のコード中の数字も適当に決めたうちの1つです。。。

model = chainer.FunctionSet(conv1 = F.Convolution2D(3, 20, 3, pad=1), # 入力が3チャンネル。出力が20チャンネル。フィルタサイズが3×3。paddingが1。
                            conv2 = F.Convolution2D(20, 50, 3, pad=1), # 出力が50チャンネル。
                           l1 = F.Linear(4050, 256), # <- 9*9*50  の特徴マップを256ユニットの隠れ層へ
                           l2 = F.Linear(256, 3)) # 256ユニットの隠れ層から出力層が3ユニット(最終的に分類したい種類)

def forward(x_data, y_data, train=True):
    x = chainer.Variable(np.array(x_data, dtype=np.float32), volatile=not train)
    t = chainer.Variable(np.array(y_data, dtype=np.int32), volatile=not train)
    h = F.max_pooling_2d(F.relu(model.conv1(x)), ksize=6,stride=3,pad=1) # <- ksize: 6*6のウインドウ。stride: 3ピクセルずつ移動。
    h = F.max_pooling_2d(F.relu(model.conv2(h)), ksize=4,stride=2,pad=1)
    h = F.dropout(F.relu(model.l1(h)), train=train)
    y = model.l2(h)
    if train:
        return F.softmax_cross_entropy(y, t)
    else:
        return F.accuracy(y, t)

学習

学習自体は至って普通の手順で回す感じです。抜粋は以下の通りです。データが少ないのでミニバッチで回す意味があるかどうかはアレですが、、

for epoch in six.moves.range(0, n_epoch):
    # 訓練
    for i in six.moves.range(0, N_train, batchsize_train):
        x_batch = train_data[perm[i:i + batchsize_train]]
        y_batch = train_label[perm[i:i + batchsize_train]]
        optimizer.zero_grads()
        loss = forward(x_batch, y_batch)
        loss.backward()
        optimizer.update()

    # テスト
    sum_acc = 0
    for i in six.moves.range(0, N_test, batchsize_test):
        x = test_data[perm_test[i:i + batchsize_test]]
        y = test_label[perm_test[i:i + batchsize_test]]
        acc = forward(x, y, train=False)
        sum_acc += float(acc.data) * len(y)
    print("test accuracy: {0}".format(sum_acc/N_test))

結果

上のコードでテスト結果のaccuracyは残念ながら0.5417でした。
フィルターサイズやユニット数を変えても精度に大きな変化はありませんでした。

まとめ

CNNのモデルを作って動かしてみるという目的は達成しました。
精度がよろしくなかったのは、データ量が少なすぎて学習できていないことがあると思います。が、明確な被写体がいない風景写真ばかりなので、分類しようがないという大きな問題も孕んでいそうです。そもそもCNN向きのタスクではないのかも、、、

データが少ないにもかかわらず、CPU1個では遅すぎたので、次はGPUで並列でまわしたいかなーっと思ってます。

Hadoop / Spark Conference Japan 2016 に参加してきました

2/8に開催されたHadoop / Spark Conference Japan 2016に参加してきましたのでメモなど残しておきます。
午後からの参加で見たセッションのみなので一部だけになりますが。。
www.eventbrite.com

データドリブン企業における、Hadoop基盤とETL ~niconicoでの実践例~

嶋内 翔(Cloudera)志村 誠(ドワンゴ

従来データは主に経営層やマネージメント層向けにETLの結果を見せるケースが多かった。
ところが、現在では分析をする人やエンジニアなど現場で見たいケースが増えていて、ETLの結果を待たずに直接データに触りたいという要望が増えている。
→ 生データをHadoopに入れて誰でもアクセスできるようにする

hiveのスキーマオンリードの活用
- データの投入時ではなく読み込み時にスキーマを使う機能?特性?
- クエリを実行する側がスキーマを決められる

Keiser Permanenteの事例
- Hadoopに生データを入れて、ユーザ定義領域を挟むことで使いたいデータを取得できるようにしている。

niconicoでのETL事例

  • 単一のユーザIDで複数のサービスの利用履歴を管理している。
  • ユーザごとのサービスをまたいだ行動履歴を時系列で見たい(動画→生放送→生放送@スマホ→ゲーム のような)

 → データを一箇所に集める必要があるのでHadoop基盤の活用

www.slideshare.net
www.slideshare.net

ビッグデータ可視化の性能を徹底検証 ~SparkSQL、Hive on Tez、Hive LLAPを用いた既存RDBデータ処理の特徴~

新郷 美紀(NEC)蒋 逸峰(Hortonworks)

3つのデータストアに対してtableauで可視化する際の性能検証

SparkSQLでは

  • ブロードキャスト変数を使うとジョブ開始前に全ノードに小さなテーブルのようなデータを投げることができるので、うまく使えばシャッフルがなくなる=ステージが減る。
  • テーブルをメモリ上にキャッシュできるが、今回の実験では効果がなかった

メモリ量、データ量、処理内容との兼ね合い?

実行時間の比較

  • ヒットするデータが少ない場合は、hive on tezとLLAPが速い(全体のデータ量ではなくヒットしたデータ量)

hiveのチューニング

  • tableauからの実行時間の検証でクエリは自動生成されるため、クエリのチューニングはできないのでhive側のチューニングに集中した
  • 時間がかかる箇所の特定 → yarn上のアプリケーションマスターの立ち上げに4,5秒かかるなど
  • hiveは正しくデータロードすることが重要 → パーティションとORC形式でのストアがキーになる

LLAPは常時起動しているコンテナを持っていて、コンテナ立ち上げのオーバーヘッドを減らすのと、インメモリなキャッシュを聞かせられるメリットがある。

www.slideshare.net

Spark MLlib Now and Beyond

石川 有(リクルートテクノロジーズ)

Apache Sparkの機械学習パッケージMLlibの現状と今後のロードマップについての発表。
MLlibの中にmllibとmlの2つのパッケージがある。

  • mllibではRDD形式でデータを扱う。mlではDataFrame形式で扱う。
  • mlはML Pipelinesと連携できて、前処理、チューニング、検証などの枠組みが用意されている。
  • 学習モデルの永続化サポートがされるようになった。
    Pythonで学習したモデルを永続化して、Scalaで予測に使ったり、Spark Streamingに使ったりできる。

www.slideshare.net

基幹業務もHadoopで!! ~ローソンにおける店舗発注業務へのHadoop + Hive導入と、 その取り組みについて~

須田 桂伍(フューチャーアーキテクト)

店舗発注業務 = 店員がタブレットで商品棚を確認しながら発注する業務 → 本部センターで一括処理したい。
エンタープライズでは扱うデータ量は少ない割にバリエーションが多い。

  • 常時処理は常時起動しているEMRでupdate insertして、ピーク時のバッチ処理の時にテーブルを洗い替えする
  • 途中で何かあった場合の再処理
    → 1店舗の処理を1ワークとして、その中に必要なSQLを全部書いておく。途中で止まった場合は最初から実行する。

ちなみに、Sqoopを使ってMySQLにインポートする場合、MySQLimportで文字化けが発生する
→ Sqoopのコード中のMySQLUtilの中のISO...と直書きされているところをUTF8と書きなおしてビルドすれば動くらしいです。

www.slideshare.net

Maintainable Cloud Architecture of Hadoop

佐々木 海(Treasure Data)

maintainableなHadoopに必要な条件として、

  • stateless
    何か起こった場合に置き換え可能。状態を持つのは一箇所に集約する。(クラウド上など安心できるところへ)
  • mobility
    場所を移動できる(データとかジョブとか)
  • queueing
    リトライできる

PlazmaDBを使った構成では、statefulなのはPlazmaDBのみに集約する

  • PlazmaDBの内部はPostgreSQLとS3(またはRiak)で構成されている。
  • 変更があった場合、S3(またはRiak)に順次書き込まれて、commitメッセージが来たタイミングでPostgreSQLに入れる。
  • S3(またはRiak)はimmutableで変更されることがないため、整合性が保たれる。

クラスタのバージョンアップ対応

  • クラスタ(CDH, HDP, Apache) のバージョンが変更された場合、worker(APIサーバ)が全クライアントを持っていて、接続時に接続先のクラスタのバージョンに合わせてconfigを切り替えて接続する。
  • CircleCIでクラスタのバージョンも管理していて、バージョンアップした場合には対応するクライアントをS3にアップロードする。workerが実行する際に対応するクライアントをS3からダウンロードして使う。

www.slideshare.net

感想

導入事例の紹介が多かったため、アーキテクチャまわりのチューニングや性能検証について聞けて勉強になりました。個人的には、春頃にSpark2.0が出てくるそうで、速度10倍とフロント側API導入で大きく変わりそうで楽しみです。
午前のセッションだったので見れなかったのですが、、、

www.slideshare.net

Apache Spark MLlibのレコメンドアルゴリズムを使う

1年くらい前にレコメンドロジックを実装するにあたってSpark MLlibのmllibパッケージ内のRDD版を使っていたのですが、
データの整形など不便だったため、その後リリースされたmlパッケージ以下のDataFrame版を使ってみたい。
きっと楽なはずということでサンプル的なものを実装してみました。


RDDとDataFrame

  • RDD(Resilient Distributed Datasets)は、Spark上で分散処理可能なimmutableなデータセット
  • DataFrameは、RDDを構造化したもので、RDBのテーブル的に処理が可能

DataFrameがより抽象化されたレイヤーで扱えるので、使う側からは楽なのとDataFrameのAPIがジョブを最適化してくれるというメリットもあります。

レコメンドアルゴリズム

Spark MLlibに用意されているALSというクラスを使います。
ALS(Alternative Least Squares=交互最小二乗法)とは行列分解の手法の1つ。
通常の協調フィルタリングでは、ユーザ×アイテムの行列の中で類似したユーザやアイテムを見つけようとしますが、
ユーザの特徴を表す行列とアイテムの特徴を表す行列に分解し、それぞれの行列内での類似度から推薦するアイテムを見つけようというアプローチが行列分解です。
すべてのユーザ×アイテムのペアの中で、購入したりコンバージョンしたという履歴のある組み合わせは非常に少ないため、ユーザ×アイテムの行列にはほとんど値が入らない=疎(スパース)な行列となってしまいます。
このような疎な行列では、履歴が少ないため類似するユーザやアイテムを見つけられないという問題があります。
これを解決する方法の1つが行列分解で、行列分解した後の2つの行列は、元の行列に比べるとスパースではなくなっているため、推薦するアイテムが発見しやすくなります。
下の図のように、ユーザ数やアイテム数よりも小さい値kでユーザとアイテムの特徴を表現する行列2つに分解します。
f:id:kzzzz:20160209094316p:plain:w600

元の行列と分解後の行列の誤差が最小となるように、分解後の行列を近似で求めます。
求め方としては、アイテムの特徴行列を初期値ランダムなどで固定して誤差が最小となるユーザの特徴行列を求める。次にユーザの特徴行列を固定してアイテムの特徴行列を求める。これを繰り返し実行して最適な行列を求めるのが、交互最小二乗法=ALSです。

使ったデータ

MovieLensのこちらMovieLens Latest Datasets | GroupLens のデータを使いました。
ユーザ数: 668
映画数: 10325
ユーザがランク付けした映画の総数(ユーザ×映画×レーティング): 105339
レーティングは0から5の0.5刻みです。

実装

RDD版とDataFrame版で実装してみました。

準備
val data = ContextManager.sc.textFile(path).map(line => {
    val words = line.split(",")
    new Rating(words(0).toInt, words(1).toInt, words(2).toDouble)
})

元データがcsvなのでsplitして、ユーザID・アイテムID・レーティングをRatingクラスに入れてます。

val array = Random.shuffle(data.collect().toSeq)
val train = ContextManager.sc.parallelize(array.slice(0, array.length - 500)).map(row => {
    new Rating(row.getAs[Int]("user"), row.getAs[Int]("product"), row.getAs[Double]("rating"))
})
val test = ContextManager.sc.parallelize(array.slice(array.length - 500, array.length)).map(row => {
    new Rating(row.getAs[Int]("user"), row.getAs[Int]("product"), row.getAs[Double]("rating"))
})

訓練データとテストデータに分割したいのですが、ランダムな分割方法がよくわからなかったので、上でRDDにしたのを配列にしてシャッフルして分割するというやり方で、、
一応これで訓練データとテストデータができました。
500件だけテストデータとして使います。
trainとtestはRDD[Rating]型にしています。

RDD版実装

val rank = 10
val numIterations = 10
val model = ALS.train(train, rank, numIterations)

rank は上の図のkに対応するもので、特徴を表す次元数。
numIterationsALSの繰り返し数。どちらも決めで。。
ALS.train でモデル生成。

model.predict(test.map(rating => {
    (rating.user, rating.product)
}))

テストデータのユーザIDrating.userとアイテムIDrating.productでレーティング値を予測する。

結果の一部がこちら↓

+----+-------+------+------------------+
|user|product|rating|        prediction|
+----+-------+------+------------------+
|   1|   1136|   5.0|  3.84121223321302|
|   3|     21|   5.0| 4.545432438241894|
|   5|  81847|   5.0|2.1582961471937283|
|  33|   2997|   5.0|3.4247050236701773|
|  42|   1353|   5.0| 3.890686456361979|
|  66|   2324|   5.0| 3.892941137800877|
|  68|   1683|   5.0|3.5995781206844573|
|  88|     50|   5.0| 5.044828149527467|
|  88|   1079|   5.0| 4.710543650493504|
|  88|   1276|   5.0|4.4993008731591715|
+----+-------+------+------------------+
| 320|    204|   0.5| 4.178416376461781|
| 130|   2763|   1.0|2.4046661964557794|
| 199|   3937|   1.0| 2.671809457491024|
| 224|  34530|   1.0|2.8963020348986666|
| 354|   5213|   1.0|1.6520255250357625|
| 410|   2815|   1.0|1.2346216278871809|
| 410|   3879|   1.0|1.4271989033641381|
| 463|   4306|   1.0|3.7166309995771063|
| 530|   6731|   1.0| 2.786540674684764|
| 569|    196|   1.0| 2.209090681293506|
+----+-------+------+------------------+

ratingが実際にユーザがつけたrating(正解)で、predictionが予測結果。
正解のratingが高いところと低いところを一部取り出してみたものですが、
一部大きく外しているものの、正解のratingに近い予測結果が出ている感じ。

DataFrame版実装

mlパッケージ内の学習器は基本的にfitメソッドで学習して、transformメソッドで分類・予測するという流れになります。

val model = new ALS()
    .setUserCol("user")
    .setItemCol("product")
    .setRank(10)
    .setMaxIter(10)
    .fit(train.toDF)

DataFrame版では、new ALS().fit(train.toDF)でDataFrameに変換したtrainを入力としてモデルを生成します。
途中でパラメータを設定していますが、RDD版との違いとして、setUserCol("user")setItemCol("product")で訓練データのDataFrame中でユーザIDとアイテムIDが格納されているカラム名を指定しています。

val result=model.transform(test.toDF)

transformメソッドで予測した結果もDataFrameでpredictionというカラムに予測結果が入るようになっています。
結果の一部抜粋がこちら

+----+-------+------+----------+
|user|product|rating|prediction|
+----+-------+------+----------+
|   1|   1136|   5.0|  4.201646|
|   3|     21|   5.0| 3.9351418|
|   5|  81847|   5.0| 3.5261602|
|  33|   2997|   5.0| 4.0439353|
|  42|   1353|   5.0| 4.1179585|
|  66|   2324|   5.0|  4.052952|
|  68|   1683|   5.0| 3.2956479|
|  88|     50|   5.0| 4.8985147|
|  88|   1079|   5.0| 4.4943614|
|  88|   1276|   5.0| 4.5683627|
+----+-------+------+----------+
| 320|    204|   0.5| 1.6473958|
| 130|   2763|   1.0| 2.2569048|
| 199|   3937|   1.0| 3.0569887|
| 224|  34530|   1.0| 1.3953632|
| 354|   5213|   1.0|  2.322424|
| 410|   2815|   1.0| 1.8290529|
| 410|   3879|   1.0| 1.7294786|
| 463|   4306|   1.0|  3.564923|
| 530|   6731|   1.0| 2.4670124|
| 569|    196|   1.0| 2.4912937|
+----+-------+------+----------+

こちらも正解のratingが高いところ、低いところを見ると、ほぼ近い予測結果になっている。

評価

テストデータに対する予測精度の指標であるRMSEを計算してみました。

RMSE(Root Mean Squared Error)

予測値と実測値の差の平均の平方根
0に近いほうが良い
参考 https://weather.gc.ca/verification/scores/rmse_e.html

天気予報の気温予測の検証にも使われてるもよう
www.data.jma.go.jp

計算結果

RDD版で、1.0970
DataFrame版では、0.8564

RMSEは相対的な評価指標なのですが、評価対象がいないので、とりあえず絶対に勝てそうなランダムなケースと比較してみよう
ということで、ランダムにレーティングを予測した場合のRMSEは、1.9248

無事、でたらめな予測には勝てましたd(>_< )Good!!


今回使ったデータはもともと綺麗なcsvだったのでRDD版もDataFrame版もあまり違いがない印象ですが、通常はめんどくさいETLがあったり、グリッドサーチやCrossValidationでチューニング・モデル検証が発生するので、この辺を簡単にできるML Pipelinesが使えるDataFrame版のメリットが大きそうです。

Spark Streamingを使ってみる

Spark Streamingのお勉強のために、Twitterからツイートを取得してワードカウントしてみます。

Sparkの追加パッケージ spark-streaming-twitter を使ってデータ取得します。

ツイートを取得して単語の出現回数を数える

設定など

build.sbt に追加

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.6.0",
  "org.apache.spark" % "spark-streaming_2.11" % "1.6.0",
  "org.apache.spark" % "spark-streaming-twitter_2.11" % "1.6.0"
)

Twitterへの接続情報は手抜きしてSystem.setPropertyにて、

System.setProperty("twitter4j.oauth.consumerKey", "~~~")
System.setProperty("twitter4j.oauth.consumerSecret", "~~~")
System.setProperty("twitter4j.oauth.accessToken", "~~~")
System.setProperty("twitter4j.oauth.accessTokenSecret", "~~~")


StreamingContextの作成

val conf = new SparkConf().setMaster("local[4]").setAppName("appName") // ローカルモードなので、
val streamingContext = new StreamingContext(conf, Seconds(1)) // 第二引数の秒数毎にバッチ処理を実行する

処理内容

入力ストリームに対する処理です。
kuromojiのtokenizerでツイートを形態素解析して単語に分割してワードカウントしています。

val stream = TwitterUtils.createStream(streamingContext, None)

stream.flatMap(status => {
   val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build()
   val tokens = tokenizer.tokenize(status.getText)

   val list: scala.collection.mutable.ArrayBuffer[(String, Integer)] = new ArrayBuffer[(String, Integer)]()
   for (i <- 0 to tokens.size() - 1) {
     val token = tokens.get(i).getSurfaceForm
     if (token.length > 1) {
       list += token -> 1
     }
   }
   list
 }).reduceByKeyAndWindow(_ + _, Seconds(20)) // 第二引数はウインドウの長さ=直前の20秒分を処理する
   .map { case (word, count) => (count, word) }
   .transform(_.sortByKey(false)) // 出現数でソート
実行

以下で処理を開始して動き続ける。
(本当は適当なタイミングで StreamingContext.stop() で終了させる)

streamingContext.start()
streamingContext.awaitTermination()

とあるタイミングでの実行結果がこちら。(単語と出現回数を出現回数順で出力したもの)

https	255
://	252
co	250
RT	244
the	49
you	42
to	40
de	33
for	31
in	28

単純な単語の出現回数なので、どんなツイートにも含まれそうな単語が上位に来てしまう。

tfidfで見てみる

単純な出現回数では、一般的過ぎる単語が上位に来るのでtfidfで見たい。
tfidf: 単語のスコア付けの方法で、特定の文書にのみ頻繁に出現する単語に高いスコアが付ける。(https://www.google.co.jp/?gws_rd=ssl#q=tfidf)

変更点

各単語が何個のツイートに出現したかを数える処理。

val dfResult = stream.flatMap(status => {
   val id = status.getId
   val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build()
   val tokens = tokenizer.tokenize(status.getText)

   val set: scala.collection.mutable.Set[(String, Integer)] = scala.collection.mutable.Set.empty
   for (i <- 0 to tokens.size() - 1) {
     val token = tokens.get(i).getSurfaceForm
     if (token.length > 2) {
       set += (token -> 1)
     }
   }
   set
 }).reduceByKeyAndWindow(_ + _, Seconds(20))

ワードカウントの結果 tfResultと合わせて、単語ごとの出現回数と出現するツイート数からtfidfを求めて出力する。

tfResult.join(dfResult).map(w => {
   val word = w._1
   val tf = w._2._1
   val df = w._2._1
   val value = tf * Math.log(N / df) // Nは集計対象の全ツイート数

   (value, word)
 }).transform(_.sortByKey(false))
結果

とあるタイミングでの出力結果

the : 6.591673732008658
The : 6.437751649736401
day : 4.605170185988092
pessoas : 4.605170185988092
How : 4.605170185988092
tweet : 4.605170185988092
for : 4.605170185988092
birine : 4.605170185988092
وهو : 4.605170185988092
who : 4.605170185988092
mais : 4.605170185988092

全然パッとしない結果なので、集計期間を広げたり、stopword設定するとかで、もっと意味のありそうな単語が出てくるかもしれないです。