Java 中的流收集器
1.概述
Java 在 Java 8 中引進了 Stream API,從此它就成為 Java 開發中的主要內容。它易於使用、理解和維護,並提供順序和並行處理選項。
但是,只能執行固定數量的中間操作,靈活性較低。為了克服這個限制, Java 24引入了[Gatherer](https://docs.oracle.com/en/java/javase/24/docs/api/java.base/java/util/stream/Gatherer.html)
接口,為中間操作提供了更大的靈活性。
在本教程中,我們將了解什麼是 Gatherers 以及如何使用它們。
2. 流收集器
Stream Gatherers 的目標是允許自訂中間操作,使管道更加靈活和富有表現力。它們支援非同步和增量處理,同時允許自訂資料分組或累積。
它們可以將元素轉換為 m 對 n 的關係,追蹤先前看到的元素以決定後續元素的轉換,實現並行執行,並將無限流轉換為有限流。
接下來,讓我們來看看組成 Gatherer 的不同功能。
2.1.收集器函數
Gatherer 有四個函數定義如何收集和轉換元素:
-
**initializer()**
:一個可選函數,用於在處理流程時儲存狀態。它為轉換提供了初始狀態。 -
**integrator()**
:整合新的流元素,可選地在處理狀態的上下文中,並可選地向下游發射元素。也能夠根據條件匹配提前終止處理。它控制轉換的核心行為。 -
**combiner()**
:一個可選函數,為收集器提供並行處理能力。透過組合兩種狀態並行處理元素時此功能很有用。如果沒有它,或者輸入流未標記為並行,則 Gatherer 將按順序處理。 -
**finisher()**
:當流中沒有剩餘元素可供使用時呼叫的可選函數。它對於緩衝或滑動視窗等狀態操作很有用
接下來我們來看看一些內建的Gatherers。
2.2. fold()
fold()
將多個元素組合起來,以有序的方式產生最終結果。與reduce()
函數相比,其優點是我們仍然可以在流中使用結果。
我們來看一個程式碼範例:
@Test
void givenNumbers_whenFolded_thenSumIsEmitted() {
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
Stream folded = numbers.gather(Gatherers.fold(() -> 0, Integer::sum));
List<Integer> resultList = folded.toList();
assertEquals(1, resultList.size());
assertEquals(Integer.valueOf(15), resultList.getFirst());
}
我們已經用初始值0
初始化了fold()
方法,並希望對輸入流中的所有數字求和。由於收集器是中間操作,我們將結果收集到清單中並驗證預期結果。
2.3. mapConcurrent()
顧名思義, mapConcurrent()
在給定並發限制的情況下將函數並行應用於所有元素。它幫助我們避免管理線程池或使用Callable
或Future.
我們將分析下面的程式碼範例:
@Test
void givenWords_whenMappedConcurrently_thenUppercasedWordsAreEmitted() {
Stream<String> words = Stream.of("a", "b", "c", "d");
List<String> resultList = words.gather(Gatherers.mapConcurrent(2, String::toUpperCase)).toList();
assertEquals(4, resultList.size());
assertEquals(List.of("A", "B", "C", "D"),resultList);
}
我們將maxConcurrency
設為2
,這是函數toUpperCase()
所需的最大並發度,我們驗證了預期的輸出。
2.4. scan()
scan()
執行增量累積,即從初始狀態開始,評估當前狀態並將其應用於當前元素以產生下游的值。
在下面的程式碼範例中,我們驗證了相同的內容:
@Test
void givenNumbers_whenScanned_thenRunningTotalsAreEmitted() {
Stream<Integer> numbers = Stream.of(1, 2, 3, 4);
List<Integer> resultList = numbers.gather(Gatherers.scan(() -> 0, Integer::sum)).toList();
assertEquals(4, resultList.size());
assertEquals(List.of(1, 3, 6, 10),resultList);
}
我們使用scan().
我們提供了一個initial
值0
,隨後計算所有輸入值的累計總數。
2.5. windowSliding()
顧名思義,它與滑動視窗演算法的實作有關。如果視窗大小大於流輸入,那麼將只有一個視窗包含所有流元素。一般來說,它將輸入元素聚集在配置大小的滑動視窗中。
我們來看下面的例子:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
List<List<Integer>> resultList = numbers.gather(Gatherers.windowSliding(3))
.toList();
assertEquals(3, resultList.size());
assertEquals(expectedOutput,resultList);
}
如預期的那樣,我們得到了一個m 到 n 的輸入元素映射,這些元素被分組到配置大小的清單中。
3.用例
到目前為止,我們已經看到了對中間操作的內建支援。
現在,讓我們探索如何為不同的輸入輸出關係建立自訂Gatherer
。
3.1.一對一
Gatherer
的唯一強制功能是integrator()
。我們將String
元素的輸入流轉換為它們的長度(一對一映射),同時僅定義integrator()
:
@Test
void givenStrings_whenUsingCustomGatherer_thenLengthsAreCalculated() {
List<Integer> expectedOutput = List.of(5, 6, 3);
Stream<String> inputStrings = Stream.of("apple", "banana", "cat");
List<Object> resultList = inputStrings.gather(Gatherer.of((state, element, downstream) -> {
downstream.push(element.length());
return true;
}))
.toList();
assertEquals(3, resultList.size());
assertEquals(expectedOutput, resultList);
}
我們將Gatherer
integrator()
定義為一個 lambda 表達式,將String
的長度推送到下游。我們也可以透過擴充Gatherer
介面來定義自訂Gatherer
類別。
3.2.一對多
我們將以String
元素Stream
作為輸入,並透過拆分句子來產生單字的組合。
讓我們定義一個自訂的Gatherer
來探索不同的功能:
public class SentenceSplitterGatherer implements Gatherer<String, List<String>,String> {
@Override
public Supplier<List<String>> initializer() {
return ArrayList::new;
}
@Override
public BinaryOperator<List<String>> combiner() {
return (left, right) -> {
left.addAll(right);
return left;
};
}
@Override
public Integrator<List<String>, String, String> integrator() {
return (state, element, downstream) -> {
var words = element.split("\\s+");
for (var word : words) {
state.add(word);
downstream.push(word);
}
return true;
};
}
}
在SentenceSplitterGatherer
中,我們定義了initializer()
,它會傳回一個空的ArrayList
作為初始狀態。接下來,我們有並行處理能力所需的combiner()
。最後,我們有integrator()
邏輯,在其中我們拆分字串並更新狀態並進行下游進一步處理。
讓我們使用一些簡單的句子來驗證我們的自訂Gatherer
是否按預期工作:
@Test
void givenSentences_whenUsingCustomOneToManyGatherer_thenWordsAreExtracted() {
List<String> expectedOutput = List.of("hello", "world", "java", "streams");
Stream<String> sentences = Stream.of("hello world", "java streams");
List<String> words = sentences.gather(new SentenceSplitterGatherer())
.toList();
assertEquals(expectedOutput, words);
}
3.3.多對一
讓我們定義一個自訂的Gatherer
,其中我們初始化一個空的ArrayList
,定義Integer
數值流的求和邏輯,最後定義**finisher()
邏輯,當上游沒有更多元素時執行該邏輯**:
public class NumericSumGatherer implements Gatherer<Integer, ArrayList<Integer>, Integer> {
@Override
public Supplier<ArrayList<Integer>> initializer() {
return ArrayList::new;
}
@Override
public Integrator<ArrayList<Integer>, Integer, Integer> integrator() {
return new Integrator<>() {
@Override
public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super Integer> downstream) {
if (state.isEmpty()) {
state.add(element);
} else {
state.addFirst(state.getFirst() + element);
}
return true;
}
};
}
@Override
public BiConsumer<ArrayList<Integer>, Downstream<? super Integer>> finisher() {
return (state, downstream) -> {
if (!downstream.isRejecting() && !state.isEmpty()) {
downstream.push(state.getFirst());
state.clear();
}
};
}
}
這裡的想法是將Stream.
讓我們透過一個簡單的測試案例來驗證一下:
@Test
void givenNumbers_whenUsingCustomManyToOneGatherer_thenSumIsCalculated() {
Stream<Integer> inputValues = Stream.of(1, 2, 3, 4, 5, 6);
List<Integer> result = inputValues.gather(new NumericSumGatherer())
.toList();
Assertions.assertEquals(Integer.valueOf(21), result.getFirst());
}
3.4.多對多
之前,我們看到了內建的windowSliding()
Gatherer
是如何運作的。
讓我們使用自訂邏輯實現相同的功能,並驗證預期輸出是否與內建Gatherer
的輸出相同:
public class SlidingWindowGatherer implements Gatherer<Integer, ArrayList<Integer>, List<Integer>> {
@Override
public Supplier<ArrayList<Integer>> initializer() {
return ArrayList::new;
}
@Override
public Integrator<ArrayList<Integer>, Integer, List<Integer>> integrator() {
return new Integrator<>() {
@Override
public boolean integrate(ArrayList<Integer> state, Integer element, Downstream<? super List<Integer>> downstream) {
state.add(element);
if (state.size() == 3) {
downstream.push(new ArrayList<>(state));
state.removeFirst();
}
return true;
}
};
}
@Override
public BiConsumer<ArrayList<Integer>, Downstream<? super List<Integer>>> finisher() {
return (state, downstream) -> {
if (state.size()==3) {
downstream.push(new ArrayList<>(state));
}
};
}
}
我們初始化一個空視窗並將大小設為3
。在整合和完成時,我們只推送配置大小的視窗。
讓我們使用與內建Gatherer:
@Test
void givenNumbers_whenWindowedSliding_thenOverlappingWindowsAreEmitted() {
List<List<Integer>> expectedOutput = List.of(List.of(1, 2, 3), List.of(2, 3, 4), List.of(3, 4, 5));
Stream<Integer> numbers = Stream.of(1, 2, 3, 4, 5);
List<List<Integer>> resultList = numbers.gather(new SlidingWindowGatherer())
.toList();
Assertions.assertEquals(3, resultList.size());
Assertions.assertEquals(expectedOutput, resultList);
}
4. 結論
在本文中,我們首先探討了 Gatherer API 的功能及其解決的挑戰,即為中間Stream
操作提供與collect()
為終端操作提供的類似的功能。
接下來,我們簡單介紹了 API 的不同功能以及一些可用的內建Gatherer
。
最後,我們針對不同的輸入輸出關係實現了一些自訂的Gatherer
實現,同時更詳細地研究了不同的功能實現。
與往常一樣,程式碼可在 GitHub 上取得。