如何中止迭代Java Stream forEach
過濾Java中的可選流
Java 8並行流中的自定義線程池
合併Java流
map()和flatMap()之間的區別
Java流的字符串操作
如何使用索引迭代流
Java中將迭代器轉變爲流
如何在Java中獲取流的最後一個元素?
將字符串轉換為字符流
Java中的“流已被操作或關閉”異常
Java 8和無限流
如何向流中添加單個元素
Java 8中的原始類型流
在Java Stream API中有所不同
Java 9 Stream API的改進
Java Spliterator簡介
如何在Java 8流中使用if / else邏輯
Java 8謂詞鏈
具有Lambda表達式的Java流過濾器
用Java流求和
Java 8 Streams peek() API
與Map一起使用流
Java 8並行流中的自定義線程池
1.概述
Java 8引入了S treams的概念, treams是對數據執行批量操作的有效方法。並且可以在支持並發的環境中獲得併行Streams 。
這些流可以帶來更高的性能-以多線程開銷為代價。
在本快速教程中,我們將研究**Stream API的最大局限之一,**並了解如何使並行流與自定義ThreadPool實例一起使用-或者有一個庫來處理這個問題。
2.並行Stream
讓我們從一個簡單的示例開始-在任何Collection類型上調用parallelStream方法-這將返回一個可能的並行Stream :
@Test
public void givenList_whenCallingParallelStream_shouldBeParallelStream(){
List<Long> aList = new ArrayList<>();
Stream<Long> parallelStream = aList.parallelStream();
assertTrue(parallelStream.isParallel());
}
在此類Stream中發生的默認處理使用ForkJoinPool.commonPool(),這是整個應用程序共享的Thread Pool 。
3.自定義Thread Pool
實際上,在處理stream時,我們可以傳遞自定義ThreadPool 。
下面的示例讓並行Stream使用自定義Thread Pool來計算1到1,000,000(含)之間的長值之和:
@Test
public void giveRangeOfLongs_whenSummedInParallel_shouldBeEqualToExpectedTotal()
throws InterruptedException, ExecutionException {
long firstNum = 1;
long lastNum = 1_000_000;
List<Long> aList = LongStream.rangeClosed(firstNum, lastNum).boxed()
.collect(Collectors.toList());
ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(
() -> aList.parallelStream().reduce(0L, Long::sum)).get();
assertEquals((lastNum + firstNum) * lastNum / 2, actualTotal);
}
我們使用了並行度為4的ForkJoinPool構造函數。需要進行一些實驗才能確定不同環境的最佳值,但是一個很好的經驗法則是根據CPU的核心數量來選擇數字。
接下來,我們處理了並行Stream的內容,並在reduce調用中將其匯總。
這個簡單的示例可能無法展示使用自定義Thread Pool的全部用處,但是在我們不希望將通用Thread Pool與長期運行的任務捆綁在一起的情況下(例如處理來自網絡源的數據),其好處顯而易見。 ,或者應用程序中的其他組件正在使用公共Thread Pool 。
4。結論
我們已經簡要介紹瞭如何使用自定義Thread Pool運行並行Stream 。在正確的環境中,並通過適當使用並行度,在某些情況下可以提高性能。
可以在Github上找到本文引用的完整代碼示例。