Reaktív rendszerek Java-ban

1. Bemutatkozás

Ebben az oktatóanyagban megismerjük a reaktív rendszerek létrehozásának alapjait a Java-ban Spring és más eszközök és keretrendszerek használatával.

Ennek során megbeszéljük, hogy a reaktív programozás hogyan ösztönzi a reaktív rendszer létrehozását. Ez segít megérteni a reaktív rendszerek és a különböző specifikációk, könyvtárak és szabványok létrehozásának logikáját, amelyeket ez az út során inspirált.

2. Mik azok a reaktív rendszerek?

Az elmúlt évtizedekben a technológiai környezet számos olyan megszakadást tapasztalt, amelyek a technológia értékének teljes átalakulásához vezettek. Az internet előtti számítástechnikai világ soha nem tudta volna elképzelni, milyen módon és módon változtatja meg napjainkat.

Az internet tömegekkel való elérhetősége és az általa ígérkező folyamatosan fejlődő tapasztalat révén az alkalmazásépítészeknek a lábukon kell állniuk, hogy megfeleljenek igényeiknek.

Alapvetően ez azt jelenti, hogy soha nem tudjuk megtervezni az alkalmazást a korábban megszokott módon. A a rendkívül érzékeny alkalmazás már nem luxus, hanem szükségszerűség.

Ez is véletlenszerű kudarcokkal és kiszámíthatatlan terheléssel áll szemben. Az óra szükségessége nem csak a helyes eredmény elérése, hanem a gyors megszerzés is! Nagyon fontos, hogy meghökkentő felhasználói élményeket biztosítsunk.

Ez szükségessé teszi egy építészeti stílus iránti igényt, amely reaktív rendszereket adhat nekünk.

2.1. Reaktív kiáltvány

Még 2013-ban, a fejlesztők csapata Jonas Boner vezetésével összeállt, hogy meghatározza az alapvető elveket a Reaktív Kiáltvány néven ismert dokumentumban. Ez alapozta meg az építészeti stílus alapját a reaktív rendszerek létrehozásában. Azóta ez a kiáltvány nagy érdeklődést váltott ki a fejlesztői közösség részéről.

Alapvetően ez a dokumentum előírja a reaktív rendszer receptje rugalmas, lazán összekapcsolt és méretezhető. Ez megkönnyíti az ilyen rendszerek fejlesztését, tolerálják a hibákat, és ami a legfontosabb, rendkívül reagál, a hihetetlen felhasználói tapasztalatok hátterét.

Tehát mi ez a titkos recept? Nos, ez alig titok! A kiáltvány meghatározza a reaktív rendszer alapvető jellemzőit vagy alapelveit:

  • Fogékony: A reaktív rendszernek gyors és következetes reakcióidőt kell biztosítania, és ezáltal a szolgáltatás minőségének állandóságát
  • Rugalmas: A reaktív rendszernek továbbra is reagálnia kell véletlen meghibásodások esetén replikáció és izolálás útján
  • Rugalmas: Egy ilyen rendszernek a költséghatékony méretezhetőség révén előre nem látható munkaterhelés esetén is reagálnia kell
  • Üzenet-vezérelt: A rendszerkomponensek közötti aszinkron üzenetátadásra kell támaszkodnia

Ezek az elvek egyszerűen és értelmesen hangzanak, de nem mindig könnyebben megvalósíthatók a komplex vállalati architektúrában. Ebben az oktatóanyagban ezeket az elveket szem előtt tartva dolgozzunk ki egy mintarendszert a Java-ban!

3. Mi a reaktív programozás?

Mielőtt folytatnánk, fontos megérteni a különbséget a reaktív programozás és a reaktív rendszerek között. Mindkét kifejezést elég gyakran használjuk, és könnyen félreértjük egymást. Mint korábban láthattuk, a reaktív rendszerek egy sajátos építészeti stílus eredménye.

Ellentétben, a reaktív programozás egy olyan programozási paradigma, ahol a hangsúly az aszinkron és nem blokkoló komponensek fejlesztésére irányul. A reaktív programozás lényege egy adatfolyam, amelyet megfigyelhetünk és reagálhatunk rá, sőt még ellennyomást is alkalmazhatunk. Ez nem blokkoló végrehajtáshoz és ezáltal jobb méretezhetőséghez vezet kevesebb végrehajtási szál mellett.

Ez nem azt jelenti, hogy a reaktív rendszerek és a reaktív programozás kizárják egymást. Valójában a reaktív programozás fontos lépés a reaktív rendszer megvalósítása felé, de ez még nem minden!

3.1. Reaktív folyamok

