56、Flink DataStream 的管理执行配置详解

1)概述
1.执行配置

StreamExecutionEnvironment 包含了 ExecutionConfig,它允许在运行时设置作业特定的配置值。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ExecutionConfig executionConfig = env.getConfig();

以下是可用的配置选项:(默认为粗体)

  • setClosureCleanerLevel()。closure cleaner 的级别默认设置为 ClosureCleanerLevel.RECURSIVE。closure cleaner 删除 Flink 程序中对匿名 function 的调用类的不必要引用。禁用 closure cleaner 后,用户的匿名 function 可能正引用一些不可序列化的调用类。这将导致序列化器出现异常。可设置的值是: NONE:完全禁用 closure cleaner ,TOP_LEVEL:只清理顶级类而不递归到字段中,RECURSIVE:递归清理所有字段。
  • getParallelism() / setParallelism(int parallelism)。为作业设置默认的并行度。
  • getMaxParallelism() / setMaxParallelism(int parallelism)。为作业设置默认的最大并行度。此设置决定最大并行度并指定动态缩放的上限。
  • getNumberOfExecutionRetries() / setNumberOfExecutionRetries(int numberOfExecutionRetries)。设置失败任务重新执行的次数。值为零会有效地禁用容错。-1 表示使用系统默认值(在配置中定义)。该配置已弃用,请改用重启策略。
  • getExecutionRetryDelay() / setExecutionRetryDelay(long executionRetryDelay)。设置系统在作业失败后重新执行之前等待的延迟(以毫秒为单位)。在 TaskManagers 上成功停止所有任务后,开始计算延迟,一旦延迟过去,任务会被重新启动。该配置已被弃用,请改用重启策略 。
  • getExecutionMode() / setExecutionMode()。默认的执行模式是 PIPELINED。设置执行模式以执行程序。执行模式定义了数据交换是以批处理方式还是以流方式执行。
  • enableForceKryo() / disableForceKryo。默认情况下不强制使用 Kryo。强制 GenericTypeInformation 对 POJO 使用 Kryo 序列化器,即使可以将它们作为 POJO 进行分析。在某些情况下,应该优先启用该配置。例如,当 Flink 的内部序列化器无法正确处理 POJO 时。
  • enableForceAvro() / disableForceAvro()。默认情况下不强制使用 Avro。强制 Flink AvroTypeInfo 使用 Avro 序列化器而不是 Kryo 来序列化 Avro 的 POJO。
  • enableObjectReuse() / disableObjectReuse()。默认情况下,Flink 中不重用对象。启用对象重用模式会指示运行时重用用户对象以获得更好的性能。注意可能会导致bug。
  • getGlobalJobParameters() / setGlobalJobParameters()。此方法允许用户将自定义对象设置为作业的全局配置。由于 ExecutionConfig 可在所有用户定义的 function 中访问,因此这是一种使配置在作业中全局可用的简单方法。
  • addDefaultKryoSerializer(Class type, Serializer serializer)。为指定的类型注册 Kryo 序列化器实例。
  • addDefaultKryoSerializer(Class type, Class> serializerClass)。为指定的类型注册 Kryo 序列化器的类。
  • registerTypeWithKryoSerializer(Class type, Serializer serializer)。使用 Kryo 注册指定类型并为其指定序列化器。通过使用 Kryo 注册类型,该类型的序列化将更加高效。
  • registerKryoType(Class type)。如果类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记(整数 ID)被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的 I/O 成本。
  • registerPojoType(Class type)。将指定的类型注册到序列化栈中。如果该类型最终被序列化为 POJO,那么该类型将注册到 POJO 序列化器中。如果该类型最终被 Kryo 序列化,那么它将在 Kryo 中注册,以确保只有标记被写入。如果一个类型没有在 Kryo 注册,它的全限定类名将在每个实例中被序列化,从而导致更高的I/O成本。

