Java Future y CompletableFuture

De ChuWiki


¿Qué es un Future en Java?[editar]

Un Future en Java es una forma de que un método devuelva inmediatamente un resultado que todavía no está disponible.

Es relativamente frecuente que al llamar a un método Java, este tarde en calcular el resultado que nos tiene que devolver y la llamada al método se quede bloqueada hasta que este termine. Son ejemplos típicos consultas a base de datos o llamadas a un web service.

En estos casos, podemos hacer que el método devuelva inmediatamente una instancia de Future. Nuestro programa podrá continuar su ejecución sin esperar y consultar a Future si el resultado está disponible sin bloquearse.

Future vs CompletableFuture[editar]

CompletableFuture es una extensión de Future que ofrece más funcionalidad. Entre otras cosas, gestiona sus propios hilos y permite encadenar y tratar en conjunto varios Future.

Future es la interface de bajo nivel y nos puede servir para cosas más simmples. Pero suele ser frecuente que tengamos que hacer tareas algo más elaboradas que Future no nos facilita y tengamos que hacer código para ello. Dos ejemplos concretos

  • Añadir un callback para que nos avise cuando el resultado esté disponible. Future no tiene esta opción, aunque podríamos hacer código para ella. CompletableFuture nos la ofrece con su método thenAccept()
  • Encadenar varios proscesos de forma que unos esperen por los resultados del anterior, formando una cadena. Con Future podemos hacer código para implementar esta cadena. CompletableFuture nos lo ofrece ya hecho con los métodos thenApply()
  • CompletableFuture tiene además métodos que permiten tratar varios CompletableFuture como si fueran uno solo. Por ejemplo, esperar que todos terminen, abortar todos si uno falla, etc.

Veamos ejemplos tanto de Future como de CompletableFuture. Los tienes completos en FutureExample.java

FutureTask[editar]

Future es una interface, por lo que no podemos instanciarla directamente. Una clase que implementa Future es FutureTask. Esta clase permite añadirle un Runnable o un Callable y luego lanzarla en un hilo.

FutureTask Runnable[editar]

El codigo de ejemplo con Runnable puede ser el siguiente

FutureTask<String> future = new FutureTask<>(() ->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}, "Done!");

ForkJoinPool.commonPool().submit(future);

while(!future.isDone()){
    Thread.sleep(100);
}
System.out.println("Future Task Runnable devuelve "+future.get());

Hemos instanciado FutureTask. En este caso admite dos parámetros. El primero, que hemos puesto como expresión Lambda, es un Runnable que simplemente hace una espera de un segundo. El segundo, "Done!", es el resultado que debe devolver Future cuando termine de ejecutarse el Runnable.

Lanzamos un hilo con este FutureTask. Future y por tanto FutureTask implementan a su vez la interface Runnable, por lo que podrían lazarse con un hilo normal.

new Thread(future).start();

Sin embargo, vamos a usar un pool de Threads que nos proporciona Java, ForkJoinPool.commonPool(). Este pool es el que usan internamente los CompletableFuture que vemos más adelante. Esto nos resulta útil en nuestro ejemplo porque si nuestro programa de ejemplo termina antes de que terminen los hilos, no veremos los resultados, los hilos se "abortan". Este pool tiene el método ForkJoinPool.commonPool().awaitQuiescence() que permite esperar que todos los hilos terminen. Poniéndolo antes de terminar nuestro main(), nos aseguramos de ver todos los resultados.

public static void main(String [] args) {
   // Aquí nuestros ejemplos con Future y CompletableFuture

   // Esperar máximo 5 segundos que todos los hilos terminen antes del fin de programa
   ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
}

Una vez explicado por qué usamos el pool, con submit() lanzamos el FutureTask en un hilo. A partir de aquí nuestro código sigue ejecutándose y a Future podemos preguntarle por el resultado.

