Exploiter la puissance des requêtes dans la portée des beans : @RequestScope dans une requête non basée sur le web

Spring, le framework polyvalent, donne aux développeurs les moyens de gérer le cycle de vie des beans grâce à un ensemble de fonctionnalités remarquables. Typiquement, Spring gère le cycle de vie des beans lorsque nous utilisons des annotations telles que @Service ou @Component. Lorsque le conteneur Spring démarre, ces beans sont créés, et lors de son arrêt, ils sont gracieusement détruits. Cependant, au-delà de ces méthodes familières, il existe une multitude de possibilités pour affiner le cycle de vie des beans.

Dans ce blog post, nous allons nous plonger dans un aspect de Spring appelé "Spring Bean Scopes". Plus spécifiquement, nous explorerons les capacités polyvalentes d'un bean scoped request et découvrirons comment il peut être utilisé au-delà des limites d'une requête basée sur le web.

Relever le défi de la RequestScope

RequestScope fonctionne de manière transparente dans un contexte web lorsqu'il est géré par le Spring DispatcherServlet. Cependant, il pose un problème lors d'une tentative d'accès à un bean scoped en dehors des limites d'une requête web. Vous pouvez rencontrer une exception similaire à la suivante :

java.lang.IllegalStateException : No thread-bound request found : Are you referring request attributes outside of a actual web request, or processing a request outside of the originally receiving thread ? Si vous êtes réellement en train d'opérer dans une requête web et que vous recevez toujours ce message, votre code est probablement en train de s'exécuter en dehors de DispatcherServlet/DispatcherPortlet : Dans ce cas, utilisez RequestContextListener ou RequestContextFilter pour exposer la requête en cours.

Néanmoins, cette limitation n'est pas un défaut de conception de Spring. Le framework n'expose pas le RequestScope car il ne peut pas déterminer quand une requête commence et se termine à l'extérieur du DispatcherServlet. Au lieu de cela, il attend des développeurs qu'ils prennent en charge cette responsabilité, et nous pouvons y parvenir en suivant les étapes suivantes :

Création d'un CustomRequestScopeAttr (Principalement copié du blog post de Pranav Maniar)

import org.springframework.web.context.request.RequestAttributes;

import java.util.HashMap;
import java.util.Map;

public class CustomRequestScopeAttr implements RequestAttributes {
    private final Map requestAttributeMap = new HashMap<>();

    @Override
    public Object getAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            return requestAttributeMap.get(name);
        }

        return null;
    }

    @Override
    public void setAttribute(String name, Object value, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            requestAttributeMap.put(name, value);
        }
    }

    @Override
    public void removeAttribute(String name, int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            requestAttributeMap.remove(name);
        }
    }

    @Override
    public String[] getAttributeNames(int scope) {
        if (scope == RequestAttributes.SCOPE_REQUEST) {
            return requestAttributeMap
                    .keySet()
                    .toArray(new String[0]);
        }

        return new String[0];
    }
 // todo implement other methods (not used. just return null)
} 

Ceci ne sera disponible et ne fonctionnera que pour les beans scoped de requêtes et non pour d'autres scopes également.

Ensuite, nous devons définir manuellement les RequestAttributes. Ceci doit être fait là où logiquement notre processus scoped de requête commence. C'est fait en appelant :

RequestContextHolder.setRequestAttributes(nouveau CustomRequestScopeAttr())  

Une fois le processus de scope de requête terminé, nous réinitialisons les RequestAttributes en les appelant :

RequestContextHolder.resetRequestAttributes() ;

En appelant ces méthodes RequestContextHolder.setRequestAttributes(new CustomRequestScopeAttr()) pour démarrer le processus et RequestContextHolder.resetRequestAttributes() pour le terminer - nous prenons le contrôle de la détermination du début et de la fin d'une requête.

Dans les sections suivantes, nous explorerons différents scénarios pour illustrer la manière dont nous pouvons déterminer efficacement le début et la fin d'une requête.

Utiliser les requêtes étendues dans @Async

La rencontre d'une annotation @Async au sein d'une requête web peut entraîner une IllegalStateException lors d'une tentative d'accès à un bean scoped de la requête. Pour remédier à cela, nous devons définir les RequestAttributes lors de la création du fil asynchrone. Cela implique la création d'une AsyncConfiguration personnalisée pour gérer la création de fils asynchrones, où nous définissons les RequestAttributes en conséquence.

Le texte suivant est un exemple d'AsyncConfiguration:

@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {
    private final TaskExecutionProperties taskExecutionProperties;

    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");

        return getExecutor(
                taskExecutionProperties.getPool(),
                taskExecutionProperties.getThreadNamePrefix()
        );
    }
    private Executor getExecutor(TaskExecutionProperties.Pool pool, String threadNamePrefix) {
        ContextAwarePoolExecutor executor = new ContextAwarePoolExecutor();

        executor.setCorePoolSize(pool.getCoreSize());
        executor.setMaxPoolSize(pool.getMaxSize());
        executor.setQueueCapacity(pool.getQueueCapacity());
        executor.setThreadNamePrefix(threadNamePrefix);

        return executor;
    }

