CDH6.3.2 的pyspark读取excel表格数据写入hive中的问题汇总

news/2024/7/21 7:29:09 标签: excel, hive, hadoop

需求:内网通过Excel文件将数据同步到外网的CDH服务器中,将CDH中的文件数据写入hive中。

CDH版本为:6.3.2
spark版本为:2.4
python版本:2.7.5
操作系统:CentOS Linux 7
集群方式:yarn-cluster

一、在linux中将excel文件转换成CSV文件,然后上传到hdfs中。
为何要先转csv呢?主要原因是pyspark直接读取excel的话,涉及到版本的冲突问题。commons-collections-3.2.2.jar 在CDH6.3.2中的版本是3.2.2.但是pyspark直接读取excel要求collections4以上的版本,虽然也尝试将4以上的版本下载放进去,但是也没效果,因为时间成本的问题,所以没有做过多的尝试了,直接转为csv后再读吧。
spark引用第三方包

1.1 转csv的python代码(python脚本)

#-*- coding:utf-8 -*-
import pandas as pd
import os, xlrd ,sys

def xlsx_to_csv_pd(fn):
    path1="/home/lzl/datax/"+fn+".xlsx"
    path2="/home/lzl/datax/"+fn+".csv"
    data_xls = pd.read_excel(path1, index_col=0)
    data_xls.to_csv(path2, encoding='utf-8')

if __name__ == '__main__':
    fn=sys.argv[1]
    print(fn)
    try:
		xlsx_to_csv_pd(fn)
		print("转成成功!")
    except Exception as e:
		print("转成失败!")

1.2 数据中台上的代码(shell脚本):

#!/bin/bash
#@description:这是一句描述
#@author: admin(admin)
#@email: 
#@date: 2023-09-26 14:44:3

# 文件名称
fn="项目投运计划"

# xlsx转换成csv格式
ssh root@cdh02 " cd /home/lzl/shell; python xlsx2csv.py $fn" 

# 将文件上传到hfds上
ssh root@cdh02 "cd /home/lzl/datax; hdfs dfs -put $fn.csv /origin_data/sgd/excel/"
echo "上传成功~!"

# 删除csv文件
ssh root@cdh02 "cd /home/lzl/datax; rm -rf $fn.csv"
echo "删除成功~!"

二、pyspark写入hive
2.1 写入过程中遇到的问题点
2.1.1 每列的前后空格、以及存在换行符等问题。采取的措施是:循环列,采用trim函数、regexp_replace函数处理。

# 循环对每列去掉前后空格,以及删除换行符
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace

for name in df.columns:
    df = df.withColumn(name, F.trim(df[name]))
    df = df.withColumn(name, regexp_replace(col(name), "\n", ""))

2.1.2 个别字段存在科学计数法,需要用cast转换

from pyspark.sql.types import *

# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))

去掉换行符另一种方法:换行符问题也可以参照这个

2.2 数据中台代码(pyspark)

# -*- coding:utf-8
# coding=UTF-8

# 引入sys,方便输出到控制台时不是乱码
import  sys   
reload(sys)
sys.setdefaultencoding( "utf-8" )

# 引入模块
from pyspark.sql.types import IntegerType, DoubleType, StringType, StructType, StructField
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext 
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql.functions import col, regexp_replace
from pyspark.sql.types import *

# 设定资源大小
conf=SparkConf()\
    .set("spark.jars.packages","com.crealytics:spark-excel_2.11:0.11.1")\
    .set("spark.sql.shuffle.partitions", "4")\
    .set("spark.sql.execution.arrow.enabled", "true")\
    .set("spark.driver.maxResultSize","6G")\
    .set('spark.driver.memory','6G')\
    .set('spark.executor.memory','6G')

# 建立SparkSession
spark = SparkSession \
    .builder\
    .config(conf=conf)\
    .master("local[*]")\
    .appName("dataFrameApply") \
    .enableHiveSupport() \
    .getOrCreate()

# 读取cvs文件
# 文件名称和文件位置
fp= r"/origin_data/sgd/excel/项目投运计划.csv"
df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("multiLine", "true") \
    .option("delimiter", ",") \
    .format("csv") \
    .load(fp)

# 查看数据类型
# df.printSchema()

# 循环对每列去掉前后空格,以及删除换行符
for name in df.columns:
    df = df.withColumn(name, F.trim(df[name]))
    df = df.withColumn(name, regexp_replace(col(name), "\n", ""))

# 取消销售订单号的科学记数法
col="销售订单号"
df= df.withColumn(col,df[col].cast(DecimalType(10, 0)))

df.show(25,truncate = False) # 查看数据,允许输出25行

# 设置日志级别 (这两个没用)
sc = spark.sparkContext
sc.setLogLevel("ERROR")

# 写入hive
spark.sql("use sgd_dev")  # 指定数据库

