本文介紹了一個(gè)數(shù)據(jù)流作業(yè)內(nèi)的并行管道的處理方法,對(duì)大家解決問題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!
問題描述
我要在GCP上的一個(gè)數(shù)據(jù)流作業(yè)內(nèi)運(yùn)行兩個(gè)并行管道。我已經(jīng)創(chuàng)建了一個(gè)管道,并且工作正常,但我希望在不創(chuàng)建另一個(gè)作業(yè)的情況下創(chuàng)建另一個(gè)管道。
我搜索了這么多答案,但沒有找到任何代碼示例:(
如果我這樣運(yùn)行它,它不工作:
pipe1.run();
pipe2.run();
顯示”已有活動(dòng)作業(yè)名稱…如果要提交第二個(gè)作業(yè),請(qǐng)嘗試使用--jobName
重新設(shè)置其他名稱”
推薦答案
您可以將其他輸入應(yīng)用于管道,這將導(dǎo)致一個(gè)作業(yè)中的單獨(dú)管道。例如:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}
}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));
PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}
}));
pipeline.run();
}
如您所見,您還可以將兩個(gè)(或更多)獨(dú)立的PBegin應(yīng)用于具有多個(gè)PDone/接收器的管道。在此示例中,"pipeline 1"
將輸出轉(zhuǎn)儲(chǔ)并寫入文件,"pipeline 2"
僅將其轉(zhuǎn)儲(chǔ)到屏幕。
如果您在GCP上使用DataflowRunner
運(yùn)行此命令,則圖形用戶界面將顯示2個(gè)未連接的”管道”。
這篇關(guān)于一個(gè)數(shù)據(jù)流作業(yè)內(nèi)的并行管道的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,