A reaktív folyamok egy olyan közösségi kezdeményezés, amely még 2013-ban kezdődött szabványt biztosítson a nem blokkoló ellennyomású aszinkron folyamfeldolgozáshoz. A cél itt egy olyan interfészek, módszerek és protokollok meghatározása volt, amelyek leírják a szükséges műveleteket és entitásokat.

Azóta számos megvalósítás jelent meg több programozási nyelven, amely megfelel a reaktív folyamok specifikációjának. Ezek közé tartozik az Akka Streams, a Ratpack és a Vert.x, hogy csak néhányat említsünk.

3.2. Reaktív könyvtárak Java számára

A reaktív folyamok mögött meghúzódó kezdeti célkitűzéseket végül hivatalos Java Java könyvtárként kellett beépíteni. Ennek eredményeként a reaktív folyamok specifikációja szemantikailag egyenértékű a Java Flow könyvtárral, amelyet a Java 9 vezet be.

Ezen kívül van néhány népszerű választás a reaktív programozás megvalósítására a Java-ban:

  • Reaktív kiterjesztések: A közismertebb nevén ReactiveX API-k biztosítják az aszinkron programozást megfigyelhető folyamokkal. Ezek több programozási nyelvhez és platformhoz érhetők el, beleértve a Java-t is, ahol RxJava néven ismerik
  • Projektreaktor: Ez egy másik reaktív könyvtár, a reaktív folyamok specifikációján alapuló alapfelvétel, amelynek célja a JVM nem alkalmazásainak felépítése. Ez történetesen a tavaszi ökoszisztéma reaktív kötegének alapja is

4. Egyszerű alkalmazás

Az oktatóanyag céljaira kifejlesztünk egy egyszerű alkalmazást, amely a mikroszolgáltatások architektúráján alapul, minimális kezelőfelülettel. Az alkalmazásarchitektúrának elegendő elemmel kell rendelkeznie egy reaktív rendszer létrehozásához.

Alkalmazásunkhoz végpontok közötti reaktív programozást és más mintákat és eszközöket fogadunk el a reaktív rendszer alapvető jellemzőinek megvalósításához.

4.1. Építészet

Kezdjük azzal, hogy meghatározzuk egy egyszerű alkalmazásarchitektúra, amely nem feltétlenül mutatja meg a reaktív rendszerek jellemzőit. Innentől kezdve elvégezzük a szükséges változtatásokat ezen jellemzők egyesével történő eléréséhez.

Tehát először is kezdjük egy egyszerű architektúra meghatározásával:

Ez egy meglehetősen egyszerű architektúra, amely egy csomó mikroszolgáltatással megkönnyíti a kereskedelmi felhasználási esetet, ahol megrendelhetünk. A felhasználói élmény előhívója is van, és minden kommunikáció REST-ként történik a HTTP-n keresztül. Ezenkívül minden mikroszolgáltatás egyedi adatbázisokban kezeli adatait, ezt a szolgáltatást adatbázisnak nevezik.

Folytatjuk, és létrehozzuk ezt az egyszerű alkalmazást a következő alfejezetekben. Ez lesz a miénk megérteni ennek az építészetnek a tévedéseit valamint az elvek és gyakorlatok elfogadásának módjai és eszközei annak érdekében, hogy ezt reaktív rendszerré alakítsuk át.

4.3. Készlet-mikroszolgáltatás

A készlet mikroszolgáltatása lesz a termékek és azok jelenlegi készletének kezeléséért felelős. Ez lehetővé teszi a készlet megváltoztatását is a megrendelések feldolgozása során. A szolgáltatás fejlesztéséhez a Spring Boot-ot fogjuk használni a MongoDB-vel.

Kezdjük azzal, hogy definiálunk egy vezérlőt néhány végpont felfedésére:

@GetMapping public list getAllProducts () {return productService.getProducts (); } @PostMapping public Order processOrder (@RequestBody Order order) {return productService.handleOrder (order); } @DeleteMapping public order revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

és egy szolgáltatás az üzleti logikánk befogadására:

@Transactional public Order handleOrder (Order order) {order.getLineItems () .forEach (l -> {Product> p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Nem található a termék: "+ l.getProductId ())); if (p.getStock ()> = l.getQuantity ()) {p.setStock (p.getStock () - l.getQuantity ()); productRepository.save ( p);} else {dobjon új RuntimeException-t ("A termék nincs raktáron:" + l.getProductId ());}}); return order.setOrderStatus (OrderStatus.SUCCESS); } @Transactional public Order revertOrder (Order order) {order.getLineItems () .forEach (l -> {Product p = productRepository.findById (l.getProductId ()) .orElseThrow (() -> new RuntimeException ("Nem található a termék: "+ l.getProductId ())); p.setStock (p.getStock () + l.getQuantity ()); productRepository.save (p);}); return order.setOrderStatus (OrderStatus.SUCCESS); }

Vegye figyelembe, hogy mi vagyunk az egységek tranzakción belüli fenntartása, amely biztosítja, hogy kivételek esetén ne következzen be következetlen állapot.

Ezeken kívül meg kell határoznunk a tartomány entitásokat, a tárház felületét és egy csomó konfigurációs osztályt, amelyek szükségesek ahhoz, hogy minden megfelelően működjön.

De mivel ezek többnyire kazánlemezek, kerülni fogjuk őket, és ezekre a cikk utolsó szakaszában található GitHub adattárban lehet hivatkozni.

4.4. Szállítási Microservice

A szállítási mikroszolgáltatás sem lesz nagyon más. Ez lesz az felelős annak ellenőrzéséért, hogy létrejöhet-e szállítás a megrendeléshez és hozzon létre egyet, ha lehetséges.

Mint korábban, definiálunk egy vezérlőt a végpontjaink bemutatásához, valójában csak egyetlen végpontot:

@PostMapping nyilvános rendelési folyamat (@RequestBody rendelés) {return shippingService.handleOrder (rendelés); }

és egy szolgáltatás a megrendelés szállításával kapcsolatos üzleti logika befogadására:

public Order handleOrder (Order order) {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime.now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now () .plusDays (1); } else {dobja be az új RuntimeException szolgáltatást ("Az aktuális idő meghaladja a megrendelés határértékeit."); } shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate)); return order.setShippingDate (shippingDate) .setOrderStatus (OrderStatus.SUCCESS); }

Egyszerű szállítási szolgáltatásunk csak a megrendelések leadásának érvényes időablakát ellenőrzi. Kerüljük a kazán többi kódjának megvitatását, mint korábban.

4.5. Rendelje meg a Microservice szolgáltatást

Végül meghatározunk egy rendelési mikroszolgáltatást, amelyik lesz felelős egy új rend létrehozásán kívül. Érdekes, hogy hangszeres szolgáltatásként is játszik majd, ahol kommunikálni fog a leltárszolgálattal és a megrendelés szállítási szolgáltatásával.

Határozzuk meg a vezérlőnket a szükséges végpontokkal:

@PostMapping public Order create (@RequestBody Order order) {Rendelés feldolgozottOrder = orderService.createOrder (rendelés); if (OrderStatus.FAILURE.equals (processingOrder.getOrderStatus ())) {dobjon új RuntimeException-t ("A megrendelés feldolgozása sikertelen volt, próbálkozzon újra később."); } return processOrder; } @GetMapping nyilvános lista getAll () {return orderService.getOrders (); }

És egy szolgáltatás a megrendelésekhez kapcsolódó üzleti logika befogadására:

public Order createOrder (Order order) {logikai siker = igaz; Megrendelés savedOrder = orderRepository.save (rendelés); Rendelés inventResponse = null; próbálkozzon az {inventResponse = restTemplate.postForObject (készletServiceUrl, rendelés, Rendelés.osztály); } fogás (Kivétel ex) {siker = hamis; } Order shippingResponse = null; próbáld ki a {shippingResponse = restTemplate.postForObject (shippingServiceUrl, order, Order.class); } fogás (Kivétel ex) {siker = hamis; HttpEntity deleteRequest = új HttpEntity (rendelés); ResponseEntity deleteResponse = restTemplate.exchange (készletServiceUrl, HttpMethod.DELETE, deleteRequest, Order.class); } if (siker) {savedOrder.setOrderStatus (OrderStatus.SUCCESS); savedOrder.setShippingDate (shippingResponse.getShippingDate ()); } else {savedOrder.setOrderStatus (OrderStatus.FAILURE); } return orderRepository.save (savedOrder); } public getOrders () {return orderRepository.findAll (); }

A megrendelések kezelése, amikor a készlet- és szállítási szolgáltatásokat hívjuk össze, korántsem ideális. Megosztott a több mikroszolgáltatással végzett tranzakciók önmagukban összetett téma, és túlmutatnak az oktatóanyag keretein.

Az oktatóanyag később azonban meglátja, hogy a reaktív rendszer hogyan tudja bizonyos mértékben elkerülni az elosztott tranzakciók szükségességét.

Mint korábban, a kazánlap többi kódját sem fogjuk átélni. Erre azonban hivatkozni lehet a GitHub repóban.

4.6. Front-end

Adjunk hozzá egy felhasználói felületet is, hogy a beszélgetés teljes legyen. A felhasználói felület Angular alapú lesz, és egyszerű, egyoldalas alkalmazás lesz.

Muszáj lesz hozzon létre egy egyszerű összetevőt az Angular-ban a létrehozási és letöltési parancsok kezeléséhez. Különösen fontos az a rész, ahol felhívjuk az API-t a megrendelés létrehozásához:

createOrder () {let fejlécek = new HttpHeaders ({'Content-Type': 'application / json'}); let options = {headers: headers} this.http.post ('// localhost: 8080 / api / order', this.form.value, options) .subscribe ((response) => {this.response = response}, (hiba) => {this.error = hiba})}

A fenti kódrészlet elvárja, hogy a megrendelés adatait olyan formában rögzítsék és elérhetővé tegyék a komponens hatókörén belül. Az Angular fantasztikus támogatást nyújt egyszerű és összetett űrlapok létrehozásához reaktív és sablon által vezérelt űrlapok használatával.

Fontos az a rész is, ahol korábban létrehozott megrendeléseket kapunk:

getOrders () {this.previousOrders = this.http.get ('' // localhost: 8080 / api / megrendelések '')}

Felhívjuk figyelmét, hogy az Angular HTTP modul az aszinkron jellegű, ezért RxJS-t ad vissza Megfigyelhetős. A reakciót véleményünk szerint úgy kezelhetjük, hogy egy aszinkron csövön keresztül vezetjük át őket:

Eddig leadott megrendelései:

  • Megrendelés azonosítója: {{order.id}}, Megrendelés állapota: {{order.orderStatus}}, Rendelési üzenet: {{order.responseMessage}}

Természetesen az Angular működéséhez sablonok, stílusok és konfigurációk szükségesek, de ezekre a GitHub adattárban lehet hivatkozni. Felhívjuk figyelmét, hogy itt mindent egyetlen komponensbe csomagoltunk, ideális esetben nem ezt kellene tennünk.

De ebben az oktatóanyagban ezek az aggodalmak nem terjednek ki.

4.7. Az alkalmazás telepítése

Most, hogy elkészítettük az alkalmazás összes egyes részét, hogyan kellene tovább haladnunk? Nos, ezt mindig megtehetjük manuálisan. De vigyáznunk kell arra, hogy ez hamarosan unalmassá válhat.

Ehhez az oktatóanyaghoz a Docker Compose alkalmazást használjuk építse fel és telepítse alkalmazásunkat egy Docker gépen. Ehhez meg kell adnunk egy szabványos Dockerfile-t minden szolgáltatáshoz, és létre kell hoznunk egy Docker Compose fájlt a teljes alkalmazáshoz.

Lássuk, hogyan dokkoló-összeállít.yml fájl kinézete:

verzió: '3' szolgáltatások: frontend: build: ./frontend portok: - "80:80" order-service: build: ./order-service portok: - "8080: 8080" készlet-szolgáltatás: build: ./inventory -szolgáltatási portok: - "8081: 8081" szállítási szolgáltatás: build: ./hajózási-szolgáltatási portok: - "8082: 8082"

Ez a Docker Compose szolgáltatások meglehetősen szokásos meghatározása, és nem igényel különösebb figyelmet.

4.8. Problémák ezzel az architektúrával

Most, hogy van egy egyszerű alkalmazásunk, több szolgáltatás kölcsönhatásban áll egymással, megvitathatjuk az architektúra problémáit. A következő szakaszokban megpróbáljuk megoldani, és végül eljutunk abba az állapotba, ahol alkalmazásunkat reaktív rendszerré alakítottuk volna!

Bár ez az alkalmazás messze nem egy gyártási szintű szoftver, és számos kérdés merül fel, mégis a reaktív rendszerek motivációjával kapcsolatos kérdésekre összpontosítson:

  • A készletszolgáltatás vagy a szállítási szolgáltatás kudarcának lépcsőzetes hatása lehet
  • A külső rendszerek és az adatbázis felé irányuló hívások mind blokkoló jellegűek
  • A központi telepítés nem képes automatikusan kezelni a hibákat és az ingadozó terheléseket

5. Reaktív programozás

Hívások blokkolása bármely programban gyakran kritikus erőforrásokat eredményeznek, amelyek csak arra várnak, hogy történjenek dolgok. Ide tartoznak az adatbázis-hívások, a webszolgáltatásokhoz intézett hívások és a fájlrendszeres hívások. Ha felszabadíthatjuk a végrehajtás szálait ebből a várakozásból, és biztosítunk egy mechanizmust, amellyel visszakerülhetünk, ha az eredmények rendelkezésre állnak, ez sokkal jobb erőforrás-felhasználást eredményez.

Ezt teszi számunkra a reaktív programozási paradigma elfogadása. Bár sok ilyen hívás esetén át lehet váltani egy reaktív könyvtárra, lehet, hogy ez nem mindenre lehetséges. Számunkra szerencsére a Spring sokkal könnyebbé teszi a reaktív programozás használatát a MongoDB és a REST API-kkal:

A Spring Data Mongo támogatja a reaktív hozzáférést a MongoDB Reactive Streams Java illesztőprogramon keresztül. Ez biztosítja ReactiveMongoTemplate és ReactiveMongoRepository, mindkettő kiterjedt térképészeti funkcióval rendelkezik.

A Spring WebFlux biztosítja a tavaszi reaktív verem webkeretet, amely lehetővé teszi a nem blokkoló kódot és a reaktív folyamok ellennyomását. Reaktív könyvtáraként használja a reaktort. Továbbá előírja Web Ügyfél HTTP kérések végrehajtásához reaktív folyamok ellennyomásával. A Reactor Netty-t használja HTTP kliens könyvtárként.

5.1. Készletszolgáltatás

Kezdjük azzal, hogy megváltoztatjuk a végpontokat, hogy reaktív kiadókat bocsássanak ki:

@GetMapping public Flux getAllProducts () {return productService.getProducts (); }
@PostMapping public Mono processOrder (@RequestBody Order order) {return productService.handleOrder (rendelés); } @DeleteMapping public Mono revertOrder (@RequestBody Order order) {return productService.revertOrder (order); }

Nyilvánvaló, hogy a szolgáltatáson is szükséges módosításokat kell végrehajtanunk:

@Transactional public Mono handleOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order. getLineItems (). stream () .filter (l -> l.getProductId (). egyenlő (p.getId ())) .findAny (). get () .getQuantity (); if (p.getStock ()> = = q) {p.setStock (p.getStock () - q); return productRepository.save (p);} else {return Mono.error (new RuntimeException ("A termék nincs raktáron:" + p.getId ()) );}}) .majd (Mono.just (order.setOrderStatus ("SIKER"))); } @Transactional public Mono revertOrder (Order order) {return Flux.fromIterable (order.getLineItems ()) .flatMap (l -> productRepository.findById (l.getProductId ())) .flatMap (p -> {int q = order .getLineItems (). stream () .filter (l -> l.getProductId (). egyenlő (p.getId ())) .findAny (). get () .getQuantity (); p.setStock (p.getStock (p.getStock ( ) + q); return productRepository.save (p);}) .ma (Mono.just (order.setOrderStatus ("SIKER"))); }

5.2. Szállítási szolgáltatás

Hasonlóképpen megváltoztatjuk szállítási szolgáltatásunk végpontját:

@PostMapping nyilvános monofolyamat (@RequestBody rendelés) {return shippingService.handleOrder (rendelés); }

És a szolgáltatás változásai a reaktív programozás érdekében:

public Mono handleOrder (Order order) {return Mono.just (order) .flatMap (o -> {LocalDate shippingDate = null; if (LocalTime.now (). isAfter (LocalTime.parse ("10:00")) && LocalTime .now (). isBefore (LocalTime.parse ("18:00"))) {shippingDate = LocalDate.now (). plusDays (1);} else {return Mono.error (new RuntimeException ("Az aktuális idő ki van kapcsolva) a megrendelés leadásának korlátai. "));} return shipmentRepository.save (new Shipment () .setAddress (order.getShippingAddress ()) .setShippingDate (shippingDate));}) .map (s -> order.setShippingDate (s). getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS)); }

