Spark Streaming
Michael Nitschinger edited this page May 6, 2015 · 2 revisions
Note: This section is incomplete; since dp2 it is also possible to create streams directly.
We are working on a streaming source and better automatic mapping, but you can already feed documents out of a `DStream` into Couchbase very easily. Make sure to import `com.couchbase.spark.streaming._` so that the `saveToCouchbase` method is available on your `DStream`. The following example grabs data out of a Twitter stream, groups the hashtags together, converts them into a `JsonArrayDocument`, and stores it in the configured bucket:
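The example assumes a `StreamingContext` named `ssc` is already in place. A minimal setup might look like the following sketch; the node address and bucket name are placeholders for your environment, and the `com.couchbase.*` property keys reflect the connector's configuration style:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Point the connector at a Couchbase node and open a bucket.
// "127.0.0.1" and "default" are placeholder values.
val conf = new SparkConf()
  .setAppName("StreamingExample")
  .set("com.couchbase.nodes", "127.0.0.1")
  .set("com.couchbase.bucket.default", "")

// A one-second batch interval, matching the windows used below.
val ssc = new StreamingContext(conf, Seconds(1))
```

Remember that the streaming job only starts once `ssc.start()` is called (followed by `ssc.awaitTermination()`).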
```scala
import com.couchbase.client.java.document.JsonArrayDocument
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import com.couchbase.spark.streaming._
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.twitter.TwitterUtils

// tags seen fewer times than this per window are dropped
val minTagCount = 3

TwitterUtils
  // create the stream
  .createStream(ssc, None)
  // extract hashtags from tweets
  .flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
  // add a 1 to each tag to prepare it for the reduce
  .map((_, 1))
  // reduce over a one-second window and emit a new RDD every second as well
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(1), Seconds(1))
  // flip the tuples for sorting
  .map { case (topic, count) => (count, topic) }
  // sort descending by count
  .transform(_.sortByKey(false))
  // filter out unpopular tags (for example, fewer than 3 occurrences)
  .filter(_._1 >= minTagCount)
  // map the count and topic onto a single key for easy grouping
  .map(countAndTopic => ("aggr", countAndTopic))
  // group all data into a single RDD item so we can store it as one document
  .groupByKey()
  // map the list of tuples into a JsonArrayDocument with a per-second document ID
  .map(data => {
    val id = "tags::" + System.currentTimeMillis() / 1000
    val content = JsonArray.create()
    data._2.foreach(tuple => content.add(JsonObject.create().put("tag", tuple._2).put("cnt", tuple._1)))
    JsonArrayDocument.create(id, content)
  })
  // store the document in Couchbase
  .saveToCouchbase()
```
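The aggregation logic in the pipeline above can be sketched on plain Scala collections, without Spark, to make the per-step transformations easier to follow. The names `extractHashtags` and `aggregateTags` are illustrative only and not part of the connector:

```scala
// A plain-collections sketch of the windowed aggregation above.
object TagAggregation {
  // Mirrors the flatMap step: split tweet text on spaces and keep hashtags.
  def extractHashtags(tweets: Seq[String]): Seq[String] =
    tweets.flatMap(_.split(" ").filter(_.startsWith("#")))

  // Mirrors map/reduceByKeyAndWindow/sort/filter: count each tag, drop tags
  // below minCount, and return (count, tag) pairs sorted descending by count.
  def aggregateTags(tags: Seq[String], minCount: Int): Seq[(Int, String)] =
    tags.groupBy(identity)
      .map { case (tag, occurrences) => (occurrences.size, tag) }
      .toSeq
      .filter(_._1 >= minCount)
      .sortBy(-_._1)
}
```

In the real stream the `(count, tag)` pairs are then collected under a single key with `groupByKey()` and serialized into one `JsonArrayDocument` per second.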