本文介紹了如何使用睡覺接口在沒有使用的情況下從Sink(或一般稱為RichFunction)獲取作業名稱?的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學習吧!
問題描述
如標題所示。雖然getJobId
在RuntimeContext
中可用,但作業名稱不可用。
https://nightlies.apache.org/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/common/functions/RuntimeContext.html
嘗試從配置中獲取似乎效果不佳:
@Override
public void open(Configuration parameters) throws Exception {
String jobName = parameters.getString(PipelineOptions.NAME); // this is null
}
我們如何運行獨立示例管道:
public static void main(String... args) {
try {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// some pipeline setup
env.execute("This-is-job-name");
} catch (Exception e) {
// logging
}
推薦答案
假設您將作業名稱作為參數傳遞給作業,您希望將其設置如下:
public static void main(String... args) {
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);
然后這應該會起作用
@Override
public void open(Configuration parameters) throws Exception {
ParameterTool params = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
String jobName = params.get(nameOfParameterWithJobName);
}
傳遞給open
的配置始終為空–這是一種不再使用的過時機制。未更改方法簽名以避免破壞公共API。
將此類信息傳遞給RichFunction的另一個好方法是將其傳遞給構造函數。
這篇關于如何使用睡覺接口在沒有使用的情況下從Sink(或一般稱為RichFunction)獲取作業名稱?的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,