Go中经常使用EventBus成功事情驱动编程
当天咱们要讨论的是Go言语中的事情驱动编程,特意是如何经常使用EventBus来成功这一目的。
什么是事情驱动编程?
事情驱动编程是一种编程范式,其中运行程序的流程由外部事情(如用户输入或系统触发的事情)来控制。这种方法在GUI运行、网络编程和实时系统中尤为经常出现。
为什么选用EventBus?
EventBus是一个用于Go运行的轻量级、高效的事情库,它准许您在不同组件之间传递信息,而无需它们间接相互援用。
装置EventBus
经常使用以下命令装置EventBus库:
goget.com/asaskevich/EventBus
基础用法
创立EventBus实例
import"github.com/asaskevich/EventBus"bus:=EventBus.New()
注册事情
bus.Subscribe("topic:event",func(msgstring){fmt.Println("Received:",msg)})
触发事情
bus.Publish("topic:event","HelloEventBus!")
初级用法
带有多个参数的事情
bus.Subscribe("topic:multiple",func(aint,bstring){fmt.Println("Received:",a,b)})bus.Publish("topic:multiple",42,"Hello")
敞开事情订阅
bus.Unsubscribe("topic:event")
经常使用通配符
EventBus支持经常使用通配符来订阅多个主题。
bus.Subscribe("topic:*",func(msgstring){fmt.Println("WildcardReceived:",msg)})
实战:构建一个便捷的聊天运行
假定咱们要构建一个便捷的聊天运行,其中有多个聊天室。每个聊天室都有自己的事情主题。
typeChatRoomstruct{busEventBus.Bus}funcNewChatRoom()*ChatRoom{return&ChatRoom{bus:EventBus.New(),}}func(c*ChatRoom)Join(userstring){c.bus.Subscribe("chat:"+user,func(msgstring){fmt.Println(user,"received:",msg)})}func(c*ChatRoom)Send(user,msgstring){c.bus.Publish("chat:"+user,msg)}
总结
经过经常使用EventBus,咱们可以轻松地在Go运行中成功事情驱动编程。从基础的事情订阅和颁布,到初级的通配符和多参数事情,EventBus提供了一套完整而灵敏的处置打算。这不只使咱们的代码愈加模块化和可保养,还大大提高了运行的照应性和裁减性。
关于如何设计一个基于事件驱动架构的思考
最近一直在思考一个问题:有没有这样一种可能,就是一个领域模型的状态不依赖于外部,它只负责接收外部的事件,然后根据这些事件做出响应;响应分两种:根据模型当前的内存状态进行业务逻辑处理,然后产生事件,注意:这个过程不会改变模型当前的内存状态;根据事件改变自己的状态;另外,也是最重要的,领域模型不用关心自己所产生的事件到底怎么样了,比如不关心有没有持久化,不关心是否和别的事件有并发冲突。 它只管根据自己当前的内存状态做上面这两点的响应;如果这样的设想有可能,那领域模型就是真正的中央业务逻辑处理器了,和CPU很类似了。 这样它才能真正快起来。 简单的说就是:事件->模型->事件模型只管响应事件,然后响应处理,然后产生新的事件领域模型就是一黑盒,它只能帮你处理业务逻辑,其他的什么处理结果它一概不关心;当然,领域模型肯定有它自己的状态,但这个状态是驻留在内存的,和领域模型是一体的。 我为什么会有这个想法是因为,我在想,为什么要让领域模型的处理逻辑依赖于它的处理结果是否被正确顺利持久化了?感觉这很荒唐。 既然领域模型有自己的内存状态空间,他的所有逻辑也应该只依赖于这个状态空间,不再依赖于其他任何外部的东西。 当然,以前我们设计的IRepository,实际背后都是直接从数据库取。 这样的话,领域模型的状态空间就是数据库了。 但是这样其实很不好,为什么不用内存作为领域模型的状态空间呢?现在再想想LMAX就是我刚才的想法的一个实际例子。 事件->模型->事件,这样的设计,理论上并不需要必须要求单线程来访问模型,因为领域模型不依赖于任何外部的状态,只依赖于自己所在存活内存空间;单线程有一个很大的好处就是可以防止并发冲突的产生。 我们其实完全支持多线程或集群的方式,只不过这样会有可能访问到的领域对象的状态是了老的,因为不同的机器之间的领域模型内存对象的状态需要做一些同步,访问到老数据的可能性的大小取决于并发的大小以及机器之间数据同步的快慢;LMAX之所以用单线程,是考虑了,这单线程的领域模型和性能之间,性能已经非常高其足以达到他们的要求了。 这样的架构,我觉得领域模型中的任何一个对象的一次完整的状态更新至少会响应两个事件,举个例子:先响应ChangeNoteCommand(command也是一种事件,可以理解为NoteChangeRequested),然后Note模型产生一个NoteChanged事件,注意,此时模型自己的状态还未改变,此时只是先产生了一个事件表示什么事情发生了;然后该事件(NoteChanged)最终又被发送到领域模型让其响应,此时,领域模型才去更改自己的Note状态并将最新状态保存到自己的内存空间,如一个dict中或redis中;经过对这两个事件的响应,才完成了Note的最终状态的修改;而我们以前都是从数据库取Note,然后更改,然后保存到数据库。 这样不慢才怪!通过上面的两次事件响应,可以换来领域模型对事件的极快的响应,因为完全无IO。 剩下的我们只要考虑(我目前考虑了以下六个问题):消息的序列化和反序列化;消息传递的速度;事件持久化的速度;并发冲突后重试的设计;消息丢失了怎么办;集群部署时,各台服务器之间内存的同步如何实现;需要明白的是:这些都不是领域模型该考虑的问题。 这些外围的任何问题,都不要让领域模型自己去考虑,我们应该对出现的各种问题逐个寻求解决方案。 每个问题的解决方案我大概理了下我的对策:消息的序列化和反序列化:这个简单,用BinaryFormatter,或更快的开源序列化组件,对于事件这样大小的对象可以达到每秒10W次每秒;消息传递的速度:用MSMQ/RabbitMq,等带持久化功能的队列组件;如果嫌太慢,就用ZeroMq(无消息持久化功能),但可以达到30W消息每秒;事件持久化的速度:由于事件都是跟着单个聚合根,所以我们只要确保单个聚合根的事件不会冲突(即没有重复的版本号的事件);为了更快的持久化,我们可以对事件按照聚合根或者其他方式进行分区存放,不同的服务器存放不同的聚合根的事件;这样通过集群持久化的方式可以实现多事件同时被持久化,从而提高整体的事件持久化吞吐量;如单个mongodb server每秒持久化5000个,那10个mongodb server就能每秒持久化5W个;并发冲突后怎么办:一般来说就是选择重试,但为了确保不会出现不可控的局面(可能由于某种原因一直在重试,引起消息堵塞),那需要设置一个最大的重试次数;超过最大重试次数后不再重试,然后记录日志,以供以后查找问题;这里的重试的意思是:重新找到对应该事件的command,然后再次发送该command给领域模型处理;消息丢失:丢失就丢失了呗,呵呵;要是你觉得消息决不能丢失,那就用可靠的带持久化功能的消息传输队列,如MSMQ;当然,就算消息丢失了,我们很多时候都要想想有没有影响的,一般来说,消息丢失,至少我们是知道程序有问题了的,因为模型的状态此时一定是不对的。 我们可以通过在消息发出时和接收时记录日志,这样方便以后查找消息是在哪个环节丢的;任何其他的异常出现,这个我觉得如果都是托管代码,那可以在必要的地方加try catch,然后记录日志。 至于是否要重试,还要看情形;另外,如果是多线程访问模型,或集群访问,那很多时候访问到的内存的领域对象的状态都是老的,那怎么办?其实这不是问题,因为事件持久化的时候会被检测到这种并发重复,然后对应的command会被重试。 如果一个事件被成功的持久化了,那如何让各台应用服务器知道?这个我觉得也简单,就是当事件持久化完成后,通过zeromq publish给所有的应用服务器,每台应用服务器都有一个后台的线程在不停的接收已被成功持久化了的事件,然后根据这些事件更新自己内存空间中的领域对象的状态。 这一步完全可以由框架自动做掉;这里相当于我上面提到的第二个事件(NoteChanged)是由框架自动处理的,不需要用户写代码干预;前面说到,因为是publish-subscribe模式,所以各台应用服务器上的数据就会自然保持同步了;另外,这种架构,传输的是事件,事件都是很小的,所以不用担心消息传输的性能。 对于以上的想法,有人有下面的两点担心:事件是否就是解决当前复杂软件架构的银弹?系统中如果出现海量的事件是否会出现另一种灾难?我记得不知道是谁说过,OO的本质就是消息通信。 command也好,event也好,或者直接的方法调用也好,本质上都是对象与对象之间的消息通信。 方法调用太生硬(这点我记得你曾今也提到过,当然我觉得聚合内很适合用方法调用来实现聚合内的对象的通信)command, event本质上都是通过message作为媒介,实现对象与对象之间的通信。 这让我想起有一位高人曾经说过的一个比喻,下面是摘录的他的原话:“现在的SOA、ESB之类的东西是不是就像打造一个企业的“神经脉络”,而“OO”是不是就像“神经元”,它们之间的通讯就是靠生物电脉冲,这就是消息驱动。 ”所以,我在想,软件实现用户的需求,是不是也应该有很多的对象以及很多的消息(event)这两样东西作为核心组成,对象相当于神经元,消息相当于生物电脉冲。 整个软件在运行过程中就是这样一个由对象以及消息组成的网络。 至于复杂性,我觉得框架可以帮我们实现消息通信的部分,而我们程序员要做的就是定义对象结构,然后让对象具有发送消息和接收消息的行为功能。 我觉得这点并不是很复杂吧!最近我一直在努力实现我这个想法,因为我师兄说:“我现在不相信什么架构,just show me the code”。 有想法和能实现出来是两回事,你有多少能力,你的设计能力,对细节的把控能力,程序员内在素养,一看代码便知,呵呵。 有人回复说:事件本身没有错,我想强调的是“事件”的定位问题。 “事件”是一个界与另一个界交互的方式,但界是分层次的。 用人体比喻很好理解,细胞之间的事件,组织之间的事件,器官之间的事件。 构建这样的事件体系是非常复杂的,目前的技术很难达到,不是一个EventBus就可以解决的。 针对上面的说法,我觉得这里主要还是一个编程思路的转变问题。 事件驱动天生是一种异步编程。 我之所以想自己搞一个这样的框架,主要是因为:事件驱动的编程模型让model不在有任何负担,让model只面向in memory,从而实现高性能不是梦了;事件的version机制让我们方便的实现乐观并发,确保单个聚合根内强一致,聚合根之间最终一致;然后配合框架自动实现的重试功能,可以在并发冲突后自动重试,这样极大避免command的执行失败率;事件数据不是关系型数据,所以事件产生者和处理者都可以多个,这意味着我们做集群非常容易,且事件的存储可以任意拆分,只要确保同一个聚合根的事件放在一起即可,不同聚合根的事件理论上都可以放在不同的服务器上,这样我们持久化事件也可以并发,我们只要对聚合根id+commitSequence这两个字段建立唯一索引即可。 从而克服事件持久化(IO操作)慢的瓶颈;在这么多诱人的特性面前,我们还有什么说不的理由呢?困难不要紧,我们可以一步步来,呵呵。 总比没有想法好,你说呢?-------------------------------------------------------------------------------------------------------------目前就想到这些。 后续再完善思路。 知识决定命运,学习积累知识,而正确的思维方式是一切高效学习的基础。 所以学会如何清晰地思考问题是非常重要的! 呵呵!
使用reactor eventbus进行事件驱动开发
!/reactor/core/dispatch/ 在!/reactor/创建默认的ThreadPoolExecutorDispatcher 构造器 因此,队列没有满的时候是异步的,满的时候就阻塞了。
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。