为了完成今天的综合案例实战,我使用的是美国加州 1990 年房屋普查的数据集。
数据集中的每一个数据都代表着一块区域内房屋和人口的基本信息,总共包括 9 项:该地区中心的纬度(latitude)该地区中心的经度(longitude)区域内所有房屋屋龄的中位数(housingMedianAge)区域内总房间数(totalRooms)区域内总卧室数(totalBedrooms)区域内总人口数(population)区域内总家庭数(households)区域内人均收入中位数(medianIncome)该区域房价的中位数(medianHouseValue)
https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html
from pyspark.sql import SparkSession
# 初始化SparkSession和SparkContext
spark = SparkSession.builder
.master("local")
.appName("California Housing ")
.config("spark.executor.memory", "1gb")
.getOrCreate()
sc = spark.sparkContext
# 读取数据并创建RDD
rdd = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.data')
# 读取数据每个属性的定义并创建RDD
header = sc.textFile('/Users/yourName/Downloads/CaliforniaHousing/cal_housing.domain')
header.collect()
[u'longitude: continuous.', u'latitude: continuous.', u'housingMedianAge: continuous. ', u'totalRooms: continuous. ', u'totalBedrooms: continuous. ', u'population: continuous. ', u'households: continuous. ', u'medianIncome: continuous. ', u'medianHouseValue: continuous. ']
rdd.take(2)
[u'-122.230000,37.880000,41.000000,880.000000,129.000000,322.000000,126.000000,8.325200,452600.000000', u'-122.220000,37.860000,21.000000,7099.000000,1106.000000,2401.000000,1138.000000,8.301400,358500.000000']
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)
[[u'-122.230000', u'37.880000', u'41.000000', u'880.000000', u'129.000000', u'322.000000', u'126.000000', u'8.325200', u'452600.000000'], [u'-122.220000', u'37.860000', u'21.000000', u'7099.000000', u'1106.000000', u'2401.000000', u'1138.000000', u'8.301400', u'358500.000000']]
具体来说,就是需要把之前用数组代表的对象,转换成为 Row 对象,再用 toDF() 函数转换成 DataFrame。
from pyspark.sql import Row
df = rdd.map(lambda line: Row(longitude=line[0],
latitude=line[1],
housingMedianAge=line[2],
totalRooms=line[3],
totalBedRooms=line[4],
population=line[5],
households=line[6],
medianIncome=line[7],
medianHouseValue=line[8])).toDF()
df.show()
def convertColumn(df, names, newType)
for name in names:
df = df.withColumn(name, df[name].cast(newType))
return df
columns = ['households', 'housingMedianAge', 'latitude', 'longitude', 'medianHouseValue', 'medianIncome', 'population', 'totalBedRooms', 'totalRooms']
df = convertColumn(df, columns, FloatType())
现在我们可以用 show() 函数打印出这个 DataFrame 所含的数据表。
这里每一列的数据格式都是 string,但是,它们其实都是数字,所以我们可以通过 cast() 函数把每一列的类型转换成 float。
转换成数字有很多优势。比如,我们可以按某一列,对所有对象进行排序,也可以计算平均值等。比如,下面这段代码就可以统计出所有建造年限各有多少个房子。
df.groupBy("housingMedianAge").count().sort("housingMedianAge",ascending=False).show()
预处理通过上面的数据分析,你可能会发现这些数据还是不够直观。具体的问题有:房价的值普遍都很大,我们可以把它调整成相对较小的数字;有的属性没什么意义,比如所有房子的总房间数和总卧室数,我们更加关心的是平均房间数;在我们想要构建的线性模型中,房价是结果,其他属性是输入参数。所以我们需要把它们分离处理;有的属性最小值和最大值范围很大,我们可以把它们标准化处理。
对于第一点,我们观察到大多数房价都是十万起的,所以可以用 withColumn() 函数把所有房价都除以 100000。
对于第二点,我们可以添加如下三个新的列:每个家庭的平均房间数:roomsPerHousehold每个家庭的平均人数:populationPerHousehold卧室在总房间的占比:bedroomsPerRoom
同样,有的列是我们并不关心的,比如经纬度,这个数值很难有线性的意义。所以我们可以只留下重要的信息列。
对于第三点,最简单的办法就是把 DataFrame 转换成 RDD,然后用 map() 函数把每个对象分成两部分:房价和一个包含其余属性的列表,然后在转换回 DataFrame。
df = df.withColumn("medianHouseValue", col("medianHouseValue")/100000)
df = df.withColumn("roomsPerHousehold", col("totalRooms")/col("households"))
.withColumn("populationPerHousehold", col("population")/col("households"))
.withColumn("bedroomsPerRoom", col("totalBedRooms")/col("totalRooms"))
df = df.select("medianHouseValue",
"totalBedRooms",
"population",
"households",
"medianIncome",
"roomsPerHousehold",
"populationPerHousehold",
"bedroomsPerRoom")
from pyspark.ml.linalg import DenseVector
input_data = df.rdd.map(lambda x: (x[0], DenseVector(x[1:])))
df = spark.createDataFrame(input_data, ["label", "features"])
我们重新把两部分重新标记为“label”和“features”,label 代表的是房价,features 代表包括其余参数的列表。
对于第四点,数据的标准化我们可以借助 Spark 的机器学习库 Spark ML 来完成。
from pyspark.ml.feature import StandardScaler
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
scaler = standardScaler.fit(df)
scaled_df = scaler.transform(df)
在第二行,我们创建了一个 StandardScaler,它的输入是 features 列,输出被我们命名为 features_scaled。第三、第四行,我们把这个 scaler 对已有的 DataFrame 进行处理,让我们看下代码块里显示的输出结果。
我们可以清楚地看到,这一行新增了一个 features_scaled 的列,它里面每个数据都是标准化过的,我们应该用它,而非 features 来训练模型。
上面的预处理都做完后,我们终于可以开始构建线性回归模型了。首先,我们需要把数据集分为训练集和测试集,训练集用来训练模型,测试集用来评估模型的正确性。DataFrame 的 randomSplit() 函数可以很容易的随机分割数据,这里我们将 80% 的数据用于训练,剩下 20% 作为测试集。
train_data, test_data = scaled_df.randomSplit([.8,.2],seed=123)
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features_scaled', labelCol="label", maxIter=10, regParam=0.3, elasticNetParam=0.8)
linearModel = lr.fit(train_data)
现在有了模型,我们终于可以用 linearModel 的 transform() 函数来预测测试集中的房价,并与真实情况进行对比。代码如下所示。
predicted = linearModel.transform(test_data)
predictions = predicted.select("prediction").rdd.map(lambda x: x[0])
labels = predicted.select("label").rdd.map(lambda x: x[0])
predictionAndLabel = predictions.zip(labels).collect()
我们用 RDD 的 zip() 函数把预测值和真实值放在一起,这样可以方便地进行比较。比如让我们看一下前两个对比结果。
predictionAndLabel.take(2)
[(1.4491508524918457, 1.14999), (1.5831547768979277, 0.964)]
这一讲我们通过一个真实的数据集,通过以下步骤解决了一个实际的数据处理问题:
- 观察并了解数据集
- 数据清洗
- 数据的预处理
- 训练模型
- 评估模型