Java EE 7 kötegelt feldolgozás

1. Bemutatkozás

Képzelje el, hogy kézzel kellett elvégeznünk a feladatokat, például a fizetési jegyek feldolgozását, a kamat kiszámítását és a számlák generálását. Elég unalmas, hibára hajlamos és a kézi feladatok soha véget nem érő listája lesz!

Ebben az oktatóanyagban megnézzük a Java Batch Processing (JSR 352), a Jakarta EE platform részét, és egy nagyszerű specifikációt az ilyen feladatok automatizálásához. Az alkalmazásfejlesztőknek modellt kínál robusztus kötegelt feldolgozó rendszerek fejlesztésére, hogy azok az üzleti logikára koncentrálhassanak.

2. Maven-függőségek

Mivel a JSR 352 csak egy specifikáció, ezért hozzá kell adnunk az API-ját és a megvalósítását jberet:

 javax.batch javax.batch-api 1.0.1 org.jberet jberet-core 1.0.2.Final org.jberet jberet-support 1.0.2.Final org.jberet jberet-se 1.0.2.Final 

Hozzáadunk egy memóriában lévő adatbázist is, hogy megnézhessünk néhány reálisabb forgatókönyvet.

3. Fogalmak

A JSR 352 bemutat néhány fogalmat, amelyeket így nézhetünk meg:

Először határozzuk meg az egyes darabokat:

  • Balról indulva megvan a JobOperator. Azt kezeli a munka feldolgozásának minden szempontját, például az indítást, a leállítást és az újraindítást
  • Ezután megvan a Munka. A munka a lépések logikus gyűjteménye; egy teljes kötegelt folyamatot foglal magában
  • Egy munka 1 és n között lesz Lépéss. Minden lépés egy független, egymást követő munkaegység. Egy lépés áll olvasás bemenet, feldolgozás hogy a bemenet, és írás Kimenet
  • És végül, de nem utolsósorban, megvan a JobRepository amely a munkák futó adatait tárolja. Segít nyomon követni a munkahelyeket, állapotukat és befejezési eredményeiket

A lépések ennél valamivel részletesebbek, ezért nézzük meg ezt a következőt. Először megnézzük Chunk lépéseket, majd a Batchlets.

4. Egy darab létrehozása

Amint azt korábban elmondtuk, a darab egyfajta lépés. Gyakran használunk egy darabot egy olyan művelet kifejezésére, amelyet újra és újra végrehajtunk, mondjuk egy tétel halmazán. Ez olyan, mint a Java Stream közbenső műveletei.

Egy darab leírásakor kifejeznünk kell, honnan vegyünk elemeket, hogyan dolgozzuk fel őket, és hova küldjük el őket utána.

4.1. Tételek olvasása

Az elemek olvasásához végre kell hajtanunk ItemReader.

Ebben az esetben létrehozunk egy olvasót, amely egyszerűen kiadja az 1–10 számokat:

@Néves nyilvános osztály A SimpleChunkItemReader kiterjeszti az AbstractItemReader {private Integer [] tokent; magán egész szám; @ Injekció JobContext jobContext; @Orride public Integer readItem () dobja a Kivételt {if (count> = tokenek.hossz) {return null; } jobContext.setTransientUserData (szám); visszatérési tokenek [count ++]; } @Orride public void open (Serializable checkpoint) dobja a (z) {tokenek = új egész szám [] {1,2,3,4,5,6,7,8,9,10} kivételt; szám = 0; }}

Most csak az osztály belső állapotáról olvasunk itt. De természetesen, readItem adatbázisból tudna kihúzni, a fájlrendszerből vagy más külső forrásból.

Ne feledje, hogy ennek a belső állapotnak egy részét a rendszer használatával mentjük JobContext # setTransientUserData () ami később hasznos lesz.

Ezenkívül vegye figyelembe a ellenőrző pont paraméter. Ezt is újra felvesszük.

4.2. Az elemek feldolgozása