Lorsque vous entrez un bloc @Async, getAsyncExecutor() est appelé, et "taskExecutor" est le nom par défaut. Alternativement, vous pouvez passer un nom d'exécuteur dans l'annotation @Async. La configuration est alors transmise à notre ContextAwarePoolExecutor personnalisé, qui est défini comme suit :

import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;

public class ContextAwarePoolExecutor extends ThreadPoolTaskExecutor {
    /**
     * @param task the {@code Callable} to execute (never {@code null}) - is the actual method we want to call
     */
    @Override
    public  Future submit(Appelable task) {
        return super.submit(
                new ContextAwareCallable(RequestContextHolder.currentRequestAttributes(), task)
        );
    }

    @Override
    public  ListenableFuture submitListenable(Appelable task) {
        return super.submitListenable(
                new ContextAwareCallable(RequestContextHolder.currentRequestAttributes(), task)
        );
    }
}

Le ContextAwarePoolExecutor gère l'exécution réelle du fil nouvellement créé et s'assure que les RequestAttributes sont correctement définis :

import org.springframework.web.context.request.RequestAttributes ;
import org.springframework.web.context.request.RequestContextHolder ;

import java.util.concurrent.Callable ;

public class ContextAwareCallable implements Callable {
    private final CustomRequestScopeAttr requestAttributes;
    private Callable task ;

    public ContextAwareCallable(Attributs de la requête requestAttributes, Callable task) {
        this.task = task;
        this.requestAttributes = cloneRequestAttributes(requestAttributes);
    }

    @Override
    public T call() throws Exception {
        try {
            RequestContextHolder.setRequestAttributes(requestAttributes);
            return task.call();
        } finally {
            RequestContextHolder.resetRequestAttributes();
        }
    }

    /**
     * this is needed, because once the main thread is finished, the object may get garbage collected, even if the async thread is not finished
     *
     * @param requestAttributes
     * @return
     */
    private CustomRequestScopeAttr cloneRequestAttributes(RequestAttributes requestAttributes) {
        CustomRequestScopeAttr clonedRequestAttribute = null;

        try {
            clonedRequestAttribute = new CustomRequestScopeAttr();

            for (String name : requestAttributes.getAttributeNames(RequestAttributes.SCOPE_REQUEST)) {
                clonedRequestAttribute.setAttribute(
                        name,
                        requestAttributes.getAttribute(name, RequestAttributes.SCOPE_REQUEST),
                        RequestAttributes.SCOPE_REQUEST
                );
            }
            return clonedRequestAttribute;
        } catch (Exception e) {
            return new CustomRequestScopeAttr();
        }
    }
}

La classe ContextAwareCallable implémente l'interface Callable et gère l'exécution du nouveau thread. Dans sa méthode call(), nous définissons les RequestAttributes pour "marquer" le début de la requête. Comme nous passons les RequestAttributes du thread principal, nous devons les cloner avant de les définir. Cela évite les problèmes où le fil principal peut se terminer avant le fil asynchrone, ce qui conduit à une accumulation de déchets d'attributs. Dans le bloc final de la méthode call(), nous "marquons" la fin de la requête en réinitialisant les RequestAttributes. Cette approche permet de cascader plusieurs appels asynchrones, car les attributs de requête sont transmis et toujours clonés pour chaque nouveau fil.

En utilisant ce mécanisme, on peut utiliser sans problème les annotations @Async tout en conservant la fonctionnalité des requêtes scoped beans.

Utiliser des requêtes étendues avec Pub/Sub

La règle de pare-feu pour la connexion au backend s'applique uniquement à une balise spécifique, qui ressemble à un nœud. Par exemple : "gke-staging-456b4340-node "Mais il s'agit d'une balise réseau, qui se trouve sur chaque instance de calcul du cluster. Pour que le heaWhen travaille avec des événements de consommation Pub/Sub, qui sont des requêtes provenant d'une file d'attente plutôt que des interactions basées sur le web, l'utilisation de requêtes scoped beans nécessite le paramétrage de RequestAttributes. Heureusement, la définition du début et de la fin d'une requête dans un "contexte" Pub/Sub est simple, car nous avons une visibilité claire sur le début et la fin d'une requête. Dans notre cas, nous utilisons Spring Cloud GCP Pub/Sub.

Nous avons une classe PubSubConsumer qui ressemble un peu à cela :

