首页 » 软件优化 » 使用Kafka Streams创建流数据管道(拓扑处理器主题创建输入)

使用Kafka Streams创建流数据管道(拓扑处理器主题创建输入)

萌界大人物 2024-11-04 11:10:51 0

扫一扫用手机浏览

文章目录 [+]

> A simple Kafka Streams topology

Kafka Streams的关键概念

· 流是无界的,不断更新的数据集,由有序,可重播和容错的键值对序列组成。

使用Kafka Streams创建流数据管道(拓扑处理器主题创建输入) 软件优化
(图片来自网络侵删)

· 流处理器是拓扑中的一个节点,它一次从拓扑中的上游处理器接收一个输入记录,对其应用操作,并可以选择向其下游处理器生成一个或多个输出记录。

· 源处理器是没有任何上游处理器的处理器。

· 接收器处理器是没有任何下游处理器的处理器。

入门

在本教程中,我将使用Kafka和Kafka Streams的Java API。
我将假设您对使用Maven来构建Java项目有基本的了解,并且对Kafka有基本的了解,并且已经设置了Kafka实例。
Lenses.io提供了一种快速简便的容器化解决方案,用于在此处设置Kafka实例。

首先,我们需要将kafka-clients和kafka-streams作为依赖项添加到项目pom.xml中:

<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.6.0</version></dependency><dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.6.0</version></dependency>建立流式拓扑

流拓扑需要一个或多个输入,中间和输出主题。
可在此处找到有关创建新Kafka主题的信息。
一旦创建了必要的主题,就可以创建流式拓扑。
这是为输入主题创建拓扑的示例,其中值序列化为JSON(由GSON序列化/反序列化)。

import java.util.Properties;import com.google.gson.JsonObject;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;// ... //Properties props = new Properties();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, LOCATION_OF_KAFKA_BROKERS);props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyStreamingApp");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

上面的示例是一个非常简单的流式拓扑,但是到目前为止,它实际上并没有做任何事情。
需要特别注意的是,拓扑是由执行前一代码段的应用程序执行和保留的,该拓扑不会在Kafka代理中运行。
由创建的应用程序支付所有拓扑处理开销。

可以通过执行以下命令来停止正在运行的拓扑:

streams.close();

为了使这种拓扑更加有用,我们需要定义基于规则的分支(或边)。
在下一个示例中,我们基于JSON消息有效负载中特定字段的值创建具有3个分支的基本拓扑。

public static final double SOME_CONSTANT = ...;// ... //// define conditions for each branch (edge) of topologyPredicate<String, JsonObject> greaterThan = (String key, JsonObject value) -> { double dValue = value.get("my_double_value").getAsDouble(); return dValue > SOME_CONSTANT;};Predicate<String, JsonObject> lessThan = (String key, JsonObject value) -> { double dValue = value.get("my_double_value").getAsDouble(); return dValue < SOME_CONSTANT;};Predicate<String, JsonObject> equalTo = (String key, JsonObject value) -> { double dValue = value.get("my_double_value").getAsDouble(); // epsilon is an arbitrarily small real number, such as 1e-15 return Math.abs(dValue - SOME_CONSTANT) < epsilon;};Predicate<String, JsonObject>[] conditions = new Predicate<>[] { greaterThan, lessThan, equalTo };KStream<String, JsonElement>[] branches = inputStream.branch(conditions);// define a output topic for each branch (edge)branches[0].to("greater-than-topic");branches[1].to("less-than-topic");branches[2].to("equal-to-topic");KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

我们刚刚创建的拓扑如下图所示:

上一个示例中分支的下游使用者可以使用与任何其他Kafka主题完全相同的方式来使用分支主题。
下游处理器可以产生自己的输出主题。
因此,将下游处理器的结果与原始输入主题结合起来可能很有用。
我们还可以使用Kafka Streams API定义规则,以将结果输出主题加入单个流中。

交叉流

Kafka Streams通过SQL连接对其流连接功能进行建模。
共有三种连接:

· 内部联接:当两个输入主题都有具有相同键的记录时,发出输出。

· 左连接:为左输入或主输入主题中的每个记录发出输出。
如果另一个主题没有给定键的值,则将其设置为null。

· 外部联接:为任一输入主题中的每个记录发出输出。
如果只有一个源包含密钥,则另一个为null。

对于我们的示例,我们将输入记录流和下游处理器的结果连接在一起。
在这种情况下,最有意义的是将输入主题视为主要主题进行左联接。
这将确保加入的流始终输出原始输入记录,即使没有可用的处理器结果也是如此。

import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.JoinWindows;import org.apache.kafka.streams.kstream.KStream;// ... //StreamsBuilder builder = new StreamsBuilder();KStream<String, JsonObject> inputStream = builder.stream(inputTopic);KStream<String, JsonObject> resultStream = builder.stream(resultTopic);// left join with default serializers and deserializersKStream<String, JsonObject> joined = inputStream.leftJoin(resultStream, (inputValue, outputValue) -> "record=" + inputValue + ", results=" + outputValue, / ValueJoiner / JoinWindows.of(TimeUnit.MINUTES.toMillis(5)));joined.to(outputTopic);KafkaStreams streams = new KafkaStreams(builder.build(), config);streams.start();

最终的整体拓扑如下图所示:

以编程方式可以使用相同的服务来创建和执行两个流拓扑,但是在示例中我避免这样做以使图保持非循环状态。

参考文献

· Confluent文档

· 了解Kafka流的流处理

· Java KafkaStreams简介

(本文翻译自Jason Snouffer的文章《Creating a streaming data pipeline with Kafka Streams》,参考:https://itnext.io/creating-a-streaming-data-pipeline-with-kafka-streams-898fb352a7b7)

标签:

相关文章

你是开发者(学习开发者学生东西语言)

自我身份的认同/不同的行为方式仅以此篇,献给仍是学生的我们废话不多说,直接进入正片——学生与开发者的恩怨情仇0.0学生:书生百无一...

软件优化 2025-02-09 阅读948 评论0