# 创建临时表格 ,注意建表时不能用'/'和''空格分隔,否则会影响2023/9/4和2023-07-31 00:00:00这样的数据
spark.sql("""
CREATE TABLE IF NOT EXISTS ods_sgd_project_operating_plan_info_tmp (
    project_no                string         ,
    sale_order_no             string         ,
    customer_name             string         ,
    unoperating_amt           decimal(19,2)  , 
    expected_operating_time   string         ,
    operating_amt             decimal(19,2)  ,  
    operating_progress_track  string         ,
    is_Supplied               string         ,
    operating_submit_time     string         ,
    Signing_contract_time     string         ,
    remake                    string  
    )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'    
""")

# 注册临时表
df.createOrReplaceTempView("hdfs_df")
# spark.sql("select * from hdfs_df limit 5").show() #查看前5行数据

# 将数据插入hive临时表中
spark.sql("""
    insert overwrite table ods_sgd_project_operating_plan_info_tmp select * from hdfs_df
""")

# 将数据导入正式环境的hive
spark.sql("""
    insert overwrite table ods_sgd_project_operating_plan_info select * from ods_sgd_project_operating_plan_info_tmp
""")

# 查看导入后的数据
spark.sql("select * from ods_sgd_project_operating_plan_info limit 20").show(20,truncate = False)

# 删除注册的临时表
spark.sql("""
    drop table hdfs_df
""")

# 删除临时表
spark.sql("""
    drop table ods_sgd_project_operating_plan_info_tmp
""")

关于spark的更多知识,可以参看Spark SQL总结


http://www.niftyadmin.cn/n/5052188.html

相关文章

ISO 26262 系列学习笔记 ———— ASIL定义(Automotive Safety Integration Level)

文章目录 介绍严重度(Severity)暴露概率(Probability of Exposure)可控性(Controllability) 介绍 如果没有另行说明,则应满足ASIL A、B、C和D各分条款的要求或建议。这些要求和建议参考了安全目…

ubuntu 安装 flowiseai

flowiseai 可以快速的搭建AI应用 安装docker 安装docker https://docs.docker.com/desktop/install/linux-install/ 安装docker-compose 安装docker-compose https://blog.csdn.net/sunyuhua_keyboard/article/details/133070011?csdn_share_tail%7B%22type%22%3A%22blo…

改进YOLOv7 | 在 ELAN 模块中添加【Triplet】【SpatialGroupEnhance】【NAM】【S2】注意力机制 | 附详细结构图

在阅读本篇博文前,请大家先阅读 YOLOv7 yaml 文件简化 这篇文章, 本篇博文改进都是基于这篇博文进一步增加的 文章目录 1. Triplet 注意力模块1.1 原理1.2 ELAN_Triplet 模块和 ELAN_H_Triplet 模块代码2. SpatialGroupEnhance 注意力模块2.1 原理2.2 ELAN_SpatialGroupEnhan…

Java常用类(一)

⭐ 基本数据类型的包装类⭐ 包装类基本知识⭐ 包装类的用途⭐ 自动装箱和拆箱⭐ 自定义一个简单的包装类 ⭐ 字符串相关类 ⭐ 基本数据类型的包装类 我们之前写过八种基本数据类型并不是对象,为了将基本类型数据和对象之间实现互相转化,Java 为每一个基…

Spring面试题11:什么是Spring的依赖注入

该文章专注于面试,面试只要回答关键点即可,不需要对框架有非常深入的回答,如果你想应付面试,是足够了,抓住关键点 面试官:说一说Spring的依赖注入 依赖注入(Dependency Injection)是Spring框架的一个核心特性,它是指通过外部容器将对象的依赖关系注入到对象中,从而…

pg数据库操作,insert(sql)插入一条数据后获返回当前插入数据的id --chatGPT

gpt: 在 PostgreSQL 数据库中,可以使用 INSERT 语句插入一条数据,并通过 RETURNING 子句来返回插入数据的 ID。以下是一个示例 Go 代码来执行这个操作: go package main import ( "database/sql" "fmt" &…

UE学习记录07----C++中使用事件委托

1.c定义多播委托,示例代码: #include "Delegates/Delegate.h"DECLARE_DYNAMIC_MULTICAST_DELEGATE_OneParam(FMyDelegate, UObject*, SelectAgent);/****/ UCLASS(Blueprintable, DisplayName "VM_PlaceEntity") class PR_PLACEE…

贪心+二分+DP+矩阵快速幂:CF461E

https://codeforces.com/contest/461/problem/E 第一步:捕捉题目信息 四种字符 → \to → 矩阵 n ≤ 1 0 18 → n\le 10^{18}\to n≤1018→ 矩阵快速幂 → \to → dp最小值最大 → \to → 二分 第二步:分析性质 s s s 未知?那如果已知怎…