Sebastien Dionne


Grizzly Migration Guide Part 1


This article focuses on creating a demo application built on the Grizzly framework.  I wanted to go into more detail than the usual hello-world applications.  A new framework is not easy to use at first, and it is even harder when the framework is under active development or lacks samples and documentation.

 

This guide follows the steps needed to migrate a real-world application to the Grizzly framework.  I chose to build a "real-time stock quote" application.  I'll focus only on the server part; the client is left for a later article.

The principles:

 

To provide stock quotes, the server keeps a connection to a third party that supplies this service.  The server listens for client requests and responds in real time.  A client can ask for a single quote or for a quote feed (in which case it receives every update on the stock), and it can register for more than one quote feed.  The server will handle around 5000 stock feeds in real time.  When the server loses its connection to the third party, it reconnects and resubscribes to the quote feeds that were previously registered.  A client can unsubscribe from a quote feed at any time.

 

I think that covers most of the cases that can happen in the real world.

 

This part focuses on creating the first implementation of the demo using plain NIO.  To keep the demo simple, I won't use external libraries.  I use JDK 6 for this demo; if you need to compile it on an earlier JDK, you will have to adapt the code, for example by using "backport" libraries to replace the concurrent API.

 

The main class is QuoteManager.  This class contains the logic behind the feed subscriptions and the client requests.  Here are the most important methods in QuoteManager.

protected void init() {

    quoteSubscriptionCache = new ConcurrentHashMap<String, IResponseHandler>();
    clientHandlerByQuoteSubcriptionCache = new ConcurrentHashMap<String, List<ClientConnectionHandler>>();
    clientSubcriptionListCache = new ConcurrentHashMap<ClientConnectionHandler, ConcurrentSkipListSet<String>>();

    f_tg = new ThreadGroup("Connection IN/OUT");

    // third-party connection
    thirdConnection = new ThirdPartyConnectionHandler();
    thirdConnection.setQuoteManager(this);

    new Thread(f_tg, thirdConnection, "ThirdPartyConnectionHandler").start();

    // incoming connection
    incomingConnectionListener = new SocketConnectionListener();
    incomingConnectionListener.setQuoteManager(this);
    incomingConnectionListener.setPort(INCOMING_PORT);

    // start the listening service
    new Thread(f_tg, incomingConnectionListener, "Incoming Connection").start();

}

In this implementation we create one thread per client connection and use a blocking SocketChannel to communicate with the client.  It's not the best way to do it, but I will address that in "Grizzly Migration Guide Part 2".

 

/**
 * processing
 *
 * - Open socket
 * - a new Thread (ClientConnectionHandler)
 * - start the new thread
 */
public void run() {

    init();

    while (!isShutdown()) {
        try {
            SocketChannel socketChannel = f_serverSocket.accept();

            // create a new clientConnectionhandler
            ClientConnectionHandler client = new ClientConnectionHandler(getQuoteManager());
            client.setSocketChannel(socketChannel);

            System.out.println("new client connection established");

            Thread t = new Thread(f_threadGroup, client, "Client");
            t.start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}
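
The loop above calls f_serverSocket.accept() without showing how f_serverSocket is created.  As a minimal sketch, assuming it is a blocking ServerSocketChannel, it could be opened as follows.  The class ListenerSketch and the method openListener are hypothetical names and are not part of the demo sources.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical helper, not part of the demo sources.
final class ListenerSketch {

    /** Opens a blocking server channel bound to the given port (0 = any free port). */
    static ServerSocketChannel openListener(int port) throws IOException {
        ServerSocketChannel server = ServerSocketChannel.open();
        // Channels are blocking by default; stated explicitly for clarity.
        server.configureBlocking(true);
        // Binding through the underlying socket also works on JDK 6.
        server.socket().bind(new InetSocketAddress(port));
        return server;
    }

    public static void main(String[] args) throws IOException {
        ServerSocketChannel server = openListener(0);
        System.out.println("listening on port " + server.socket().getLocalPort());
        server.close();
    }
}
```

With a channel opened this way, accept() blocks until a client connects, which matches the one-thread-per-client design of this first implementation.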

 

When the function getQuery() returns a query, the QuoteManager processes it.  Now let's look at how we parse the requests sent by the client.

 

/**
* find the query formatted : xxx|symbol[eoq]
*
* the buffer could contain more than one query.
*
* ex : msg1 = quote|aaa[eoq]
*    : msg2 = feed|bbb[eoq]_zzzzz
*
* The first query is returned and the rest is kept in the buffer
* for the next pass.
*
* @param buf buf
* @return the query found
* @throws IOException exception
*/
public String getQuery(ByteBuffer buf) throws IOException {

    while (f_socketChannel.isConnected()) {

        // check if there is already a request in the buffer
        String query = parseQuery();
        if (query != null) {
            return query;
        }

        // Clear the buffer and read bytes from socket
        buf.clear();

        int numBytesRead = f_socketChannel.read(buf);

        if (numBytesRead == -1) {
            // End of stream: no more bytes can be read from the channel
            close();
            return null;
        } else {
            // To read the bytes, flip the buffer
            buf.flip();

            if (buf.hasRemaining()) {

                //System.out.println("Remaining=" + buf.remaining());

                // append the bytes of buf to the cumulative buffer f_cumulatifBB
                f_cumulatifBB = getBB(f_cumulatifBB, buf);

                // parseQuery
                return parseQuery();

            }
        }
    }

    return null;
}

In this function we do two critical things:

  • read from the socketChannel and check whether the buffer contains a valid query;

  • copy the remaining data into a cumulative buffer for the next pass.

 

The line int numBytesRead = f_socketChannel.read(buf); is the blocking part.  The function blocks until data is received on the socketChannel.
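
To make the return values of a blocking read concrete, here is a small sketch that uses a java.nio.channels.Pipe in place of a real client socket (an assumption made only so the example runs without a network).  The class BlockingReadSketch and the method readOnce are hypothetical names.  A blocking read returns the number of bytes read, or -1 once the other side has closed the channel.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;

// Hypothetical illustration, not part of the demo sources.
final class BlockingReadSketch {

    /**
     * Performs one blocking read and returns the bytes as ASCII text,
     * or null when the other side has closed the channel.
     */
    static String readOnce(ReadableByteChannel channel, ByteBuffer buf) throws IOException {
        buf.clear();
        int numBytesRead = channel.read(buf); // blocks until data or end of stream
        if (numBytesRead == -1) {
            return null;
        }
        buf.flip(); // switch the buffer from writing to reading
        byte[] bytes = new byte[buf.remaining()];
        buf.get(bytes);
        return new String(bytes, "US-ASCII");
    }

    public static void main(String[] args) throws IOException {
        Pipe pipe = Pipe.open();
        ByteBuffer buf = ByteBuffer.allocate(64);

        pipe.sink().write(ByteBuffer.wrap("quote|aaa[eoq]".getBytes("US-ASCII")));
        System.out.println("read: " + readOnce(pipe.source(), buf));

        pipe.sink().close(); // the next read reports end of stream
        System.out.println("read: " + readOnce(pipe.source(), buf));
    }
}
```

A single read is not guaranteed to deliver a whole query, which is why the server accumulates the bytes and searches for the terminator.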

 

The next step is to parse the buffer and find a valid query.  In this application I didn't use fixed-length messages; the length is dynamic.  A query ends when the pattern "[eoq]" is found.

 

/**
* Parse the buffer and look for a valid query
* @return a Query
* @throws IOException exception
*/
protected String parseQuery() throws IOException {

    String query = null;

    // the cumulative buffer stays in write mode, so position() > 0 means it holds data
    if (f_cumulatifBB.position() > 0) {
        // read the cumulative buffer to see whether it contains the EOQ marker
        ByteBuffer tmp = f_cumulatifBB.duplicate();
        tmp.flip();

        // decode the buffer
        String msg = f_asciiDecoder.decode(tmp).toString();

        //System.out.println("msg=" + msg);

        int index = msg.indexOf("[eoq]");
        if (index > -1) {

            query = msg.substring(0, index);
            //System.out.println("Query = " + query);

            // We need to keep what comes after the EOQ
            f_cumulatifBB.clear();
            f_cumulatifBB.put(msg.substring(index + "[eoq]".length()).getBytes("US-ASCII"));
        } else {
            //System.out.println("no EOQ in this iteration");
        }
    }

    return query;
}

Because queries are not fixed length, we have to check each time whether "[eoq]" is present.  When a query is found, we extract it from the buffer and keep the remaining data in another buffer, because a single read can contain several requests.  For example:

feed|aaa[eoq]feed|b

In this pass we extract the query "feed|aaa" (terminated by "[eoq]") and keep the rest, "feed|b", in the buffer.  When the next data arrives, the buffer will contain "feed|bbb[eoq]".
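
The example above can be reproduced with a small stand-alone sketch.  It is not the demo's parseQuery(): it accumulates text in a StringBuilder instead of a ByteBuffer and returns every complete query at once.  The class EoqFramerSketch and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not part of the demo sources.
final class EoqFramerSketch {

    private static final String EOQ = "[eoq]";

    // Text received so far that does not yet form a complete query.
    private final StringBuilder pending = new StringBuilder();

    /** Appends newly received text and returns every complete query, without the terminator. */
    List<String> feed(String chunk) {
        pending.append(chunk);
        List<String> queries = new ArrayList<String>();
        int end;
        while ((end = pending.indexOf(EOQ)) >= 0) {
            queries.add(pending.substring(0, end));
            pending.delete(0, end + EOQ.length());
        }
        return queries;
    }

    /** Returns the incomplete remainder that is waiting for more data. */
    String pending() {
        return pending.toString();
    }

    public static void main(String[] args) {
        EoqFramerSketch framer = new EoqFramerSketch();
        System.out.println(framer.feed("feed|aaa[eoq]feed|b")); // [feed|aaa]
        System.out.println(framer.pending());                   // feed|b
        System.out.println(framer.feed("bb[eoq]"));             // [feed|bbb]
    }
}
```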

 

Processing the command

 

The query parsed by the ClientConnectionHandler is sent to the QuoteManager.

 

Only three commands are available (quit, quote, and feed), and each command has its own ICommand implementation.  The query is processed by the matching ICommand class, which the QuoteManager obtains through getCommand().

/**
* Process the request from the client.
*
* @param clientHandler the connection from the client
* @param query the query
*/
public void processQuery(ClientConnectionHandler clientHandler, String query) {

    // extract the command; a query without a separator is treated as a bare command name
    int index = query.indexOf(ARG_SEP);
    String commandName = (index < 0) ? query : query.substring(0, index);

    ICommand command = getCommand(commandName);

    if (command == null) {
        System.out.println("Unsupported command : commandName = [" + commandName + "]");
        return;
    }

    // send the query
    command.process(query, clientHandler);

}

/**
* Return a class that will handle the command
* requested by the client
*
* The supported commands are : quit, quote, feed
*
* @param commandName name of the command
* @return the class that will handle the command
*/
public ICommand getCommand(String commandName) {

    // we could use Spring to load the config, but we are in a demo :)

    if (commandName.equalsIgnoreCase("quit")) {
        return new QuitCommand(this);
    } else if (commandName.equalsIgnoreCase("quote")) {
        return new QuoteCommand(this);
    } else if (commandName.equalsIgnoreCase("feed")) {
        return new FeedCommand(this);
    }

    return null;
}
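
As an alternative to the if/else chain, the lookup can be driven by a map.  The following is only a sketch under stated assumptions: Command is a simplified stand-in for the article's ICommand, the separator is assumed to be "|" as in the sample queries, and the commands are shared instances instead of being created for each request as getCommand() does.  All names in it are hypothetical.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical illustration, not part of the demo sources.
final class CommandRegistrySketch {

    /** Simplified stand-in for the article's ICommand. */
    interface Command {
        String process(String argument);
    }

    private final Map<String, Command> commands = new HashMap<String, Command>();

    void register(String name, Command command) {
        commands.put(name.toLowerCase(Locale.ROOT), command);
    }

    /** Splits "name|argument" and runs the matching command; null if unknown or malformed. */
    String dispatch(String query) {
        int sep = query.indexOf('|'); // assumes ARG_SEP is "|"
        if (sep < 0) {
            return null;
        }
        Command command = commands.get(query.substring(0, sep).toLowerCase(Locale.ROOT));
        return command == null ? null : command.process(query.substring(sep + 1));
    }

    /** Builds a registry with two placeholder commands that only describe the request. */
    static CommandRegistrySketch demo() {
        CommandRegistrySketch registry = new CommandRegistrySketch();
        registry.register("quote", new Command() {
            public String process(String symbol) {
                return "one quote for " + symbol;
            }
        });
        registry.register("feed", new Command() {
            public String process(String symbol) {
                return "feed subscription for " + symbol;
            }
        });
        return registry;
    }

    public static void main(String[] args) {
        CommandRegistrySketch registry = demo();
        System.out.println(registry.dispatch("QUOTE|aaa")); // one quote for aaa
        System.out.println(registry.dispatch("feed|bbb"));  // feed subscription for bbb
        System.out.println(registry.dispatch("bogus|ccc")); // null
    }
}
```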

At this point, the client request has been received by the server and is being processed.  All the remaining work is done by the ThirdPartyConnectionHandler.  In this demo I don't provide an actual stock quote service; instead I created a quote simulator (bid/ask).  I could have written a simulator server and connected to it, but I preferred to keep things simple, so the random quotes are generated in the ICommandRequest.  Take a look at the next image.

 

 

Because we sent a feed request, a FeedCommandRequest is used to handle the quotes (bid/ask).  The ThirdPartyConnection retrieves the quotes (here from a simulator, but they could come from a cache) and sends them back through the IResponseHandler received from the ICommand.

/**
 * Send the request to the third party
 *
 * @param quoteSubscription the quote requested
 * @param responseHandler handler that will receive the response
 * @return CommandRequest request
 * @throws Exception exception
 */
public ICommandRequest sendFeedRequest(String quoteSubscription, IResponseHandler responseHandler) throws Exception {

    FeedCommandRequest request = new FeedCommandRequest();
    request.setQuoteSubscription(quoteSubscription);
    request.setResponseHandler(responseHandler);

    responseHandler.setCommandRequest(request);

    // we send the query to the third party and we receive notification by the
    // responseHandler .. but here the update are generated in the CommandRequest class in a thread
    // not efficient.. but it's a DEMO :)  and because it's a QUOTE, we send
    // the update only to this client
    request.startGenerateQuote();

    return request;
}
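
The body of startGenerateQuote() is not shown in this article.  As a rough sketch of what such a simulator could look like, the class below produces random bid/ask lines in the format that appears in the server console later on.  The class QuoteSimulatorSketch, the 0 to 50 price range, and the 100 ms period are assumptions made for the example.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration, not part of the demo sources.
final class QuoteSimulatorSketch {

    private final Random random;

    QuoteSimulatorSketch(long seed) {
        this.random = new Random(seed);
    }

    /** Builds one quote line; the 0..50 price range is an arbitrary assumption. */
    String nextQuote(String symbol) {
        double bid = random.nextDouble() * 50;
        double ask = random.nextDouble() * 50;
        return "SYMBOL=[" + symbol + "] BID = " + bid + "| ASK = " + ask;
    }

    public static void main(String[] args) throws InterruptedException {
        final QuoteSimulatorSketch simulator = new QuoteSimulatorSketch(42L);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Emit a new random quote every 100 ms, as a background thread would.
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println(simulator.nextQuote("aaa"));
            }
        }, 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdownNow(); // comparable to closing the request
    }
}
```

Because bid and ask are drawn independently, the bid can end up above the ask, as it does in the sample output of the demo.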

 

The feed request is the most complex one: when sendToClient is called, the response is sent not to a single client but to all the clients subscribed to the stock.  In the FeedResponseHandler, the client list for the quote subscription is retrieved and iterated, and the quote (bid/ask) is sent to every client in that list.

 

public void sendToClient(StringBuffer sb) {

    ByteBuffer writeBuffer = ByteBuffer.allocate(sb.toString().getBytes().length);

    String quoteSubcription = getCommand().getSubscription();

    List<ClientConnectionHandler> list = getCommand().getQuoteManager().getClientHandlerByQuoteSubcription(quoteSubcription);

    // HOW to skip the clientConnectionHandler if it was closed ?

    if (list != null) {
        for (Iterator<ClientConnectionHandler> iterator = list.iterator(); iterator.hasNext();) {
            ClientConnectionHandler clientConnectionHandler = iterator.next();

            writeBuffer.put(sb.toString().getBytes());
            writeBuffer.flip();

            System.out.println("SENDING FEED TO CLIENT = [" + sb.toString() + "]");

            try {
                if (clientConnectionHandler.getSocketChannel().isConnected()) {
                    clientConnectionHandler.getSocketChannel().write(writeBuffer);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // this error sometimes occurs:

                // java.nio.channels.ClosedChannelException

                // the client is not connected
                clientConnectionHandler.close();
            }

            writeBuffer.rewind();

        }
    }
}
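
The listing above leaves open how to skip a client whose connection was closed.  One possible approach, sketched here and not taken from the demo, is to keep the subscribers in a CopyOnWriteArrayList and drop a client as soon as a send fails.  Client is a simplified stand-in for ClientConnectionHandler, and all names in the sketch are hypothetical.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical illustration, not part of the demo sources.
final class FanOutSketch {

    /** Simplified stand-in for ClientConnectionHandler. */
    interface Client {
        void send(String message) throws IOException;
    }

    /** Test double that remembers what it received. */
    static final class RecordingClient implements Client {
        final List<String> received = new ArrayList<String>();
        public void send(String message) {
            received.add(message);
        }
    }

    /** Test double that behaves like a closed connection. */
    static final class ClosedClient implements Client {
        public void send(String message) throws IOException {
            throw new IOException("channel closed");
        }
    }

    private final List<Client> subscribers = new CopyOnWriteArrayList<Client>();

    void subscribe(Client client) {
        subscribers.add(client);
    }

    int subscriberCount() {
        return subscribers.size();
    }

    /** Sends the message to every subscriber and drops those whose send fails. */
    int broadcast(String message) {
        int delivered = 0;
        // The loop walks a snapshot of the list, so removing inside it is safe.
        for (Client client : subscribers) {
            try {
                client.send(message);
                delivered++;
            } catch (IOException e) {
                subscribers.remove(client);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        FanOutSketch feed = new FanOutSketch();
        RecordingClient alive = new RecordingClient();
        feed.subscribe(alive);
        feed.subscribe(new ClosedClient());
        System.out.println(feed.broadcast("quote 1")); // 1
        System.out.println(feed.subscriberCount());    // 1
        System.out.println(alive.received);            // [quote 1]
    }
}
```

CopyOnWriteArrayList copies the list on every change, which suits a feed with many broadcasts and few subscription changes.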

Let's talk about the quit command.  When a client sends a quit request to the server, or when the client closes its connection, the QuoteManager removes the client from its cache.  If this client was the only one subscribed to the stock feed, the QuoteManager closes the IResponseHandler with the third party, because no clients are interested in the next stock updates.

 

/**
* Unsubscribe this client from all its quote subscriptions.  If no client remains subscribed
* to a quote subscription, that subscription is closed.
* @param clientConnectionHandler the client connection handler
*/
public void unsubcribeClient(ClientConnectionHandler clientConnectionHandler) {

    ConcurrentSkipListSet<String> quoteSubcriptionList = getQuoteSubscription(clientConnectionHandler);

    // for each quoteSubcription, we remove this client from the list and then check
    // whether other clients are still subscribed to it.
    for (Iterator<String> iterator = quoteSubcriptionList.iterator(); iterator.hasNext();) {
        String quoteSubcription = iterator.next();

        List<ClientConnectionHandler> listClient = getClientHandlerByQuoteSubcription(quoteSubcription);
        listClient.remove(clientConnectionHandler);

        // if empty, the quoteSubcription is not needed anymore, so we close it
        if (listClient.size() == 0) {
            IResponseHandler responseHandler = getResponseHandlerFromQuoteSubcription(quoteSubcription);

            responseHandler.getCommandRequest().close();

            quoteSubscriptionCache.remove(quoteSubcription);

        }

    }
}
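
The bookkeeping behind subscribe and unsubscribe can be condensed into a small registry.  This is a simplified sketch and not the QuoteManager itself: clients are plain string ids, a single synchronized map replaces the three caches, and unsubscribeAll scans every symbol instead of using a per-client index.  All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration, not part of the demo sources.
final class SubscriptionRegistrySketch {

    // symbol -> ids of the clients subscribed to it (TreeMap only for a stable order)
    private final Map<String, Set<String>> clientsBySymbol = new TreeMap<String, Set<String>>();

    /** Returns true when this is the first subscriber, i.e. the feed must be opened. */
    synchronized boolean subscribe(String clientId, String symbol) {
        Set<String> clients = clientsBySymbol.get(symbol);
        boolean first = (clients == null);
        if (first) {
            clients = new HashSet<String>();
            clientsBySymbol.put(symbol, clients);
        }
        clients.add(clientId);
        return first;
    }

    /** Removes the client everywhere and returns the symbols whose feed can be closed. */
    synchronized List<String> unsubscribeAll(String clientId) {
        List<String> orphaned = new ArrayList<String>();
        Iterator<Map.Entry<String, Set<String>>> it = clientsBySymbol.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<String>> entry = it.next();
            if (entry.getValue().remove(clientId) && entry.getValue().isEmpty()) {
                orphaned.add(entry.getKey());
                it.remove(); // nobody is listening to this symbol any more
            }
        }
        return orphaned;
    }

    public static void main(String[] args) {
        SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        System.out.println(registry.subscribe("c1", "aaa")); // true
        System.out.println(registry.subscribe("c2", "aaa")); // false
        System.out.println(registry.subscribe("c1", "bbb")); // true
        System.out.println(registry.unsubscribeAll("c1"));   // [bbb]
        System.out.println(registry.unsubscribeAll("c2"));   // [aaa]
    }
}
```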

I think we are ready to see it live (live for me :) ).  I'll use telnet to connect to the server and send requests.

 

Here is the output in the server console when we start the server.

NIOGateway started
listening for incoming TCP Connections on port : 5000

 

Now it's time to connect to the server and send a command: feed|aaa[eoq]

 

telnet localhost 5000

feed|aaa[eoq] (paste it in the console)

 

As soon as the request is received by the server, you will see this in the server console:

new client connection established
query found [feed|aaa]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 27.014468688065794| ASK = 31.911143507306804]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 46.96027985720251| ASK = 40.82132916110618]
SENDING FEED TO CLIENT = [SYMBOL=[aaa] BID = 1.7131267924565088| ASK = 9.244953384119619]

 

In the telnet console you will receive the bid/ask values that were sent:

SYMBOL=[aaa] BID = 27.014468688065794| ASK = 31.911143507306804

SYMBOL=[aaa] BID = 46.96027985720251| ASK = 40.82132916110618

SYMBOL=[aaa] BID = 1.7131267924565088| ASK = 9.244953384119619

 

I can send another request to the server: feed|bbb[eoq]

That adds bbb's bid/ask to both the client and server consoles.

 

When the client sends the quit command, all the updates stop on both the server and the client, and the connection with the server is closed.

 

In the server console:

query found [quit|aaa]
ClientConnection close

 

As I said at the beginning of this article, the server handles disconnection from the third party.  To simulate such a disconnection, I trigger it in the run() method of the ThirdPartyConnectionHandler, and you will see it in the server console:

Simulate a disconnection from the 3th party
Reconnecting...
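
The reconnection code of ThirdPartyConnectionHandler is not listed in this article.  The following single-threaded sketch shows one way the reconnect-and-resubscribe step could be structured.  Upstream is a hypothetical interface standing in for the third-party link, FlakyUpstream is a fake used only to exercise it, and the retry count and delay are arbitrary assumptions.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration, not part of the demo sources.
final class ResubscribeSketch {

    /** Hypothetical view of the third-party connection. */
    interface Upstream {
        void connect() throws IOException;
        void subscribe(String symbol) throws IOException;
    }

    /** Fake upstream whose first connection attempts fail. */
    static final class FlakyUpstream implements Upstream {
        int failuresLeft;
        final List<String> subscribed = new ArrayList<String>();

        FlakyUpstream(int failures) {
            this.failuresLeft = failures;
        }

        public void connect() throws IOException {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new IOException("connection refused");
            }
            subscribed.clear(); // a fresh connection has no subscriptions
        }

        public void subscribe(String symbol) {
            subscribed.add(symbol);
        }
    }

    // Symbols that must be active again after a reconnection, in subscription order.
    private final Set<String> activeSymbols = new LinkedHashSet<String>();

    void remember(String symbol) {
        activeSymbols.add(symbol);
    }

    /** Retries the connection, then replays every remembered subscription. */
    boolean reconnect(Upstream upstream, int maxAttempts, long delayMillis)
            throws InterruptedException {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                upstream.connect();
                for (String symbol : activeSymbols) {
                    upstream.subscribe(symbol);
                }
                return true;
            } catch (IOException e) {
                Thread.sleep(delayMillis); // wait before the next attempt
            }
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        ResubscribeSketch manager = new ResubscribeSketch();
        manager.remember("aaa");
        manager.remember("bbb");
        FlakyUpstream upstream = new FlakyUpstream(2);
        System.out.println(manager.reconnect(upstream, 5, 10)); // true
        System.out.println(upstream.subscribed);                // [aaa, bbb]
    }
}
```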

 

In this article, we saw the basics of creating a real-world application using NIO.  SocketChannels are used instead of plain socket reads and writes.  In this first implementation, one thread is created per client and the SocketChannel is blocking.  To handle asynchronous requests we use a callback object (IResponseHandler).  To avoid requesting the same stocks from the third party more than once, we use a cache in the QuoteManager.  Each client that sends a feed request is subscribed to the stock and kept in the cache; when the connection for this client is closed, the server unsubscribes it and closes the request with the third party if required.

 

Download the source code here.  To run the application: java -jar nio_quotestock_demo_v1.jar

In the next article, "Grizzly Migration Guide Part 2", I'll show you how to use non-blocking connections with a Selector.

 

 

 

 



Last Updated ( Sunday, 10 August 2008 18:42 )