5.3. Rendelési szolgáltatás

Hasonló változtatásokat kell végrehajtanunk a megrendelés szolgáltatás végpontjaiban:

@PostMapping public Mono create (@RequestBody Order order) {return orderService.createOrder (order) .flatMap (o -> {if (OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return Mono.error (new RuntimeException ( "A megrendelés feldolgozása sikertelen, próbálkozzon újra később." + O.getResponseMessage ()));} else {return Mono.just (o);}}); } @GetMapping public Flux getAll () {return orderService.getOrders (); }

A szolgáltatás változásai jobban érintettek lesznek, mivel ki kell használnunk a tavaszt Web Ügyfél a készlet és a szállítási reaktív végpontok meghívása:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .flatMap (o -> {return webClient.method (HttpMethod.POST) .uri (inventorServiceUrl) .body (BodyInserters.fromValue) (o)) .exchange ();}) .onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ())) {return webClient.method (HttpMethod.POST) .uri (shippingServiceUrl) .body (BodyInserters.fromValue (o)) .exchange ();} else { return Mono.just (o);}}) .onErrorResume (err -> {return webClient.method (HttpMethod.POST) .uri (inventorServiceUrl) .body (BodyInserters.fromValue (order)) .retrieve () .bodyToMono (Order) .class) .map (o -> o.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .map (o -> {if (! OrderStatus.FAILURE.equals (o.getOrderStatus ( ))) {return order.setShippingDate (o.getShippingDate ()) .setOrderStatus (OrderStatus.SUCCESS);} else {return order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (o.getResponseMessage ()); }}) .flatMap (orderRepository :: save); } public Flux getOrders () {return orderRepository.findAll (); }

Ez a reaktív API-kkal történő hangszerelés nem könnyű feladat, és gyakran hibára hajlamos, valamint nehéz hibakeresni. A következő szakaszban megnézzük, hogyan lehet ezt egyszerűsíteni.

5.4. Front-end

Most, hogy az API -ink képesek események közvetítésére, amint azok bekövetkeznek, teljesen természetes, hogy ezt képesnek kell lennünk felhasználni a kezelőfelületünkön is. Szerencsére az Angular támogatja az EventSource szolgáltatást, a kiszolgáló által küldött események kezelőfelülete.

Nézzük meg, hogyan tudjuk az összes korábbi megrendelésünket események folyamaként feldolgozni:

getOrderStream () {return Megfigyelhető.create ((megfigyelő) => {let eventSource = új EventSource ('// localhost: 8080 / api / megrendelések') eventSource.onmessage = (event) => {let json = JSON.parse ( event.data) this.orders.push (json) this._zone.run (() => {observer.next (this.orders)})} eventSource.onerror = (hiba) => {if (eventSource.readyState = == 0) {eventSource.close () this._zone.run (() => {observer.complete ()})}} else {this._zone.run (() => {observer.error ('EventSource hiba: '+ hiba)})}}})}}

