下方图片解释了Apache Spark DataFrame写入API的流程。
它始于对写入数据的API调用,支持的格式包括 CSV、JSON 或 Parquet。
流程根据选择的保存模式(追加、覆盖、忽略或报错)而分岔。
每种模式执行必要的检查和操作,例如分区和数据写入处理。
流程以数据的最终写入或错误结束,取决于这些检查和操作的结果。
Apache Spark 是一个开源的分布式计算系统,提供了强大的平台用于处理大规模数据。
写入 API 是 Spark 数据处理能力的基本组成部分,允许用户将数据从他们的 Spark 应用程序写入或输出到不同的数据源。
理解 Spark 写入 API
数据源
Spark 支持将数据写入各种数据源,包括但不限于:
•分布式文件系统,如 HDFS•云存储,如 AWS S3、Azure Blob Storage•传统数据库(包括 SQL 和 NoSQL)•大数据文件格式(Parquet、Avro、ORC)
DataFrameWriter
写入 API 的核心类是 DataFrameWriter
,它提供配置和执行写入操作的功能。
通过在 DataFrame 或 Dataset 上调用 .write
方法获得 DataFrameWriter
。
写入模式
指定 Spark 在写入数据时应如何处理现有数据的模式。
常见的模式包括:
append
:将新数据添加到现有数据中。overwrite
:用新数据覆盖现有数据。ignore
:如果数据已存在,则忽略写入操作。errorIfExists
(默认):如果数据已存在,则抛出错误。
格式规范
可以使用 .format("formatType")
方法指定输出数据的格式,如 JSON、CSV、Parquet 等。
分区
为了实现有效的数据存储,可以使用 .partitionBy("column")
方法根据一个或多个列对输出数据进行分区。
配置选项
可以使用 .option("key", "value")
方法设置特定于数据源的各种选项,如压缩、CSV 文件的自定义分隔符等。
保存数据
最后,使用 .save("path")
方法将 DataFrame 写入指定的路径。
其他方法如 .saveAsTable("tableName")
也可用于不同的写入场景。
from pyspark.sql import SparkSession
from pyspark.sql import Row
import os
# 初始化 SparkSession
spark = SparkSession.builder
.appName("DataFrameWriterSaveModesExample")
.getOrCreate()
# 示例数据
data = [
Row(name="Alice", age=25, country="USA"),
Row(name="Bob", age=30, country="UK")
]
# 附加数据用于追加模式
additional_data = [
Row(name="Carlos", age=35, country="Spain"),
Row(name="Daisy", age=40, country="Australia")
]
# 创建 DataFrames
df = spark.createDataFrame(data)
additional_df = spark.createDataFrame(additional_data)
# 定义输出路径
output_path = "output/csv_save_modes"
# 函数:列出目录中的文件
def list_files_in_directory(path):
files = os.listdir(path)
return files
# 显示初始 DataFrame
print("初始 DataFrame:")
df.show()
# 使用覆盖模式写入 CSV 格式
df.write.csv(output_path, mode="overwrite", header=True)
print("覆盖模式后的文件:", list_files_in_directory(output_path))
# 显示附加 DataFrame
print("附加 DataFrame:")
additional_df.show()
# 使用追加模式写入 CSV 格式
additional_df.write.csv(output_path, mode="append", header=True)
print("追加模式后的文件:", list_files_in_directory(output_path))
# 使用忽略模式写入 CSV 格式
additional_df.write.csv(output_path, mode="ignore", header=True)
print("忽略模式后的文件:", list_files_in_directory(output_path))
# 使用 errorIfExists 模式写入 CSV 格式
try:
additional_df.write.csv(output_path, mode="errorIfExists", header=True)
except Exception as e:
print("errorIfExists 模式中发生错误:", e)
# 停止 SparkSession
spark.stop()
Spark 架构概述
在 Apache Spark 中写入 DataFrame 遵循一种顺序流程。
Spark 基于用户 DataFrame 操作创建逻辑计划,优化为物理计划,并分成阶段。
系统按分区处理数据,对其进行日志记录以确保可靠性,并带有定义的分区和写入模式写入到本地存储。
Spark 的架构确保在计算集群中高效管理和扩展数据写入任务。
从 Spark 内部架构的角度来看,Apache Spark 写入 API 涉及了解 Spark 如何在幕后管理数据处理、分发和写入操作。
让我们来详细了解:
1.驱动程序和执行器: Spark 采用主从架构。驱动节点运行应用程序的 main() 函数并维护有关 Spark 应用程序的信息。执行器节点执行数据处理和写入操作。
2.DAG 调度器: 当触发写入操作时,Spark 的 DAG(有向无环图)调度器将高级转换转换为一系列可以在集群中并行执行的阶段。
3.任务调度器: 任务调度器在每个阶段内启动任务。这些任务分布在执行器之间。
1.执行计划和物理计划: Spark 使用 Catalyst 优化器创建高效的执行计划。这包括将逻辑计划(要做什么)转换为物理计划(如何做),考虑到分区、数据本地性和其他因素。
在Spark内部写入数据
数据分布: Spark 中的数据分布在分区中。当启动写入操作时,Spark 首先确定这些分区中的数据布局。
写入任务执行: 每个分区的数据由一个任务处理。这些任务在不同的执行器之间并行执行。
写入模式和一致性:
- 对于
overwrite
和append
模式,Spark 确保一致性,通过管理数据文件的替换或添加来实现。 - 对于基于文件的数据源,Spark 以分阶段的方式写入数据,先写入临时位置再提交到最终位置,有助于确保一致性和处理故障。
格式处理和序列化: 根据指定的格式(例如,Parquet、CSV),Spark 使用相应的序列化器将数据转换为所需的格式。执行器处理此过程。
分区和文件管理:
- 如果指定了分区,则Spark在写入之前根据这些分区对数据进行排序和组织。这通常涉及在执行器之间移动数据。
- Spark 试图最小化每个分区创建的文件数量,以优化大文件大小,在分布式文件系统中更有效。
错误处理和容错: 在写入操作期间,如果任务失败,Spark 可以重试任务,确保容错。但并非所有写入操作都是完全原子的,特定情况可能需要手动干预以确保数据完整性。
优化技术:
- Catalyst 优化器: 为效率优化写入计划,例如最小化数据移动。
- Tungsten: Spark 的 Tungsten 引擎优化数据序列化和反序列化过程中的内存和 CPU 使用。
写入提交协议: Spark 使用写入提交协议来协调特定数据源的任务提交和中止过程,确保对写入数据的一致视图。
Spark 的写入 API 旨在实现高效和可靠的数据写入,它以复杂的方式编排任务分发、数据序列化和文件管理。
它利用 Spark 的核心组件,如 DAG 调度器、任务调度器和 Catalyst 优化器,有效地执行写入操作。