Útmutató a java.util.concurrent.BlockingQueue fájlhoz

1. Áttekintés

Ebben a cikkben megnézzük az egyik leghasznosabb konstrukciót java.util.egyidejű az egyidejű termelő-fogyasztó probléma megoldására. Megnézzük a BlockingQueue interfész és az ebből a felületből származó módszerek megkönnyítik az egyidejű programok írását.

A cikk későbbi részében bemutatunk egy példát egy egyszerű programra, amely több gyártószálat és több fogyasztói szálat tartalmaz.

2. BlockingQueue Típusok

Két típusát különböztethetjük meg BlockingQueue:

  • korlátlan sor - szinte a végtelenségig növekedhet
  • korlátozott sor - maximális kapacitás definiálva

2.1. Korlátlan sor

A korlátlan sorok létrehozása egyszerű:

BlockingQueue blockingQueue = new LinkedBlockingDeque ();

A kapacitás blockingQueue értékre lesz állítva Egész.MAX_VALUE. Minden olyan művelet, amely egy elemet ad a korlátlan sorhoz, soha nem fog blokkolni, így nagyon nagyra nőhet.

A legfontosabb a gyártó-fogyasztó program megtervezésekor a korlátlan BlockingQueue használatával, hogy a fogyasztóknak képesnek kell lenniük arra, hogy olyan gyorsan fogyasszanak üzeneteket, amint a gyártók üzenetet adnak a sorba. Ellenkező esetben a memória megtelhet, és egy Elfogyott a memória kivétel.

2.2. Korlátozott sor

A sorok második típusa a korlátozott sor. Ilyen sorokat úgy hozhatunk létre, hogy a kapacitást argumentumként átadjuk egy konstruktornak:

BlockingQueue blockingQueue = new LinkedBlockingDeque (10);

Itt van egy blockingQueue amelynek kapacitása egyenlő 10. Ez azt jelenti, hogy amikor egy gyártó megpróbál egy elemet hozzáadni egy már teljes sorhoz, attól függően, hogy milyen módszert használtak hozzá (ajánlat(), add () vagy put ()), addig blokkol, amíg az objektum beszúrására rendelkezésre álló hely nem válik elérhetővé. Ellenkező esetben a műveletek sikertelenek lesznek.

A korlátozott várólista használata jó módszer az egyidejű programok tervezésére, mivel amikor egy elemet beillesztünk egy már teljes sorba, akkor a műveleteknek meg kell várniuk, amíg a fogyasztók felzárkóznak, és helyet biztosítanak a sorban. Ez fékezést biztosít nekünk minden erőfeszítés nélkül.

3. BlockingQueue API

Kétféle módszer létezik a BlockingQueue felületaz elemek várakozási sorba történő felvételéért felelős módszerek és az ezeket az elemeket lekérő módszerek. E két csoport mindegyik módszere eltérően viselkedik abban az esetben, ha a sor tele / üres.

3.1. Elemek hozzáadása

  • add () - visszatér igaz ha a beszúrás sikeres volt, különben dob egy IllegalStateException
  • put () - beszúrja a megadott elemet egy sorba, szükség esetén várva egy szabad rést
  • ajánlat () - visszatér igaz ha a beillesztés sikeres volt, különben hamis
  • ajánlat (E e, hosszú időtúllépés, TimeUnit egység) - megpróbálja betenni az elemet egy sorba, és egy megadott időkorláton belül megvárja a rendelkezésre álló helyet

3.2. Elemek lekérése

  • vesz() - megvárja a sor fejét és eltávolítja. Ha a sor üres, akkor blokkol, és várja, amíg egy elem elérhetővé válik
  • közvélemény-kutatás (hosszú időtúllépés, TimeUnit egység) - lekéri és eltávolítja a sor fejét, várakozásig a megadott várakozási időig, ha szükséges egy elem elérhetővé válásához. Visszatér nulla időtúllépés után

Ezek a módszerek jelentik a legfontosabb építőelemeket BlockingQueue interfész a termelő-fogyasztó programok felépítésekor.

4. Többszálas gyártó-fogyasztó példa

Hozzunk létre egy programot, amely két részből áll - egy Producerből és egy Consumerből.

A Producer véletlenszerű számot állít elő 0 és 100 között, és ezt a számot a-ba teszi BlockingQueue. 4 produceri szálunk lesz, és használjuk a put () blokkolási módszer, amíg szabad hely nem áll rendelkezésre a sorban.

Fontos megjegyezni, hogy meg kell akadályoznunk a fogyasztói szálak várakozását, amíg egy elem a végtelenségig megjelenik a sorban.

Jó módszer arra, hogy jelezzék a gyártótól a fogyasztóig, hogy nincs több feldolgozandó üzenet, ha speciális üzenetet küldenek egy méregtablettának. Annyi méregtablettát kell küldenünk, ahány fogyasztónk van. Aztán amikor a fogyasztó kiveszi azt a speciális méregtabletta üzenetet a sorból, akkor a végrehajtás kecsesen befejeződik.

