Spark Streamingを使ってみる

Spark Streamingのお勉強のために、Twitterからツイートを取得してワードカウントしてみます。

Sparkの追加パッケージ spark-streaming-twitter を使ってデータ取得します。

ツイートを取得して単語の出現回数を数える

設定など

build.sbt に追加

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "1.6.0",
  "org.apache.spark" % "spark-streaming_2.11" % "1.6.0",
  "org.apache.spark" % "spark-streaming-twitter_2.11" % "1.6.0"
)

Twitterへの接続情報は手抜きしてSystem.setPropertyにて、

System.setProperty("twitter4j.oauth.consumerKey", "~~~")
System.setProperty("twitter4j.oauth.consumerSecret", "~~~")
System.setProperty("twitter4j.oauth.accessToken", "~~~")
System.setProperty("twitter4j.oauth.accessTokenSecret", "~~~")


StreamingContextの作成

val conf = new SparkConf().setMaster("local[4]").setAppName("appName") // ローカルモードなので、
val streamingContext = new StreamingContext(conf, Seconds(1)) // 第二引数の秒数毎にバッチ処理を実行する

処理内容

入力ストリームに対する処理です。
kuromojiのtokenizerでツイートを形態素解析して単語に分割してワードカウントしています。

val stream = TwitterUtils.createStream(streamingContext, None)

stream.flatMap(status => {
   val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build()
   val tokens = tokenizer.tokenize(status.getText)

   val list: scala.collection.mutable.ArrayBuffer[(String, Integer)] = new ArrayBuffer[(String, Integer)]()
   for (i <- 0 to tokens.size() - 1) {
     val token = tokens.get(i).getSurfaceForm
     if (token.length > 1) {
       list += token -> 1
     }
   }
   list
 }).reduceByKeyAndWindow(_ + _, Seconds(20)) // 第二引数はウインドウの長さ=直前の20秒分を処理する
   .map { case (word, count) => (count, word) }
   .transform(_.sortByKey(false)) // 出現数でソート
実行

以下で処理を開始して動き続ける。
(本当は適当なタイミングで StreamingContext.stop() で終了させる)

streamingContext.start()
streamingContext.awaitTermination()

とあるタイミングでの実行結果がこちら。(単語と出現回数を出現回数順で出力したもの)

https	255
://	252
co	250
RT	244
the	49
you	42
to	40
de	33
for	31
in	28

単純な単語の出現回数なので、どんなツイートにも含まれそうな単語が上位に来てしまう。

tfidfで見てみる

単純な出現回数では、一般的過ぎる単語が上位に来るのでtfidfで見たい。
tfidf: 単語のスコア付けの方法で、特定の文書にのみ頻繁に出現する単語に高いスコアが付ける。(https://www.google.co.jp/?gws_rd=ssl#q=tfidf)

変更点

各単語が何個のツイートに出現したかを数える処理。

val dfResult = stream.flatMap(status => {
   val id = status.getId
   val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build()
   val tokens = tokenizer.tokenize(status.getText)

   val set: scala.collection.mutable.Set[(String, Integer)] = scala.collection.mutable.Set.empty
   for (i <- 0 to tokens.size() - 1) {
     val token = tokens.get(i).getSurfaceForm
     if (token.length > 2) {
       set += (token -> 1)
     }
   }
   set
 }).reduceByKeyAndWindow(_ + _, Seconds(20))

ワードカウントの結果 tfResultと合わせて、単語ごとの出現回数と出現するツイート数からtfidfを求めて出力する。

tfResult.join(dfResult).map(w => {
   val word = w._1
   val tf = w._2._1
   val df = w._2._1
   val value = tf * Math.log(N / df) // Nは集計対象の全ツイート数

   (value, word)
 }).transform(_.sortByKey(false))
結果

とあるタイミングでの出力結果

the : 6.591673732008658
The : 6.437751649736401
day : 4.605170185988092
pessoas : 4.605170185988092
How : 4.605170185988092
tweet : 4.605170185988092
for : 4.605170185988092
birine : 4.605170185988092
وهو : 4.605170185988092
who : 4.605170185988092
mais : 4.605170185988092

全然パッとしない結果なので、集計期間を広げたり、stopword設定するとかで、もっと意味のありそうな単語が出てくるかもしれないです。