A tavasz integrálása az AWS Kinezissel

1. Bemutatkozás

A Kinesis egy eszköz az adatfolyamok valós idejű gyűjtésére, feldolgozására és elemzésére, amelyet az Amazon fejlesztett ki. Az egyik fő előnye, hogy segíti az eseményvezérelt alkalmazások fejlesztését.

Ebben az oktatóanyagban néhány könyvtárat fogunk felfedezni lehetővé teszi a tavaszi alkalmazásunk számára, hogy rekordokat készítsen és felhasználjon egy Kinesis Streamből. A kódpéldák megmutatják az alapvető funkciókat, de nem a gyártásra kész kódot jelentik.

2. Előfeltétel

Mielőtt tovább mennénk, két dolgot kell tennünk.

Az első egy tavaszi projekt létrehozása, mivel itt az a cél, hogy kapcsolatba lépjünk a tavaszi projekt kinézisével.

A második egy Kinesis adatfolyam létrehozása. Ezt megtehetjük egy webböngészőből az AWS-fiókunkban. Az AWS CLI rajongók egyik alternatívája a parancssor használata. Mivel kódból fogunk kölcsönhatásba lépni vele, kéznél kell tartanunk az AWS IAM hitelesítő adatait, a hozzáférési és titkos kulcsot, valamint a régiót is.

Valamennyi gyártóunk hamis IP-címrekordokat hoz létre, míg a fogyasztók ezeket az értékeket elolvassák és felsorolják az alkalmazáskonzolon.

3. AWS SDK Java-hoz

A legelső könyvtár, amelyet használni fogunk, az AWS SDK for Java. Előnye, hogy lehetővé teszi számunkra, hogy a Kinesis Data Streamekkel való együttműködés számos részét kezeljük. Tudunk adatokat olvasni, adatokat előállítani, adatfolyamokat létrehozni és az adatfolyamokat újratervezni. Hátránya, hogy a gyártásra kész kód megszerzéséhez olyan szempontokat kell kódolnunk, mint a továbbépítés, a hibakezelés vagy a démon, hogy életben tartsuk a fogyasztót.

3.1. Maven-függőség

Az amazon-kinezis és kliens Maven-függőség mindent elhoz, amire szükségünk van, hogy működőképes példákkal rendelkezzünk. Most hozzáadjuk a sajátunkhoz pom.xml fájl:

 com.amazonaws amazon-kinezis-kliens 1.11.2 

3.2. Tavaszi beállítás

Használjuk újra a AmazonKinesis a Kinesis-adatfolyamunkkal való együttműködéshez szükséges objektum. Létrehozzuk a @Bab bennünk @SpringBootApplication osztály:

@Bean nyilvános AmazonKinesis buildAmazonKinesis () {BasicAWSCredentials awsCredentials = új BasicAWSCredentials (accessKey, secretKey); adja vissza az AmazonKinesisClientBuilder.standard () .withCredentials (új AWSStaticCredentialsProvider (awsCredentials)) .withRegion (Regions.EU_CENTRAL_1) .build (); }

Ezután határozzuk meg a aws.access.key és aws.secret.key, szükséges a helyi géphez, in alkalmazás.tulajdonságok:

aws.access.key = my-aws-access-key-goes-here aws.secret.key = my-aws-secret-key-here-megy

És elolvassuk őket a @Érték kommentár:

@Value ("$ {aws.access.key}") privát karakterlánc accessKey; @Value ("$ {aws.secret.key}") privát String secretKey;

Az egyszerűség kedvéért támaszkodunk @Ütemezett módszerek a rekordok létrehozására és felhasználására.

3.3. Fogyasztó

A Az AWS SDK Kinesis Consumer pull modellt használ, vagyis kódunk rekordokat fog meríteni a Kinesis adatfolyam szilánkjaiból:

GetRecordsRequest recordsRequest = új GetRecordsRequest (); recordsRequest.setShardIterator (shardIterator.getShardIterator ()); recordsRequest.setLimit (25); GetRecordsResult recordsResult = kinesis.getRecords (recordsRequest); while (! recordsResult.getRecords (). isEmpty ()) {recordsResult.getRecords (). stream () .map (rekord -> új karakterlánc (record.getData (). tömb ())). forEach (System.out: : println); recordsRequest.setShardIterator (recordsResult.getNextShardIterator ()); recordsResult = kinesis.getRecords (recordsRequest); }

A GetRecordsRequest object hozza létre az adatfolyam kérését. Példánkban határoztunk meg kérelemenként 25 rekord korlátot, és addig olvasunk, amíg nincs több olvasnivaló.

Azt is észrevehetjük, hogy iterációnkhoz a GetShardIteratorResult tárgy. Ezt a tárgyat a @PostConstruct módszerrel, hogy azonnal megkezdjük a nyilvántartások nyomon követését:

privát GetShardIteratorResult shardIterator; @PostConstruct private void buildShardIterator () {GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest (); readShardsRequest.setStreamName (IPS_STREAM); readShardsRequest.setShardIteratorType (ShardIteratorType.LATEST); readShardsRequest.setShardId (IPS_SHARD_ID); this.shardIterator = kinesis.getShardIterator (readShardsRequest); }

3.4. Termelő

Most nézzük meg, hogyan kell kezeli a Kinesis adatfolyamunk rekordjainak létrehozását.

Adatokat szúrunk be a PutRecordsRequest tárgy. Ehhez az új objektumhoz hozzáadunk egy többszörös listát PutRecordsRequestEntry tárgyak:

Lista bejegyzések = IntStream.range (1, 200) .mapToObj (ipSuffix -> {PutRecordsRequestEntry entry = new PutRecordsRequestEntry (); entry.setData (ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) ; entry.setPartitionKey (IPS_PARTITION_KEY); return entry;}). gyűjt (Collectors.toList ()); PutRecordsRequest createRecordsRequest = new PutRecordsRequest (); createRecordsRequest.setStreamName (IPS_STREAM); createRecordsRequest.setRecords (bejegyzések); kinesis.putRecords (createRecordsRequest);

Létrehoztunk egy alapfogyasztót és egy szimulált IP-rekordokat. Már csak a tavaszi projekt futtatása van hátra, és meg kell tekinteni az IP-címeket az alkalmazáskonzolunkban.

4. KCL és KPL

A Kinesis Client Library (KCL) egy olyan könyvtár, amely leegyszerűsíti a rekordok felhasználását. Ez a Kinesis adatfolyamok AWS SDK Java API-jai felett is absztrakciós réteg. A színfalak mögött a könyvtár kezeli a terheléselosztást sok példányban, reagál a példányhibákra, ellenőrzi a feldolgozott rekordokat és reagál az újraterjesztésre.

A Kinesis Producer Library (KPL) olyan könyvtár, amely hasznos a Kinesis adatfolyamba történő íráshoz. Ez egy olyan absztrakciós réteget is biztosít, amely az AWS SDK Java API-k felett helyezkedik el a Kinesis adatfolyamokhoz. A jobb teljesítmény érdekében a könyvtár automatikusan kezeli a kötegelt, több szálas és újrapróbálkozási logikát.

A KCL-nek és a KPL-nek is megvan a legfőbb előnye, hogy könnyen kezelhetők, így a rekordok előállítására és felhasználására tudunk koncentrálni.

4.1. Maven-függőségek

A két könyvtár külön-külön hozható projektünkbe, ha szükséges. A KPL és a KCL bevonásához a Maven projektünkbe frissítenünk kell a pom.xml fájlt:

 com.amazonaws amazon-kinezis-producer 0.13.1 com.amazonaws amazon-kinezis-kliens 1.11.2 

4.2. Tavaszi beállítás

Az egyetlen tavaszi előkészületre van szükségünk, hogy megbizonyosodjunk arról, hogy kéznél vannak az IAM hitelesítő adatai. A. Értékei aws.access.key és aws.secret.key a mi alkalmazás.tulajdonságok fájlt, hogy elolvashassuk őket @Érték amikor szükség van.

4.3. Fogyasztó

Először is hozzon létre egy osztályt, amely megvalósítja a IRecordProcessor felületet, és meghatározza a Kinesis adatfolyam-rekordjainak kezelésére vonatkozó logikánkat, vagyis ki kell nyomtatni őket a konzolba:

public class IpProcessor hajtja végre az IRecordProcessor {@Override public void Initialize (InitializationInput InitializationInput) {} @Orride public void processRecords (ProcessRecordsInput processRecordsInput) {processRecordsInput.getRecords () .forEach (rekord -> System.get.strataln ().sor()))); } @Orride public void shutdown (ShutdownInput shutdownInput) {}}

A következő lépés az meghatározzon egy gyári osztályt, amely megvalósítja a IRecordProcessorFactory felület és visszatér egy korábban létrehozott IpProcessor tárgy:

public class IpProcessorFactory implementálja az IRecordProcessorFactory {@Orride public IRecordProcessor createProcessor () {return new IpProcessor (); }}

És most az utolsó lépés, használjuk a Munkás objektum a fogyasztói csővezeték meghatározásához. Szükségünk van egy KinesisClientLibConfiguration objektum, amely szükség esetén meghatározza az IAM hitelesítő adatait és az AWS régiót.

Át fogjuk adni a KinesisClientLibConfiguration, és a mi IpProcessorFactory tárgy, a mi Munkás majd külön szálban indítsa el. A nyilvántartások felhasználásának ezt a logikáját mindig életben tartjuk a Munkás osztályban, ezért most folyamatosan új lemezeket olvasunk:

BasicAWSCredentials awsCredentials = új BasicAWSCredentials (accessKey, secretKey); KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration (APP_NAME, IPS_STREAM, new AWSStaticCredentialsProvider (awsCredentials), IPS_WORKER) .withRegionName (Regions.EU_CENTRAL_1.getName); végső munkásmunkás = new Worker.Builder () .recordProcessorFactory (new IpProcessorFactory ()) .config (consumerConfig) .build (); CompletableFuture.runAsync (worker.run ());

