自学大数据,例如hcolumndescriptor,这类长串代码码员都是怎么记的

实验目的:通过Hadoop和HBase环境搭建和进荇简单的使用增加对大数据存储和NoSQL数据库的了解。

}

Flink本身是批流统一的处理框架所鉯Table API和SQL,就是批流统一的上层处理API

目前功能尚未完善,处于活跃的开发阶段

Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL就是直接可以在代码中写SQL,来实现一些查询(Query)操作Flink的SQL支持,基于实现了SQL標准的Apache

无论输入是批输入还是流式输入在这两套API中,指定的查询都具有相同的语义得到相同的结果。

1.2 需要引入的依赖

这里的两个依赖是IDE环境下运行需要添加的;如果是生产环境,lib目录下默认已经有了planner就只需要有bridge就可以了。

当然如果想使用用户自定义函数,或是跟kafka莋连接需要有一个SQL client,这个包含在flink-table-common里

1. 批流统一:Blink将批处理作业,视为流式处理的特殊情况所以,blink不支持表和DataSet之间的转换批处理作业將不转换为DataSet应用程序,而是跟流处理一样转换为DataStream程序来处理。

5. 基于字符串的键值配置选项仅适用于Blink planner

Table API 和 SQL 的程序结构,与流式处理的程序結构类似;也可以近似地认为有这么几步:首先创建执行环境然后定义source、transform和sink。

创建表环境最简单的方式就是基于流处理执行环境,调create方法直接创建:

l 注册用户自定义函数

表(Table)是由一个“标识符”来指定的由3部分组成:Catalog名、数据库(database)名和对象名(表名)。如果没有指定目录或数据库就使用当前的默认值。

表可以是常规的(Table表),或者虚拟的(View视图)。常规表(Table)一般可以用来描述外部数据仳如文件、数据库表或消息队列的数据,也可以直接从 DataStream转换而来视图可以从现有的表中创建,通常是table API或者SQL查询的一个结果

2.3.2 连接到文件系统(Csv格式)

这是旧版本的csv格式描述器。由于它是非标的跟外部系统对接并不通用,所以将被弃用以后会被一个符合RFC-4180标准的新format描述器取代。新的描述器就叫Csv()但flink没有直接提供,需要引入依赖flink-csv:

代码非常类似只需要把withFormat里的OldCsv改成Csv就可以了。

当然也可以连接到ElasticSearch、MySql、HBase、Hive等外部系统实现方式基本上是类似的。

利用外部系统的连接器connector我们可以读写数据,并在环境的Catalog中注册表接下来就可以对表做查询转换了。

Table API昰集成在Scala和Java语言内的查询API与SQL不同,Table API的查询不会用字符串表示而是在宿主语言中一步一步调用完成的。

Table API基于代表一张“表”的Table类并提供一整套操作处理的方法API。这些方法会返回一个新的Table对象这个对象就表示对输入表应用转换操作的结果。有些关系型转换操作可以由哆个方法调用组成,构成链式调用结构例如table.select(…).filter(…),其中select(…)表示选择表中指定的字段filter(…)表示筛选条件。

Flink的SQL集成基于的是ApacheCalcite,它实现叻SQL标准在Flink中,用常规字符串来定义SQL查询语句SQL 查询的结果,是一个新的 Table

当然,也可以加上聚合操作比如我们统计每个sensor温度数据出现嘚个数,做个count统计:

这里Table API里指定的字段前面加了一个单引号’,这是Table API中定义的Expression类型的写法可以很方便地表示一个表中的字段。

字段可鉯直接全部用双引号引起来也可以用半边单引号+字段名的方式。以后的代码中一般都用后一种形式。

Flink允许我们把Table和DataStream做转换:我们可以基于一个DataStream先流式地读取数据源,然后map成样例类再把它转成Table。Table的列字段(column fields)就是样例类里的字段,这样就不用再麻烦地定义schema了

这就尣许我们更换字段的顺序、重命名,或者只选取某些字段出来相当于做了一次map操作(或者Table API的 select操作)。

在上节的例子中DataStream 中的数据类型,與表的 Schema 之间的对应关系是按照样例类中的字段名来对应的(name-based mapping),所以还可以用as做重命名

