在 Apache Spark 中,可以使用 DataFrame API 或 Dataset API 进行数据过滤。这里分别介绍两种 API 的过滤方法。
- 使用 DataFrame API 进行数据过滤:
首先,需要创建一个 DataFrame。以下是一个简单的示例:
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType spark = SparkSession.builder \ .appName("Filter Example") \ .getOrCreate() data = https://www.yisu.com/ask/[("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)] columns = ["Name", "Age"] schema = StructType([StructField(column, StringType(), nullable=False) for column in columns]) df = spark.createDataFrame(data, schema=schema) df.show()
输出:
+-----+---+ | Name|Age| +-----+---+ |Alice| 34| | Bob| 45| |Cathy| 29| |David| 31| +-----+---+
接下来,使用 filter()
方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:
from pyspark.sql.functions import col filtered_df = df.filter(col("Age") >= 30) filtered_df.show()
输出:
+-----+---+ | Name|Age| +-----+---+ |Alice| 34| | Bob| 45| |David| 31| +-----+---+
- 使用 Dataset API 进行数据过滤:
首先,需要创建一个 Dataset。以下是一个简单的示例:
from pyspark.sql import SparkSession from pyspark.sql.types import StructType, StructField, StringType from pyspark.sql.functions import col spark = SparkSession.builder \ .appName("Filter Example") \ .getOrCreate() data = https://www.yisu.com/ask/[("Alice", 34), ("Bob", 45), ("Cathy", 29), ("David", 31)] columns = ["Name", "Age"] schema = StructType([StructField(column, StringType(), nullable=False) for column in columns]) # 使用 map() 方法将元组转换为 (Row 对象, 1) 的形式 mapped_data = https://www.yisu.com/ask/data.map(lambda x: (Row(*x), 1))"row", "count") ds.show()
输出:
+----+-----+ |row |count| +----+-----+ |[Alice,34]| 1| |[ Bob,45]| 1| |[Cathy,29]| 1| |[David,31]| 1| +----+-----+
接下来,使用 filter()
方法进行数据过滤。例如,我们只保留年龄大于等于 30 岁的人:
filtered_ds = ds.filter(col("row.Age") >= 30) filtered_ds.show()
输出:
+----+-----+ |row |count| +----+-----+ |[Alice,34]| 1| |[ Bob,45]| 1| |[David,31]| 1| +----+-----+
这样,我们就完成了使用 DataFrame API 和 Dataset API 进行数据过滤的操作。