Útmutató a Java SynchronousQueue-hoz

1. Áttekintés

Ebben a cikkben megnézzük a SynchronousQueue tól java.util.egyidejű csomag.

Egyszerűen fogalmazva, ez a megvalósítás lehetővé teszi számunkra, hogy szálak között biztonságos módon cseréljünk információt.

2. API áttekintés

A SynchronousQueue csak van két támogatott művelet: vesz() és put (), és mindketten blokkolnak.

Például, ha elemet akarunk hozzáadni a sorhoz, akkor meg kell hívnunk a put () módszer. Ez a módszer addig blokkol, amíg más szál nem hívja meg a vesz() módszer, jelezve, hogy készen áll egy elem felvételére.

Habár a SynchronousQueue rendelkezik egy sor interfésszel, akkor arra kell gondolnunk, mint egy két elem közötti szál egyetlen elemének cserepontjára, amelyben az egyik szál egy elemet ad le, egy másik szál pedig ezt az elemet veszi fel.

3. Az átadás-végrehajtás megosztott változó használatával

Hogy megtudja, miért SynchronousQueue olyan hasznos lehet, hogy egy logikát két szál közötti megosztott változó használatával valósítunk meg, majd ezt követően átírjuk ezt a logikát SynchronousQueue így a kódunk sokkal egyszerűbb és olvashatóbb.

Tegyük fel, hogy két szálunk van - termelő és fogyasztó -, és amikor a termelő egy megosztott változó értékét állítja be, ezt a tényt szeretnénk jelezni a fogyasztói szálnak. Ezután a fogyasztói szál egy megosztott változóból fog lekérni egy értéket.

A CountDownLatch e két szál összehangolása annak megakadályozása érdekében, hogy a fogyasztó hozzáférjen egy megosztott változó olyan értékéhez, amelyet még nem állítottak be.

Meghatározzuk a sharedState változó és a CountDownLatch amelyet a feldolgozás koordinálására használnak:

ExecutorService végrehajtó = Executors.newFixedThreadPool (2); AtomicInteger sharedState = új AtomicInteger (); CountDownLatch countDownLatch = new CountDownLatch (1);

A gyártó véletlenszerű egész számot menti a sharedState változót, és hajtsa végre a visszaszámlálás() módszer a countDownLatch, jelezve a fogyasztónak, hogy képes lekérni egy értéket a sharedState:

Futható producer = () -> {Egész szám előállítottElement = SzálLokálisRandom .current () .nextInt (); sharedState.set (producerElement); countDownLatch.countDown (); };

A fogyasztó várja a countDownLatch használni a várják() módszer. Amikor a gyártó jelzi, hogy a változó be lett állítva, a fogyasztó lekéri azt a sharedState:

Futható fogyasztó = () -> {try {countDownLatch.await (); Egész számot fogyasztott elem = sharedState.get (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Végül, de nem utolsósorban kezdjük el programunkat:

végrehajtó.execute (producer); végrehajtó.execute (fogyasztó); végrehajtó.awaitTermination (500, TimeUnit.MILLISECONDS); végrehajtó.leállítás (); assertEquals (countDownLatch.getCount (), 0);

A következő kimenetet fogja produkálni:

Elem mentése: -1507375353 a cserepontra felemésztett egy elemet: -1507375353 a cserepontról

Láthatjuk, hogy ez sok kód egy ilyen egyszerű funkció megvalósításához, mint például egy elem cseréje két szál között. A következő részben megpróbáljuk jobbá tenni.

4. A handoffok végrehajtása a SynchronousQueue

Vezessük be most ugyanazt a funkciót, mint az előző szakaszban, de a-val SynchronousQueue. Kettős hatása van, mert használhatjuk a szálak közötti állapotcserére és a művelet összehangolására, hogy ne kelljen mást használnunk SynchronousQueue.

Először meghatározunk egy sort:

ExecutorService végrehajtó = Executors.newFixedThreadPool (2); SynchronousQueue queue = új SynchronousQueue ();

A producer felhívja a put () addig blokkol, amíg valamelyik másik szál el nem vesz egy elemet a sorból:

Futható producer = () -> {Egész szám előállítottElement = SzálLokálisRandom .current () .nextInt (); próbáld ki a {queue.put (producerElement); } catch (InterruptedException ex) {ex.printStackTrace (); }};

A fogyasztó egyszerűen megszerzi az elemet a vesz() módszer:

Futtatható fogyasztó = () -> {try {Egész egész fogyasztott elem = várólista. take (); } catch (InterruptedException ex) {ex.printStackTrace (); }};

Ezután elindítjuk programunkat:

végrehajtó.execute (producer); végrehajtó.execute (fogyasztó); végrehajtó.awaitTermination (500, TimeUnit.MILLISECONDS); végrehajtó.leállítás (); assertEquals (queue.size (), 0);

A következő kimenetet fogja produkálni:

Elem: 339626897 elmentése a cserepontba felemésztette a 339626897 elemet a cserepontról

Láthatjuk, hogy a SynchronousQueue szálak közötti cserepontként használják, ami sokkal jobb és érthetőbb, mint az előző példa, amely a megosztott állapotot CountDownLatch.

5. Következtetés

Ebben a gyors bemutatóban megnéztük a SynchronousQueue konstrukció. Létrehoztunk egy programot, amely megosztotta az adatokat két szál között megosztott állapot használatával, majd átírta azt a programot a SynchronousQueue konstrukció. Ez cserepontként szolgál, amely koordinálja a termelő és a fogyasztói szálat.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.