Bevezetés a Java szálkészletekbe

1. Bemutatkozás

Ez a cikk a Java szálkészleteinek áttekintése - kezdve a szokásos Java könyvtár különböző implementációival, majd a Google Guava könyvtárával.

2. A szálmedence

A Java-ban a szálak rendszerszintű szálakhoz vannak hozzárendelve, amelyek az operációs rendszer erőforrásai. Ha ellenőrizhetetlenül hoz létre szálakat, akkor hamar elfogyhatnak ezek az erőforrások.

A szálak közötti kontextusváltást az operációs rendszer is elvégzi - a párhuzamosság emulálása érdekében. Leegyszerűsített nézet: minél több szálat hoz létre, annál kevesebb időt tölt el az egyes szálak tényleges munkájával.

A Thread Pool minta segít megtakarítani az erőforrásokat egy többszálú alkalmazásban, és a párhuzamosságot bizonyos előre meghatározott korlátokban is tartalmazza.

Amikor szálkészletet használ, akkor írd meg párhuzamos kódodat párhuzamos feladatok formájában, és küldd el végrehajtásra egy szálkészlet egyik példányának. Ez a példány több, újra használt szálat vezérel a feladatok végrehajtásához.

A minta lehetővé teszi vezérelheti az alkalmazás által létrehozott szálak számát, életciklusukat, valamint a feladatok végrehajtásának ütemezését és a beérkező feladatok sorban tartását.

3. Menetkészletek Java-ban

3.1. Végrehajtók, Végrehajtó és ExecutorService

A Végrehajtók A helper osztály számos módszert tartalmaz az előre konfigurált szálkészlet-példányok létrehozásához. Ezek az órák jó hely a kezdéshez - használja, ha nem kell egyéni finomhangolást alkalmaznia.

A Végrehajtó és ExecutorService az interfészeket a Java különböző szálkészlet-megvalósításaihoz használják. Általában meg kell tartsa a kódját elválasztva a szálkészlet tényleges megvalósításától és használja ezeket a felületeket az alkalmazás során.

A Végrehajtó interfész egyetlen kivégezni benyújtási módszer Futható végrehajtási példányok.

Itt egy gyors példa hogyan használhatja a Végrehajtók API egy Végrehajtó példány egyetlen szálkészlettel és korlátlan sorral a feladatok egymás utáni végrehajtására. Itt egyetlen feladatot hajtunk végre, amely egyszerűen kinyomtatja aHelló Világ" a képernyőn. A feladatot lambda-ként (Java 8 szolgáltatás) küldik be, amire következtetni lehet Futható.

Végrehajtó végrehajtó = Executors.newSingleThreadExecutor (); végrehajtó.execute (() -> System.out.println ("Hello World"));

A ExecutorService felület számos módszert tartalmaz a a feladatok előrehaladásának ellenőrzése és a szolgáltatás megszüntetésének kezelése. Ezen a felületen elküldheti végrehajtásra a feladatokat, és a visszaküldött segítségével vezérelheti azok végrehajtását is Jövő példa.

A következő példában, létrehozunk egy ExecutorService, küldjön be egy feladatot, majd használja a visszaküldöttet Jövő’S kap módszer arra, hogy megvárjuk, amíg a beküldött feladat befejeződik, és az érték visszaadódik:

ExecutorService végrehajtóService = Executors.newFixedThreadPool (10); Jövő jövő = végrehajtóSzolgáltatás.submit (() -> "Hello World"); // néhány művelet String eredmény = jövő.get ();

Természetesen való életben általában nem akar hívni future.get () azonnal, de halassza el a hívást, amíg valóban nem lesz szüksége a számítás értékére.

A Beküldés metódus túlterhelt, hogy bármelyik Futható vagy Hívható mindkettő funkcionális interfész, és lambdaként adható át (a Java 8-tól kezdve).

FuthatóAz egyetlen módszer nem vet ki kivételt és nem ad vissza értéket. A Hívható A kezelőfelület kényelmesebb lehet, mivel lehetővé teszi számunkra, hogy kivételt dobjunk és egy értéket adjunk vissza.

