
Spark Streaming

Michael Nitschinger edited this page May 6, 2015 · 2 revisions

Note: this section is incomplete. As of DP2 the connector can also create streams, not only write to them.


We are working on a streaming source and better automatic mapping, but you can already feed documents out of a DStream into Couchbase very easily.
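Writing a DStream into Couchbase assumes a `StreamingContext` whose `SparkConf` carries the Couchbase connection settings. A minimal configuration sketch follows; the property names `com.couchbase.nodes` and `com.couchbase.bucket.default` follow the connector's configuration convention (bucket name as the key suffix, bucket password as the value), so check them against your connector version:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("StreamingExample")
  .setMaster("local[*]")
  // node(s) to bootstrap from
  .set("com.couchbase.nodes", "127.0.0.1")
  // open the "default" bucket with an empty password
  .set("com.couchbase.bucket.default", "")

// one batch per second, matching the window size used below
val ssc = new StreamingContext(conf, Seconds(1))
```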

Writing DStreams into Couchbase

Make sure to import `com.couchbase.spark.streaming._` so that the `saveToCouchbase` method is available on your DStreams. The following example grabs data out of a Twitter stream, counts the hashtags, converts the result into a `JsonArrayDocument` and stores it in the configured bucket:

    // imports for the Twitter source, windowing helpers, the document types
    // and the implicit that adds saveToCouchbase to DStreams
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.twitter.TwitterUtils
    import com.couchbase.client.java.document.JsonArrayDocument
    import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
    import com.couchbase.spark.streaming._

    // drop tags seen fewer than this many times per window
    val minTagCount = 3

    TwitterUtils
      // create the stream
      .createStream(ssc, None)
      // extract hashtags from the tweet text
      .flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
      // pair each tag with a 1 to prepare it for the reduce
      .map((_, 1))
      // reduce over a 1-second window, emitting a new RDD every second
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(1), Seconds(1))
      // flip (tag, count) to (count, tag) for sorting
      .map { case (topic, count) => (count, topic) }
      // sort descending by count
      .transform(_.sortByKey(false))
      // drop tags below the popularity threshold
      .filter(_._1 >= minTagCount)
      // map everything onto a single key for easy grouping
      .map(countAndTopic => ("aggr", countAndTopic))
      // group all data into a single RDD item so it can be stored as one document
      .groupByKey()
      // turn the list of tuples into a JsonArrayDocument with an id per second
      .map(data => {
        val id = "tags::" + System.currentTimeMillis() / 1000
        val content = JsonArray.create()
        data._2.foreach(tuple => content.add(JsonObject.create().put("tag", tuple._2).put("cnt", tuple._1)))
        JsonArrayDocument.create(id, content)
      })
      // store the document in Couchbase
      .saveToCouchbase()
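The document id comes from integer-dividing the current time in milliseconds by 1000, so every document written within the same wall-clock second shares one id; the flip-and-filter step orders tags by popularity. A small, dependency-free sketch of that logic (the `docId` and `topTags` helpers are illustrative, not part of the connector):

```scala
// Illustrative helpers mirroring the id and sorting logic of the example.
object TagHelpers {
  // One document id per wall-clock second: millis / 1000 buckets the ids.
  def docId(millis: Long): String = "tags::" + (millis / 1000)

  // Flip (tag, count) pairs, keep tags seen at least minTagCount times
  // and sort descending by count, as the DStream pipeline above does.
  def topTags(counts: Seq[(String, Int)], minTagCount: Int): Seq[(Int, String)] =
    counts.map { case (tag, cnt) => (cnt, tag) }
      .filter(_._1 >= minTagCount)
      .sortBy(-_._1)
}
```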