本文介紹了Hazelcast噴氣變化數(shù)據(jù)捕獲的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
我正在我的應(yīng)用程序中使用Hazelcast變更數(shù)據(jù)捕獲(CDC)。
(我之所以使用CDC,是因?yàn)槿绻褂肑DBC或其他替代功能將數(shù)據(jù)加載到緩存中,則需要花費(fèi)大量時(shí)間)
因此CDC將在數(shù)據(jù)庫(kù)和Hazelcast Jet之間進(jìn)行數(shù)據(jù)同步。
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("tblName").build();
這里我有以下步驟:-
Pipeline pipeline = Pipeline.create();
// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
.filter(deletedFalse);
deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());
在這里,我使用StreamSource<ChangeRecord> source
對(duì)象作為pipeLine
的輸入。
如您所知,source
對(duì)象是流類型。
但在我的例子中,管道數(shù)據(jù)處理取決于用戶輸入數(shù)據(jù)(一些元數(shù)據(jù))。
如果我在數(shù)據(jù)庫(kù)中進(jìn)行任何更新或刪除。Jet將更新所有流實(shí)例。
因?yàn)槲业臄?shù)據(jù)處理依賴于用戶數(shù)據(jù),所以我不想在第一步之后使用流類型。
只需要流形式的第一個(gè)StreamSource<ChangeRecord> source;
。
在下一步中,我只想對(duì)批處理流程執(zhí)行此操作;
那么如何在批處理中使用source
。
pipeLine.readFrom(source)
//始終返回Stream類型。那么如何將其轉(zhuǎn)換為批處理類型。
我又嘗試了一種方法,如:-
從source
讀取并將所有內(nèi)容沉入地圖。
pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));
再次構(gòu)造管道ReadFrom from map。
pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());
這只是返回空數(shù)據(jù)。
所以任何建議都會(huì)很有幫助..
推薦答案
只有當(dāng)您需要持續(xù)更新數(shù)據(jù)時(shí),使用CDC源才有意義。例如,對(duì)數(shù)據(jù)庫(kù)中的每一次更新做出反應(yīng),或可能將數(shù)據(jù)加載到映射中,然后在內(nèi)存中的快照上以某個(gè)時(shí)間間隔重復(fù)運(yùn)行批處理作業(yè)。
在這種情況下,您可能只希望第一次更新發(fā)生在CDC源是最新的之后–在它從數(shù)據(jù)庫(kù)讀取所有當(dāng)前狀態(tài)并且只接收對(duì)數(shù)據(jù)庫(kù)進(jìn)行的更新之后。遺憾的是,目前(Hazelcast 5.0)無(wú)法使用Jet API判斷何時(shí)發(fā)生這種情況。
您可以使用一些特定于域的信息-具有您查詢的時(shí)間戳字段、映射中存在上次插入的記錄或類似信息。
如果要對(duì)數(shù)據(jù)庫(kù)表中的數(shù)據(jù)運(yùn)行單個(gè)批處理作業(yè),則應(yīng)使用JDBC源。
(我之所以使用CDC,是因?yàn)槿绻褂肑DBC或其他替代功能將數(shù)據(jù)加載到緩存中,則需要花費(fèi)大量時(shí)間)
使用CDC有它的開(kāi)銷,這是我們通常不會(huì)看到的。在JDBC源代碼中使用像SELECT * FROM table
這樣的普通SQL查詢比使用CDC源代碼更快。也許你沒(méi)有衡量處理整個(gè)當(dāng)前狀態(tài)所需的時(shí)間?如果使用JDBC加載數(shù)據(jù)比使用CDC加載數(shù)據(jù)確實(shí)需要更多時(shí)間,請(qǐng)向復(fù)制者提交問(wèn)題。
這篇關(guān)于Hazelcast噴氣變化數(shù)據(jù)捕獲的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,