Tavaszi köteg a particionálóval

1. Áttekintés

A Spring Batch korábbi bevezetésében a keretrendszert szakaszos feldolgozó eszközként mutattuk be. Megvizsgáltuk a konfigurációs részleteket és az egyszálú, egyfolyamatú feladat végrehajtásának megvalósítását is.

Egy munka megvalósításához néhány párhuzamos feldolgozással számos lehetőség áll rendelkezésre. Magasabb szinten a párhuzamos feldolgozásnak két módja van:

  1. Egyfolyamatos, többszálas
  2. Több folyamat

Ebben a rövid cikkben a partíciókkal foglalkozunk Lépés, amely egyszeres és több folyamatból álló feladatokra is megvalósítható.

2. Egy lépés felosztása

A Spring Batch particionálással lehetőséget nyújtunk a Lépés:

A particionálás áttekintése

A fenti kép a Munka particionálva Lépés.

Van egy Lépés úgynevezett „Mester”, akinek végrehajtása „Slave” lépésekre oszlik. Ezek a rabszolgák átvehetik a mester helyét, és az eredmény változatlan marad. A mester és a rabszolga is Lépés. A rabszolgák lehetnek távoli szolgáltatások vagy csak lokálisan futtató szálak.

Szükség esetén átadhatjuk az adatokat a master-től a slave-hez. A metaadatok (vagyis a JobRepository) biztosítja, hogy minden rabszolgát csak egyszer futtassanak a Munka.

Itt található az egész működését bemutató sorrenddiagram

Particionálási lépés

Mint látható, a PartitionStep hajtja a kivégzést. A PartitionHandler felelős azért, hogy a „Mester” munkáját felossza a „Rabszolgákra”. A jobb szélső Lépés a rabszolga.

3. A Maven POM

A Maven-függőségek megegyeznek az előző cikkünkben említettekkel. Vagyis a Spring Core, a Spring Batch és az adatbázis függősége (esetünkben SQLite).

4. Konfiguráció

Bevezető cikkünkben láttunk egy példát néhány pénzügyi adat konvertálására CSV-ből XML-fájlba. Hosszabbítsuk ugyanazt a példát.

Itt 5 CSV-fájl pénzügyi adatait konvertáljuk megfelelő XML-fájlokká egy többszálú megvalósítás segítségével.

Ezt egyetlen felhasználásával érhetjük el Munka és Lépés particionálás. Öt szálunk lesz, egy-egy a CSV-fájlokhoz.

Először hozzunk létre egy állást:

@Bean (name = "partitionerJob") public Job particionálóJob () dobja az UnexpectedInputException, MalformedURLException, ParseException {return jobs.get ("partitioningJob") .start (partitionStep ()) .build (); }

Mint láthatjuk, ez Munka -val kezdődik PartíciózásStep. Ez a mester lépésünk, amelyet különféle rabszolgalépésekre osztunk:

A @Bean public Step partícióStep () dobja az UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("partitionStep") .partitioner ("slaveStep", partitioner ()) .step (slaveStep ()) .tepExecutor (taskExecutor () (); }

Itt fogjuk létrehozni a Lépjen a StepBuilderFactory segítségével. Ehhez meg kell adnunk a SlaveSteps és a Particionáló.

A Partíció egy interfész, amely lehetőséget nyújt a bemeneti értékek meghatározására az egyes slave-ekhez. Más szóval, a feladatok megfelelő szálakra bontásának logikája megy itt.

Hozzunk létre egy megvalósítást, az úgynevezett CustomMultiResourcePartitioner, ahol a bemeneti és kimeneti fájlneveket a ExecutionContext továbbadni minden rabszolga lépésnek:

public class CustomMultiResourcePartitioner implementálja a Partitionert {@Orride public Map partition (int gridSize) {Map map = new HashMap (gridSize); int i = 0, k = 1; for (Erőforrás-erőforrás: erőforrások) {ExecutionContext context = new ExecutionContext (); Assert.state (resource.exists (), "Az erőforrás nem létezik:" + erőforrás); context.putString (kulcsnév, erőforrás.getFilename ()); context.putString ("opFileName", "output" + k +++ ". xml"); map.put (PARTITION_KEY + i, kontextus); i ++; } visszatérési térkép; }}

Ehhez az osztályhoz létrehozzuk a babot is, ahol megadjuk a bemeneti fájlok forráskönyvtárát:

@Bean public CustomMultiResourcePartitioner particionáló () {CustomMultiResourcePartitioner partitioner = új CustomMultiResourcePartitioner (); Erőforrás [] erőforrások; próbáld ki az {resources = resourcessePatternResolver .getResources ("fájl: src / main / resources / input / *. csv"); } catch (IOException e) {dobjon új RuntimeException-t ("I / O problémák a" + "bemeneti fájlminta megoldásakor.", e); } partitioner.setResources (erőforrások); visszatérő partíció; }

Meghatározzuk a rabszolga lépést, csakúgy, mint bármely más lépést az olvasóval és az íróval. Az olvasó és az író ugyanaz lesz, mint amit a bevezető példánkban láttunk, azzal a különbséggel, hogy a fájlnév paramétert a StepExecutionContext.

Ne feledje, hogy ezeket a babokat fokozatosan kell lefedni, hogy képesek legyenek megkapni a stepExecutionContext params, minden lépésnél. Ha nem választanák őket lépcsős körbe, akkor a babjaikat először létrehozzák, és nem fogadják el a fájlneveket a lépés szintjén:

@StepScope @Bean public FlatFileItemReader itemReader (@Value ("# {stepExecutionContext [fileName]}" ") Karakterlánc fájlnév) dobja az UnexpectedInputException, ParseException {FlatFileItemReader olvasó = új FlatFileItemReader (); DelimitedLineTokenizer tokenizer = új DelimitedLineTokenizer (); String [] tokenek = {"felhasználónév", "felhasználónév", "tranzakció dátuma", "összeg"}; tokenizer.setNames (tokenek); reader.setResource (új ClassPathResource ("input /" + fájlnév)); DefaultLineMapper lineMapper = új DefaultLineMapper (); lineMapper.setLineTokenizer (tokenizer); lineMapper.setFieldSetMapper (új RecordFieldSetMapper ()); olvasó.setLinesToSkip (1); reader.setLineMapper (lineMapper); visszatérő olvasó; } 
@Bean @StepScope public ItemWriter itemWriter (Marshaller marshaller, @Value ("# {stepExecutionContext [opFileName]}") Karakterlánc fájlnév) MalformedURLException {StaxEventItemWriter itemWriter = new StaxEventItemWriter (); itemWriter.setMarshaller (marshaller); itemWriter.setRootTagName ("tranzakciórekord"); itemWriter.setResource (új ClassPathResource ("xml /" + fájlnév)); return itemWriter; }

Miközben a szolga lépésben megemlítjük az olvasót és az írót, az argumentumokat nullként adhatjuk át, mert ezeket a fájlneveket nem fogjuk használni, mivel a stepExecutionContext:

A @Bean public Step slaveStep () dobja az UnexpectedInputException, MalformedURLException, ParseException {return steps.get ("slaveStep"). Chunk (1) .reader (itemReader (null)) .writer (itemWriter (marshaller (), null)) .build (); }

5. Következtetés

Ebben az oktatóanyagban megvitattuk, hogyan lehet egy munkát párhuzamosan feldolgozni a Spring Batch segítségével.

Mint mindig, ennek a példának a teljes megvalósítása elérhető a GitHubon.