從Flink client提交源碼看第三方jar包的動態加載的解決方案是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
查看flink腳本找到執行run命令的入口類,如下:
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@入口類為:org.apache.flink.client.cli.CliFrontend。 最終會調用 parseParameters(String[] args) 方法來執行命令解析,run 命令會調用 run(params) 方法,如下:
switch (action) {
case ACTION_RUN:
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ACTION_LIST:
list(params);
return 0;
case ACTION_INFO:
info(params);
return 0;
case ACTION_CANCEL:
cancel(params);
return 0;
case ACTION_STOP:
stop(params);
return 0;
case ACTION_SAVEPOINT:
savepoint(params);
return 0;
}run 方法代碼如下
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
# 創建 PackagedProgram 對象
final PackagedProgram program =
getPackagedProgram(programOptions);
#解析獲取相關依賴jar
final List<URL> jobJars = program.getJobJarAndDependencies();
# 生成最終提交配置
final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
try {
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}run方法根據用戶傳入的參數如 main函數,jar包等信息創建出 PackagedProgram 對象,這個對象封裝了用戶提交的信息。從 getPackagedProgram()方法里可以看出。
return PackagedProgram.newBuilder() .setJarFile(jarFile) .setUserClassPaths(classpaths) .setEntryPointClassName(entryPointClass) .setConfiguration(configuration) .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings()) .setArguments(programArgs) .build();
查看PackagedProgram構造方法,里面會創建幾個關鍵成員變量:
classpaths:用戶-C 參數傳入的信息
jarFile : 用戶的主函數的jar
extractedTempLibraries :提取出上面主jar包里 lib/ 文件夾下的所有jar包信息,供后面classloader使用
userCodeClassLoader : 用戶code的classloader,這個classloader會把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。該userCodeClassLoader默認采用child_first優先策略
mainClass :用戶main函數方法 該構造方法如下:
private PackagedProgram(
@Nullable File jarFile,
List<URL> classpaths,
@Nullable String entryPointClassName,
Configuration configuration,
SavepointRestoreSettings savepointRestoreSettings,
String... args) throws ProgramInvocationException {
this.classpaths = checkNotNull(classpaths);
this.savepointSettings = checkNotNull(savepointRestoreSettings);
this.args = checkNotNull(args);
checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");
// whether the job is a Python job.
this.isPython = isPython(entryPointClassName);
// load the jar file if exists
this.jarFile = loadJarFile(jarFile);
assert this.jarFile != null || entryPointClassName != null;
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);
this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
getJobJarAndDependencies(),
classpaths,
getClass().getClassLoader(),
configuration);
// load the entry point class
this.mainClass = loadMainClass(
// if no entryPointClassName name was given, we try and look one up through the manifest
entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
userCodeClassLoader);
if (!hasMainMethod(mainClass)) {
throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");
}
}PackagedProgram 里 getJobJarAndDependencies 方法,該方法收集了job所有依賴的jar包,這些jar包后續會提交到集群并加入到classpath路徑中。
PackagedProgram對象構造完成之后,便是創建最終的Configuration對象了,如下方法
final Configuration effectiveConfiguration = getEffectiveConfiguration( activeCommandLine, commandLine, programOptions, jobJars);
這個方法會設置兩個參數:
pipeline.classpaths: 值為getJobJarAndDependencies()和classpaths里的url
pipeline.jars: 值為getJobJarAndDependencies()返回的jar和lib文件夾下的依賴,后續提交集群的時候會根據這個把jar一起提交到集群
準備好 PackagedProgram和Configuration后,就開始執行用戶程序了,
executeProgram(effectiveConfiguration, program);
詳細代碼如下:
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout) throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
# 設置用戶上下文用戶類加載器
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
# 反射調用戶的 main 函數執行job提交
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}最后總結一下整個流程:
執行flink run 命名傳入相關參數
創建PackagedProgram對象,準備相關jar,用戶類加載器,Configuration對象
通過反射調用用戶Main方法
構建Pipeline StreamGraph,提交job到集群
通過分析流程我們可以發現可以有兩種方式來實現動態jar的添加
動態的 把三方jar 放入 主函數jar包的lib目錄下(可以通過jar uf 命名搞定) 因為在PackagedProgram構造方法里會通過extractContainedLibraries()方法獲取jar lib目錄里的所有jar,并且這些jar會一并上傳到集群
在用戶任務main函數里,通過反射動態設置 Configuration 對象的 pipeline.classpaths , pipeline.jars 這兩個屬性 。并且還需要把第三方jar加載到Thread.contextClassLoader里。(可參見:https://zhuanlan.zhihu.com/p/278482766)
本人在項目中直接采用的是第一種方案,不會添加更多代碼。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。