0%

Flink剖析系列之Yarn Session Cluster 和 Yarn Per Job 模式作业提交流程分析

前言

由于在开发中使用Flink大多数使用的是Flink Yarn cluster 模式运行,认识的一些同学的公司也是基于这种模式,所以今天就深入探讨一下这种模式的启动流程。

文章结构

文章结构如下:

  1. 启动命令介绍与shell脚本启动流程解析
  2. CliFronted类解析(Flink程序启动入口)
  3. YarnClusterDescriptor类解析

image

环境

flink源码基于Flink1.9

启动命令介绍与shell脚本启动流程解析

启动命令

在项目开发中,flink的启动命令一般如下

1
2
3
4
5
6
7
nohup /usr/local/flink/current/bin/flink run -m yarn-cluster \
-yD metrics.reporter.promgateway.jobName=DEMO \
-yD metrics.reporter.promgateway.randomJobNameSuffix=false \
-yD metrics.reporter.promgateway.deleteOnShutdown=true \
-yn 3 -ys 6 -yjm 4096 -ytm 6144 -ynm Demo \
-c com.demo.MainClass demo-1.0-SNAPSHOT.jar \
1>demo.log 2>demo_error.log &

相关解释

image

flink脚本解析

image

主要步骤有三步

  1. 如果target是软连接,则循环拿到最终的执行目录
  2. 执行config.sh
  3. 初始化之后拿到java执行目录,冰执行难CliFronted.java

config.sh 脚本解析

因此主要还是要看 config.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

constructFlinkClassPath() {

}

manglePath(){

}

manglePathList(){

}
readFromConfig() {

}

###

从上面的脚本中可以看出,config.sh 做的事情很简单,主要是读取配置文件,加载环境变量
下面是readFromConfig函数的代码截图,逻辑很简单,读取配置属性

image

CliFronted类解析(Flink程序启动入口)

下面继续跟进到CliFronted类中,这是一个带有main函数的类,是整个应用的启动入口。

image

main函数主要步骤

  1. 获取配置目录
  2. 加载全局配置
  3. 加载自定义CommondLine,这个CustomCommandLine的英文释义是“Custom command-line interface to load hooks for the command-line interface.”,翻译一下就是“自定义命令行接口来加载命令行接口的钩子。” 是一个接口,主要的子类有两个,一个FlinkYarnSessionCli,一个DefaultCli,主要作用是处理命令行,比如判断是否符合当前的类型,以及获取集群id,解析命令行参数等等。
    image
  4. 构造CliFrontend,并且调用parseParameters

parseParameters 方法很简单,就是根据第一个参数的值调用响应的方法,比如我们是run,则调用run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Executions the run action.
*
* @param args Command line arguments for the run action.
*/
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");

//前面几行主要是运行参数相关的
final Options commandOptions = CliFrontendParser.getRunCommandOptions();

final Options commandLineOptions =
CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);

final CommandLine commandLine =
CliFrontendParser.parse(commandLineOptions, args, true);

final RunOptions runOptions = new RunOptions(commandLine);

// evaluate help flag
if (runOptions.isPrintHelp()) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}

if (!runOptions.isPython()) {
// Java program should be specified a JAR file
if (runOptions.getJarFilePath() == null) {
throw new CliArgsException("Java program should be specified a JAR file.");
}
}

final PackagedProgram program;
try {
LOG.info("Building program from JAR file");
//构建打包程序,主要包含jar文件,类路径,主类,程序参数
program = buildProgram(runOptions);
}
catch (FileNotFoundException e) {
throw new CliArgsException("Could not build the program from JAR file.", e);
}
//根据commandLine得到响应的CustomCommandLine
final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);

try {
//执行程序
runProgram(customCommandLine, commandLine, runOptions, program);
} finally {
program.deleteExtractedLibraries();
}
}

这里请看 FlinkYarnSessionCli 的 isActive,因为我们传入了 -m yarn-cluster,所以 jobManagerOption 为 yarn-cluster,而ID = “yarn-cluster”;所以第一个条件就满足,所以返回的是FlinkYarnSessionCli

FlinkYarnSessionCli的类结构图

image

