本文介紹了為什么當(dāng)我發(fā)送兩個(gè)輸入流時(shí),Spark Streaming停止工作?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我正在開(kāi)發(fā)一個(gè)Spark流應(yīng)用程序,其中我需要使用來(lái)自兩個(gè)服務(wù)器的輸入流,每個(gè)服務(wù)器每秒向Spark上下文發(fā)送一條JSON消息。
我的問(wèn)題是,如果我只在一個(gè)流上執(zhí)行操作,一切都運(yùn)行得很好。但如果我有來(lái)自不同服務(wù)器的兩個(gè)流,那么Spark在可以打印任何東西之前凍結(jié),并且只有在兩個(gè)服務(wù)器都發(fā)送了它們必須發(fā)送的所有JSON消息時(shí)(當(dāng)它檢測(cè)到socketTextStream
沒(méi)有接收數(shù)據(jù)時(shí))才開(kāi)始重新工作。
以下是我的代碼:
JavaReceiverInputDStream<String> streamData1 = ssc.socketTextStream("localhost",996,
StorageLevels.MEMORY_AND_DISK_SER);
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream("localhost", 9995,StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<Integer, String> dataStream1= streamData1.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(1, stream);
return streamPair;
}
});
JavaPairDStream<Integer, String> dataStream2= streamData2.mapToPair(new PairFunction<String, Integer, String>() {
public Tuple2<Integer, String> call(String stream) throws Exception {
Tuple2<Integer,String> streamPair= new Tuple2<Integer, String>(2, stream);
return streamPair;
}
});
dataStream2.print(); //for example
請(qǐng)注意,沒(méi)有錯(cuò)誤消息,Spark Simple在啟動(dòng)上下文后凍結(jié),雖然我從端口收到JSON消息,但它沒(méi)有顯示任何內(nèi)容。
非常感謝。
推薦答案
查看Spark Streaming documentation中的這些警告,并查看它們是否適用:
要記住的要點(diǎn)
在本地運(yùn)行Spark Streaming程序時(shí),請(qǐng)勿使用local或local1為主URL。這兩種情況都意味著只有一個(gè)線程將用于本地運(yùn)行任務(wù)。如果您使用的是基于接收器的輸入DStream(例如Sockets、Kafka、Flume等),則將使用單個(gè)線程來(lái)運(yùn)行接收器,而不會(huì)留下處理接收到的數(shù)據(jù)的線程。因此,在本地運(yùn)行時(shí),請(qǐng)始終使用”local[n]”作為主URL,其中n>要運(yùn)行的接收器的數(shù)量(有關(guān)如何設(shè)置主URL的信息,請(qǐng)參閱Spark屬性)。
將邏輯擴(kuò)展到在集群上運(yùn)行,分配給Spark Streaming應(yīng)用程序的核心數(shù)必須多于接收器數(shù)。否則,系統(tǒng)將接收數(shù)據(jù),但無(wú)法處理數(shù)據(jù)。
這篇關(guān)于為什么當(dāng)我發(fā)送兩個(gè)輸入流時(shí),Spark Streaming停止工作?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,