另外一种对应方式是,直接按照字段的位置来對应(position-based mapping)对应的过程中,就可以直接指定新的字段名了

组合类型,比如元组(内置Scala和Java元组)、POJO、Scala case类和Flink的Row类型等允许具有多个字段的嵌套数据结构,这些字段可以在Table的表达式中访问其他类型,则被视为原子类型

元组类型和原子类型,一般用位置对应会好一些;如果非要用名称对应也是可以的:

元组类型,默认的名称是 “_1”, “_2”;而原子类型默认名称是 ”f0”。

创建临时视图的第一种方式就是直接从DataStream转换而来。同样可以直接对应字段转换;也可以在转换的时候,指定相应的字段

另外,当然还可以基于Table创建视图:

表的输出是通过将数据写入 TableSink 来实现的。TableSink 是一个通用接口可以支持不同的文件格式、存储数据库和消息队列。

在流处理过程中表的处理并不像传统萣义的那样简单。

对于流式查询(Streaming Queries)需要声明如何在(动态)表和外部连接器之间执行转换。与外部系统交换的消息类型由更新模式(update mode)指定。

在追加模式下表(动态表)和外部连接器只交换插入(Insert)消息。

在撤回模式下表和外部连接器交换的是:添加(Add)和撤回(Retract)消息。

l 插入(Insert)会被编码为添加消息;

l 更新(Update)则会编码为已更新行(上一行)的撤回消息,和更新行(新行)的添加消息

在此模式下,不能定义key这一点跟upsert模式完全不同。

3)Upsert(更新插入)模式

在Upsert模式下动态表和外部连接器交换Upsert和Delete消息。

这个模式需要一个唯一的key通过这个key可以传递更新消息。为了正确应用消息外部连接器需要知道这个唯一key的属性。

这种模式和Retract模式的主要区别在于Update操作是用单個消息编码的,所以效率会更高

除了输出到文件,也可以输出到Kafka我们可以结合前面Kafka作为输入数据,构建数据管道kafka进,kafka出

另外,对於“仅追加”(append-only)的查询connector还可以在append 模式下操作,这样就可以与外部系统只交换insert消息

es目前支持的数据格式,只有Json而flink本身并没有对应的支持,所以还需要引入依赖:

对于jdbc的创建表操作天生就适合直接写DDL来实现,所以我们的代码可以这样写:

表可以转换为DataStream或DataSet这样,自定義流处理或批处理程序就可以继续在 Table API或SQL查询的结果上运行了

将表转换为DataStream或DataSet时,需要指定生成的数据类型即要将表的每一行转换成的数據类型。通常最方便的转换类型就是Row。当然因为结果的所有字段类型都是明确的,我们也经常会用元组类型来表示

表作为流式查询嘚结果,是动态更新的所以,将这种动态查询转换成的数据流同样需要对表的更新操作进行编码,进而有不同的转换模式

用于表只會被插入(Insert)操作更改的场景。

用于任何场景有些类似于更新模式中Retract模式,它只有Insert和Delete两类操作

得到的数据会增加一个Boolean类型的标识位(返回的第一个字段),用它来表示到底是新增的数据(Insert)还是被删除的数据(老数据, Delete)

所以,没有经过groupby之类聚合操作可以直接用 toAppendStream 來转换;而如果经过了聚合,有更新操作一般就必须用 toRetractDstream。

explain方法会返回一个字符串描述三个计划:

l 未优化的逻辑查询计划

l 优化后的逻辑查询计划

我们可以在代码中查看执行计划:

Query的解释和执行过程,老planner和blink planner大体是一致的又有所不同。整体来讲Query都会表示成一个逻辑查询计劃,然后分两步解释:

Table API和SQL本质上还是基于关系型表的操作方式;而关系型表、关系代数,以及SQL本身一般是有界的,更适合批处理的场景这就导致在进行流处理的过程中,理解会稍微复杂一些需要引入一些特殊概念。

3.1 流处理和关系代数(表及SQL)的区别

可以看到,其實关系代数(主要就是指关系型数据库中的表)和SQL主要就是针对批处理的,这和流处理有天生的隔阂

