I wrote this topology as my final project for the “Real-Time Analytics with Apache Storm” course at Udacity. It doesn’t have much practical value, since the maximum throughput of the Bitcoin network is only about 7 transactions per second. That is unlikely to change soon, at least until the number of transactions in the network approaches this limit. So it doesn’t make much sense to use Apache Storm for the processing: you can analyse Bitcoin transactions with a single script or application, without distributing the computation, unless you perform some really heavy, CPU-intensive calculations. Nonetheless, it can be a good starting point for your own spout or WebSocket client in Java.

There are a couple of Java libraries available for this purpose. org.eclipse.jetty.websocket seems more mature, more powerful and better maintained, but I chose javax.websocket-client-api as sufficient for my needs, with tyrus-client as the reference implementation of the API.


In order to turn our POJO into a WebSocket client endpoint, we need to declare it with the corresponding annotation:

@ClientEndpoint
public class BlockchainInfoClient {

The connection is established right in the constructor.

The @OnOpen annotation marks a method to be invoked right after a connection is established. In our case, it sets a few session parameters and subscribes to unconfirmed transactions.

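@OnOpen
public void onOpen(final Session session) throws IOException {
    // Build the subscription request for unconfirmed transactions.
    JSONObject subscriptionMessage = new JSONObject();
    subscriptionMessage.put("op", "unconfirmed_sub");

    userSession = session;
    // Assumed call: the line that sends the request is missing from this listing.
    userSession.getBasicRemote().sendText(subscriptionMessage.toString());
    LOG.info("Subscribed for unconfirmed transactions from");
}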

We will also need a handler for the incoming messages, which must be decorated with @OnMessage:

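@OnMessage
public void onMessage(final String message, boolean isLastPartOfMessage) {
    LOG.info("Message received from");
    // Accumulate partial frames until the last part arrives.
    messageBuffer.append(message);
    if (isLastPartOfMessage) {
        try {
            if (messageHandler != null) {
                // Pass the complete message on to the registered handler.
                messageHandler.handleMessage(messageBuffer.toString());
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            // Start the next message from an empty buffer.
            messageBuffer = new StringBuilder();
        }
    }
}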
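
The buffering logic of such a handler can be tried out without a WebSocket connection. The following is only a minimal sketch that uses plain Java: PartialMessageAssembler and onPart are hypothetical names that are not part of the project or of the javax.websocket API, and a java.util.function.Consumer stands in for the message handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: joins partial text frames into whole messages. */
final class PartialMessageAssembler {
    private final Consumer<String> handler; // called once per complete message
    private StringBuilder buffer = new StringBuilder();

    PartialMessageAssembler(Consumer<String> handler) {
        this.handler = handler;
    }

    /** Mirrors the (String, boolean) shape of a partial-message callback. */
    void onPart(String part, boolean isLastPart) {
        buffer.append(part);
        if (!isLastPart) {
            return; // wait for the remaining parts
        }
        try {
            handler.accept(buffer.toString());
        } finally {
            // Always start the next message from an empty buffer,
            // even if the handler threw an exception.
            buffer = new StringBuilder();
        }
    }
}

final class PartialMessageAssemblerDemo {
    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        PartialMessageAssembler assembler = new PartialMessageAssembler(received::add);

        // One message split into two parts, then one message in a single part.
        assembler.onPart("{\"a\":", false);
        assembler.onPart("1}", true);
        assembler.onPart("{\"b\":2}", true);

        System.out.println(received);
    }
}
```

Resetting the buffer in a finally block means that a failing handler cannot leave stale text behind to be glued onto the next message.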

To access messages from the spout, define a MessageHandler interface and a method to inject an implementation of this interface into our class:

public void addMessageHandler(final MessageHandler msgHandler) {
    messageHandler = msgHandler;
}

public static interface MessageHandler {
    public void handleMessage(String message);
}

The spout extends the BaseRichSpout abstract class

public class TransactionSpout extends BaseRichSpout {

and overrides some of its methods.

open is called once, when the spout is initialized within a worker. We use it to store the collector, initialize a queue of incoming messages, and instantiate the previously created client with a handler that puts messages into our queue.

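public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    collector = spoutOutputCollector;
    // Bounded queue that passes messages from the WebSocket client to nextTuple.
    queue = new LinkedBlockingQueue<String>(1000);

    client = new BlockchainInfoClient();
    client.addMessageHandler(new BlockchainInfoClient.MessageHandler() {
        public void handleMessage(String message) {
            // Assumed call: the line is missing from this listing.
            // offer() drops the message if the queue is full.
            queue.offer(message);
        }
    });
}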

Then we need to override the main method of every spout, nextTuple, which either emits a new tuple into the topology or simply returns if there are no new transactions in the queue.

public void nextTuple() {
    String ret = queue.poll();

    // Nothing to emit yet: return and let Storm call nextTuple again.
    if (ret == null) {
        return;
    }

    collector.emit(new Values(ret));
}
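
The hand-off between the WebSocket thread and the spout thread can be illustrated without any Storm classes. This is only a sketch under stated assumptions: QueueHandoff and its methods are hypothetical names, a plain list stands in for collector.emit, and the producer uses the non-blocking offer, which drops messages when the queue is full (the original handler may behave differently).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical stand-in for the spout's queue handling; no Storm classes involved. */
final class QueueHandoff {
    private final BlockingQueue<String> queue;
    private final List<String> emitted = new ArrayList<>(); // stands in for collector.emit

    QueueHandoff(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /** Producer side (WebSocket thread): never blocks, returns false if the queue is full. */
    boolean handleMessage(String message) {
        return queue.offer(message);
    }

    /** Consumer side (spout thread): emits at most one message and reports whether it did. */
    boolean nextTuple() {
        String message = queue.poll(); // null when the queue is empty
        if (message == null) {
            return false;
        }
        emitted.add(message);
        return true;
    }

    List<String> emitted() {
        return emitted;
    }
}

final class QueueHandoffDemo {
    public static void main(String[] args) {
        QueueHandoff handoff = new QueueHandoff(2);

        // The third message does not fit into a queue of capacity 2.
        System.out.println(handoff.handleMessage("tx1"));
        System.out.println(handoff.handleMessage("tx2"));
        System.out.println(handoff.handleMessage("tx3"));

        // Drain the queue one message per call, as a spout would.
        while (handoff.nextTuple()) {
            // keep polling until the queue is empty
        }
        System.out.println(handoff.emitted());
    }
}
```

Both poll and offer return immediately, which suits a method like nextTuple that Storm calls repeatedly from a single thread and that should therefore not block.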

Finally, describe the output schema in declareOutputFields:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    // Each emitted tuple carries a single field holding the raw transaction message.
    declarer.declare(new Fields("transaction"));
}

The full source code, as well as a usage example, can be found in the GitHub repository.