1
2
3
4
5
6
7
8
9
10
@Override
public boolean isActive(CommandLine commandLine) {
String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
boolean yarnJobManager = ID.equals(jobManagerOption);
boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
return yarnJobManager
|| yarnAppId
|| (isYarnPropertiesFileMode(commandLine)
&& yarnApplicationIdFromYarnProperties != null);
}

下面接着看runProgroam方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
private <T> void runProgram(CustomCommandLine<T> customCommandLine,	
CommandLine commandLine,RunOptions runOptions,PackagedProgram program)
throws ProgramInvocationException, FlinkException {
final ClusterDescriptor<T> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
try {
final T clusterId = customCommandLine.getClusterId(commandLine);

final ClusterClient<T> client;
// 此处clusterId如果不为null,则表示是session模式
/*
* Yarn模式:
* 1. Job模式:每个flink job 单独在yarn上声明一个flink集群
* 2. Session模式:在集群中维护flink master,即一个yarn application master,运行多个job。
*/
// directly deploy the job if the cluster is started in job mode and detached
if (clusterId == null && runOptions.getDetachedMode()) {
// Job + Detached模式
int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
//工具类从jar包中构建JobGraph
final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
// 这里部署JobCluster,内部在Yarn集群中启动应用,应用入口为JobClusterEntrypoint
client = clusterDescriptor.deployJobCluster(
clusterSpecification,
jobGraph,
runOptions.getDetachedMode());
try {
client.shutdown();
} catch (Exception e) {
LOG.info("Could not properly shut down the client.", e);
}
} else {
final Thread shutdownHook;
if (clusterId != null) {
//session模式
client = clusterDescriptor.retrieve(clusterId);
shutdownHook = null;
} else {
//Job + non-Detached模式
// also in job mode we have to deploy a session cluster because the job
// might consist of multiple parts (e.g. when using collect)
//在作业模式下,我们还必须部署一个会话集群,
//因为作业可能包含多个部分(例如,使用collect时),
//提供Dispatcher,ResourceManager和WebMonitorEndpoint等服务
final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
client = clusterDescriptor.deploySessionCluster(clusterSpecification);
// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
// there's a race-condition here if cli is killed before shutdown hook is installed
//非DetachedMode 需要add一个清理资源的苟泽
if (!runOptions.getDetachedMode()
&& runOptions.isShutdownOnAttachedExit()) {
shutdownHook =
ShutdownHookUtil.addShutdownHook(client::shutDownCluster,
client.getClass().getSimpleName(), LOG);
} else {
shutdownHook = null;
}
}
try {
...
//优化图,程序提交
executeProgram(program, client, userParallelism);
} finally {
...
}
}
} finally {
...
}
}

YarnClusterDescriptor类解析

两种模式示意图

图片来源于官网

Yarn Job 模式

每一个Flink Job 在Yarn上启动一个FLink集群,提交一次,生成一个Yarn Session,并且如果有 -d 命令参数,则启动 Yarn Job 模式下面的 Per Job 模式,有一些细微的差别。

这种适合大作业模式,一般项目中用这种比较多,可以更好的资源隔离,防止互相干扰。

image

Yarn Session 模式

需要先执行 yarn-session.sh 命令,yarn集群中维护Flink Master,即一个yarn application master,运行多个job。启动任务之前需要先启动一个一直运行的Flink集群,这种适合小作业模式

image

类结构图

YarnClusterDescriptor的类结构图如下,

image

来看看 customCommandLine.createClusterDescriptor(commandLine); 调用堆栈如下,返回YarnClusterDescriptor。

1
2
3
4
5
6
7
8
9
customCommandLine.createClusterDescriptor(commandLine);
createClusterDescriptor //FlinkYarnSessionCli
createDescriptor //FlinkYarnSessionCli
getClusterDescriptor(); //FlinkYarnSessionCli
return YarnClusterDescriptor
//设置jar路径
//设置队列名称
//设置ZK等其他配置
return yarnClusterDescriptor

YarnClusterDescriptor主要有两个方法核心方法,deployJobCluster 用于部署 per job 模式的作业,,deploySessionCluster用于部署小session类型的作业。 startAppMaster用于启动ApplicationMaster等组件。

Yarn Session Cluster模式部署源码分析

