Spark Streamingを使ってみる
Spark Streamingのお勉強のために、Twitterからツイートを取得してワードカウントしてみます。
Sparkの追加パッケージ spark-streaming-twitter を使ってデータ取得します。
ツイートを取得して単語の出現回数を数える
設定など
build.sbt に追加
libraryDependencies ++= Seq( "org.apache.spark" % "spark-core_2.11" % "1.6.0", "org.apache.spark" % "spark-streaming_2.11" % "1.6.0", "org.apache.spark" % "spark-streaming-twitter_2.11" % "1.6.0" )
Twitterへの接続情報は手抜きしてSystem.setPropertyにて、
System.setProperty("twitter4j.oauth.consumerKey", "~~~") System.setProperty("twitter4j.oauth.consumerSecret", "~~~") System.setProperty("twitter4j.oauth.accessToken", "~~~") System.setProperty("twitter4j.oauth.accessTokenSecret", "~~~")
StreamingContextの作成
val conf = new SparkConf().setMaster("local[4]").setAppName("appName") // ローカルモードなので、 val streamingContext = new StreamingContext(conf, Seconds(1)) // 第二引数の秒数毎にバッチ処理を実行する
処理内容
入力ストリームに対する処理です。
kuromojiのtokenizerでツイートを形態素解析して単語に分割してワードカウントしています。
val stream = TwitterUtils.createStream(streamingContext, None) stream.flatMap(status => { val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build() val tokens = tokenizer.tokenize(status.getText) val list: scala.collection.mutable.ArrayBuffer[(String, Integer)] = new ArrayBuffer[(String, Integer)]() for (i <- 0 to tokens.size() - 1) { val token = tokens.get(i).getSurfaceForm if (token.length > 1) { list += token -> 1 } } list }).reduceByKeyAndWindow(_ + _, Seconds(20)) // 第二引数はウインドウの長さ=直前の20秒分を処理する .map { case (word, count) => (count, word) } .transform(_.sortByKey(false)) // 出現数でソート
実行
以下で処理を開始して動き続ける。
(本当は適当なタイミングで StreamingContext.stop() で終了させる)
streamingContext.start() streamingContext.awaitTermination()
とあるタイミングでの実行結果がこちら。(単語と出現回数を出現回数順で出力したもの)
https 255 :// 252 co 250 RT 244 the 49 you 42 to 40 de 33 for 31 in 28
単純な単語の出現回数なので、どんなツイートにも含まれそうな単語が上位に来てしまう。
tfidfで見てみる
単純な出現回数では、一般的過ぎる単語が上位に来るのでtfidfで見たい。
tfidf: 単語のスコア付けの方法で、特定の文書にのみ頻繁に出現する単語に高いスコアが付ける。(https://www.google.co.jp/?gws_rd=ssl#q=tfidf)
変更点
各単語が何個のツイートに出現したかを数える処理。
val dfResult = stream.flatMap(status => { val id = status.getId val tokenizer = Tokenizer.builder().mode(Tokenizer.Mode.NORMAL).build() val tokens = tokenizer.tokenize(status.getText) val set: scala.collection.mutable.Set[(String, Integer)] = scala.collection.mutable.Set.empty for (i <- 0 to tokens.size() - 1) { val token = tokens.get(i).getSurfaceForm if (token.length > 2) { set += (token -> 1) } } set }).reduceByKeyAndWindow(_ + _, Seconds(20))
ワードカウントの結果 tfResultと合わせて、単語ごとの出現回数と出現するツイート数からtfidfを求めて出力する。
tfResult.join(dfResult).map(w => { val word = w._1 val tf = w._2._1 val df = w._2._1 val value = tf * Math.log(N / df) // Nは集計対象の全ツイート数 (value, word) }).transform(_.sortByKey(false))
結果
とあるタイミングでの出力結果
the : 6.591673732008658 The : 6.437751649736401 day : 4.605170185988092 pessoas : 4.605170185988092 How : 4.605170185988092 tweet : 4.605170185988092 for : 4.605170185988092 birine : 4.605170185988092 وهو : 4.605170185988092 who : 4.605170185988092 mais : 4.605170185988092
全然パッとしない結果なので、集計期間を広げたり、stopword設定するとかで、もっと意味のありそうな単語が出てくるかもしれないです。