This topology was written as my final project for the “Real-Time Analytics with Apache Storm” course at Udacity and has little practical value, because the maximum throughput of the Bitcoin network is only about 7 transactions per second. That limit is unlikely to change soon, at least until the number of transactions in the network approaches it. Apache Storm is therefore overkill for this workload: you can analyse Bitcoin transactions with a single script or application, with no need to distribute the computation, unless you perform some really heavy, CPU-intensive calculations. Nonetheless, it can be a good starting point for your own spout or WebSocket client in Java.

There are a couple of Java libraries available for this purpose. org.eclipse.jetty.websocket seems more mature, more powerful and better maintained, but I chose javax.websocket-client-api, which was sufficient for my needs:

<dependency>
    <groupId>javax.websocket</groupId>
    <artifactId>javax.websocket-client-api</artifactId>
    <version>1.1</version>
</dependency>

and tyrus-client, the reference implementation of that API:

<dependency>
    <groupId>org.glassfish.tyrus</groupId>
    <artifactId>tyrus-client</artifactId>
    <version>1.10</version>
</dependency>
<dependency>
    <groupId>org.glassfish.tyrus</groupId>
    <artifactId>tyrus-container-grizzly-client</artifactId>
    <version>1.10</version>
</dependency>

To turn our POJO into a WebSocket client endpoint, we need to declare it with the corresponding annotation:

@ClientEndpoint
public class BlockchainInfoClient {
    ...
}

The connection is established right in the constructor.

A method annotated with @OnOpen is invoked right after the connection is established. In our case, it sets a few session parameters and subscribes to unconfirmed transactions.

@OnOpen
public void onOpen(final Session session) throws IOException {
    session.setMaxIdleTimeout(0);
    session.setMaxBinaryMessageBufferSize(16384);
    session.setMaxTextMessageBufferSize(16384);

    JSONObject subscriptionMessage = new JSONObject();
    subscriptionMessage.put("op", "unconfirmed_sub");
    session.getBasicRemote().sendText(subscriptionMessage.toString());

    LOG.info(subscriptionMessage.toString());

    userSession = session;

    LOG.info("Subscribed for unconfirmed transactions from Blockchain.info.");
}

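We also need a handler for incoming messages, which must be annotated with @OnMessage. Because a message may arrive in several parts, the handler buffers each part and passes the text on only once the last part has been received: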

@OnMessage
public void onMessage(final String message, boolean isLastPartOfMessage) {
    messageBuffer.append(message);
    if (isLastPartOfMessage) {
        try {
            if (messageHandler != null) {
                messageHandler.handleMessage(messageBuffer.toString());
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        } finally {
            messageBuffer = new StringBuilder();
        }
    }

    LOG.info("Message received from Blockchain.info");
}
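The buffer-and-flush logic in onMessage can be tried out without a WebSocket connection. The following is only a minimal sketch of the same idea using the standard library; the class name FragmentAssembler and its accept method are hypothetical and are not part of the original client.

```java
// Hypothetical helper, not part of the original client: joins the parts of a
// partial text message and releases the full text when the last part arrives.
class FragmentAssembler {
    private StringBuilder buffer = new StringBuilder();

    // Returns the complete message once isLast is true, otherwise null.
    String accept(String fragment, boolean isLast) {
        buffer.append(fragment);
        if (!isLast) {
            return null;
        }
        String complete = buffer.toString();
        buffer = new StringBuilder(); // start fresh for the next message
        return complete;
    }

    public static void main(String[] args) {
        FragmentAssembler assembler = new FragmentAssembler();
        // The payload below is made up for illustration only.
        System.out.println(assembler.accept("{\"a\":", false)); // prints null
        System.out.println(assembler.accept("1}", true));       // prints {"a":1}
    }
}
```

Returning null for an incomplete message keeps the sketch short; the handler above achieves the same effect by calling messageHandler only when isLastPartOfMessage is true.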

To make the messages accessible from the spout, we define a MessageHandler interface and a method for injecting an implementation of this interface into our class:

public void addMessageHandler(final MessageHandler msgHandler) {
    messageHandler = msgHandler;
}

public static interface MessageHandler {
    public void handleMessage(String message);
}

The spout extends the BaseRichSpout abstract class

public class TransactionSpout extends BaseRichSpout {
}

and overrides some of its methods.

open is called once, when the spout is initialized within a worker. We use it to store the collector, initialize a queue for incoming messages, and instantiate the previously created client with a handler that puts messages into our queue.

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
    collector = spoutOutputCollector;
    queue = new LinkedBlockingQueue<String>(1000);

    client = new BlockchainInfoClient();
    client.addMessageHandler(new BlockchainInfoClient.MessageHandler() {
        @Override
        public void handleMessage(String message) {
            queue.offer(message);
        }
    });
}

Then we need to override the main method of every spout, nextTuple, which either emits a new tuple into the topology or, if there are no new transactions in the queue, sleeps briefly and returns.

@Override
public void nextTuple() {
    String ret = queue.poll();

    if (ret == null) {
        Utils.sleep(50);
        return;
    }

    collector.emit(new Values(ret));
}
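One detail of this hand-off is easy to miss: the queue is bounded at 1000 entries, and offer does not block, so a message that arrives while the queue is full is silently discarded. The sketch below illustrates that behaviour with the standard library only. It assumes the WebSocket callback and nextTuple run on different threads, and the class BoundedHandoff with its methods is hypothetical, not part of the original spout.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper (not in the original project): wraps the bounded queue
// shared by a producer thread and a consumer thread, and counts dropped messages.
class BoundedHandoff {
    private final LinkedBlockingQueue<String> queue;
    private final AtomicLong dropped = new AtomicLong();

    BoundedHandoff(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    // Producer side: never blocks; returns false and counts a drop if the queue is full.
    boolean publish(String message) {
        boolean accepted = queue.offer(message);
        if (!accepted) {
            dropped.incrementAndGet();
        }
        return accepted;
    }

    // Consumer side: never blocks; returns null if nothing is waiting.
    String next() {
        return queue.poll();
    }

    long droppedCount() {
        return dropped.get();
    }

    public static void main(String[] args) {
        BoundedHandoff handoff = new BoundedHandoff(2);
        handoff.publish("tx-1");
        handoff.publish("tx-2");
        handoff.publish("tx-3"); // queue is full, so this one is dropped
        System.out.println(handoff.next());         // prints tx-1
        System.out.println(handoff.droppedCount()); // prints 1
    }
}
```

At roughly 7 transactions per second a capacity of 1000 leaves plenty of headroom, but counting drops like this would make it visible if the spout ever fell behind.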

Finally, we describe the output schema in declareOutputFields:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("transaction"));
}

The full source code, as well as a usage example, can be found in the GitHub repository.

Published on November 17, 2015