编写程序对内存中的数组OrgArray中的42个字数据按顺序进行分类?

  • 考虑主流和趋势,我们的Python学习主要基于Python3.x+
  • Python核心使用领域和发展方向:爬虫、数据科学、人工智能、运维自动化、云计算、Web技术、游戏开发等,应用十分广泛,是我们国家人工智能战略的第一编程语言,连中小学生都在学习;Python至今已经流行了30多年了,还在不断焕发生机和活力,学会Python语言可以让你可以用20年不在话下!
  • Python是一门高级编程语言,需要Python解释器编译成字节码然后在对应的虚拟机上面运行,与JAVA类似;
  • Python是一种面向对象、解释型、动态类型计算机程序设计语言, 程序无需编译成二进制代码,而是在执行时对语句一条一条编译,在程序执行过程中,可以改变变量的类型。
  • Python解释器:理解就是Python代码的解释执行环境,我们写好了Python代码,里面的print()函数、if语句、数据计算等都是一堆纯文本,如果直接打开就会直接显示的一堆的纯文字给你,为了让这些字符串可以运行,就需要Python解释器解析这些文本字符,解析成功后,计算机就会执行相应的对应的操作,比如:我们写print('hello'),Python解释器解析后,就会输出hello到电脑屏幕上面,就相当于我们写代码然后,计算机读取代码了解了我们的想法,就让计算机来干活;
  • 安装Python解释器:到官网()下载对应的下载安装即可,在终端(windows下的命令提示符)通过python --version来建议是否安装成功。如果windows提示python不是内部命令的话,可以手动把python的根目录加入到windows的系统设置的系统变量path下面,然后重新启动命令提示符应该就可以了,加入到path下面就是全局可以调用这python的命令;
  • pip包管理器:python最强大的其中一个地方是:全世界程序员无私免费开源了数以万计的Python功能模块,我们可以很方便的使用这些模块,而这些模块的安装、删除、维护等管理操作就是通过pip这个包管理器来管理的,安装python解释器的时候自带pip包管理器,pip的作用就是管理第三方的模块,通过命令:pip install 模块/包名来进行安装模块,pip help可以查看pip的其他的操作命令。
  • 我们使用的开发IDE主要是:PyCharm,官网下载:,编写好的.py的文件可以直接在PyCharm中直接运行,当然也可以使用python终端运行,只是终端运行操作稍显麻烦;

Day 4:编写Python第一个程序和官方文档介绍

缩进:Python中,四个字符为一个缩进,和其他编程语言如:Java、PHP、JavaScript相比,Python中没有{ }, 使用缩进替代代码块
续行:Python中,尽量一行把代码写完,需要强制换行并且下一行需要连起来的话,在行的末尾使用
字符转义:使用\加上需要转义的字符,和其他语言Java、PHP、JavaScript类似,如: \, \n, \t: \将\当做普通字符对待
标识符:可以理解为就是直接的文件名、变量名、函数名等的名字命名,规则有:

  1. 只能以字母或下划线开头
  2. 只能是字母、下划线、数字
  3. 不能使用Python中的关键字

常量:就是保持不变的意思,Python中没有常量,通常使用大写字母表示常量,如INIT_NUM = 1,这样我们就知道这个变量尽量不要去改变它,保持它的不变
变量:我们可以对其赋值,改变它的值,如:height = 1,height变量值为2,height = 2,height变量值变更为2了。

  • 建立第一个Python程序文档:使用PyCharm创建一个Python的项目,选择好保存路径,保存。PyCharm项目可以方便管理文件、提高效率。建立好之后,在PyCharm的项目文件夹下右键新建Python File文件(new => Python File),命名保存,这个就是Python程序文件,以.py结束。在里面输入:print('hello'),右键执行run,即可看到输出,这样第一个Python文档就建立起来啦!
  • 官方文档:我们不需要、也记不住每一个Python语法、函数等的参数、使用说明等,这个时候就需要查询一下官方的文档使用说明,通过文档我们就可以知道某个函数如何使用了,学会看文档,是每一个开发人员必备的最基本技能,官方文档地址:。在PyCharm中,光标移到某个函数上面然后执行:view =》Quick Documentation,可以快速的查看这个函数的简单文档(建议使用快捷键,我自己定义的快捷键是F2),当无法理解时,需要详细查看这个函数和用法是,可以到官方文档和各种搜索引擎进行进一步的查找相关资料。

