四种主要类型的对比分析-消息队列选型指南 (四种主要类型信用证)
消息队列基础
1.1 什么是消息队列?
消息队列是在消息传输过程中存储消息的容器,用于接收消息并以文件的方式存储,一个消息队列可以被一个或多个消费者消费,包含以下 3 个元素:
- 生产者:生成并发送消息
- 消息队列:存储消息
- 消费者:消费消息
1.2 消息队列模式
1.3 消息队列应用场景
消息队列广泛应用于以下场景:
- 解耦系统组件
- 异步处理任务
- 缓存消息
- 消息发布订阅
- 流数据处理
常用消息队列
由于 ActiveMQ 5.x 的官方社区维护越来越少,较少在大规模吞吐场景中使用,因此我们主要讲解 Kafka、RabbitMQ 和 RocketMQ。
2.1 Kafka
重要概念
Kafka 工作原理
2.2 RocketMQ
重要概念
RocketMQ 架构
2.3 RabbitMQ
重要概念
RabbitMQ 工作原理
异同对比
特征 | Kafka | RocketMQ | RabbitMQ |
---|---|---|---|
模式 | 发布订阅 | 发布订阅 | 消息代理 |
可靠性 | 高可靠(副本机制) | 高可靠(CommitLog 和消息索引) | 比较可靠(事务机制) |
吞吐量 | 高吞吐量(单机百万级消息/秒) | 高吞吐量(单机百万级消息/秒) | 中等吞吐量 |
延迟 | 低延迟(毫秒级) | 低延迟(毫秒级) | 中等延迟 |
可扩展性 | 高可扩展性(水平扩展) | 高可扩展性(水平扩展) | 中等可扩展性 |
高可用性 | 高可用性(故障转移) | 高可用性(故障转移) | 比较高可用性(集群模式) |
生态系统 | 完善的生态系统(Kafak Connect、Kafka Streams) | 完善的生态系统(RocketMQ Console、RocketMQ Dashboard) | 完善的生态系统(RabbitMQ Management Plugin、Spring AMQP) |
技术选型
场景 | 推荐消息队列 |
---|---|
大数据流处理 | Kafka |
金融交易消息 | RocketMQ |
系统集成 | RabbitMQ |
低延迟场景 | Kafka、RocketMQ |
高并发场景 | Kafka、RocketMQ |
高可靠性场景 | Kafka、RocketMQ |
分布式场景 | Kafka、RocketMQ |
总结
Kafka、RocketMQ 和 RabbitMQ 都是优秀的开源消息队列,各有其优缺点。在实际应用中,需要根据具体场景和需求选择合适的队列。
Android 中的“子线程”解析
Android 中线程可分为 主线程 和 子线程 两类,其中主线程也就是 UI线程 ,它的主要这作用就是运行四大组件、处理界面交互。 子线程则主要是处理耗时任务,也是我们要重点分析的。 首先 Java 中的各种线程在 Android 里是通用的,Android 特有的线程形态也是基于 Java 的实现的,所以有必要先简单的了解下 Java 中的线程,本文主要包括以下内容: 在 Java 中要创建子线程可以直接继承 Thread 类,重写 run() 方法: 或者实现 Runnable 接口,然后用Thread执行Runnable,这种方式比较常用: 简单的总结下:Callable 和 Runnable 类似,都可以用来处理具体的耗时任务逻辑的,但是但具体的差别在哪里呢?看一个小例子: 定义 MyCallable 实现了Callable 接口,和之前 Runnable 的 run() 方法对比下, call() 方法是有返回值的哦,泛型就是返回值的类型: 一般会通过线程池来执行 Callable (线程池相关内容后边会讲到),执行结果就是一个 Future 对象: 可以看到,通过线程池执行 MyCallable 对象返回了一个Future对象,取出执行结果。 Future 是一个接口,从其内部的方法可以看出它提供了取消任务(有坑!!!)、判断任务是否完成、获取任务结果的功能:Future 接口有一个 FutureTask 实现类,同时 FutureTask 也实现了 Runnable 接口,并提供了两个构造函数: 用 FutureTask 一个参数的构造函数来改造下上边的例子:FutureTask 内部有一个 done() 方法,代表 Callable 中的任务已经结束,可以用来获取执行结果: 所以 Future+Callable 的组合可以更方便的获取子线程任务的执行结果,更好的控制任务的执行,主要的用法先说这么多了,其实 AsyncTask 内部也是类似的实现! 注意, Future 并不能取消掉运行中的任务,这点在后边的 AsyncTask 解析中有提到。 Java 中线程池的具体的实现类是 ThreadPoolExecutor ,继承了 Executor 接口,这些线程池在 Android 中也是通用的。 使用线程池的好处: 常用的构造函数如下: 一个常规线程池可以按照如下方式来实现: 执行任务: 基于 ThreadPoolExecutor ,系统扩展了几类具有新特性的线程池: 线程池可以通过 execute() 、 submit() 方法开始执行任务,主要差别从方法的声明就可以看出,由于 submit() 有返回值,可以方便得到任务的执行结果: 要关闭线程池可以使用如下方法: IntentService 是 Android 中一种特殊的 Service,可用于执行后台耗时任务,任务结束时会自动停止,由于属于系统的四大组件之一,相比一般线程具有较高的优先级,不容易被杀死。 用法和普通 Service 基本一致,只需要在 onHandleIntent() 中处理耗时任务即可: 至于 HandlerThread,它是 IntentService 内部实现的重要部分,细节内容会在 IntentService 源码中说到。 IntentService 首次创建被启动的时候其生命周期方法 onCreate() 会先被调用,所以我们从这个方法开始分析: 这里出现了 HandlerThread 和 ServiceHandler 两个类,先搞明白它们的作用,以便后续的分析。 首先看 HandlerThread 的核心实现: 首先它继承了 Thread 类,可以当做子线程来使用,并在run() 方法中创建了一个消息循环系统、开启消息循环。 ServiceHandler 是 IntentService 的内部类,继承了 Handler,具体内容后续分析: 现在回过头来看onCreate() 方法主要是一些初始化的操作, 首先创建了一个thread对象,并启动线程,然后用其内部的 Looper 对象 创建一个mServiceHandler对象,将子线程的 Looper 和 ServiceHandler 建立了绑定关系,这样就可以使用 mServiceHandler 将消息发送到子线程去处理了。 生命周期方法onStartCommand() 方法会在 IntentService 每次被启动时调用,一般会这里处理启动 IntentService 传递 Intent 解析携带的数据: 又调用了 start() 方法: 就是用 mServiceHandler 发送了一条包含 startId 和 intent 的消息,消息的发送还是在主线程进行的,接下来消息的接收、处理就是在子线程进行的: 当接收到消息时,通过 onHandleIntent() 方法在子线程处理 intent 对象, onHandleIntent() 方法执行结束后,通过 stopSelf(1) 等待所有消息处理完毕后终止服务。 为什么消息的处理是在子线程呢?这里涉及到 Handler 的内部消息机制,简单的说,因为 ServiceHandler 使用的 Looper 对象就是在 HandlerThread 这个子线程类里创建的,并通过 () 开启消息循环,不断从消息队列(单链表)中取出消息,并执行,截取 loop() 的部分源码:dispatchMessage() 方法间接会调用 handleMessage() 方法,所以最终 onHandleIntent() 就在子线程中划线执行了,即 HandlerThread 的 run() 方法。 这就是 IntentService 实现的核心,通过 HandlerThread+Hanlder 把启动 IntentService 的 Intent 从主线程切换到子线程,实现让 Service 可以处理耗时任务的功能! AsyncTask 是 Android 中轻量级的异步任务抽象类,它的内部主要由线程池以及 Handler 实现,在线程池中执行耗时任务并把结果通过 Handler 机制中转到主线程以实现UI操作。 典型的用法如下: 从 Android3.0 开始,AsyncTask 默认是串行执行的: 如果需要并行执行可以这么做: AsyncTask 的源码不多,还是比较容易理解的。 根据上边的用法,可以从 execute() 方法开始我们的分析: 看到 @MainThread 注解了吗?所以 execute() 方法需要在主线程执行哦! 进而又调用了 executeOnExecutor() : 可以看到,当任务正在执行或者已经完成,如果又被执行会抛出异常!回调方法 onPreExecute() 最先被执行了。 传入的 sDefaultExecutor 参数,是一个自定义的串行线程池对象,所有任务在该线程池中排队执行: 可以看到 SerialExecutor 线程池仅用于任务的排队, THREAD_POOL_EXECUTOR 线程池才是用于执行真正的任务,就是我们线程池部分讲到的 ThreadPoolExecutor : 再回到 executeOnExecutor() 方法中,那么 (mFuture) 就是触发线程池开始执行任务的操作了。 那 executeOnExecutor() 方法中的 mWorker 是什么? mFuture 是什么?答案在 AsyncTask 的构造函数中: 原来 mWorker 是一个 Callable 对象, mFuture 是一个 FutureTask 对象,继承了 Runnable 接口。 所以 mWorker 的 call() 方法会在 mFuture 的 run() 方法中执行,所以 mWorker 的 call() 方法在线程池得到执行! 同时 doInBackground() 方法就在 call() 中方法,所以我们自定义的耗时任务逻辑得到执行,不就是我们第二部分讲的那一套吗!doInBackground() 的返回值会传递给 postResult() 方法: 就是通过 Handler 将最终的耗时任务结果从子线程发送到主线程,具体的过程是这样的, getHandler() 得到的就是 AsyncTask 构造函数中初始化的 mHandler , mHander 又是通过 getMainHandler() 赋值的: 可以在看到 sHandler 是一个 InternalHandler 类对象: 所以 getHandler() 就是在得到在主线程创建的 InternalHandler 对象,所以 就可以完成耗时任务结果从子线程到主线程的切换,进而可以进行相关UI操作了。 当消息是 MESSAGE_POST_RESULT 时,代表任务执行完成, finish() 方法被调用: 如果任务没有被取消的话执行 onPostExecute() ,否则执行 onCancelled() 。 如果消息是 MESSAGE_POST_PROGRESS , onProgressUpdate() 方法被执行,根据之前的用法可以 onProgressUpdate() 的执行需要我们手动调用 publishProgress() 方法,就是通过 Handler 来发送进度数据: 进行中的任务如何取消呢?AsyncTask 提供了一个 cancel(boolean mayInterruptIfRunning) ,参数代表是否中断正在执行的线程任务,但是呢并不靠谱, cancel() 的方法注释中有这么一段: 大致意思就是调用 cancel() 方法后, onCancelled(Object) 回调方法会在 doInBackground() 之后被执行而 onPostExecute() 将不会被执行,同时你应该 doInBackground() 回调方法中通过 isCancelled() 来检查任务是否已取消,进而去终止任务的执行! 所以只能自己动手了: AsyncTask 整体的实现流程就这些了,源码是最好的老师,自己跟着源码走一遍有些问题可能就豁然开朗了!
大数据如何入门
首先我们要了解Java语言和Linux操作系统,这两个是学习大数据的基础,学习的顺序不分前后。
大数据
Java :只要了解一些基础即可,做大数据不需要很深的Java 技术,学java SE 就相当于有学习大数据基础。
Linux:因为大数据相关软件都是在Linux上运行的,所以Linux要学习的扎实一些,学好Linux对你快速掌握大数据相关技术会有很大的帮助,能让你更好的理解hadoop、hive、hbase、spark等大数据软件的运行环境和网络环境配置,能少踩很多坑,学会shell就能看懂脚本这样能更容易理解和配置大数据集群。还能让你对以后新出的大数据技术学习起来更快。
Hadoop:这是现在流行的大数据处理平台几乎已经成为大数据的代名词,所以这个是必学的。Hadoop里面包括几个组件HDFS、MapReduce和YARN,HDFS是存储数据的地方就像我们电脑的硬盘一样文件都存储在这个上面,MapReduce是对数据进行处理计算的,它有个特点就是不管多大的数据只要给它时间它就能把数据跑完,但是时间可能不是很快所以它叫数据的批处理。
Zookeeper:这是个万金油,安装Hadoop的HA的时候就会用到它,以后的Hbase也会用到它。它一般用来存放一些相互协作的信息,这些信息比较小一般不会超过1M,都是使用它的软件对它有依赖,对于我们个人来讲只需要把它安装正确,让它正常的run起来就可以了。
Mysql:我们学习完大数据的处理了,接下来学习学习小数据的处理工具mysql数据库,因为一会装hive的时候要用到,mysql需要掌握到什么层度那?你能在Linux上把它安装好,运行起来,会配置简单的权限,修改root的密码,创建数据库。这里主要的是学习SQL的语法,因为hive的语法和这个非常相似。
Sqoop:这个是用于把Mysql里的数据导入到Hadoop里的。当然你也可以不用这个,直接把Mysql数据表导出成文件再放到HDFS上也是一样的,当然生产环境中使用要注意Mysql的压力。
Hive:这个东西对于会SQL语法的来说就是神器,它能让你处理大数据变的很简单,不会再费劲的编写MapReduce程序。有的人说Pig那?它和Pig差不多掌握一个就可以了。
Oozie:既然学会Hive了,我相信你一定需要这个东西,它可以帮你管理你的Hive或者MapReduce、Spark脚本,还能检查你的程序是否执行正确,出错了给你发报警并能帮你重试程序,最重要的是还能帮你配置任务的依赖关系。我相信你一定会喜欢上它的,不然你看着那一大堆脚本,和密密麻麻的crond是不是有种想屎的感觉。
Hbase:这是Hadoop生态体系中的NOSQL数据库,他的数据是按照key和value的形式存储的并且key是唯一的,所以它能用来做数据的排重,它与MYSQL相比能存储的数据量大很多。所以他常被用于大数据处理完成之后的存储目的地。
Kafka:这是个比较好用的队列工具,队列是干吗的?排队买票你知道不?数据多了同样也需要排队处理,这样与你协作的其它同学不会叫起来,你干吗给我这么多的数据(比如好几百G的文件)我怎么处理得过来,你别怪他因为他不是搞大数据的,你可以跟他讲我把数据放在队列里你使用的时候一个个拿,这样他就不在抱怨了马上灰流流的去优化他的程序去了,因为处理不过来就是他的事情。而不是你给的问题。当然我们也可以利用这个工具来做线上实时数据的入库或入HDFS,这时你可以与一个叫Flume的工具配合使用,它是专门用来提供对数据进行简单处理,并写到各种数据接受方(比如Kafka)的。
Spark:它是用来弥补基于MapReduce处理数据速度上的缺点,它的特点是把数据装载到内存中计算而不是去读慢的要死进化还特别慢的硬盘。特别适合做迭代运算,所以算法流们特别稀饭它。它是用scala编写的。Java语言或者Scala都可以操作它,因为它们都是用JVM的。
免责声明:本文转载或采集自网络,版权归原作者所有。本网站刊发此文旨在传递更多信息,并不代表本网赞同其观点和对其真实性负责。如涉及版权、内容等问题,请联系本网,我们将在第一时间删除。同时,本网站不对所刊发内容的准确性、真实性、完整性、及时性、原创性等进行保证,请读者仅作参考,并请自行核实相关内容。对于因使用或依赖本文内容所产生的任何直接或间接损失,本网站不承担任何责任。