本文介紹了Kafka Streams:如何獲得SessionWindow的第一個(gè)和最后一個(gè)記錄?的處理方法,對(duì)大家解決問(wèn)題具有一定的參考價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)吧!
問(wèn)題描述
默認(rèn)情況下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60))
為每個(gè)傳入記錄返回一條記錄。
結(jié)合使用.count()
和.filter()
可以輕松檢索第一條記錄。
使用
.suppress(Suppressed.untilWindowCloses(unbounded()))
還可以輕松檢索最后一條記錄。
所以…我做了兩次處理,您可以看到修改后的字?jǐn)?shù)統(tǒng)計(jì)示例:
final KStream<String, String> streamsBranches = builder.<String,String>stream("streams-plaintext-input");
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v == null ? -1l : v))
.filter((wk, v) -> v == 1)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
streamsBranches
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\W+")))
.groupBy((key, value) -> ""+value)
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.count(Materialized.with(Serdes.String(), Serdes.Long()))
.suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream()
.map((wk, v) -> new KeyValue<>(wk.key(), v))
.filter((wk, v) -> v != null)
.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
但我想知道是否有更簡(jiǎn)單、更漂亮的方法來(lái)做同樣的事情。
推薦答案
我認(rèn)為您應(yīng)該使用SessionWindowedKStream::aggregate(...)
,并根據(jù)您的邏輯將結(jié)果累加到聚合器(第一個(gè)和最后一個(gè)值)
示例代碼可能如下所示:
streamsBranches.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofSeconds(60)).grace(Duration.ofSeconds(2)))
.aggregate(
AggClass::new,
(key, value, oldAgg) -> oldAgg.update(value),
(key, agg1, agg2) -> agg1.merge(agg2),
Materialized.with(Serdes.String(), new AggClassSerdes())
).suppress(Suppressed.untilWindowCloses(unbounded()))
.toStream().map((wk, v) -> new KeyValue<>(wk.key(), v))
.to("streams-wordcount-output", Produced.with(Serdes.String(), new AggClassSerdes()));
其中AggClass
是累加器,AggClassSerdes
是累加器Serdes
public class AggClass {
private String first;
private String last;
public AggClass() {}
public AggClass(String first, String last) {
this.first = first;
this.last = last;
}
public AggClass update(String value) {
if (first == null)
first = value;
last = value;
return this;
}
public AggClass merge(AggClass other) {
if (this.first == null)
return other;
else return new AggClass(this.first, other.last);
}
}
這篇關(guān)于Kafka Streams:如何獲得SessionWindow的第一個(gè)和最后一個(gè)記錄?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,