Útmutató a Java Fork / Join Framework-hez
1. Áttekintés
A fork / join keretrendszer a Java 7-ben került bemutatásra. Olyan eszközöket kínál, amelyek felgyorsítják a párhuzamos feldolgozást azáltal, hogy megkísérlik felhasználni az összes rendelkezésre álló processzormagot - ami megvalósult megosztani és meghódítani megközelítés révén.
A gyakorlatban ez azt jelenti a keret először „elágazik”, rekurzívan felosztva a feladatot kisebb, független részfeladatokra, amíg azok elég egyszerűek ahhoz, hogy aszinkron módon hajthatók végre.
Utána, kezdődik a „csatlakozás” rész, amelyben az összes részfeladat eredményei rekurzívan egyesülnek egyetlen eredménybe, vagy egy érvénytelenné váló feladat esetén a program egyszerűen megvárja, amíg minden részfeladatot végrehajtanak.
A hatékony párhuzamos végrehajtás érdekében a fork / join keretrendszer az ún ForkJoinPool, amely a munkás szálakat kezeli ForkJoinWorkerThread.
2. ForkJoinPool
A ForkJoinPool a keret szíve. Ez a ExecutorService amely kezeli a munkavállalói szálakat és eszközöket biztosít számunkra a szálkészlet állapotáról és teljesítményéről.
A munkavállalói szálak egyszerre csak egy feladatot hajthatnak végre, de a ForkJoinPool nem hoz létre külön szálat minden egyes részfeladathoz. Ehelyett a medence minden szálának megvan a maga kettős vége (vagy deque, ejtve) fedélzet), amely a feladatokat tárolja.
Ez az architektúra létfontosságú a szál munkaterhelésének kiegyensúlyozásához a munkalopási algoritmus.
2.1. Munkalopási algoritmus
Egyszerűen fogalmazva - az ingyenes szálak megpróbálják „ellopni” a munkát az elfoglalt szálak dequestől.
Alapértelmezés szerint a munkáslánc a saját deque fejétől kapja meg a feladatokat. Ha üres, a szál feladatot vesz egy másik elfoglalt szál deque-jének farkából vagy a globális belépési sorból, mivel valószínűleg itt találhatók a legnagyobb darabok.
Ez a megközelítés minimalizálja annak lehetőségét, hogy a szálak versenyezzenek a feladatokért. Csökkenti azt is, hogy a szálnak hányszor kell munkát keresnie, mivel először a rendelkezésre álló legnagyobb darabokon működik.
2.2. ForkJoinPool Azonnali beavatkozás
A Java 8-ban a legkényelmesebb módja a hozzáférésnek a ForkJoinPool statikus módszerének használata commonPool (). Ahogy a neve is sugallja, ez hivatkozást fog adni a közös készletre, amely mindegyik alapértelmezett szálkészlete ForkJoinTask.
Az Oracle dokumentációja szerint az előre definiált közös készlet használata csökkenti az erőforrás-felhasználást, mivel ez visszatartja feladatonként külön szálkészlet létrehozását.
ForkJoinPool commonPool = ForkJoinPool.commonPool ();
Ugyanez a viselkedés érhető el a Java 7-ben az a létrehozásával ForkJoinPool és hozzárendelni a nyilvános statikus egy közműosztály mezője:
public static ForkJoinPool forkJoinPool = new ForkJoinPool (2);
Most könnyen elérhető:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
Val vel ForkJoinPool's konstruktorok segítségével lehetséges egy egyedi szálkészlet létrehozása a párhuzamosság, a szálgyár és a kivételkezelő meghatározott szintjével. A fenti példában a készlet párhuzamossági szintje 2. Ez azt jelenti, hogy a pool 2 processzormagot fog használni.
3. ForkJoinTask
ForkJoinTask a belül végrehajtott feladatok alaptípusa ForkJoinPool. A gyakorlatban a két alosztály egyikét ki kell terjeszteni: a RecursiveAction mert üres feladatok és a RecursiveTask értéket adó feladatokhoz.Mindkettőjüknek van elvont módszere kiszámít() amelyben a feladat logikája van meghatározva.
3.1. RecursiveAction - egy példa
Az alábbi példában a feldolgozandó munkaegységet a jelöli Húr hívott terhelés. Bemutató célokra a feladat értelmetlen: egyszerűen nagybetűkkel írja be a bevitelt és naplózza.
A keret elágazó viselkedésének bemutatásához a példa felosztja a feladatot, ha terhelés.hossz() nagyobb, mint egy megadott küszöbhasználni a createSubtask () módszer.
A karakterlánc rekurzívan fel van osztva részstringekre, létrehozás CustomRecursiveTask ezeken az alszövegeken alapuló példányok.
Ennek eredményeként a módszer visszaadja a Lista.
A listát benyújtják a ForkJoinPool használni a invokeAll () módszer:
public class CustomRecursiveAction kiterjeszti a RecursiveAction {private String munkaterhelés = ""; privát statikus végső int THRESHOLD = 4; private static Logger logger = Naplózó.getAnonymousLogger (); public CustomRecursiveAction (String workload) {this.workload = munkaterhelés; } @Orride protected void compute () {if (workload.length ()> THRESHOLD) {ForkJoinTask.invokeAll (createSubtasks ()); } else {feldolgozás (munkaterhelés); }} privát Lista createSubtasks () {Lista részfeladatok = new ArrayList (); String partOne = munkaterhelés.substring (0, munkaterhelés.hossz () / 2); String partTwo = munkaterhelés.substring (munkaterhelés.hossz () / 2, munkaterhelés.hossz ()); subtasks.add (új CustomRecursiveAction (partOne)); subtasks.add (új CustomRecursiveAction (partTwo)); részfeladatok visszaküldése; } private void feldolgozás (String munka) {String eredmény = work.toUpperCase (); logger.info ("Ezt az eredményt ((" + eredmény + ")) a" + Thread.currentThread (). getName ()) dolgozta fel; }}
Ez a minta felhasználható saját fejlesztésére RecursiveAction osztályok. Ehhez hozzon létre egy objektumot, amely a teljes munka mennyiségét képviseli, kiválasztott egy megfelelő küszöbértéket, meghatározzon egy módszert a munka felosztására és meghatározzon egy módszert a munka elvégzésére.
3.2. RecursiveTask
Értéket adó feladatok esetében az itt látható logika hasonló, azzal a különbséggel, hogy az egyes részfeladatok eredményeit egyetlen eredmény egyesíti:
public class CustomRecursiveTask kiterjeszti a RecursiveTask {private int [] arr; privát statikus végső int THRESHOLD = 20; public CustomRecursiveTask (int [] arr) {this.arr = arr; } @Orride protected Integer compute () {if (arr.length> THRESHOLD) {return ForkJoinTask.invokeAll (createSubtasks ()) .stream () .mapToInt (ForkJoinTask :: join) .sum (); } else {visszatérés feldolgozása (arr); }} privát gyűjtemény createSubtasks () {List splitTasks = new ArrayList (); sharedTasks.add (új CustomRecursiveTask (Arrays.copyOfRange (arr, 0, arr.length / 2))); splitTasks.add (új CustomRecursiveTask (Tömbök.copyOfRange (arr, arr.length / 2, arr.length))); return splitTasks; } privát Egész feldolgozás (int [] arr) {return tömbök.stream (arr) .szűrő (a -> a> 10 && a a * 10) .sum (); }}
Ebben a példában a művet a arr mező CustomRecursiveTask osztály. A createSubtasks () módszer rekurzívan osztja fel a feladatot kisebb munkadarabokra, amíg minden darab kisebb lesz, mint a küszöb. Aztán a invokeAll () metódus elküldi az alfeladatokat a közös készletnek, és visszaadja a Jövő.
A végrehajtás kiváltásához a csatlakozik() metódust hívnak meg az egyes részfeladatokhoz.
Ebben a példában ezt a Java 8-asokkal hajtjuk végre Stream API; a összeg() módszert alkalmazzák az aleredmények és a végeredmény kombinálásának ábrázolására.
4. Feladatok benyújtása a ForkJoinPool
A feladatok elküldéséhez a szálkészlethez kevés megközelítés használható.
A Beküldés() vagy végrehajtani ()módszer (felhasználási eseteik megegyeznek):
forkJoinPool.execute (customRecursiveTask); int eredmény = customRecursiveTask.join ();
A invoke ()A módszer elágazik a feladaton, és várja az eredményt, és nem igényel semmilyen kézi csatlakozást:
int eredmény = forkJoinPool.invoke (customRecursiveTask);
A invokeAll () módszer a legkényelmesebb módja a ForkJoinTasks hoz ForkJoinPool. A feladatokat paraméterként veszi fel (két feladat, var args vagy gyűjtemény), majd a forks visszaad egy gyűjteményt Jövő tárgyakat a gyártás sorrendjében.
Alternatív megoldásként használhatja külön Villa() és csatlakozik() mód. A Villa() metódus feladatot küld egy készletnek, de nem váltja ki a végrehajtását. A csatlakozik() módszert kell használni erre a célra. Abban az esetben RecursiveAction, a csatlakozik() nem ad mást, csak nulla; mert RecursiveTask, a feladat végrehajtásának eredményét adja vissza:
customRecursiveTaskFirst.fork (); eredmény = customRecursiveTaskLast.join ();
Miénkben RecursiveTask példát használtuk invokeAll () módszer részfeladatok sorozatának beküldésére a készletbe. Ugyanaz a munka elvégezhető Villa() és csatlakozik(), bár ennek következményei vannak az eredmények sorrendjére.
A félreértések elkerülése érdekében általában célszerű használni invokeAll () módszer egynél több feladat elküldéséhez a ForkJoinPool.
5. Következtetések
A fork / join keretrendszer használata felgyorsíthatja a nagy feladatok feldolgozását, de ennek az eredménynek az eléréséhez néhány irányelvet kell követni:
- Használjon minél kevesebb szálkészletet - a legtöbb esetben az a legjobb döntés, ha alkalmazásonként vagy rendszerenként egy szálkészletet használunk
- Használja az alapértelmezett közös szálkészletet, ha nincs szükség speciális hangolásra
- Használjon ésszerű küszöböt a felosztáshoz ForkJoinTask részfeladatokká
- Kerülje az esetleges blokkolástForkJoinTasks
Az ebben a cikkben használt példák a linkelt GitHub-tárban érhetők el.