Serializacion en Hazelcast con Kryo

De ChuWiki

Hazelcast permite crear memoria distribuida, en forma de Map, Set, Queue, etc, entre varios ejecutables. Para ello, necesita serializar los objetos java, para que puedan circular como array de bytes de un ordenador o ejecutable a otro. Hazelcast usa por defecto la serialización de java, es decir, clases que implementen la interface Serializable, pero esta no es precisamente la más eficiente ni en consumo de CPU al serializar/deserializar, ni en espacio ocupado en bytes. Por ello, Hazelcast ofrece la posibilidad de indicar nuestros propios algoritmos de serialización. Veamos aquí un ejemplo con una librería de serialización bastante más eficiente: Kryo

Dependencias de Hazelcast y Kryo[editar]

Por supuesto, en nuestro proyecto Maven o gradle necesitaremos poner las dependencias de Hazelcast y de Kryo. En el momento de escribir este artículo, las versiones más modernas son

<dependency>
   <groupId>com.hazelcast</groupId>
   <artifactId>hazelcast</artifactId>
   <version>4.1.1</version>
   <scope>compile</scope>
</dependency>
<dependency>
   <groupId>com.esotericsoftware</groupId>
   <artifactId>kryo</artifactId>
   <version>5.0.3</version>
</dependency>

Configurar la serialización de Hazelcast[editar]

Para indicarle a Hazelcast qué serialización queremos que use, una hecha por nosotros, debemos incluirla en la configuración antes de obtener la instancia de Hazelcast. El código sería algo como esto

Config config = new Config();
config.getSerializationConfig().setGlobalSerializerConfig(myGlobalConfig);
...
HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);

Se crea una instancia de Config de hazelcast y le cambiamos el globalSErializar por uno de nuestra cosecha: myGlobalConfig. Este no es más que una instancia de la clase Hazelcast GlobalSerializerConfig en la que metemos nuestro propio serializador


GlobalSerializerConfig myGlobalConfig = new GlobalSerializerConfig();
myGlobalConfig.setOverrideJavaSerialization(true).setImplementation(new MyDataSerializer());

Config config = new Config();
config.getSerializationConfig().setGlobalSerializerConfig(myGlobalConfig);
...
HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);

Es sólo instanciar GlobalSerializerConfig, decirle que queremos reemplazar la serialización por defecto de java con nuestra propia implementación.

El serializador para Hazelcast[editar]

Nuestro serializador debe implementar la interface StreamSerializer<Object> que sólo tiene dos métodos: void write(ObjectDataOutput objectDataOutput, Object data) y Object read(ObjectDataInput objectDataInput). Veamos la implementación

package com.chuidiang.ejemplos.subscription;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.StreamSerializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class MyDataSerializer implements StreamSerializer<Object>{

    @Override
    public void write(ObjectDataOutput objectDataOutput, Object data) throws IOException {
        Kryo kryo = new Kryo();

        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Output output = new Output(bos);
        kryo.writeClassAndObject(output, data);
        objectDataOutput.writeByteArray(output.getBuffer());
    }

    @Override
    public Object read(ObjectDataInput objectDataInput) throws IOException {
        Kryo kryo = new Kryo();
        byte [] buffer = objectDataInput.readByteArray();

        Input input = new Input(buffer);

        Object data = kryo.readClassAndObject(input);

        return data;
    }

    @Override
    public int getTypeId() {
        return 1;
    }

    @Override
    public void destroy() {

    }
}

La clase, como hemos comentado, implementa StreamSerializer<Object> y escribe sus dos métodos.

write[editar]

El método write recibe dos parámetros: el dato que queremos convertir a bytes (serializar) y un ObjectDataOutput donde tenemos que escribir esos bytes. Para hacerlo usando Kryo, los pasos son:

  • Obtener una instancia de Kryo. Aquí como es un ejemplo sencillo, hacemos un new directamente. Esto puede no ser lo más eficiente, idealmente deberíamos usar una única instancia de Kryo para reaprovecharla siempre y teniendo en cuenta que Kryo no es thread-safe, por lo que requeriría algún tipo de sincronización. Como nuestro ejemplo es sólo ilustrativo, hacemos el new de Kryo cada vez, aunque no sea lo más eficiente.
  • Creamos un ByteArrayOutputStream para guardar los bytes que nos dará Kryo cuando serialize nuestra clase.
  • Metemos el ByteArrayOutputStream en un Output de Kryo.
  • Llamamos a kryo.writeClassAndObject(output, data); que es el método de Kryo que serializa la clase. Admite el Output y el data que queremos serializar.
  • Ya solo nos queda meter los bytes en el parámetro ObjectDataOutput que Hazelcast nos pasa. Así que llamamos a objectDataOutput.writeByteArray() pasando como parámetro el buffer, ya relleno, de Output

Listo, quizás parece un poco enrevesado porque estamos haciendo compatibles tres tipos de clases: java, hazelcast y kryo, pero el concepto es sencillo. Convertir el dato a bytes usando kryo y meterlo en el dataoutput de hazelcast.

read[editar]

El método read es muy similar al método write. Recibe un ObjectDataInput que tiene los bytes que ha leído Hazelcast. Tenemos que convertir esos bytes a una clase usando Kryo y devloverlo en el return.

Como antes:

  • Se obtiene una instancia de Kryo, con el mismo apunte sobre eficiencia que hicimos en el método write.
  • Obtenemos el array de bytes que nos pasa Hazelcast con la llamada byte [] buffer = objectDataInput.readByteArray();
  • Metemos ese array de bytes en un Input de Kryo.
  • Llamamos a kryo.readClassAndObject(input) para obtener la clase a partir de los bytes (deserialización).
  • Y se devuelve el objeto obtenido.

Serializar clase que no implementa Serializable[editar]

Como prueba de que esto está funcionando, podemos crear una clase que no implemente Serializable y meterla en un Map de Hazelcast para ver que se puede almacenar y recuperar. El código de esa clase de datos de prueba puede ser sencillote, como esta

package com.chuidiang.ejemplos.subscription;

import java.util.Date;

public class Data {
    public int value;
    public Date date;

    @Override
    public String toString() {
        return "Data{" +
                "value=" + value +
                ", date=" + date +
                '}';
    }
}

Ponemos public los atributos para no guarrear el código con setter y getter.

Y el main de prueba sería algo como esto

   public static void main(String[] args){

      // Configuracion de la serializacion deseada
      GlobalSerializerConfig myGlobalConfig = new GlobalSerializerConfig();
      myGlobalConfig.setOverrideJavaSerialization(true).setImplementation(new MyDataSerializer());
      Config config = new Config();
      config.getSerializationConfig().setGlobalSerializerConfig(myGlobalConfig);

      // Obtencion de instancia de Hazelcast y de un Map
      HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
      IMap<String, Data> otherMap = instance.getMap("otherMap");

      // Creamos y metemos un Data que no implementa Serializable
      Data data = new Data();
      data.date=new Date();
      data.value=1000;
      otherMap.put("key", data);

      // Se lee el dato y se saca por pantalla para ver que es correcto.
      Data readData = otherMap.get("key");
      System.out.println(readData);


      // Matamos Hazelcast
      instance.shutdown();
   }