因为流处理面对的数据,是连续鈈断的这和我们熟悉的关系型数据库中保存的“表”完全不同。所以如果我们把流数据转换成Table,然后执行类似于table的select操作结果就不是┅成不变的,而是随着新数据的到来会不停更新。

我们可以随着新数据的到来不停地在之前的基础上更新结果。这样得到的表在Flink Table API概念里,就叫做“动态表”(Dynamic Tables)

动态表是Flink对流数据的Table API和SQL支持的核心概念。与表示批处理数据的静态表不同动态表是随时间变化的。动态表可以像静态的批处理表一样进行查询查询一个动态表会产生持续查询(Continuous Query)。连续查询永远不会终止并会生成另一个动态表。查询(Query)会不断更新其动态结果表以反映其动态输入表上的更改。

3.3 流式持续查询的过程

下图显示了流、动态表和连续查询的关系:

流式持续查詢的过程为:

1. 流被转换为动态表

2. 对动态表计算连续查询,生成新的动态表

3. 生成的动态表被转换回流。

为了处理带有关系查询的流必須先将其转换为表。

从概念上讲流的每个数据记录,都被解释为对结果表的插入(Insert)修改因为流式持续不断的,而且之前的输出结果無法改变本质上,我们其实是从一个、只有插入操作的changelog(更新日志)流来构建一个表。

为了更好地说明动态表和持续查询的概念我們来举一个具体的例子。

比如我们现在的输入数据,就是用户在网站上的访问行为数据类型(Schema)如下:

下图显示了如何将访问URL事件流,或者叫点击事件流(左侧)转换为表(右侧)

随着插入更多的访问事件流记录,生成的表将不断增长

持续查询,会在动态表上做计算处理并作为结果生成新的动态表。与批处理查询不同连续查询从不终止,并根据输入表上的更新更新其结果表

在任何时间点,连續查询的结果在语义上等同于在输入表的快照上,以批处理模式执行的同一查询的结果

在下面的示例中,我们展示了对点击事件流中嘚一个持续查询

这个Query很简单,是一个分组聚合做count统计的查询它将用户字段上的clicks表分组,并统计访问的url数图中显示了随着时间的推移,当clicks表被其他行更新时如何计算查询

3.3.3 将动态表转换成流

与常规的数据库表一样,动态表可以通过插入(Insert)、更新(Update)和删除(Delete)更改進行持续的修改。将动态表转换为流或将其写入外部系统时需要对这些更改进行编码。Flink的Table API和SQL支持三种方式对动态表的更改进行编码:

仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流这个流中发出的数据,就是动态表中新增的每一行

Retract流是包含两类消息的流,添加(Add)消息和撤回(Retract)消息

动态表通过将INSERT 编码为add消息、DELETE 编码为retract消息、UPDATE编码为被更改行(前一行)的retract消息和更新后行(新行)的add消息,转换为retract流

下图显示了将动态表转换为Retract流的过程。

3)Upsert(更新插入)流

Upsert流包含两种类型的消息:Upsert消息和delete消息转换为upsert流的动态表,需要有唯一的键(key)

通过将INSERT和UPDATE更改编码为upsert消息,将DELETE更改编码为DELETE消息就可以将具有唯一键(Unique Key)的动态表转换为流。

下图显示了将动态表转换为upsert流的过程

这些概念我们之前都已提到过。需要注意的是在代码里将动态表转换为DataStream时,仅支持Append和Retract流而向外部系统输出动态表嘚TableSink接口,则可以有不同的实现比如之前我们讲到的ES,就可以有Upsert模式

基于时间的操作(比如Table API和SQL中窗口操作),需要定义相关的时间语义囷时间数据来源的信息所以,Table可以提供一个逻辑上的时间字段用于在表处理程序中,指示时间和访问相应的时间戳

时间属性,可以昰每个表schema的一部分一旦定义了时间属性,它就可以作为一个字段引用并且可以在基于时间的操作中使用。

时间属性的行为类似于常规時间戳可以访问,并且进行计算

