在Java中,使用EMqtt库可以实现MQTT消息的订阅和过滤。以下是一个简单的示例,展示了如何使用EMqtt客户端订阅主题并根据主题过滤消息。
- 首先,确保已将EMqtt库添加到项目中。如果使用Maven,可以在pom.xml文件中添加以下依赖:
org.eclipse.paho org.eclipse.paho.client.mqttv3 1.2.5
- 创建一个EMqtt客户端实例,并连接到MQTT代理:
import org.eclipse.paho.client.mqttv3.*; public class EMqttClient { public static void main(String[] args) { String brokerUrl = "tcp://broker.emqx.io:1883"; String clientId = "JavaEMqttClient"; MqttClient client = new MqttClient(brokerUrl, clientId); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); connOpts.setAutomaticReconnect(true); try { client.connect(connOpts); } catch (MqttException e) { System.out.println("Failed to connect to MQTT broker."); e.printStackTrace(); return; } } }
- 订阅主题并根据主题过滤消息。在这个例子中,我们将订阅主题
test/topic
,并且只处理包含单词"hello"的消息:
import org.eclipse.paho.client.mqttv3.*; public class EMqttClient { // ... (省略连接到MQTT代理的代码) public static void main(String[] args) { // ... (省略连接到MQTT代理的代码) try { // 订阅主题 String topic = "test/topic"; client.subscribe(topic); // 处理接收到的消息 client.setCallback(new MqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { String payload = new String(message.getPayload()); if (payload.contains("hello")) { System.out.println("Received message: " + payload); } } @Override public void connectionLost(Throwable cause) throws Exception { System.out.println("Connection lost."); cause.printStackTrace(); } @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("Delivery complete."); } }); // 保持客户端运行,以便持续接收消息 Thread.sleep(10000); } catch (MqttException | InterruptedException e) { System.out.println("Error occurred."); e.printStackTrace(); } finally { try { client.disconnect(); } catch (MqttException e) { e.printStackTrace(); } } } }
在这个示例中,我们订阅了test/topic
主题,并在messageArrived
回调方法中检查消息负载是否包含单词"hello"。如果包含,则打印消息。这样,我们就实现了消息过滤功能。