Útmutató a Java Fork / Join Framework-hez

1. Áttekintés

A fork / join keretrendszer a Java 7-ben került bemutatásra. Olyan eszközöket kínál, amelyek felgyorsítják a párhuzamos feldolgozást azáltal, hogy megkísérlik felhasználni az összes rendelkezésre álló processzormagot - ami megvalósult megosztani és meghódítani megközelítés révén.

A gyakorlatban ez azt jelenti a keret először „elágazik”, rekurzívan felosztva a feladatot kisebb, független részfeladatokra, amíg azok elég egyszerűek ahhoz, hogy aszinkron módon hajthatók végre.

Utána, kezdődik a „csatlakozás” rész, amelyben az összes részfeladat eredményei rekurzívan egyesülnek egyetlen eredménybe, vagy egy érvénytelenné váló feladat esetén a program egyszerűen megvárja, amíg minden részfeladatot végrehajtanak.

A hatékony párhuzamos végrehajtás érdekében a fork / join keretrendszer az ún ForkJoinPool, amely a munkás szálakat kezeli ForkJoinWorkerThread.

2. ForkJoinPool

A ForkJoinPool a keret szíve. Ez a ExecutorService amely kezeli a munkavállalói szálakat és eszközöket biztosít számunkra a szálkészlet állapotáról és teljesítményéről.

A munkavállalói szálak egyszerre csak egy feladatot hajthatnak végre, de a ForkJoinPool nem hoz létre külön szálat minden egyes részfeladathoz. Ehelyett a medence minden szálának megvan a maga kettős vége (vagy deque, ejtve) fedélzet), amely a feladatokat tárolja.

Ez az architektúra létfontosságú a szál munkaterhelésének kiegyensúlyozásához a munkalopási algoritmus.

2.1. Munkalopási algoritmus

Egyszerűen fogalmazva - az ingyenes szálak megpróbálják „ellopni” a munkát az elfoglalt szálak dequestől.

Alapértelmezés szerint a munkáslánc a saját deque fejétől kapja meg a feladatokat. Ha üres, a szál feladatot vesz egy másik elfoglalt szál deque-jének farkából vagy a globális belépési sorból, mivel valószínűleg itt találhatók a legnagyobb darabok.

Ez a megközelítés minimalizálja annak lehetőségét, hogy a szálak versenyezzenek a feladatokért. Csökkenti azt is, hogy a szálnak hányszor kell munkát keresnie, mivel először a rendelkezésre álló legnagyobb darabokon működik.

2.2. ForkJoinPool Azonnali beavatkozás

A Java 8-ban a legkényelmesebb módja a hozzáférésnek a ForkJoinPool statikus módszerének használata commonPool (). Ahogy a neve is sugallja, ez hivatkozást fog adni a közös készletre, amely mindegyik alapértelmezett szálkészlete ForkJoinTask.

Az Oracle dokumentációja szerint az előre definiált közös készlet használata csökkenti az erőforrás-felhasználást, mivel ez visszatartja feladatonként külön szálkészlet létrehozását.

ForkJoinPool commonPool = ForkJoinPool.commonPool ();

Ugyanez a viselkedés érhető el a Java 7-ben az a létrehozásával ForkJoinPool és hozzárendelni a nyilvános statikus egy közműosztály mezője:

public static ForkJoinPool forkJoinPool = new ForkJoinPool (2);

Most könnyen elérhető:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

Val vel ForkJoinPool's konstruktorok segítségével lehetséges egy egyedi szálkészlet létrehozása a párhuzamosság, a szálgyár és a kivételkezelő meghatározott szintjével. A fenti példában a készlet párhuzamossági szintje 2. Ez azt jelenti, hogy a pool 2 processzormagot fog használni.

3. ForkJoinTask

ForkJoinTask a belül végrehajtott feladatok alaptípusa ForkJoinPool. A gyakorlatban a két alosztály egyikét ki kell terjeszteni: a RecursiveAction mert üres feladatok és a RecursiveTask értéket adó feladatokhoz.Mindkettőjüknek van elvont módszere kiszámít() amelyben a feladat logikája van meghatározva.

3.1. RecursiveAction - egy példa

Az alábbi példában a feldolgozandó munkaegységet a jelöli Húr hívott terhelés. Bemutató célokra a feladat értelmetlen: egyszerűen nagybetűkkel írja be a bevitelt és naplózza.

A keret elágazó viselkedésének bemutatásához a példa felosztja a feladatot, ha terhelés.hossz() nagyobb, mint egy megadott küszöbhasználni a createSubtask () módszer.

A karakterlánc rekurzívan fel van osztva részstringekre, létrehozás CustomRecursiveTask ezeken az alszövegeken alapuló példányok.

Ennek eredményeként a módszer visszaadja a Lista.

A listát benyújtják a ForkJoinPool használni a invokeAll () módszer:

public class CustomRecursiveAction kiterjeszti a RecursiveAction {private String munkaterhelés = ""; privát statikus végső int THRESHOLD = 4; private static Logger logger = Naplózó.getAnonymousLogger (); public CustomRecursiveAction (String workload) {this.workload = munkaterhelés; } @Orride protected void compute () {if (workload.length ()> THRESHOLD) {ForkJoinTask.invokeAll (createSubtasks ()); } else {feldolgozás (munkaterhelés); }} privát Lista createSubtasks () {Lista részfeladatok = new ArrayList (); String partOne = munkaterhelés.substring (0, munkaterhelés.hossz () / 2); String partTwo = munkaterhelés.substring (munkaterhelés.hossz () / 2, munkaterhelés.hossz ()); subtasks.add (új CustomRecursiveAction (partOne)); subtasks.add (új CustomRecursiveAction (partTwo)); részfeladatok visszaküldése; } private void feldolgozás (String munka) {String eredmény = work.toUpperCase (); logger.info ("Ezt az eredményt ((" + eredmény + ")) a" + Thread.currentThread (). getName ()) dolgozta fel; }}

Ez a minta felhasználható saját fejlesztésére RecursiveAction osztályok. Ehhez hozzon létre egy objektumot, amely a teljes munka mennyiségét képviseli, kiválasztott egy megfelelő küszöbértéket, meghatározzon egy módszert a munka felosztására és meghatározzon egy módszert a munka elvégzésére.

3.2. RecursiveTask

Értéket adó feladatok esetében az itt látható logika hasonló, azzal a különbséggel, hogy az egyes részfeladatok eredményeit egyetlen eredmény egyesíti:

public class CustomRecursiveTask kiterjeszti a RecursiveTask {private int [] arr; privát statikus végső int THRESHOLD = 20; public CustomRecursiveTask (int [] arr) {this.arr = arr; } @Orride protected Integer compute () {if (arr.length> THRESHOLD) {return ForkJoinTask.invokeAll (createSubtasks ()) .stream () .mapToInt (ForkJoinTask :: join) .sum (); } else {visszatérés feldolgozása (arr); }} privát gyűjtemény createSubtasks () {List splitTasks = new ArrayList (); sharedTasks.add (új CustomRecursiveTask (Arrays.copyOfRange (arr, 0, arr.length / 2))); splitTasks.add (új CustomRecursiveTask (Tömbök.copyOfRange (arr, arr.length / 2, arr.length))); return splitTasks; } privát Egész feldolgozás (int [] arr) {return tömbök.stream (arr) .szűrő (a -> a> 10 && a a * 10) .sum (); }}

Ebben a példában a művet a arr mező CustomRecursiveTask osztály. A createSubtasks () módszer rekurzívan osztja fel a feladatot kisebb munkadarabokra, amíg minden darab kisebb lesz, mint a küszöb. Aztán a invokeAll () metódus elküldi az alfeladatokat a közös készletnek, és visszaadja a Jövő.

A végrehajtás kiváltásához a csatlakozik() metódust hívnak meg az egyes részfeladatokhoz.

Ebben a példában ezt a Java 8-asokkal hajtjuk végre Stream API; a összeg() módszert alkalmazzák az aleredmények és a végeredmény kombinálásának ábrázolására.

4. Feladatok benyújtása a ForkJoinPool

A feladatok elküldéséhez a szálkészlethez kevés megközelítés használható.

A Beküldés() vagy végrehajtani ()módszer (felhasználási eseteik megegyeznek):

forkJoinPool.execute (customRecursiveTask); int eredmény = customRecursiveTask.join ();

A invoke ()A módszer elágazik a feladaton, és várja az eredményt, és nem igényel semmilyen kézi csatlakozást:

int eredmény = forkJoinPool.invoke (customRecursiveTask);

A invokeAll () módszer a legkényelmesebb módja a ForkJoinTasks hoz ForkJoinPool. A feladatokat paraméterként veszi fel (két feladat, var args vagy gyűjtemény), majd a forks visszaad egy gyűjteményt Jövő tárgyakat a gyártás sorrendjében.

Alternatív megoldásként használhatja külön Villa() és csatlakozik() mód. A Villa() metódus feladatot küld egy készletnek, de nem váltja ki a végrehajtását. A csatlakozik() módszert kell használni erre a célra. Abban az esetben RecursiveAction, a csatlakozik() nem ad mást, csak nulla; mert RecursiveTask, a feladat végrehajtásának eredményét adja vissza:

customRecursiveTaskFirst.fork (); eredmény = customRecursiveTaskLast.join ();

Miénkben RecursiveTask példát használtuk invokeAll () módszer részfeladatok sorozatának beküldésére a készletbe. Ugyanaz a munka elvégezhető Villa() és csatlakozik(), bár ennek következményei vannak az eredmények sorrendjére.

A félreértések elkerülése érdekében általában célszerű használni invokeAll () módszer egynél több feladat elküldéséhez a ForkJoinPool.

5. Következtetések

A fork / join keretrendszer használata felgyorsíthatja a nagy feladatok feldolgozását, de ennek az eredménynek az eléréséhez néhány irányelvet kell követni:

  • Használjon minél kevesebb szálkészletet - a legtöbb esetben az a legjobb döntés, ha alkalmazásonként vagy rendszerenként egy szálkészletet használunk
  • Használja az alapértelmezett közös szálkészletet, ha nincs szükség speciális hangolásra
  • Használjon ésszerű küszöböt a felosztáshoz ForkJoinTask részfeladatokká
  • Kerülje az esetleges blokkolástForkJoinTasks

Az ebben a cikkben használt példák a linkelt GitHub-tárban érhetők el.