Experiments with HiveMQ MQTT server and client
The main purpose of the blog posts is to persist some instructions I have written for myself. However, I'm happy if someone else finds these beneficial too.
DISCLAIMER. The content is provided as is. Absolutely no warranty of any kind.
– Petri Kannisto
Introduction
This blog post describes a brief experiment with HiveMQ, which provides both an MQTT server and client libraries. The experiment showed that getting started is fairly easy.
We chose HiveMQ for this experiment because it's a promising candidate for a custom authorization scheme in MQTT, which is our current research objective. While there are multiple MQTT products available, there are differences in what these enable.
Regarding the alternatives, I was already familiar with Mosquitto, but it is implemented in C and therefore not favourable for customization (I consider C a language for microcontrollers rather than for higher-level development). Mosquitto has a plugin system, but an existing plugin would not help with novel approaches. Mosca seemed promising at first, as its Node.js implementation is likely easy to customize. However, Mosca was last updated in 2018 (four years ago at the time of writing), and it does not support the newest version, MQTT 5.0, which enables more advanced authentication and authorization schemes. In contrast, HiveMQ seems to allow customizing authorization with self-made handler functions (see https://www.hivemq.com/docs/hivemq/4.7/extensions/authorization.html). HiveMQ is implemented in Java, which is platform-independent and a syntactically friendly language for new developers.
Although HiveMQ is a commercial product, there is an open-source community edition, so you can start at no cost. You can also pay for professional support, but more importantly, the customer base makes it likely that the product will stay alive in the future.
Environment
This experiment was made with Ubuntu 20.04.4 LTS (desktop) and Eclipse JEE 2021-12 (4.22.0). The Java platform was Ubuntu's "default-jdk" package, which provides Java 11.
Server setup
You can get HiveMQ server (community edition) from GitHub. At the time of writing, the newest version is 2021.3: https://github.com/hivemq/hivemq-community-edition/releases/tag/2021.3
To run the server, you just navigate into the bin folder and call:
sudo ./run.sh
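Once the server is running, a quick way to check from Java that something is listening is to open a TCP connection. The sketch below assumes the broker listens on localhost at port 1883, the standard unencrypted MQTT port. The class name PortProbe and the one-second timeout are illustrative choices. Note that it only shows that the port accepts connections, not that the listener actually speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe
{
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs)
    {
        try (Socket socket = new Socket())
        {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        }
        catch (IOException e)
        {
            return false; // Refused, timed out or unreachable
        }
    }

    public static void main(String[] args)
    {
        // Assumption: the broker runs locally on the standard MQTT port 1883
        boolean up = isListening("localhost", 1883, 1000);
        System.out.println(up ? "Port 1883 accepts connections" : "Nothing is listening on port 1883");
    }
}
```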
Create your client
The message bus is useless without clients. In this section, you can find the code I used to create a client that can be either a message producer or consumer.
Project setup with Maven
I had no previous experience with Maven, which is a build automation tool. I decided to try it, and it turned out not to be complex.
To install Maven in Ubuntu, you can just use apt.
Next, create a Maven project in Eclipse. Upon creation, I encountered the concept of an "archetype", which was new to me. I decided to use "maven-archetype-simple", which sounded good.
During the creation, I was also asked for a "group name" and an "artifact name". I had no idea what these were, so I entered "mygroup" and "myartifact". As a result, these formed the package hierarchy for the source code, i.e., mygroup.myartifact. I am unsure if this still applies, but at least in the past it was encouraged to start the package name with a (reversed) domain name. E.g., the package could start with "org.kannisto".
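The naming convention can be sketched in a few lines of Java. The helper below is purely illustrative: the class name PackageNames and the input domain "kannisto.org" are assumptions made for the example, and real package names have further restrictions (e.g., no hyphens) that the sketch does not handle.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

final class PackageNames
{
    /** Builds a package name from a domain name (reversed) and an artifact name. */
    static String fromDomain(String domain, String artifact)
    {
        List<String> parts = new ArrayList<>(
            Arrays.asList(domain.toLowerCase(Locale.ROOT).split("\\.")));
        Collections.reverse(parts); // "kannisto", "org" becomes "org", "kannisto"
        parts.add(artifact.toLowerCase(Locale.ROOT));
        return String.join(".", parts);
    }

    public static void main(String[] args)
    {
        // Hypothetical domain, used only for illustration
        System.out.println(fromDomain("kannisto.org", "myartifact")); // prints org.kannisto.myartifact
    }
}
```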
Enable HiveMQ with Maven
Regarding HiveMQ, there are a few Maven details to consider. These are documented here: https://github.com/hivemq/hivemq-mqtt-client
In particular, you have to ensure the right compiler versions in pom.xml. In my project, these were 1.7 by default but should be 1.8:
<properties>
...
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
To add the HiveMQ client library as a dependency, add the following into pom.xml:
<dependencies>
...
<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>1.3.0</version>
</dependency>
</dependencies>
Client code
I followed the instructions here: https://github.com/hivemq/hivemq-mqtt-client. Still, I designed the class structure myself.
The client code has the following files:
App.java: the "main" program
MyMqttClient: an abstract base class for both publishing and subscribing
MyMqttPublisher: publisher class
MyMqttSubscriber: subscriber class
The idea is that to try HiveMQ, you would start both a publisher and subscriber to see the messages being communicated.
Please note: In a proper setup, you would configure the host addresses, secure communication and such. The client defaults seem to be localhost with no security features. While insecure, this is OK for a quick experiment with no machine-to-machine traffic. However, you have been warned!
Another note: This is a quick experiment to just see it running. Please look at HiveMQ manuals for best practices and to understand the APIs.
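For anything beyond localhost, the connection details can be set on the client builder. The sketch below follows the builder calls shown in the HiveMQ MQTT Client README (identifier, serverHost, serverPort, sslWithDefaultConfig). The class name ClientFactory and the host name broker.example.org are placeholders, and 8883 is the conventional port for MQTT over TLS. The code needs the hivemq-mqtt-client dependency and only builds the client objects without connecting. It has not been tried against a real broker here, so treat it as a starting point.

```java
import java.util.UUID;

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

final class ClientFactory
{
    /** Plain TCP client; host and port equal the defaults but are spelled out. */
    static Mqtt5Client localClient()
    {
        return Mqtt5Client.builder()
            .identifier(UUID.randomUUID().toString())
            .serverHost("localhost")
            .serverPort(1883)
            .build();
    }

    /** TLS client with the default TLS settings of the library. */
    static Mqtt5Client tlsClient(String host)
    {
        return Mqtt5Client.builder()
            .identifier(UUID.randomUUID().toString())
            .serverHost(host)
            .serverPort(8883) // Conventional MQTT-over-TLS port (assumption about the broker)
            .sslWithDefaultConfig()
            .build();
    }

    public static void main(String[] args)
    {
        // broker.example.org is a placeholder host name
        Mqtt5Client client = tlsClient("broker.example.org");
        System.out.println(client.getConfig().getServerHost() + ":" + client.getConfig().getServerPort());
    }
}
```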
App.java
package mygroup.myartifact;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Generic HiveMQ client application.
 * @author Petri Kannisto
 */
public class App
{
    public static void main(String[] args)
    {
        BufferedReader reader = null;
        try
        {
            reader = new BufferedReader(new InputStreamReader(System.in));
            while (true)
            {
                System.out.println("Please select the functionality:");
                System.out.println("- p for publisher");
                System.out.println("- s for subscriber");
                System.out.print(" > ");
                String line = reader.readLine();
                if (line == null)
                {
                    break; // End of input; nothing to run
                }
                String input = line.trim().toLowerCase();
                switch (input)
                {
                    case "s":
                        new MyMqttSubscriber().run();
                        break;
                    case "p":
                        new MyMqttPublisher().run();
                        break;
                    default:
                        continue; // Ask again
                }
                break; // End loop
            }
        }
        catch (Exception e)
        {
            System.err.println(e.toString());
        }
        finally
        {
            if (reader != null)
            {
                try
                {
                    reader.close();
                }
                catch (Exception e)
                {
                    // Nothing to do if closing fails
                }
            }
        }
    }
}
MyMqttClient.java
package mygroup.myartifact;

import java.io.BufferedReader;
import java.io.InputStreamReader;

import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

/**
 * Abstract base class for MQTT clients.
 * @author Petri Kannisto
 */
public abstract class MyMqttClient
{
    private final String m_topic = "mytopic"; // The topic is hard coded
    private final String m_elaboration;

    /**
     * Constructor.
     * @param elaboration The hint to be shown to the user about what to give as the input.
     */
    protected MyMqttClient(String elaboration)
    {
        m_elaboration = elaboration;
    }

    protected String getTopic()
    {
        return m_topic;
    }

    /**
     * Runs the client.
     */
    public void run()
    {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try
        {
            // Create client with the default settings
            Mqtt5Client client = Mqtt5Client.builder().build();
            // Get the "blocking" client API
            Mqtt5BlockingClient blockingClient = client.toBlocking();
            write("Connecting... ");
            blockingClient.connect();
            writeLine("connected!");
            try
            {
                // Repeat until the user wants to quit
                while (true)
                {
                    writeLine("Please give input, either:");
                    writeLine("- q to Quit");
                    writeLine("- " + m_elaboration);
                    write(" > ");
                    String line = reader.readLine();
                    // A null line means the end of input; treat it as quitting
                    if (line == null || line.trim().equalsIgnoreCase("q"))
                    {
                        writeLine("Quitting.");
                        break;
                    }
                    // Do the client-specific thing
                    doYourThing(client, line.trim());
                }
            }
            finally
            {
                blockingClient.disconnect();
            }
        }
        catch (Exception e)
        {
            System.err.println(e.toString());
        }
        finally
        {
            try
            {
                reader.close();
            }
            catch (Exception e)
            {
                // Nothing to do if closing fails
            }
        }
    }

    /**
     * Implements the client-specific workflow.
     * @param client MQTT client.
     * @param userInput The user input received.
     */
    protected abstract void doYourThing(Mqtt5Client client, String userInput);

    /**
     * Writes a line.
     * @param msg Line to be written.
     */
    protected void writeLine(String msg)
    {
        System.out.println(msg);
    }

    /**
     * Writes a message with no line break at the end.
     * @param msg Message to be written.
     */
    protected void write(String msg)
    {
        System.out.print(msg);
    }
}
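The structure of the base class (a fixed input loop that hands each line to an abstract method) can be shown without any MQTT involved. The sketch below is a simplified stand-in, not the class above: the names LineLoop and handle are hypothetical, and the input source is passed in as a parameter so that the loop can be tried without a terminal or a broker.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

abstract class LineLoop
{
    /** Reads lines until "q" or the end of input; passes every other line to handle(). */
    final int run(Reader source) throws IOException
    {
        BufferedReader reader = new BufferedReader(source);
        int handled = 0;
        String line;
        while ((line = reader.readLine()) != null)
        {
            String input = line.trim();
            if (input.equalsIgnoreCase("q"))
            {
                break; // The user wants to quit
            }
            handle(input); // The subclass-specific part
            ++handled;
        }
        return handled;
    }

    protected abstract void handle(String input);
}

final class LineLoopDemo
{
    public static void main(String[] args) throws IOException
    {
        final List<String> seen = new ArrayList<>();
        LineLoop loop = new LineLoop()
        {
            @Override
            protected void handle(String input)
            {
                seen.add(input);
            }
        };
        // Simulated user input; the line after "q" is never read
        int count = loop.run(new StringReader("hello\nworld\nq\nignored\n"));
        System.out.println(count + " lines handled: " + seen);
    }
}
```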
MyMqttPublisher.java
package mygroup.myartifact;

import java.nio.charset.StandardCharsets;

import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;

/**
 * Publishes data to MQTT.
 * @author Petri Kannisto
 */
public class MyMqttPublisher extends MyMqttClient
{
    public MyMqttPublisher()
    {
        super("Message to publish");
    }

    @Override
    protected void doYourThing(Mqtt5Client client, String userInput)
    {
        String topic = getTopic();
        write("Now publishing to topic '" + topic + "'... ");
        // Encode explicitly as UTF-8 instead of relying on the platform default charset
        client
            .toBlocking()
            .publishWith()
            .topic(topic)
            .payload(userInput.getBytes(StandardCharsets.UTF_8))
            .send();
        writeLine("published!");
    }
}
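MQTT payloads are raw bytes, so the publisher and the subscriber have to agree on a text encoding. In Java 17 and earlier, getBytes() and new String(byte[]) without a charset argument use the platform default, which may differ between machines. A minimal sketch that fixes UTF-8 on both sides is below; the class name PayloadCodec is illustrative.

```java
import java.nio.charset.StandardCharsets;

final class PayloadCodec
{
    /** Encodes the text of a message into payload bytes. */
    static byte[] encode(String text)
    {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes payload bytes back into text. */
    static String decode(byte[] payload)
    {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args)
    {
        // \u00e4 and \u00f6 are the letters a and o with diaeresis; each takes two bytes in UTF-8
        String original = "L\u00e4mp\u00f6tila 21";
        byte[] payload = encode(original);
        System.out.println("Round trip ok: " + original.equals(decode(payload)));
    }
}
```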
MyMqttSubscriber.java
package mygroup.myartifact;

import java.nio.charset.StandardCharsets;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient.Mqtt5Publishes;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;

/**
 * Subscribes for data from MQTT.
 * @author Petri Kannisto
 */
public class MyMqttSubscriber extends MyMqttClient
{
    public MyMqttSubscriber()
    {
        super("ENTER to start listening to messages");
    }

    @Override
    protected void doYourThing(Mqtt5Client client, String userInput)
    {
        Mqtt5BlockingClient blockingClient = client.toBlocking();
        String topic = getTopic();
        // Open the stream of incoming messages before subscribing
        try (Mqtt5Publishes publishes = blockingClient.publishes(MqttGlobalPublishFilter.ALL))
        {
            blockingClient
                .subscribeWith()
                .topicFilter(topic)
                .send();
            writeLine("Subscribed to topic '" + topic + "'");
            while (true)
            {
                try
                {
                    // Blocks until the next message arrives
                    Mqtt5Publish publishMessage = publishes.receive();
                    writeLine("Message received:");
                    // Decode explicitly as UTF-8 instead of relying on the platform default charset
                    writeLine(new String(publishMessage.getPayloadAsBytes(), StandardCharsets.UTF_8));
                }
                catch (InterruptedException e)
                {
                    break; // Quit; likely never reached!
                }
            }
            // In any actual application, the client should unsubscribe before exit.
            // It's unclear if this happens in this application when CTRL+C is used to quit.
            blockingClient.unsubscribeWith().topicFilter(topic).send();
            writeLine("Unsubscribed from topic '" + topic + "'");
        }
    }
}
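Regarding cleanup on CTRL+C: one standard Java mechanism is a JVM shutdown hook, which runs when the process is asked to terminate. The sketch below shows only the mechanism. The class name ShutdownCleanup is hypothetical, the cleanup action is a print statement standing in for unsubscribe and disconnect, and whether the blocking MQTT calls complete reliably inside a hook is an untested assumption.

```java
final class ShutdownCleanup
{
    /** Registers cleanup to run when the JVM shuts down, e.g., on CTRL+C. */
    static Thread register(Runnable cleanup)
    {
        Thread hook = new Thread(cleanup, "cleanup-hook");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws InterruptedException
    {
        // Placeholder action; unsubscribe and disconnect would go here
        register(() -> System.out.println("Cleaning up before exit"));
        System.out.println("Running; press CTRL+C to quit");
        Thread.sleep(60_000); // Stand-in for the blocking receive loop
    }
}
```

The hook also runs on a normal exit, so the cleanup action should tolerate being called after the client has already disconnected.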