Day 5 - 12:Python中的8种基本数据类型(内置数据类型)

  • 不可变类型(4 个):Number(数字)、String(字符串)、Tuple(元组)、bytes(字节)
  • 可变类型(4 个):List(列表)、Dictionary(字典)、Set(集合)、bytearray(字节数组)
  • 可变数据类型都可以CURD(增删改查),不可变类型是固定的,不能修改和删除,对不可变的数据类型的变量进行重新赋值是新建一个同名的变量,而不是修改这个变量的值哦(也就是在内存中新建一个变量存储空间,会增加了内存开销);不可变类型数据访问速度要比可变类型要快。所以,需要对数据进行频繁修改的,那么就使用可变数据类型,仅仅读取数据的,那么就使用不可变数据类型。

数值(Number):是不可变数据类型

  • 整型理解为整数,浮点型就理解为带小数点的,都可以有正负值;布尔值对应的真的关键字是True,值为1;假的关键字为False,对应值为0,可以和其他数字进行运算。数值类型的最常用的就是+-*/ ** %等算术运算;

列表(List):是可变数据类型

  • 使用最频繁,使用[]或list()初始化,它是可变的,包括长度和元素都可以变化,也就是初始化后还可以对里面的元素进行更改,里面元素可以放任意类型数据。
  • 基本使用:(CURD(意思是:create创建【增】、read读取【查】、update修改【改】、delete删除【删】,各种对象最常用的操作))
1.通过索引访问索引从0开始计算编程语言的索引大多数都是从0开始),lst[1]访问第2个元素
2.通过前后索引获取区间数据这也叫切片列表变量[起始索引:结束索引]包含起始索引的值不包含结束索引的值就是大于等于起始索引小于结束索引
如果起始或者结束索引为空那么就截取到最前面或者最末尾lst[:3]从索引0开始截取到不包含索引3的数值lst[:]截取全部
3.通过index(传入值)方法传入值参数可以获取对应值的索引
4.通过count(传入值)方法传入值参数可以获得对应值在列表中出现的次数
5.通过len(传入列表)方法传入列表可以获得列表的元素个数也是就列表长度
通过索引修改如lst[0] = 100修改lst列表的索引0的元素值为100
 
3.可以通过extend方法添加可迭代对象的元素可迭代可以理解为可循环对象),lst.extend(range(10))
1.可以通过remove方法传入元素值从左至右找到第一次匹配的元素删除, 且只会删除一个元素
lst.remove(3)删除元素为3的第一个元素多个3个可以使用循环删除
2.可以通过pop方法弹出指定索引的元素如果不给定索引弹出末尾元素lst.pop(2)lst.pop()
 
4.浅拷贝copy()方法复制出来的对象会跟着原始对象数据变化而变化lst.copy()

元祖(Tuple):不可变数据类型

  • 与列表类似,使用()或者tuple()初始化,一旦定义不可修改,可存储任意类型数据
  • 基本使用:CR(创建+读取),无UD(无修改和删除)
与列表也类似也有不一样的
t = (5)  这样定义的不是元祖而是一个数值
 
  • 与列表的访问基本一致:
1.通过索引访问索引从0开始计算编程语言的索引大多数都是从0开始),lst[1]访问第2个元素
2.通过前后索引获取区间数据这也叫切片列表变量[起始索引:结束索引]包含起始索引的值不包含结束索引的值就是大于等于起始索引小于结束索引
如果起始或者结束索引为空那么就截取到最前面或者最末尾lst[:3]从索引0开始截取到不包含索引3的数值lst[:]截取全部
3.通过index(传入值)方法传入值参数可以获取对应值的索引
4.通过count(传入值)方法传入值参数可以获得对应值在列表中出现的次数
5.通过len(传入列表)方法传入列表可以获得列表的元素个数也是就列表长度
  • 命名元祖:理解就是给元祖加上一个名字,它需要使用collections的namedtuple模块,如:
使用命名元祖可以在当有多个元祖的时候我们可以通过元祖的名字来区分某个元祖