6. Üzenetvezérelt építészet

Az első probléma, amellyel foglalkozni fogunk, a szolgáltatás-szolgáltatás közötti kommunikációhoz kapcsolódik. Épp most, ezek a kommunikáció szinkron, ami több problémát is felvet. Ezek közé tartozik a lépcsőzetes hibák, az összetett hangszerelés és az elosztott tranzakciók, hogy csak néhányat említsünk.

A probléma megoldásának nyilvánvaló módja az, hogy ezeket a kommunikációkat aszinkronokká tesszük. A üzenetközvetítő a szolgáltatás-szolgáltatás közötti kommunikáció megkönnyítése érdekében meg tudja csinálni a trükköt helyettünk. Üzenetközvetítőként a Kafkát, a Kafka tavaszát pedig üzenetek előállításához és felhasználásához fogjuk használni:

Egyetlen témát használunk különböző rendelési állapotú rendelési üzenetek előállításához és fogyasztásához, hogy a szolgáltatások reagáljanak.

Lássuk, hogyan kell megváltoztatni az egyes szolgáltatásokat.

6.1. Készletszolgáltatás

Kezdjük azzal, hogy meghatározzuk a készletszolgáltatás üzenettermelőjét:

@Autowired privát KafkaTemplate kafkaTemplate; public void sendMessage (Rendelési sorrend) {this.kafkaTemplate.send ("megrendelések", megrendelés); }

Ezután meg kell határoznunk egy üzenetfogyasztót a készletszolgáltatás számára, hogy reagáljon a téma különböző üzeneteire:

A @KafkaListener (topic = "megrendelések", groupId = "készlet") public void consume (Rendelési sorrend) dobja az IOException {if (OrderStatus.RESERVE_INVENTORY.equals (order.getOrderStatus ())) {productService.handleOrder (rendelés) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_SUCCESS));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_FAILURE); SetResponse) }).Iratkozz fel(); } else if (OrderStatus.REVERT_INVENTORY.equals (order.getOrderStatus ())) {productService.revertOrder (order) .doOnSuccess (o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_SU). e -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.INVENTORY_REVERT_FAILURE) .setResponseMessage (e.getMessage ()));}). előfizetés (); }}

Ez azt is jelenti, hogy most már biztonságosan eldobhatjuk a redundáns végpontokat a vezérlőnkből. Ezek a változások elegendőek ahhoz, hogy aszinkron kommunikációt érjünk el alkalmazásunkban.

6.2. Szállítási szolgáltatás

A szállítási szolgáltatás változásai viszonylag hasonlóak ahhoz, amit korábban a készletszolgáltatással tettünk. Az üzenet előállítója ugyanaz, és az üzenet fogyasztója a szállítási logikára jellemző:

A @KafkaListener (topic = "megrendelések", groupId = "szállítás") public void consume (Rendelési sorrend) IOException-t dob ​​{if (OrderStatus.PREPARE_SHIPPING.equals (order.getOrderStatus ())) {shippingService.handleOrder (rendelés) .doOnSuccess ( o -> {orderProducer.sendMessage (order.setOrderStatus (OrderStatus.SHIPPING_SUCCESS) .setShippingDate (o.getShippingDate ()));}) .doOnError (e -> {orderProducer.sendMessage (order.setOrderStatus (order.setOrderStatus). (e.getMessage ()));}). feliratkozás (); }}

