【spark】sql.functions



2019年01月26日    Author:Guofei

文章归类: 0x11_算法平台    文章编号: 155

版权声明:本文作者是郭飞。转载随意,但需要标明原文链接,并通知本人
原文链接:https://www.guofei.site/2019/01/26/sparkfunctions.html


这篇文章是我总结的spark官方文档中关于pyspark.sql.functions中的有关内容,源文档是按照函数的字母表顺序排列的,这里我按照功能排列。并且删除了其中的一些

用法

有几种

import pyspark.sql.functions as F
df.select(F.expr('*'),F.expr('length(name)').alias('len'),F.rand(seed=42))
df.withColumn('rand_num',F.sin(df.col1)*3)
df.selectExpr('*','power(cid2,5)')

# 针对 agg func:
df.groupBy('col1').agg(F.count('col2'))
df.groupBy('col1').agg(F.count(df.col2))

特殊常用

F.lit(3) # 增列都是一个值
F.col('col1') # 选取一列
F.when().else()

F.expr: 在 Python 里面写sql

# 最简单的用法
df.withColumn('row_num',F.expr('row_number() over(partition by col1 order by col2)'))

# 分位数
from pyspark.sql import Window
df.withColumn('percentile_25',F.expr('percentile_approx(col2, 0.25)').over(Window.partitionBy('col1'))) # 每个partition的分位数,但不聚合(所以逻辑上有点怪)

# 分位数
df.groupBy('col1').agg(F.expr('''
concat(
    round(percentile_approx(col2,0),3),'_',
    round(percentile_approx(col2,0.25),3),'_',
    round(percentile_approx(col2,0.5),3),'_',
    round(percentile_approx(col2,0.75),3),'_',
    round(percentile_approx(col2,1),3),'_'
    )
''').alias('percentile_col2'))

数学

  • 符号,取整等
    abs(col)
    signum(col) # 符号函数,返回-1,0,1
    ceil 向上取整
    floor 向下取整
    round(col, scale=0) # 小数形式的四舍五入,到scale位
    
  • 角度类
    sin(col),cos(col),tan(col)
    asin(col),acos(col),atan(col)
    sinh(col),cosh(col),tanh(col)
    radians(col) # 度数转弧度
    degrees(col) # 弧度转角度
    
  • 指数类
    exp(col)
    pow(col1,col2) # 指数,col1和col2都可以换成数字
    log(col1,col2) # 对数,col1和col2都可以换成数字
    log(col1) # 自然对数
    factorial(col) # 阶乘,sql也有这个命令
    sqrt(col)
    cbrt(col) # 三次方根
    # 其实还有更为python的混合用法:
    df1.withColumn('s1',F.sqrt(df1.col1**df1.col2+df1.col1))
    
  • 统计类
    max(col) # aggregate function
    min(col) # aggregate function
    avg(col) # aggregate function
    mean # alias for avg()
    count(col) # aggregate function
    countDistinct(col) # aggregate function
    corr(col1, col2) # 皮尔逊相关系数
    kurtosis(col) # 峰度
    skewness(col) # 偏度
    stddev(col) # Aggregate function: returns the unbiased sample standard deviation of the expression in a group
    stddev_pop(col) # Aggregate function: returns population standard deviation of the expression in a group.
    stddev_samp(col) # Aggregate function: returns the unbiased sample standard deviation of the expression in a group.
    var_pop(col) # Aggregate function: returns the population variance of the values in a group.
    var_samp(col) # Aggregate function: returns the unbiased variance of the values in a group.
    variance(col) # Aggregate function: returns the population variance of the values in a group.
    
  • 随机函数
    rand(seed=None) # 随机数
    df.withColumn('rand', F.rand(seed=42) * 3) # sql 中也有这个函数,但不能传入 seed 参数
    randn(seed=None) # 正态分布
    

