Útmutató a Java párhuzamos gyűjtők könyvtárához

1. Bemutatkozás

A Parallel-collectors egy kis könyvtár, amely egy sor Java Stream API-gyűjtőt kínál, amelyek lehetővé teszik a párhuzamos feldolgozást - ugyanakkor megkerülik a szabványos Parallel Stream fő hiányosságait.

2. Maven-függőségek

Ha el akarjuk kezdeni a könyvtár használatát, egyetlen bejegyzést kell hozzáadnunk a Maven-be pom.xml fájl:

 com.pivovarit párhuzamos gyűjtők 1.1.0 

Vagy egyetlen sor a Gradle build fájljában:

fordítsd le a „com.pivovarit: parallel-collectors: 1.1.0”

A legújabb verzió megtalálható a Maven Central oldalon.

3. Párhuzamos folyamok figyelmeztetései

A Párhuzamos adatfolyamok a Java 8 egyik kiemelt témája voltak, de kiderült, hogy kizárólag nehéz CPU-feldolgozásra alkalmazhatók.

Ennek oka az volt, hogy A Párhuzamos folyamokat belső szinten egy JVM-szintű megosztott támogatta ForkJoinPool, amely korlátozott párhuzamosságot biztosított és egyetlen JVM-példányon futó összes párhuzamos adatfolyam használta.

Tegyük fel például, hogy van egy listánk az azonosítókról, és felhasználni akarjuk őket a felhasználók listájának lekérésére, és hogy ez a művelet drága.

Használhatnánk ehhez a Párhuzamos folyamokat:

Listazonosítók = tömbök. AsList (1, 2, 3); Eredmények felsorolása = ids.parallelStream () .map (i -> fetchById (i)) // minden művelet egy másodpercet vesz igénybe .collect (Collectors.toList ()); System.out.println (eredmények); // [user-1, user-2, user-3]

És valóban láthatjuk, hogy észrevehető a gyorsulás. De problematikussá válik, ha elkezdünk több párhuzamos blokkolási műveletet futtatni ... párhuzamosan. Ez gyorsan telítheti a medencét és potenciálisan hatalmas késleltetéseket eredményezhetnek. Ezért fontos a válaszfalak megépítése külön szálkészletek létrehozásával - annak megakadályozása érdekében, hogy a nem kapcsolódó feladatok befolyásolják egymás végrehajtását.

A szokás biztosítása érdekében ForkJoinPool Például kihasználhatnánk az itt leírt trükköt, de ez a megközelítés egy dokumentálatlan hackelésre támaszkodott, és hibás volt a JDK10-ig. Magában a számban olvashatunk bővebben - [JDK8190974].

4. Párhuzamos gyűjtők működés közben

A párhuzamos gyűjtők, amint a neve is mutatja, csak szabványos Stream API-gyűjtők, amelyek lehetővé teszik további műveletek párhuzamos végrehajtását a gyűjt() fázis.

ParallelCollectors (amely tükröz Gyűjtők osztály) a homlokzat, amely hozzáférést biztosít a könyvtár teljes funkcionalitásához.

Ha a fenti példát szeretnénk megismételni, egyszerűen írhatnánk:

ExecutorService végrehajtó = Executors.newFixedThreadPool (10); Listazonosítók = tömbök. AsList (1, 2, 3); CompletableFuture eredmények = ids.stream () .collect (ParallelCollectors.parallelToList (i -> fetchById (i), végrehajtó, 4)); System.out.println (results.join ()); // [user-1, user-2, user-3]

Az eredmény ugyanakkora, meg tudtuk adni az egyéni szálkészletünket, megadhattuk az egyedi párhuzamossági szintünket, és az eredmény a CompletableFuture az aktuális szál blokkolása nélkül.

A Normál Párhuzamos Áramlat viszont egyiket sem tudta elérni.

4.1. ParallelCollectors.parallelToList / ToSet ()

Amilyen intuitív, ha feldolgozni akarjuk a Folyam párhuzamosan, és gyűjtse össze az eredményeket a Lista vagy Készlet, egyszerűen használhatjuk ParallelCollectors.parallelToList vagy parallelToSet:

Listazonosítók = tömbök. AsList (1, 2, 3); Eredménylista = ids.stream () .collect (parallelToList (i -> fetchById (i), végrehajtó, 4)) .join ();

4.2. ParallelCollectors.parallelToMap ()

Ha gyűjteni akarunk Folyam elemeket a Térkép Például a Stream API-hoz hasonlóan két térképet is meg kell adnunk:

Listazonosítók = tömbök. AsList (1, 2, 3); Térkép eredmények = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), végrehajtó, 4)) .join (); // {1 = user-1, 2 = user-2, 3 = user-3}

Igényt is tudunk biztosítani Térkép példa Támogató:

Térkép eredményei = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, végrehajtó, 4)). 

És egy egyedi konfliktusmegoldási stratégia:

Listazonosítók = tömbök. AsList (1, 2, 3); Térkép eredmények = ids.stream () .collect (parallelToMap (i -> i, i -> fetchById (i), TreeMap :: new, (s1, s2) -> s1, végrehajtó, 4)) .join ();

4.3. ParallelCollectors.parallelToCollection ()

A fentiekhez hasonlóan átadhatjuk szokásunkat Gyűjtemény beszállítója ha egyedi tárolóba csomagolva szeretnénk eredményeket elérni:

Eredmények listája = ids.stream () .collect (parallelToCollection (i -> fetchById (i), LinkedList :: new, végrehajtó, 4)) .join ();

4.4. ParallelCollectors.parallelToStream ()

Ha a fentiek nem elégek, akkor valóban megszerezhetjük a Folyam példány, és ott folytassa az egyéni feldolgozást:

Térkép results = ids.stream () .collect (parallelToStream (i -> fetchById (i), végrehajtó, 4)) .thenApply (stream -> stream.collect (Collectors.groupingBy (i -> i.length ()))) .csatlakozik();

4.5. ParallelCollectors.parallel ()

Ez lehetővé teszi számunkra, hogy az eredményeket sorrendben továbbítsuk:

ids.stream () .collect (párhuzamos (i -> fetchByIdWithRandomDelay (i), végrehajtó, 4)) .forEach (System.out :: println); // user-1 // user-3 // user-2 

Ebben az esetben arra számíthatunk, hogy a gyűjtő minden alkalommal más eredményeket ad vissza, mivel véletlenszerű feldolgozási késleltetést vezettünk be.

4.6. ParallelCollectors.parallelOrdered ()

Ez a lehetőség lehetővé teszi a streaming eredményekhez hasonlóan a fentieket, de fenntartja az eredeti sorrendet:

ids.stream () .collect (parallelOrdered (i -> fetchByIdWithRandomDelay (i), végrehajtó, 4)) .forEach (System.out :: println); // user-1 // user-2 // user-3 

Ebben az esetben a gyűjtő mindig fenntartja a rendet, de lassabb lehet, mint a fentiek.

5. Korlátozások

Az írás pillanatában a párhuzamos gyűjtők nem működnek végtelen adatfolyamokkal még akkor is, ha rövidzárlati műveleteket alkalmaznak - ez a Stream API belsõ részei által elrendelt tervezési korlátozás. Egyszerűen fogalmazva, Folyams a gyűjtőket nem rövidzárlati műveletekként kezelik, ezért a folyamnak le kell állítania az összes upstream elemet.

A másik korlátozás az a rövidzárlati műveletek nem szakítják meg a fennmaradó feladatokat rövidzárlat után.

6. Következtetés

Láttuk, hogy a parallel-collectors könyvtár lehetővé teszi számunkra a párhuzamos feldolgozást az egyedi Java Stream API használatával Gyűjtők és CompletableFutures egyedi szálkészletek, párhuzamosság és nem blokkoló stílus felhasználása CompletableFutures.

Mint mindig, a kódrészletek is elérhetők a GitHubon.

További olvasáshoz tekintse meg a GitHub párhuzamos gyűjtők könyvtárát, a szerző blogját és a szerző Twitter-fiókját.