clusterDescriptor.deploySessionCluster(clusterSpecification)调用堆栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
clusterDescriptor.deploySessionCluster(clusterSpecification) //这里传入YarnSessionClusterEntrypoint 类
//This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
deployInternal()
validateClusterSpecification()
checkYarnQueues()
yarnClient.createApplication()
validateClusterResources() //用于对比请求的资源(slot,mem)与yarn剩余资源的对比,
//并返回一个集群规范(描述)
startAppMaster() // return ApplicationReport
//初始化文件系统
//将应用程序主jar复制到文件系统
//创建一个本地资源来指向目标jar路径
//logback log4j 日志配置检查
//上传文件,设置flink配置(taskmanager number slot等)
//将jobGraph序列化到文件并且上传
//安全相关配置
setupApplicationMasterContainer(yarnClusterEntrypoint)
//设置执行入口 与yarn集群打交道的Yarn终端
// 此Entrypoint会提供webMonitor、resourceManager、dispatcher 等服务
startCommandValues.put("class", yarnClusterEntrypoint);
//设置java执行文件,jvm参数,heap大小,日志配置等
//构建启动命令
amContainer.setLocalResources(localResources); //设置本地资源为刚才上传的文件
//设置ApplicationMaster的环境变量和配置
// Setup CLASSPATH and environment variables for ApplicationMaster
final Map<String, String> appMasterEnv = new HashMap<>();
...
if (dynamicPropertiesEncoded != null) {
appMasterEnv.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
}
amContainer.setEnvironment(appMasterEnv); //给contaier设置变量
yarnClient.submitApplication(appContext); //调用yarnClient提交应用程序
loop: while (true) {
//获取提交application的state
//一直循环下去,知道状态为Killed则抛出异常,如果为Running,则提交应用成功
}

核心方法是 deployInternal,做了以下几件事情:

  1. 校验集群资源
  2. 检查队列
  3. 创建Yarn Application
  4. 启动ApplicationMaster
    1. 初始化文件系统
    2. 将应用程序主jar复制到文件系统
    3. 创建一个本地资源来指向目标jar路径
    4. logback log4j 日志配置检查
    5. 上传文件,设置flink配置(taskmanager number slot等)
    6. 将jobGraph序列化到文件并且上传(如果是Per Job 模式)
    7. 安全相关配置
    8. 设置Container相关的配置,比如设置container的入口,配置jvm参数等。
    9. 提交application

Yarn Per Job 模式部署源码解析

下面来看看 Per-job model的启动流程,Per job model 在CliFronted类中的runProgram中 line 233 行调用PackageProgramUtils生成了JobGraph实例,并且把实例传入到了 YarnclusterDescriptor 的 deployJobCluster方法,deployJobCluster方法调用getYarnJobClusterEntrypoint方法拿到的正是YarnJobClusterEntrypoint类,然后再次调用 AbstractYarnClusterDescriptor 的 deployInternal 方法,流程与Yarn Session模式一模一样。

image

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public ClusterClient<ApplicationId> deployJobCluster(
ClusterSpecification clusterSpecification,
JobGraph jobGraph,
boolean detached) throws ClusterDeploymentException {

// this is required because the slots are allocated lazily
jobGraph.setAllowQueuedScheduling(true);

try {
return deployInternal(
clusterSpecification,
"Flink per-job cluster",
getYarnJobClusterEntrypoint(),
jobGraph,
detached);
} catch (Exception e) {
throw new ClusterDeploymentException
("Could not deploy Yarn job cluster.", e);
}
}

在 line 507 中,调用startAppMaster的时候会传入 jobGraph

image

继续跟进,在startAppMaster方法中的有一个判断,如果jobGraph不为空,则会把这个文件上传。

image

小结

在本文中主要介绍了项目开发的一般启动脚本和解析,然后详细介绍了flink命令和config.sh的脚本源码。 接着解读了Flink入口类CliFrontend类,并介绍了YarnClusterDescriptor类和两种不同的Yarn Session Job 模式的源码。

后面会继续介绍两种模式的不同的入口类了,在下一章将会深入探讨这两种模式的启动流程。
ClusterEntrypoint 的类图如下
image