SparkSQL Data Output
1. Regular file output
Method 1: specify the output data source type and the target path
```python
df.write.format("json").save(path)
df.write.format("csv").save(path)
df.write.format("parquet").save(path)
```
Method 2: call the writer method named after the data source type
```python
df.write.json(path)
df.write.csv(path)
df.write.parquet(path)
```
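The two forms are interchangeable: both go through a DataFrameWriter, so extra options can be chained before saving. A minimal sketch of this, assuming a tab-separated CSV output; the header/sep options and the output paths are illustrative and not part of the original example:

```python
# Equivalent ways to write a DataFrame as tab-separated CSV with a header row
df.write.format("csv").option("header", True).option("sep", "\t").save("../../datas/out_csv_a")
df.write.option("header", True).option("sep", "\t").csv("../../datas/out_csv_b")
```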
Both writer forms accept a save mode, which controls what happens when the target already exists (see the sketch after this list):
append: append mode; if data already exists at the target, the new data is added to it
overwrite: overwrite mode; existing data is replaced with the latest data
error/errorifexists: raise an error if the target already exists; this is the default mode
ignore: do nothing if data already exists at the target
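As a quick illustration of the four modes, the sketch below writes the same DataFrame to one path several times; the path is illustrative:

```python
path = "../../datas/mode_demo"
df.write.mode("overwrite").parquet(path)   # replaces whatever was there before
df.write.mode("append").parquet(path)      # adds new part files next to the existing ones
df.write.mode("ignore").parquet(path)      # target already exists, so nothing happens
# df.write.mode("error").parquet(path)     # default mode: would raise an error because the path exists
```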
A general template:

```python
df.write.mode(saveMode="append").format("csv").save(path)
```
A complete example demonstrating the regular file output formats:
```python
import os
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Set up the environment
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # Path to the Hadoop distribution unpacked earlier
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # Python interpreter of the base environment
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # Python interpreter used by the driver
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master("local[2]").appName("") \
        .config("spark.sql.shuffle.partitions", 2).getOrCreate()

    df = spark.read.json("../../datas/person.json")

    # Get the name of the oldest person
    df.createOrReplaceTempView("persons")
    rsDf = spark.sql("""
        select name, age from persons where age = (select max(age) from persons)
    """)

    # Print the result to the console
    # rsDf.write.format("console").save()
    # rsDf.write.json("../../datas/result", mode="overwrite")
    # rsDf.write.mode(saveMode='overwrite').format("json").save("../../datas/result")
    # rsDf.write.mode(saveMode='overwrite').format("csv").save("../../datas/result1")
    # rsDf.write.mode(saveMode='overwrite').format("parquet").save("../../datas/result2")
    # rsDf.write.mode(saveMode='append').format("csv").save("../../datas/result1")

    # Writing as text fails here (the text source only supports a single string column)
    # rsDf.write.mode(saveMode='overwrite').text("hdfs://bigdata01:9820/result")
    # rsDf.write.orc("hdfs://bigdata01:9820/result", mode="overwrite")

    rsDf.write.parquet("hdfs://bigdata01:9820/result", mode="overwrite")

    spark.stop()
```
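To confirm the write succeeded, the Parquet output can be read back with the same SparkSession (before calling spark.stop()); this check is an addition, not part of the original example:

```python
# Read the result back from HDFS and display it
checkDf = spark.read.parquet("hdfs://bigdata01:9820/result")
checkDf.show()
```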
2. Saving to a database
Code example:
```python
import os
# Import the pyspark modules
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Set up the environment
    os.environ['JAVA_HOME'] = 'D:/Download/Java/JDK'
    # Path to the Hadoop distribution unpacked earlier
    os.environ['HADOOP_HOME'] = 'D:/bigdata/hadoop-3.3.1/hadoop-3.3.1'
    # Python interpreter of the base environment
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # Python interpreter used by the driver
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    spark = SparkSession.builder.master('local[*]').appName('') \
        .config("spark.sql.shuffle.partitions", 2).getOrCreate()

    df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv") \
        .toDF('eid', 'ename', 'salary', 'sal', 'dept_id')
    df5.createOrReplaceTempView('emp')
    rsDf = spark.sql("select * from emp")

    # Write the result into MySQL over JDBC
    rsDf.write.format("jdbc") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("url", "jdbc:mysql://bigdata01:3306/mysql") \
        .option("user", "root") \
        .option("password", "123456") \
        .option("dbtable", "emp1") \
        .save(mode="overwrite")

    spark.stop()  # Remember to stop the session when done
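```

Note that the JDBC write only works if the matching driver jar (here MySQL Connector/J) is available on Spark's classpath. As an optional check before stopping the session, the table can be read back with the same connection options; this snippet is an addition, not part of the original example:

```python
# Read the emp1 table back from MySQL to verify the write
checkDf = spark.read.format("jdbc") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("url", "jdbc:mysql://bigdata01:3306/mysql") \
    .option("user", "root") \
    .option("password", "123456") \
    .option("dbtable", "emp1") \
    .load()
checkDf.show()
```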
3. Saving to Hive
Code example:
```python
import os
# Import the pyspark modules
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

if __name__ == '__main__':
    # Set up the environment
    os.environ['JAVA_HOME'] = 'D:/Download/Java/JDK'
    # Path to the Hadoop distribution unpacked earlier
    os.environ['HADOOP_HOME'] = 'D:/bigdata/hadoop-3.3.1/hadoop-3.3.1'
    # Python interpreter of the base environment
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # Python interpreter used by the driver
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
    # HDFS user to write as
    os.environ['HADOOP_USER_NAME'] = 'root'

    spark = SparkSession \
        .builder \
        .appName("HiveAPP") \
        .master("local[2]") \
        .config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
        .config('hive.metastore.uris', 'thrift://bigdata01:9083') \
        .config("spark.sql.shuffle.partitions", 2) \
        .enableHiveSupport() \
        .getOrCreate()

    df5 = spark.read.format("csv").option("sep", "\t").load("../../datas/zuoye/emp.tsv") \
        .toDF('eid', 'ename', 'salary', 'sal', 'dept_id')
    df5.createOrReplaceTempView('emp')
    rsDf = spark.sql("select * from emp")

    # Save into the Hive database "spark" as table "emp"
    rsDf.write.saveAsTable("spark.emp")

    spark.stop()  # Remember to stop the session when done
```
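saveAsTable uses the default error/errorifexists mode, so re-running the job against an existing table fails unless a mode is supplied (for example rsDf.write.mode("overwrite").saveAsTable("spark.emp")), and the target database (spark here) must already exist in the metastore. As an optional check before spark.stop(), the table can be queried straight back; this snippet is an addition to the original example:

```python
# Query the newly created Hive table to confirm the data landed
spark.sql("select * from spark.emp").show()
# Equivalent: spark.table("spark.emp").show()
```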