字符串(String):不可变数据类型

  • 不可变对象、可循环迭代对象,是Unicode编码,是Python编程中最常用的数据类型
  • 基本使用:CR(创建+读取),无UD(无修改和删除)
单引号内可以包含以内容输出双引号双引号内也可以包含单引号 str = 'x"k"d'三引号用于换行写很长的字符串而不需要转义符 一旦定义后不可改变对其字符串变量操作的修改等都会生成新的字符串变量内存空间
字符串方法很多学会几个常用的做法后可以先大概过一遍文档里的字符串方法知道有哪些功能以后需要用到再翻文档参考即可

集合(Set):可变数据类型,无序

1.可变的无序列的不可重复的元素的集合**重复的元素会被自动去除重复部分
2.集合元素必须是可以hash类型也就是必须是hashablex不可变的数据类型就是可以hashable的
3.所以数值类型布尔类型字符串类型bytes元组None都可以作为集合的元素数据类型
  • 基本使用:(CURD(意思是:create创建、read读取、update修改、delete删除,各种对象最常用的操作))
{}使用{}的时候元素不能为空否则就是定义字典空的集合使用set()定义s5
循环迭代读取由于集合是无序的无法使用index索引查询只能迭代
 
add方法old_set.add(1)添加重复的数据会自动去重的哦
pop方法随机删除不能指定索引一次删除一个而列表没有传入索引是从后面往前删除的old_set.pop()

字典(Dict):可变数据类型,无序

  • 可变的,无序的,key不可重复的key-value键值对集合,key必须是可哈希(hash)的数据类型,也就是不可变数据类型,如:数值类型,布尔类型,字符串类型,bytes,元组,None,这个与集合的元素的数据类型要求类似哦
  • 基本使用:(CURD(意思是:create创建、read读取、update修改、delete删除,各种对象最常用的操作))
d = {} 要注意和集合区分哦
通过get()方法访问如下
直接把数据赋值给key索引的字典
 
keys()获得所有字典键
遍历使用for in遍历如下
 
defaultdict使用基本类型的字典如果访问一个不存在的key的时候就会报错
为了避免报错我们可以使用collections的defaultdict这样访问的时候就不会报错了并且返回默认值
 

字节(bytes):不可变类型,有序

Python3新增,用于对二进制数据的处理

1. 与字符串类似字符串是有编码的编码不对就出现文字变乱可以使用utf-8gbk等编码这样就避免乱码
而字节bytes是没有编码的不会出现乱码所以在某些场景下可以使用字节来进行数据传输而确保没有乱码
在socket通讯中数据的传输就是使用bytes的
2. 字符串和字节都可以互相转换字符串.encode()方法就可以转换为字节
字节.decode()方法就可以还原为字符串

字节数组(bytearray):可变类型,有序

  • 是字节bytes的可变类型,用于频繁修改数据的场景,如:视频或者图片识别处理中,需要不断的存储更新图像数据字节,如果使用bytes就会不断的创建新的内存空间,消耗大量资源,但是使用bytearry就开辟一个内存空间就够了,可以节省大量资源。

Day14:时间数据类型

