Pipeline con netty

De ChuWiki

Al final de Ejemplo Sencillo de TCP/IP con netty vimos que Netty permitía que trataramos los bytes recibidos por el socket en partes, de forma que cada uno de nuestros trozos de código haga algo con los bytes recibidos y pase los resultados al siguiente trozo. Vamos a ver aquí todo esto con detalle.

Tienes todo el código de este ejemplo en Ejemplo con netty

Tratamiento de la entrada : ChannelInboundHandler[editar]

Cuando llega un array de bytes por el socket, lo más frecuente es tener los siguientes tres trozos de código :

  • Si el que nos envía mensajes lo hace muy seguido, posiblmente recibamos todos los bytes de todos los mensajes seguidos. El primer bloque de código en recepción suele encargarse de separar los bytes que componen cada mensaje para pasarle al siguiente bloque sólo los bytes de un mensaje. Por ejemplo, si los mensajes son líneas de texto separadas por retornos de carro, el primer bloque separa las línesa, buscando los retornos de carro y enviando al siguiente bloque sólo una línea cada vez. Este bloque suele llamarse "extractor de tramas" o "Frame Extractor".
  • El segundo bloque, con la garantía de que el array de bytes que recibe es un mensaje completo en sí mismo, suele traducir estos bytes a una clase java o algo más legible por el código que un array de bytes. En el caso anterior, cogería los bytes de una línea y la convertiría en un String. Este bloque de código se suele llamar "Decodificador" o "Decoder".
  • El tercer bloque es el que hace realmente algo útil con los datos recibidos y ya convertidos en algo legible. Muestra el String por pantalla, o lo interpreta como un comando a ejecutar, o lo que sea que toque hacer con ese String en nuestra aplicación. Este bloque es el de lógica de la aplicación.

En Netty implementamos cada uno de estos bloques haciendo una clase que implemente ChannelInboundHandler, o si no queremos sobreescribir todos sus métodos, heredamos de ChannelInboundHandlerAdapter.

Aunque el extraer mensajes buscando retornos de carro y el convertir bytes a String ya los tiene netty implementados, vamos a hacerlos aquí a mano para mostrar el funcionamiento de esto. Vamos con el primero, el FrameExtractor

Frame Extractor[editar]

El código puede ser como el siguiente

package com.chuidiang.examples4;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

public class FrameExtractor extends ChannelInboundHandlerAdapter{

    private ByteBuf buf;  // (1)

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {  // (2)
        buf= ctx.alloc().buffer();
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {  // (3)
        if(null!=buf) {
            buf.release();
            buf=null;
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  // (4)
        try {
            buf.writeBytes((ByteBuf) msg);  // (5)
            int indexOf = buf.indexOf(0, buf.readableBytes(), (byte) '\n');  // (6)
            while (-1!=indexOf) {
                    ByteBuf line = ctx.alloc().buffer();  // (7)
                    buf.readBytes(line, indexOf);  // (8)
                    buf.readByte(); // Leemos el retorno de carro para eliminarlo.
                    ctx.fireChannelRead(line);  // (9)
                    buf.discardReadBytes();
                    indexOf = buf.indexOf(0, buf.readableBytes(), (byte) '\n');
            }
        } finally {
            ReferenceCountUtil.release(msg); // (10)
        }
    }
}

Vamos con los detalles

  • (1) Cuando nos lleguen bytes leídos, no tenemos garantía de que llegue una línea completa con su retorno de carro, deberemos esperar a las siguientes lecturas. Necesitamos por tanto un buffer donde guardar esos bytes hasta la siguiente lectura, donde quizás sí, quizás no, llegue el retorno de carro. De igual manera, puede que nos llegue un grupo de bytes con un retorno de carro en medio, por lo que necesitamos un buffer donde guardar los bytes que van después del retorno y que no llevan su propio retorno al final, simplemente porque todavía no ha llegado.
  • (2) En el método channelActive() que es al que nos llama netty cuando el canal de comunicación está activo, aprovechamos para crear el buffer que mencionamos en el punto anterior. Usamos los métodos que netty nos proporciona para crear estos buffer.
  • (3) En el método channelInactive() que es al que nos llama netty cuando el canal de comunicación deja de estar activo, aprovechamos para liberar el buffer que creamos en el punto anterior.
  • (4) En el método channelRead() es donde está la madre de la ciencia. Resumimos lo que se pretende : buscar una línea buscando su retorno de carro, crear un ByteBuf para esa línea y pasarla al siguiente trozo de código (handler) para que lo convierta a String. Para pasar esta línea se usa el método ctx.fireChannelRead(line), que avisará al siguiente handler en la cadena pasándole el parámetro que pasamos a este método. Vamos con los detalles.
  • (5) El msg que recibimos como parámetro es un ByteBuf de netty. Añadimos todo su contenido al final de nuestro ByteBuf de acumular bytes, definido el paso (1)
  • (6) En nuestro buff buscamos un retorno de carro '\n'. y nos metemos en un bucle tratando cada uno de los que encontremos (es posible que en una sola lectura de bytes del socket nos lleguen varias líneas.
  • (7) Si hemos encontrado un retorno de carro, creamos un nuevo ByteBuf para meter ahí la línea completa y pasársela al siguiente handler.
  • (8) Metemos en ese buffer todos los bytes hasta el retorno de carro. Leemos el retorno de carro por separado para eliminarlo del buffer de entrada y no enviarlo al siguiente handler.
  • (9) Avisamos al siguiente handler, pasándole nuestro buffer line. No liberamos el buffer line porque es responsabilidad del que lo recibe.
  • (10) Liberamos el buffer que nos ha llegado por parámetro. Como ya no lo necesitamos y no se lo hemos pasado a nadie, es nuestra responsabilidad liberarlo.

StringDecoder[editar]

Nuestro siguiente ChannelInboundHander recibirá los bytes que nos envía el FrameExtractor, ya sin retorno de carro. Lo único que tiene que hacer es traducirlo a String para pasárselo al siguiente. El código puede ser como este

public class StringDecoder extends ChannelInboundHandlerAdapter{
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf =(ByteBuf)msg;
        String text = buf.toString(Charset.defaultCharset());
        ctx.fireChannelRead(text);
        buf.release();
    }
}

Nada especial. El msg que recibimos como parámetro es el ByteBuf (line) que nos entregó el handler anterior, hacemos el cast, lo convertimos a String y se lo pasamos al siguiente handler con ctx.fireChannelRead(text). El buffer que recibimos del handler anterior lo liberamos nosotros, ya que no lo necesitamos más y no se lo hemos pasado a nadie.

Lógica de negocio[editar]

Nuestro tercer handler recibirá directamente el String, así que hacemos con él ya lo que nos interese.

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String text = (String)msg;
        System.out.println(text);
    }
}