@Slf4j
public abstract class PubSubConsumer { 
...
    public MessageReceiver receiver() {
        return (PubsubMessage message, AckReplyConsumer consumer) -> {
            try {
                String messageString = parseMessageToString(message, consumer);
                if (messageString == null) {
                    return;
                }

                startConsumeProcess(messageString);

                consumer.ack();
            } catch (NackException e) {
                // we are fine. just nack and try again
                log.info("received nack exception. we will nack this queue entry", e);
                consumer.nack();
            } catch (AckException e) {
                // we are fine. we can ack this one
                log.info("received ack exception. we will ack this queue entry", e);
                consumer.ack();
            } catch (Exception e) {
                // we are not fine
                log.error("error while receiving message from subscription {}", subscription, e);
                consumer.nack();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        };
    }

    protected T parseStringToPayloadType(String messageString) {
        try {
            return objectMapper.readValue(messageString, payloadType);
        } catch (IOException e) {
           ...
        }
    }

    protected String parseMessageToString(PubsubMessage message, AckReplyConsumer consumer) {
        log.info("receive message from subscription {} with payload {}", subscription, message);
        if (message == null || message.getData() == null) {
            ...
        }
        return message.getData().toStringUtf8();
    }

    /**
     * actual consumer process logic.
     *
     * @param messageString String content of a message.
     * @throws Exception
     */
    protected void startConsumeProcess(String messageString) throws Exception {
        RequestContextHolder.setRequestAttributes(new CustomRequestScopeAttr());

        T payload = parseStringToPayloadType(messageString);

        setContextVariables(payload);

        consume(payload);
    }
...
}

Lors du démarrage du processus de consommation dans la méthode startConsumeProcess() , nous définissons les attributs RequestScope . Dans le bloc final du récepteur réel, nous réinitialisons les RequestAttributes. L'ensemble de l'événement Pub/Sub, du début du consommateur jusqu'à la fin, forme une requête complète, garantissant que la gestion des requêtes scoped beans fonctionne sans problème dans ce contexte.

Même si vous invoquez une méthode asynchrone au sein du consommateur, cette approche reste efficace. Veillez simplement à mettre en œuvre les modifications du chapitre "Async" pour gérer correctement les scénarios asynchrones. En combinant ces stratégies, vous pouvez utiliser en toute confiance des requêtes scoped beans dans les événements Pub/Sub, facilitant ainsi un traitement robuste et efficace des messages au sein de votre application Spring.

Faire face à Java ParallelStream

A l'heure actuelle, je n'ai malheureusement pas trouvé de solution viable / générique pour accéder à une requête scoped bean à l'intérieur d'un java ParallelStream. Le problème sous-jacent réside dans l'utilisation d'un fork/join pool commun par les flux Java pour la parallélisation. Ces threads ne sont ni créés ni configurés par Spring ou le développeur, contrairement à ce que nous avons fait dans la section "Async". Bien que vous puissiez définir manuellement les attributs de la requête pour chaque ParallelStream, ce n'est pas une solution pratique, surtout si vous travaillez déjà avec plusieurs ParallelStreams, ce qui entraîne des conflits potentiels.

Une autre approche serait de créer un ForkJoinPool personnalisé et de l'utiliser pour le streaming parallèle. Cela pourrait ressembler à quelque chose comme ceci (copié d'ici) :

final int parallelism = 4;
ForkJoinPool forkJoinPool = null;
try {
    forkJoinPool = new ForkJoinPool(parallelism);
    final List primes = forkJoinPool.submit(() ->
        // Parallel task here, for example
        IntStream.range(1, 1_000_000).parallel()
                .filter(PrimesPrint::isPrime)
                .boxed().collect(Collectors.toList())
    ).get();
    System.out.println(primes);
} catch (InterruptedException | ExecutionException e) {
    throw new RuntimeException(e);
} finally {
    if (forkJoinPool != null) {
        forkJoinPool.shutdown();
    }
}

Cette approche requiert explicitement l'envoi de tâches au forkJoinPool personnalisé à chaque fois que vous souhaitez effectuer des opérations parallèles, ce qui rend l'utilisation de ParallelStreams impraticable.

Nous avons pris la décision d'accepter la limitation de ne pas pouvoir utiliser les request scoped beans à l'intérieur de ParallelStreams. Au lieu de cela, nous adoptons des stratégies alternatives pour atteindre nos objectifs sans recourir aux requêtes scoptées dans ces scénarios d'exécution parallèle. Bien que cette limitation puisse présenter certaines contraintes, la comprendre et travailler autour d'elle nous permet d'utiliser efficacement ParallelStreams au sein de notre application Spring.

Résumé

Les requêtes scoped beans dans Spring offrent une flexibilité significative, étendant leur utilisation au-delà des requêtes traditionnelles basées sur le web. Cependant, lorsqu'ils utilisent ces beans en dehors du contexte web standard, les développeurs doivent prendre la responsabilité de définir le début et la fin de chaque processus. Spring ne peut pas gérer cela automatiquement dans des environnements ou des flux de processus personnalisés. À travers différents scénarios, tels que le traitement asynchrone, les événements Pub/Sub, et les pools de fils personnalisés, nous avons démontré des techniques efficaces pour gérer les requêtes des requêtes scoped beans. La seule limitation que nous ayons rencontrée jusqu'à présent est avec ParallelStreams, où il nous manque encore une solution générique pour accéder aux requêtes scoptées. Malgré cette limitation, la compréhension des contraintes et l'utilisation de stratégies alternatives permettent aux développeurs d'exploiter le plein potentiel des request-scoped beans dans leurs applications Spring.

Kai Müller
Ingénieur logiciel