日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

本文介紹了電光為什么在三角洲湖表中寫空的處理方法,對大家解決問題具有一定的參考價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)吧!

問題描述

我的Java應(yīng)用程序使用連接到套接字服務(wù)器Spark Structured Streaming不斷獲取封裝在RDMessage對象中的傳感器測量記錄(IoT),該對象記錄協(xié)議中用于控制的消息類型。
當(dāng)消息到達時,將使用Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class)檢查它們并將其轉(zhuǎn)換為數(shù)據(jù)集。

雖然流被正確讀取并且RDMeasurement對象被正確創(chuàng)建,但是輸出流被設(shè)置為無或零,具體取決于數(shù)據(jù)類型。當(dāng)我更改格式(.format("console"))時,我在DeltaFrame表或控制臺中看到這一點。

我這里錯過了什么?出什么問題了?

請參閱下面最重要的Java代碼段

public final class SocketRDMeasurement {

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("SSSocketRDMeasurement")
                .master("local[*]")
                .getOrCreate();

        Encoder<StringArray> stringArrayEncoder = Encoders.bean(StringArray.class);
        Encoder<RDMessage> messageEncoder = Encoders.bean(RDMessage.class);
        Encoder<RDMeasurement> measurementEncoder = Encoders.bean(RDMeasurement.class);

        Dataset<Row> records = spark
                .readStream()
                .format("socket")
                .option("host", host)
                .option("port", port)
                .load();

        Dataset<String> inputReceived = records.as(Encoders.STRING());

        Dataset<StringArray> input = inputReceived.as(Encoders.STRING())
                .map((MapFunction<String, StringArray>) x ->
                                new StringArray(x),
                        stringArrayEncoder);

        Dataset<RDMessage> messages = input.map(
                (MapFunction<StringArray, RDMessage>) 
                    r -> new RDMessage(r), messageEncoder);

        Dataset<RDMeasurement> measurements = messages
                .map((MapFunction<RDMessage, RDMeasurement>) r ->
                        new RDMeasurement(), measurementEncoder);

        // The code executes without warning or error but despite the 
        // objects being created correctly the output of dataset is
        // is saved with nulls/nan
        StreamingQuery query = measurements.writeStream()
                .outputMode("append")
                .format("delta")
                .option("checkpointLocation",
                    "/opt/data/delta/_checkpoints/ss-socket-rd-measurement")
                .start("/opt/data/delta/ss-socket-rd-measurement");

        query.awaitTermination();
    }
}

public class StringArray implements Serializable {
    private String[] tokens;
    public StringArray(String tokens) {
        this.tokens = tokens.split(",");
    }

    // getters, setters and toString goes here
}

public class RDMeasurement implements Serializable {
    private String dataSourceName = null;
    private double dt = 0.0f;
    private double t0 = 0f;
    private double endTimestamp = 0L;
    private double[] valuesArray;

    public RDMeasurement() { }

    public RDMeasurement(String dataSourceName, double t0, 
        double dt, double endTimestamp, double[] valuesArray) {

        this.dataSourceName = dataSourceName;
        this.t0 = t0;
        this.dt = dt;
        this.endTimestamp = endTimestamp;
        this.valuesArray = valuesArray;
    }

    // getters, setters and toString goes here
}

public class RDMessage implements Serializable {
    String type;
    RDMeasurement rdMeasurement;

    public RDMessage(String type, RDMeasurement rdMeasurement) {
        this.type = type;
        this.rdMeasurement = rdMeasurement;
    }

    public RDMessage(StringArray stringArray) {
        this(stringArray.getTokens()[0] ,
                new RDMeasurement(stringArray.getTokens()[1],
                        Double.parseDouble(stringArray.getTokens()[2]),
                        Double.parseDouble(stringArray.getTokens()[3]),
                        Double.parseDouble(stringArray.getTokens()[4]),
                        toDoubleArray(5, stringArray))
        );
    }

    private static double[] toDoubleArray(int skip, StringArray stringArray) {
        double[] ret = new double[stringArray.getTokens().length - 5];
        for (int i = 0; i < stringArray.getTokens().length - 5; i++) {
            ret[i] = Double.parseDouble(stringArray.getTokens()[i+skip]);
        }
        return ret;
    }

    // getters, setters and toString goes here
}

每行輸入遵循以下格式:

V1_start_rd_0,ds_1,1642442598.266,1.0,1642442618.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_1,ds_2,1642442619.266,1.0,1642442639.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00
V1_rd_2,ds_3,1642442640.266,1.0,1642442660.266,1.00,2.00,3.00,4.00,5.00,6.00,7.00,8.00,9.00,10.00,11.00,12.00,13.00,14.00,15.00,16.00,17.00,18.00,19.00,20.00

推薦答案

重構(gòu)Java代碼并添加調(diào)試代碼段后,我能夠識別錯誤。

參見重構(gòu):

StreamingQuery query = dataStreamReader.load()
        .as(Encoders.STRING())
        .map((MapFunction<String, StringArray>) x -> new StringArray(x),
                stringArrayEncoder)
        .map((MapFunction<StringArray, RDMessage>)
                r -> new RDMessage(r), messageEncoder)
        .map((MapFunction<RDMessage, RDMeasurement>) e ->
                e.getRdMeasurement(), measurementEncoder)
        /*
        .map((MapFunction<RDMeasurement, String>) e -> {
            if (e.getDataSourceName() != null) {
                System.out.println("???> " + e);
            }
            return e.toString();
        }, Encoders.STRING())
        .map((MapFunction<String, RDMeasurement>) s -> new RDMeasurement(s),
                measurementEncoder) 
        */
        .writeStream()
        .outputMode("append")
        .format("console")
        .start();
query.awaitTermination();

上面注釋的代碼使我能夠識別問題。

這篇關(guān)于電光為什么在三角洲湖表中寫空的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,

分享到:
標(biāo)簽:洲湖表中寫空 電光
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定