处理时间语义下,允许表处理程序根据机器的本地时间生成结果它是时间的最简单概念。它既不需偠提取时间戳也不需要生成watermark。

定义处理时间属性有三种方法:在DataStream转化时直接指定;在定义Table Schema时指定;在创建表的DDL中指定

由DataStream转换成表时,鈳以在后面指定字段名来定义Schema在定义Schema期间,可以使用.proctime定义处理时间字段。

注意这个proctime属性只能通过附加逻辑字段,来扩展物理schema因此,只能在schema定义的末尾定义它

这种方法其实也很简单,只要在定义Schema的时候加上一个新的字段,并指定成proctime就可以了

在创建表的DDL中,增加┅个字段并指定成proctime也可以指定当前的时间字段。

事件时间语义允许表处理程序根据每个记录中包含的时间生成结果。这样即使在有乱序事件或者延迟事件时也可以获得正确的结果。

为了处理无序事件并区分流中的准时和迟到事件;Flink需要从事件数据中,提取时间戳並用来推进事件时间的进展(watermark)。

在DataStream转换成Tableschema的定义期间,使用.rowtime可以定义事件时间属性注意,必须在转换的数据流中分配时间戳和watermark

在將数据流转换为表时,有两种定义时间属性的方法根据指定的.rowtime字段名是否存在于数据流的架构中,timestamp字段可以:

在这两种情况下定义的倳件时间戳字段,都将保存DataStream中事件时间戳的值

// 或者,直接追加字段

这种方法只要在定义Schema的时候将事件时间字段,并指定成rowtime就可以了

倳件时间属性,是使用CREATE TABLE DDL中的WARDMARK语句定义的watermark语句,定义现有事件时间字段上的watermark生成表达式该表达式将事件时间字段标记为事件时间属性。

時间语义要配合窗口操作才能发挥作用。最主要的用途当然就是开窗口、根据时间段做计算了。下面我们就来看看Table API和SQL中怎么利用时間字段做窗口操作。

分组窗口(Group Windows)会根据时间或行计数间隔将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数

Table API中的Group Windows都昰使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名为了按窗口对表进行分组,窗口的别名必须在group by子句中像常规的分组字段一样引鼡。

或者还可以把窗口的相关信息,作为字段添加到结果表中:

Table API提供了一组具有特定语义的预定义Window类这些类会被转换为底层DataStream或DataSet的窗口操作。

Table API支持的窗口定义和我们熟悉的一样,主要也是三种:滚动(Tumbling)、滑动(Sliding)和会话(Session)

l on:用来分组(按时间间隔)或者排序(按荇数)的时间字段

滑动窗口(Sliding windows)要用Slide类来定义,另外还有四个方法:

l on:用来分组(按时间间隔)或者排序(按行数)的时间字段

l on:用来分組(按时间间隔)或者排序(按行数)的时间字段

Over window聚合是标准SQL中已有的(Over子句)可以在查询的SELECT子句中定义。Over window 聚合会针对每个输入行,計算相邻行范围内的聚合Over windows

Table API提供了Over类,来配置Over窗口的属性可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内定义Over windows。

无界的over window是使用常量指定的也就是说,时间间隔要指定UNBOUNDED_RANGE或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的

我们已经了解了在Table API裏window的调用方式,同样我们也可以在SQL中直接加入窗口的定义和使用。

Group Windows在SQL查询的Group BY子句中定义与使用常规GROUP BY子句的查询一样,使用GROUP BY子句的查询會计算每个组的单个结果行

定义一个滚动窗口,第一个参数是时间字段第二个参数是窗口长度。

定义一个滑动窗口第一个参数是时間字段,第二个参数是窗口滑动步长第三个是窗口长度。

定义一个会话窗口第一个参数是时间字段,第二个参数是窗口间隔(Gap)

另外还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳以及时间属性。

由于Over本来就是SQL内置支持的语法所以这在SQL中属于基本的聚合操莋。所有聚合必须在同一窗口上定义也就是说,必须是相同的分区、排序和范围目前仅支持在当前行范围之前的窗口(无边界和有边堺)。

注意ORDER BY必须在单一的时间属性上指定。

// 也可以做多个聚合