Végül - hogy a fordító következtethessen a Hívható típusú, egyszerűen adja vissza az értéket a lambda-ból.

További példák a ExecutorService felületet és a jövőt, olvassa el az „Útmutató a Java ExecutorService-hoz” című részt.

3.2. ThreadPoolExecutor

A ThreadPoolExecutor egy kibontható szálkészlet, sok paraméterrel és horgokkal a finomhangoláshoz.

A fő konfigurációs paraméterek, amelyeket itt megvitatunk, a következők: corePoolSize, maximumPoolSize, és keepAliveTime.

A készlet rögzített számú magszálból áll, amelyeket állandóan bent tartanak, és néhány túlzott szálból, amelyek szaporodhatnak, majd megszűnhetnek, amikor már nincs rájuk szükség. A corePoolSize A paraméter a mag szálak száma, amelyeket példányosítunk és megtartunk a készletben. Amikor új feladat érkezik, ha az összes központi szál foglalt, és a belső várakozási sor megtelt, akkor a medencét hagyjuk felnőni maximumPoolSize.

A keepAliveTime paraméter az az időintervallum, amelyre a túlzott szálak (a corePoolSize) készenléti állapotban létezhetnek. Alapértelmezés szerint a ThreadPoolExecutor csak a nem magszálakat veszi figyelembe eltávolítás céljából. Annak érdekében, hogy ugyanazt az eltávolítási politikát alkalmazzuk az alapvető szálakra, használhatjuk a allowCoreThreadTimeOut (true) módszer.

Ezek a paraméterek a felhasználási esetek széles körére terjednek ki, de a legjellemzőbb konfigurációk a Végrehajtók statikus módszerek.

Például, newFixedThreadPool módszer létrehozza a ThreadPoolExecutor egyenlővel corePoolSize és maximumPoolSize paraméterértékek és egy nulla keepAliveTime. Ez azt jelenti, hogy ebben a szálkészletben a szálak száma mindig megegyezik:

ThreadPoolExecutor végrehajtó = (ThreadPoolExecutor) Executors.newFixedThreadPool (2); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); assertEquals (2, végrehajtó.getPoolSize ()); assertEquals (1, végrehajtó.getQueue (). méret ());

A fenti példában példányosítjuk a ThreadPoolExecutor fix 2 szálszámmal. Ez azt jelenti, hogy ha az egyidejűleg futó feladatok száma mindig kevesebb vagy egyenlő kettővel, akkor azonnal végrehajtják őket. Másképp, Néhány ilyen feladat sorba kerülhet, hogy megvárja a sorát.

Hármat hoztunk létre Hívható olyan feladatok, amelyek nehéz munkát utánoznak, ha 1000 milliszekundumot alszanak. Az első két feladatot egyszerre hajtják végre, a harmadikat pedig a sorban kell várni. Ellenőrizhetjük a getPoolSize () és getQueue (). size () módszereket azonnal a feladatok benyújtása után.

Egy másik előre konfigurált ThreadPoolExecutor létrehozható a Executors.newCachedThreadPool () módszer. Ez a módszer egyáltalán nem fogad számos szálat. A corePoolSize valójában 0-ra van állítva, és a maximumPoolSize értékre van állítva Egész.MAX_VALUE erre a példára. A keepAliveTime 60 másodperc.

Ezek a paraméterértékek azt jelentik a gyorsítótárazott szálkészlet korlátok nélkül növekedhet, hogy tetszőleges számú beküldött feladatot befogadhasson. De amikor a szálakra már nincs szükség, 60 másodperc inaktivitás után megsemmisítik őket. Tipikus használati eset, amikor az alkalmazásban sok rövid életű feladat van.

ThreadPoolExecutor végrehajtó = (ThreadPoolExecutor) Executors.newCachedThreadPool (); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); végrehajtó.submit (() -> {Szál.alszik (1000); return null;}); assertEquals (3, végrehajtó.getPoolSize ()); assertEquals (0, végrehajtó.getQueue (). méret ());