En nuestro ejemplo, nos limitamos a preguntar cada cien milisegundos si future.isDone(), es decir, si ha terminado. Una vez que ha terminado, llamando a future.get() obtnemos el resultado.

La llamada a future.get() se queda bloqueada si el hilo no hubiera terminado. Y lanzaría una excepción si nuestro Runnable hubiera terminado con una excepción.

Evidentemente, si después de lanzar el hilo vamos a esperar el resultado, no tiene sentido hacer el bucle llamando a isDone(), bastaría llamar a get() que se quedará bloqueda hasta que esté el resultado. Incluso no tiene sentido lanzar la tarea que tarda en un hilo separado con un FutureTask. Así que toma este código sólo como una explicación de como lanzar un FutureTask y como obtener luego su resultado. En un ejemplo real, después de lanzar el FutureTaks, nuestro código se dedicaría a hacer otras cosas.

Aparte, te surgirá otra duda. ¿Para qué sirve todo esto si ya tenemos el resultado y lo hemos pasado en el constructor de FutureTask?. Bien, no es un ejemplo que tenga mucho sentido para esperar por un resultado que puede tardar en obtenerse. Este ejemplo tiene sentido si tuvieramos que lanzar varios FutureTask con sus correspondientes Runnable, los metemos en una lista de FutureTask pendientes de terminar y usamos el dato como identificador del FutureTask. De esta forma, podríamos recorrer la lista periódicamente para ver si alguno ha terminado y con future.get() saber cual de ellos ha terminado.

FutureTask Callable[editar]

Vamos con un ejemplo con un poco más de sentido. UnCallable es como un Runnable, pero que devuelve un resultado al terminar. Veamos el siguiente código de ejemplo

FutureTask<String> future = new FutureTask<>(() ->{
    Thread.sleep(1000);
    return "Done!";
});

ForkJoinPool.commonPool().submit(future);

while(!future.isDone()){
    Thread.sleep(100);
}
System.out.println("Future Task Callable devuelve " + future.get());

El ejemplo es exactamente igual que el caso anterior, pero con dos diferencias. Una es que el Callable que pasamos como Lambda, tiene un return con el resultado. Y la segunda diferencia es que el constructor de FutureTask no necesita el segundo parámetro con el dato, puesto que es el Callable el que lo devuelve.

Por lo demás es igual, se lanza el hilo en el pool común, se espera resultado con future.isDone() y se obtiene con future.get() cuando está disponible.

Este ejemplo si cuadra más con la explicación que dimos al principio. El Callable podría hacer la consulta a la base de datos o al webservice, devolviendo el resultado cuando lo tenga, y así no se bloquea el hilo principal de nuestra aplicación.

FutureTask cancel[editar]

FutureTask nos ofrece el método cancel() para interrumpir el Runnable o Callable. Dependiendo de lo que esté haciendo el Runnable o Callable y de como estén implementados, puede ser que la llamada a interrupt() del hilo tenga efecto, no tenga efecto o que haga saltar una excepción. Veamos un ejemplo

FutureTask<String> future = new FutureTask<>(() ->{
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e){
        System.out.println("Future Task Callable interrumpido "+e);
        return null;
    }
    return "Done!";
});

ForkJoinPool.commonPool().submit(future);

Thread.sleep(100);
future.cancel(true);

try {
    System.out.println("Future Task Callable devuelve" + future.get());
} catch (CancellationException e) {
    System.out.println("Future Task Callable cancelado "+e);
}

Es el mismo ejemplo de antes, pero hemos llamado a cancel(true). Si pasamos true como parámetro, estamos indicando que queremos que se llame a un interrupt() del hilo. Si pasamos un false, no se interrumpirá el hilo, que seguirá trabajando, pero estamos indicando que ya no tenemos interés en el resultado.

Tras llamar a cancel(), sea con true o on false, la llamada a get() no se quedará bloqueada en espera de que termine el hilo y lanzará una excepción CancellationException.

