日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

本文介紹了Hazelcast噴氣變化數(shù)據(jù)捕獲的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!

問(wèn)題描述

我正在我的應(yīng)用程序中使用Hazelcast變更數(shù)據(jù)捕獲(CDC)。

(我之所以使用CDC,是因?yàn)槿绻褂肑DBC或其他替代功能將數(shù)據(jù)加載到緩存中,則需要花費(fèi)大量時(shí)間)

因此CDC將在數(shù)據(jù)庫(kù)和Hazelcast Jet之間進(jìn)行數(shù)據(jù)同步。

    StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
                .setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
                .setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
                .setTableWhitelist("tblName").build();

這里我有以下步驟:-

Pipeline pipeline = Pipeline.create();

// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
                .filter(deletedFalse);

deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());

在這里,我使用StreamSource<ChangeRecord> source對(duì)象作為pipeLine的輸入。

如您所知,source對(duì)象是流類型。

但在我的例子中,管道數(shù)據(jù)處理取決于用戶輸入數(shù)據(jù)(一些元數(shù)據(jù))。

如果我在數(shù)據(jù)庫(kù)中進(jìn)行任何更新或刪除。Jet將更新所有流實(shí)例。

因?yàn)槲业臄?shù)據(jù)處理依賴于用戶數(shù)據(jù),所以我不想在第一步之后使用流類型。

只需要流形式的第一個(gè)StreamSource<ChangeRecord> source;

在下一步中,我只想對(duì)批處理流程執(zhí)行此操作;

那么如何在批處理中使用source

pipeLine.readFrom(source)//始終返回Stream類型。那么如何將其轉(zhuǎn)換為批處理類型。

我又嘗試了一種方法,如:-

source讀取并將所有內(nèi)容沉入地圖。
pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));

再次構(gòu)造管道ReadFrom from map。

pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());

這只是返回空數(shù)據(jù)。

所以任何建議都會(huì)很有幫助..

推薦答案

只有當(dāng)您需要持續(xù)更新數(shù)據(jù)時(shí),使用CDC源才有意義。例如,對(duì)數(shù)據(jù)庫(kù)中的每一次更新做出反應(yīng),或可能將數(shù)據(jù)加載到映射中,然后在內(nèi)存中的快照上以某個(gè)時(shí)間間隔重復(fù)運(yùn)行批處理作業(yè)

在這種情況下,您可能只希望第一次更新發(fā)生在CDC源是最新的之后–在它從數(shù)據(jù)庫(kù)讀取所有當(dāng)前狀態(tài)并且只接收對(duì)數(shù)據(jù)庫(kù)進(jìn)行的更新之后。遺憾的是,目前(Hazelcast 5.0)無(wú)法使用Jet API判斷何時(shí)發(fā)生這種情況。

您可以使用一些特定于域的信息-具有您查詢的時(shí)間戳字段、映射中存在上次插入的記錄或類似信息。

如果要對(duì)數(shù)據(jù)庫(kù)表中的數(shù)據(jù)運(yùn)行單個(gè)批處理作業(yè),則應(yīng)使用JDBC源。

(我之所以使用CDC,是因?yàn)槿绻褂肑DBC或其他替代功能將數(shù)據(jù)加載到緩存中,則需要花費(fèi)大量時(shí)間)

使用CDC有它的開(kāi)銷,這是我們通常不會(huì)看到的。在JDBC源代碼中使用像SELECT * FROM table這樣的普通SQL查詢比使用CDC源代碼更快。也許你沒(méi)有衡量處理整個(gè)當(dāng)前狀態(tài)所需的時(shí)間?如果使用JDBC加載數(shù)據(jù)比使用CDC加載數(shù)據(jù)確實(shí)需要更多時(shí)間,請(qǐng)向復(fù)制者提交問(wèn)題。

這篇關(guān)于Hazelcast噴氣變化數(shù)據(jù)捕獲的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,

分享到:
標(biāo)簽:Hazelcast 變化 噴氣 捕獲 數(shù)據(jù)
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定