在上一篇文章中,我们梳理了Python中的基本数据类型,也就是Python自带的数据类型,而时间这个数据类型,来自Python的标准库time,所以,需要导入才可以使用。Python需要做时间相关的处理时,我们就使用这个time标准库来实现;

  • 时间戳:就是从格林威治时间(简称GMT,但目前我们开发中基本都是用世界标准时间UTC,UTC=GMT)1970年01月01日0时0分0秒(北京时间1970年01月01日08时00分00秒,北京时间比UTC时间晚8小时),到所给定日期时间的总秒数(浮点型),如果没有指定日期,就会获得到当前时间的时间戳,获取当前时间戳的语法为:time.time(),在实际应用中,我们经常存储到数据库里面可以存储当前时间戳的整数来方便后续调用,比如:写一篇文章,我要记录发布时间,就把这个发布时间的时间戳整数记录下来,存储整数比字符串程序访问速度更快。取整可以使用:int(
  • 有了时间戳,可以通过localtime方法格式化时间戳为本地的时间,得到当前时间的元组(包括年月日时分秒等),如:time.localtime()
  • 通过localtime方法获得了元组后,可以通过asctime方法将元组转变为字符串(欧美国家的时间格式)
  • 通过localtime方法获得了元组后,还可以通过strftime方法格式化字符串,接收以时间元组,并返回以可读字符串表示的当地时间,格式由参数format决定,这个灵活性就比较强了,如:time.strftime("%Y-%m-%d
  • 可以通过strptime方法根据指定的格式把一个时间字符串解析为时间元组
  • 可以通过sleep方法推迟调用线程的运行,可通过参数secs指秒数,表示进程挂起的时间
  • 更多time方法参考文档:

datetime模块,time模块的增强版,功能更多更强大

  • python中的datetime模块提供了操作日期和时间功能,该模块提供的五种核心对象分别是:datetime(时间日期类型), date(日期类型), time(时间类 型), tzinfo(时区类型),timedelta(时间差类型)
  • 可以通过timestamp方法计算时间戳
  • 使用类似time模块,更多请参考文档:
}

消息队列有很多使用场景,比较常见的有3个:解耦、异步、削峰。

  1. 解耦:传统的软件开发模式,各个模块之间相互调用,数据共享,每个模块都要时刻关注其他模块的是否更改或者是否挂掉等等,使用消息队列,可以避免模块之间直接调用,将所需共享的数据放在消息队列中,对于新增业务模块,只要对该类消息感兴趣,即可订阅该类消息,对原有系统和业务没有任何影响,降低了系统各个模块的耦合度,提高了系统的可扩展性。
  2. 异步:消息队列提供了异步处理机制,在很多时候应用不想也不需要立即处理消息,允许应用把一些消息放入消息中间件中,并不立即处理它,在之后需要的时候再慢慢处理。
  3. 削峰:在访问量骤增的场景下,需要保证应用系统的平稳性,但是这样突发流量并不常见,如果以这类峰值的标准而投放资源的话,那无疑是巨大的浪费。使用消息队列能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。消息队列的容量可以配置的很大,如果采用磁盘存储消息,则几乎等于“无限”容量,这样一来,高峰期的消息可以被积压起来,在随后的时间内进行平滑的处理完成,而不至于让系统短时间内无法承载而导致崩溃。在电商网站的秒杀抢购这种突发性流量很强的业务场景中,消息队列的强大缓冲能力可以很好的起到削峰作用。

2.2 说一说生产者与消费者模式

所谓生产者-消费者问题,实际上主要是包含了两类线程。一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库。生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:

  1. 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
  2. 如果共享数据区为空的话,阻塞消费者继续消费数据。

在Java语言中,实现生产者消费者问题时,可以采用三种方式:

2.3 消息队列如何保证顺序消费?

在生产中经常会有一些类似报表系统这样的系统,需要做 MySQL 的 binlog 同步。比如订单系统要同步订单表的数据到大数据部门的 MySQL 库中用于报表统计分析,通常的做法是基于 Canal 这样的中间件去监听订单数据库的 binlog,然后把这些 binlog 发送到 MQ 中,再由消费者从 MQ 中获取 binlog 落地到大数据部门的 MySQL 中。

在这个过程中,可能会有对某个订单的增删改操作,比如有三条 binlog 执行顺序是增加、修改、删除。消费者愣是换了顺序给执行成删除、修改、增加,这样能行吗?肯定是不行的。不同的消息队列产品,产生消息错乱的原因,以及解决方案是不同的。下面我们以RabbitMQ、Kafka、RocketMQ为例,来说明保证顺序消费的办法。

对于 RabbitMQ 来说,导致上面顺序错乱的原因通常是消费者是集群部署,不同的消费者消费到了同一订单的不同的消息。如消费者A执行了增加,消费者B执行了修改,消费者C执行了删除,但是消费者C执行比消费者B快,消费者B又比消费者A快,就会导致消费 binlog 执行到数据库的时候顺序错乱,本该顺序是增加、修改、删除,变成了删除、修改、增加。如下图:

RabbitMQ 的问题是由于不同的消息都发送到了同一个 queue 中,多个消费者都消费同一个 queue 的消息。解决这个问题,我们可以给 RabbitMQ 创建多个 queue,每个消费者固定消费一个 queue 的消息,生产者发送消息的时候,同一个订单号的消息发送到同一个 queue 中,由于同一个 queue 的消息是一定会保证有序的,那么同一个订单号的消息就只会被一个消费者顺序消费,从而保证了消息的顺序性。如下图:

对于 Kafka 来说,一个 topic 下同一个 partition 中的消息肯定是有序的,生产者在写的时候可以指定一个 key,通过我们会用订单号作为 key,这个 key 对应的消息都会发送到同一个 partition 中,所以消费者消费到的消息也一定是有序的。

那么为什么 Kafka 还会存在消息错乱的问题呢?问题就出在消费者身上。通常我们消费到同一个 key 的多条消息后,会使用多线程技术去并发处理来提高消息处理速度,否则一条消息的处理需要耗时几十 毫秒,1 秒也就只能处理几十条消息,吞吐量就太低了。而多线程并发处理的话,binlog 执行到数据库的时候就不一定还是原来的顺序了。如下图:

Kafka 从生产者到消费者消费消息这一整个过程其实都是可以保证有序的,导致最终乱序是由于消费者端需要使用多线程并发处理消息来提高吞吐量,比如消费者消费到了消息以后,开启 32 个线程处理消息,每个线程线程处理消息的快慢是不一致的,所以才会导致最终消息有可能不一致。

所以对于 Kafka 的消息顺序性保证,其实我们只需要保证同一个订单号的消息只被同一个线程处理的就可以了。由此我们可以在线程处理前增加个内存队列,每个线程只负责处理其中一个内存队列的消息,同一个订单号的消息发送到同一个内存队列中即可。如下图:

但是当消费者有多台机器的时候,会组成一个 Consumer Group,Consumer Group 中的每台机器都会负责消费一部分 MessageQueue 的消息,所以可能消费者A消费了 MessageQueue1 的消息执行增加操作,消费者B消费了 MessageQueue2 的消息执行修改操作,消费者C消费了 MessageQueue3 的消息执行删除操作,但是此时消费 binlog 执行到数据库的时候就不一定是消费者A先执行了,有可能消费者C先执行删除操作,因为几台消费者是并行执行,是不能够保证他们之间的执行顺序的。如下图:

要解决 RocketMQ 的乱序问题,我们只需要想办法让同一个订单的 binlog 进入到同一个 MessageQueue 中就可以了。因为同一个 MessageQueue 内的消息是一定有序的,一个 MessageQueue 中的消息只能交给一个 Consumer 来进行处理,所以 Consumer 消费的时候就一定会是有序的。

2.4 消息队列如何保证消息不丢?

丢数据一般分为两种,一种是mq把消息丢了,一种就是消费时将消息丢了。下面从rabbitmq和kafka分别说一下,丢失数据的场景。

RabbitMQ丢失消息分为如下几种情况:

  1. 生产者将数据发送到RabbitMQ的时候,可能在传输过程中因为网络等问题而将数据弄丢了。

  2. 如果没有开启RabbitMQ的持久化,那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,RabbitMQ还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。

  3. 主要是因为消费者消费时,刚消费到还没有处理,结果消费者就挂了,这样你重启之后,RabbitMQ就认为你已经消费过了,然后就丢了数据。

针对上述三种情况,RabbitMQ可以采用如下方式避免消息丢失:

    • 可以选择使用RabbitMQ提供是事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送。如果收到了消息,那么就可以提交事务。这种方式有明显的缺点,即RabbitMQ事务开启后,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
    • 可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了RabbitMQ之中,RabbitMQ会给你回传一个ack消息,告诉你这个消息发送OK了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。

  1. 设置消息持久化到磁盘,设置持久化有两个步骤:

    • 创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里面的数据。
    • 发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。

    而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。

  2. 使用RabbitMQ提供的ack机制,首先关闭RabbitMQ的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

Kafka丢失消息分为如下几种情况:

  1. 生产者没有设置相应的策略,发送过程中丢失数据。

  2. Kafka自己丢消息:

    比较常见的一个场景,就是Kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,它就少了一部分数据。

  3. 消费者消费到了这个数据,然后消费之自动提交了offset,让Kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。

针对上述三种情况,Kafka可以采用如下方式避免消息丢失:

  1. 关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。

  2. Kafka自己丢消息:

    一般要求设置4个参数来保证消息不丢失:

  3. 在kafka服务端设置 min.isync.replicas 参数,这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
  4. 在生产者端设置 acks=all ,表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了。
  5. 在生产者端设置 retries=MAX (很大的一个值),表示这个是要求一旦写入事变,就无限重试。
  6. 如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。

  7. 数据,大数据量的数据处理上。

  8. RabbitMQ:以broker为中心,有消息的确认机制。

    Kafka:以consumer为中心,没有消息的确认机制。

  9. RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。

    Kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。

2.5 消息队列如何保证不重复消费?

先大概说一说可能会有哪些重复消费的问题。首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。举个例子,假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性幂等性。通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

想要保证不重复消费,其实还要结合业务来思考,这里给几个思路:

  1. 比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下。
  2. 比如你是写redis,那没问题了,反正每次都是set,天然幂等性。
  3. 比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据。

2.6 MQ处理消息失败了怎么办?

一般生产环境中,都会在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。核心业务队列,就是比如专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。

比如说要是第三方物流系统故障了,此时无法请求,那么仓储系统每次消费到一条订单消息,尝试通知发货和配送,都会遇到对方的接口报错。此时仓储系统就可以把这条消息拒绝访问,或者标志位处理失败!注意,这个步骤很重要。

一旦标志这条消息处理失败了之后,MQ就会把这条消息转入提前设置好的一个死信队列中。然后你会看到的就是,在第三方物流系统故障期间,所有订单消息全部处理失败,全部会转入死信队列。然后你的仓储系统得专门有一个后台线程,监控第三方物流系统是否正常,能否请求的,不停的监视。一旦发现对方恢复正常,这个后台线程就从死信队列消费出来处理失败的订单,重新执行发货和配送的通知逻辑。死信队列的使用,其实就是MQ在生产实践中非常重要的一环,也就是架构设计必须要考虑的。

整个过程,如下图所示:

2.7 请介绍消息队列推和拉的使用场景

推模式是服务器端根据用户需要,由目的、按时将用户感兴趣的信息主动发送到用户的客户端。

  • 对用户要求低,方便用户获取需要的信息;
  • 及时性好,服务器端及时地向客户端推送更新动态信息,吞吐量大。
  • 不能确保发送成功,推模式采用广播方式,只有服务器端和客户端在同一个频道上,推模式才有效,用户才能接收到信息;
  • 没有信息状态跟踪,推模式采用开环控制技术,一个信息推送后的状态,比如客户端是否接收等,无从得知;
  • 针对性较差。推送的信息可能并不能满足客户端的个性化需求。

拉模式是客户端主动从服务器端获取信息。

  • 针对性强,能满足客户端的个性化需求;
  • 信息传输量较小,网络中传输的只是客户端的请求和服务器端对该请求的响应;
  • 服务器端的任务轻。服务器端只是被动接收查询,对客户端的查询请求做出响应。
  • 实时性较差,针对于服务器端实时更新的信息,客户端难以获取实时信息;
  • 对于客户端用户的要求较高,需要对服务器端具有一定的了解。

在实际生产应用中,通常会使用Kafka作为消息传输的数据管道,RabbitMQ作为交易数据作为数据传输管道,主要的取舍因素则是是否存在丢数据的可能。RabbitMQ在金融场景中经常使用,具有较高的严谨性,数据丢失的可能性更小,同事具备更高的实时性。而Kafka优势主要体现在吞吐量上,虽然可以通过策略实现数据不丢失,但从严谨性角度来讲,大不如RabbitMQ。而且由于Kafka保证每条消息最少送达一次,有较小的概率会出现数据重复发送的情况。详细来说,它们之间主要有如下的区别:

  1. RabbitMQ:用于实时的,对可靠性要求较高的消息传递上。

    Kafka:用于处于活跃的流式数据,大数据量的数据处理上。

  2. RabbitMQ:以broker为中心,有消息的确认机制。

    Kafka:以consumer为中心,没有消息的确认机制。

  3. RabbitMQ:支持消息的可靠的传递,支持事务,不支持批量操作,基于存储的可靠性的要求存储可以采用内存或硬盘,吞吐量小。

    Kafka:内部采用消息的批量处理,数据的存储和获取是本地磁盘顺序批量操作,消息处理的效率高,吞吐量高。

Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。即使是普通的服务器,Kafka也可以轻松支持每秒百万级的写入请求,超过了大部分的消息中间件,这种特性也使得Kafka在日志处理等海量数据场景广泛应用。

下面从数据写入和读取两方面分析,为什么Kafka速度这么快:

Kafka会把收到的消息都写入到硬盘中,它绝对不会丢失数据。为了优化写入速度Kafka采用了两个技术,顺序写入和MMFile 。

磁盘读写的快慢取决于你怎么使用它,也就是顺序读写或者随机读写。在顺序读写的情况下,磁盘的顺序读写速度和内存持平。因为硬盘是机械结构,每次读写都会寻址->写入,其中寻址是一个“机械动作”,它是最耗时的。所以硬盘最讨厌随机I/O,最喜欢顺序I/O。为了提高读写硬盘的速度,Kafka就是使用顺序I/O。

而且Linux对于磁盘的读写优化也比较多,包括read-ahead和write-behind,磁盘缓存等。如果在内存做这些操作的时候,一个是JAVA对象的内存开销很大,另一个是随着堆内存数据的增多,JAVA的GC时间会变得很长,使用磁盘操作有以下几个好处:

  1. 磁盘顺序读写速度超过内存随机读写;
  2. JVM的GC效率低,内存占用大。使用磁盘可以避免这一问题;
  3. 系统冷启动后,磁盘缓存依然可用。

下图就展示了Kafka是如何写入数据的, 每一个Partition其实都是一个文件 ,收到消息后Kafka会把数据插入到文件末尾(虚框部分):

这种方法有一个缺陷——没有办法删除数据 ,所以Kafka是不会删除数据的,它会把所有的数据都保留下来,每个消费者(Consumer)对每个Topic都有一个offset用来表示读取到了第几条数据 。

即便是顺序写入硬盘,硬盘的访问速度还是不可能追上内存。所以Kafka的数据并不是实时的写入硬盘 ,它充分利用了现代操作系统分页存储来利用内存提高I/O效率。Memory Mapped Files(后面简称mmap)也被翻译成 内存映射文件,在64位操作系统中一般可以表示20G的数据文件,它的工作原理是直接利用操作系统的Page来实现文件到物理内存的直接映射。完成映射之后你对物理内存的操作会被同步到硬盘上(操作系统在适当的时候)。

通过mmap,进程像读写硬盘一样读写内存(当然是虚拟机内存),也不必关心内存的大小有虚拟内存为我们兜底。使用这种方式可以获取很大的I/O提升,省去了用户空间到内核空间复制的开销(调用文件的read会把数据先放到内核空间的内存中,然后再复制到用户空间的内存中。)

但也有一个很明显的缺陷——不可靠,写到mmap中的数据并没有被真正的写到硬盘,操作系统会在程序主动调用flush的时候才把数据真正的写到硬盘。Kafka提供了一个参数——producer.type来控制是不是主动flush,如果Kafka写入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);写入mmap之后立即返回Producer不调用flush叫异步