注意:用 registerKryoType() 注册的类型对 Flink 的 Kryo 序列化器实例来说是不可用的。

  • disableAutoTypeRegistration()。自动类型注册在默认情况下是启用的。自动类型注册是将用户代码使用的所有类型(包括子类型)注册到 Kryo 和 POJO 序列化器。
  • setTaskCancellationInterval(long interval)。设置尝试连续取消正在运行任务的等待时间间隔(以毫秒为单位)。当一个任务被取消时,会创建一个新的线程,如果任务线程在一定时间内没有终止,新线程就会定期调用任务线程上的 interrupt() 方法。这个参数是指连续调用 interrupt() 的时间间隔,默认设置为 30000 毫秒,或 30秒

通过 getRuntimeContext() 方法在 Rich* function 中访问到的 RuntimeContext 也允许在所有用户定义的 function 中访问 ExecutionConfig

2.程序打包和分布式运行
a)打包程序

为了能够通过命令行或 web 界面执行打包的 JAR 文件,程序必须使用通过 StreamExecutionEnvironment.getExecutionEnvironment() 获取的 environment。当 JAR 被提交到命令行或 web 界面后,该 environment 会扮演集群环境的角色。如果调用 Flink 程序的方式与上述接口不同,该 environment 会扮演本地环境的角色。

打包程序只要简单地将所有相关的类导出为 JAR 文件,JAR 文件的 manifest 必须指向包含程序入口点(拥有公共 main 方法)的类。实现的最简单方法是将 main-class 写入 manifest 中(比如 main-class: org.apache.flinkexample.MyProgram)。main-class 属性与 Java 虚拟机通过指令 java -jar pathToTheJarFile 执行 JAR 文件时寻找 main 方法的类是相同的。

大多数 IDE 提供了在导出 JAR 文件时自动包含该属性的功能。

b)总结

调用打包后程序的完整流程包括两步:

  • 搜索 JAR 文件 manifest 中的 main-classprogram-class 属性。如果两个属性同时存在,program-class 属性会优先于 main-class 属性。对于 JAR manifest 中两个属性都不存在的情况,命令行和 web 界面支持手动传入入口点类名参数。
  • 系统接着调用该类的 main 方法。
3.并行执行
a)概述

一个 Flink 程序由多个任务 task 组成(转换/算子、数据源和数据接收器)。一个 task 包括多个并行执行的实例,且每一个实例都处理 task 输入数据的一个子集。一个 task 的并行实例数被称为该 task 的 并行度 (parallelism)。

使用 savepoints 时,应该考虑设置最大并行度。当作业从一个 savepoint 恢复时,可以改变特定算子或者整个程序的并行度,并且此设置会限定整个程序的并行度的上限。由于在 Flink 内部将状态划分为了 key-groups,且性能所限不能无限制地增加 key-groups,因此设定最大并行度是有必要的。

b)设置并行度

算子层面

单个算子、数据源和数据接收器的并行度可以通过调用 setParallelism()方法来指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

执行环境层次

Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用 setParallelism() 方法指定执行环境的默认并行度。如果想以并行度3来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...];
DataStream<Tuple2<String, Integer>> wordCounts = [...];
wordCounts.print();

env.execute("Word Count Example");

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过 -p 参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

系统层次

可以通过设置 Flink 配置文件中的 parallelism.default 参数,在系统层次来指定所有执行环境的默认并行度。

c)设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用 setParallelism() 方法修改并行度相似,可以通过调用 setMaxParallelism() 方法来设定最大并行度。

默认的最大并行度等于将 operatorParallelism + (operatorParallelism / 2) 值四舍五入到大于等于该值的一个整型值,并且这个整型值是 2 的幂次方,注意默认最大并行度下限为 128,上限为 32768

为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

从之前的作业恢复时,改变该作业的最大并发度将会导致状态不兼容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/771800.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Oracle连接mysql

