在 Java 中合併(連接)兩個具有相同列名的資料框
1.概述
在實際應用中處理大型資料集時,資料分散在多個檔案或資料來源的情況並不少見。然而,我們有時可能需要將這些資料集合併成一個視圖,以便進一步處理。在這種情況下,我們可以利用 Apache Spark,因為它提供了 DataFrames 來合併資料集。
在本教程中,我們將探討如何使用 Apache Spark 在 Java 中組合或連接兩個具有相同欄位名稱的 DataFrame。
2.問題陳述
在這裡,我們希望透過附加行將兩個具有相同模式( id, name
)的Spark DataFrame組合成一個DataFrame 。為了演示,讓我們考慮以下兩個DataFrame, df1
和df2
。
第一個,
df1
包含兩行:
+---+-----+
| id| name|
+---+-----+
| 1|Alice|
| 2| Bob|
+---+-----+
第二個,
df2
也包含兩行:
+---+-------+
| id| name|
+---+-------+
| 3|Charlie|
| 4| Diana|
+---+-------+
最後,我們預期會得到兩個組合 DataFrame 的輸出:
+---+-------+
| id| name|
+---+-------+
| 1| Alice|
| 2| Bob|
| 3|Charlie|
| 4| Diana|
+---+-------+
我們提供了一些範例來示範如何實現這種逐行連接以及單元測試。
在我們繼續之前,讓我們確保我們的系統上安裝了以下內容:
- Java
11
或更高版本 – 運行 Spark 應用程式所需 - Apache Maven – 用於管理相依性和建置項目
- Apache Spark(
3.x
)-在本機執行 Spark 作業
為了驗證我們的設置,我們可以執行命令來確認 Java、Maven 和 Spark 的版本:
# Check Java version
$ java -version
# Check Maven version
$ mvn -version
# Check Spark version
$ spark-submit --version
有了這些工具,現在讓我們看看範例所需的依賴項:
這些是我們在下一節中新增到pom.xml
檔案的依賴項。
3. 為 Spark 設定 Maven 項目
首先,讓我們建立一個名為sparkdataframeconcat
的 Maven 專案:
$ mvn archetype:generate \
-DgroupId=com.baeldung.spark.dataframeconcat \
-DartifactId=sparkdataframeconcat \
-DarchetypeArtifactId=maven-archetype-quickstart \
-DinteractiveMode=false
接下來,在目錄sparkdataframeconcat
中,讓我們使用一些依賴項和插件來更新pom.xml
檔:
<dependencies>
<!-- Spark Core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>3.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spark SQL -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>3.5.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Hadoop Common -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<!-- Log4j2 API -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.22.1</version>
</dependency>
<!-- Log4j2 Core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.22.1</version>
</dependency>
<!-- Log4j2 SLF4J binding -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j2-impl</artifactId>
<version>2.22.1</version>
</dependency>
<!-- JUnit 5 -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.10.2</version>
<scope>test</scope>
</dependency>
</dependencies>
新增功能可支援 Spark 和 JUnit 5。
此外,讓我們包含一個<build>
部分:
<build>
<plugins>
<!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
<!-- Surefire plugin for JUnit 5 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.2.5</version>
<configuration>
<useModulePath>false</useModulePath>
</configuration>
</plugin>
</plugins>
</build>
此部分配置 Maven 編譯器插件以使用 Java 11 和 Surefire 插件來執行 JUnit 5 測試。
最後,讓我們新增用於日誌記錄的 Log4j2 配置。為了確保我們的日誌訊息出現在控制台中,讓我們建立設定檔src/main/resources/log4j2.properties
:
status = error
name = SparkLoggingConfig
appender.console.type = Console
appender.console.name = ConsoleAppender
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = [%p] %c - %m%n
rootLogger.level = info
rootLogger.appenderRefs = console
rootLogger.appenderRef.console.ref = ConsoleAppender
該文件的作用如下:
- 定義一個控制台附加器,將日誌印到
System.out
- 使用模式佈局(
[%p] %c – %m%n
)來格式化具有等級、記錄器名稱和訊息的訊息 - 將根記錄器層級設定為
info
,以便我們的logger.info()
呼叫出現在控制台中
現在,我們的 Maven 專案設定已準備就緒。需要說明的是,Spark 依賴項提供了 DataFrame 支持,JUnit 5 支援測試,而 Log4j2 則確保我們能夠看到有意義的日誌訊息。
4. 實現按行連接
在本節中,讓我們建立ConcatRowsExample
類別:
public class ConcatRowsExample {
private static final Logger logger = LoggerFactory.getLogger(ConcatRowsExample.class);
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Row-wise Concatenation Example")
.master("local[*]")
.getOrCreate();
try {
// Create sample data
List<Person> data1 = Arrays.asList(
new Person(1, "Alice"),
new Person(2, "Bob")
);
List<Person> data2 = Arrays.asList(
new Person(3, "Charlie"),
new Person(4, "Diana")
);
Dataset<Row> df1 = spark.createDataFrame(data1, Person.class);
Dataset<Row> df2 = spark.createDataFrame(data2, Person.class);
logger.info("First DataFrame:");
df1.show();
logger.info("Second DataFrame:");
df2.show();
// Row-wise concatenation using reusable method
Dataset<Row> combined = concatenateDataFrames(df1, df2);
logger.info("After row-wise concatenation:");
combined.show();
} finally {
spark.stop();
}
}
/**
* Concatenates two DataFrames row-wise using unionByName.
*/
public static Dataset<Row> concatenateDataFrames(Dataset<Row> df1, Dataset<Row> df2) {
return df1.unionByName(df2);
}
public static class Person implements java.io.Serializable {
private int id;
private String name;
public Person() {
}
public Person(int id, String name) {
this.id = id;
this.name = name;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
我們來簡單分析一下上面的類別:
-
SparkSession spark = SparkSession.builder()…
– 初始化 Spark 會話,是使用 Spark 中的 DataFrames 的主要入口點 -
Dataset<Row> df1 = spark.createDataFrame(…, Person.class)
– 建立第一個 DataFrame,其中包含行 (1, Alice
) 和 (2, Bob
) -
Dataset<Row> df2 = spark.createDataFrame(…, Person.class)
– 建立第二個 DataFrame,其中包含行 (3, Charlie
) 和 (4, Diana
) -
logger.info(“…”)
– 將雙引號內定義的訊息寫入日誌 -
df1.show(), df2.show()
– 分別將第一個和第二個 DataFrame 的內容印到控制台進行檢查 -
Dataset<Row> combined = concatenateDataFrames(df1, df2)
– 呼叫可重複使用的concatenateDataFrames()
方法,使用 Spark 的unionByName()
方法按行連接兩個 DataFrame -
combined.show()
– 顯示最終組合的 DataFrame -
spark.stop()
– 停止 Spark 會話並釋放資源
上面的範例展示如何使用 Spark 的unionByName()
方法在 Java 中組合(連接)兩個具有相同列名的 DataFrame。
Spark 還提供了一個union()
方法,它根據列的順序(而不是列名)進行匹配。但是,如果列的位置發生變化,但列名保持不變,則可能會導致一些細微的錯誤。相較之下, unionByName()
則根據列名進行匹配,這使得它在生產工作負載下更安全、更可靠,尤其是在處理不斷變化的模式時。
此外,由於兩個 DataFrame 共享相同的模式,Spark 可以輕鬆地將第二個 DataFrame 的行附加到第一個 DataFrame,從而產生單一統一的資料集。
5. 測試按行連接
為了驗證我們的類別是否按預期工作,讓我們建立ConcatRowsExampleUnitTest
類別:
class ConcatRowsExampleUnitTest {
private static SparkSession spark;
private Dataset<Row> df1;
private Dataset<Row> df2;
@BeforeAll
static void setupClass() {
spark = SparkSession.builder()
.appName("Row-wise Concatenation Test")
.master("local[*]")
.getOrCreate();
}
@BeforeEach
void setup() {
df1 = spark.createDataFrame(
Arrays.asList(
new ConcatRowsExample.Person(1, "Alice"),
new ConcatRowsExample.Person(2, "Bob")
),
ConcatRowsExample.Person.class
);
df2 = spark.createDataFrame(
Arrays.asList(
new ConcatRowsExample.Person(3, "Charlie"),
new ConcatRowsExample.Person(4, "Diana")
),
ConcatRowsExample.Person.class
);
}
@AfterAll
static void tearDownClass() {
spark.stop();
}
@Test
void givenTwoDataFrames_whenConcatenated_thenRowCountMatches() {
Dataset<Row> combined = ConcatRowsExample.concatenateDataFrames(df1, df2);
assertEquals(
4,
combined.count(),
"The combined DataFrame should have 4 rows"
);
}
@Test
void givenTwoDataFrames_whenConcatenated_thenSchemaRemainsSame() {
Dataset<Row> combined = ConcatRowsExample.concatenateDataFrames(df1, df2);
assertEquals(
df1.schema(),
combined.schema(),
"Schema should remain consistent after concatenation"
);
}
@Test
void givenTwoDataFrames_whenConcatenated_thenDataContainsExpectedName() {
Dataset<Row> combined = ConcatRowsExample.concatenateDataFrames(df1, df2);
assertTrue(
combined
.filter("name = 'Charlie'")
.count() > 0,
"Combined DataFrame should contain Charlie"
);
}
}
以下是測試文件的細目:
-
givenTwoDataFrames_whenConcatenated_thenRowCountMatches() {…}
– 驗證連接後,組合的 DataFrame 包含4
行,在本例中,2 from df1 + 2 from df2
-
givenTwoDataFrames_whenConcatenated_thenSchemaRemainsSame() {…}
– 確保連線後保留架構(id, name
) -
givenTwoDataFrames_whenConcatenated_thenDataContainsExpectedName() {…}
– 確認第二個 DataFrame (Charlie
) 中的特定資料存在於組合結果中
Spark 作業通常需要處理大量資料。因此,在測試中儘早發現架構不匹配或缺失行可以避免代價高昂的運行時故障。
至此,我們來運行專案。
6. 編譯、運行和測試項目
我們先編譯專案:
$ mvn clean compile
之後,讓我們運行主類別ConcatRowsExample
:
$ mvn exec:java -Dexec.mainClass="com.baeldung.spark.dataframeconcat.ConcatRowsExample"
...
[INFO] com.baeldung.spark.dataframeconcat.ConcatRowsExample - First DataFrame:
...
+---+-----+
| id| name|
+---+-----+
| 1|Alice|
| 2| Bob|
+---+-----+
[INFO] com.baeldung.spark.dataframeconcat.ConcatRowsExample - Second DataFrame:
+---+-------+
| id| name|
+---+-------+
| 3|Charlie|
| 4| Diana|
+---+-------+
[INFO] com.baeldung.spark.dataframeconcat.ConcatRowsExample - After row-wise concatenation:
...
+---+-------+
| id| name|
+---+-------+
| 1| Alice|
| 2| Bob|
| 3|Charlie|
| 4| Diana|
+---+-------+
...
記錄器會將描述性訊息(例如“First DataFrame:”
寫入日誌,而show()
方法則會將每個 DataFrame 的實際表格內容列印到控制台。在連接之前分別顯示兩個 DataFrame 可以簡化調試,因為我們可以在合併之前確認兩個輸入都是正確的。
最後,讓我們執行測試:
$ mvn test
...
[INFO] Results:
[INFO]
[INFO] Tests run: 3, Failures: 0, Errors: 0, Skipped: 0
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 30.741 s
[INFO] Finished at: 2025-09-18T12:44:18+03:00
[INFO] ------------------------------------------------------------------------
上面,我們看到所有 JUnit 測試都通過了。
7. 結論
在本文中,我們示範如何使用 Apache Spark 在 Java 中組合(連接)兩個具有相同列名的 DataFrame。
我們利用 Spark 的unionByName()
方法安全地將一個 DataFrame 中的行追加到另一個 DataFrame 中,同時確保其模式的一致性。此外,我們也建立了 JUnit 測試來驗證連線後的 DataFrame 是否保留了模式以及預期的資料。
透過這種方法,我們可以處理分批到達的資料。例如,合併來自多個文件、來源或分區的數據,以便進一步分析。因此,有了這些設置,我們可以輕鬆擴展此範例以處理更大的資料集。
與往常一樣,原始碼可在 GitHub 上取得。