El msg recibido es el String que nos envía el handler anterior y solo tenemos que hacer el cast y sacarlo por pantalla, o lo que queramos.

Todo junto[editar]

El siguiente diagrama muestra lo que acabamos de hacer

y el código java para montar todos estos handler en el socket

           ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FrameExtractor());
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new ServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
                    .childOption(ChannelOption.SO_KEEPALIVE, true); // (6)

Unicamente hay que tener en cuenta que se añadirán en el orden que se van añadiendo, es decir, añadimos primero el FrameExtractor porque es el primero al que llamamos.

Tratamiento de salida : ChannelOutboundHandler[editar]

Si el flujo es el contrario, es decir, somos nosotros los que escribimos en el socket para enviar bytes hacia afuera, tenemos posibilidad de hacer una cadena similar. Nosotros enviamos a un Handler hecho por nosotros el mensaje que queremos enviar pero como si fuera una estructura de datos nuestra (un String por ejemplo, pero podría ser cualquier clase nuestra de datos). Necesitaremos añadir un Handler que sea capaz de convertir esa estructura de datos (el String en nuestro ejemplo) en un array de bytes y necesitaremos otro Handler capaz de añadir a esos bytes que componen un mensaje los bytes que permitan separar ese mensaje de otro (en nuestro ejemplo, el byte de retorno de carro).

La primera clase, la que tiene el método de envío de nuestros datos, puede ser de cualquier tipo (ChannelInboundHandler o ChannelOutboundHandler), lo único importante es que tenga el channel sobbre el que quiere escribir.

Las otras dos clases (la que convierte el mensaje a bytes y la que añade el separador) deben ser ChannelOutboundHandler y sobreescribir el método write(). A este método es al que llamará netty cuando quiera escribir en el socket. Al igual que en el caso anterior, cada handler recibirá como parámetro de este método el objeto que nos entregue el handler anterior.

Lógica de negocio[editar]

Como hicimos en Ejemplo Sencillo de TCP/IP con netty usaremos el handler de cliente, que es más sencillo. Es este

package com.chuidiang.examples4;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ClientHandler extends ChannelInboundHandlerAdapter{
    private ChannelHandlerContext ctx;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        this.ctx=ctx;
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        ctx=null;
    }

    public void sendMessage(String msg){
        if (null==ctx)
            return;
        ChannelFuture channelFuture = ctx.write(msg);
        ctx.flush();
    }
}

En el método handlerAdded() nos guardamos el ChannelHandlerContext, que nos servirá para enviar mensajes por el socket. En handlerRemoved() lo eliminamos, puesto que dejaremos de estar conectados al socket. Hemos puesto un método sendMessage(String) donde enviamos el mensaje en formato nuestro (String) y no como ByteBuf. Simplemente llamamos a ctx.write(msg) para enviar este mensaje, tal cual, como String, al siguiente handler.

String Encoder[editar]