oracle使用的11g&#xff0c;在一台windows服务器&#xff1b;mysql使用的是5.7版本&#xff0c;在另一台windows服务器&#xff0c;这两个服务器之间的网络是互通的。做BI时&#xff0c;要获取不同数据源的数据&#xff0c;这些数据源可能是Oracle&#xff0c;也可能是sqlserv…

股票分析-20240628

今日关注&#xff1a; 20240626 六日涨幅最大: ------1--------300386--------- 飞天诚信 五日涨幅最大: ------1--------300386--------- 飞天诚信 四日涨幅最大: ------1--------300386--------- 飞天诚信 三日涨幅最大: ------1--------300386--------- 飞天诚信 二日涨幅最…

【pytorch14】感知机

单层感知机模型 对于单层的感知机&#xff0c;它的激活函数是一个sigmoid 对于符号的定义做一个规范化&#xff0c;输入层每一层进行一个编号 输入是第0层&#xff0c;上标0表示属于输入层&#xff0c;下标0到n表示一共有n个节点(这里严格来说应该是0~n-1&#xff0c;为了书写…

Zoom使用的基本步骤和注意事项

Zoom是一款功能强大的视频会议软件&#xff0c;广泛应用于远程办公、在线教育、团队协作等多个场景。以下是Zoom使用的基本步骤和注意事项&#xff1a; 一、注册与登录 注册Zoom账户&#xff1a; 访问Zoom官方网站&#xff08;如zoom.us&#xff09;&#xff0c;点击“注册”…

Games101学习笔记 Lecture16 Ray Tracing 4 (Monte Carlo Path Tracing)

