从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。
创新互联是一家专业提供裕安企业网站建设,专注与成都网站建设、成都网站设计、H5高端网站建设、小程序制作等业务。10年已为裕安众多企业、政府机构等服务。创新互联专业的建站公司优惠进行中。
1. flink run 提交流程源码分析
查看flink脚本找到执行run命令的入口类,如下:
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@
入口类为:org.apache.flink.client.cli.CliFrontend。 最终会调用 parseParameters(String[] args) 方法来执行命令解析,run 命令会调用 run(params) 方法,如下:
switch (action) { case ACTION_RUN: run(params); return 0; case ACTION_RUN_APPLICATION: runApplication(params); return 0; case ACTION_LIST: list(params); return 0; case ACTION_INFO: info(params); return 0; case ACTION_CANCEL: cancel(params); return 0; case ACTION_STOP: stop(params); return 0; case ACTION_SAVEPOINT: savepoint(params); return 0; }
run 方法代码如下
protected void run(String[] args) throws Exception { LOG.info("Running 'run' command."); final Options commandOptions = CliFrontendParser.getRunCommandOptions(); final CommandLine commandLine = getCommandLine(commandOptions, args, true); // evaluate help flag if (commandLine.hasOption(HELP_OPTION.getOpt())) { CliFrontendParser.printHelpForRun(customCommandLines); return; } final CustomCommandLine activeCommandLine = validateAndGetActiveCommandLine(checkNotNull(commandLine)); final ProgramOptions programOptions = ProgramOptions.create(commandLine); # 创建 PackagedProgram 对象 final PackagedProgram program = getPackagedProgram(programOptions); #解析获取相关依赖jar final ListjobJars = program.getJobJarAndDependencies(); # 生成最终提交配置 final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars); LOG.debug("Effective executor configuration: {}", effectiveConfiguration); try { executeProgram(effectiveConfiguration, program); } finally { program.deleteExtractedLibraries(); } }
run方法根据用户传入的参数如 main函数,jar包等信息创建出 PackagedProgram 对象,这个对象封装了用户提交的信息。从 getPackagedProgram()方法里可以看出。
return PackagedProgram.newBuilder() .setJarFile(jarFile) .setUserClassPaths(classpaths) .setEntryPointClassName(entryPointClass) .setConfiguration(configuration) .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings()) .setArguments(programArgs) .build();
查看PackagedProgram构造方法,里面会创建几个关键成员变量:
classpaths:用户-C 参数传入的信息
jarFile : 用户的主函数的jar
extractedTempLibraries :提取出上面主jar包里 lib/ 文件夹下的所有jar包信息,供后面classloader使用
userCodeClassLoader : 用户code的classloader,这个classloader会把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。该userCodeClassLoader默认采用child_first优先策略
mainClass :用户main函数方法 该构造方法如下:
private PackagedProgram( @Nullable File jarFile, Listclasspaths, @Nullable String entryPointClassName, Configuration configuration, SavepointRestoreSettings savepointRestoreSettings, String... args) throws ProgramInvocationException { this.classpaths = checkNotNull(classpaths); this.savepointSettings = checkNotNull(savepointRestoreSettings); this.args = checkNotNull(args); checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null."); // whether the job is a Python job. this.isPython = isPython(entryPointClassName); // load the jar file if exists this.jarFile = loadJarFile(jarFile); assert this.jarFile != null || entryPointClassName != null; // now that we have an entry point, we can extract the nested jar files (if any) this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile); this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader( getJobJarAndDependencies(), classpaths, getClass().getClassLoader(), configuration); // load the entry point class this.mainClass = loadMainClass( // if no entryPointClassName name was given, we try and look one up through the manifest entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile), userCodeClassLoader); if (!hasMainMethod(mainClass)) { throw new ProgramInvocationException("The given program class does not have a main(String[]) method."); } }
PackagedProgram 里 getJobJarAndDependencies 方法,该方法收集了job所有依赖的jar包,这些jar包后续会提交到集群并加入到classpath路径中。
PackagedProgram对象构造完成之后,便是创建最终的Configuration对象了,如下方法
final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars);
这个方法会设置两个参数:
pipeline.classpaths: 值为getJobJarAndDependencies()和classpaths里的url
pipeline.jars: 值为getJobJarAndDependencies()返回的jar和lib文件夹下的依赖,后续提交集群的时候会根据这个把jar一起提交到集群
准备好 PackagedProgram和Configuration后,就开始执行用户程序了,
executeProgram(effectiveConfiguration, program);
详细代码如下:
public static void executeProgram( PipelineExecutorServiceLoader executorServiceLoader, Configuration configuration, PackagedProgram program, boolean enforceSingleJobExecution, boolean suppressSysout) throws ProgramInvocationException { checkNotNull(executorServiceLoader); final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader(); final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { # 设置用户上下文用户类加载器 Thread.currentThread().setContextClassLoader(userCodeClassLoader); LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED)); ContextEnvironment.setAsContext( executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout); StreamContextEnvironment.setAsContext( executorServiceLoader, configuration, userCodeClassLoader, enforceSingleJobExecution, suppressSysout); try { # 反射调用户的 main 函数执行job提交 program.invokeInteractiveModeForExecution(); } finally { ContextEnvironment.unsetAsContext(); StreamContextEnvironment.unsetAsContext(); } } finally { Thread.currentThread().setContextClassLoader(contextClassLoader); } }
最后总结一下整个流程:
执行flink run 命名传入相关参数
创建PackagedProgram对象,准备相关jar,用户类加载器,Configuration对象
通过反射调用用户Main方法
构建Pipeline StreamGraph,提交job到集群
2. 提交job时,动态加载第三方jar(如udf等)
通过分析流程我们可以发现可以有两种方式来实现动态jar的添加
动态的 把三方jar 放入 主函数jar包的lib目录下(可以通过jar uf 命名搞定) 因为在PackagedProgram构造方法里会通过extractContainedLibraries()方法获取jar lib目录里的所有jar,并且这些jar会一并上传到集群
在用户任务main函数里,通过反射动态设置 Configuration 对象的 pipeline.classpaths , pipeline.jars 这两个属性 。并且还需要把第三方jar加载到Thread.contextClassLoader里。(可参见:https://zhuanlan.zhihu.com/p/278482766)
本人在项目中直接采用的是第一种方案,不会添加更多代码。
看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注创新互联行业资讯频道,感谢您对创新互联的支持。
网页标题:从Flinkclient提交源码看第三方jar包的动态加载的解决方案是怎样的
文章位置:http://lswzjz.com/article/gscsop.html