Rabbitmq 和 Celery 是怎样面试和工作是两回事的

Celery+RabbitMQ快速入门
Celery+RabbitMQ快速入门
本文主要介绍Celery+RabbitMQ的入门知识
Celery 是一个异步任务队列/基于分布式消息传递作业队列,它侧重于实时操作,同样也支持调度
RabbitMQ为应用程序提供了强大的消息服务。它很容易使用,适合在云规模应用,并支持所有主流的操作系统和开发平台。RabbitMQ在Mozilla公共许可下开源
安装Celery
使用easy_install安装
sudo easy_install celery
安装RabbitMQ
在ubuntu下使用apt-get方式安装rabbitmq-server
sudo apt-get install rabbitmq-server
安装完毕之后可以使用如下命令查看MQ当前服务状态
alex@alex-pc:~/test$ sudo rabbitmqctl status
[sudo] password for alex:
Status of node 'rabbit@alex-pc' ...
[{running_applications,[{rabbit,"RabbitMQ","1.7.2"},
{mnesia,"MNESIA
CXC 138 12","4.4.12"},
{os_mon,"CPO
CXC 138 46","2.2.4"},
{sasl,"SASL
CXC 138 11","2.1.8"},
{stdlib,"ERTS
CXC 138 10","1.16.4"},
{kernel,"ERTS
CXC 138 10","2.13.4"}]},
{nodes,['rabbit@alex-pc']},
{running_nodes,['rabbit@alex-pc']}]
配置celery
选择一个测试目录,在当前路径下新建配置文件celeryconfig.py,如下:
import sys
sys.path.insert(0, os.getcwd())
CELERY_IMPORTS = ("tasks", )
CELERY_RESULT_BACKEND = "amqp"
BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "guest"
BROKER_PASSWORD = "guest"
BROKER_VHOST = "/
新建tasks.py,如下:
from celery.task import task
def add(x, y):
return x + y
启动celery服务
在终端中使用如下命令:
alex@alex-pc:~/test$ celeryd --loglevel=INFO
[ 23:06:27,226: WARNING/MainProcess]
-------------- celery@alex-pc v2.2.7
---- **** -----
* -- [Configuration]
-- * - **** ---
amqplib://guest@localhost:5672/
- ** ----------
celery.loaders.default.Loader
- ** ----------
. logfile:
[stderr]@INFO
- ** ----------
. concurrency: 2
- ** ----------
- *** --- * ---
-- ******* ----
--- ***** ----- [Queues]
--------------
exchange:celery (direct) binding:celery
. tasks.add
[ 23:06:27,233: INFO/PoolWorker-1] child process calling self.run()
[ 23:06:27,235: INFO/PoolWorker-2] child process calling self.run()
[ 23:06:27,236: WARNING/MainProcess] celery@alex-pc has started.
打开ipython或者任何python shell即可
In [8]: from celery.task import task
In [9]: import tasks
In [10]: res = tasks.add.delay(2,2)
In [11]: res.ready()
Out[11]: True
In [12]: res.result
Out[12]: 4
从celeryd的loginfo的输出信息中可以看到调用成功:
[ 23:06:39,081: INFO/MainProcess] Got task from broker: tasks.add[82d8c609-2cac-47e8-be20-eb6a2dc502d6]
[ 23:06:39,120: INFO/MainProcess] Task tasks.add[82d8c609-2cac-47e8-be20-eb6a2dc502d6] succeeded in 0.5s: 4
已投稿到:
以上网友发言只代表其个人观点,不代表新浪网的观点或立场。个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。&br&&br&使用场景的话,举个例子:&br&假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:&br&&ol&&li&校验用户名等信息,如果没问题会在数据库中添加一个用户记录&/li&&li&如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信&/li&&li&分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他&/li&&li&发送给用户一个包含操作指南的系统通知&/li&&li&等等……&/li&&/ol&但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。&br&&br&或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。&br&&br&所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。
个人认为消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼…
RabbitMQ是一个AMQP实现,传统的messaging queue系统实现,基于Erlang。老牌MQ产品了。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量还在其次。&br&&br&Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,0.8开始支持复制,不支持事务,适合产生大量数据的互联网服务的数据收集业务。&br&&br&ZeroMQ只是一个网络编程的Pattern库,将常见的网络请求形式(分组管理,链接管理,发布订阅等)模式化、组件化,简而言之socket之上、MQ之下。对于MQ来说,网络传输只是它的一部分,更多需要处理的是消息存储、路由、Broker服务发现和查找、事务、消费模式(ack、重投等)、集群服务等。
RabbitMQ是一个AMQP实现,传统的messaging queue系统实现,基于Erlang。老牌MQ产品了。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量还在其次。 Kafka是linkedin开源的MQ系统,主要特点是基于Pull的模式来处理…
redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。&br&其他的mq和kafka保证可靠但有一些延迟(非实时系统没有保证延迟)。redis-pub/sub断电就清空,而使用redis-list作为消息推送虽然有持久化,但是又太弱智,也并非完全可靠不会丢。&br&&br&另外一点,redis 发布订阅除了表示不同的 topic 外,并不支持分组,比如kafka中发布一个东西,多个订阅者可以分组,同一个组里只有一个订阅者会收到该消息,这样可以用作负载均衡。&br&&br&比如,kafka 中发布:topic = &发布帖子& data=&文章1& 这个消息,后面有一百台服务器每台服务器都是一个订阅者,都订阅了这个 topic,但是他们可能分为三组,A组50台,用来真的做发布文章,A组50台里所有 subscriber 都订阅了这个topic。&br&&br&由于在同一组,这条消息 (topic=&发布帖子&, data=&文章1&)只会被A组里面一台当前空闲的机器收到。而B组25台服务器用于统计,C组25台服务器用于存档备份,每组只有一台会收到。&br&&br&用不同的组来决定每条消息要抄送出多少分去,用同组内哪些订阅者忙,哪些订阅者空闲来决定消息会被分到哪台服务器去处理,生产者消费者模型嘛。&br&&br&redis完全没有这类机制,这两点是最大的区别。。。。。
redis 消息推送(基于分布式 pub/sub)多用于实时性较高的消息推送,并不保证可靠。 其他的mq和kafka保证可靠但有一些延迟(非实时系统没有保证延迟)。redis-pub/sub断电就清空,而使用redis-list作为消息推送虽然有持久化,但是又太弱智,也并非完全可靠…
小红是小明的姐姐。&br&&br&&p&小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。&/p&&br&&p&后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红每次发现不错的书都放到书架上,小明则看到书架上有书就拿下来看。&/p&&br&&p&书架就是一个消息队列,小红是生产者,小明是消费者。&/p&&br&&br&&p&&u&这带来的好处有:&/u&&/p&&br&&p&1.小红想给小明书的时候,不必问小明什么时候有空,亲手把书交给他了,小红只把书放到书架上就行了。这样小红小明的时间都更自由。&/p&&br&&p&2.小红相信小明的读书自觉和读书能力,不必亲眼观察小明的读书过程,小红只要做一个放书的动作,很节省时间。&/p&&br&&p&3.当明天有另一个爱读书的小伙伴小强加入,小红仍旧只需要把书放到书架上,小明和小强从书架上取书即可(唔,姑且设定成多个人取一本书可以每人取走一本吧,可能是拷贝电子书或复印,暂不考虑版权问题)。&/p&&br&&p&4.书架上的书放在那里,小明阅读速度快就早点看完,阅读速度慢就晚点看完,没关系,比起小红把书递给小明并监督小明读完的方式,小明的压力会小一些。&/p&&br&&br&&p&&u&这就是消息队列的四大好处:&/u&&/p&&br&&p&1.解耦&/p&&p&每个成员不必受其他成员影响,可以更独立自主,只通过一个简单的容器来联系。&/p&&p&小红甚至可以不知道从书架上取书的是谁,小明也可以不知道往书架上放书的人是谁,在他们眼里,都只有书架,没有对方。&/p&&p&毫无疑问,与一个简单的容器打交道,比与复杂的人打交道容易一万倍,小红小明可以自由自在地追求各自的人生。&/p&&br&&p&2.提速&/p&&p&小红选择相信「把书放到书架上,别的我不问」,为自己节省了大量时间。&/p&&p&小红很忙,只能抽出五分钟时间,但这时间足够把书放到书架上了。&/p&&br&&p&3.广播&/p&&p&小红只需要劳动一次,就可以让多个小伙伴有书可读,这大大地节省了她的时间,也让新的小伙伴的加入成本很低。&/p&&br&&p&4.削峰&/p&&p&假设小明读书很慢,如果采用小红每给一本书都监督小明读完的方式,小明有压力,小红也不耐烦。&/p&&p&反正小红给书的频率也不稳定,如果今明两天连给了五本,之后隔三个月才又给一本,那小明只要在三个月内从书架上陆续取走五本书读完就行了,压力就不那么大了。&/p&&br&&br&&p&&u&当然,使用消息队列也有其成本:&/u&&/p&&br&&p&1.引入复杂度&/p&&p&毫无疑问,「书架」这东西是多出来的,需要地方放它,还需要防盗。&/p&&br&&p&2.暂时的不一致性&/p&&p&假如妈妈问小红「小明最近读了什么书」,在以前的方式里,小红因为亲眼监督小明读完书了,可以底气十足地告诉妈妈,但新的方式里,小红回答妈妈之后会心想「小明应该会很快看完吧……」&/p&&p&这中间存在着一段「妈妈认为小明看了某书,而小明其实还没看」的时期,当然,小明最终的阅读状态与妈妈的认知会是一致的,这就是所谓的「最终一致性」。&/p&&br&&br&&p&&u&那么,该使用消息队列的情况需要满足什么条件呢?&/u&&/p&&br&&p&1.生产者不需要从消费者处获得反馈&/p&&p&引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走——即所谓异步——成为了可能。&/p&&p&小红放完书之后小明到底看了没有,小红根本不问,她默认他是看了,否则就只能用原来的方法监督到看完了。&/p&&br&&p&2.容许短暂的不一致性&/p&&p&妈妈可能会发现「有时候据说小明看了某书,但事实上他还没看」,只要妈妈满意于「反正他最后看了就行」,异步处理就没问题。&/p&&p&如果妈妈对这情况不能容忍,对小红大发雷霆,小红也就不敢用书架方式了。&/p&&br&&p&3.确实是用了有效果&/p&&p&即解耦、提速、广播、削峰这些方面的收益,超过放置书架、监控书架这些成本。&/p&&p&否则如果是盲目照搬,「听说老赵家买了书架,咱们家也买一个」,买回来却没什么用,只是让步骤变多了,还不如直接把书递给对方呢,那就不对了。&/p&
小红是小明的姐姐。 小红希望小明多读书,常寻找好书给小明看,之前的方式是这样:小红问小明什么时候有空,把书给小明送去,并亲眼监督小明读完书才走。久而久之,两人都觉得麻烦。 后来的方式改成了:小红对小明说「我放到书架上的书你都要看」,然后小红…
根本不是一个级别的东西。&br&&br&&b&Kafka是可靠的分布式日志存储服务。&br&&/b&&br&用简单的话来说,你可以把Kafka当作可&b&顺序写入的&/b&&b&一大卷磁带&/b&, 可以随时&b&倒带,快进&/b&到某个时间点重放。&br&&br&&br&&img src=&/v2-469ea8401e86acd04e9fbef8d5624c4f_b.jpg& data-rawwidth=&432& data-rawheight=&368& class=&origin_image zh-lightbox-thumb& width=&432& data-original=&/v2-469ea8401e86acd04e9fbef8d5624c4f_r.jpg&&&br&&br&&br&先说下日志的定义:日志是数据库的核心,是对数据库的所有变更的严格有序记录,“表”是变更的结果。日志的其他名字有: Changelog, Write Ahead Log, Commit Log, Redo Log, Journaling.&br&&br&Kafka的特征如下:&br&&br&&b&高写入速度:&/b&Kafka能以超过1Gbps NIC的速度写这盘磁带(实际可以到SATA 3速度,参考&a href=&///?target=https%3A///kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines& class=& wrap external& target=&_blank& rel=&nofollow noreferrer&&Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines)&i class=&icon-external&&&/i&&/a&&br&),充分利用了磁盘的物理特性,即,随机写入慢(磁头冲停),顺序写入快(磁头悬浮)。&br&&br&&b&高可靠性: &/b&通过zookeeper做分布式一致性,同步到任意多块磁盘上,故障自动切换选主,自愈。&br&&br&&b&高容量:&/b&通过横向扩展,LinkedIn每日通过Kafka存储的新增数据高达175TB,8000亿条消息,可无限扩容,类似把两条磁带粘到一起。&br&&br&传统业务数据库的根本缺陷在于:&br&1.
太慢,读写太昂贵,无法避免的随机寻址。(磁盘最快5ms寻址,固态又太昂贵。)&br&2.
根本无法适应持续产生的数据流,越用越慢。(索引效率问题)&br&3.
无法水平scale。(多半是读写分离,一主多备。另: NewSQL通过一致性算法,有多主。)&br&&br&针对这些问题,Kafka提出了一种方法: &br&&br&&b&“log-centric approach(以日志为中心的方法)。”&/b&&br&&br&将传统数据库分为两个独立的系统,即&b&日志系统和索引系统&/b&。&br&&br&&b&“持久化和索引分开,日志尽可能快的落地,索引按照自己的速度追赶。”&/b&&br&&br&在&b&数据可靠性&/b&在得到Kafka这种&b&快速的,类似磁带顺序记录方式保障&/b&的大前提下。数据的呈现,使用方式变得非常灵活,可以根据需要将数据流同时送入搜索系统,RDBMS系统,数据仓库系统, 图数据库系统,日志分析等这些各种不同的数据库系统。 这些不同的系统只不过是一种对&b&Kafka磁带数据&/b&的&b&一种诠释,一个侧面,一个索引,一个快照&/b&。数据丢了,没关系,重放一遍磁带即可,更多的时候,对这些各式数据库系统的维护只是需要定期做一个快照,并拷贝到一个安全的对象存储(如S3) 而已。
&br&&br&&b&一句话:“日志都是相同的日志,索引各有各的不同。”&br&&/b&&br&&b&关于流计算:&br&&/b&&br&在以流为基本抽象的存储模型下,数据流和数据流之间,可以多流混合处理,或者流和状态,状态和状态的JOIN处理,这就是Kafka Stream提供的功能。 一个简单的例子是,在用户触发了某个事件后,和用户表混合处理,产生数据增补(Augment),再进入数据仓库进行相关性分析,一些简单的窗口统计和实时分析也很容易就能满足,比如 在收到用户登录消息的时候,在线人数+1, 离线的时候-1,反应出当前系统的在线用户总数。这方面可以参考PipelineDB &a href=&///?target=https%3A///& class=& external& target=&_blank& rel=&nofollow noreferrer&&&span class=&invisible&&https://www.&/span&&span class=&visible&&/&/span&&span class=&invisible&&&/span&&i class=&icon-external&&&/i&&/a&&br&&br&Kafka会让你重新思考系统的构建方式,使以前不可能的事变为可能,是一个系统中最重要的最核心的部分,不夸张的说,系统设计都需要围绕Kafka做。强烈推荐阅读:&a href=&///?target=https%3A///oldratlee/translations/blob/master/log-what-every-software-engineer-should-know-about-real-time-datas-unifying/README.md& class=& wrap external& target=&_blank& rel=&nofollow noreferrer&&oldratlee/translations&i class=&icon-external&&&/i&&/a&
根本不是一个级别的东西。 Kafka是可靠的分布式日志存储服务。
用简单的话来说,你可以把Kafka当作可顺序写入的一大卷磁带, 可以随时倒带,快进到某个时间点重放。 先说下日志的定义:日志是数据库的核心,是对数据库的所有变更的严格有序记录,“表”是…
&ul&&li&跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。&/li&&li&多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用之间的松耦合的接口。基于消息队列的耦合,不需要发送方和接收方同时在线。&br&&/li&&ul&&li&在企业应用集成(EAI)中,文件传输,共享数据库,消息队列,远程过程调用都可以作为集成的方法。&/li&&/ul&&li&应用内的同步变异步,比如订单处理,就可以由前端应用将订单信息放到队列,后端应用从队列里依次获得消息处理,高峰时的大量订单可以积压在队列里慢慢处理掉。由于同步通常意味着阻塞,而大量线程的阻塞会降低计算机的性能。&/li&&li&消息驱动的架构(EDA),系统分解为消息队列,和消息制造者和消息消费者,一个处理流程可以根据需要拆成多个阶段(Stage),阶段之间用队列连接起来,前一个阶段处理的结果放入队列,后一个阶段从队列中获取消息继续处理。&/li&&li&应用需要更灵活的耦合方式,如发布订阅,比如可以指定路由规则。&/li&&li&跨局域网,甚至跨城市的通讯,比如北京机房与广州机房的应用程序的通信。&/li&&/ul&
跨系统的异步通信,所有需要异步交互的地方都可以使用消息队列。就像我们除了打电话(同步)以外,还需要发短信,发电子邮件(异步)的通讯方式。多个应用之间的耦合,由于消息是平台无关和语言无关的,而且语义上也不再是函数调用,因此更适合作为多个应用…
&p&在多队列消息处理的场景中,怎样保持多个消息之间的时间顺序,是一个很经典的问题。解决方法当然是有的。&/p&&br&&p&为了讨论这个问题,让我们做一些简化问题的假设:&/p&&br&&ol&&li&有若干个消息队列A、B、C……&/li&&li&
有若干个线程在产生消息,并分别将消息加入这些队列&/li&&li&
每个消息队列有一个对应的线程(Worker
B、Worker C……),从队列中读取和处理消息&/li&&/ol&&br&&p&另外,还有一个很重要的前提:要保持多个消息之间的时间顺序,首先它们要有一个全局的时间顺序。因此,每个消息在被创建时,都将被赋予一个&b&全局唯一的、单调递增的、连续的&/b&序列号(Serial
Number,SN)。可以通过一个全局计数器来实现这一点。通过比较两个消息的SN,确定其先后顺序。&/p&&br&&p&回到问题。微博这个例子相对来说是比较简单的,可适用以下方案:&/p&&br&&p&&b&方案一:通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中。&/b&在本例中,针对某一个微博的操作(评论、通知、删除评论等),都依序放到消息队列A中(一般是通过对微博的ID进行hash来决定对应的消息队列)。既然同一个消息队列只有一个worker,那么很显然对其处理的顺序是可以保证的。已经有其他回答提到过这个方案。&/p&&br&&p&这个方案还有一个隐藏的前提,就是对某个微博的操作的消息应该被保证按时间顺序放入消息队列。也就是说,对微博的评论的消息应该先于删除评论的消息被放入队列。为了做到这一点,需要先将评论的消息放入队列,然后再使评论可见,然后删除评论才可能发生(类似于数据库中的Write
Ahead Log,WAL)。&/p&&br&&p&这个方案在某些场景中是有缺陷的。以微博为例,一个极端的情况是,所有人都在评论某一条微博,而其它微博没有人评论。结果是队列A不堪重负,而其它队列为空,系统退回到单线程处理消息的状况。在这种情况下,仍然需要把对同一个微博的评论尽量平均地放到所有的消息队列中。这样,我们仍然需要保证不同的消息队列中的消息的先后顺序。实践中,更通用的例子是,对微博B1的评论处于队列A,对微博B2的评论处于队列B,但是后者引用了前者,所以必须保证前者首先被处理。&/p&&br&&p&假设消息M1的SN=100,消息M2的SN=110。为了保证消息M1在M2之前被处理,关键是,处理M2之前,必须先检查M1是否已经被处理,如果没有,就等待。因此有了方案二:&/p&&br&&p&&b&在将M1加入消息队列A时,同时对其它每一个队列加入一个特殊消息(Block
Message,BM)。当某个消息队列处理BM时,检查M1是否已经被处理。如果M1已经被处理,则继续处理后面的消息。如果没有,则等待,直到M1被处理。&/b&&/p&&br&&p&严格地说,方案二不仅仅保证了M1和M2的时间顺序,更多地是保证了M1和所有M1之后的消息的时间顺序,因此它更适用于保证某个影响范围较大的操作的时间顺序。例如,M1对应创建一个用户的操作,而所有与此用户有关的操作都必须在创建此用户后才能执行。&/p&&br&&p&方案二也有问题,那就是,在将M1加入队列时,必须明确知道其后会有依赖于M1的操作进行。对本例而言,评论微博以后,不一定会有删除微博的操作,而为了M1而block所有其它的消息队列,无疑会造成很大的性能浪费。那么又有方案三:&/p&&br&&p&&b&每一个消息队列都记录自己处理的最后一个消息的SN,称为Last
SN,LSN。假设消息M1的SN=100,消息M2的SN=110。当处理到M2时,检查所有其它消息队列的LSN是否大于等于100:&/b&&/p&&br&&ul&&li&&b&如果所有消息队列的LSN都大于100,那么M1肯定已经被处理了。&/b&&/li&&li&&b&如果有一个或多个消息队列的LSN小于100,那么,M1可能还没有被处理,但一定已经被放到了某个队列中(因为M2已经在当前队列中了)。在这种情况下,block所有LSN大于100的队列,等待所有LSN小于100队列继续运行,直到这些队列要么LSN大于等于100,要么队列为空。&/b&&/li&&/ul&&br&&p&有一个优化条件是,任何时候,如果某个消息队列的LSN等于100,那么M1肯定已经被处理了,可以直接结束等待。&/p&&br&&p&当然,这肯定会导致性能上的额外开销。事实上,方案一和方案二的结合已经足够应付现实中的大多数情况了。&/p&
在多队列消息处理的场景中,怎样保持多个消息之间的时间顺序,是一个很经典的问题。解决方法当然是有的。 为了讨论这个问题,让我们做一些简化问题的假设: 有若干个消息队列A、B、C……
有若干个线程在产生消息,并分别将消息加入这些队列
每个消息队列有…
你的方向错了,对!我就是说你错了。&br&&br&redis是内存数据库!&br&redis是内存数据库!&br&redis是内存数据库!&br&redis他爹做了disque,你要不要试试。&br&&br&mq一般都采用订阅~发布模型,如果你考虑性能,主要关注点就放在消费模型是pull还是push。影响最大的,应该是存储结构。&br&&br&kafka的性能要在topic数量小于64的时候,才能发挥威力。partition决定的。极限情况下丢消息,例如:主写入消息后,主机器宕机,并硬盘损坏。review代码的时候发现的。&br&&br&rabbit不知道,但是rocket的性能是(万条每秒),并且能够横向无限扩展,单机topic数量在256时,性能损失较小。&br&&br&rocket可以说是kafka的变种,是阿里在充分reviewkafka代码后,开发的metaQ。在不断更新,修补以后,阿里把metaQ3.0更名为rocket,并且rocket是java写的易于维护。&br&&br&另外就是rocket和kafka有类似无限堆积的能力。想想,断电不丢消息,积压两亿条消息毫无压力,niubility&br&&br&kafka和rocket性能根本不是你需要考虑的问题。&br&&br&&br&如果使用过程中,出现性能问题,先学怎么用。肯定是你用错了。
你的方向错了,对!我就是说你错了。 redis是内存数据库! redis是内存数据库! redis是内存数据库! redis他爹做了disque,你要不要试试。 mq一般都采用订阅~发布模型,如果你考虑性能,主要关注点就放在消费模型是pull还是push。影响最大的,应该是存储…
RabbitMQ/Kafka/ZeroMQ 都能提供消息队列服务,但有很大的区别。&br&&br&在面向服务架构中通过消息代理(比如 RabbitMQ / Kafka等),使用生产者-消费者模式在服务间进行异步通信是一种比较好的思想。&br&&br&因为服务间依赖由强耦合变成了松耦合。消息代理都会提供持久化机制,在消费者负载高或者掉线的情况下会把消息保存起来,不会丢失。就是说生产者和消费者不需要同时在线,这是传统的请求-应答模式比较难做到的,需要一个中间件来专门做这件事。其次消息代理可以根据消息本身做简单的路由策略,消费者可以根据这个来做负载均衡,业务分离等。&br&&br&缺点也有,就是需要额外搭建消息代理集群(但优点是大于缺点的 ) 。&br&&br&ZeroMQ 和 RabbitMQ/Kafka 不同,它只是一个异步消息库,在套接字的基础上提供了类似于消息代理的机制。使用 ZeroMQ 的话,需要对自己的业务代码进行改造,不利于服务解耦。&br&&br&RabbitMQ 支持 AMQP(二进制),STOMP(文本),MQTT(二进制),HTTP(里面包装其他协议)等协议。Kafka 使用自己的协议。&br&&br&Kafka 自身服务和消费者都需要依赖 Zookeeper。&br&&br&RabbitMQ 在有大量消息堆积的情况下性能会下降,Kafka不会。毕竟AMQP设计的初衷不是用来持久化海量消息的,而Kafka一开始是用来处理海量日志的。&br&&br&总的来说,RabbitMQ 和 Kafka 都是十分优秀的分布式的消息代理服务,只要合理部署,不作,基本上可以满足生产条件下的任何需求。
RabbitMQ/Kafka/ZeroMQ 都能提供消息队列服务,但有很大的区别。 在面向服务架构中通过消息代理(比如 RabbitMQ / Kafka等),使用生产者-消费者模式在服务间进行异步通信是一种比较好的思想。 因为服务间依赖由强耦合变成了松耦合。消息代理都会提供持久化…
之前做过这方面的队列服务,也做过这两者甚至很多队列的对比测试,先介绍下beanstalkd吧,&br&&br&首先他们都是一种消息队列,都能完成一些处理或者请求时间较长的任务,&br&&br&beantalkd
特有的生产者模型和消费者模型,更再单个队列任务(job)上有以下特点:&br&&br&&b&任务优先级 (priority):&/b&&br&&br&任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap) 处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn)。&br&&br&&b&延时任务 (delay):&/b&&br&有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE with )。这种机制可以实现分布式的 java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run) 能够保证任务转移到另外的节点执行。&br&&br&&b&任务超时重发 (time-to-run):&/b&&br&Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete / release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在 TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算 TTR (time-to-run)。&br&&br&&b&任务预留 (buried):&/b&&br&如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek buried 命令查询被保留的任务,并且进行人工干预。简单的, kick 能够一次性把 n 条被保留的任务踢回队列。&br&&br&从下面几个方面吧。&br&&br&&b&速度优势&/b&&br&&br&&p&参考数据:&/p&&br&&img src=&/dbbd8ac979caf1400eda6974edd4d712_b.png& data-rawwidth=&365& data-rawheight=&157& class=&content_image& width=&365&&&br&&b&为什么可以有这样的速度?&/b&&br&beanstalkd协议基于ASCII编码运行在tcp上。客户端连接服务器并发送指令和数据,然后等待响应并关闭连接。对于每个连接,服务器按照接收命令的序列依次处理并响应。所有整型值都非负的十进制数,除非有特别声明。&br&&br&&b&场景优势&/b&&br&&br&就像我在分享时提到的,beanstalkd可以做&br&&ul&&li&延时系统,比如延迟20分钟发送短信,******,在投放的时候就设定一定的延迟时间值,让任务在延迟时间到了之后进入ready队列,等待worker预订处理。&/li&&/ul&&img src=&/b2fe6330ac4cee0ace41e47a82bf713f_b.png& data-rawwidth=&818& data-rawheight=&524& class=&origin_image zh-lightbox-thumb& width=&818& data-original=&/b2fe6330ac4cee0ace41e47a82bf713f_r.png&&&br&&ul&&li&轮询系统,如下图,一个被投放的任务,在延迟时间过后需要再检查一遍状态,如果不符合,继续释放(release with delay)为延迟投放状态(DELAYED),直到时间过期之后,再次进入ready队列,被worker预订,进行一些逻辑判断,比如微信银行卡退款是否成功,如果成功,删除该任务,如果没成功,继续释放(release with delay)为延迟投放状态。&/li&&/ul&&img src=&/8add0f140c21e738f26b1ccd8d2fa892_b.png& data-rawwidth=&781& data-rawheight=&496& class=&origin_image zh-lightbox-thumb& width=&781& data-original=&/8add0f140c21e738f26b1ccd8d2fa892_r.png&&&br&而这些,都基本上是RabbitMQ做不到的。&br&&br&&b&分享一个我之前团队的slide:&a href=&///?target=http%3A///s/1dDg2edF& class=& wrap external& target=&_blank& rel=&nofollow noreferrer&&Beanstalkd-share-slides.pdf_免费高速下载&i class=&icon-external&&&/i&&/a&&/b&
之前做过这方面的队列服务,也做过这两者甚至很多队列的对比测试,先介绍下beanstalkd吧, 首先他们都是一种消息队列,都能完成一些处理或者请求时间较长的任务, beantalkd 特有的生产者模型和消费者模型,更再单个队列任务(job)上有以下特点: 任务优先…
你的队列设计不合理,无法覆盖业务需求。&br&高并发高性能的消息处理,如果有顺序依赖的消息,要保证消息有一个hashKey,类似于数据库表分区的的分区key列。保证对同一个key的消息发送到相同的队列。&br&在此例中,A用户产生的消息(包括创建消息和删除消息)都按A的hashKey分发到同一个队列。这样再消费就可以保证业务顺序了。
你的队列设计不合理,无法覆盖业务需求。 高并发高性能的消息处理,如果有顺序依赖的消息,要保证消息有一个hashKey,类似于数据库表分区的的分区key列。保证对同一个key的消息发送到相同的队列。 在此例中,A用户产生的消息(包括创建消息和删除消息)都按…
正如楼上回复的一样,Gearman 偏重任务分发(也有调度),基本工作模式,是由 Client (多个)向 Server 去注册一些函数,当然 Server 接受到任务后,分发到这些函数上。&br&&br&RabbitMQ 则是很多消息模型的实现,比如简单的生产者消费者、发布订阅、广播,甚至可以模拟 RPC 等等,Gearman 的功能,简单的生产者消费者模型就可以实现,只是思路上稍微换一下,需要消费消息来实现,官方也有类似参考例子:&a class=& wrap external& href=&///?target=http%3A///tutorials/tutorial-two-python.html& target=&_blank& rel=&nofollow noreferrer&&RabbitMQ - RabbitMQ tutorial - Work queues &i class=&icon-external&&&/i&&/a&&br&&br&建议看下 RabbitMQ 官方这个例子教程 &a href=&///?target=http%3A///getstarted.html& class=& wrap external& target=&_blank& rel=&nofollow noreferrer&&RabbitMQ - Getting started with RabbitMQ&i class=&icon-external&&&/i&&/a& ,里面包含了几种模型的实例,很容易结合我们现实中场景。&br&&br&至于优缺点,仁者见仁。&br&&br&大概三年前吧,我给一家某行业内比较知名公司优化过 Gearman 到 RabbitMQ 的方案,高峰时候一小时消息量在 x 千万以上,优化前 Gearman Server 负载较重,包括 Client(Worker)也会经常挂掉,很多消息会丢失,后来迁移到 RabbitMQ 上面,Server 只有一台(备份一台基本从来就没用过),Client 采用 ACK 确认消息成功消费掉,后来就没有然后了。。。这里故事不在描述。&br&&br&推荐使用 RabbitMQ,不仅仅是作为消息模型,还可以改善系统架构设计。
正如楼上回复的一样,Gearman 偏重任务分发(也有调度),基本工作模式,是由 Client (多个)向 Server 去注册一些函数,当然 Server 接受到任务后,分发到这些函数上。 RabbitMQ 则是很多消息模型的实现,比如简单的生产者消费者、发布订阅、广播,甚至可…
仔细看了下生产者和消费者的代码,原因就出在生产者这句上:&br&channel.queue_bind(exchange=&messager&,queue=&test4&,routingKey=&info&);&br&&br&这句代码设置了一个绑定:&br&&发往交换机messager的,routingKey为info的所有消息都会被放置在队列test4上&&br&&br&而LZ消费者那句,又会让no_info的消息也发到同样的队列(一对交换机和队列是可以有多个绑定的)&br&&br&所以结果就是routingKey为info或者no_info的消息都会被放置到test4队列&br&&br&而消费者在取消息时,只关注队列,不关心消息是来自哪里的,只要队列中有,它就拿&br&&br&不知道这样解释LZ懂了没?
仔细看了下生产者和消费者的代码,原因就出在生产者这句上: channel.queue_bind(exchange="messager",queue="test4",routingKey="info"); 这句代码设置了一个绑定: "发往交换机messager的,routingKey为info的所有消息都会被放置在队列test4上" 而LZ消费者那…
Celery相当于给你包装了一个现成的系统,让你更加方便的在自己的项目中操作RabbitMQ这个消息队列介质。不然你什么都要自己在RabbitMQ上重新写。最直接的例子就是在Celery Python里,你只需要config一下settings,然后就可以用decorator来轻松使用消息队列了,而不用重新在RabbitMQ上写自己的脚本。
Celery相当于给你包装了一个现成的系统,让你更加方便的在自己的项目中操作RabbitMQ这个消息队列介质。不然你什么都要自己在RabbitMQ上重新写。最直接的例子就是在Celery Python里,你只需要config一下settings,然后就可以用decorator来轻松使用消息队列了…
最近也在学 RabbitMQ,如果有什么错误欢迎指正。&br&&br&&blockquote&首先,ACK机制可以保证消费者如果拿了队列的消息,处理出错了,那么队列中还有这个消息,仍然可以给下个机子来跑。&/blockquote&&br&首先你弄错了 acknowledgment 的目的。acknowledgment 是 consumer 告诉 broker 当前消息是否成功 consume,至于 broker 如何处理 NACK,取决于 consumer 是否设置了 requeue:如果 requeue=False,那么 NACK 后 broker 是会删除消息的。看看 &a href=&///?target=https%3A///confirms.html& class=& wrap external& target=&_blank& rel=&nofollow noreferrer&&RabbitMQ 官方的解释&i class=&icon-external&&&/i&&/a&。Consumer 做一个 ACK,是为了告诉 Broker 这条消息已经被成功处理了(transaction committed)。只要没收到 consumer 的 acknowledgment,broker 就会一直保存着这条消息(但不会 requeue,更不会分配给其他 consumer,直到当前 consumer 发生断开连接之类的异常)。RabbitMQ 之所以是 guaranteed delivery,这是一个关键。换言之,你的 consumer 代码必须能够处理各种异常,确保只要收到一条消息,最终一定能够执行一条 ACK / NACK(当然也没人阻止你设置 no_ack=True,干脆不用 acknowledgment 机制,这个视业务需求而定)。&br&&br&&blockquote&处理消息时出错了,一般不应该让别的 consumer 再去处理这个消息,因为多半还是要出错。&br&&br&但是,个人觉得一般处理消息出错都是因为代码逻辑或者出bug,即使 队列中后来仍然保留该消息,然后再给某一个消费者消费,不还是报错吗?&br&Ps:当然,如果一个机子宕掉,消息还有,还可以给另外的机子用,这种情景下 ACK 是很有用的。但是个人觉得这种应该是少数情况吧。&/blockquote&&br&Consumer 出错的原因远不止是自身的 bug 这么简单:&br&如果我的 consumer 需要验证消息内容,那么不合法的消息会导致 consumer 出错。&br&如果我的 consumer 需要将消息保存到数据库,那么数据库服务器挂了会导致 consumer 出错。&br&如果我的 consumer 要根据消息内容发送一个 HTTP 请求,那么 HTTP 服务挂了会导致 consumer 出错。&br&……&br&对于第 1 个情况,让别的 consumer 再处理一次不合法的消息毫无意义(反正还是要出错)。&br&对于 2、3,我个人的方法是让当前的 consumer 进行 N 次尝试,全部失败之后 NACK 这条消息。&br&&br&&blockquote&另外,不知道能不能不要 ACK 机制,然后把出错的消息存库,方便以后查bug或者重新执行。&/blockquote&&br&RabbitMQ 自带 dead lettering 机制,基本上就是 &a href=&/question//answer/& class=&internal&&天龙的回答&/a&(basic.nack 或者 basic.reject)。我没有用过这个机制,而是手动在 consumer 里面写了一小段 publisher 的逻辑来转发 dead letter 到另一个 queue,这样我可以在消息的 header 里加一些有用的 log 信息,方便我 debug。
最近也在学 RabbitMQ,如果有什么错误欢迎指正。 首先,ACK机制可以保证消费者如果拿了队列的消息,处理出错了,那么队列中还有这个消息,仍然可以给下个机子来跑。 首先你弄错了 acknowledgment 的目的。acknowledgment 是 consumer 告诉 broker 当前消息…
已有帐号?
无法登录?
社交帐号登录}

我要回帖

更多关于 融资租赁是个好工作吗 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信