El siguiente handler en la cadena tiene que convertir este String a ByteBuf (o byte[] o cualquier otro tipo de array de bytes que entienda el siguiente handler, que como también es nuestro, solo tenemos que estar de acuerdo con nosotros mismos). Enviamos el array de buffer nuevamente con ctx.write()

public class StringEncoder extends ChannelOutboundHandlerAdapter{
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        String text = (String)msg;
        ByteBuf buf = Unpooled.copiedBuffer(text.getBytes());
        ctx.write(buf,promise);
    }
}

Hemos heredado de ChannelOutboundHandlerAdapter para no sobreescribir todos los métodos de la interface y hemos sobreescrito el método write(). En este método recibidmos msg como String, que es lo que nos ha enviado el handler anterior. Lo convertimos a un ByteBuf y lo escribmos con ctx.write(). Este ByteBuf es lo que recibirá nuestro siguiente handler en la cadena.

Frame Maker[editar]

El último handler sólo tiene que añadir el retorno de carro. Nuevamente heredamos de ChannelOutboundHandlerAdapter y sobreescribimos el método write()

public class FrameMaker extends ChannelOutboundHandlerAdapter{
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buff = ctx.alloc().buffer();
        buff.writeBytes((ByteBuf)msg);
        buff.writeByte('\n');
        ctx.write(buff,promise);
        ReferenceCountUtil.release(msg);
    }
}


Creamos un nuevo ByteBuf, copiamos con writeBytes() el contenido que nos ha llegado y añadimos con writeByte() el retorno de carro. Pasamos este nuevo ByteBuf con retorno de carro a netty para que lo mande por socket (no hay más handler nuestros, así que netty lo mandará por el socket) y liberamos el msg que hemos recibido.

Todo junto[editar]

El siguiente diagrama muestra lo que acabamos de hacer, se lee de abajo a arriba

y en el código, debemos poner

            Bootstrap b = new Bootstrap(); // (2)
            b.group(workerGroup)
                    .channel(NioSocketChannel.class) // (3)
                    .handler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FrameMaker());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new ClientHandler);

                        }
                    })

Un detalle, en el caso de ChannelOutboundHandler, los handler se llaman en sentido inverso a cómo se añaden. Por eso, si llamamos al método sendMessage() de ClientHandler, este pasará el String al que está encima (StringEncoder) y este a su vez al que está encima (FrameMaker)

Y todo más junto aun[editar]

En una misma cadena, podemos poner ChannelInboundHandler y ChannelOutuboundHandler mezclados. Veamos el trozo del servidor con todo junto

            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new FrameExtractor());
                            ch.pipeline().addLast(new FrameMaker());
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(serverHandler);

                        }
                    })

Cuando llega un mensaje del socket, se llamará a los ChannelInboundHandler en el orden en que se han añadido, es decir, FrameExtractor primero, StringDecoder después y finalmente serverHandler.

Cuando escribimos en el socket usando uno de estos handler (en nuestro ejemplo se haría desde serverHandler), se llamará a los ChannelOutboundHandler desde este handler (serverHandler) en sentido contrario a como se han añadido (de abajo a arriba), es decir, serverHandler al escribir pasa sus datos a StringEncoder y este a FrameMaker, que finalmente es el que envía realemente bytes por el socket.

Y otro detalle más[editar]

Y otro detalle más, que ya hemos mencionado, pero repetimos. Este ejemplo es tan habitual que no necesitamos hacer los FrameExtractor y FrameMaker ni el StringEncoder ni StringDecoder. Netty los tiene ya implementados. El código con los de netty quedaría

            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
                    .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LineBasedFrameDecoder(1000));
                            ch.pipeline().addLast(new LineEncoder(Charset.defaultCharset()));
                            ch.pipeline().addLast(new StringDecoder(Charset.defaultCharset()));
                            ch.pipeline().addLast(serverHandler);

                        }
                    })

LineBasedFrameDecoder hace las veces de nuestro FrameExtrator, busca retornos de carro y pasa los grupos de bytes entre retornos de carro al siguiente handler. StringDecoder recoge esos bytes y los pasa a String, entregándoselos como tales a serverHandler.

Cuando escribimos un String usando el método sendMessage() de nuestro serverHandler, lo recibe LineEncoder, que le añade el retorno de carro y lo converte a String (hace de una sola vez lo que hacíamos con nuestro StringEncoder y FrameMaker).

Por supuesto, estas clases de netty están mucho más curradas que las que hemos hecho de ejemlo. LineBasedFrameEncoder tiene en cuenta retornos de carro estilo windows (\r\n) y de linux (\n), aparte de un máximo de caracteres (1000 en ejemplo) a partir del cual genera la línea aunque no haya encontrado un retorno de carro, por si no llega nunca. LineEncoder y StringDecoder admiten el Charset a usar, que en nuestro ejemplo era a piñón fijo del defaultCharset()