En nuestro ejemplo, como el Callable estará en un sleep(), al llamar a cancel(true), saltará la excepción InterruptedException que se sacará por pantalla y el hilo terminará inmediatamente. Si hubieramos llamado con cancel(false), el hilo seguiría con su sleep(), terminaría de forma normal cuando pasara el tiempo, devolvería el resultado y ... FutureTask lo ignoraría, no nos lo devovería en get()

Future timeout[editar]

La llamada a future.get() admite dos parámetros para indicar un tiempo de timeout de espera

try {
    String value = future.get(500, TimeUnit.MILLISECONDS);
    System.out.println("Future Task Callable devuelve " + value);
} catch (TimeoutException e) {
    System.out.println("Future Task Callable Timeout "+e);
}

Los parámetros son el tiempo máximo que queremos esperar y la unidad de tiempo para dicho valor, quinientos milisegundos en el ejemplo. Si el resultado está disponible antes, lo obtendremos en cuanto esté disponible. Si pasado ese tiempo no está disponible, salta una TimeoutException.

CompletableFuture[editar]

Como hemos comentado, CompletableFuture es una extensión de Future con ciertas facilidades. Por un lado, se encarga ella de lanzar y gestionar los hilos, a diferencia de Future que teniamos que lanzar el hilo nosotros por seprado. Además, CompletableFuture ofrece muchos métodos útiles para tareas más o menos comunes que con Future tendriamos que codificar una y otra vez.

Veamos algunos de estos ejemplos.

Future callback : CompletableFuture thenAccept[editar]

Suele ser útil que cuando un Future termine con su trabajo, nos avise para poder ir a recoger el resultado. Lo que comunmente se conoce como añadirle un "callback" al que llamar cuando termine. Future no tiene esta posiblidad, aunque prodriamos siempre hacer codigo para implementarla. Afortunadamente CompletableFuture sí la tiene. Veamos el ejemplo

CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "Done!";
}).thenAccept(value -> {
    System.out.println("Completable Future devuelve " + value);
});

System.out.println("Completable Future. El hilo principal No espera resultado");

Llamando a CompletableFuture.supplyAsync() le pasamos un Supplier (similar a la interface Callable que devuelve un resultado) y esta misma llamada lanza el hilo usando el pool de hilos ForkJoinPool.commonPool() que mencionamos antes.

Pasamos el Supplier como una expresión Lambda que espera un segundo simulando que tarda en obtener el resultado y finalmente devuelve el resultado.

supplyAsync() devuelve un CompletableFuture y esté tiene un méodo thenAccpet() al que se llamará cuando el resultado esté disponible. Este méodo acepta un Consumer al que pasrá el resultado. En nuestro ejemplo lo hemos pasado como expresión Lambda y simplemente sacamos por pantalla el resultado.

La última línea es simplemente para verificar que nuestro hilo principal no se queda bloqueado en espera del resultado. Veremos que sale por pantalla este println() un segundo antes que el de dentro del thenAccept()

CompletableFuture thenApply[editar]

A veces cuando termina un Future queremos lanzar un segundo Future que use los resultados del primero y luego un tercero que use los resultados del segundo y así sucesivamente. Con Future tendríamos que implementarlo con código. CompletableFuture nos ofrece el método theApply() para ir encadenando Future que usa cada uno los resultados del anterior. Veamos un ejemplo

CompletableFuture.supplyAsync(() -> {
    System.out.println("CompletableFuture chain generando resultado ");
    sleep(1000);
    return "Done!";
}).thenApply(value -> {
    System.out.println("CompletableFuture chain pasando a minúsculas ");
    sleep(1000);
    return value.toLowerCase();})
.thenApply(value -> {
    System.out.println("CompletableFuture chain pasando a bytes ");
    sleep(1000);
    return value.getBytes();})
.thenAccept(value -> {
    System.out.println(Arrays.toString(value));
});

Hemos encadenado tres procesos que tardan (lo simulamos con el sleep()) y al final sacamos el resultado. El primer proceso genera un String de resultado "Done!". El segundo, lo convierte a minúsculas y el tercero lo convierte en un array de bytes. Finalmente, se saca por pantalla el array de bytes.

Cada proceso se pasa como una expresión Lambda. El primero, ya lo vimos, con el método supplyAsync() que es el que lanza el hilo. Los siguientes, como expresiones Lambda en el método thenApply(). Recibirán el resultado del proceso anterior y devolverán su propio resultado. Finalmente, thenAccept() recibe el resultado del último proceso y termina la cadena, sacando el resultado.

CompletableFuture thanApply vs thenApplyAsync. thenAccpet vs thenAcceptAsync[editar]

Tanto thenAccept() como thenApply() tienen sus versiones asíncronas. ¿Cual es la diferencia entre ellas?.

Las versiones no asíncronas solo lanzan un hilo nuevo si el resultado del proceso anterior no está disponible. Si estuviera disponible, no lanzan un hilo nuevo. Esto podría hacer que nuestro hilo principal se bloqueara. Imagina este código

CompletableFuture.supplyAsync(() -> {
    System.out.println("CompletableFuture chain generando resultado ");
    return "Done!";
.thenApply(value -> {
    System.out.println("CompletableFuture chain pasando a bytes ");
    sleep(1000);
    return value.getBytes();})
.thenAccept(value -> {
    System.out.println(Arrays.toString(value));
});

Es similar al anterior. Solo dos diferencias. Hemos quitado el proceso de pasar a minúsculas solo por simplificar. Y hemos quitado el sleep() del primero proceso, el del supplyAsync(). Ahora ese proceso no tarda en generar el resultado.

Como el resultado está disponible inmediatamente, el thenApply() que pasa a minúsculas no lanza un hilo nuevo. El paso a minúsuclas se ejecuta en el hilo principal de nuestro código. Y como el proceso de pasar a minúsculas tarda un segundo, nuestro código se queda bloqueado ese segundo.

Aquí es donde podemos usar las veriones asincronas de los métodos. Estas lanzan un nuevo hilo tanto si el resultado está disponible como si no. Consumimos más hilos del pool, pero nos aseguramos de no quedarnos bloqueados.

Si queremos ser eficientes y tenems cierto conocimiento de si el proceso puede tardar o no, podemos usar las versiones sincrona o asíncorona según el proceso que le vayamos a mandar. Por ejemplo, podemos usar thenAccept si sólo vamos a sacar el resultado por pantalla. O podemos usar thenAcceptAsyn() si lo que queremos es guardar ese resultado en una base de datos, que puede tardar.

CompletableFuture join[editar]

CompletableFuture tiene el método join() que de forma similar a get() se queda bloqueado hasta que el resultado esté disponible y nos lo devuelve. La diferencia entre uno y otro es que get() puede lanzar excepciones ExecutionException y InterruptedException que estamos obligados a capturar, mientras que join() puede lanzar una CompletionException que no es necesario capturar, aunque podemos hacerlo si lo necesitamos.

get() tiene además la opción con timeout, que join no tiene.

Veamos un ejemplo con join()

final CompletableFuture<String> completableReturn = CompletableFuture.supplyAsync(() -> {
   sleep(1000);
   return "Done!";
});

final String result = completableReturn.join();
System.out.println("CompletableFuture devuelve " + result);

CompletableFuture exceptionally[editar]

Con join() hemos visto que no es necesario capturar la excepción, pero si salta y no la capturamos, la excepción se propagará por el resto del programa. El méodo CompletableFuture.exceptionally() nos permite capturarla y devolver un valor por defecto para el caso de que hay algún problema. Veamos un ejemplo

final CompletableFuture<String> completableException = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return Integer.toString(1/0); // La división 1/0 lanza excepción
}).exceptionally(e -> "Algo ha ido mal. La excepción  es "+e.getMessage());

try {
    String value = completableException.join();
    System.out.println("Completable Future con Exception devuelve "+ value);
} catch (CompletionException e){
    System.out.println("Completable Future con Exception da excepción "+e);
}

Nuestra tarea principal devuelve como String el resultado de dividir uno entre cero. Esto lanzará una excepción. Al método exceptionally le pasamos un Lambda que recibirá la excepción y que debe devolver un resultado alternativo. A este Lambda sólo se le llamara si el proceso anterior termina con una excepción. Si termina normalmente, no se llamará al Lambda de exceptionallyz(). Como si no hubieramos puesto nada.

Nuestro metodo devuelve un texto alternativo: "Algo ha ido mal. La excepción es " y el mensaje con la excepción. Si ejecutamos el código, veremos que join() no hace saltar ninguna excepción y que saca por pantalla este texto alternativo, como si todo hubiera ido bien.

CompletableFuture join all[editar]

A veces hemos lanzado varias CompletableFuture de forma independiente, pero necesitamos esperar que se completen todas antes de procesar los resultados. Y si una falla, no nos interesa ya procesar el resto. Podemos conseguir esto con allOf().join(), como el siguiente ejemplo

final CompletableFuture<String> completableReturn = CompletableFuture.supplyAsync(() -> {
    sleep(100);
    return "Done!";
});
final CompletableFuture<String> completableException = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return Integer.toString(1/0);
});

try {
    CompletableFuture.allOf(completableException, completableReturn).join();
    System.out.println("CompletableFuture join ofAll, completableReturn devuelve "+completableReturn.join());
    System.out.println("CompletableFuture join ofAll, completableException devuelve "+completableException.join());
} catch (Exception e){
    System.out.println("CompletableFuture join ofAll da fallo global "+ e);
}
</syntaxhighight>

Hemos lanzado dos <code>CompletableFuture</code>. La primera tarda poco (cien milisegundos) y da un resultado correcto. La segunda tarda algo más (un segundo) y falla con una excepción "División por cero".

<code>CompletableFuture.allOf()</code> nos devuelve una <code>CompletableFuture</code> con todas las que le pasemos como parámetro dentro. Un <code>join()</code> a esa nueva <code>CompletableFuture</code> conjunta termina sin fallos si todas terminan sin fallo y da una excepción si cualquiera de ellas da una excepción.

No nos devuelve ningún resultado, puesto que hay varias <code>CompletableFuture</code> involucradas y cada una tiene su propio resultado. Así que si todo va bien, debemos preguntar una por una a las <code>CompletableFuture</code>. Eso , tenemos la garantía de que no nos quedaremos bloqueados, porque la llamada <code>allOf().join()</code> nos garantiza que todas han terminado ya.

=== CompletableFuture anyOf ===

De forma distinta a <code>allOf()</code>, el método <code>anyOf</code> nos devuelve la primera <code>CompletableFuture</code> que termine de forma exitosa o no. Veamos un código de ejemplo

<syntaxhighlight lang="java">
final CompletableFuture<String> completableReturn = CompletableFuture.supplyAsync(() -> {
    sleep(100);
    return "Done!";
});
final CompletableFuture<String> completableException = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return Integer.toString(1/0);
});

try {
    System.out.println(CompletableFuture.anyOf(completableException, completableReturn).join());
} catch (Exception e){
    System.out.println("CompletableFuture join ofAll da fallo global "+ e);
}

Nuevamente dos CompletableFuture. Una tarda poco y da un resultado correcto, la otra tarda más y da una excepción. Esta vez sacamos por pantalla el resultado de anyOf().join() que dará el resultado de la tarea que termine antes. En este caso la existosa. Si terminara antes la de fallo, la llamada a anyOf().join() haria saltar una excepción.