Bevezetés a Tavaszi Felhő Patakba

1. Áttekintés

A Spring Cloud Stream egy olyan keret, amely a Spring Boot és a Spring Integration tetejére épül segít az eseményvezérelt vagy üzenetvezérelt mikroszolgáltatások létrehozásában.

Ebben a cikkben néhány egyszerű példával bemutatjuk a Spring Cloud Stream koncepcióit és felépítéseit.

2. Maven-függőségek

A kezdéshez hozzá kell adnunk a Spring Cloud Starter Streamet a RabbitMQ Maven bróker-függőséggel, mint üzenetküldő-köztes szoftvert pom.xml:

 org.springframework.cloud spring-cloud-starter-stream-rabbit 1.3.0.FELHASZNÁLÁS 

És hozzáadjuk a Maven Central modulfüggőségét a JUnit támogatás engedélyezéséhez is:

 org.springframework.cloud spring-cloud-stream-test-support 1.3.0.RELEASE teszt 

3. Fő fogalmak

A mikroszolgáltatások architektúrája az „intelligens végpontok és buta csövek” elvét követi. A végpontok közötti kommunikációt olyan üzenetküldő-köztes szoftveres felek vezérlik, mint a RabbitMQ vagy az Apache Kafka. A szolgáltatások tartományi események közzétételével ezeken a végpontokon vagy csatornákon keresztül kommunikálnak.

Járjuk végig a Spring Cloud Stream keretrendszerét alkotó fogalmakat, valamint azokat az alapvető paradigmákat, amelyekkel tisztában kell lennünk az üzenetközpontú szolgáltatások felépítésében.

3.1. Konstruál

Nézzük meg a Spring Cloud Stream egy egyszerű szolgáltatását, amely hallgat bemenet kötés és választ küld a Kimenet kötés:

@SpringBootApplication @EnableBinding (Processor.class) nyilvános osztály MyLoggerServiceApplication {public static void main (String [] args) {SpringApplication.run (MyLoggerServiceApplication.class, args); } @StreamListener (Processor.INPUT) @SendTo (Processor.OUTPUT) public LogMessage richLogMessage (LogMessage log) {return new LogMessage (String.format ("[1]:% s", log.getMessage ())); }}

Az annotáció @EnableBinding konfigurálja az alkalmazást a csatornák lekötésére BEMENET és KIMENET határozza meg az interfészen belül Processzor. Mindkét csatorna olyan kötés, amely konfigurálható konkrét üzenetküldő-köztes szoftver vagy kötőanyag használatára.

Vessünk egy pillantást ezeknek a fogalmaknak a meghatározására:

  • Kötések - olyan interfészek gyűjteménye, amelyek deklaratív módon azonosítják a bemeneti és kimeneti csatornákat
  • Kötőanyag - üzenetküldés-köztes szoftverek, például Kafka vagy RabbitMQ
  • Csatorna - az üzenetküldő-köztes szoftver és az alkalmazás közötti kommunikációs cső
  • StreamListeners - üzenetkezelési módszerek babokban, amelyek automatikusan meghívásra kerülnek a csatorna üzenetére a MessageConverter elvégzi a sorosítást / deszerializációt a köztes szoftverekre jellemző események és a tartományi objektumtípusok / POJO-k között
  • Meszsálya Sémák - üzenetek sorosítására és deserializálására használják, ezek a sémák statikusan olvashatók egy helyről vagy dinamikusan betölthetők, támogatva a tartományi objektumtípusok fejlődését

3.2. Kommunikációs minták

A rendeltetési helyekre kijelölt üzeneteket a Közzététel-feliratkozás üzenetküldő minta. A kiadók az üzeneteket témákba sorolják, mindegyiket egy-egy névvel azonosítva. Az előfizetők egy vagy több téma iránt érdeklődnek. A köztes szoftver kiszűri az üzeneteket, eljuttatva az érdekes témákat az előfizetőkhöz.

Most az előfizetőket csoportosítani lehetett. A fogyasztói csoport előfizetők vagy fogyasztók halmaza, amelyet a csoport azonosítója, amelyen belül egy téma vagy a téma partíciójának üzenetei terhelés-kiegyensúlyozott módon kerülnek átadásra.

4. Programozási modell

Ez a szakasz a Spring Cloud Stream alkalmazások készítésének alapjait ismerteti.

4.1. Funkcionális tesztelés

A teszt támogatás egy kötőanyag-megvalósítás, amely lehetővé teszi a csatornákkal való interakciót és az üzenetek ellenőrzését.

Küldjünk üzenetet a fentieknek richLogMessage szolgáltatást, és ellenőrizze, hogy a válasz tartalmazza-e a szöveget “[1]: “ az üzenet elején:

@RunWith (SpringJUnit4ClassRunner.class) @ContextConfiguration (class = MyLoggerServiceApplication.class) @DtiesContext nyilvános osztály MyLoggerApplicationTests {@Autowired privát processzor cső; @Autowired privát MessageCollector messageCollector; @Test public void whenSendMessage_thenResponseShouldUpdateText () {pipe.input () .send (MessageBuilder.withPayload (új LogMessage ("Ez az én üzenetem")) .build ()); Object payload = messageCollector.forChannel (pipe.output ()) .poll () .getPayload (); assertEquals ("[1]: Ez az én üzenetem", payload.toString ()); }}

4.2. Egyéni csatornák

A fenti példában a Processzor felületet a Spring Cloud biztosítja, amelynek csak egy bemenete és egy kimeneti csatornája van.

Ha valami másra van szükségünk, például egy bemenetre és két kimeneti csatornára, létrehozhatunk egy egyedi processzort:

nyilvános felület MyProcessor {String INPUT = "myInput"; @Input SubscribableChannel myInput (); @Output ("myOutput") MessageChannel anOutput (); @Output MessageChannel anotherOutput (); }

A tavasz biztosítja számunkra ennek a felületnek a megfelelő megvalósítását. A csatornanevek az alábbihoz hasonló kommentárokkal állíthatók be @Output („myOutput”).

Egyébként Spring a metódusneveket fogja használni csatornanevként. Ezért van három csatornánk myInput, myOutput, és anotherOutput.

Most képzeljük el, hogy az üzeneteket egy kimenetre akarjuk irányítani, ha az érték kisebb, mint 10, és egy másik kimenetre, ha az érték nagyobb vagy egyenlő, mint 10:

@Autowired privát MyProcessor processzor; @StreamListener (MyProcessor.INPUT) public void routeValues ​​(Integer val) {if (val <10) {processor.anOutput (). Send (message (val)); } else {processzor.anotherOutput (). send (üzenet (val)); }} private static final Message message (T val) {return MessageBuilder.withPayload (val) .build (); }

4.3. Feltételes feladás

Használni a @StreamListener annotációval, mi is tudunk szűrje azokat az üzeneteket, amelyekre a fogyasztóban számítunk bármely olyan feltétel használatával, amelyet SpEL kifejezésekkel definiálunk.

Példaként használhatnánk a feltételes diszpécsert másik megközelítésként az üzenetek különböző kimenetekbe történő irányításához:

@Autowired privát MyProcessor processzor; @StreamListener (target = MyProcessor.INPUT, condition = "payload = 10") public void routeValuesToAnotherOutput (Integer val) {processor.anotherOutput (). Send (message (val)); }

Az egyetlen Ennek a megközelítésnek az a korlátozása, hogy ezek a módszerek nem adhatnak vissza értéket.

5. Beállítás

Állítsuk be azt az alkalmazást, amely feldolgozza a RabbitMQ alkusz üzenetét.

5.1. Kötőanyag-konfiguráció

Konfigurálhatjuk alkalmazásunkat az alapértelmezett kötőanyag-megvalósítás használatára META-INF / rugó.kötők:

nyúl: \ org.springframework.cloud.stream.binder.rabbit.config.RabbitMessageChannelBinderConfiguration

Vagy hozzáadhatjuk a RabbitMQ iratgyűjtő könyvtárát az osztályútvonalhoz azáltal, hogy belefoglaljuk ez a függőség:

 org.springframework.cloud tavasz-felhő-patak-kötőanyag-nyúl 1.3.0. 

Ha nincs megadva kötőanyag-megvalósítás, akkor a Spring közvetlen üzenetkommunikációt fog használni a csatornák között.

5.2. RabbitMQ konfiguráció

A 3.1. Szakaszban található példa konfigurálásához a RabbitMQ iratgyűjtő használatához frissítenünk kell a alkalmazás.yml található src / main / resources:

tavasz: felhő: folyam: összerendelések: input: rendeltetési hely: queue.log.messages iratgyűjtő: local_rabbit output: destination: queue.pretty.log.messages binder: local_rabbit binders: local_rabbit: type: nyúl környezet: spring: rabbitmq: host: port : 5672 felhasználónév: jelszó: virtuális-gazdagép: /

A bemenet a kötés az úgynevezett cserét fogja használni queue.log.messages, és a Kimenet kötés fogja használni a cserét queue.pretty.log.messages. Mindkét kötés az úgynevezett kötőanyagot fogja használni helyi_nyúl.

Ne feledje, hogy nem kell előre létrehoznunk a RabbitMQ cseréket vagy sorokat. Az alkalmazás futtatásakor mindkét csere automatikusan létrejön.

Az alkalmazás teszteléséhez a RabbitMQ kezelő webhelyet használhatjuk üzenet közzétételére. Ban,-ben Üzenet közzététele a tőzsde panelje queue.log.messages, JSON formátumban kell megadnunk a kérést.

5.3. Az üzenetkonvertálás testreszabása

A Spring Cloud Stream lehetővé teszi számunkra, hogy üzenetkonvertálást alkalmazzunk bizonyos tartalomtípusokra. A fenti példában JSON formátum helyett egyszerű szöveget szeretnénk megadni.

