本文翻译自 Confluent 的官方博客,原文章地址为:Building a Real-Time Streaming ETL Pipeline in 20 Minutes,由于本人各方面能力有限,如翻译有纰漏,请同学们不吝赐教。
最近,我看到有很多的演讲都在说传统的 ETL 已经死了。在传统的 ETL 流程图中,数据仓库简直就是神,ETL 的作业都是批处理形式的,所有东西都可以相互通信,同时普遍存在扩展限制。人们勉强得忍受着杂乱的数据流,因为他们心中默默得想着这都是”商业化的代价”。大家现在使用着的就是像这样杂乱的结构:
但是!我不认为 ETL 已经死了。开发者们越来越偏向于使用基于分布式系统和事件驱动应用程序的新型 ETL 工作流,企业不仅可以实时处理数据,同时还可以根据需求随时扩展。但是,这仍然需要 “提取”/“转换” 和 “加载”,不同之处在于它将数据作为一等公民对待。企业不再将数据加载到批处理中进行处理,因为这通常都是离线操作的,频率大概一天一次。他们有更多的数据源和数据类型,并且想要消除杂乱的点对点连接,所以我们可以将流处理直接嵌入到每个服务中,同时核心业务应用程序可以依赖流处理平台来分发和处理事件。本文的重点就是演示如何轻松得在 Apache Kafka 中实现这些流式的 ETL 工作流。
Kafka 是分布式的流平台,可以说是现代企业结构的核心。它提供了 Kafka Connect 框架用于从不同的数据源中提取数据,丰富的 Stream API 可以在你的核心应用程序中进行复杂的转换和分析,还有更多的 Kakfa 连接器将转换的数据加载到另外一个系统。此外,你还可以通过部署 Confluent Schema Registry 集中管理 Schema,验证兼容性,从而在数据不符合 schema 的时候提供告警(不要问我为什么要在关键任务上校验 Schema,请读这篇文章)。端到端的参考架构如下:
接下来让我们考虑这么一个应用,它通过 Kafka Streams API 运行了一些稳定的实时流处理应用,就以上面的 端到端的参考架构 举个例子,从而给大家展示:
- 运行一个 Kakfa 数据源连接器,用于从其他系统(一个 SQLite3 数据库)读取数据,并且使用简单的消息转换器(SMTs)在写入到 Kafka 之前进行空白填充。
- 使用 Java 语言,通过 Kafka Stream API 处理和丰富数据(例如:计数和求和)
- 运行一个 Kafka sink 连接器将数据从 Kafka 集群中写入到其他系统(AWS S3)
这个示例的工作流如下:
如果你想跟着我的指导在你自己的环境中试试,那么可以根据 quickstart 指导来启动一个 Kafka 集群,并且从这里下载完整的源代码。
提取数据到 Kafka
首先,我们需要从客户端应用程序中获得数据。要想在 Kafka 和其他系统间复制数据,我们可以从各种随时可用的连接器中选择合适的 Kafka 连接器。Kafka 源连接器从其他系统中加载数据放入 Kafka 中,然后 Kafka sink 连接器从 Kafka 中导出数据到其他系统。
对于我们的例子,我们想从保存在 /usr/local/lib/retail.db
的 SQLite3 数据库中提取数据,在这个数据库中有一张表 locations
,这张表有三个字端 id
/name
和 sale
,里面的内容大概是这样的:
我们要从这张表的数据创建一个数据流,然后流中的每个消息都是一个 键值对,你可能会问什么是键,什么是值,好吧,看下这个:
为了从表中提取数据到 Kafka 主题中,我们使用了免费的 Confluent Open Source 中下载的 JDBC 连接器 组件,需要注意的是,默认情况下,JDBC 连接器是不会往消息中添加键的。因为消息的键对于消息的组织和分组非常有用,所以我们可以通过 SMTs 来设置键。如果你使用的是默认的配置选项,那么数据将会以下面的配置写入到 Kafka 主题中:
但是,这却不是我想要的,所以我们需要修改配置为以下这个配置:
为了实现我们想要的配置,我们可以通过修改 JDBC 源的连接器配置文件 source-quickstart-sqlite.properties
文件来实现:
接着我们将下面几行配置添加到 JDBC 的源连接器配置文件中,以利用单消息转换(SMT)函数,将从表中提取出来的的行数据在放入 Kafka 主题之前进行处理。我们使用 ValueToKey
和 ExtractField
SMT 函数将空的键替换成从消息内容中派生出来的字段。
最后,我们再将以下几行配置添加到 JDBC 的源连接器配置文件中,将键转换成字符串类型(因为它更便于JSON 或者 Avro 序列化),将值转换成格式化的 Avro。Confulent Schema Registry 运行在 http://schemaregistry1:8081
这里为了简化操作,我们使用 JDBC 源连接器的 独立模式 连接 Kafka,需要指出的是,在生产环境中,出于扩展和容错等需要,我们通常会使用分布式模式,还会加上 Confluent 控制中心 用于集中管理。
$ bin/connect-standalone connect-standalone.properties source-quickstart-sqlite.properties
敲完这个命令之后,Kafka 连接就会运行 JDBC 连接器并且将表中的每一行都以键值对的方式写入到 Kafka 主题的尾端,对这张表的状态感兴趣的应用都可以读取这个话题。这样,每当有记录被添加到 SQLite3 数据库中的表上时,Kafka 连接会自动得将他们以消息的形式写入到相应的 Kafka 主题,然后自然而然地成为客户端应用程序可以读取的 KStream 对象啦。至此,我们已经实现将数据作为一个没有绑定,并且持续实时流了,这个实时流就是我们所谓的”数据流”!
表中的每一行都被序列化成一个 Avro 记录,我们在主题的末尾位置通过 Confluent Schema Registry 来查看消息值的 schema。
通过 Kafka Streams API 转换数据
现在源数据已经写入到 Kafka 的主题中啦,任何的应用程序到可以从 主题 中读取数据,并且通过 Schema Registry 反序列化 Avro记录 的消息值。下面给出一个简单的应用程序:kafka-acro-console-consumer:
但是,这个命令行消费者肯定不是我们的最终目标,我们只是想演示一下如何在你的客户端应用程序中使用 Kafka Streams API 来处理主题中的数据。对于如何使用这个 API 来开发应用程序,Confluent 有出色的文档可以阅读。
这里我就着重讲解一下这个应用的两个重要部分:
- 从 Kafka 主题中创建 Kafka Stream 对象
- 使用 Kafka 流处理操作进行数据转换
创建 Kafka Stream 对象
创建 Kafka Stream 对象意味着我们需要在客户端中将 Kafka 的字节记录转换成 Java 对象,因为消息的值是 Avro 记录,所以我们需要一个满足 Avro Schema 的 Java 类。我们创建了一个叫做 location.avsc
的 Avro schema 文件,用于定义客户端期望的数据结构,以 JSON 格式编写,里面包含对应于表中列的三个字端 id
/name
和 sale
。
为了使Java客户端应用程序能够反序列化在此 Avro模式 中编写的消息,我们需要具有相应的Java类(例如Location)。但是,我们不需要编写Java代码!在我们的 pom.xml 中,我们使用了可以自动生成这些类 Java代码 的 Maven插件 avro-maven-plugin。
现在Java源码是从Avro架构文件自动创建的,您的应用程序可以导入该软件包:
//导入位置的类
import io.confluent.examples.connectandstreams.avro.Location;
下一个关键步骤是配置流配置以使用适当的序列化/反序列化类并指向模式注册表:
现在你就可以创建 KStream 对象啦:
请注意,这里介绍的工作流程中省略了一些基本的 Kafka Streams 设置,如果想详细了解的话,可以查看 Streams开发人员指南。
处理和补全数据
在我们上面给出的客户端应用程序中,我们有一个 KStream对象 locationsStream
,其中包含了消息流,它的 key 是一个 <Long>
类型的 id,value 是包含id
,name
和 sale
的 <Location>
记录。
现在我们可以用流处理器进行数据转换。前面介绍过,Kafka Streams API 可以用在你核心业务的应用程序中。同时还存在各种各样的 流处理器,他们一次接收一个输入记录,然后对其进行业务处理,然后就可以向下游的处理器产生一个或多个输出记录。这些处理器可以是无状态的(例如,一次转换一个消息,或者基于某些条件过滤掉消息)或有状态(例如跨多个消息的连接,聚合或窗口数据)。
要开始使用 Kafka Streams API,这里有三个示例,其示例代码和相应的可视化数据的结果如下:
将数据流转换为新的键/值对类型。例如,我们可以使用 Map 方法将原来的
KStream<Long, Location>
对象转换成KStream<Long, Long>
键/值对的,其中 key 没变,value 变成了自由sale
的值。//从locationsStream构建KStream <Long,Long>
KStream <Long,Long> sales = locationsStream.map((k,v) -> new KeyValue <Long,Long>(k,v.getSale()));
通过首先根据 key 对消息进行分组,然后使用 count 方法来计算 key 的出现次数。Kafka Streams API 有原生的抽象用于捕获流和表:KStream 表示消息流,其中每个数据记录都表示 无界数据集 中的独立数据,而 KTable 则表示更改日志,其中每个数据记录都表示更新,我们也可以给一个 KTable 命名本地状态,这使得我们可以方便地像操作一个正常的表一样来查询它。例如,我们可以计算每个键的出现次数。
KTable<Long, Long> countKeys = sales.groupByKey(Serdes.Long(), Serdes.Long())
先根据 key 对消息进行分组,然后使用 reduce 方法对值进行求和,让特定 key 的值相加。例如,我们可以将所有消息中的给定 key 的 sale值相加,并在添加新的销售记录时实时更新。这个例子也显示了再 Kafka Stream 中我们怎么再 KStream 和 KTable 之间互相转换的。
KStream <Long,Long> salesAgg = sales.groupByKey(Serdes.Long(),Serdes.Long())
.reduce((aggValue,newValue)) - > aggValue + newValue,SALES_STORE)
.toStream();
将数据加载到其他系统
在这之前,所有数据仍然在我们的客户端应用程序中,我们可能希望将其导出到其他系统。事实上,我们可能希望很多的系统都可以称为这个数据流管道的一部分。您可以通过 Kafka sink 连接器运行多个 Kafka Connect,这样就可以让任意数量的下游系统可以接收到相同的数据。Kafka Sink 连接器可以和 Kafka源连接器同时运行。
在我们的示例中,目标系统是 AWS S3 bucket,想要往这加载数据,其实就和从 Kafka 中获取数据一样容易,这个应用程序使用以下单个语句将 KStream
从 Kafka主题 中取出:
// Write KStream to a topic
salesAgg.to(Serdes.Long(), Serdes.Long(), OUTPUT_TOPIC);
然后,Kafka Sink连接器处理下游系统的加载,你可以像运行 Kafka 的源连接器一样,再这个 Kafka 主题上运行一个 Kafka Sink 连接器。要想将数据加载到AWS S3中,你需要配置主题名称,S3区域和存储桶等连接器属性,然后用这个命令启动连接器:
$ bin/connect-standalone connect-standalone.properties s3-sink-connector.properties
总结
可能本博客文章中描述的工作流程对于这个非常简单的例子来说,显得有点臃肿,没必要,然而,考虑一下具有多个数据源和目标的现实场景,我们需要抓取数据,同时支持不断发展的各种模式,因此就需要支持复杂转换的多步骤实时工作流,同时还要保持高的可用性和容错性。Kafka 提供了一个非常灵活,可扩展的架构基础,用于构建流式的 ETL 工作流。您不想尝试将其与传统的 ETL范例 结合在一起,因为这会不可避免得使得整个过程变得混乱。通过参阅 Kafka 的架构图,您可以发现 Kafka 的流式 ETL 对实时的关键性业务应用程序是多么有用:
Kafka 流媒体平台允许关键应用程序实时处理数据:
- Kafka Connect 框架的 Kafka 连接器可以让开发者从一个系统中提取数据并将其加载到另一系统中
- Kafka Streams API 为应用程序提供流处理功能,可以一次转换一个消息或事件。这些变换可以包括在多个数据源上进行 join 操作/过滤数据/聚合某个周期时间内的数据
- Confluent Schema Registry 提供 Avro schema 来规范数据
- Confluent Control Center 提供数据的集中管理