python
--version
来建议是否安装成功。如果windows提示python不是内部命令的话,可以手动把python的根目录加入到windows的系统设置的系统变量path下面,然后重新启动命令提示符应该就可以了,加入到path下面就是全局可以调用这python的命令;
pip install
模块/包名
来进行安装模块,pip help
可以查看pip的其他的操作命令。
缩进:Python中,四个字符为一个缩进,和其他编程语言如:Java、PHP、JavaScript相比,Python中没有{ }, 使用缩进替代代码块
续行:Python中,尽量一行把代码写完,需要强制换行并且下一行需要连起来的话,在行的末尾使用
字符转义:使用\加上需要转义的字符,和其他语言Java、PHP、JavaScript类似,如: \, \n, \t: \将\当做普通字符对待
标识符:可以理解为就是直接的文件名、变量名、函数名等的名字命名,规则有:
常量:就是保持不变的意思,Python中没有常量,通常使用大写字母表示常量,如INIT_NUM = 1,这样我们就知道这个变量尽量不要去改变它,保持它的不变
变量:我们可以对其赋值,改变它的值,如:height = 1,height变量值为2,height = 2,height变量值变更为2了。
view =》Quick
Documentation
,可以快速的查看这个函数的简单文档(建议使用快捷键,我自己定义的快捷键是F2),当无法理解时,需要详细查看这个函数和用法是,可以到官方文档和各种搜索引擎进行进一步的查找相关资料。
1.通过索引访问,索引从0开始计算(编程语言的索引大多数都是从0开始),如:lst[1],访问第2个元素
2.通过前后索引获取区间数据:这也叫切片,列表变量[起始索引:结束索引],包含起始索引的值,不包含结束索引的值,就是大于等于起始索引,小于结束索引;
如果起始或者结束索引为空,那么就截取到最前面或者最末尾,如:lst[:3],从索引0开始,截取到不包含索引3的数值;lst[:],截取全部;
3.通过index(传入值)方法,传入值参数可以获取对应值的索引
4.通过count(传入值)方法,传入值参数可以获得对应值在列表中出现的次数
5.通过len(传入列表)方法,传入列表可以获得列表的元素个数,也是就列表长度
通过索引修改,如lst[0] = 100,修改lst列表的索引0的元素值为100
3.可以通过extend方法,添加可迭代对象的元素(可迭代可以理解为可循环对象),如:lst.extend(range(10))
1.可以通过remove方法,传入元素值,从左至右找到第一次匹配的元素删除, 且只会删除一个元素,
如:lst.remove(3),删除元素为3的第一个元素,多个3个,可以使用循环删除
2.可以通过pop方法,弹出指定索引的元素,如果不给定索引,弹出末尾元素,如:lst.pop(2),lst.pop()
4.浅拷贝:copy()方法,复制出来的对象,会跟着原始对象数据变化而变化,如:lst.copy()
元祖(Tuple):不可变数据类型
与列表类似,使用()或者tuple()初始化,一旦定义不可修改,可存储任意类型数据
基本使用:CR(创建+读取),无UD(无修改和删除)
与列表也类似,也有不一样的
t = (5) : 这样定义的不是元祖,而是一个数值
1.通过索引访问,索引从0开始计算(编程语言的索引大多数都是从0开始),如:lst[1],访问第2个元素
2.通过前后索引获取区间数据:这也叫切片,列表变量[起始索引:结束索引],包含起始索引的值,不包含结束索引的值,就是大于等于起始索引,小于结束索引;
如果起始或者结束索引为空,那么就截取到最前面或者最末尾,如:lst[:3],从索引0开始,截取到不包含索引3的数值;lst[:],截取全部;
3.通过index(传入值)方法,传入值参数可以获取对应值的索引
4.通过count(传入值)方法,传入值参数可以获得对应值在列表中出现的次数
5.通过len(传入列表)方法,传入列表可以获得列表的元素个数,也是就列表长度
1.可变的,无序列的,不可重复的元素的集合**(重复的元素会被自动去除重复部分)
2.集合元素必须是可以hash类型,也就是必须是hashable,x不可变的数据类型就是可以hashable的
3.所以,数值类型,布尔类型,字符串类型,bytes,元组,None都可以作为集合的元素数据类型
循环迭代读取:由于集合是无序的,无法使用index索引查询,只能迭代
add方法:如:old_set.add(1),添加重复的数据会自动去重的哦
直接把数据赋值给key索引的字典,如:
keys():获得所有字典键
遍历:使用for in遍历,如下:
defaultdict:使用基本类型的字典,如果访问一个不存在的key的时候,就会报错,
为了避免报错,我们可以使用collections的defaultdict,这样访问的时候,就不会报错了,并且返回默认值,如:
字节(bytes):不可变类型,有序
Python3新增,用于对二进制数据的处理
1. 与字符串类似,字符串是有编码的,编码不对就出现文字变乱,可以使用utf-8、gbk等编码,这样就避免乱码;
而字节bytes是没有编码的,不会出现乱码,所以,在某些场景下可以使用字节来进行数据传输而确保没有乱码。
在socket通讯中,数据的传输就是使用bytes的;
2. 字符串和字节都可以互相转换,字符串.encode()方法就可以转换为字节,
字节.decode()方法就可以还原为字符串;
在上一篇文章中,我们梳理了Python中的基本数据类型,也就是Python自带的数据类型,而时间这个数据类型,来自Python的标准库time,所以,需要导入才可以使用。Python需要做时间相关的处理时,我们就使用这个time标准库来实现;
time.time()
,在实际应用中,我们经常存储到数据库里面可以存储当前时间戳的整数来方便后续调用,比如:写一篇文章,我要记录发布时间,就把这个发布时间的时间戳整数记录下来,存储整数比字符串程序访问速度更快。取整可以使用:int(
有了时间戳,可以通过localtime方法格式化时间戳为本地的时间,得到当前时间的元组(包括年月日时分秒等),如:time.localtime()
通过localtime方法获得了元组后,可以通过asctime方法将元组转变为字符串(欧美国家的时间格式)
通过localtime方法获得了元组后,还可以通过strftime方法格式化字符串,接收以时间元组,并返回以可读字符串表示的当地时间,格式由参数format决定,这个灵活性就比较强了,如:time.strftime("%Y-%m-%d
可以通过strptime方法根据指定的格式把一个时间字符串解析为时间元组
可以通过sleep方法推迟调用线程的运行,可通过参数secs指秒数,表示进程挂起的时间
更多time方法参考文档:
datetime模块,time模块的增强版,功能更多更强大
python中的datetime模块提供了操作日期和时间功能,该模块提供的五种核心对象分别是:datetime(时间日期类型), date(日期类型), time(时间类 型), tzinfo(时区类型),timedelta(时间差类型)
可以通过timestamp方法计算时间戳
使用类似time模块,更多请参考文档:
消息队列有很多使用场景,比较常见的有3个:解耦、异步、削峰。
所谓生产者-消费者问题,实际上主要是包含了两类线程。一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库。生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
在Java语言中,实现生产者消费者问题时,可以采用三种方式:
在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。
在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除。消费者愣是换了顺序给执行成删除、修改、增加,这样能行吗?肯定是不行的。不同的消息队列产品,产生消息错乱的原因,以及解决方案是不同的。下面我们以RabbitMQ、Kafka、RocketMQ为例,来说明保证顺序消费的办法。
对于 RabbitMQ 来说,导致上面顺序错乱的原因通常是消费者是集群部署,不同的消费者消费到了同一订单的不同的消息。如消费者A执行了增加,消费者B执行了修改,消费者C执行了删除,但是消费者C执行比消费者B快,消费者B又比消费者A快,就会导致消费 binlog 执行到数据库的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。如下图:
RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。如下图:
对于 Kafka 来说,一个 topic 下同一个 partition 中的消息肯定是有序的,生产者在写的时候可以指定一个 key,通过我们会用订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中,所以消费者消费到的消息也一定是有序的。
那么为什么 Kafka 还会存在消息错乱的问题呢?问题就出在消费者身上。通常我们消费到同一个 key 的多条消息后,会使用多线程技术去并发处理来提高消息处理速度,否则一条消息的处理需要耗时几十 毫秒,1 秒也就只能处理几十条消息,吞吐量就太低了。而多线程并发处理的话,binlog 执行到数据库的时候就不一定还是原来的顺序了。如下图:
Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的,导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量,比如消费者消费到了消息以后,开启 32 个线程处理消息,每个线程线程处理消息的快慢是不一致的,所以才会导致最终消息有可能不一致。
所以对于 Kafka 的消息顺序性保证,其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个订单号的消息发送到同一个内存队列中即可。如下图:
但是当消费者有多台机器的时候,会组成一个 Consumer Group,Consumer Group 中的每台机器都会负责消费一部分 MessageQueue 的消息,所以可能消费者A消费了 MessageQueue1 的消息执行增加操作,消费者B消费了 MessageQueue2 的消息执行修改操作,消费者C消费了 MessageQueue3 的消息执行删除操作,但是此时消费 binlog 执行到数据库的时候就不一定是消费者A先执行了,有可能消费者C先执行删除操作,因为几台消费者是并行执行,是不能够保证他们之间的执行顺序的。如下图:
要解决 RocketMQ 的乱序问题,我们只需要想办法让同一个订单的 binlog 进入到同一个 MessageQueue 中就可以了。因为同一个 MessageQueue 内的消息是一定有序的,一个 MessageQueue 中的消息只能交给一个 Consumer 来进行处理,所以 Consumer 消费的时候就一定会是有序的。
丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景。
RabbitMQ丢失消息分为如下几种情况:
生产者将数据发送到RabbitMQ的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
如果没有开启RabbitMQ的持久化,那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,RabbitMQ还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
主要是因为消费者消费时,刚消费到还没有处理,结果消费者就挂了,这样你重启之后,RabbitMQ就认为你已经消费过了,然后就丢了数据。
针对上述三种情况,RabbitMQ可以采用如下方式避免消息丢失:
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。
设置消息持久化到磁盘,设置持久化有两个步骤:
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
使用RabbitMQ提供的ack机制,首先关闭RabbitMQ的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
Kafka丢失消息分为如下几种情况:
生产者没有设置相应的策略,发送过程中丢失数据。
Kafka自己丢消息:
比较常见的一个场景,就是Kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,它就少了一部分数据。
消费者消费到了这个数据,然后消费之自动提交了offset,让Kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
针对上述三种情况,Kafka可以采用如下方式避免消息丢失:
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。
Kafka自己丢消息:
一般要求设置4个参数来保证消息不丢失:
min.isync.replicas
参数,这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
acks=all
,表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了。
retries=MAX
(很大的一个值),表示这个是要求一旦写入事变,就无限重试。
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
数据,大数据量的数据处理上。
RabbitMQ:以broker为中心,有消息的确认机制。
Kafka:以consumer为中心,没有消息的确认机制。
RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。
Kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。
先大概说一说可能会有哪些重复消费的问题。首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。
kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。
但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。
其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。举个例子,假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?
一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性幂等性。通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。
想要保证不重复消费,其实还要结合业务来思考,这里给几个思路:
还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据。
一般生产环境中,都会在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。核心业务队列,就是比如专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
比如说要是第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送,都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问,或者标志位处理失败!注意,这个步骤很重要。
一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。死信队列的使用,其实就是MQ在生产实践中非常重要的一环,也就是架构设计必须要考虑的。
整个过程,如下图所示:
推模式是服务器端根据用户需要,由目的、按时将用户感兴趣的信息主动发送到用户的客户端。
拉模式是客户端主动从服务器端获取信息。
在实际生产应用中,通常会使用Kafka作为消息传输的数据管道,RabbitMQ作为交易数据作为数据传输管道,主要的取舍因素则是是否存在丢数据的可能。RabbitMQ在金融场景中经常使用,具有较高的严谨性,数据丢失的可能性更小,同事具备更高的实时性。而Kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但从严谨性角度来讲,大不如RabbitMQ。而且由于Kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况。详细来说,它们之间主要有如下的区别:
RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。
Kafka:用于处于活跃的流式数据,大数据量的数据处理上。
RabbitMQ:以broker为中心,有消息的确认机制。
Kafka:以consumer为中心,没有消息的确认机制。
RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。
Kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。
下面从数据写入和读取两方面分析,为什么Kafka速度这么快:
Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术,顺序写入和MMFile 。
磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。
而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:
下图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分):
这种方法有一个缺陷——没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据 。
即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。
通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)
但也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫异步
传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:
以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次copy操作:硬盘->内核buf->用户buf->socket相关缓冲区->协议引擎。而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。
在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。sendfile的引入不仅减少了数据复制,还减少了上下文切换。运行流程如下:
相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。
在Apache、Nginx、lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,配合mmap作为文件读写方式,直接把它传给sendfile。
在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。
Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优。读取数据的时候配合sendfile直接暴力输出。
RabbitMQ可能丢失消息分为如下几种情况:
生产者将数据发送到RabbitMQ的时候,可能在传输过程中因为网络等问题而将数据弄丢了。
如果没有开启RabbitMQ的持久化,那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,RabbitMQ还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。
主要是因为消费者消费时,刚消费到还没有处理,结果消费者就挂了,这样你重启之后,RabbitMQ就认为你已经消费过了,然后就丢了数据。
针对上述三种情况,RabbitMQ可以采用如下方式避免消息丢失:
事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。
设置消息持久化到磁盘,设置持久化有两个步骤:
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。
设置消息持久化到磁盘,设置持久化有两个步骤:
而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。
使用RabbitMQ提供的ack机制,首先关闭RabbitMQ的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。
聚合全网技术文章,根据你的阅读喜好进行个性推荐
深圳市奥思网络科技有限公司版权所有
版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。