Útmutató a Java párhuzamos gyűjtők könyvtárához
1. Bemutatkozás
A Parallel-collectors egy kis könyvtár, amely egy sor Java Stream API-gyűjtőt kínál, amelyek lehetővé teszik a párhuzamos feldolgozást - ugyanakkor megkerülik a szabványos Parallel Stream fő hiányosságait.
2. Maven-függőségek
Ha el akarjuk kezdeni a könyvtár használatát, egyetlen bejegyzést kell hozzáadnunk a Maven-be pom.xml fájl:
com.pivovarit párhuzamos gyűjtők 1.1.0
Vagy egyetlen sor a Gradle build fájljában:
fordítsd le a „com.pivovarit: parallel-collectors: 1.1.0”
A legújabb verzió megtalálható a Maven Central oldalon.
3. Párhuzamos folyamok figyelmeztetései
A Párhuzamos adatfolyamok a Java 8 egyik kiemelt témája voltak, de kiderült, hogy kizárólag nehéz CPU-feldolgozásra alkalmazhatók.
Ennek oka az volt, hogy A Párhuzamos folyamokat belső szinten egy JVM-szintű megosztott támogatta ForkJoinPool, amely korlátozott párhuzamosságot biztosított és egyetlen JVM-példányon futó összes párhuzamos adatfolyam használta.
Tegyük fel például, hogy van egy listánk az azonosítókról, és felhasználni akarjuk őket a felhasználók listájának lekérésére, és hogy ez a művelet drága.
Használhatnánk ehhez a Párhuzamos folyamokat:
Listazonosítók = tömbök. AsList (1, 2, 3); Eredmények felsorolása = ids.parallelStream () .map (i -> fetchById (i)) // minden művelet egy másodpercet vesz igénybe .collect (Collectors.toList ()); System.out.println (eredmények); // [user-1, user-2, user-3]
És valóban láthatjuk, hogy észrevehető a gyorsulás. De problematikussá válik, ha elkezdünk több párhuzamos blokkolási műveletet futtatni ... párhuzamosan. Ez gyorsan telítheti a medencét és potenciálisan hatalmas késleltetéseket eredményezhetnek. Ezért fontos a válaszfalak megépítése külön szálkészletek létrehozásával - annak megakadályozása érdekében, hogy a nem kapcsolódó feladatok befolyásolják egymás végrehajtását.
A szokás biztosítása érdekében ForkJoinPool Például kihasználhatnánk az itt leírt trükköt, de ez a megközelítés egy dokumentálatlan hackelésre támaszkodott, és hibás volt a JDK10-ig. Magában a számban olvashatunk bővebben - [JDK8190974].
4. Párhuzamos gyűjtők működés közben
A párhuzamos gyűjtők, amint a neve is mutatja, csak szabványos Stream API-gyűjtők, amelyek lehetővé teszik további műveletek párhuzamos végrehajtását a gyűjt() fázis.
ParallelCollectors (amely tükröz Gyűjtők osztály) a homlokzat, amely hozzáférést biztosít a könyvtár teljes funkcionalitásához.
Ha a fenti példát szeretnénk megismételni, egyszerűen írhatnánk:
ExecutorService végrehajtó = Executors.newFixedThreadPool (10); Listazonosítók = tömbök. AsList (1, 2, 3); CompletableFuture eredmények = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), végrehajtó, 4)); System.out.println (results.join ()); // [user-1, user-2, user-3]
Az eredmény ugyanakkora, meg tudtuk adni az egyéni szálkészletünket, megadhattuk az egyedi párhuzamossági szintünket, és az eredmény a CompletableFuture az aktuális szál blokkolása nélkül.
A Normál Párhuzamos Áramlat viszont egyiket sem tudta elérni.
4.1. ParallelCollectors.parallelToList / ToSet ()
Amilyen intuitív, ha feldolgozni akarjuk a Folyam párhuzamosan, és gyűjtse össze az eredményeket a Lista vagy Készlet, egyszerűen használhatjuk ParallelCollectors.parallelToList vagy parallelToSet:
Listazonosítók = tömbök. AsList (1, 2, 3); Eredménylista = ids.stream () .collect (parallelToList (i -> fetchById (i), végrehajtó, 4)) .join ();
4.2. ParallelCollectors.parallelToMap ()
Ha gyűjteni akarunk Folyam elemeket a Térkép Például a Stream API-hoz hasonlóan két térképet is meg kell adnunk:
Listazonosítók = tömbök. AsList (1, 2, 3); Térkép eredmények = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), végrehajtó, 4)) .join (); // {1 = user-1, 2 = user-2, 3 = user-3}
Igényt is tudunk biztosítani Térkép példa Támogató:
Térkép eredményei = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, végrehajtó, 4)).
És egy egyedi konfliktusmegoldási stratégia:
Listazonosítók = tömbök. AsList (1, 2, 3); Térkép eredmények = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, végrehajtó, 4)) .join ();
4.3. ParallelCollectors.parallelToCollection ()
A fentiekhez hasonlóan átadhatjuk szokásunkat Gyűjtemény beszállítója ha egyedi tárolóba csomagolva szeretnénk eredményeket elérni:
Eredmények listája = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, végrehajtó, 4)) .join ();
4.4. ParallelCollectors.parallelToStream ()
Ha a fentiek nem elégek, akkor valóban megszerezhetjük a Folyam példány, és ott folytassa az egyéni feldolgozást:
Térkép results = ids.stream () .collect (parallelToStream (i -> fetchById (i), végrehajtó, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .csatlakozik();
4.5. ParallelCollectors.parallel ()
Ez lehetővé teszi számunkra, hogy az eredményeket sorrendben továbbítsuk:
ids.stream () .collect (párhuzamos (i -> fetchByIdWithRandomDelay (i), végrehajtó, 4)) .forEach (System.out :: println); // user-1 // user-3 // user-2
Ebben az esetben arra számíthatunk, hogy a gyűjtő minden alkalommal más eredményeket ad vissza, mivel véletlenszerű feldolgozási késleltetést vezettünk be.
4.6. ParallelCollectors.parallelOrdered ()
Ez a lehetőség lehetővé teszi a streaming eredményekhez hasonlóan a fentieket, de fenntartja az eredeti sorrendet:
ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), végrehajtó, 4)) .forEach (System.out :: println); // user-1 // user-2 // user-3
Ebben az esetben a gyűjtő mindig fenntartja a rendet, de lassabb lehet, mint a fentiek.
5. Korlátozások
Az írás pillanatában a párhuzamos gyűjtők nem működnek végtelen adatfolyamokkal még akkor is, ha rövidzárlati műveleteket alkalmaznak - ez a Stream API belsõ részei által elrendelt tervezési korlátozás. Egyszerűen fogalmazva, Folyams a gyűjtőket nem rövidzárlati műveletekként kezelik, ezért a folyamnak le kell állítania az összes upstream elemet.
A másik korlátozás az a rövidzárlati műveletek nem szakítják meg a fennmaradó feladatokat rövidzárlat után.
6. Következtetés
Láttuk, hogy a parallel-collectors könyvtár lehetővé teszi számunkra a párhuzamos feldolgozást az egyedi Java Stream API használatával Gyűjtők és CompletableFutures egyedi szálkészletek, párhuzamosság és nem blokkoló stílus felhasználása CompletableFutures.
Mint mindig, a kódrészletek is elérhetők a GitHubon.
További olvasáshoz tekintse meg a GitHub párhuzamos gyűjtők könyvtárát, a szerző blogját és a szerző Twitter-fiókját.