The Journey of learning Apache Flink (3) — Streaming Unconfirmed Bitcoin Transactions

https://medium.com/coinmonks/the-journey-of-learning-apache-flink-3-streaming-unconfirmed-bitcoin-transactions-f4d636e77de9?source=rss----721b17443fd5---4
Chengzhi Zhao

So far we have covered 1) initial EMR and project setup and 2) simple Wikipedia streaming. Both relied on out-of-the-box examples; if you want to build a streaming application from scratch, keep reading.

In this post we will build a streaming application around a popular topic, which should help you better understand Flink. So far we haven't said much about where the data comes from. Flink provides predefined sources that make things easier; you can find detailed lists here: https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/#bundled-connectors and https://ci.apache.org/projects/flink/flink-docs-master/dev/connectors/#connectors-in-apache-bahir. Together they cover most popular messaging systems, including Kafka and Kinesis.


There are several public real-time WebSocket feeds we can use. For example, Meetup.com provides a long-polling RSVP stream that you can hook into to build an application with live data.

Today we will use the feed blockchain.com provides and build a streaming application on top of its WebSocket API. More details can be found at https://www.blockchain.com/en/api/api_websocket
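The API is driven by small JSON messages: the client sends an "op" command and the server pushes JSON payloads back. The sketch below uses only the Scala standard library to build the subscription message and inspect the "op" field of incoming payloads. It assumes, based on the API page above, that each unconfirmed transaction arrives as {"op":"utx","x":{...}}; the object and helper names are hypothetical, and the regex is a stand-in for a proper JSON parser.

```scala
// Minimal sketch of the blockchain.info WebSocket message shapes (standard library only).
// Assumption: after subscribing, new transactions arrive as {"op":"utx","x":{...}}.
object BlockchainMessages {
  // Triple-quoted strings let us write JSON without escaping the double quotes.
  val UnconfirmedSub: String = """{"op":"unconfirmed_sub"}"""

  // Hypothetical helper: matches the first "op":"..." pair in the payload.
  // A real application should use a JSON library instead of a regex.
  private val OpPattern = """"op"\s*:\s*"([^"]+)"""".r

  // Returns the value of the first "op" field, if any.
  def opOf(payload: String): Option[String] =
    OpPattern.findFirstMatchIn(payload).map(_.group(1))

  // True when the payload looks like an unconfirmed-transaction notification.
  def isUnconfirmedTx(payload: String): Boolean = opOf(payload).contains("utx")
}
```

A listener could call isUnconfirmedTx on each text frame to skip any non-transaction messages before emitting them.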

To build your own source, we can extend RichSourceFunction and override its run and cancel functions.
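Flink calls run() on the source's task thread and cancel() from a different thread, so the usual pattern is a loop guarded by a volatile flag. To show that contract without Flink on the classpath, here is a minimal sketch in which a hypothetical MiniSource trait stands in for SourceFunction (it is not the Flink API) and an emit function stands in for ctx.collect.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for Flink's SourceFunction contract, not the Flink API itself:
// run() loops and emits until cancel() flips a flag from another thread.
trait MiniSource[T] {
  def run(emit: T => Unit): Unit
  def cancel(): Unit
}

class CountingSource extends MiniSource[Long] {
  // @volatile so the write in cancel() is visible to the thread executing run()
  @volatile private var running = true

  override def run(emit: Long => Unit): Unit = {
    var n = 0L
    while (running) {
      emit(n)
      n += 1
      Thread.sleep(1)
    }
  }

  override def cancel(): Unit = running = false
}

object MiniSourceDemo {
  // Runs the source on its own thread, as a task thread would, then cancels it.
  // Returns (number of emitted elements, whether the worker thread is still alive).
  def runBriefly(): (Int, Boolean) = {
    val out = new ConcurrentLinkedQueue[Long]()
    val source = new CountingSource
    val worker = new Thread(new Runnable {
      def run(): Unit = source.run(n => out.add(n))
    })
    worker.start()
    while (out.size() < 5) Thread.sleep(1) // wait until a few elements were emitted
    source.cancel()
    worker.join(1000)
    (out.size(), worker.isAlive)
  }
}
```

After cancel() the loop exits on its next check of the flag, so the worker thread terminates on its own instead of being interrupted.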

build.sbt

libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-core" % "1.4.0",
  "org.apache.flink" % "flink-scala_2.11" % "1.4.0",
  "org.apache.flink" % "flink-streaming-scala_2.11" % "1.4.0",
  "org.apache.flink" % "flink-shaded-hadoop2" % "1.4.0",
  "org.apache.flink" % "flink-clients_2.11" % "1.4.0",
  "org.asynchttpclient" % "async-http-client" % "2.4.9"
)
  • We are using Flink 1.4.0 built for Scala 2.11.
  • async-http-client provides the WebSocket client.

BlockChainStreamingSource.scala

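import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.Dsl.asyncHttpClient
import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}

class BlockChainStreamingSource(url: String) extends RichSourceFunction[String] {
  // volatile: cancel() is called from a different thread than run()
  @volatile private var running = true
  // created on first use inside the task, so the non-serializable client is never shipped
  @transient private lazy val client: AsyncHttpClient = asyncHttpClient()

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // open the WebSocket once; looping around execute(...).get() would open a new connection per iteration
    client.prepareGet(url).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
      new WebSocketListener() {
        override def onOpen(websocket: WebSocket): Unit = {
          // subscribe to notifications for all new unconfirmed transactions
          websocket.sendTextFrame("""{"op":"unconfirmed_sub"}""")
        }

        override def onClose(websocket: WebSocket, code: Int, reason: String): Unit = {
          running = false // stop the source when the connection closes
        }

        override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit = {
          // this callback runs on the client's I/O thread, so serialize emission through the checkpoint lock
          ctx.getCheckpointLock.synchronized {
            ctx.collect(payload)
          }
        }

        override def onError(t: Throwable): Unit = {
          running = false
        }
      }).build()).get()

    // keep run() alive until the job is cancelled or the socket closes
    while (running) {
      Thread.sleep(100)
    }
  }

  override def cancel(): Unit = {
    running = false
  }

  override def close(): Unit = {
    // release the client and its connections when the task shuts down
    client.close()
  }
}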

  • Based on the documentation, we subscribe to notifications for all new bitcoin transactions by sending {"op":"unconfirmed_sub"}.
  • ctx.collect(payload) sends each message downstream, one event at a time.
  • If the streaming application is cancelled, Flink calls cancel(), which sets running to false and breaks the loop.
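The WebSocket callbacks run on the HTTP client's I/O threads rather than on the thread executing run(). An alternative design, sketched here with only the standard library, is to have the listener hand messages to a bounded queue and let run() drain it, so emission always happens on the source thread. MessageHandoff and HandoffDemo are hypothetical names, and emit stands in for ctx.collect.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue, TimeUnit}

// Hypothetical helper: the listener thread calls offer(); the source thread calls drain().
class MessageHandoff(capacity: Int) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  @volatile private var running = true

  // Called from the WebSocket callback thread; returns false (drops) if the buffer is full.
  def offer(payload: String): Boolean = queue.offer(payload)

  // Called from run(): polls with a timeout so a stop() is noticed even when no data arrives.
  def drain(emit: String => Unit): Unit = {
    while (running) {
      val next = queue.poll(50, TimeUnit.MILLISECONDS)
      if (next != null) emit(next)
    }
  }

  def stop(): Unit = running = false
}

object HandoffDemo {
  // Simulates a producer (listener) and a consumer (source thread); returns what was emitted.
  def roundTrip(messages: Seq[String]): List[String] = {
    val handoff = new MessageHandoff(capacity = 1024)
    val received = new ConcurrentLinkedQueue[String]()
    val consumer = new Thread(new Runnable {
      def run(): Unit = handoff.drain(s => received.add(s))
    })
    consumer.start()
    messages.foreach(handoff.offer)
    while (received.size() < messages.size) Thread.sleep(1)
    handoff.stop()
    consumer.join(1000)
    received.toArray(new Array[String](0)).toList
  }
}
```

Because offer() drops messages when the buffer is full, a burst larger than the capacity loses data instead of blocking the I/O thread; using put() instead would block that thread and push back on the client.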

BlockchainData.scala

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object BlockchainData {
  def main(args: Array[String]): Unit = {
    val url = "wss://ws.blockchain.info/inv"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.addSource(new BlockChainStreamingSource(url))
    val result = text
      .map(_ => ("Count", 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    result.print()
    env.execute("Streaming BlockChain")
  }
}

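  • StreamExecutionEnvironment.getExecutionEnvironment creates the execution environment for the DataStream API; it returns a local environment when run from the IDE and the cluster's environment when the job is submitted.
  • We count the number of unconfirmed transactions in 5-second tumbling windows.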
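To make the windowing step concrete, here is a plain-Scala model of what .map(_ => ("Count", 1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1) computes. It is a sketch, not Flink code: it assumes explicit, non-negative millisecond timestamps, whereas the job above uses processing-time windows driven by the machine clock, and TumblingCount is a hypothetical name.

```scala
// Plain-Scala model of a 5-second tumbling-window count (not Flink code).
// Assumption: timestamps are non-negative milliseconds supplied with each event.
object TumblingCount {
  val WindowSizeMs: Long = 5000L

  // A tumbling window is identified by its start: the timestamp rounded down to a multiple of the size.
  def windowStart(tsMs: Long, sizeMs: Long = WindowSizeMs): Long = tsMs - (tsMs % sizeMs)

  // Map every event to ("Count", 1), group by window start, and sum the 1s in each window.
  def countPerWindow(timestampsMs: Seq[Long], sizeMs: Long = WindowSizeMs): Map[Long, Int] =
    timestampsMs
      .map(ts => (windowStart(ts, sizeMs), ("Count", 1)))
      .groupBy(_._1)
      .map { case (start, events) => start -> events.map(_._2._2).sum }
}
```

For example, events at 0, 1000, 4999, 5000 and 12000 ms fall into the windows starting at 0, 5000 and 10000 with counts 3, 1 and 1. Since every record gets the same key "Count", all records land in a single keyed window; for a global count like this, timeWindowAll could express the same thing without the dummy key.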

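You should see results of the form (Count,N) printed every 5 seconds, where N is the number of unconfirmed transactions received in that window.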