Ehhez meg kell alkalmazzon egyedi átalakítást a következőre: LogMessage használva MessageConverter:

@SpringBootApplication @EnableBinding (Processor.class) nyilvános osztály MyLoggerServiceApplication {// ... @Bean public MessageConverter providesTextPlainMessageConverter () {return new TextPlainMessageConverter (); } // ...}
public class TextPlainMessageConverter kiterjeszti az AbstractMessageConverter {public TextPlainMessageConverter () {super (new MimeType ("text", "plain")); } @Orride védett logikai támogatások (Class clazz) {return (LogMessage.class == clazz); } @Orride protected Object convertFromInternal (Üzenet, Osztály targetClass, Object conversionHint) {Object payload = message.getPayload (); Karakterlánc szövege = hasznos karakterlánc String? (String) hasznos teher: új String ((byte []) hasznos teher); return new LogMessage (szöveg); }}

Miután végrehajtotta ezeket a változtatásokat, térjen vissza a Üzenet közzététele panel, ha beállítjuk a fejlécetcontentTypes" nak nek "szöveg / sima”És a hasznos teherHelló Világ“, Úgy kell működnie, mint korábban.

5.4. Fogyasztói csoportok

Alkalmazásunk több példányának futtatásakor valahányszor új üzenet érkezik egy bemeneti csatornán, minden előfizető értesítést kap.

Legtöbbször csak egyszer kell feldolgoznunk az üzenetet. A Spring Cloud Stream ezt a viselkedést a fogyasztói csoportokon keresztül valósítja meg.

Ennek a viselkedésnek az engedélyezéséhez minden fogyasztói kötés használhatja a tavasz.felhő.folyam.kötések..csoport tulajdonság a csoport nevének megadásához:

tavasz: felhő: folyam: összerendelések: input: cél: queue.log.messages iratgyűjtő: local_rabbit csoport: logMessageConsumers ...

6. Üzenet-vezérelt mikroszolgáltatások

Ebben a szakaszban bemutatjuk az összes szükséges funkciót a Spring Cloud Stream alkalmazások futtatásához mikroszolgáltatások környezetében.

6.1. Nagyítás

Ha több alkalmazás fut, fontos biztosítani, hogy az adatok megfelelően oszlanak meg a fogyasztók között. Ehhez a Spring Cloud Stream két tulajdonságot kínál:

  • tavasz.felhő.folyam.instanceCount - a futó alkalmazások száma
  • tavasz.felhő.folyam.instanceIndex - az aktuális alkalmazás indexe

Például, ha a fentiekből két példányt telepítettünk MyLoggerServiceApplication alkalmazás, az ingatlan tavasz.felhő.folyam.instanceCount 2-nek kell lennie mind az alkalmazások, mind a tulajdonság szempontjából tavasz.felhő.folyam.instanceIndex 0-nak és 1-nek kell lennie.

Ezeket a tulajdonságokat automatikusan beállítja, ha a Spring Cloud Stream alkalmazásokat a Spring Data Flow használatával telepítjük, a cikkben leírtak szerint.

6.2. Felosztás

A domain események lehetnek Particionált üzenetek. Ez segít, amikor vagyunk a tárolás növelése és az alkalmazások teljesítményének javítása.

A tartományi eseménynek általában van partíciós kulcsa, így ugyanazon a partíción végzi a kapcsolódó üzeneteket.

Tegyük fel, hogy azt akarjuk, hogy a naplóüzeneteket az üzenet első betűjével osszuk fel, amely a partíciós kulcs lenne, és két partícióra csoportosítsuk őket.

Egy partíció lenne a kezdő naplóüzenetek számára A-M és egy másik partíció N-Z. Ez két tulajdonság segítségével konfigurálható:

  • spring.cloud.stream.bindings.output.producer.partitionKeyExpression - a hasznos terhek felosztásának kifejezése
  • spring.cloud.stream.bindings.output.producer.partitionCount - a csoportok száma

Néha a partíció kifejezés túl bonyolult ahhoz, hogy csak egy sorba írja. Ezekben az esetekben megírhatjuk egyedi partíciós stratégiánkat a tulajdonság segítségével spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass.

6.3. Egészségügyi mutató

Mikroszolgáltatások keretében nekünk is meg kell észlelni, ha egy szolgáltatás leáll, vagy meghibásodni kezd. A Spring Cloud Stream biztosítja az ingatlant menedzsment.egészség.kötők.engedélyezett hogy lehetővé tegyék a kötőanyagok egészségügyi mutatóit.

Az alkalmazás futtatásakor lekérdezhetjük az egészségi állapotot a következő címen: //:/Egészség.

7. Következtetés

Ebben az oktatóanyagban bemutattuk a Spring Cloud Stream fő koncepcióit, és néhány egyszerű példán keresztül bemutattuk a RabbitMQ-n keresztüli egyszerű példákat. További információ a Spring Cloud Streamről itt található.

A cikk forráskódja megtalálható a GitHub oldalon.