Nézzünk meg egy termelői osztályt:

public class NumbersProducer megvalósítja a Runnable {private BlockingQueue numbersQueue; privát végső int méregtabletta; private final int poisonPillPerProducer; public NumbersProducer (BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) {this.numbersQueue = számokQueue; this.poisonPill = méregPill; this.poisonPillPerProducer = poisonPillPerProducer; } public void run () {try {generatorNumbers (); } catch (InterruptedException e) {Szál.currentThread (). megszakítás (); }} private void generatorNumbers () dobja az InterruptedException {for (int i = 0; i <100; i ++) {numbersQueue.put (ThreadLocalRandom.current (). nextInt (100)); } for (int j = 0; j <poisonPillPerProducer; j ++) {numbersQueue.put (poisonPill); }}}

Produktorkonstruktorunk érvként a BlockingQueue amelyet a feldolgozás összehangolására használnak a termelő és a fogyasztó között. Látjuk azt a módszert generálszámok () 100 elemet tesz sorba. Méregtablettára is szükség van, hogy megtudjuk, milyen típusú üzenetet kell a sorba tenni, amikor a végrehajtás befejeződik. Ezt az üzenetet el kell adni poisonPillPerProducer alkalommal sorba.

Minden fogyasztó elvesz egy elemet a BlockingQueue felhasználásával vesz() metódust, így addig blokkol, amíg egy elem nem lesz a sorban. Miután vett egy Egész szám egy sorból ellenőrzi, hogy az üzenet méregtabletta-e, ha igen, akkor egy szál végrehajtása befejeződik. Ellenkező esetben kinyomtatja az eredményt a normál kimenetre az aktuális szál nevével együtt.

Ez betekintést enged a fogyasztóink belső működésébe:

NumbersConsumer public class valósítja meg a Runnable {private BlockingQueue queue; privát végső int méregtabletta; public NumbersConsumer (BlockingQueue queue, int poisonPill) {this.queue = queue; this.poisonPill = méregPill; } public void run () {try {while (true) {Egész szám = queue.take (); if (szám.egyenlő (méregPill)) {return; } System.out.println (Thread.currentThread (). GetName () + "eredmény:" + szám); }} catch (InterruptedException e) {Szál.currentThread (). megszakítás (); }}}

A fontos dolog, amit észre kell venni, a sor használata. Ugyanúgy, mint a producer konstruktornál, egy sor kerül átadásra argumentumként. Megtehetjük, mert BlockingQueue kifejezett szinkronizálás nélkül megosztható a szálak között.

Most, hogy megvan a termelőnk és a fogyasztónk, elkezdhetjük a programunkat. Meg kell határoznunk a sor kapacitását, és 100 elemre kell állítanunk.

4 gyártói szálat szeretnénk, és a fogyasztói szálak száma megegyezik a rendelkezésre álló processzorok számával:

int KÖTÖTT = 10; int N-TERMELŐK = 4; int N_CONSUMERS = Runtime.getRuntime (). availableProcessors (); int poisonPill = Egész.MAX_ÉRTÉK; int poisonPillPerProducer = N_FOGYASZTÓK / N_PRODUCEREK; int mod = N_CONSUMERS% N_PRODUCERS; BlockingQueue queue = új LinkedBlockingQueue (BOUND); for (int i = 1; i <N_PRODUCERS; i ++) {new Thread (új NumbersProducer (várólista, poisonPill, poisonPillPerProducer)). start (); } for (int j = 0; j <N_FELHASZNÁLÓK; j ++) {új szál (új NumbersConsumer (várólista, poisonPill)). start (); } új szál (új NumbersProducer (várólista, poisonPill, poisonPillPerProducer + mod)). start (); 

BlockingQueue kapacitású konstrukció segítségével jön létre. 4 gyártót és N fogyasztót hozunk létre. Meghatározzuk a méregtabletta üzenetünket Egész.MAX_VALUE mert ilyen értéket normál munkakörülmények között soha nem küld termelőnk. A legfontosabb, amit itt észre kell venni BlockingQueue a munka összehangolására szolgál közöttük.

Amikor lefuttatjuk a programot, 4 producer szál véletlenszerű lesz Egész számok a BlockingQueue és a fogyasztók ezeket az elemeket kiveszik a sorból. Minden szál normál kimenetre nyomtatja a szál nevét és az eredményt.

5. Következtetés

Ez a cikk bemutatja a BlockingQueue és elmagyarázza azokat a módszereket, amelyekkel elemeket lehet hozzáadni és lekérni belőle. Megmutattuk azt is, hogyan lehet többszálas gyártói-fogyasztói programot felépíteni BlockingQueue a termelők és a fogyasztók közötti munka összehangolása.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven-alapú projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.