字符串

  • 大小写
    initcap(col) 每个单词的首字母大写一个元素的字符串中可以有多个单词)(sql也有这个命令
    lower(col) 转为小写
    upper(col) # 转为大写
    
  • 填充
    lpad(col, len, pad) # 用pad把col填充到len长度,pad放左边
    rpad(col, len, pad) # 用pad把col填充到len长度,pad放右边
    rtrim(col) # 删掉右边的空格
    ltrim(col) # 删掉左边的空格
    trim(col) # 两边都删
    
  • 反转、截取
    repeat(col, n) # 字符串重复n次
    reverse(col) # 把 string 或 array 反转
    substring(str, pos, len) # 截取从pos开始,len长度
    
  • 匹配
    instr(str, substr) # str中,substr第一次出现的位置,可以用sql
    locate(substr, str, pos=1) # str中,pos后,substr第一次出现的位置号,位置号从1开始,如果没找到就返回0
    
  • 正则
    split(str, pattern) # 正则表达式分割
    regexp_replace(str, pattern, replacement)
    

时间

  • 时间生成
    current_date() # 当前日期
    current_timestamp() # 当前时间
    date_format(date, format) # 把时间转化成指定格式
    date_trunc(format, timestamp) # 把时间四舍五入到某一个精度
    # format – ‘year’, ‘yyyy’, ‘yy’, ‘month’, ‘mon’, ‘mm’, ‘day’, ‘dd’, ‘hour’, ‘minute’, ‘second’, ‘week’, ‘quarter’
    
  • 时间加减
    add_months(start, months) # 增加一个月,mysql可以用 `select DATE_ADD('2018-02-05',INTERVAL 1 month) as dt` 但spark中不行
    # start 可以是字符串格式
    date_sub(start, days)
    datediff(end_date, start_date)
    months_between(end_date, start_date, roundOff=True)
    # 1. 返回月份差
    # 2. date1大于date2,返回正值
    # 3. 默认返回整数,但roundOff=False 时返回8位小数
    next_day(date, dayOfWeek) # 返回下一个“周n”,dayOfWeek=“Mon”, “Tue”, “Wed”, “Thu”, “Fri”, “Sat”, “Sun”
    
  • 取位(都返回int格式)
    year(col) # 取年
    quarter(col) # 取季度
    month(col) # 取月
    hour(col) # 取小时
    minute(col) # 取分钟
    second(col) # 秒
    
  • 取dayof
    dayofmonth(col)
    dayofweek(col)
    dayofyear(col)
    weekofyear(col) # 周一到周日算是一周()
    last_day(date) # 所在月的最后一天
    

array

  • 生成
    array(*cols) # 把多列粘合成一个类似 array<double> 的格式
    sequence(start, stop, step=None) # 等差数列组成的array,入参可以是列名
    
  • 集合运算
    array_contains(array_col, value)  # 包含
    # 如果 array_col 为null,返回null
    # 如果 array_col 包含value,返回value
    # 如果 array_col 不包含value,返回false
    array_distinct(array_col) # 移除 array_col 中的重复
    array_except(array_col1, array_col2) # 生成一个array,其元素在col1中,但不在col2中
    array_intersect(col1, col2) # 差集(剔除重复)
    array_union(col1, col2) #交集(剔除重复)
    arrays_overlap(a1, a2) # 是否存在交集:
    # 如果有共同的元素,返回 true
    # 如果没有共同元素,如果任一有空元素,返回 null
    # 否则返回false
    
  • 修改
    array_position(col, value) # 返回 array 从的第一个value 的序号,如果没有则返回null,序号是从1开始的
    array_remove(col, element) # 从array中删除所有的element
    array_repeat(col, count) # array中的元素重复3次,生成一个新array
    array_sort(col) # 返回升序array,null放到最后
    sort_array(col, asc=True) # 也是排序
    arrays_zip(*cols) # 效果相当于 Python 的 list(zip(a,b)) http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.arrays_zip
    slice(x, start, length) # 切片取数,例如 slice(df.x, 2, 2)
    shuffle(col) # 打乱顺序
    
  • 统计
    size(col) # array的长度
    covar_pop(col1, col2) # 两个array的相关系数
    covar_samp(col1, col2) #相关系数
    array_max(col) # array中的最大值
    array_min(col) # array中的最小值
    
  • 拼接和展开
    array_join(col, delimiter, null_replacement=None)  
    # 将array拼接成一个字符串,其中分割符为 delimiter  
    # 如果指定了null_replacement,用null_replacement 替换空值,否则视空值不存在
    concat(*cols) #也是拼接(可用于 string,binary,array)如果array有一个为空,返回空。这个函数与sql的concat不太一样
    concat_ws(sep, *cols) # 用sep链接
    explode_outer(col)
    # 如果 col是一个 <array> 那么生成一个新列,col,把 <array>中的元素竖着变成列
    # 如果 col 是一个 <map> 那么生成两个新列,key, value, 把map中的元素竖着变成列
    df = spark.createDataFrame([(1, ["foo", "bar"], {"x": 1.0}), (2, [], {}), (3, None, None)],("id", "an_array", "a_map"))
    df.select("id", "an_array", F.explode_outer("a_map")).show()
    df.select("id", "a_map", F.explode_outer("an_array")).show()
    

其它

F.greatest('col1','col2',...) # 多列中最大的那个,sql也有这个命令
F.least('col1','col2',...) # 最小的那个

F.coalesce # 取第一个不为空的元素(sql也有这个命令)

when(condition, value)
df.select(when(df['age'] == 2, df.age + 1).otherwise(4).alias("age"))

aggfunction

collect_list
collect_set
sum
sumDistinct

窗口函数

lag(col, count=1, default=None) # 窗口函数,返回往前数第count行的内容( sql也有同样的命令)( 机器上好像报错)
lead(col, count=1, default=None) # 窗口函数,返回往后数第count行的内容( sql也有同样的命令)( 机器上好像报错)
first(col, ignorenulls=False) # aggfunction,返回第一个
last(col, ignorenulls=False) # aggfunction,返回最后一个
row_number()
cume_dist()
dense_rank()
lead(col, count=1, default=None)  # returns the ntile group id (from 1 to n inclusive)
percent_rank()
rank()

其它

下面这些函数都可以方便地用其它函数代替(虽然运行效率会慢点儿),个人不建议多用下面这些函数,如果你的队友不是很熟悉spark.sql.functions,可能会对他们来说可读性太差,影响合作。

expm1:$e^{x-1}$
hypot(col1, col2) # 等价于 sqrt{(a^2 + b^2)}
atan2(col1, col2) # 输入x,y坐标,输出theta
log10(col)
log2(col)
log1p(col) # $\log(x+1)$

bround(col, scale=0) # 小数形式的四舍五入,与round(col, scale=0)重复

## 不常用
base64
basestring
bin
approx_count_distinct

自增ID

增添一列递增、唯一(但不连续)的数字

import pyspark.sql.functions as fn
df_abnormal_id=df1.select(fn.monotonically_increasing_id().alias('id'),'*')

参考文献

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions
https://blog.csdn.net/liam08/article/details/79663018


您的支持将鼓励我继续创作!