Requirements
Previously, the company used a MapReduce job to parse JSON and save the results as text files under a Hive table's directory. As data volumes grew, text files could no longer keep up performance-wise, so the code needed to be changed to write parquet instead.
Implementation
Below is a demo that saves its results as parquet.
Each line of the input text is split on a space; the first column becomes id (int), the second becomes name (string), and the rows are written directly to the target directory.
[root@kudu1 job]# cat test.txt
1 xiaoming
2 hanmeimei
3 kangkang
4 maria
5 yokiko
6 michael
The job below is map-only, with no reduce phase. The steps to produce parquet output are:
1. Create the parquet schema.
Parquet's primitive types are INT64, INT32, BOOLEAN, BINARY, FLOAT, DOUBLE, INT96, and FIXED_LEN_BYTE_ARRAY. INT64 maps to Java's long and INT32 to int; BINARY maps to String, but needs an encoding annotation (UTF8 here).
String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";
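To sanity-check the schema string before wiring it into a job, it can be parsed standalone with MessageTypeParser; a minimal sketch (the class name SchemaCheck is just for illustration):
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
public class SchemaCheck {
    public static void main(String[] args) {
        String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";
        // parseMessageType throws if the schema text is malformed
        MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
        System.out.println(schema); // prints the canonical schema back
    }
}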
2. Use the ExampleOutputFormat class to configure the parquet schema, compression codec, and output directory.
// configure the MR Configuration
Configuration configuration = new Configuration(this.getConf());
configuration.set("mapreduce.input.fileinputformat.split.minsize", "2147483648");
// ExampleOutputFormat.setSchema stores the schema under this same key, so this set is redundant but harmless
configuration.set("parquet.example.schema", writeSchema);
// configure schema, compression codec, and output path on the job
ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema));
ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
ExampleOutputFormat.setOutputPath(job, new Path(outputDir));
3. Set the map output value class to org.apache.parquet.example.data.Group.
Parquet is a columnar format; the values of one row across all columns form a group, so a Group can be read as one row. Each map call writes one Group into the parquet file.
job.setMapOutputValueClass(Group.class); // the map output value is parquet's Group
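To make the one-Group-per-row analogy concrete, here is a minimal sketch of building one Group and reading its values back outside of MR (values taken from the sample data):
MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
Group row = factory.newGroup();      // one Group == one output row
row.add("id", 1);                    // column "id"
row.add("name", "xiaoming");         // column "name"
System.out.println(row.getInteger("id", 0) + " " + row.getString("name", 0));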
4. Configure the job's output format. LazyOutputFormat only creates an output file once a record is actually written to it, so no empty part files are left behind; all records here go through the named output of MultipleOutputs.
LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class, Group.class);
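MultipleOutputs is used here to support multi-directory output. If that is not needed, a simpler wiring (a sketch, not what this job uses) writes Groups straight through ExampleOutputFormat:
// sketch: single default output path, no named outputs
job.setOutputFormatClass(ExampleOutputFormat.class);
job.setOutputKeyClass(Void.class);   // ParquetOutputFormat ignores the key
job.setOutputValueClass(Group.class);
// ...and in map(): context.write(null, group);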
5. In the Mapper class, define the factory used to create Groups.
// declare the factory field in the Mapper class
private SimpleGroupFactory factory;
// create the SimpleGroupFactory in the setup() method
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
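GroupWriteSupport.getSchema just parses the schema string that step 2 stored in the configuration; it is essentially equivalent to:
MessageType schema = MessageTypeParser.parseMessageType(
        context.getConfiguration().get("parquet.example.schema"));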
6. In the map method, create a Group, populate it, and write it out.
String[] strings = value.toString().split(" ");
// create the group (one output row)
Group group = factory.newGroup();
// populate the group's columns
group.add("id", Integer.valueOf(strings[0]));
group.add("name", strings[1]);
// write it out through MultipleOutputs
mos.write("output", null, group);
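The same schema and Group logic can also be exercised without MR at all, using org.apache.parquet.hadoop.example.ExampleParquetWriter and org.apache.parquet.hadoop.ParquetWriter from parquet-hadoop. A hedged sketch; the /tmp path and the codec choice are assumptions:
MessageType schema = MessageTypeParser.parseMessageType(writeSchema);
SimpleGroupFactory factory = new SimpleGroupFactory(schema);
// ExampleParquetWriter writes Groups to a single parquet file directly
try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path("/tmp/usertest.parquet"))
        .withType(schema)
        .withCompressionCodec(CompressionCodecName.SNAPPY)
        .build()) {
    Group row = factory.newGroup();
    row.add("id", 1);
    row.add("name", "xiaoming");
    writer.write(row);
}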
Code
The complete code is below.
package com.zixuan.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.example.ExampleOutputFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageTypeParser;
import java.io.IOException;
public class MrSavedAsParquet extends Configured implements Tool {
public static void main(String[] args) throws Exception {
int ret = ToolRunner.run(new MrSavedAsParquet(), args);
System.exit(ret);
}
public int run(String[] strings) throws Exception {
String inputDir = "/user/hive/warehouse/inputdirTest";
String outputDir = "/user/hive/warehouse/ods.db/usertest";
// define the parquet schema
String writeSchema = "message example {\n optional int32 id;\n optional binary name (UTF8);\n}";
// configure the MR Configuration
Configuration configuration = new Configuration(this.getConf());
configuration.set("mapreduce.input.fileinputformat.split.minsize","2147483648");
configuration.set("parquet.example.schema",writeSchema);
Job job = Job.getInstance(configuration, "UserTest");
// configure parquet output: schema, compression codec, output path
ExampleOutputFormat.setSchema(job, MessageTypeParser.parseMessageType(writeSchema));
ExampleOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
ExampleOutputFormat.setOutputPath(job, new Path(outputDir));
// basic job settings
job.setJarByClass(MrSavedAsParquet.class);
job.setMapperClass(MapTest.class);
job.setInputFormatClass(TextInputFormat.class);
job.setMapOutputKeyClass(NullWritable.class); // matches the Mapper's declared output key type
job.setMapOutputValueClass(Group.class); // the map output value is parquet's Group
MultipleInputs.addInputPath(job,new Path(inputDir), TextInputFormat.class,MapTest.class);
job.setNumReduceTasks(0);
// set the output format to parquet; LazyOutputFormat avoids creating empty part files
LazyOutputFormat.setOutputFormatClass(job, ExampleOutputFormat.class);
MultipleOutputs.addNamedOutput(job, "output", ExampleOutputFormat.class, NullWritable.class, Group.class);
FileSystem fileSystem = FileSystem.get(configuration);
if ( ! fileSystem.exists(new Path(inputDir))){
System.err.println("input path does not exist!");
return 1;
}
int ret = job.waitForCompletion(true) ? 0 : 1;
return ret;
}
public static class MapTest extends Mapper<LongWritable, Text, NullWritable, Group> {
// MultipleOutputs, used for writing to named outputs
private MultipleOutputs<NullWritable, Group> mos;
// factory used to create Groups
private SimpleGroupFactory factory;
// setup: initialize mos and factory
@Override
public void setup(Context context) throws IOException, InterruptedException {
mos = new MultipleOutputs<NullWritable, Group>(context); // initialize mos
factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
}
/*
 * LongWritable key: the byte offset of the line within the file
 * Text value: one line of input
 * Context context: the job context; gives access to configuration values
 */
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] strings = value.toString().split(" ");
// create the group (one output row)
Group group = factory.newGroup();
// populate the group's columns
group.add("id", Integer.valueOf(strings[0]));
group.add("name", strings[1]);
// write it out through MultipleOutputs
mos.write("output", null, group);
}
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mos.close();
}
}
}
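The demo mapper assumes every input line is well formed; a hedged variant of map() that counts and skips malformed lines instead of failing the task (the counter group and name are illustrative, and id parsing errors would still need their own handling):
@Override
public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    String[] fields = value.toString().split(" ");
    if (fields.length < 2) {
        context.getCounter("parse", "malformed_lines").increment(1); // skip bad lines
        return;
    }
    Group group = factory.newGroup();
    group.add("id", Integer.parseInt(fields[0]));
    group.add("name", fields[1]);
    mos.write("output", null, group);
}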
The files are now under /user/hive/warehouse/ods.db/usertest. Create the hive table ods.usertest; since the job wrote directly into the table's default warehouse location, the data is visible as soon as the table is created:
CREATE TABLE ods.usertest
(
id int,
name string
)
stored as parquet;
Query:
hive> select * from ods.usertest;
OK
1 xiaoming
2 hanmeimei
3 kangkang
4 maria
5 yokiko
6 michael
Time taken: 0.053 seconds, Fetched: 6 row(s)