Il y a 15 ans -
Temps de lecture 6 minutes
fr.xebia.concurrent.CyclicLatch
….ou comment effectuer un traitement régulièrement
Avec l’arrivée de l’api java.util.concurrent
dans le JDK 5, la programmation concurrente est à la portée de tous. Auparavant, il fallait :
- soit être un expert des APIs de bas niveau et être prêt à passer des nuits blanches à mettre au point le système,
- soit se tourner vers les serveurs d’applications J2EE et leur implémentation JMS et EJB Message Driven Bean. Dans ce cas-là, la lourdeur de l’API JMS et les contraintes de persistance et de transaction (par défaut) des serveurs JMS viendront mettre à mal au final l’utilisation de traitements parallèles et concurrents.
Dans le cadre d’un de mes projets, j’ai eu besoin d’implémenter le comportement suivant :
- les producteurs, N Threads effectuent une tâche qui entre autres collecte des données.
- le consommateur, 1 Thread collecte ces données pour les agréger.
Dans un premier temps, je me tourne vers une java.util.concurrent.BlockingQueue partagée entre les producteurs et le consommateur. Cette conception fonctionne jusqu’au moment où je me suis aperçu que
- le traitement effectué par mon consommateur pouvait être long et coûteux en ressources : il faut donc éviter d’effectuer le traitement au fil de l’eau et attendre d’avoir un certain nombre de données dans la file.
- les producteurs, suivant la vie de l’application, pouvaient à un instant donné être très nombreux (beaucoup de données en peu de temps) ou au contraire peu nombreux. Dans ce dernier cas, le remplissage de ma file prendrait des heures et je ne pouvais attendre cette condition pour lancer mon traitement.
Cahier des charges
Je me suis donc fixé le cahier des charges suivant :
Le consommateur doit vider l’ensemble de la queue et effectuer le traitement si :
- au moins N éléments ont été déposés (cf 1.)
- S secondes se sont écoulées depuis la dernière collecte (cf 2.)
J’ai commencé, bien sûr, par examiner l’API java.util.concurrent
. Deux éléments de synchronisation ont attiré mon attention sans réellement convenir à mon besoin.
- java.util.concurrent.CountDownLatch permet de signaler à un ensemble de threads que N opérations sont terminées.
- java.util.concurrent.CyclicBarrier permet à un ensemble de threads de s’attendre à un point d’exécution donnée.
J’ai décidé de mixer les deux pour écrire fr.xebia.concurrent.CyclicLatch
Utilisation
Une instance de la classe fr.xebia.concurrent.CyclicLatch
est créée avec les paramètres suivants :
- une valeur N qui indique le nombre de fois avant de que le Latch ne soit libéré
- une valeur S qui indique le timeout
// création d'un CyclicLatch avec N=100 et S=10 secondes latch = new CyclicLatch(100, 10, TimeUnit.SECONDS);
Cette instance est partagée entre les producteurs et le consommateur.
- A chaque traitement terminé, chaque producteur va décrémenter le Latch.
public class Producer { public void doit() { //Perform its own business... latch.countDown(); } }
- Le consommateur va se mettre en attente du latch
public class Consummer { public void run() { while (true) { latch.await() // attente avec les valeurs par défauts //ou latch.await(30,TimeUnit.Secondes) // attente de 30 secondes // Perform its own business.... } } }
Implémentation
La classe CyclicLatch
est implémentée autour
- d’un attribut de type CountDownLatch
final public class CyclicLatch { private final int initialcount; private CountDownLatch latch; private final ReentrantLock lock = new ReentrantLock(); public CyclicLatch(int initialcount) { this.latch = new CountDownLatch(initialcount); } }
- de 2 méthodes,
countDown()
etawait()
.
La méthode countDown()
verrouille l’accès au latch et délègue au CountDownLatch
.
public void countDown() { try { lock.lock(); latch.countDown(); } finally { lock.unlock(); } }
La méthode await()
délègue au CountDownLatch.await(long, TimeUnit)
Extrait de javadoc : Causes the current thread to wait until the latch has counted down to zero, unless the thread is interrupted or the specified waiting time elapses.
Et ensuite effectue reset safe du CountDownLatch
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { log("Waiting for latch of "+this.initialcount+" & " + timeout + " seconds timeout"); boolean result = latch.await(timeout, unit); /* Reset Latch, on timeout & on overflow */ try { lock.lock(); log("Reset Latch...." + latch); latch = new CountDownLatch(initialcount); } finally { lock.unlock(); } return result; }
L’implémentation complète du CyclicLatch permet de fixer les paramètres de taille et de timeout (méthode await()) ou à chaque attente du consommateur de positionner un nouveau timeout (méthode {{await(long, TimeUnit)}})
Tests
Dans le repository de Xebia-France, vous trouverez le code complet du CyclicLatch et un exemple de mise en œuvre avec la classe fr.xebia.xke.concurrency.NProducers1ConsumerTimedTest. Cette classe crée 4 producteurs qui remplissent une BlockinQueue
. Le consommateur doit lire les messages en utilisant bien sûr un CyclicLatch
. Suivant le paramétrage du latch on obtient les comportements suivants :
- CyclicLatch taille 100, time out 30 secondes. Les traces, ci-dessous, montre que le latch a été déclenché sur ‘Overflow’ donc plus de 100 messages dans la file, au bout d’environs 2.5 secondes
main Consumer.Consumer() MyBasket.start() Thread Consumer #0 Consumer.run() Thread Consumer #0 Waiting for latch of 100 & 5 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@45a877[Count = 0] Thread Consumer #0 OverFlow, 2.4278326s Thread Consumer #0 messages #100 Thread Consumer #0 Waiting for latch of 100 & 5 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@126b249[Count = 0] Thread Consumer #0 OverFlow, 2.5149238s Thread Consumer #0 messages #100 Thread Consumer #0 Waiting for latch of 100 & 5 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@182f0db[Count = 0] Thread Consumer #0 OverFlow, 2.5150535s Thread Consumer #0 messages #100 Thread Consumer #0 Waiting for latch of 100 & 5 seconds timeou Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@192d342[Count = 0] Thread Consumer #0 OverFlow, 2.5142925s Thread Consumer #0 messages #100 Thread Consumer #0 Waiting for latch of 100 & 5 seconds timeout 10062 main done MyBasket.stop()
- CyclicLatch taille 100, time out 2 secondes. Les traces, ci-dessous, montrent qui le latch a été déclenché maintenant sur timeout avec une file qui contient environ 80 messages.
main Consumer.Consumer() MyBasket.start() Thread Consumer #0 Consumer.run() Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@45a877[Count = 20] Thread Consumer #0 TimeOut, 2.0013237s Thread Consumer #0 messages #80 Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@126b249[Count = 20] Thread Consumer #0 TimeOut, 2.000778s Thread Consumer #0 messages #80 Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@182f0db[Count = 20] Thread Consumer #0 TimeOut, 2.0010734s Thread Consumer #0 messages #80 Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@192d342[Count = 20] Thread Consumer #0 TimeOut, 2.0014079s Thread Consumer #0 messages #80 Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout Thread Consumer #0 Reset Latch....java.util.concurrent.CountDownLatch@6b97fd[Count = 20] Thread Consumer #0 TimeOut, 2.001409s Thread Consumer #0 messages #80 Thread Consumer #0 Waiting for latch of 100 & 2 seconds timeout 10078 main done MyBasket.stop()
Conclusion
La programmation concurrente est vraiment passionnante, mais il faut absolument devenir paranoïaque. En effet, il faut envisager tous les cas possibles: chaque instruction doit être envisagée comme pouvant être interrompue. Le simple conseil que je vous donnerais est de commencer par synchroniser toutes les sections critiques (mot-clé synchronized
). Ensuite, et après réflexion et bench, vous pourrez passer à des verrouillages de plus bas niveau si nécessaire (les JVM modernes ont fait d’énormes progrès dans ce domaine!)
Depuis que j’ai implémenté cette classe pour un projet et un besoin précis, je trouve régulièrement un nouveau cas d’utilisation. Le dernier en date est l’affichage dynamique dans une application Swing de graphes JFreeChart d’un très grand nombre de valeurs en fonction du temps.
Et vous, que ferez-vous du CyclicLatch ?
Commentaire
3 réponses pour " fr.xebia.concurrent.CyclicLatch "
Published by Dominique De Vito , Il y a 15 ans
Ne serait-ce pas une implémentation custom de la JSR-166, plus précisément la 2eme partie de cette JSR devant être inclus au sein du JDK 7 ?
Cf. http://www.insideit.fr/post/2008/09/15/Java-7-Fork/Join-%3A-Exploiter-toute-la-puissance
Published by Benoit Moussaud , Il y a 15 ans
Je pense que c’est beaucoup d’honneur d’associer le CyclicLatch à une implémentation de la JSR-166
Published by Erwan Alliaume , Il y a 15 ans
@Dominique
Vous faites certainement référence à l’un des futurs outils de concurrence proposés pour le jdk7 : le
Phaser
dont nous en avions fait une rapide présentation lors d’une de nos revues de presse. Si leCyclicLatch
de Benoit en reprend quelques principes de fonctionnement du Phaser, il n’en reprend pas son implémentation. Pour mémoire, l’une des grandes promesses de cette nouvelle version de JSR (JSR-166y) concerne la simplification de traitements parallèles dans des environnements multi-cores avec l’arrivée de la nouvelle API Fork and Join.@Benoit
Je suis certain que Doug Lea est jaloux de ton CyclicLatch :)