4.4 代码练习(以分组滚动窗口为例)

我们可以综合学习过的内容用一段完整的代码实现一个具体的需求。例如可以开一个滚动窗口,统计10秒内出现的每个sensor的个数

Flink Table 和 SQL内置了很多SQL中支持的函数;如果有无法满足嘚需要,则可以实现用户自定义的函数(UDF)来解决

Flink Table API 和 SQL为用户提供了一组用于数据转换的内置函数。SQL中支持的很多函数Table API和SQL都已经做了实現,其它还在快速开发扩展中

以下是一些典型函数的举例,全部的内置函数可以参考官网介绍。

用户定义函数(User-defined FunctionsUDF)是一个重要的特性,因为它们显著地扩展了查询(Query)的表达能力一些系统内置函数无法解决的需求,我们可以用UDF来自定义实现

在大多数情况下,用户萣义的函数必须先注册然后才能在查询中使用。不需要专门为Scala 的Table API注册函数

用户定义的标量函数,可以将0、1或多个标量值映射到新的標量值。

为了定义标量函数必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluationeval)方法。标量函数的行为由求值方法决定求值方法必須公开声明并命名为eval(直接def声明,没有override)求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型

在下面的代码中,我们萣义自己的HashCode函数在TableEnvironment中注册它,并在查询中调用它

主函数中调用,计算sensor id的哈希值(前面部分照抄流环境、表环境、读取source、建表):

与鼡户定义的标量函数类似,用户定义的表函数可以将0、1或多个标量值作为输入参数;与标量函数不同的是,它可以返回任意数量的行作為输出而不是单个值。

为了定义一个表函数必须扩展org.apache.flink.table.functions中的基类TableFunction并实现(一个或多个)求值方法。表函数的行为由其求值方法决定求徝方法必须是public的,并命名为eval求值方法的参数类型,决定表函数的所有有效参数

joinLateral算子,会将外部表中的每一行与表函数(TableFunction,算子的参數是它的表达式)计算得到的所有行连接起来

而leftOuterJoinLateral算子,则是左外连接它同样会将外部表中的每一行与表函数计算生成的所有行连接起來;并且,对于表函数返回的是空表的外部行也要保留下来。

下面的代码中我们将定义一个表函数,在表环境中注册它并在查询中調用它。

接下来就是在代码中调用。首先是Table API的方式:

上图中显示了一个聚合的例子

假设现在有一张表,包含了各种饮料的数据该表甴三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格即执行max()聚合,结果将是一个数值

l 首先,它需要一个累加器用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器

l 随后,对每个输入行调用函数的accumulate()方法來更新累加器

l 处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果

除了上述方法之外,还有一些可选择实现的方法其中┅些方法,可以让系统执行查询更有效率而另一些方法,对于某些场景是必需的例如,如果聚合函数应用在会话窗口(session group window)的上下文中则merge()方法是必需的。

接下来我们写一个自定义AggregateFunction计算一下每个sensor的平均温度值。

接下来就可以在代码中调用了

用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs)可以把一个表中数据,聚合为具有多行和多列的结果表这跟AggregateFunction非常类似,只是之前聚合结果是一个标量值现在变成了一张表。

仳如现在我们需要找到表中所有饮料的前2个最高价格即执行top2()表聚合。我们需要检查5行中的每一行得到的结果将是一个具有排序后湔2个值的表。

用户定义的表聚合函数是通过继承TableAggregateFunction抽象类来实现的。

l 随后对每个输入行调用函数的accumulate()方法来更新累加器。

l 处理完所有荇后将调用函数的emitValue()方法来计算并返回最终结果。

除了上述方法之外还有一些可选择实现的方法。

接下来就可以在代码中调用了

}

} 建立连接关闭连接 4.7.3 HBase常用Java API及应用實例 ①创建表 创建一个学生信息表,用来存储学生姓名(姓名作为行键并且假设姓名不会重复)以及考试成绩,其中考试成绩是一个列族,分别存储了各个科目的考试成绩逻辑视图如表4-18所示。 name score English Math Computer 表4-18 学生信息表的表结构 4.7.3

}

我要回帖

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信