传统模式下,当需要对一个文件进行传输的时候,其具体流程细节如下:

  • 调用read函数,文件数据被copy到内核缓冲区;
  • read函数返回,文件数据从内核缓冲区copy到用户缓冲区;
  • write函数调用,将文件数据从用户缓冲区copy到内核与socket相关的缓冲区;
  • 数据从socket缓冲区copy到相关协议引擎。

以上细节是传统read/write方式进行网络文件传输的方式,我们可以看到,在这个过程当中,文件数据实际上是经过了四次copy操作:硬盘->内核buf->用户buf->socket相关缓冲区->协议引擎。而sendfile系统调用则提供了一种减少以上多次copy,提升文件传输性能的方法。

在内核版本2.1中,引入了sendfile系统调用,以简化网络上和两个本地文件之间的数据传输。sendfile的引入不仅减少了数据复制,还减少了上下文切换。运行流程如下:

  • sendfile系统调用,文件数据被copy至内核缓冲区;
  • 再从内核缓冲区copy至内核中socket相关的缓冲区;
  • 最后再socket相关的缓冲区copy到协议引擎。

相较传统read/write方式,2.1版本内核引进的sendfile已经减少了内核缓冲区到user缓冲区,再由user缓冲区到socket相关缓冲区的文件copy,而在内核版本2.4之后,文件描述符结果被改变,sendfile实现了更简单的方式,再次减少了一次copy操作。

