【Spark】Engineering Practice



November 15, 2018    Author: Guofei

Category: 0x11_Algorithm Platform    Article No.: 159

Copyright notice: This article was written by Guofei. Feel free to repost, but please include the original link and notify the author.
Original link: https://www.guofei.site/2018/11/15/sparkpractice.html


Spark script submission tool

Features:

  1. Pass arguments at submit time and read them inside the script via sys.argv
  2. Call other packages from the submitted script
  3. Switch between serial and parallel submission or re-runs with almost no code changes
  4. After all submitted Spark scripts finish, print each script's return code, runtime, start time, and end time
  5. If any script fails, exit with a nonzero error code (see the sketch after the serial-submission example)

1. The submitted script

Pay attention to how the input arguments are designed; a sketch of the argument round trip follows the script. app_1_2_1.py (the Spark script to be submitted):

# coding=utf-8

from pyspark.sql import SparkSession
import os
import sys
import numpy as np

# Read the input arguments
arrs = sys.argv
cal_dt_str = eval(arrs[1])['cal_dt_str']  # arrs[1] is the dict literal passed at submit time
spark = SparkSession.builder.appName("app_name_"+cal_dt_str).enableHiveSupport().getOrCreate()

# %% Your code goes below
df = spark.createDataFrame([[cal_dt_str, np.random.randint(5)]], schema=['dt', 'rand_num'])
df.write.mode('overwrite').format('orc').partitionBy('dt').saveAsTable('app.app_test_guofei8')
spark.sql("ALTER TABLE app.app_test_guofei8 SET TBLPROPERTIES ('author' = 'guofei9987')")
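
A quick note on the argument format: the submitter formats a Python dict into the command line, and the script parses it back with eval. A minimal sketch of the round trip (the date value is only illustrative):

# What the submitter runs (one quoted positional argument after the script name):
#   spark-submit ... app_1_2_1.py "{'cal_dt_str': '2019-05-05'}"
# Inside the script, sys.argv[1] is that dict literal as a string:
arrs = ['app_1_2_1.py', "{'cal_dt_str': '2019-05-05'}"]  # stand-in for sys.argv
params = eval(arrs[1])       # ast.literal_eval(arrs[1]) would be a safer alternative
print(params['cal_dt_str'])  # 2019-05-05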

2. Serial submission

command1 = '''
spark-submit   --master yarn \
      --deploy-mode {deploymode} \
      --driver-memory 6g \
      --executor-memory 10g \
      --num-executors 400 \
      --executor-cores 6 \
      --py-files ../your_py_files.zip \
      {pyfile} "{arrs}"
'''
# --master:
#      spark://host:port  standalone cluster, given by its cluster URL
#      yarn     run on a YARN cluster
#      local    local mode
#      local[N] local mode with N cores
#      local[*] local mode with all available cores

# --deploy-mode:
#     client = driver runs locally, cluster = driver runs on the cluster

# --executor-memory  memory per executor
# --driver-memory    memory for the driver


import os
import datetime

cal_dt_str = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
# cal_dt_str='2019-05-05'
print('Start computing data for ' + cal_dt_str)

input_file = [
    {'deploymode': 'cluster', 'pyfile': 'app_1_2_1.py', 'arrs': {'cal_dt_str': cal_dt_str}},
]


def func_run(args):
    # Run one spark-submit command synchronously via os.system and record its timing
    start_time = datetime.datetime.now()
    code = os.system(command1.format(**args))
    end_time = datetime.datetime.now()
    return args['pyfile'], code, (end_time - start_time).seconds, \
           start_time.strftime('%Y-%m-%d %H:%M:%S'), end_time.strftime('%Y-%m-%d %H:%M:%S')


result_code_list = [func_run(args=args) for args in input_file]

for i in result_code_list: print(i)
# for i in result_code_list:
#     if i[1] != 0:
#         raise RuntimeError('job failed: ' + str(i))
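
Feature 5 in the list above asks the driver to exit with an error code when any job fails. A minimal sketch on top of result_code_list (using sys.exit(1) as the failure signal is my choice):

import sys

failed = [r for r in result_code_list if r[1] != 0]  # r[1] is the os.system return code
if failed:
    print('failed jobs:', failed)
    sys.exit(1)  # nonzero exit so an external scheduler can detect the failure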

3. Parallel submission

#!/usr/bin/python
# coding=utf-8
# Backfill historical data in parallel
import os
import datetime
import subprocess
import time

right_dt = datetime.datetime(year=2019, month=7, day=31)
left_dt = datetime.datetime(year=2019, month=7, day=1)
pyfile = 'app_1_1.py'  # the script to submit
step = -1  # step between dates (negative = most recent date first)
job_num = 5  # number of jobs running at the same time

# First, build the list of submission arguments
one_day = datetime.timedelta(days=1)
cal_dt_str_list = [(left_dt + i * one_day).strftime('%Y-%m-%d') for i in range((right_dt - left_dt).days + 1)][::step]
input_arr_list = [{'deploymode': 'client', 'pyfile': pyfile, 'arrs': {'cal_dt_str': cal_dt_str}}
                  for cal_dt_str in cal_dt_str_list
                  ]

command1 = '''
spark-submit   --master yarn \
      --deploy-mode {deploymode} \
      --driver-memory 6g \
      --executor-memory 10g \
      --num-executors 400 \
      --executor-cores 6 \
      --py-files ../core_code.zip \
      {pyfile} "{arrs}"
'''


from IPython.display import clear_output  # convenient for display in Jupyter


def paral_submit(input_arr_list, job_num=10):
    # log holds finished jobs, subjob holds running jobs, input_arr_list holds pending jobs
    log, subjob = [], dict()
    while len(input_arr_list) > 0 or len(subjob) > 0:
        if len(subjob) < job_num and len(input_arr_list) > 0:
            input_arr = input_arr_list[0]
            cal_dt_str = input_arr['arrs']['cal_dt_str']
            subjob[cal_dt_str] = subprocess.Popen(command1.format(**input_arr), shell=True)
            input_arr_list = input_arr_list[1:]
        subjob_status = [[cal_dt_str, subjob[cal_dt_str].poll()] for cal_dt_str in subjob]
        subjob_status_finished = [[cal_dt_str, status] for cal_dt_str, status in subjob_status if status is not None]
        for i, j in subjob_status_finished:
            subjob.pop(i)
        log.extend(subjob_status_finished)  # move finished jobs into the log
        time.sleep(1)  # poll once per second
        clear_output()  # wipe the previous output so the Jupyter display stays readable
        print('finished:', log)
        print('running:', list(subjob.keys()))
        print('not running:', [i['arrs']['cal_dt_str'] for i in input_arr_list])
    print('Parallel submission finished; return codes:', log)
    return log, [i for i in log if i[1] != 0]


log, failed = paral_submit(input_arr_list, job_num=job_num)
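
Since paral_submit returns both the full log and the failed entries, a retry pass can reuse the same machinery; a minimal sketch (deploymode and pyfile follow the settings above):

if failed:
    # resubmit only the dates whose return code was nonzero
    retry_list = [{'deploymode': 'client', 'pyfile': pyfile, 'arrs': {'cal_dt_str': dt}}
                  for dt, _ in failed]
    paral_submit(retry_list, job_num=job_num)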

Scheduled code backup to Git

import os
import subprocess
import time
import datetime
from IPython.display import clear_output


def upload(dir_name):
    '''
    Push the repo to Git
    '''
    os.chdir(dir_name)
    command_list = ['git add -A', 'git commit -m "update"', 'git push']
    status_list = [subprocess.check_output(command, shell=True).decode('utf-8') for command in command_list]
    return [datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')] + status_list


def make_log(dir_name):
    '''
    Append the push info to a txt file
    '''
    git_upload_log = upload(dir_name)
    git_print_log = '\n\n' + '-' * 10 + '\n' + '\n'.join(git_upload_log)
    with open(dir_name + '/update_log.txt', 'a') as f:
        f.writelines(git_print_log)
    return git_print_log

# Debugging
# dir_name='/home/guofei8/project'
# git_print_log=make_log(dir_name)
# print(git_print_log)

# Run on a schedule (once a day)
dir_name = '/home/guofei/project'
for i in range(100):
    clear_output()
    git_print_log = make_log(dir_name)
    print(git_print_log)
    time.sleep(60 * 60 * 24)
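
One caveat: git commit exits with a nonzero code when there is nothing to commit, which makes check_output raise CalledProcessError and stops the loop. A more tolerant sketch using subprocess.run (my own variant, not part of the original tool):

import subprocess
import datetime


def upload_tolerant(dir_name):
    '''Push to Git, but keep going when a step fails (e.g. nothing to commit).'''
    status_list = []
    for command in ['git add -A', 'git commit -m "update"', 'git push']:
        result = subprocess.run(command, shell=True, cwd=dir_name,
                                capture_output=True, text=True)
        # keep the return code together with the output for the log
        status_list.append('{} -> {}: {}'.format(command, result.returncode, result.stdout.strip()))
    return [datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')] + status_list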

File tree

Not particularly pretty, but the code is short and practical.

import os
def my_listdir(level, path, tree):
    # Recursively walk `path`, appending one indented line per entry to `tree`
    for i in os.listdir(path):
        tree.append('│    ' * level + '├───' + i)
        if os.path.isdir(path + i):
            my_listdir(level + 1, path + i + os.sep, tree)


path = '/work/project'
tree = [os.path.split(path)[-1]]
my_listdir(0, path + os.sep, tree)
print('\n'.join(tree))

The advantage of keeping it lightweight is that it is easy to modify. For example, if I do not want to list anything under the '.ipynb_checkpoints' and '.git' directories, I can change it like this:

import os
def my_listdir(level, path, tree):
    for i in os.listdir(path):
        if i in ['.ipynb_checkpoints', '.git']:
            continue
        tree.append('│    ' * level + '├───' + i)
        if os.path.isdir(path + i):
            my_listdir(level + 1, path + i + os.sep, tree)


path = '/work/project'
tree = ['project']
my_listdir(0, path + os.sep, tree)
print('\n'.join(tree))
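
For a rough sense of the output, here is a sketch that builds a tiny throwaway directory (all names are made up) and prints it with my_listdir:

import os
import tempfile

# Hypothetical layout: <tmp>/src/main.py and <tmp>/README.md
demo_path = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_path, 'src'))
open(os.path.join(demo_path, 'src', 'main.py'), 'w').close()
open(os.path.join(demo_path, 'README.md'), 'w').close()

tree = [os.path.split(demo_path)[-1]]
my_listdir(0, demo_path + os.sep, tree)
print('\n'.join(tree))
# Prints something like (entry order follows os.listdir):
# tmpabc123
# ├───src
# │    ├───main.py
# ├───README.md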

Batch-processing Hive tables

import subprocess

# List all tables in the `app` database
tables_all = subprocess.check_output('''
hive -e 'use app;show tables'
''', shell=True).decode('utf-8').split('\n')

# Keep the tables whose name contains 'guofei', then drop them one by one
table_guofei = [table for table in tables_all if table.find('guofei') >= 0]
for table in table_guofei:
    drop_code = subprocess.check_output('''
    hive -e 'drop table app.{table}'
    '''.format(table=table), shell=True)
    print(drop_code, table)
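
Since this drops tables irreversibly, a cautious variant prints the candidate list and asks for confirmation before deleting (the confirmation step is my addition, not part of the original workflow):

print('Tables that would be dropped:', table_guofei)
if input('Type "yes" to drop them: ') == 'yes':
    for table in table_guofei:
        subprocess.check_output("hive -e 'drop table app.{}'".format(table), shell=True)
        print('dropped', table)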

Serial loop

When investigating data, you often need to write a table in a loop.

import datetime
right_dt = datetime.datetime(year=2019, month=8, day=1)
left_dt = datetime.datetime(year=2019, month=2, day=1)
pyfile = 'long_1.py'
step = 1  # step between dates (in days)
job_num = 1
IsFirstLoop = True
oneday = datetime.timedelta(days=1)
# paral_run.paral_submit(left_dt=left_dt, right_dt=right_dt, pyfile=pyfile, step=step, job_num=job_num)

# `spark` is an existing SparkSession and `submit` is the function defined in the next block
cal_dt = left_dt if step > 0 else right_dt
while True:
    if cal_dt > right_dt or cal_dt < left_dt:
        break
    cal_dt_str = cal_dt.strftime('%Y-%m-%d')
    submit(cal_dt_str, spark, IsFirstLoop)
    print(cal_dt_str + ' done@' + datetime.datetime.now().strftime('%H:%M:%S') + ',  ')
    IsFirstLoop = False
    cal_dt += step * oneday

print('All done!')

Your script

def submit(cal_dt_str, spark, IsFirstLoop):
    # The first iteration overwrites (recreating the table); later iterations append partitions
    if IsFirstLoop:
        write_mode = 'overwrite'
    else:
        write_mode = 'append'

    # your logic goes here, producing `df`
    # then write the table:
    df.write.mode(write_mode).format('orc').partitionBy('dt').saveAsTable('app.app_guofei8_test')
    if IsFirstLoop:
        spark.sql("ALTER TABLE app.app_guofei8_test SET TBLPROPERTIES ('author' = 'guofei8')")

Deprecated

pivot

  • Borrowing pandas (see the native Spark pivot sketch after this example)
    import pandas as pd
    import numpy as np
    pd_df=pd.DataFrame(np.arange(40).reshape(4,-1).T,columns=list('wxyz'))
    pd_df.w=pd_df.w%2
    pd_df.x=pd_df.x//3
    pd_df.pivot_table(index='w',columns='x',values='y',aggfunc=sum)
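
For reference, Spark DataFrames can also pivot directly via groupBy().pivot(); a minimal sketch mirroring the pandas example above (the column names follow that example):

from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

spark = SparkSession.builder.appName("pivot_demo").getOrCreate()

pd_df = pd.DataFrame(np.arange(40).reshape(4, -1).T, columns=list('wxyz'))
pd_df.w = pd_df.w % 2
pd_df.x = pd_df.x // 3
df = spark.createDataFrame(pd_df)

# one row per value of w, one column per value of x, cells are sum(y)
df.groupBy('w').pivot('x').sum('y').show()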
    

Approach 4: using RDDs

df.rdd.map(lambda row: (row['sku_id'], row)).groupByKey().flatMap(lambda row: func(row))

Example (since UDFs are not yet configured on our platform, RDDs are used instead):

import scipy.stats as stats
import scipy
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("appName").enableHiveSupport().getOrCreate()

pd_df = pd.DataFrame(scipy.stats.norm().rvs(size=(3000, 3)), columns=list('abc'))
pd_df.a = (pd_df.a > 0.5) * 1

df = spark.createDataFrame(pd_df)


def myfunc1(row, mystr):
    '''
    :param row: a (row[0], row[1]) pair, where
    row[0] is the key (a single element) and
    row[1] is an <iterable of Row>
    :param mystr: extra user-defined input
    :return: must be an <iterable>; each of its elements becomes one row of the new RDD
    '''
    key = row[0]
    x, y = [], []
    for i in row[1]:
        x.append(i['b'])
        y.append(i['c'])
    # pd_df = pd.DataFrame(list(row[1]), columns=list('abc'))  # turn the whole group into a DataFrame; columns must be set manually
    return [[key, sum(x), sum(y)]]


rdd1 = df.rdd.map(lambda row: ((row['a']), row)).groupByKey() \
    .flatMap(lambda row: myfunc1(row, 'cool!'))

df1 = spark.createDataFrame(rdd1, schema=['a', 'b', 'c'])
df1.show()
pd_df1 = df1.toPandas()
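
The commented line inside myfunc1 hints at another pattern: convert each group to a pandas DataFrame and compute on it. A minimal sketch of that variant (the aggregation is illustrative):

def myfunc2(row, mystr):
    key = row[0]
    # Turn the whole group into a pandas DataFrame; columns must match the original schema
    group_df = pd.DataFrame(list(row[1]), columns=list('abc'))
    return [[key, float(group_df['b'].sum()), float(group_df['c'].sum())]]


rdd2 = df.rdd.map(lambda row: (row['a'], row)).groupByKey() \
    .flatMap(lambda row: myfunc2(row, 'cool!'))
df2 = spark.createDataFrame(rdd2, schema=['a', 'b', 'c'])
df2.show()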
