【spark】DataFrame



2018年03月30日    Author:Guofei

文章归类: 0x11_算法平台    文章编号: 154

版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2018/03/30/sparkdataframe.html


DataFrame的生成见于另一篇文章
DataFrame转RDD后也有一系列的使用技巧,见于另一篇文章

这里介绍DataFrame的操作

基本操作

导入所需包

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()

新建

df = spark.createDataFrame([[1, 2], [1, 3], [2, 3], [2, 4]], schema=['col1', 'col2'])
spark.createDataFrame(<pd.DataFrame>)
spark.createDataFrame(<rdd>)

show

df.limit(5)  # 取前5行,不同的是,是transformation
df.show()  # 返回20条数据
df.show(30)  # 返回30条数据
df.collect()  # 返回一个list,里面多个Row

# df.take(5) #类似RDD,等价于 df.limit(n).collect()
# df.head(5) #类似RDD,等价于 df.take(n)
# df.first() #返回一个Row,等价于 df.head(1)


df.describe()  # 返回统计值,是一个action,但返回的是DataFrame
df.describe('col1', 'col2')  # 返回指定字段的统计值


df.columns  # 返回一个list,内容是列名
df.dtypes


df.drop('col1', 'col2')  # 删除某些列
df.withColumnRenamed('col1', 'col1_new')  # 给指定列改名

查询

df.filter('col1=1 and col2="abc"')  # 与 df.where 效果完全相同
df.filter(df.col1 > 5)

df.select('col1', 'col2')
df.selectExpr('col1 as id', 'col2*2 as col2_value')

数据清洗类操作

# 去重
df.distinct()  # 返回一个去重的DataFrame
df.dropDuplicates()
df.dropDuplicates(subset=['col1', 'col2'])  # 按字段去重,其它未指定的字段只会保留1个


df.dropna(how='any', thresh=None, subset=None)  # 返回一个去 null 后的 DataFrame (不会修改df)
# how='any' / 'all'
# thresh: 1) 使 how 失效,2) 如果某一行的非空值个数小于 thresh,就 drop 这一行.
# subset: 你定义的 how/thresh 规则作用于哪些字段

df.fillna(0)

例子:一个数据表,可能重复的数据’id’这个字段也不一样,那么要去重就只能在除id字段以外的所有字段中去重,这么写:

df.dropDuplicates(subset=[i for i in df.columns if i != 'id'])

统计分析类操作

orderBy

df.orderBy(['col1','col2'], ascending=[0,1])

分位数

df.approxQuantile('col1', [0.25,0.75], 0.05) # 返回一个list,大小与第二个参数相同,表示分位数。
# 第一个参数是列名,第二个参数是分位数,第三个参数是准确度,设定为0时代价巨大

df.corr('col1','col2') # 返回一个数字,相关系数。目前只支持两个字段,只支持Person相关系数
df.cov('col1','col2')

pivot

df=spark.createDataFrame(pd_df)
df.groupBy('w', 'x').pivot('y').sum('z')
# 详解:
# 1. groupby 后面的内容作为 index (因为 spark.DataFrame 不搞 index,因此作为普通列)
# 2. pivot 后面的内容作为 col
# 3. 后面接的agg func 作为返回的表里面的 value. 例如 count(), sum('z'), mean,  avg, max, min
# 3_2. sum('z1', 'z2') 产生多个列
# df.groupBy('w', 'x').pivot('y', [20,21,22]).sum('z') # pivot 的第二个参数用来限定 col 所取的范围
# df.groupBy('w', 'x').pivot('y') 是一个 <GroupedData> ,因此后面可以跟 agg 等操作(agg ,apply)

groupby

df.groupby('col1') # 返回一个GroupedData对象,可以对这个对象进行很多操作

#例1
df.groupby('col1').max('col2','col3')
# min,sum,mean,count
df.groupBy('col1').max() # 不加参数就是对所有列做同样的操作


# agg1:默认函数
df.groupby('col1').agg({'col2':'mean','col3':'sum'}) # 似乎不能与F混用

# agg2:F中的函数
from pyspark.sql import functions as F
df.groupBy('col1').agg(F.countDistinct('col2').alias('col2_cnt'))

# agg3:自定义函数
## agg3_1:udf作用于被 groupBy 的列,一一映射就有意义
spark.udf.register('udf_func1',lambda x:x+1)
df.groupBy('a').agg({'a':'udf_func1','b':'std'})
## agg3_2:udf作用于普通列:
def func(x):
    return x + 1