A fenti példában a sor mérete mindig nulla lesz, mert belsőleg a SynchronousQueue példányt használunk. A SynchronousQueue, pár betét és eltávolítani a műveletek mindig egyidejűleg zajlanak, így a sor valójában soha nem tartalmaz semmit.

A Executors.newSingleThreadExecutor () Az API létrehoz egy másik tipikus formáját ThreadPoolExecutor egyetlen szálat tartalmaz. Az egyszálú végrehajtó ideális egy eseményhurok létrehozásához. A corePoolSize és maximumPoolSize paraméterek megegyeznek 1-vel, és a keepAliveTime nulla.

A fenti példában szereplő feladatokat egymás után hajtjuk végre, így a zászló értéke 2 lesz a feladat befejezése után:

AtomicInteger számláló = new AtomicInteger (); ExecutorService végrehajtó = Executors.newSingleThreadExecutor (); végrehajtó.beküldés (() -> {számláló.készlet (1);}); végrehajtó.submit (() -> {számláló.összehasonlítAndSet (1, 2);});

Ezenkívül ez ThreadPoolExecutor változhatatlan burkolattal van díszítve, így a létrehozás után nem lehet újra konfigurálni. Ne feledje, hogy ez az oka annak is, hogy nem vethetjük a ThreadPoolExecutor.

3.3. ScheduledThreadPoolExecutor

A ScheduledThreadPoolExecutor kiterjeszti a ThreadPoolExecutor osztály és megvalósítja a ScheduledExecutorService interfész számos további módszerrel:

  • menetrend A módszer lehetővé teszi egy feladat végrehajtását egy meghatározott késleltetés után;
  • scheduleAtFixedRate A módszer lehetővé teszi egy feladat végrehajtását egy meghatározott kezdeti késleltetés után, majd ismételten, egy bizonyos időtartam alatt; a időszak érv az idő a feladatok kezdési időpontjai között mérve, tehát a végrehajtási arány rögzített;
  • scheduleWithFixedDelay módszer hasonló scheduleAtFixedRate annyiban, hogy ismételten végrehajtja az adott feladatot, de a megadott késleltetés az az előző feladat vége és a következő kezdete között mérve; a végrehajtási arány az adott feladat végrehajtásához szükséges idő függvényében változhat.

A Executors.newScheduledThreadPool () módszert általában a ScheduledThreadPoolExecutor adottval corePoolSize, korlátlan maximumPoolSize és nulla keepAliveTime. A feladat végrehajtásának ütemezése 500 milliszekundum alatt:

ScheduledExecutorService végrehajtó = Executors.newScheduledThreadPool (5); végrehajtó.schedule (() -> {System.out.println ("Hello World");}, 500, TimeUnit.MILLISECONDS);

A következő kód bemutatja, hogyan lehet egy feladatot végrehajtani 500 ezredmásodperces késés után, majd megismételni 100 milliszekundumonként. A feladat ütemezése után megvárjuk, amíg háromszor elindul a CountDownLatch zár, majd törölje a Future.cancel () módszer.

CountDownLatch lock = new CountDownLatch (3); ScheduledExecutorService végrehajtó = Executors.newScheduledThreadPool (5); ScheduledFuture future = végrehajtó.scheduleAtFixedRate (() -> {System.out.println ("Hello World"); lock.countDown ();}, 500, 100, TimeUnit.MILLISECONDS); lock.await (1000, TimeUnit.MILLISECONDS); jövő.törlés (igaz);

3.4. ForkJoinPool

ForkJoinPool központi része a villa / csatlakozás Java 7-ben bevezetett keretrendszer. Megoldja a több feladat ívása rekurzív algoritmusokban. Egy egyszerű használatával ThreadPoolExecutor, akkor gyorsan elfogynak a szálak, mivel minden feladat vagy részfeladat futtatásához saját szál szükséges.