4.4. Termelő

Most definiáljuk a KinesisProducerConfiguration objektum, hozzáadva az IAM hitelesítő adatokat és az AWS régiót:

BasicAWSCredentials awsCredentials = új BasicAWSCredentials (accessKey, secretKey); KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration () .setCredentialsProvider (new AWSStaticCredentialsProvider (awsCredentials)) .setVerifyCertificate (false) .setRegion (Regions.EU_CENTRAL_1.getName); this.kinesisProducer = új KinesisProducer (producerConfig);

Felvesszük a kinezisTermelő korábban létrehozott objektum a @Ütemezett folyamatosan dolgozzon és készítsen rekordokat a Kinesis adatfolyamunkhoz:

IntStream.range (1, 200) .mapToObj (ipSuffix -> ByteBuffer.wrap (("192.168.0." + IpSuffix) .getBytes ())) .forEach (bejegyzés -> kinesisProducer.addUserRecord (IPS_STREAM, IPS_PARTITION_ );

5. Tavaszi felhő-patak kötőanyag kinezis

Két könyvtárat láttunk már, mindkettőt a tavaszi ökoszisztémán kívül hozták létre. Hát most nézze meg, hogy a Tavaszi Felhő Patak Binder Kinesis hogyan egyszerűsítheti tovább életünket miközben a Tavaszi Felhő-patak tetejére épít.

5.1. Maven-függőség

A Maven-függőség, amelyet meg kell határoznunk a Spring Cloud Stream Binder Kinesis alkalmazásunkban:

 org.springframework.cloud spring-cloud-stream-binder-kinesis 1.2.1. KÖZLEMÉNY 

5.2. Tavaszi beállítás

EC2 rendszeren történő futtatáskor a szükséges AWS tulajdonságok automatikusan megtalálhatók, ezért nem szükséges meghatározni őket. Mivel példáinkat egy helyi gépen futtatjuk, meg kell határoznunk az IAM hozzáférési kulcsunkat, a titkos kulcsunkat és a régiót az AWS-fiókunkhoz. Letiltottuk az alkalmazás automatikus CloudFormation veremnév-felismerését is:

cloud.aws.credentials.access-key = my-aws-access-key cloud.aws.credentials.secret-key = my-aws-secret-key cloud.aws.region.static = eu-central-1 cloud.aws .stack.auto = hamis

A Spring Cloud Stream három interfésszel rendelkezik, amelyeket felhasználhatunk az adatfolyam-összerendeléshez:

  • A Mosogató adat befogadására szolgál
  • A Forrás a nyilvántartások kiadására szolgál
  • A Processzor mindkettő kombinációja

Ha szükséges, definiálhatjuk saját felületeinket is.

5.3. Fogyasztó

A fogyasztó meghatározása két részből álló munka. Először meghatározzuk a alkalmazás.tulajdonságok, az adatfolyam, amelyből felhasználjuk:

spring.cloud.stream.bindings.input.destination = live-ips spring.cloud.stream.bindings.input.group = live-ips-group spring.cloud.stream.bindings.input.content-type = text / plain

És ezután határozzunk meg egy tavaszt @Összetevő osztály. Az annotáció @EnableBinding (Sink.osztály) lehetővé teszi számunkra, hogy a Kinesis adatfolyamból olvassunk le a módszerrel, a -vel kiegészítve @StreamListener (Sink.INPUT):

@EnableBinding (Sink.class) public class IpConsumer {@StreamListener (Sink.INPUT) public void consume (String ip) {System.out.println (ip); }}

5.4. Termelő

A producer két részre is felosztható. Először meg kell határoznunk a folyam tulajdonságait belül alkalmazás.tulajdonságok:

spring.cloud.stream.bindings.output.destination = live-ips spring.cloud.stream.bindings.output.content-type = text / plain

És akkor hozzátesszük @EnableBinding (Source.class) egy tavaszon @Összetevő és hozzon létre új tesztüzeneteket néhány másodpercenként:

@Component @EnableBinding (Source.class) public class IpProducer {@Autowired private Source source; @Scheduled (fixedDelay = 3000L) private void produc () {IntStream.range (1, 200) .mapToObj (ipSuffix -> "192.168.0." + IpSuffix) .forEach (entry -> source.output (). Send ( MessageBuilder.withPayload (bejegyzés) .build ())); }}

Ennyi kell a Spring Cloud Stream Binder Kinesis működéséhez. Most egyszerűen elindíthatjuk az alkalmazást.

6. Következtetés

Ebben a cikkben azt láttuk, hogyan integrálhatjuk tavaszi projektünket két AWS-könyvtárral a Kinesis Data Stream használatához. Láttuk azt is, hogyan használhatjuk a Spring Cloud Stream Binder Kinesis könyvtárat a megvalósítás még egyszerűbbé tételéhez.

A cikk forráskódja a Github oldalon található.