Most már biztonságosan eldobhatjuk az összes végpontot a vezérlőnkben, mivel már nincs rájuk szükségünk.

6.3. Rendelési szolgáltatás

A megrendeléssel kapcsolatos változások egy kicsit jobban érintettek lesznek, mivel itt végeztünk minden hangszerelést korábban.

Ennek ellenére az üzenet előállítója változatlan marad, és az üzenetfogyasztó felveszi a megrendelés szolgáltatásspecifikus logikáját:

A @KafkaListener (topic = "megrendelések", groupId = "megrendelések") public void fogyaszt (rendelési sorrend) IOException-t dob ​​{if (OrderStatus.INITIATION_SUCCESS.equals (order.getOrderStatus ())) {orderRepository.findById (order.getId () ) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.RESERVE_INVENTORY)); return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ()) (}) .fájl : mentés) .subscribe (); } else if ("KÉSZLET-SIKER" .egyenlő (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.PREPARE_SHIPPING)) ; return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } else if ("SHIPPING-FAILURE" .equals (order.getOrderStatus ())) {orderRepository.findById (order.getId ()) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.REVERT_INVENTORY)) ; return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save) .subscribe (); } else {orderRepository.findById (order.getId ()) .map (o -> {return o.setOrderStatus (order.getOrderStatus ()) .setResponseMessage (order.getResponseMessage ());}) .flatMap (orderRepository :: save ) .Iratkozz fel(); }}

A a fogyasztó itt csupán a különböző rendelési státuszú rendelési üzenetekre reagál. Ez adja a koreográfiát a különböző szolgáltatások között.

Végül a megrendelési szolgáltatásunknak is módosítania kell a koreográfia támogatásához:

public Mono createOrder (Order order) {return Mono.just (order) .flatMap (orderRepository :: save) .map (o -> {orderProducer.sendMessage (o.setOrderStatus (OrderStatus.INITIATION_SUCCESS)); return o;}). onErrorResume (err -> {return Mono.just (order.setOrderStatus (OrderStatus.FAILURE) .setResponseMessage (err.getMessage ()));}) .flatMap (orderRepository :: save); }

Vegye figyelembe, hogy ez sokkal egyszerűbb, mint az a szolgáltatás, amelyet reaktív végpontokkal kellett írnunk az utolsó szakaszban. Aszinkron a koreográfia gyakran sokkal egyszerűbb kódot eredményez, bár ez az esetleges következetesség, valamint a bonyolult hibakeresés és figyelés árával jár. Mint sejtjük, a kezelőfelületünk nem kapja meg azonnal a megrendelés végleges állapotát.

7. Konténeres hangszerelési szolgáltatás

A rejtvény utolsó darabja, amelyet meg akarunk oldani, a telepítéssel kapcsolatos.

Amit az alkalmazásban kívánunk, az a bőséges redundancia és a hajlandóság az automatikus szükséglet függvényében felfelé vagy lefelé történő skálázásra.

A szolgáltatások konténerezését már elértük a Docker révén, és a közöttük lévő függőségeket a Docker Compose segítségével kezeljük. Bár ezek önmagukban fantasztikus eszközök, nem segítenek abban, hogy elérjük azt, amit szeretnénk.

Ezért mi szükségük van egy konténeres hangszerelési szolgáltatásra, amely gondoskodhat az alkalmazásunk redundanciájáról és méretezhetőségéről. Bár több lehetőség van, az egyik népszerű közül a Kubernetes található. A Kubernetes egy felhő szállító-agnosztikus módszert kínál számunkra a konténeres munkaterhelések nagymértékben skálázható telepítésének eléréséhez.

A Kubernetes a Dockerhez hasonló konténereket a Pods-ba tekeri, amelyek a legkisebb egység. Ezenkívül használhatjuk a Telepítést a kívánt állapot deklaratív leírására.

A telepítéssel létrejön a ReplicaSets, amely belsőleg felelős a hüvelyek felneveléséért. Leírhatunk minimális számú azonos hüvelyt, amelyeknek bármikor működniük kell. Ez redundanciát és ezáltal magas rendelkezésre állást biztosít.

Nézzük meg, hogyan definiálhatunk Kubernetes telepítést az alkalmazásainkhoz:

apiVersion: apps / v1 fajta: Telepítési metaadatok: név: készlet-telepítési specifikáció: replikák: 3 választó: matchLabels: név: készlet-telepítési sablon: metaadatok: címkék: név: készlet-telepítési specifikáció: tárolók: - név: készletkép: inventor-service-async: legújabb portok: - containerPort: 8081 --- apiVersion: apps / v1 fajta: Telepítési metaadatok: név: szállítás-telepítés specifikáció: replikák: 3 választó: matchLabels: név: szállítás-telepítési sablon: metaadatok: címkék : név: szállítási-telepítési specifikáció: tárolók: - név: szállítási kép: shipping-service-async: legújabb portok: - containerPort: 8082 --- apiVersion: apps / v1 fajta: Telepítési metaadatok: név: order-deployment spec: replikák : 3 választó: matchLabels: név: rendelés-telepítési sablon: metaadatok: címkék: név: megrendelés-telepítés specifikációja: tárolók: - név: rendelési kép: order-service-async: legújabb portok: - containerPort: 8080

