Hadoop: Writing MR Output in Parquet Format

Requirement

Previously, the company used an MR program to parse JSON and saved the results as text files under a Hive table's directory. As data volumes grew, text file performance could no longer keep up, so the code had to be changed to write the files as Parquet.

Implementation

Below is a demo that writes its results as Parquet.

Each line of the input is split on spaces; the first column becomes id (int), the second becomes name (string), and the records are written directly to the target directory.

[root@kudu1 job]# cat test.txt 
1 xiaoming
2 hanmeimei
3 kangkang
4 maria
5 yokiko
6 michael

The code below has only a map phase, no reduce. Writing output in Parquet format takes the following steps:

1. Create the Parquet schema.

Parquet's primitive types are INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, and FIXED_LEN_BYTE_ARRAY. INT64 corresponds to Java's long, INT32 to int, and BINARY to String, though a BINARY field needs a (UTF8) annotation to be interpreted as a string (the sketch after the schema string below exercises a few more of these mappings).

        String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";
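
To make the type mappings concrete, here is a small standalone sketch that parses a schema exercising a few more of the primitive types. The class name SchemaDemo and the fields ts, score, and active are invented purely for illustration; only id and name are part of the actual job.

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class SchemaDemo {
    public static void main(String[] args) {
        // Field names other than id/name are made up purely to show the mappings.
        String richSchema = "message example {\n"
                + "  required int64 ts;\n"           // Java long
                + "  optional int32 id;\n"           // Java int
                + "  optional double score;\n"       // Java double
                + "  optional boolean active;\n"     // Java boolean
                + "  optional binary name (UTF8);\n" // Java String (UTF8-annotated binary)
                + "}";
        // parseMessageType accepts the same textual format used in step 1
        MessageType schema = MessageTypeParser.parseMessageType(richSchema);
        System.out.println(schema); // echoes the normalized schema
    }
}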

2. Use the ExampleOutputFormat class to configure the Parquet schema, the compression codec, and the output directory.

        // set up the MR configuration
        Configuration configuration = new Configuration(this.getConf());
        configuration.set("mapreduce.input.fileinputformat.split.minsize","2147483648");
        configuration.set("parquet.example.schema",writeSchema);
        // schema, compression and output path go through ExampleOutputFormat
        ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema));
        ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
        ExampleOutputFormat.setOutputPath(job, new Path(outputDir));

3. Set the map output value class to org.apache.parquet.example.data.Group.

Parquet is a columnar format; the values of a single row across all columns are represented as one Group, so you can think of one row as one Group. Each map() call writes one Group to the Parquet file. For the first input row, for example, the Group holds id=1 and name="xiaoming".

        job.setMapOutputValueClass(Group.class); // the map output value is a Parquet Group

4. Configure the job's output format. LazyOutputFormat defers creating the output file until the first record is written, so map tasks that emit nothing leave no empty Parquet files behind; MultipleOutputs registers a named output "output" that the map method writes to.

        LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class,Group.class);

5. In the Mapper class, define the factory used to create Groups.

// field defined in the Mapper class
private SimpleGroupFactory factory;

// created in setup() from the schema stored in the configuration
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));

6. In the map() method, create a Group, populate it, and write it out.

            String[] strings = value.toString().split(" ");
            // create the Group
            Group group = factory.newGroup();
            // populate it (types must match the schema)
            group.add("id", Integer.valueOf(strings[0]));
            group.add("name", strings[1]);
            // write it out through the named output
            mos.write("output", null, group);
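
Steps 1, 5, and 6 can also be tried outside MapReduce. The following minimal standalone sketch (the class name GroupDemo and the hard-coded row are illustrative, not part of the job) parses the schema, builds the Group for the first sample row, and prints it:

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class GroupDemo {
    public static void main(String[] args) {
        // same schema as in step 1
        MessageType schema = MessageTypeParser.parseMessageType(
                "message example {\n optional int32 id;\n optional binary name (UTF8);\n}");
        SimpleGroupFactory factory = new SimpleGroupFactory(schema);

        // build the Group for the first input row "1 xiaoming"
        String[] fields = "1 xiaoming".split(" ");
        Group group = factory.newGroup();
        group.add("id", Integer.valueOf(fields[0])); // must match int32 in the schema
        group.add("name", fields[1]);                // stored as UTF8-annotated binary
        System.out.println(group);                   // prints the field/value pairs
    }
}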

Code

The complete code follows.

package com.zixuan.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.example.ExampleOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageTypeParser;

import java.io.IOException;

public class MrSavedAsParquet extends Configured implements Tool {
    public static void main(String[] args) throws Exception {
        int ret = ToolRunner.run(new MrSavedAsParquet(), args);
        System.exit(ret);
    }

    public int run(String[] strings) throws Exception {

        String inputDir = "/user/hive/warehouse/inputdirTest";
        String outputDir = "/user/hive/warehouse/ods.db/usertest";

        // build the Parquet schema
        String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";

        // set up the MR configuration
        Configuration configuration = new Configuration(this.getConf());
        configuration.set("mapreduce.input.fileinputformat.split.minsize","2147483648");
        configuration.set("parquet.example.schema",writeSchema);

        Job job = Job.getInstance(configuration, "UserTest");
        // configure Parquet output
        ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema));
        ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
        ExampleOutputFormat.setOutputPath(job, new Path(outputDir));
        // basic job settings
        job.setJarByClass(MrSavedAsParquet.class);
        job.setMapperClass(MapTest.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Group.class); // the map output value is a Parquet Group

        MultipleInputs.addInputPath(job,new Path(inputDir), TextInputFormat.class,MapTest.class);
        job.setNumReduceTasks(0);

        // set the output format to Parquet (created lazily, so empty tasks leave no files)
        LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
        MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class,Group.class);

        FileSystem fileSystem = FileSystem.get(configuration);
        if ( ! fileSystem.exists(new Path(inputDir))){
            System.out.print("input path does not exist!");
            return 1;
        }

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class MapTest extends Mapper<LongWritable, Text, NullWritable, Group> {
        // MultipleOutputs, used to write to the named output
        private MultipleOutputs<NullWritable, Group> mos;
        // factory used to create Groups
        private SimpleGroupFactory factory;

        // initialization: create mos and the Group factory
        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            mos = new MultipleOutputs<NullWritable, Group>(context);
            factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
        }

        /*
            LongWritable key is the byte offset of the line within the file
            Text value is one line of input
            Context context is the context object; it gives access to configuration entries
         */
        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");
            // create the Group
            Group group = factory.newGroup();
            // populate it (types must match the schema)
            group.add("id", Integer.valueOf(strings[0]));
            group.add("name", strings[1]);
            // write through the named output; the key is unused
            mos.write("output", null, group);
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            mos.close();
        }
    }
}

The files have been saved under /user/hive/warehouse/ods.db/usertest, which is the default location of table usertest in database ods, so creating the Hive table ods.usertest picks them up directly:

CREATE TABLE ods.usertest
(
id   int,
name string
)
stored as parquet;

Query:

hive> select * from ods.usertest;
OK
1	xiaoming
2	hanmeimei
3	kangkang
4	maria
5	yokiko
6	michael
Time taken: 0.053 seconds, Fetched: 6 row(s)