A villa / csatlakozás keretrendszerben bármilyen feladat szaporodhat (Villa) számos részfeladatot, és várja meg azok teljesítését a csatlakozik módszer. A. Előnye villa / csatlakozás keretrendszer az nem hoz létre új szálat az egyes feladatokhoz vagy részfeladatokhoz, helyette a Work Stealing algoritmus megvalósítását. Ezt a keretrendszert alaposan leírja az “Útmutató a Java fork / join keretrendszerhez” című cikkben.

Nézzünk meg egy egyszerű példát a használatra ForkJoinPool csomópontfán való áthaladás és az összes levélérték összegének kiszámítása. Itt van egy fa egyszerű megvalósítása, amely egy csomópontból, egy int érték és gyermekcsomópontok halmaza:

statikus osztály TreeNode {int érték; Gyerekek; TreeNode (int érték, TreeNode ... gyermekek) {this.value = érték; this.children = Sets.newHashSet (gyerekek); }}

Most, ha egy fa összes értékét párhuzamosan szeretnénk összegezni, akkor végre kell hajtanunk a RecursiveTask felület. Minden feladat megkapja a saját csomópontját, és hozzáadja az értékét az értékeinek összegéhez gyermekek. Összegének kiszámításához gyermekek értékek, a feladat végrehajtása a következőket teszi:

  • patakok a gyermekek készlet,
  • ezen a folyamon térképeket készít, létrehozva egy újat CountingTask minden elemre,
  • az egyes részfeladatokat villával végrehajtja,
  • összegyűjti az eredményeket a csatlakozik módszer minden egyes elágazó feladathoz,
  • összegzi az eredményeket a Collectors.summingInt gyűjtő.
public static class CountingTask kiterjeszti a RecursiveTask {private final TreeNode csomópontot; public CountingTask (TreeNode csomópont) {this.node = csomópont; } @ Felülírás védett egész számítása () {return node.value + node.children.stream () .map (childNode -> new CountingTask (childNode) .fork ()) .collect (Collectors.summingInt (ForkJoinTask :: join)) ; }}

A tényleges fán történő számítás futtatásához szükséges kód nagyon egyszerű:

TreeNode tree = új TreeNode (5, új TreeNode (3), új TreeNode (2, új TreeNode (2), új TreeNode (8))); ForkJoinPool forkJoinPool = ForkJoinPool.commonPool (); int összeg = forkJoinPool.invoke (új CountingTask (fa));

4. Thread Pool megvalósítása Guavában

A Guava egy népszerű Google segédprogram-könyvtár. Számos hasznos párhuzamossági osztálya van, beleértve a ExecutorService. A megvalósító osztályok nem érhetők el közvetlen példányosítás vagy alosztályozás céljából, ezért példányaik létrehozásához az egyetlen belépési pont a Végrehajtók segítő osztály.

4.1. Guava hozzáadása Maven-függőségként

Adja hozzá a következő függőséget a Maven pom fájljához, hogy a Guava könyvtár bekerüljön a projektbe. A Guava könyvtár legújabb verzióját a Maven Central adattárban találja meg:

 com.google.guava guava 19.0 

4.2. Közvetlen végrehajtó és közvetlen végrehajtó szolgáltatás

Néha a feladatot az aktuális szálban vagy egy szálkészletben szeretné végrehajtani, bizonyos feltételektől függően. Legszívesebben egyetlenet használna Végrehajtó felületet, és csak kapcsolja át a megvalósítást. Bár nem olyan nehéz megvalósítani a Végrehajtó vagy ExecutorService amely az aktuális szálban végrehajtja a feladatokat, akkor is meg kell írnia néhány bojler kódot.

Örömmel, hogy Guava előre definiált példányokat nyújt számunkra.

Itt egy példa amely egy feladat végrehajtását mutatja ugyanazon szálon. Noha a megadott feladat 500 ezredmásodpercet alszik, mégis blokkolja az aktuális szálat, és az eredmény azonnal elérhető a végrehajtani a hívás befejeződött:

Végrehajtó végrehajtó = MoreExecutors.directExecutor (); Végrehajtott AtomicBoolean = új AtomicBoolean (); végrehajtó.execute (() -> {próbálkozzon {Thread.sleep (500);} catch (InterruptedException e) {e.printStackTrace ();} végrehajtva.set (true);}); assertTrue (végrehajtva.get ());

A. Által visszaadott példány directExecutor () A módszer valójában egy statikus szingulett, így ennek a módszernek a használata egyáltalán nem jelent általános költséget az objektum létrehozásában.

Előnyben kell részesítenie ezt a módszert a MoreExecutors.newDirectExecutorService () mert ez az API minden híváshoz teljes körű végrehajtó szolgáltatást hoz létre.

4.3. Kilépés az Executor Services szolgáltatásból

Egy másik gyakori probléma az a virtuális gép leállítása miközben egy szálkészlet még mindig futtatja feladatait. Még a lemondási mechanizmus megléte esetén sem garantált, hogy a feladatok szépen fognak viselkedni és leállítják munkájukat, amikor a végrehajtó szolgáltatás leáll. Ez azt eredményezheti, hogy a JVM a végtelenségig lóg, miközben a feladatok folyamatosan végzik munkájukat.

Ennek a problémának a megoldására Guava bemutatja a kivégző végrehajtó szolgáltatások családját. Ezeken alapulnak démon szálak, amelyek a JVM-mel együtt fejeződnek be.

Ezek a szolgáltatások egy leállítási kampót is hozzáadnak a Runtime.getRuntime (). AddShutdownHook () metódust, és megakadályozza, hogy a virtuális gép konfigurált ideig szüneteltessen, mielőtt feladja az akasztott feladatokat.

A következő példában egy végtelen ciklust tartalmazó feladatot küldünk be, de egy kilépő végrehajtó szolgáltatást használunk 100 milliszekundum konfigurált idővel, hogy megvárjuk a feladatokat a virtuális gép leállításakor. A exitingExecutorService a helyén ez a feladat a virtuális gép korlátlan ideig fennakadást okozna:

ThreadPoolExecutor végrehajtó = (ThreadPoolExecutor) Executors.newFixedThreadPool (5); ExecutorService executorService = MoreExecutors.getExitingExecutorService (végrehajtó, 100, TimeUnit.MILLISECONDS); executorService.submit (() -> {while (true) {}});

4.4. Hallgató díszítők

A hallgatói dekorátorok lehetővé teszik, hogy becsomagolja a ExecutorService és fogadja Hallható jövő példák a feladat benyújtásakor az egyszerű helyett Jövő példányok. A Hallható jövő interfész kiterjed Jövő és egyetlen további módszerrel rendelkezik addListener. Ez a módszer lehetővé teszi egy olyan hallgató hozzáadását, amelyet későbbi befejezéskor hívnak meg.

Ritkán akarja használni ListenableFuture.addListener () módszer, de az nélkülözhetetlen a legtöbb segítő módszerhez Határidős hasznossági osztály. Például a Futures.allAsList () módszerrel több is kombinálható Hallható jövő példányok egyetlen Hallható jövő ami az összes jövő együttes sikeres befejezésével fejeződik be:

ExecutorService végrehajtóService = Executors.newCachedThreadPool (); ListeningExecutorService listeningExecutorService = MoreExecutors.listeningDecorator (végrehajtóSzolgáltatás); ListenableFuture future1 = listeningExecutorService.submit (() -> "Hello"); ListenableFuture future2 = listeningExecutorService.submit (() -> "Világ"); Karakterlánc-köszöntés = Futures.allAsList (jövő1, jövő2) .get () .stream () .collect (Gyűjtők.csatlakozás ("")); assertEquals ("Hello World", üdvözlet);

5. Következtetés

Ebben a cikkben megvitattuk a Thread Pool mintázatát és annak megvalósítását a standard Java könyvtárban és a Google Guava könyvtárában.

A cikk forráskódja elérhető a GitHubon.