Itt deklaráljuk a telepítést, hogy bármikor fenntartsunk három azonos másolatot a hüvelyekről. Ez ugyan jó módszer a redundancia hozzáadására, de nem biztos, hogy változó terhelésekhez elegendő. A Kubernetes egy másik, a Horizontal Pod Autoscaler néven ismert erőforrást biztosít, amely képes a megfigyelt mutatók alapján skálázhatja a központi telepítések számát mint a CPU kihasználtsága.

Felhívjuk figyelmét, hogy a Kubernetes-fürtön tárolt alkalmazás méretezhetőségi szempontjaival foglalkozunk. Ez nem feltétlenül jelenti azt, hogy maga az alapul szolgáló fürt méretezhető. Magas rendelkezésre állású Kubernetes-fürt létrehozása nem triviális feladat, és meghaladja az oktatóanyag kereteit.

8. Eredményes reaktív rendszer

Most, hogy több fejlesztést hajtottunk végre architektúránkban, talán ideje ezt értékelni a reaktív rendszer definíciója alapján. Az értékelést a reaktív rendszerek négy jellemzőjével összevetjük, amelyet korábban az oktatóanyagban tárgyaltunk:

  • Fogékony: A reaktív programozási paradigma elfogadásának segítenünk kell a végpontok közötti blokkolatlanság elérésében, és ezáltal egy reagáló alkalmazásban
  • Rugalmas: A Kubernetes telepítése a ReplicaSet segítségével a kívánt számú hüvelynek rugalmasságot biztosít a véletlen hibák ellen
  • Rugalmas: A Kubernetes-fürtnek és erőforrásoknak meg kell adniuk a szükséges támogatást ahhoz, hogy rugalmasak legyünk a kiszámíthatatlan terhelésekkel szemben
  • Üzenet-vezérelt: Ha az összes szolgáltatás-szolgáltatás közötti kommunikációt aszinkron módon kezeljük a Kafka közvetítőn keresztül, akkor itt segíthetünk

Bár ez elég ígéretesnek tűnik, még korántsem ért véget. Hogy őszinte legyek, a az igazán reaktív rendszerre való törekvésnek folyamatos fejlesztéseknek kell lennie. Soha nem előzhetjük meg mindazt, ami kudarcot vallhat egy nagyon összetett infrastruktúrában, ahol az alkalmazásunk csak egy kis része.

A reaktív rendszer így fog megbízhatóságot igényelnek az egészet alkotó minden részről. A fizikai hálózattól kezdve az olyan infrastrukturális szolgáltatásokig, mint a DNS, mindnek összhangban kell lennie, hogy segítsen nekünk a végcél elérésében.

Gyakran előfordulhat, hogy nem tudjuk kezelni és biztosítani a szükséges garanciákat ezekre az alkatrészekre. És itt van a kezelt felhő infrastruktúra segít enyhíteni fájdalmainkat. Számos olyan szolgáltatás közül választhatunk, mint az IaaS (Infeastrure-as-a-Service), a BaaS (Backend-as-a-Service) és a PaaS (Platform-as-a-Service), hogy a felelősségeket külső felekre ruházzuk át. Ez a lehető legnagyobb mértékben ránk hagyja az alkalmazás felelősségét.

9. Következtetés

Ebben az oktatóanyagban áttekintettük a reaktív rendszerek alapjait, és hogyan hasonlítjuk össze azokat a reaktív programozással. Létrehoztunk egy egyszerű alkalmazást több mikroszolgáltatással, és kiemeltük azokat a problémákat, amelyeket egy reaktív rendszerrel kívánunk megoldani.

Továbbá folytattuk a reaktív programozás, az üzenetalapú architektúra és a tároló-hangszerelési szolgáltatás bevezetését az architektúrában egy reaktív rendszer megvalósítása érdekében.

Végül megvitattuk az eredményül kapott architektúrát és azt, hogy ez hogyan marad a reaktív rendszer felé vezető út! Ez az oktatóanyag nem vezet be minden olyan eszközt, keretet vagy mintát, amely segíthet a reaktív rendszer létrehozásában, de megismerteti az utat.

Szokás szerint a cikk forráskódja megtalálható a GitHubon.


$config[zx-auto] not found$config[zx-overlay] not found