Why does Spark write nulls to a Delta Lake table?
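Problem description
My Java application uses Spark Structured Streaming, connected to a socket server,
to continuously receive sensor measurement records (IoT) wrapped in RDMessage objects; each object also records the message type used for control in the protocol.
When messages arrive, they are checked and converted into a Dataset using Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class).
Although the stream is read correctly and the RDMeasurement objects are created correctly, the output is written as null or zero, depending on the data type. I see this both in the Delta table and on the console when I change the format (.format("console")).
What am I missing here? What is going wrong?
See the most important parts of the Java code below: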
public final class SocketRDMeasurement {
public static void main(String[] args) throws Exception {
SparkSession spark = SparkSession
.builder()
.appName("SSSocketRDMeasurement")
.master("local[*]")
.getOrCreate();
Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);
Dataset<Row> records = spark
.readStream()
.format("socket")
.option("host", host)
.option("port", port)
.load();
Dataset<String> inputReceived = records.as(Encoders.STRING());
Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
.map((MapFunction<String, StringArray>) x ->
new StringArray(x),
stringArrayEncoder);
Dataset<RDMessage> messages = input.map(
(MapFunction<StringArray, RDMessage>)
r -> new RDMessage(r), messageEncoder);
Dataset<RDMeasurement> measurements = messages
.map((MapFunction<RDMessage, RDMeasurement>) r ->
new RDMeasurement(), measurementEncoder);
// The code executes without warnings or errors, but although the
// objects are created correctly, the output of the dataset
// is saved with nulls/NaN
StreamingQuery query = measurements.writeStream()
.outputMode("append")
.format("delta")
.option("checkpointLocation",
"/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
.start("/opt/data/delta/ss-socket-rd-measurement");
query.awaitTermination();
}
}
public class StringArray implements Serializable {
private String[] tokens;
public StringArray(String tokens) {
this.tokens = tokens.split(",");
}
// getters, setters and toString go here
}
public class RDMeasurement implements Serializable {
private String dataSourceName = null;
private double dt = 0.0f;
private double t0 = 0f;
private double endTimestamp = 0L;
private double[] valuesArray;
public RDMeasurement() { }
public RDMeasurement(String dataSourceName, double t0,
double dt, double endTimestamp, double[] valuesArray) {
this.dataSourceName = dataSourceName;
this.t0 = t0;
this.dt = dt;
this.endTimestamp = endTimestamp;
this.valuesArray = valuesArray;
}
// getters, setters and toString go here
}
public class RDMessage implements Serializable {
String type;
RDMeasurement rdMeasurement;
public RDMessage(String type, RDMeasurement rdMeasurement) {
this.type = type;
this.rdMeasurement = rdMeasurement;
}
public RDMessage(StringArray stringArray) {
this(stringArray.getTokens()[0] ,
new RDMeasurement(stringArray.getTokens()[1],
Double.parseDouble(stringArray.getTokens()[2]),
Double.parseDouble(stringArray.getTokens()[3]),
Double.parseDouble(stringArray.getTokens()[4]),
toDoubleArray(5, stringArray))
);
}
private static double[] toDoubleArray(int skip, StringArray stringArray) {
double[] ret = new double[stringArray.getTokens().length - skip];
for (int i = 0; i < stringArray.getTokens().length - skip; i++) {
ret[i] = Double.parseDouble(stringArray.getTokens()[i+skip]);
}
return ret;
}
// getters, setters and toString go here
}
Each input line has the following format:
V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
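To make the field layout explicit, here is a minimal sketch in plain Java (no Spark) of how one such line splits into its parts, following the token order used by the RDMessage(StringArray) constructor above: message type, data source name, t0, dt, end timestamp, then the values. The class InputLineSketch, the holder ParsedLine, and the constant HEADER_FIELDS are hypothetical names introduced only for this illustration.

```java
public class InputLineSketch {

    /** Hypothetical holder for the fields of one input line. */
    public static final class ParsedLine {
        public final String type;
        public final String dataSourceName;
        public final double t0;
        public final double dt;
        public final double endTimestamp;
        public final double[] values;

        ParsedLine(String type, String dataSourceName, double t0,
                   double dt, double endTimestamp, double[] values) {
            this.type = type;
            this.dataSourceName = dataSourceName;
            this.t0 = t0;
            this.dt = dt;
            this.endTimestamp = endTimestamp;
            this.values = values;
        }
    }

    // Assumption: the first five tokens are header fields, the rest are values.
    public static final int HEADER_FIELDS = 5;

    public static ParsedLine parse(String line) {
        String[] tokens = line.split(",");
        if (tokens.length < HEADER_FIELDS) {
            throw new IllegalArgumentException("Expected at least "
                    + HEADER_FIELDS + " fields, got " + tokens.length);
        }
        // Everything after the header fields is a measurement value.
        double[] values = new double[tokens.length - HEADER_FIELDS];
        for (int i = 0; i < values.length; i++) {
            values[i] = Double.parseDouble(tokens[i + HEADER_FIELDS]);
        }
        return new ParsedLine(
                tokens[0],                      // message type
                tokens[1],                      // data source name
                Double.parseDouble(tokens[2]),  // t0
                Double.parseDouble(tokens[3]),  // dt
                Double.parseDouble(tokens[4]),  // end timestamp
                values);
    }

    public static void main(String[] args) {
        // Shortened version of one sample line (three values instead of twenty).
        ParsedLine p = parse("V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00");
        System.out.println(p.type + " " + p.dataSourceName + " " + p.values.length);
    }
}
```

Parsing a line outside Spark in this way is a quick check that the input itself is well-formed before looking at the streaming pipeline.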
Recommended answer
After refactoring the Java code and adding a debugging snippet, I was able to identify the error.
Here is the refactored code:
StreamingQuery query = dataStreamReader.load()
.as(Encoders.STRING())
.map((MapFunction<String, StringArray>) x -> new StringArray(x),
stringArrayEncoder)
.map((MapFunction<StringArray, RDMessage>)
r -> new RDMessage(r), messageEncoder)
.map((MapFunction<RDMessage, RDMeasurement>) e ->
e.getRdMeasurement(), measurementEncoder)
/*
.map((MapFunction<RDMeasurement, String>) e -> {
if (e.getDataSourceName() != null) {
System.out.println("???> " + e);
}
return e.toString();
}, Encoders.STRING())
.map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
measurementEncoder)
*/
.writeStream()
.outputMode("append")
.format("console")
.start();
query.awaitTermination();
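The commented-out code above is what allowed me to identify the problem. Comparing the two versions, the final map in the original pipeline returned new RDMeasurement(), an empty bean with default field values, whereas the refactored version returns e.getRdMeasurement(), the measurement parsed from the message.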
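The effect can be reproduced without Spark. The following is a minimal sketch using plain Java streams in place of Dataset.map; the classes Measurement and Message are simplified, hypothetical stand-ins for RDMeasurement and RDMessage, and mapBuggy / mapFixed are illustrative names, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class EmptyBeanSketch {

    /** Simplified stand-in for RDMeasurement (hypothetical). */
    public static class Measurement {
        private String dataSourceName = null;
        private double t0 = 0.0;

        public Measurement() { }

        public Measurement(String dataSourceName, double t0) {
            this.dataSourceName = dataSourceName;
            this.t0 = t0;
        }

        public String getDataSourceName() { return dataSourceName; }
        public double getT0() { return t0; }
    }

    /** Simplified stand-in for RDMessage (hypothetical). */
    public static class Message {
        private final Measurement measurement;

        public Message(Measurement measurement) {
            this.measurement = measurement;
        }

        public Measurement getMeasurement() { return measurement; }
    }

    // Mirrors the original pipeline: the lambda ignores its input and
    // returns a fresh bean, so every field keeps its default value.
    public static List<Measurement> mapBuggy(List<Message> messages) {
        return messages.stream()
                .map(m -> new Measurement())
                .collect(Collectors.toList());
    }

    // Mirrors the refactored pipeline: the parsed measurement is passed on.
    public static List<Measurement> mapFixed(List<Message> messages) {
        return messages.stream()
                .map(Message::getMeasurement)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message(new Measurement("ds_1", 1642442598.266)));

        Measurement buggy = mapBuggy(messages).get(0);
        Measurement fixed = mapFixed(messages).get(0);

        System.out.println("buggy: " + buggy.getDataSourceName() + ", " + buggy.getT0());
        System.out.println("fixed: " + fixed.getDataSourceName() + ", " + fixed.getT0());
    }
}
```

The buggy variant yields null for the String field and 0.0 for the double field, which matches the "null or zero, depending on the data type" symptom described in the question.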