Természetesen az az oka, hogy darabosunk, hogy valamilyen műveletet akarunk végrehajtani a tárgyainkon!

Bármikor visszatérünk nulla egy elemfeldolgozóból dobjuk ki azt az elemet a kötegből.

Tehát mondjuk itt, hogy csak a páros számokat akarjuk megtartani. Használhatunk egy ItemProcessor hogy visszautasítással elutasítja a különöseket nulla:

@Néves nyilvános osztály A SimpleChunkItemProcessor megvalósítja az ItemProcessor {@Override public Integer processItem (Object t) {Integer item = (Integer) t; % 2 == 0 elem visszaküldése item: null; }}

processItem egyszer felhívják minden egyes tételünkhöz ItemReader kibocsájt.

4.3. Tételek írása

Végül a munka a ItemWriter így megírhatjuk az átalakított tételeinket:

@Néves nyilvános osztály A SimpleChunkWriter kiterjeszti az AbstractItemWriter {Lista feldolgozva = új ArrayList (); A @Orride public void writeItems (List elemek) kivételt dob ​​a {items.stream (). Map (Integer.class :: cast) .forEach (feldolgozott :: add); }} 

Milyen hosszú elemeket? Egy pillanat alatt meghatározzuk a darab méretét, amely meghatározza a küldött lista méretét writeItems.

4.4. Egy darab meghatározása egy munkában

Most mindezt összeállítottuk egy XML fájlban a JSL vagy a Job Specification Language használatával. Vegye figyelembe, hogy felsoroljuk olvasónkat, processzort, darabokat és egy darab méretet is:

A darab mérete azt mutatja meg, hogy a darabon belüli előrehaladás milyen gyakran van elkötelezve a job-tárház felé, ami fontos a befejezés garantálásához, ha a rendszer egy része meghibásodik.

Helyeznünk kell ezt a fájlt META-INF / kötegelt feladatok mert.befőttes üveg fájlok és a WEB-INF / osztályok / META-INF / kötegelt feladatok mert .háború fájlokat.

Munkánknak megadtuk az azonosítót „SimpleChunk”, szóval próbáljuk meg ezt egységteszten.

Most a munkákat aszinkron módon hajtják végre, ami bonyolulttá teszi őket. A mintában feltétlenül nézze meg a mi BatchTestHelper amely megkérdezi és várja, amíg a munka befejeződik:

@Test public void givenChunk_thenBatch_completesWithSuccess () dobja a Kivételt {JobOperator jobOperator = BatchRuntime.getJobOperator (); Hosszú végrehajtásId = jobOperator.start ("simpleChunk", új tulajdonságok ()); JobExecution jobExecution = jobOperator.getJobExecution (végrehajtásId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); } 

Tehát ilyenek a darabok. Vessünk egy pillantást a kötegekre.

5. Batchlet létrehozása

Nem minden illik megfelelően az iteratív modellbe. Például lehet egy olyan feladatunk, amelyre egyszerűen szükségünk van egyszer hívja meg, futtassa a befejezésig, és adja vissza a kilépési állapotot.

A kötegelt szerződés meglehetősen egyszerű:

@Names public class A SimpleBatchLet kiterjeszti az AbstractBatchlet {@Orride public String process () dobja a Kivételt {return BatchStatus.COMPLETED.toString (); }}

Ahogy a JSL:

És tesztelhetjük ugyanolyan megközelítéssel, mint korábban:

@Test public void givenBatchlet_thenBatch_completeWithSuccess () dobja a Kivételt {JobOperator jobOperator = BatchRuntime.getJobOperator (); Hosszú végrehajtásId = jobOperator.start ("simpleBatchLet", új Tulajdonságok ()); JobExecution jobExecution = jobOperator.getJobExecution (végrehajtásId); jobExecution = BatchTestHelper.keepTestAlive (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Tehát megvizsgáltunk néhány különböző módszert a lépések végrehajtására.

Most nézzük meg a mechanizmusokat az előrelépés jelölése és garantálása.

6. Egyedi ellenőrző pont

A kudarcok biztosan a munka közepén történnek. Csak az egészet kell kezdenünk, vagy valahogy ott kezdhetjük, ahol abbahagytuk?

Ahogy a neve is sugallja, ellenőrzőpontok segítsen nekünk rendszeresen könyvjelzőt beállítani hiba esetén.

Alapértelmezés szerint a darabos feldolgozás vége természetes ellenőrző pont.

A sajátunkkal azonban testre szabhatjuk CheckpointAlgorithm:

@Néves nyilvános osztály A CustomCheckPoint kiterjeszti az AbstractCheckpointAlgorithm {@Inject JobContext jobContext; @A nyilvános logikai érték felülírása az isReadyToCheckpoint () dobja a (z) {int counterRead = (Egész szám) jobContext.getTransientUserData () kivételt; return counterRead% 5 == 0; }}

Emlékszel arra a számra, amelyet korábban átmeneti adatokba helyeztünk? Itt, azzal kihúzhatjuk JobContext # getTransientUserDatakijelenteni, hogy minden ötödik feldolgozott számra el akarunk kötelezni magunkat.

Enélkül minden darab, vagy esetünkben minden 3. szám végén egy elkövetés történne.

És akkor ezt összeegyeztetjük a pénztár-algoritmus irányelv az XML-ben a darabunk alatt:

Teszteljük a kódot, ismét megjegyezve, hogy a kazán néhány lépése el van rejtve BatchTestHelper:

@Test public void givenChunk_whenCustomCheckPoint_thenCommitCountIsThree () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a befejezését .hosszúérték ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

Tehát valószínűleg 2-es elkötelezettség-számításra számíthatunk, mivel tíz elemünk van, és úgy állítottuk be az elkötelezettségeket, hogy minden 5. tétel legyen. De, a keret még egy végső olvasatot elkötelez a végén annak biztosítása érdekében, hogy minden feldolgozásra kerüljön, ami eljuttat minket 3-ig.

Ezután nézzük meg, hogyan kell kezelni a hibákat.

7. Kivételek kezelése

Alapértelmezés szerint, az álláskezelő a munkánkat úgy jelöli meg NEM SIKERÜLT kivétel esetén.

Változtassuk meg az elemolvasónkat, hogy megbizonyosodjunk arról, hogy nem sikerül:

@ A nyilvános egész felülbírálása a readItem () segítségével kiveti az {if (tokens.hasMoreTokens ()) {String tempTokenize = tokens.nextToken () karakterláncot; dobja új RuntimeException (); } return null; }

És akkor tesztelje:

@Test public void whenChunkError_thenBatch_CompletesWithFailed () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a assertEquals befejezését (jobExecution.getBatchStatus (), BatchStatus.FAILED); }

De ezt az alapértelmezett viselkedést számos módon felülírhatjuk:

  • kihagyási határ meghatározza a kivételek számát, amelyet ez a lépés figyelmen kívül hagy, mielőtt kudarcot vallana
  • próbálkozzon újra meghatározza, hogy hányszor kell a kezelőnek újra megpróbálnia a lépést, mielőtt kudarcot vallana
  • átugorható-kivétel-osztály megad egy sor kivételt, amelyeket a darabos feldolgozás figyelmen kívül hagy

Tehát úgy szerkeszthetjük a munkánkat, hogy figyelmen kívül hagyja RuntimeException, valamint néhány más, csak illusztráció:

És most átmegy a kódunk:

@Test public void givenChunkError_thenErrorSkipped_CompletesWithSuccess () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a befejezését .hosszúérték ())); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8. Több lépés végrehajtása

Korábban említettük, hogy egy munkának tetszőleges számú lépése lehet, ezért nézzük meg ezt most.

8.1. A következő lépés indítása

Alapértelmezés szerint, minden lépés a munka utolsó lépése.

A kötegelt jobon belül a következő lépés végrehajtásához kifejezetten meg kell adnunk a következő attribútum a lépésdefiníción belül:

Ha elfelejtjük ezt az attribútumot, akkor a sorrendben következő lépés nem kerül végrehajtásra.

És láthatjuk, hogy néz ki ez az API-ban:

@Test public void givenTwoSteps_thenBatch_CompleteWithSuccess () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a teljesítést assertEquals (2, jobOperator.getStepExecutions (végrehajtásiId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.2. Áramlik

A lépések sorrendje be is zárható a folyam. Amikor a folyamat befejeződött, akkor a teljes folyamat áttér a végrehajtási elemre. Továbbá az áramláson belüli elemek nem tudnak áttérni az áramláson kívüli elemekre.

Mondjuk végrehajthatunk két lépést egy áramlás belsejében, majd ezt az áramlást átválthatjuk egy elszigetelt lépésbe:

És továbbra is függetlenül láthatjuk az egyes lépések végrehajtását:

@Test public void givenFlow_thenBatch_CompleteWithSuccess () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a teljesítést assertEquals (3, jobOperator.getStepExecutions (végrehajtásiId) .size ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

8.3. Döntések

Megvan az if / else támogatás is döntéseket. A döntések előírják a lépések, folyamatok és osztások közötti sorrend meghatározásának testreszabott módja.

A lépésekhez hasonlóan az átmeneti elemeken is működik, mint pl következő amely irányíthatja vagy megszüntetheti a munka végrehajtását.

Nézzük meg, hogyan konfigurálható a munka:

Bármi döntés elemet egy megvalósítandó osztállyal kell konfigurálni Döntő. Feladata a döntés visszaküldése a Húr.

Minden egyes következő belül döntés olyan, mint a ügy a kapcsoló nyilatkozat.

8.4. Hasad

Hasad hasznosak, mivel lehetővé teszik az áramlások egyidejű végrehajtását:

Természetesen, ez azt jelenti, hogy a megrendelés nem garantált.

Megerősítjük, hogy még mindig futnak. Az áramlási lépéseket tetszőleges sorrendben hajtjuk végre, de az elszigetelt lépés mindig utolsó lesz:

@Test public void givenSplit_thenBatch_CompletesWithSuccess () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a befejezést. List stepExecutions = jobOperator.getStepExecutions (végrehajtásiId); assertEquals (3, stepExecutions.size ()); assertEquals ("splitJobSequenceStep3", stepExecutions.get (2) .getStepName ()); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

9. Munka felosztása

A Java-kódunkon belüli kötegelt tulajdonságokat is felhasználhatjuk, amelyeket a munkánkban definiáltunk.

Három szinten lehet skálázni őket - a munka, a lépés és a kötegelt tárgy.

Lássunk néhány példát arra, hogyan fogyasztottak.

Amikor a tulajdonságokat munka szinten szeretnénk felhasználni:

@ Injekció JobContext jobContext; ... jobProperties = jobContext.getProperties (); ...

Ez fokozatosan is felhasználható:

@ Injekció StepContext stepContext; ... stepProperties = stepContext.getProperties (); ...

Amikor a tulajdonságokat kötegelt artefaktum szinten akarjuk felhasználni:

@Inject @BatchProperty (név = "név") privát karakterlánc nameString;

Ez jól jön a partíciókkal.

Lásd: osztásokkal egyidejűleg futtathatjuk az áramlásokat. De mi is partíció egy lépés n elemkészletek vagy külön bemenetek beállítása, lehetővé téve számunkra a munka több szálra bontásának másik módját.

Az egyes partíciók által elvégzendő munka szegmensének megértéséhez kombinálhatjuk a tulajdonságokat a partíciókkal:

10. Állítsa le és indítsa újra

Most ez a munkahelyek meghatározása. Most beszéljünk egy percig azok kezeléséről.

Egységtesztjeinken már láttuk, hogy kaphatunk példányt JobOperator tól től BatchRuntime:

JobOperator jobOperator = BatchRuntime.getJobOperator ();

És akkor elkezdhetjük a munkát:

Hosszú végrehajtási ID = jobOperator.start ("simpleBatchlet", új tulajdonságok ());

Megállíthatjuk azonban a munkát is:

jobOperator.stop (végrehajtásId);

Végül pedig újraindíthatjuk a munkát:

végrehajtási azonosító = jobOperator.restart (végrehajtási azonosító, új tulajdonságok ());

Nézzük meg, hogyan állíthatjuk le a futó munkát:

@Test public void givenBatchLetStarted_whenStopped_thenBatchStopped () dobja a Kivételt {JobOperator jobOperator = BatchRuntime.getJobOperator (); Hosszú végrehajtásId = jobOperator.start ("simpleBatchLet", új Tulajdonságok ()); JobExecution jobExecution = jobOperator.getJobExecution (végrehajtásId); jobOperator.stop (végrehajtásId); jobExecution = BatchTestHelper.keepTestStopped (jobExecution); assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); }

És ha egy tétel van LEÁLLÍTOTT, akkor újraindíthatjuk:

@Test public void givenBatchLetStopped_whenRestarted_thenBatchCompletesSuccess () {// ... indítsa és állítsa le a job assertEquals (jobExecution.getBatchStatus (), BatchStatus.STOPPED); végrehajtásId = jobOperator.restart (jobExecution.getExecutionId (), új tulajdonságok ()); jobExecution = BatchTestHelper.keepTestAlive (jobOperator.getJobExecution (végrehajtásiId)); assertEquals (jobExecution.getBatchStatus (), BatchStatus.COMPLETED); }

11. Állások lekérése

Amikor kötegelt feladatot nyújt be, akkor a kötegelt futásidejű létrehoz egy példányát JobExecution hogy nyomon kövesse.

A JobExecution végrehajtási azonosítóhoz használhatjuk a JobOperator # getJobExecution (végrehajtásId) módszer.

És, StepExecution hasznos információkat nyújt a lépés végrehajtásának nyomon követéséhez.

A StepExecution végrehajtási azonosítóhoz használhatjuk a JobOperator # getStepExecutions (végrehajtásId) módszer.

És ebből számos mutatót kaphatunk a lépésről StepExecution # getMetrics:

@Test public void givenChunk_whenJobStarts_thenStepsHaveMetrics () dobja a Kivételt {// ... indítsa el a munkát, és várja meg a teljesítést assertTrue (jobOperator.getJobNames (). Tartalmaz ("simpleChunk")); assertTrue (jobOperator.getParameters (végrehajtásiId) .isEmpty ()); StepExecution stepExecution = jobOperator.getStepExecutions (végrehajtásId) .get (0); Map metricTest = BatchTestHelper.getMetricsMap (stepExecution.getMetrics ()); assertEquals (10L, metricTest.get (Metric.MetricType.READ_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.FILTER_COUNT) .longValue ()); assertEquals (4L, metricTest.get (Metric.MetricType.COMMIT_COUNT) .longValue ()); assertEquals (5L, metricTest.get (Metric.MetricType.WRITE_COUNT) .longValue ()); // ... és még sok más! }

12. Hátrányok

A JSR 352 nagy teljesítményű, bár számos területen hiányzik:

  • Úgy tűnik, hogy nincsenek olyan olvasók és írók, akik más formátumokat is képesek feldolgozni, például a JSON-t
  • A generikumok nem támogatják
  • A particionálás csak egyetlen lépést támogat
  • Az API nem kínál semmit az ütemezés támogatásához (bár a J2EE külön ütemezési modullal rendelkezik)
  • Aszinkron jellege miatt a tesztelés kihívást jelenthet
  • Az API meglehetősen részletes

13. Következtetés

Ebben a cikkben a JSR 352-t néztük meg, és megismertük a darabokat, a kötegeket, a hasadásokat, az áramlásokat és még sok minden mást. Mégis, alig kapartuk el a felszínt.

Mint mindig, a demokód megtalálható a GitHubon.