Lecture16 Ray Tracing 4 (Monte Carlo Path Tracing 一、蒙特卡洛积分 Monte Carlo Integration二、路径追踪 Path tracing1.Whitted-Style Ray Tracings Problems2.只考虑直接光照时3.考虑全局光照①考虑物体的反射光②俄罗斯轮盘赌 RR &#xff08;得到正确shade函数&#x…

精准畜牧业:多维传感监测及分析动物采食行为

全球畜牧业呈现出一个动态且复杂的挑战。近几十年来&#xff0c;它根据对动物产品需求的演变进行了适应&#xff0c;动物生产系统需要提高其效率和环境可持续性。在不同的畜牧系统中有效行动取决于科学技术的进步&#xff0c;这允许增加照顾动物健康和福祉的数量。精准畜牧业技…

JavaScript-WebAPI

文章目录 JS组成什么是 webApis 和APIDOM 简介document 对象 获取 DOM 对象利用css选择器来获取DOM元素选择指定css选择器的所有元素其他获取DOM元素方法&#xff08;了解&#xff09; 操作元素内容对象.innerText对象.innerHTML 操作元素属性操作元素常用属性操作元素样式属性…

pytorch中的contiguous()

官方文档&#xff1a;https://pytorch.org/docs/stable/generated/torch.Tensor.contiguous.html 其描述contiguous为&#xff1a; Returns a contiguous in memory tensor containing the same data as self tensor. If self tensor is already in the specified memory forma…

羊大师:羊奶养生,解锁健康之道的新密码

在探寻健康与养生的旅途中&#xff0c;我们总渴望找到那把开启健康之门的钥匙。而今&#xff0c;羊奶以其独特的营养价值和健康益处&#xff0c;正悄然成为那把解锁健康之道的新密码。 羊奶&#xff0c;自古以来便是自然赋予的珍贵礼物。它富含优质蛋白、多种维生素及矿物质&am…

pandas数据分析(6)

算数运算 和Numpy数组一样&#xff0c;DataFrame和Series也利用了向量化技术。例如&#xff1a; 不过pandas真正强大之初在于自动对齐机制&#xff1a;当对多个DataFrame使用算数运算符时&#xff0c;pandas会自动将它们按照列或行索引对齐。 结果DataFrame的索引和列是两个Da…

day02-统计数据

numpy统计学 1.求平均值[数组名.mean()/np.mean(数组名)] m1 np.arange(20).reshape((4,5))m1.mean() #9.5若想要求某一维的平均值&#xff0c;设置axis参数&#xff0c;多维数组元素指定&#xff1a; axis 0&#xff0c;将从上往下计算。axis 1&#xff0c;将从左往右计算…

VIM介绍

VIM&#xff08;Vi IMproved&#xff09;是一种高度可配置的文本编辑器&#xff0c;用于有效地创建和更改任何类型的文本。它是从 vi 编辑器发展而来的&#xff0c;后者最初是 UNIX 系统上的一个文本编辑器。VIM 以其键盘驱动的界面和强大的文本处理能力而闻名&#xff0c;是许…

拼接各列内容再分组统计

某个表格的第1列是人名&#xff0c;后面多列是此人某次采购的产品&#xff0c;一个人一次可以采购多个同样的产品&#xff0c;也可以多次采购。 ABCD1JohnAppleAppleOrange2PaulGrape3JohnPear4SteveLycheeGrape5JessicaApple 需要整理成交叉表&#xff0c;上表头是产品&…

透过 Go 语言探索 Linux 网络通信的本质

大家好&#xff0c;我是码农先森。 前言 各种编程语言百花齐放、百家争鸣&#xff0c;但是 “万变不离其中”。对于网络通信而言&#xff0c;每一种编程语言的实现方式都不一样&#xff1b;但其实&#xff0c;调用的底层逻辑都是一样的。linux 系统底层向上提供了统一的 Sock…

openlayers中区域掩膜的实现

概述 在前文完成了mapboxGL中区域掩膜的实现。近日有人问到说在openlayers中如何实现&#xff0c;本文就带大家看看如何在openlayers中实现区域掩膜。 实现效果 实现 1. 实现思路 在地图容器中添加一个canvas&#xff0c;设置其在map之上&#xff1b;监听map的postrender事…

Vue2-Vue Router前端路由实现思路

1.路由是什么&#xff1f; Router路由器&#xff1a;数据包转发设备&#xff0c;路由器通过转发数据包&#xff08;数据分组&#xff09;来实现网络互连 Route路由&#xff1a;数据分组从源到目的地时&#xff0c;决定端到端路径的网络范围的进程 | - 网络层 Distribute分发…

时空预测+特征分解!高性能!EMD-Transformer和Transformer多变量交通流量时空预测对比

时空预测特征分解&#xff01;高性能&#xff01;EMD-Transformer和Transformer多变量交通流量时空预测对比 目录 时空预测特征分解&#xff01;高性能&#xff01;EMD-Transformer和Transformer多变量交通流量时空预测对比效果一览基本介绍程序设计参考资料 效果一览 基本介绍…

顶级10大AI测试工具

每周跟踪AI热点新闻动向和震撼发展 想要探索生成式人工智能的前沿进展吗&#xff1f;订阅我们的简报&#xff0c;深入解析最新的技术突破、实际应用案例和未来的趋势。与全球数同行一同&#xff0c;从行业内部的深度分析和实用指南中受益。不要错过这个机会&#xff0c;成为AI领…

Oracle Database 23ai新特性:DB_DEVELOPER_ROLE角色

角色介绍 从 Oracle Database 23ai 开始&#xff0c;新角色“DB_DEVELOPER_ROLE”允许管理员快速分配开发人员为 Oracle 数据库设计、构建和部署应用程序所需的所有必要权限。&#xff08;包括构建数据模型所需的系统权限以及监视和调试应用程序所需的对象权限&#xff09;。通…

【数据结构】02.顺序表

一、顺序表的概念与结构 1.1线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有限序列。线性表是⼀种在实际中广泛使用的数据结构&#xff0c;常见的线性表&#xff1a;顺序表、链表、栈、队列、字符串… 线性表在逻辑上是线性结构&#xff0…