Listen to MQTT
EXPLANATION: Outputs
You can learn more about Outputs in the Messages, Payloads and Transactions section.
IOTA node(s) provide a Message Queuing Telemetry Transport (MQTT) layer, if enabled, which is a lightweight publish-subscribe network protocol that provides information about events that are being triggered by the IOTA network.
The iota.rs
client library supports asynchronous event listeners that can be listened to, and continuously receive
MQTT events based on a topic
, which can be:
- milestones/latest
- milestones/confirmed
- messages
- messages/referenced
- messages/indexation/{index}
- messages/{messageId}/metadata
- transactions/{transactionId}/included-message
- outputs/{outputId}
- addresses/{address}/outputs
- addresses/ed25519/{address}/outputs
- Java
- Nodejs
- Python
- Rust
public static void mqtt() {
Client iota = node();
MqttListener listener = new MqttListener() {
@Override
public void onEvent(TopicEvent event) {
System.out.println(event);
}
};
// TODO: Make listeners with the Sync trait
// iota.subscriber().withTopic(Topic.from("messages")).subscribe(listener);
}
You can reach the listener using an instance of a
Client.TopicSubscriber
object that is returned from the
Client.subscriber()
function. It offers several
chaining calls:
.topic(str)
/.topics(str[])
: A topic or list of topics that should trigger a provided callback..subscribe(cb)
: Subscribes the listener to a callback function that is being triggered every time the given topic(s) is noticed..unsubscribe(cb)
: Unsubscribes the listener from the given topics. Once unsubscribed, the given callback function is executed in the form(err, message) => {}
.
async function run() {
const {
ClientBuilder
} = require('@iota/client');
// client connects to a node that has MQTT enabled
const client = new ClientBuilder()
.node('https://api.lb-1.h.chrysalis-devnet.iota.cafe')
.build();
client.subscriber().topics(['milestones/confirmed', 'messages']).subscribe((err, data) => {
console.log(data);
// To get the message id from messages `client.getMessageId(data.payload)` can be used
})
await new Promise(resolve => setTimeout(resolve, 1500));
// unsubscribe from 'messages' topic, will continue to receive events for 'milestones/confirmed'
client.subscriber().topics(['messages']).unsubscribe((err, data) => {
console.log(data);
})
}
run()
import iota_client
import json
import os
import queue
import time
# The node mqtt url
node_url = 'https://chrysalis-nodes.iota.org/'
# The queue to store received events
q = queue.Queue()
# The MQTT broker options
broker_options = {
'automatic_disconnect': True,
'timeout': 30,
'use_ws': True,
'port': 443,
'max_reconnection_attempts': 5,
}
client = iota_client.Client(
nodes_name_password=[[node_url]], mqtt_broker_options=broker_options)
# The queue to store received events
q = queue.Queue()
# The MQTT broker options
broker_options = {
'automatic_disconnect': True,
'timeout': 5,
'use_ws': True,
'port': 443,
'max_reconnection_attempts': 5,
}
def worker():
"""The worker to process the queued events.
"""
received_events = 0
while received_events < 10:
item = q.get(True)
event = json.loads(item)
print(f'Received Event: {event}')
if event['topic'] == "message":
message_id = client.get_message_id(str(event['payload']))
print(f'Received message_id: {message_id}')
received_events += 1
# unsubscribe from topic "messages", will continue to receive events for "milestones/latest"
if received_events == 7:
client.unsubscribe_topics(['messages'])
q.task_done()
def on_mqtt_event(event):
"""Put the received event to queue.
"""
q.put(event)
if __name__ == '__main__':
client.subscribe_topics(['messages', 'milestones/confirmed'], on_mqtt_event)
worker()
client.disconnect()
q.queue.clear()
// Copyright 2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
//! cargo run --example 10_mqtt --features=mqtt --release
use iota_client::{bee_message::Message, Client, MqttEvent, Result, Topic};
use std::sync::{mpsc::channel, Arc, Mutex};
// Connecting to a MQTT broker using raw ip doesn't work with TCP. This is a limitation of rustls.
#[tokio::main]
async fn main() -> Result<()> {
// Create a client instance
let mut iota = Client::builder()
.with_node("https://api.lb-0.h.chrysalis-devnet.iota.cafe")?
.finish()
.await?;
let (tx, rx) = channel();
let tx = Arc::new(Mutex::new(tx));
let mut event_rx = iota.mqtt_event_receiver();
tokio::spawn(async move {
while event_rx.changed().await.is_ok() {
let event = event_rx.borrow();
if *event == MqttEvent::Disconnected {
println!("mqtt disconnected");
std::process::exit(1);
}
}
});
iota.subscriber()
.with_topics(vec![Topic::new("milestones/latest")?, Topic::new("messages")?])
.subscribe(move |event| {
match event.topic.as_str() {
"messages" => {
let message: Message = serde_json::from_str(&event.payload).unwrap();
println!("{event:?}");
println!("{message:?}");
}
_ => println!("{event:?}"),
}
tx.lock().unwrap().send(()).unwrap();
})
.await
.unwrap();
for i in 0..10 {
rx.recv().unwrap();
if i == 7 {
// unsubscribe from topic "messages", will continue to receive events for "milestones/latest"
iota.subscriber()
.with_topics(vec![Topic::new("messages")?])
.unsubscribe()
.await?;
}
}
iota.subscriber().disconnect().await?;
// alternatively
// iota.subscriber().unsubscribe().await?;
Ok(())
}