Spark SQL 2.4.8 操作 Dataframe的两种方式

一、测试数据

7369,smith,clerk,7902,1980/12/17,800,20
7499,allen,salesman,7698,1981/2/20,1600,300,30
7521,ward,salesman,7698,1981/2/22,1250,500,30
7566,jones,manager,7839,1981/4/2,2975,20
7654,martin,salesman,7698,1981/9/28,1250,1400,30
7698,blake,manager,7839,1981/5/1,2850,30
7782,clark,manager,7839,1981/6/9,2450,10
7788,scott,analyst,7566,1987/4/19,3000,20
7839,king,president,1981/11/17,5000,10
7844,turner,salesman,7698,1981/9/8,1500,0,30
7876,adams,clerk,7788,1987/5/23,1100,20
7900,james,clerk,7698,1981/12/3,9500,30
7902,ford,analyst,7566,1981/12/3,3000,20
7934,miller,clerk,7782,1982/1/23,1300,10

二、创建dataframe

方式一:dsl方式操作

  • 实例化sparkcontext和sparksession对象
  • 利用structtype类型构建schema,用于定义数据的结构信息
  • 通过sparkcontext对象读取文件,生成rdd
  • 将rdd[string]转换成rdd[row]
  • 通过sparksession对象创建dataframe
  • 完整代码如下:
package com.scala.demo.sql

import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.sql.{row, sparksession}
import org.apache.spark.sql.types.{datatype, datatypes, structfield, structtype}

object demo01 {
  def main(args: array[string]): unit = {
    // 1.创建sparkcontext和sparksession对象
    val sc = new sparkcontext(new sparkconf().setappname("demo01").setmaster("local[2]"))
    val sparksession = sparksession.builder().getorcreate()

    // 2. 使用structtype来定义schema
    val myschema = structtype(list(
      structfield("empno", datatypes.integertype, false),
      structfield("ename", datatypes.stringtype, false),
      structfield("job", datatypes.stringtype, false),
      structfield("mgr", datatypes.stringtype, false),
      structfield("hiredate", datatypes.stringtype, false),
      structfield("sal", datatypes.integertype, false),
      structfield("comm", datatypes.stringtype, false),
      structfield("deptno", datatypes.integertype, false)
    ))
    // 3. 读取数据
    val emprdd = sc.textfile("file:///d:\\testdatas\\emp.csv")

    // 4. 将其映射成row对象
    val rowrdd = emprdd.map(line => {
      val strings = line.split(",")
      row(strings(0).toint, strings(1), strings(2), strings(3), strings(4), strings(5).toint,strings(6), strings(7).toint)
    })

    // 5. 创建dataframe
    val dataframe = sparksession.createdataframe(rowrdd, myschema)

    // 6. 展示内容 dsl
	dataframe.groupby("deptno").sum("sal").as("result").sort("sum(sal)").show()
  }
}

结果如下:

 

方式二:sql方式操作

  • 实例化sparkcontext和sparksession对象
  • 创建case class emp样例类,用于定义数据的结构信息
  • 通过sparkcontext对象读取文件,生成rdd[string]
  • 将rdd[string]转换成rdd[emp]
  • 引入spark隐式转换函数(必须引入)
  • 将rdd[emp]转换成dataframe
  • 将dataframe注册成一张视图或者临时表
  • 通过调用sparksession对象的sql函数,编写sql语句
  • 停止资源
  • 具体代码如下:
package com.scala.demo.sql

import org.apache.spark.rdd.rdd
import org.apache.spark.sql.{row, sparksession}
import org.apache.spark.{sparkconf, sparkcontext}
import org.apache.spark.sql.types.{datatype, datatypes, structfield, structtype}

// 0. 数据分析
// 7499,allen,salesman,7698,1981/2/20,1600,300,30
// 1. 定义emp样例类
case class emp(empno:int,empname:string,job:string,mgr:string,hiredate:string,sal:int,comm:string,deptno:int)

object demo02 {
  def main(args: array[string]): unit = {
    // 2. 读取数据将其映射成row对象
    val sc = new sparkcontext(new sparkconf().setmaster("local[2]").setappname("demo02"))
    val maprdd = sc.textfile("file:///d:\\testdatas\\emp.csv")
      .map(_.split(","))

    val rowrdd:rdd[emp] = maprdd.map(line => emp(line(0).toint, line(1), line(2), line(3), line(4), line(5).toint, line(6), line(7).toint))

    // 3。创建dataframe
    val spark = sparksession.builder().getorcreate()
    // 引入spark隐式转换函数
    import spark.implicits._
    // 将rdd转成dataframe
    val dataframe = rowrdd.todf

    // 4.2 sql语句操作
    // 1、将dataframe注册成一张临时表
    dataframe.createorreplacetempview("emp")
    // 2. 编写sql语句进行操作
    spark.sql("select deptno,sum(sal) as total from emp group by deptno order by total desc").show()

    // 关闭资源
    spark.stop()
    sc.stop()
  }
}

结果如下:

到此这篇关于spark sql 2.4.8 操作 dataframe的两种方式的文章就介绍到这了,更多相关spark sql 操作 dataframe内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

(0)
上一篇 2022年3月21日
下一篇 2022年3月21日

相关推荐