The Journey of learning Apache Flink (3) — Streaming Unconfirmed Bitcoin Transactions
Chengzhi Zhao

So far we have covered 1) Initial EMR and Project Setup and 2) Simple Wikipedia Streaming. Both used components that come out of the box. If you want to build a streaming application from scratch, keep reading.

This time we will build a streaming application around a popular topic to help you better understand Flink. Until now we haven't said much about where the data comes from. Flink provides predefined sources and connectors that make this easier, and they cover most popular messaging systems, including Kafka and Kinesis.


There are also many public real-time WebSocket APIs we can use. For example, some services provide a long-polling RSVP stream that you can hook into to build an application with live data.

Today we will use a public Bitcoin WebSocket API to build a streaming application of unconfirmed transactions. More details can be found in the API's documentation.

To build our own source, we extend RichSourceFunction and override its run and cancel methods.


libraryDependencies ++= Seq(
  "org.apache.flink" % "flink-core" % "1.4.0",
  "org.apache.flink" % "flink-scala_2.11" % "1.4.0",
  "org.apache.flink" % "flink-streaming-scala_2.11" % "1.4.0",
  "org.apache.flink" % "flink-shaded-hadoop2" % "1.4.0",
  "org.apache.flink" % "flink-clients_2.11" % "1.4.0",
  // WebSocket client used by the custom source
  "org.asynchttpclient" % "async-http-client" % "2.4.9"
)
  • We are using Flink 1.4 with the Scala 2.11 builds of its artifacts.
  • async-http-client provides the WebSocket client.
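Because the Flink artifacts above carry the `_2.11` suffix, the project itself has to be compiled with Scala 2.11. One way to keep the two in sync is to let sbt's `%%` operator append the suffix for you. The fragment below is a minimal `build.sbt` sketch under that assumption; the patch version `2.11.12` and the `flinkVersion` helper value are our choices, not something the original setup specifies.

```scala
// build.sbt (sketch): the Scala version must match the _2.11 Flink artifacts
scalaVersion := "2.11.12"

val flinkVersion = "1.4.0"

libraryDependencies ++= Seq(
  // Java-only artifacts: plain % (no Scala suffix)
  "org.apache.flink" % "flink-core" % flinkVersion,
  "org.apache.flink" % "flink-shaded-hadoop2" % flinkVersion,
  // Scala artifacts: %% appends _2.11 based on scalaVersion
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-clients" % flinkVersion,
  "org.asynchttpclient" % "async-http-client" % "2.4.9"
)
```

With `%%`, upgrading `scalaVersion` later changes every Scala artifact suffix in one place instead of requiring edits to each line.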


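import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.asynchttpclient.Dsl.asyncHttpClient
import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}

class BlockChainStreamingSource(url: String) extends RichSourceFunction[String] {
  // volatile: cancel() is called from a different thread than run()
  @volatile var running = true
  // lazy, so the (non-serializable) client is only created on the worker inside run()
  lazy val c = asyncHttpClient()

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Open the WebSocket once; the client then pushes every frame to the listener
    val ws = c.prepareGet(url).execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener(
      new WebSocketListener() {
        override def onOpen(websocket: WebSocket): Unit = {
          // Subscribe to notifications for all new unconfirmed transactions
          websocket.sendTextFrame("{\"op\":\"unconfirmed_sub\"}")
        }

        override def onClose(websocket: WebSocket, code: Int, reason: String): Unit = {
          running = false
        }

        override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int): Unit = {
          // Frames arrive on a client thread, so emit while holding the checkpoint lock
          ctx.getCheckpointLock.synchronized {
            ctx.collect(payload)
          }
        }

        override def onError(t: Throwable): Unit = {
          running = false
        }
      }).build()).get()

    // Keep the source alive until the job is canceled or the socket closes
    while (running) {
      Thread.sleep(100)
    }
    if (ws.isOpen) ws.sendCloseFrame()
    c.close()
  }

  override def cancel(): Unit = {
    running = false
  }
}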
  • Based on the documentation, we need to subscribe to notifications for all new bitcoin transactions by sending {"op":"unconfirmed_sub"} once the connection is open.
  • ctx.collect(payload) sends each payload downstream, one event at a time.
  • If the streaming application is canceled, Flink calls cancel(), which sets running to false and breaks the loop in run.
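The threading model behind these bullets is easy to miss: Flink calls `run` on one thread, the WebSocket client invokes `onTextFrame` on its own threads, and `cancel` arrives from yet another. The plain-Scala sketch below simulates that interaction without any Flink or WebSocket dependency. `ToyContext`, `ToyPushSource`, and `ToyPushSourceDemo` are hypothetical stand-ins, not Flink APIs: one thread plays Flink calling `run`, another plays the WebSocket client pushing three fake payloads, and then the source is canceled.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Flink's SourceContext: a lock plus a collect method
class ToyContext {
  val lock = new Object
  val collected = ArrayBuffer.empty[String]
  def collect(record: String): Unit = { collected += record; () }
}

// Hypothetical push-based source following the same run/cancel pattern
class ToyPushSource {
  // Written by cancel() on one thread, read by run() on another
  @volatile private var running = true

  // What the WebSocket listener's onTextFrame does: emit while holding the lock
  def onTextFrame(ctx: ToyContext, payload: String): Unit =
    ctx.lock.synchronized { ctx.collect(payload) }

  // Blocks the source thread until cancel() flips the flag
  def run(ctx: ToyContext): Unit =
    while (running) Thread.sleep(10)

  def cancel(): Unit = { running = false }
}

object ToyPushSourceDemo {
  def demo(): List[String] = {
    val ctx = new ToyContext
    val source = new ToyPushSource
    // Plays the role of Flink calling run()
    val sourceThread = new Thread(new Runnable {
      def run(): Unit = source.run(ctx)
    })
    // Plays the role of the WebSocket client delivering frames
    val clientThread = new Thread(new Runnable {
      def run(): Unit = List("tx1", "tx2", "tx3").foreach(p => source.onTextFrame(ctx, p))
    })
    sourceThread.start()
    clientThread.start()
    clientThread.join() // all frames delivered
    source.cancel()     // like Flink canceling the job
    sourceThread.join() // run() returns once it sees running == false
    ctx.lock.synchronized { ctx.collected.toList }
  }
}
```

The `@volatile` flag ensures the looping thread observes `cancel()`, and synchronizing on the lock mirrors the Flink convention of emitting records while holding `ctx.getCheckpointLock` when they come from outside the thread that runs `run`.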


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object BlockchainData {
  def main(args: Array[String]): Unit = {
    val url = "wss://"
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.addSource(new BlockChainStreamingSource(url))
    // Turn every transaction message into ("Count", 1) and sum per 5-second tumbling window
    val result = text
      .map(_ => ("Count", 1))
      .keyBy(0)
      .timeWindow(Time.seconds(5))
      .sum(1)
    result.print()
    env.execute("Streaming BlockChain")
  }
}
  • StreamExecutionEnvironment.getExecutionEnvironment returns the environment for the DataStream API: a local one when you run the program from your IDE, or the cluster's when you submit it as a job.
  • We count the number of unconfirmed transactions in tumbling windows of 5 seconds.
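To make the tumbling-window semantics concrete: by default Flink aligns tumbling windows to the epoch, so an element with timestamp `ts` (here the processing time, Flink 1.4's default time characteristic) lands in the window starting at `ts - ts % size`. The plain-Scala sketch below reproduces that bucketing and the per-window count over a finite list of millisecond timestamps. `TumblingWindowSketch` and its methods are hypothetical names; the sketch assumes a zero window offset and non-negative timestamps.

```scala
// Hypothetical helper that mimics how 5-second tumbling windows bucket and count events
object TumblingWindowSketch {
  val WindowSizeMs = 5000L

  // Start of the tumbling window containing ts (zero offset, ts >= 0)
  def windowStart(ts: Long, size: Long = WindowSizeMs): Long = ts - (ts % size)

  // Same idea as .map(_ => ("Count", 1)).keyBy(0).timeWindow(...).sum(1), on a finite batch
  def countPerWindow(timestamps: Seq[Long], size: Long = WindowSizeMs): Map[Long, Int] =
    timestamps
      .groupBy(ts => windowStart(ts, size))
      .map { case (start, events) => start -> events.size }
}
```

For example, events at 100, 4999, 5000, 7300, and 12000 ms fall into the windows starting at 0, 5000, and 10000 with counts 2, 2, and 1. In the streaming job, each window's count is emitted once that window closes rather than over a finished list.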

Run the job, and you should see results like these: