StructuredTaskScope en Java 21

De ChuWiki


¿Qué es StructuredTaskScope?[editar]

A veces queremos dividir una tarea grande en subtareas más pequeñas que se puedan ejecutar en paralelo. De esta forma, podemos lanzar varios hilos y que cada hilo se ocupe de una subtarea concreta. Finalmente, tendremos que esperar que todo los hilos terminen sus tareas para obtener el resultado de la tarea grande.

La clase StructuredTaskScope nos ayuda a organizar todo esto. Vamos a ver un ejemplo, pero ...

StructuredTaskScope es una característica nueva de Java 21 que todavía no está 100% verificada, puede dar algún problema. Por ello, está deshabilitada por defecto. Si queremos usarla, debemos habilitarla explícitamente con la opción --enable-preview en el compilado

Ejemplo con StructuredTaskScope[editar]

Vamos con el ejemplo.

Creación del Callable[editar]

Primero creamos una clase que sirva como "tarea" que es encarga de parte del trabajo. Debe implementar Callable indicando el tipo de resultado que devuelve. Double en nuestro ejemplo

class MyTask implements Callable<Double>{
    @Override
    public Double call() throws Exception {
        Thread.sleep((long)(Math.random()*1000));
        return Math.random();
    }
}

Hemos llamado a la clase MyTask. El método que debe implementar es Double call(). Dentro metemos un retardo aleatorio para simular que estamos tardande en ejecutar la tarea y al final devolvemos un resultado, en este caso un número aleatorio para no complicarnos la vida.

Uso de StructuredTaskScope[editar]

El siguiente código crea StructuredTaskScope, lanza cinco MyTask, espera los resultados y los suma todos.

public class StructuredConcurrencyExample {
    public static void main(String[] args) {
        // Creacion del StructuredTaskScope que va a usar Taks que devuelven un Double
        try (StructuredTaskScope scope = new StructuredTaskScope<Double>()){

            // Creamos la lista de Tasks, la lanzamos con StructuredTaskScope.fork() y nos guardamos
            // en la lista el StructuredTaskScope.Subtask que nos devuelve.
            List<StructuredTaskScope.Subtask<Double>> tasks = new ArrayList<>();
            IntStream.range(0,5).forEach(i -> {
                final StructuredTaskScope.Subtask<Double> fork = scope.fork(new MyTask());
                tasks.add(fork);
            });

            // Esperamos que todas las task terminen
            scope.join();

            // Recogemos el resultado de cada StructuredTaskScope.Subtask y lo sumamos.
            double counter = 0.0;
            for (StructuredTaskScope.Subtask<Double> task : tasks) {
                counter = counter+task.get();
            }

            // Sacamos por pantalla el resultado.
            System.out.printf("Suma = %f\n",counter);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Explicamos las partes importantes del código.

Cuando terminemos con StructuredTaskScope debemos cerrarlo con close(). Para asegurarnos el cierre, hacemos el new dentro de una estructura try-with-resources. Ponemos el new dentro de los paréntesis de try() {...} y así el bloque try se encarga de cerrarlo automáticamente al terminar.

Para lanzar las subtareas, se usa scope.fork(new MyTask()), siendo scope la instancia de StructuredTaskScope. Esta llamaa a fork() devuelve un StructuredTaskScope.Subtask<> que es de donde podremos obtener el resultado de la subtarea. Por ello, debemos guardarnos todos estos StructuredTaskScope.Subtask<> en algún sitio, una lista en nuestro ejemplo, para luego poder pedirles los resultados.

Así que a modo de ejemplo hacemos un bucle de cinco para crear cinco subtareas, lanzarlas con scope.fork() y guardamos en una lista lo que nos va devolviendo esta llamada.

Con scope.join() esperamos a que todas las subtareas terminen.

Una vez terminadas, recorremos la lista de StructuredTaskScope.Subtask<> llamando al método get() de cada uno de los elementos. Este método nos devuelve el resultado de cada subtarea. Simplemente los vamos sumando a modo de ejemplo para obtener el resultado total.

StructuredTaskScope Shutdown on Failure[editar]

Es posible que alguna de las subtareas lance una excecpión en su proceso. Si es así, con el código que tenemos hasta ahora, no la detectaremos hasta que llamemos al método get() de StructuredTaskScope.Subtask<>

Java nos ofrece la clase StructuredTaskScope.ShutdownOnFailure, hija de StructuredTaskScope pero que permite abortar automáticamente todo el proceso si alguna subatera lanza una excepción. Tienes el código completo en StructuredConcurrencyShutdownExample.java, pero veamos aquí cuales son las diferencias.

La clase a instanciar es StructuredTaskScope.ShutdownOnFailure en vez de StructuredTaskScope. Lo hacemos también en un try-with-resources

try (var scope = new StructuredTaskScope.ShutdownOnFailure()){
   ...
}

Y en vez de llamar a join(), llamamos a join().throwIfFailed(). Esto lanzará una excepción si alguna de las subtareas ha fallado lanzando una excepción

try (var scope = new StructuredTaskScope.ShutdownOnFailure()){
   ...
   scope.join().throwIfFailed();
   ...
} catch (ExecutionException e) {
   ...
}

StructuredTaskScope Shutdown on Success[editar]

A veces nos interesa lanzar varios hilos en paralelo haciendo las mismas cuentas para obtener un mismo resultado. Quizás porque cada hilo implementa un algoritmo distinto para obtener el mismo resultado y no siempre uno de los algoritmos es siempre el más rápido. En estos casos, nos interesa terminar todo el proeceso en cuanto una cualquiera de las subtareas encuentre el resultado.

El código es igual, pero se intancia la clase StructuredTaskScope.ShutdownOnSuccess. Esta clase, hija de StructuredTaskScope, hace terminar todas las subtareas en cuanto una de ellas devuelve un resultado. Y este resultado se obtiene con le método result(). Puedes ver el ejemplo completo en StructuredConcurrencySuccessExample.java, pero vemos aqui el esqueleto

try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Double>()){
   ...
   scope.join();
   System.out.printf("Primero en terminar = %f\n",scope.result());
}

Simplemente instanciamos la clase StructuredTaskScope.ShutdownOnSuccess, lanzamos todas las subtareas, que no se muestra por ser igual que el primer ejemplo. Se hace un scope.joint() para esperar a la primera tarea que termine Y se recoge su resultado con scope.result().

StructuredTaskScope timeout[editar]

Al hacer join() esperamos indefinidamente a que las tareas terminen. Bien todas, bien hasta que una falle, bien hasta que una se complete, según hemos visto en los tres casos anteriores. Tenemos, no obstante, de fijar un tiempo máximo de espera

try (var scope = new StructuredTaskScope<Double>()) {
   ...
   scope.joinUntil(Instant.ofEpochMilli(System.currentTimeMillis()+1000));
   ...
} catch (TimeoutException e) {
   ...
}

Hemos usado joinUntil() en vez de join(). joinUntil() admite como parámetro un Instant, que es un instante de tiempo medido como número de milisegundos desde el 1 Ene 1970 a las 0:00:00 GMT. Es decir, no inidcamos cuánto timepo queremos esperar, sino hasta que fecha/hora concreta queremos esperar expresado en milisegundos.

Así que si queremos esperar, por ejemplo, un segundo, una forma fácil de hacerlo es sumar 1000 milisegundos (1 segundo) a System.currentTimeMillis() que nos da el instante actual.

Si pasa el tiempo, saltará una excepción TimeoutException que podemos capturar para hacer el tratamiento que consideremos.