MQTT kliens Java-ban

1. Áttekintés

Ebben az oktatóanyagban megtudhatjuk, hogyan adhatunk hozzá MQTT üzenetküldést egy Java projektbe az Eclipse Paho projekt által biztosított könyvtárak segítségével.

2. MQTT alapozó

Az MQTT (MQ Telemetry Transport) egy üzenetküldési protokoll az egyszerű és könnyű módszer igényének kielégítésére hozták létre az adatok alacsony energiaigényű, például ipari alkalmazásokban használt készülékekről történő továbbítását.

Az IoT (tárgyak internete) eszközeinek növekvő népszerűségével az MQTT egyre növekvő felhasználást tapasztalt, ami az OASIS és az ISO által szabványosítottá vált.

A protokoll egyetlen üzenetküldési mintát támogat, nevezetesen a Közzététel-Feliratkozás mintát: az ügyfél által küldött minden üzenethez társított „téma” tartozik, amelyet a bróker használ az előfizetős ügyfelekhez történő továbbításhoz. A témák neve egyszerű karakterlánc lehet, például:oiltempVagy útvonal-szerű karakterláncmotor / 1 / rpm“.

Üzenetek fogadása érdekében az ügyfél feliratkozik egy vagy több témára a pontos nevével vagy a támogatott helyettesítő karakterek egyikét tartalmazó karakterlánccal („#” többszintű témákhoz és „+” egyszintűekhez).

3. Projekt beállítása

A Paho könyvtár Maven projektbe történő felvételéhez hozzá kell adnunk a következő függőséget:

 org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.0 

Az Eclipse Paho Java könyvtár modul legújabb verziója letölthető a Maven Central oldalról.

4. Az ügyfél beállítása

A Paho könyvtár használatakor az első dolog, amit meg kell tennünk annak érdekében, hogy üzeneteket küldjünk és / vagy fogadjunk egy MQTT brókertől, megvalósításának megszerzése IMqttClient felület. Ez a felület tartalmazza az alkalmazás által megkövetelt összes módszert a kiszolgálóval való kapcsolat létrehozásához, az üzenetek küldéséhez és fogadásához.

A Paho ennek a felületnek két megvalósításával jön ki a dobozból, egy aszinkron (MqttAsyncClient) és egy szinkron (MqttClient).Esetünkben a szinkron verzióra koncentrálunk, amelynek egyszerűbb a szemantikája.

Maga a beállítás két lépésből áll: először létrehozunk egy példányt a MqttClient osztályt, majd csatlakoztatjuk a szerverünkhöz. A következő alfejezet részletezi ezeket a lépéseket.

4.1. Új létrehozása IMqttClient Példa

A következő kódrészlet bemutatja, hogyan hozhat létre újat IMqttClient szinkron példány:

String publisherId = UUID.randomUUID (). ToString (); IMqttClient publisher = új MqttClient ("tcp: //iot.eclipse.org: 1883", publisherId);

Ebben az esetben, a rendelkezésre álló legegyszerűbb konstruktort használjuk, amely megkapja az MQTT közvetítőnk végpont címét és egy ügyfél azonosítót, amely egyedi módon azonosítja ügyfelünket.

Esetünkben véletlenszerű UUID-t használtunk, így minden futtatáskor új kliens azonosító jön létre.

A Paho további konstruktorokat is kínál, amelyeket felhasználhatunk a nem jóváhagyott üzenetek és / vagy a ScheduledExecutorService a protokollmotor implementációja által megkövetelt háttérfeladatok futtatására használják.

Az általunk használt kiszolgáló-végpont egy nyilvános MQTT-közvetítő, amelyet a Paho projekt üzemeltet, amely lehetővé teszi, hogy bárki, aki rendelkezik internetkapcsolattal, hitelesítés nélkül tesztelhesse az ügyfeleket.

4.2. Csatlakozás a szerverhez

Újonnan létrehozott MqttClient a példány nincs csatlakoztatva a szerverhez. Ezt úgy tesszük, hogy felhívjuk connect () módszer, opcionálisan elhaladva a MqttConnectOptions példány, amely lehetővé teszi számunkra a protokoll egyes aspektusainak testreszabását.

Ezeket a lehetőségeket különösen olyan információk továbbítására használhatjuk, mint a biztonsági adatok, a munkamenet-helyreállítási mód, az újracsatlakozási mód és így tovább.

A MqttConnectionOptions osztály kiteszi ezeket az opciókat egyszerű tulajdonságokként, amelyeket normál szetter módszerekkel állíthatunk be. Csak a forgatókönyvünkhöz szükséges tulajdonságokat kell beállítanunk - a többi alapértelmezett értéket vesz fel.

A szerverrel való kapcsolat létrehozásához használt kód általában így néz ki:

MqttConnectOptions options = új MqttConnectOptions (); options.setAutomaticReconnect (true); options.setCleanSession (true); options.setConnectionTimeout (10); kiadó.csatlakozás (opciók);

Itt meghatározzuk a csatlakozási lehetőségeket, hogy:

  • Hálózati hiba esetén a könyvtár automatikusan megpróbálja újra csatlakozni a kiszolgálóhoz
  • Elveti az előző futtatás elküldött üzeneteit
  • A kapcsolat időtúllépése 10 másodpercre van állítva

5. Üzenetek küldése

Üzenetek küldése egy már csatlakoztatott eszközzel MqttClient nagyon egyértelmű. Az egyiket használjuk közzétenni () metódusváltozatok a hasznos terhelés elküldéséhez, amely mindig bájt tömb, egy adott témára, a következő szolgáltatásminőségi lehetőségek egyikét használva:

  • 0 - „egyszerre” szemantika, más néven „tűz-felejts”. Akkor használja ezt a beállítást, ha az üzenetvesztés elfogadható, mivel nem igényel semmiféle nyugtázást vagy kitartást
  • 1 - „legalább egyszer” szemantika. Akkor használja ezt a beállítást, ha az üzenet elvesztése nem elfogadható és előfizetői képesek kezelni a duplikátumokat
  • 2 - „pontosan egyszer” szemantika. Akkor használja ezt a beállítást, ha az üzenet elvesztése nem elfogadható és előfizetőid nem tudják kezelni az ismétléseket

Mintaprojektünkben a MotorTemperatureSensor osztály egy olyan szenzor szerepét játssza, amely minden alkalommal új hőmérsékleti értéket produkál, amikor arra hivatkozunk hívás() módszer.

Ez az osztály hajtja végre a Hívható felületet, így könnyedén használhatjuk az egyik ExecutorService a java.util.egyidejű csomag:

public class EngineTemperatureSensor megvalósítja a Callable {// ... magántagokat kihagyta a nyilvános EngineTemperatureSensor (IMqttClient kliens) {this.client = kliens; } @Orride public Void call () dobja a Kivételt {if (! Client.isConnected ()) {return null; } MqttMessage msg = readEngineTemp (); msg.setQos (0); msg.setRetained (true); client.publish (TÉMA, üzenet); return null; } privát MqttMessage readEngineTemp () {double temp = 80 + rnd.nextDouble () * 20.0; bájt [] hasznos teher = Karakterlánc.formátum ("T:% 04.2f", temp) .getBytes (); return new MqttMessage (hasznos teher); }}

A MqttMessage magában foglalja a hasznos terhelést, a kért szolgáltatásminőséget és a megtartva zászló az üzenethez. Ez a jelző jelzi a brókernek, hogy meg kell őriznie ezt az üzenetet, amíg az előfizető el nem fogyasztja.

Használhatjuk ezt a funkciót egy „utolsó ismert jó” viselkedés megvalósítására, így amikor egy új előfizető csatlakozik a szerverhez, azonnal megkapja a megtartott üzenetet.

6. Üzenetek fogadása

Az üzenetek fogadásához az MQTT alkustól, használnunk kell az egyik Iratkozz fel() módszerváltozatok, amelyek lehetővé teszik számunkra, hogy meghatározzuk:

  • Egy vagy több témaszűrő azokhoz az üzenetekhez, amelyeket meg akarunk fogadni
  • A kapcsolódó QoS
  • A visszahívás kezelő a beérkezett üzenetek feldolgozásához

A következő példában bemutatjuk, hogyan lehet üzenetfigyelőt hozzáadni egy meglévőhöz IMqttClient példány üzeneteket fogadni egy adott témáról. Használjuk a CountDownLatch szinkronizálási mechanizmusként a visszahívás és a fő végrehajtási szál között, csökkentve minden egyes új üzenet érkezésekor.

A mintakódban mást használtunk IMqttClient például üzeneteket fogadni. Csak azért tettük, hogy világosabbá tegyük, melyik ügyfél mit csinál, de ez nem Paho korlátozás - ha szeretné, ugyanazt az ügyfelet használhatja üzenetek közzétételéhez és fogadásához:

CountDownLatch kapottSignal = új CountDownLatch (10); subscriber.subscribe (EngineTemperatureSensor.TOPIC, (topic, msg) -> {byte [] payload = msg.getPayload (); // ... hasznos terhelés kezelése kihagyva kapottSignal.countDown ();}); kapottSignal.await (1, TimeUnit.MINUTES);

A Iratkozz fel() a fent használt változat egy IMqttMessageListener második érvként.

Esetünkben egy egyszerű lambda függvényt használunk, amely feldolgozza a hasznos terhet és csökkenti a számlálót. Ha nem érkezik elegendő üzenet a megadott időablakba (1 perc), a várják() A módszer kivételt vet.

A Paho használatakor nem kell kifejezetten visszaigazolnunk az üzenet átvételét. Ha a visszahívás rendesen visszatér, a Paho feltételezi, hogy sikeres a fogyasztása, és nyugtázást küld a szervernek.

Ha a visszahívás egy Kivétel, az ügyfél leáll. Felhívjuk figyelmét, hogy ez a QoS 0 szinttel küldött üzenetek elvesztését vonja maga után.

Az 1. vagy 2. szintű QoS-sel küldött üzeneteket a kiszolgáló újra elküldi, amint az ügyfél újra csatlakozik és újra feliratkozik a témára.

7. Következtetés

Ebben a cikkben bemutattuk, hogyan adhatunk támogatást az MQTT protokollhoz Java-alkalmazásainkban az Eclipse Paho projekt által biztosított könyvtár segítségével.

Ez a könyvtár kezeli az összes alacsony szintű protokoll-részletet, lehetővé téve számunkra, hogy a megoldás más aspektusaira összpontosítsunk, miközben jó teret hagyunk belső jellemzőinek fontos szempontjainak, például az üzenetmegmaradás testreszabására.

A cikkben látható kód elérhető a GitHubon.