在Apache、Nginx、lighttpd等web服务器当中,都有一项sendfile相关的配置,使用sendfile可以大幅提升文件传输性能。Kafka把所有的消息都存放在一个一个的文件中,当消费者需要数据的时候Kafka直接把文件发送给消费者,配合mmap作为文件读写方式,直接把它传给sendfile。

在很多情况下,系统的瓶颈不是CPU或磁盘,而是网络IO,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。进行数据压缩会消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑。

  • 如果每个消息都压缩,但是压缩率相对很低,所以Kafka使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩;
  • Kafka允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩;
  • Kafka支持多种压缩协议,包括Gzip和Snappy压缩协议。

Kafka速度的秘诀在于,它把所有的消息都变成一个批量的文件,并且进行合理的批量压缩,减少网络IO损耗,通过mmap提高I/O速度,写入数据的时候由于单个Partion是末尾添加所以速度最优。读取数据的时候配合sendfile直接暴力输出。

RabbitMQ可能丢失消息分为如下几种情况:

  1. 生产者将数据发送到RabbitMQ的时候,可能在传输过程中因为网络等问题而将数据弄丢了。

  2. 如果没有开启RabbitMQ的持久化,那么RabbitMQ一旦重启数据就丢了。所以必须开启持久化将消息持久化到磁盘,这样就算RabbitMQ挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢失。除非极其罕见的情况,RabbitMQ还没来得及持久化自己就挂了,这样可能导致一部分数据丢失。

  3. 主要是因为消费者消费时,刚消费到还没有处理,结果消费者就挂了,这样你重启之后,RabbitMQ就认为你已经消费过了,然后就丢了数据。

