Apache RocketMQ tavaszi csomagtartóval

1. Bemutatkozás

Ebben az oktatóanyagban létrehozunk egy üzenetgyártót és -fogyasztót a Spring Boot és az Apache RocketMQ, egy nyílt forráskódú elosztott üzenetküldő és streaming adatplatform segítségével.

2. Függőségek

A Maven projektekhez hozzá kell adnunk a RocketMQ Spring Boot Starter függőségét:

 org.apache.rocketmq rocketmq-spring-boot-starter 2.0.4 

3. Üzenetek előállítása

Példaként létrehozunk egy alapüzenet-előállítót, amely eseményeket küld, amikor a felhasználó hozzáad vagy eltávolít egy elemet a bevásárlókosárból.

Először állítsuk be a szerver helyünket és a csoport nevét a saját könyvtárunkban alkalmazás.tulajdonságok:

rocketmq.name-server = 127.0.0.1: 9876 rocketmq.producer.group = cart-producer-group

Ne feledje, hogy ha több névszerverünk van, akkor tetszés szerint felsorolhatjuk őket host: port; host: port.

Most, hogy egyszerű legyünk, létrehozunk egy CommandLineRunner alkalmazást, és generáljon néhány eseményt az alkalmazás indításakor:

@SpringBootApplication public class CartEventProducer végrehajtja a CommandLineRunner {@Autowired private RocketMQTemplate rocketMQTemplate; public static void main (String [] args) {SpringApplication.run (CartEventProducer.class, args); } public void run (String ... args) dobja a {rocketMQTemplate.convertAndSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)) kivételt; rocketMQTemplate.convertAndSend ("cart-item-add-topic", új CartItemEvent ("számítógép", 2)); rocketMQTemplate.convertAndSend ("cart-item-removed-topic", új CartItemEvent ("bike", 1)); }}

A CartItemEvent csak két tulajdonságból áll - az elem azonosítója és a mennyiség:

class CartItemEvent {private String itemId; privát int mennyiség; // kivitelező, mérőeszközök és beállítók}

A fenti példában a convertAndSend () módszer, a. által meghatározott általános módszer AbstractMessageSendingTemplate absztrakt osztály, kosáreseményeink elküldéséhez. Két paraméterre van szükség: egy cél, amely esetünkben egy téma neve, és egy üzenet hasznos terhe.

4. Üzenetfogyasztó

A RocketMQ üzenetek fogyasztása ugyanolyan egyszerű, mint a Spring jelölésű komponens létrehozása @RocketMQMessageListener és a RocketMQListener felület:

@SpringBootApplication public class CartEventConsumer {public static void main (String [] args) {SpringApplication.run (CartEventConsumer.class, args); } @Service @RocketMQMessageListener (topic = "cart-item-add-topic", consumerGroup = "cart-consumer_cart-item-add-topic") public class CardItemAddConsumer implementates RocketMQListener {public void onMessage (CartItemEvent addItemEvent) {log.info ( "Elem hozzáadása: {}", addItemEvent); // további logika}} @Service @RocketMQMessageListener (topic = "cart-item-removed-topic", consumerGroup = "cart-consumer_cart-item-removed-topic") public class CardItemRemoveConsumer implementates RocketMQListener {public void onMessage (CartItemEvent removeItemEvent) {log.info ("Elem eltávolítása: {}", removeItemEvent); // további logika}}}

Külön komponenst kell létrehoznunk minden üzenet témához, amelyet hallgatunk. Ezen hallgatók mindegyikében meghatározzuk a téma nevét és a fogyasztói csoport nevét a @RocketMQMessageListener annotáció.

5. Szinkron és aszinkron átvitel

Az előző példákban a convertAndSend módszer az üzeneteink elküldéséhez. Van néhány más lehetőségünk is.

Felhívhatnánk például syncSend ami különbözik attól convertAndSend mert visszatér SendResult tárgy.

Használható például annak ellenőrzésére, hogy az üzenetünket sikeresen elküldtük, vagy megkapjuk az azonosítóját:

public void run (String ... args) dobja a {SendResult addBikeResult = rocketMQTemplate.syncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1)) kivételt; SendResult addComputerResult = rocketMQTemplate.syncSend ("cart-item-add-topic", új CartItemEvent ("számítógép", 2)); SendResult removeBikeResult = rocketMQTemplate.syncSend ("cart-item-removed-topic", új CartItemEvent ("bike", 1)); }

Mint convertAndSend, ezt a módszert csak akkor küldjük vissza, amikor a küldés befejeződik.

A nagy megbízhatóságot igénylő esetekben szinkron átvitelt kell használnunk, például fontos értesítési üzenetek vagy SMS-értesítések esetén.

Másrészről inkább azt szeretnénk, hogy aszinkron módon küldjük el az üzenetet, és értesítést kapjunk, amikor a küldés befejeződik.

Ezt megtehetjük asyncSend, amely a SendCallback paraméterként és azonnal visszatér:

rocketMQTemplate.asyncSend ("cart-item-add-topic", new CartItemEvent ("bike", 1), new SendCallback () {@Orride public void onSuccess (SendResult sendResult) {log.error ("Sikeresen elküldve a kosár elemét") ;} @Orride public void onException (Throwable dobható) {log.error ("Kivétel a kosár küldése közben", dobható);}});

Aszinkron átvitelt alkalmazzuk olyan esetekben, ahol nagy teljesítményre van szükség.

Végül használhatjuk azokat a forgatókönyveket, ahol nagyon nagy az átviteli követelmény sendOneWay ahelyett asyncSend. sendOneWay eltér a asyncSend annyiban, hogy nem garantálja az üzenet elküldését.

Az egyirányú továbbítás a szokásos megbízhatósági esetekhez is használható, például naplók gyűjtéséhez.

6. Üzenetek küldése tranzakcióban

A RocketMQ lehetőséget nyújt számunkra üzenetek küldésére egy tranzakción belül. Megtehetjük a sendInTransaction () módszer:

MessageBuilder.withPayload (new CartItemEvent ("bike", 1)). Build (); rocketMQTemplate.sendMessageInTransaction ("teszt-tranzakció", "téma-név", msg, null);

Továbbá meg kell valósítanunk a RocketMQLocalTransactionListener felület:

@RocketMQTransactionListener (txProducerGroup = "teszt-tranzakció") osztály TransactionListenerImpl implementálja a RocketMQLocalTransactionListener {@Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) {// ... helyi tranzakció return, return ROL } @Orride public RocketMQLocalTransactionState checkLocalTransaction (Message msg) {// ... ellenőrizze a tranzakció állapotát és adja vissza a ROLLBACK, a COMMIT vagy az ismeretlen visszatérést RocketMQLocalTransactionState.COMMIT; }}

Ban ben sendMessageInTransaction (), az első paraméter a tranzakció neve. Meg kell egyeznie a @RocketMQTransactionListenerTagmezője txProducerGroup.

7. Üzenetgyártó konfigurálása

Konfigurálhatjuk az üzenet előállítójának szempontjait is:

  • rocketmq.producer.send-message-timeout: Az üzenet időkorlátja ezredmásodpercben - az alapértelmezett érték 3000
  • rocketmq.producer.compress-message-body -küszöb: Küszöbérték, amely felett a RocketMQ tömöríti az üzeneteket - az alapértelmezett érték 1024.
  • rocketmq.producer.max-message-size: Az üzenet maximális mérete bájtokban - az alapértelmezett érték 4096.
  • rocketmq.producer.retry-times-when-send-async-sikertelen: A belsőleg aszinkron módban végrehajtandó újrapróbálkozások maximális száma a hiba elküldése előtt - az alapértelmezett érték 2.
  • rocketmq.producer.retry-next-server: Jelzi, hogy meg kell-e próbálkozni egy másik közvetítővel a hiba belső elküldésével kapcsolatban - az alapértelmezett érték hamis.
  • rocketmq.producer.retry-times-when-send-sikertelen: A belsőleg aszinkron módban végrehajtandó újrapróbálkozások maximális száma a hiba elküldése előtt - az alapértelmezett érték 2.

8. Következtetés

Ebben a cikkben megtanultuk, hogyan kell üzeneteket küldeni és fogyasztani az Apache RocketMQ és a Spring Boot használatával. Mint mindig, minden forráskód elérhető a GitHubon.