博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
springboot集成mqtt
阅读量:7258 次
发布时间:2019-06-29

本文共 2636 字,大约阅读时间需要 8 分钟。

MQTT(Message Queuing Telemetry Transport)是基于二进制消息的发布/订阅编程模式的消息协议,非常适合需要低功耗和网络带宽有限的IoT场景。这里简单介绍一下如何在springboot中集成。

maven

org.springframework.boot
spring-boot-starter-integration
org.springframework.integration
spring-integration-stream
org.springframework.integration
spring-integration-mqtt

配置client factory

@Bean    public MqttPahoClientFactory mqttClientFactory() {        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();        factory.setServerURIs("tcp://demo:1883");//        factory.setUserName("guest");//        factory.setPassword("guest");        return factory;    }

配置consumer

@Bean    public IntegrationFlow mqttInFlow() {        return IntegrationFlows.from(mqttInbound())                .transform(p -> p + ", received from MQTT")                .handle(logger())                .get();    }    private LoggingHandler logger() {        LoggingHandler loggingHandler = new LoggingHandler("INFO");        loggingHandler.setLoggerName("siSample");        return loggingHandler;    }    @Bean    public MessageProducerSupport mqttInbound() {        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",                mqttClientFactory(), "siSampleTopic");        adapter.setCompletionTimeout(5000);        adapter.setConverter(new DefaultPahoMessageConverter());        adapter.setQos(1);        return adapter;    }

配置producer

@Bean    public IntegrationFlow mqttOutFlow() {        //console input//        return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),//                e -> e.poller(Pollers.fixedDelay(1000)))//                .transform(p -> p + " sent to MQTT")//                .handle(mqttOutbound())//                .get();        return IntegrationFlows.from(outChannel())                .handle(mqttOutbound())                .get();    }        @Bean    public MessageChannel outChannel() {        return new DirectChannel();    }    @Bean    public MessageHandler mqttOutbound() {        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());        messageHandler.setAsync(true);        messageHandler.setDefaultTopic("siSampleTopic");        return messageHandler;    }

配置MessagingGateway

@MessagingGateway(defaultRequestChannel = "outChannel")public interface MsgWriter {    void write(String note);}

这样就大功告成了

doc

转载地址:http://bokdm.baihongyu.com/

你可能感兴趣的文章
在 CentOS 下手工安装 Docker v1.1x
查看>>
<meta>标签基础
查看>>
Java中三种代理模式
查看>>
阅读《构建之法》十一、十二、十三章之感
查看>>
线程面试题50道
查看>>
第二阶段团队项目冲刺站立会议(六)
查看>>
Android三种播放视频的方式
查看>>
AOP方法增强自身内部方法调用无效 SpringCache 例子
查看>>
CentOS 7 安装 JDK
查看>>
正则表达式
查看>>
对配置文件内的固定内容加密解密
查看>>
epoll函数知识点
查看>>
pta l2-5(集合相似度)
查看>>
poj1019(打表预处理+数学)
查看>>
【转载】关于防火墙的初次接触
查看>>
集合和泛型 一
查看>>
php 把一个一维数组的值依次赋值到二维数组中的每一项
查看>>
CF_315C_Sereja and Contest
查看>>
vue,下级页面刷新导致路由跳转带过来的数据消失的解决方法
查看>>
border的简略
查看>>