本次主要是对thingsboard的源码进行一个简单的分析,从设备发送遥测数据到平台的角度跟进后端代码逻辑探究其流转过程,好了,话不多说,直接进入正题。
创新互联专注于江南网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供江南营销型网站建设,江南网站制作、江南网页设计、江南网站官网定制、小程序制作服务,打造江南网络公司原创品牌,更为您提供江南网站排名全网营销落地服务。数据流转大概流程找到MqttTransportService类==》通过@PostConstruct注解在项目启动后进入init()方法
==》里面绑定了MqttTransportServerInitializer类即mqtt服务初始化
==》MqttTransportServerInitializer类继承ChannelInitializer类重写了initChannel方法
==》initChannel方法里绑定了MqttTransportHandler
==》进入MqttTransportHandler的channelRead方法,验证消息类型为mqtt时转入processMqttMsg方法
==》processMqttMsg里进行判断:消息类型为连接时转入processConnect,设备session为临时的转入processProvisionSessionMsg
否则转入enqueueRegularSessionMsg方法,这里先探讨转入enqueueRegularSessionMsg
==》转入enqueueRegularSessionMsg后调用processMsgQueue将消息投递到队列
==》跟进去发现里面调用了processRegularSessionMsg方法
==》processRegularSessionMsg里根据消息的类型进行转发,比如:发布,订阅,取消订阅,取消连接等等
==》跟进PUBLISH,转入processPublish方法
==》转入processDevicePublish,进入发现这里根据消息的主题进行转发,这里选择isDeviceTelemetryTopic对应的transportService.process接口实现
==》发现这里对消息封装了一下之后转入sendToRuleEngine,将消息发送到规则链
》继续跟进进入sendToRuleEngine,发现调用ruleEngineMsgProducer.send,即将消息通过生产者发送到队列
这里对应多个实现,例如:inMemory,Kafka,RabbitMQ等等,默认发送到inMemory内存
》有生产者那肯定有消费者,我们找到DefaultTbRuleEngineConsumerService核心消费者
》找到launchMainConsumers方法》launchConsumer》consumerLoop
》发现consumerLoop是个循环,将消息取出来消费,转submitMessage方法
》然后转入forwardToRuleEngineActor》调用actorContext.tell,这里开始就是Actor模型流转了,不清楚的可以去百度搜索一下
》首先调用appActor.tell通过根appActor调用tell方法转入enqueue方法,里面对消息进行了分类,分为高优先级和正常消息队列
还有initActor()方法创建一系列actor,大概流程:AppActor》TenantActor》RuleChainActor》RuleNodeActor,我们先转入tryProcessQueue方法
==>然后发现调用了processMailbox,发现这里是将之前分类的消息依次取出来然后调用actor.process(msg)方法依次向下流转处理消息
》ContextAwareActor》process==》doProcess==》…
详细流转过程我们首先找到位于common模块下transport模块下的mqtt下的MqttTransportService类
在init方法中我们可以看到:其实现了bossGroup,workerGroup两个线程组,绑定端口,绑定Handler相关的内容,这是netty相关的学过的应该非常熟悉,这里也可以猜想到thingsboard里的mqtt是基于netty封装实现的。
项目在启动时,基于spring的@Service注解和 @PostConstruct注解,会进入到我们的init()方法里,然后转入MqttTransportServerInitializer,即mqtt服务初始化过程。
进入到MqttTransportServerInitializer方法
这里主要是继承了ChannelInitializer类重写了initChannel初始化管道方法,
设置一些解码,ip过滤相关的,然后转入MqttTransportHandler具体的逻辑处理。
转入MqttTransportHandler我们直接看到channelRead方法,客户端在连接mqtt服务端时会进入到这里
这里我们可以看到,主要对消息做了一个判断,是否是mqtt类型的消息,是则转入processMqttMsg方法处理mqtt消息。
在这个方法里面我们可以看到,消息类型为连接时转入processConnect方法处理连接,返回相关ack确认,设备session为临时的转入processProvisionSessionMsg,否则转入enqueueRegularSessionMsg处理消息,这里我们主要关注最后一个。
这里可以看到消息进入时做了一个判断,消息数量大于队列容纳消息长度时直接return,这个也可以在application.yml里面配置的。然后直接调用processMsgQueue方法处理消息到队列,我们跟进去看看。
这里我们可以看到,根据消息的类型来做后续的逻辑处理,例如:发布,订阅,取消订阅,心跳等等。这里我们进入PUBLISH消息上报这里。
这里有判断,我们直接转入processDevicePublish方法,上面那个是和网关相关的我们目前没涉及到,先不管。
看到processDevicePublish这个方法里面有很多个分支,例如:发布设备属性,设备遥测数据,设备远程控制等等,我们主要跟进设备遥测数据这里。
这里我们可以看到,对消息进行一下封装,加入了设备名称,设备类型,发送的时间戳,然后将消息发送到规则链,进入sendToRuleEngine方法
可以看到这里指定了后面要发送到队列的名称,默认为Main,从
String defaultQueueName = deviceProfile.getDefaultQueueName();
queueName = defaultQueueName != null ? defaultQueueName : ServiceQueue.MAIN;可以看出在设备配置里面是可以指定发送到那个队列的名称的
我们继续跟进sendToRuleEngine方法
这里可以看到我们的消息是通过ruleEngineMsgProducer生产者进行发送到队列的, ruleEngineMsgProducer.send这个接口对应了多个实现
有基于内存的,Kafka,RabbitMQ等等,默认是基于内存的,这个可以在配置文件内修改。
有生产者那么对应肯定有消费者,此时我们需要转入消费者,看看它后面的逻辑运转是怎样的。
我们找到DefaultTbRuleEngineConsumerService核心消费类
这里可以明显地看到消费者依次从队列里取出消息处理后调用submitMessage处理后续逻辑
forwardToRuleEngineActor从这个方法名字我们看到消息将推送到规则链Actor,后面就开始进入Actor模型流转了,这个不清楚的可以百度搜索一下。
首先进入的是AppActor.tell方法,这个是顶级actor,是一切actor创建的源头。
流转过程:
AppActor==》TenantActor==》RuleChainActor==》RuleNodeActor
我们接着跟进
!](https://img-blog.csdnimg.cn/6d61049e3bc64c8aa8f60262b0868439.png#pic_center)
这里消息进行了分类,创建了两个存储消息的队列,一个高优先级队列,一个正常队列,根据消息的类型进行添加,后调用tryProcessQueue方法进一步处理消息,
initActor()这个对应初始化各类的actor,
我们进入tryProcessQueue方法。
可以看到使用了线程池创建的线程去执行我们的processMailbox方法
在processMailbox方法里,可以看到,是先从高优先级队列里取消息进行处理,直至其高优先级队列为空时才从正常队列里取消息进行处理,消息取出后调用actor.process方法进行消息的处理流转,整个过程一直循环。
跟进actor.process方法,进入ContextAwareActor的process方法,进入doProcess方法,然后流转到各个分类的actor,这里我们选择一个进行跟进
ruleChainActor
可以看到这里也有很多分支,这里我们选两个进行分析:首先是:onRuleNodeToSelfMsg,
跟进后发现调用tbNode.onMsg方法进入我们的节点的消息处理方法,点击我们可以发现对应很多个节点实现,这个对应的就是我们前台web界面规则链里的一个个流转的节点
我们随机选择一个进入CalculateDeltaNode
可以看到每个节点都有init(),onMsg方法,初始化和对应节点的逻辑实现,后续如果自己想自定义节点时可以参照着写。到这里我们基本清楚了整个消息流转的流程,先告一段落了。
再回到之前那个onQueueToRuleEngineMsg
我们选择onQueueToRuleEngineMsg,对应消息流转到规则链节点。
这几个也可以跟进去看看,最终也是流转到了规则链节点里,消息一直在这几个过程里面循环,就先讲到这里了,后面有时间再继续研究。
thingsboard的源码还是比较复杂一点的,其中有很多的分支流转,循环嵌套,而且很多地方使用的都是异步操作,线程池操作,断点分析有时也不容易,不过不管是代码规范还是逻辑上还是比较严谨的,耐心一点还是比较容易理解清楚整个流程的。
你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧
当前名称:thingsboard源码分析--从设备数据上报的角度-创新互联
网站链接:http://lswzjz.com/article/dsodhe.html