spark.udf.register('func',func)
df.selectExpr('func(a)')
## agg3_2: udf作用于groupBy之后的普通列
#(方案有几种,见于下面)

aggfunc 方案1

先把被汇总的数据放到一个 list 中,然后用 UDF 处理这个list

step1:生成数据

from pyspark.sql import SparkSession
import pandas as pd
import scipy.stats as stats

spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()

pd_df = pd.DataFrame(stats.norm().rvs(size=(100, 3)), columns=['type', 'col1', 'col2'])
pd_df.type = (pd_df.type > 0) * 1.0
df = spark.createDataFrame(pd_df)

step2: 用 collect_list 汇总数据

import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, ArrayType

df2 = df.groupBy('type'). \
    agg(F.collect_list('col1').alias('col1_lst'),
        F.collect_list('col2').alias('col2_lst'))

step3:构建 udf 并计算得到结果

def func1(x, y):
    return float(stats.pearsonr(x, y)[0])


# 方法1(文本):
spark.udf.register('func1', func1, returnType=DoubleType())
df2.selectExpr('type', 'func1(col1_lst,col2_lst) as corr_1_2').show()

# 方法2(借助F):
udf_func1 = F.udf(func1, DoubleType())
df2.select('type', udf_func1('col1_lst', 'col2_lst').alias('corr_1_2')).show()

额外来讲,可以返回一个list

def func2(col1, col2):
    return max(col1), min(col1), min(col2)


spark.udf.register('func2', func2, returnType=ArrayType(DoubleType()))
df3 = df2.selectExpr('type', 'func2(col1_lst,col2_lst) as col3_lst')

# 也可以同上用 F 来实现:
# 不多写了

# 展开:
df3.selectExpr('*', 'explode(col3_lst) as col4')
df3.selectExpr('*', 'posexplode(col3_lst) as (col4,col5)')

aggfunc 方案2

# http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.apply
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(substract_mean).show()

一下是相关网站:
http://spark.apache.org/docs/2.1.1/api/python/index.html
https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData
https://blog.csdn.net/dabokele/article/details/52802150
https://blog.csdn.net/sparkexpert/article/details/51042970

UDF 详解

import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, IntegerType, StringType, ArrayType
import numpy as np


def myfunc(col1, col2):
    return float(np.sin(col1 + 1) + col2)



# 方法1:F形式
myudf = F.udf(myfunc, returnType=DoubleType())  # 这个类型一定要正确指定,否则结果为 null
df.select('col1', 'col2', myudf('col1', 'col2').alias('sin_col1'))

# 或者:
df.withColumn('sin_col1', myudf('col1', 'col2'))



# 方法2:文本形式
spark.udf.register('myudf', myfunc, returnType=DoubleType())
df.selectExpr('col1', 'col2', 'myudf(col1,col2) as sin_col1')

例子:UDF 如何处理 list
(见于上面的 aggfunc)

非groupby下的agg

df1.agg({'col1':'max','col2':'min'}) # 返回1行2列的 DataFrame

合并操作

1. 纵向

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()
df1 = spark.createDataFrame([[1, 2], [1, 3], [2, 3], [2, 4]], schema=['col1', 'col2'])
df2 = spark.createDataFrame([[1, 2], [1, 4], [2, 3], [2, 3]], schema=['col1', 'col3'])

df1.union(df2)  # 并集:纵向合并,不会删除重复
df1.intersect(df2)  # 交集
df1.subtract(df2)  # 差集

注意,这里的交集和差集是按照整个列

2. 横向

df1.join(df2) #  笛卡尔积,慎用!
df1.join(df2, on='col1')
a.join(b,on=['id','dt'],how='inner')
# innner, left, right, outer
# left_semi,以左表为准,在右表中查找匹配的记录,相当于 in ,并且只返回左表的字段。如果左表的 key 中有 null,会忽略这条记录
# left_anti, 以左表为准,找到右边不匹配的记录,相当于 not in。只返回左表的字段。如果左表的 key 中有 null,会筛选出来这条记录,无论右表中有没有 null




# 进阶用法
a.join(b,on=a.id==b.id,how='right').show()
a.join(b,on=[a.id==b.id,a.col1>b.col2+1],how='right').show()

一个增加效率的技巧:

df2=df2.repartition(number_of_executors)
# 否则,如果小文件过多,只会让一个 executors 去计算

参考文献

https://blog.csdn.net/wy250229163/article/details/52354278


您的支持将鼓励我继续创作!