如何使用spark-submit提交sparkSQL任务

2022-09-05 14:19:57

前言

本文以sparkSQL在elasticsearch中的关联查询为例介绍了使用java开发spark sql应用程序提交到yarn上运行的全过程,并介绍解决了一些开发和提交中常见的问题。

准备

首先我们要搭建好spark on yarn的集群环境,我是用cdh安装和cloudera manager进行维护的。
在这里插入图片描述
这里用到的组件主要有yarn、hdfs和hive。

对于开发人员来说唯一要准备的就是代码开发环境,使用idea创建一个maven项目,在pom.xml中添加如下内容:

<properties>
        <java.version>1.8</java.version>
        <scala.version>2.10.5</scala.version>
        <!--<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>-->
        <!--禁用测试-->
        <maven.test.skip>true</maven.test.skip>
    </properties>
    <dependencies>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-20_2.10</artifactId>
            <!--支持spark使用6.4.3-->
            <version>6.4.3</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>janino</artifactId>
            <version>2.7.8</version>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.13.Final</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.2</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet</groupId>
                    <artifactId>servlet-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>2.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>2.1.2</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.11</artifactId>
            <version>1.0.6</version>
            <exclusions>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>janino</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.codehaus.janino</groupId>
                    <artifactId>commons-compiler</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

    </dependencies>

代码开发

创建一个包例如:com.epic.test,在其中创建一个测试用的主类如:TestSQL01。在main函数中我们需要做如下操作:

1.获取Context

使用spark-submit提交任务时,所有配置都可以通过command设置的,但是如果图方便或者想从外部配置文件导入配置的时候还是可以使用如下函数添加配置的。例如AppName和一些外部数据源的配置参数:

		SparkConf sparkConf = new SparkConf().setAppName("TestSQL01");
        sparkConf.set("es.index.auto.create","true");  //在spark中自动创建es中的索引
        sparkConf.set("es.nodes","保密"); //设置在spark中连接es的url和端口
        sparkConf.set("es.port","9200");
        sparkConf.set("es.nodes.wan.only","true");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        SQLContext sql = new SQLContext(sc);

这样我们就获取到一个SQLContext实例sql

2.创建临时表

相比较使用esRDD的方式,创建一张临时表以进行多表数据的关联查询更加方便,这里以创建elasticsearch关联查询临时表为例:

		Dataset d0 = JavaEsSparkSQL.esDF(sql, "site_base");
        d0.registerTempTable("site_base");

这样我们就创建一张临时表site_base并缓存在SparkSession中。(registerTempTable是一个弃用方法,在新版中使用的是createOrReplaceTempView)

3.执行sql语句

		Dataset df = sql.sql("SELECT * FROM site_base");

spark sql 通过将SQL语句中蕴含的逻辑映射到逻辑算子树的不同节点,经过解析、优化等阶段生成物理算子树。物理算子树会直接生成RDD或对RDD进行transformation转换操作。

4.获取结果

		for (Row row : (List<Row>)df.collectAsList()) {
            log.info(row.toString());
        }

提交

如果要使用spark-submit提交任务必须把程序打包成jar,幸好idea提供了Artifacts打包功能非常方便,打开idea的Project Structure。

在这里插入图片描述

点击"+"号,添加一个jar打包方式,唯一需要配置的是MainClass我们选择TestSQL01即可,其他都使用默认配置即可。

将jar包上传到服务器某个目录下面,使用:

spark2-submit --class com.epic.test.TestSQL01 --executor-memory 4G --num-executors 8 --master yarn-client sparksqltest01.jar

即可运行提交。
在这里插入图片描述

常见问题

1.主类名与AppName名称不一致

可能会报java.lang.ClassNotFoundException错误,修改成一致的即可。

2.重复依赖问题

很多时候我们使用maven构建项目都不可避免的产生重复导入一些第三方库的问题,我在使用idea自带的打包工具打包后,在提交任务的时候就遇到:

Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes

这个错误最简单的解决办法就是打开META-INF目录,将*.SF,.DSA,.RSA文件删除即可。
在linux服务器上使用:

zip -d sparksqltest01.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF

然后重新提交jar包即可。

3.不能使用root用户提交任务

切换到其他用户即可。

  • 作者:丧心病狂の程序员
  • 原文链接:https://blog.csdn.net/vic_torsun/article/details/100108162
    更新时间:2022-09-05 14:19:57