针对上述三种情况,RabbitMQ可以采用如下方式避免消息丢失:

    • 可以选择使用RabbitMQ提供是事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会受到异常报错,这时就可以回滚事务,然后尝试重新发送。如果收到了消息,那么就可以提交事务。这种方式有明显的缺点,即RabbitMQ事务开启后,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
    • 可以开启confirm模式。在生产者那里设置开启了confirm模式之后,每次写的消息都会分配一个唯一的id,然后如何写入了RabbitMQ之中,RabbitMQ会给你回传一个ack消息,告诉你这个消息发送OK了。如果RabbitMQ没能处理这个消息,会回调你一个nack接口,告诉你这个消息失败了,你可以进行重试。而且你可以结合这个机制知道自己在内存里维护每个消息的id,如果超过一定时间还没接收到这个消息的回调,那么你可以进行重发。

    事务机制是同步的,你提交了一个事物之后会阻塞住,但是confirm机制是异步的,发送消息之后可以接着发送下一个消息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。

  1. 设置消息持久化到磁盘,设置持久化有两个步骤:

    • 创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里面的数据。
    • 发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。

    而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。

息,然后RabbitMQ会回调告知成功与否。 一般在生产者这块避免丢失,都是用confirm机制。

  1. 设置消息持久化到磁盘,设置持久化有两个步骤:

    • 创建queue的时候将其设置为持久化的,这样就可以保证RabbitMQ持久化queue的元数据,但是不会持久化queue里面的数据。
    • 发送消息的时候讲消息的deliveryMode设置为2,这样消息就会被设为持久化方式,此时RabbitMQ就会将消息持久化到磁盘上。 必须要同时开启这两个才可以。

    而且持久化可以跟生产的confirm机制配合起来,只有消息持久化到了磁盘之后,才会通知生产者ack,这样就算是在持久化之前RabbitMQ挂了,数据丢了,生产者收不到ack回调也会进行消息重发。

  2. 使用RabbitMQ提供的ack机制,首先关闭RabbitMQ的自动ack,然后每次在确保处理完这个消息之后,在代码里手动调用ack。这样就可以避免消息还没有处理完就ack。

}

聚合全网技术文章,根据你的阅读喜好进行个性推荐

深圳市奥思网络科技有限公司版权所有

}

我要回帖

更多关于 使用一维数组实现顺序栈 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信