Listen to MQTT
The following code example will:
1. Create a Client which will connect to the Shimmer Testnet.
2. Subscribe to a set of topics.
3. Listen to the topics defined in step 2 and log the output.
Code Example
- Rust
- Nodejs
- Python
- Java
This example uses dotenv, which is not safe to use in production environments.
// Copyright 2021 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
//! cargo run --example 07_mqtt --features=mqtt --release
use std::sync::{mpsc::channel, Arc, Mutex};
use iota_client::{
mqtt::{MqttEvent, MqttPayload, Topic},
Client, Result,
};
// Connecting to a MQTT broker using raw ip doesn't work with TCP. This is a limitation of rustls.
#[tokio::main]
async fn main() -> Result<()> {
    // Build a client that talks to the Shimmer testnet node.
    let client = Client::builder()
        .with_node("https://api.testnet.shimmer.network")?
        // .with_mqtt_broker_options(BrokerOptions::new().use_ws(false))
        .finish()?;

    // The subscription callback signals on this channel once per received event,
    // which lets `main` count events below.
    let (sender, receiver) = channel();
    let sender = Arc::new(Mutex::new(sender));

    // Watch the MQTT connection state in a background task and terminate the
    // process as soon as a disconnect is reported.
    let mut connection_events = client.mqtt_event_receiver();
    tokio::spawn(async move {
        while connection_events.changed().await.is_ok() {
            if *connection_events.borrow() == MqttEvent::Disconnected {
                println!("mqtt disconnected");
                std::process::exit(1);
            }
        }
    });

    // Topics to listen to.
    let topics = vec![
        Topic::try_from(String::from("milestone-info/latest"))?,
        Topic::try_from(String::from("blocks"))?,
        Topic::try_from(String::from(
            "outputs/unlock/address/atoi1qzt0nhsf38nh6rs4p6zs5knqp6psgha9wsv74uajqgjmwc75ugupx3y7x0r",
        ))?,
    ];

    client
        .subscribe(topics, move |event| {
            // Print the topic first, then the payload in a form matching its variant.
            println!("Topic: {}", event.topic);
            match &event.payload {
                MqttPayload::Json(json) => println!("{}", serde_json::to_string(json).unwrap()),
                MqttPayload::Block(block) => println!("{block:?}"),
                MqttPayload::MilestonePayload(milestone) => println!("{milestone:?}"),
                MqttPayload::Receipt(receipt) => println!("{receipt:?}"),
            }
            // Tell `main` that one more event has been handled.
            sender.lock().unwrap().send(()).unwrap();
        })
        .await?;

    // Wait for ten events in total.
    for received in 0..10 {
        receiver.recv().unwrap();
        if received == 7 {
            // After the eighth event, unsubscribe from "blocks" only; the other
            // subscriptions (e.g. "milestone-info/latest") are left in place.
            client
                .unsubscribe(vec![Topic::try_from(String::from("blocks"))?])
                .await?;
        }
    }

    client.subscriber().disconnect().await?;
    // alternatively
    // client.subscriber().unsubscribe().await?;
    Ok(())
}
Run the Example
Run the example by running the following command:
cargo run --example 07_mqtt --features=mqtt --release
This example uses dotenv, which is not safe to use in production environments.
// Copyright 2021-2022 IOTA Stiftung
// SPDX-License-Identifier: Apache-2.0
import { Client, initLogger } from '@iota/client';
require('dotenv').config({ path: '../.env' });
// Run with command:
// node ./dist/10_mqtt.js
// Initialize an MQTT listener on the `blocks` topic, log every received
// event, then clear the listener after 10 seconds and exit the process.
// Throws if the NODE_URL environment variable is not set.
async function run() {
    initLogger();
    if (!process.env.NODE_URL) {
        throw new Error('.env NODE_URL is undefined, see .env.example');
    }

    // Connecting to a MQTT broker using raw ip doesn't work with TCP. This is a limitation of rustls.
    const client = new Client({
        nodes: [process.env.NODE_URL],
    });

    // Array of topics to subscribe to
    // Topics can be found here https://studio.asyncapi.com/?url=https://raw.githubusercontent.com/iotaledger/tips/stardust-event-api/tips/TIP-0028/event-api.yml
    const topics = ['blocks'];

    const callback = function (error: Error, data: string) {
        // Report listener errors instead of passing a missing payload to
        // JSON.parse, which would throw inside the callback.
        if (error) {
            console.error(error);
            return;
        }
        console.log(JSON.parse(data));
    };

    await client.listen(topics, callback);

    // Clear listener after 10 seconds
    setTimeout(async () => {
        // Clear exactly the topics that were subscribed above.
        await client.clearListeners(topics);
        console.log('Listener cleared');
        // Exit the process
        setTimeout(async () => process.exit(0), 2000);
    }, 10000);
}
// Start the example; log a failed start-up (for instance a missing NODE_URL)
// and exit with a non-zero status instead of leaving the promise unhandled.
run().catch((error) => {
    console.error(error);
    process.exit(1);
});
You can run the example by running the following command from the bindings/node/examples/ folder:
node dist/10_mqtt.js
This how-to guide is not available in your language of choice at the moment. Please feel free to browse more examples which may suit your requirements.
This how-to guide is not available in your language of choice at the moment. Please feel free to browse more examples which may suit your requirements.
Expected Output
- Rust
- Nodejs
- Python
- Java
Topic: blocks
Block{
protocol_version: 2,
parents: Parents(BoxedSlicePrefix([
BlockId(0x2c6217376977980929b5bd9f4fd33ee06a13dc5ed41a53ffa0d20eae77a0d3d9),
BlockId(0x3971dc26622ffef0d1cf5b6d2cbfa0732014dd3271d675158bf854100087475c),
BlockId(0x5bb64f36721d02b5ef314fd0a012ef1adf779592474dac88dd24d82550bf5242),
BlockId(0xc95cfe61e184ef1885a29f7832019840d999fa4baf452f6dbb127e2dc4c1d20f)
])),
payload: OptionalPayload(Some(Transaction(TransactionPayload{
essence: Regular(RegularTransactionEssence{
network_id: 1856588631910923207,
inputs: BoxedSlicePrefix([
Utxo(UtxoInput(0x8e7b59a567f0f439ef7d3b01de094ce2f81aa8e68e09f26705dfd1d39ddf471b0000)),
Utxo(UtxoInput(0x60e8afa096f3705b6f1a694f5496d51721ea4b4e9876b219dfecdf99586949fd0100)),
Utxo(UtxoInput(0xf12c189790488ec61ce8c4d9d28ebbb7e82d322b25ec293e40fd4ef2dfe15e0d0000)),
Utxo(UtxoInput(0xd14bd0777e8ea7266a2af955126dde56ed692d9ad6b19508cfd614d4b3612a950000)),
Utxo(UtxoInput(0xf31d1bf2c93ff0578d72951471c0fda2be4efd4cfbb7a2bd03e04d64362ef6401b00)),
Utxo(UtxoInput(0xf34edd2df152ef473679ad468d6f942fadeca7004499576bba9a1a17927da3af7700))
]),
inputs_commitment: InputsCommitment(0x43448b345d2c0bc38ea6e4abd79726852a2880528a753a986deb13bf3f93af5b),
outputs: BoxedSlicePrefix([
Nft(NftOutput{
amount: BoundedU64(51000),
native_tokens: NativeTokens(BoxedSlicePrefix([
])),
nft_id: NftId(0x0000000000000000000000000000000000000000000000000000000000000000),
unlock_conditions: UnlockConditions(BoxedSlicePrefix([
Address(AddressUnlockCondition(Ed25519(Ed25519Address(0x56b1638c1bb2566e2150d4667f78a59ee0d9b9a6c1e6af4499c3d55c8fd77e01))))
])),
features: Features(BoxedSlicePrefix([
])),
immutable_features: Features(BoxedSlicePrefix([
Issuer(IssuerFeature(Ed25519(Ed25519Address(0x56b1638c1bb2566e2150d4667f78a59ee0d9b9a6c1e6af4499c3d55c8fd77e01)))),
Metadata(MetadataFeature(0x736f6d652d697066732d6c696e6b))
]))
}),
Basic(BasicOutput{
amount: BoundedU64(1009003400),
native_tokens: NativeTokens(BoxedSlicePrefix([
])),
unlock_conditions: UnlockConditions(BoxedSlicePrefix([
Address(AddressUnlockCondition(Ed25519(Ed25519Address(0x56b1638c1bb2566e2150d4667f78a59ee0d9b9a6c1e6af4499c3d55c8fd77e01))))
])),
features: Features(BoxedSlicePrefix([
]))
})
]),
payload: OptionalPayload(Some(TaggedData(TaggedDataPayload{
tag: "0x484f524e4554205370616d6d65722053656d692d4c617a79",
data: "0x57652061726520616c6c206d616465206f662073746172647573742e0a436f756e743a203032353636380a54696d657374616d703a20323032322d30392d30325431343a30353a33355a0a54697073656c656374696f6e3a20313836c2b573"
})))
}),
unlocks: Unlocks(BoxedSlicePrefix([
Signature(SignatureUnlock(Ed25519(Ed25519Signature{
public_key: 0x1eea893bc9fc06787b7e5d273cdb02e02f9a5cce9eb4e1dc513eec14e8d95047,
signature: 0xbaab2fd45c038a0661f896184aecd249c8169b4b08dda8965d7d99e3778828cbe69c95a6893115d83705887d20f85eef4d3f9d2d39e89d009649da66fdfffa0a
}))),
Reference(ReferenceUnlock(BoundedU16(0))),
Reference(ReferenceUnlock(BoundedU16(0))),
Reference(ReferenceUnlock(BoundedU16(0))),
Reference(ReferenceUnlock(BoundedU16(0))),
Reference(ReferenceUnlock(BoundedU16(0)))
]))
}))),
nonce: 1785168781326745297
}
Topic: milestone-info/latest
{
"index": 798690,
"milestoneId": "0xc405fa4839b32d78b1a7f27e297d07842aa83a8ced1e5b556e446317c7348be6",
"timestamp": 1662127539
}
[...]
{
topic: 'blocks',
payload: {
"protocolVersion": 2,
"parents": [
"0x7298db112c825c949ae399b26fa01e6147e916c1dcaff2a2fc487f3beca37f3e",
"0x7d052b5938882ed514a8419f0daaa567e029fd26bb8384a782fadc73bf4197e1",
"0xb732a27feb1817bee363efdeb88fa4d3c7ff3ebf44ff72f2e3decc6bf26bfc6e",
"0xc24af5a4ebdbd4e5be7b0858ca5d3b3333d482c336cd4df3a69a487df31039c6"
],
"payload": {
"type": 5,
"tag": "0x484f524e4554205370616d6d6572",
"data": "0x57652061726520616c6c206d616465206f662073746172647573742e0a436f756e743a20353339323036340a54696d657374616d703a20323032322d30392d30325431333a34383a31375a0a54697073656c656374696f6e3a20393131c2b573"
},
"nonce": "187545"
}
},
[
...
]
This how-to guide is not available in your language of choice at the moment. Please feel free to browse more examples which may suit your requirements.
This how-to guide is not available in your language of choice at the moment. Please feel free to browse more examples which may suit your requirements.