前言
MapReduce中读写文件一般都存储在HDFS上,所以需要读取的文件需要上传到HDFS文件系统中。
一、定义MapReduce
1.定义Job示例
定义Job时需要注意:
- InputFormat类的数据格式。其数据格式为Mapper中的输入格式。
- OutputFormat类的数据格式。其数据格式为Reduce中输出的格式。
定义Job代码如下(示例):
package com.study.spark.mr;

import com.study.spark.mr.mapper.FixedLengthMapper;
import com.study.spark.mr.utils.ParquetDataSchema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

import java.util.ArrayList;
import java.util.List;

/**
 * Example MapReduce job: reads a fixed-length record file from HDFS and
 * writes it back out in Parquet format.
 */
public class FileParquetExample {

    public void mr() throws Exception {
        Configuration configuration = new Configuration();

        // NOTE: all configuration values must be set BEFORE Job.getInstance(...),
        // because the Job takes a private copy of the Configuration. Values set on
        // the original object afterwards are silently ignored by the running job
        // (the original code set them after getInstance, which is a bug).
        String struct = "当前这个文件的需要转出的数据格式,可以从文件中读取";
        configuration.set("data.struct", struct);

        // Fixed-length input: every record is exactly recordLength bytes.
        int recordLength = 100;
        FixedLengthInputFormat.setRecordLength(configuration, recordLength);

        // The Parquet output schema must also live in the job configuration so
        // GroupWriteSupport can find it on the reduce side.
        GroupWriteSupport.setSchema(parquetSchema(), configuration);

        Job job = Job.getInstance(configuration);
        job.setJarByClass(FileParquetExample.class);
        job.setJobName("FileParquetExample");

        // Input side: InputFormat's types are the Mapper's input types.
        job.setInputFormatClass(FixedLengthInputFormat.class);
        FixedLengthInputFormat.setInputPaths(job, new Path("hdfs://input/data/text.dat.gz"));

        // Mapper output types must match the Reducer's input types.
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(FixedLengthMapper.class);

        // The original listing defined ParquetReduce but never registered it;
        // without this line the identity reducer runs and the job cannot emit
        // the Group values that ParquetOutputFormat expects.
        job.setReducerClass(com.study.spark.mr.reduce.ParquetReduce.class);

        // Output side: OutputFormat's types are the Reducer's output types.
        job.setOutputFormatClass(ParquetOutputFormat.class);
        ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);
        ParquetOutputFormat.setOutputPath(job, new Path("hdfs://output/data/text"));
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);

        // Blocks from submission to completion; true = print progress.
        boolean isFinished = job.waitForCompletion(true);
        if (!isFinished) {
            // Surface failure instead of silently dropping the result.
            throw new IllegalStateException("FileParquetExample job failed");
        }
    }

    /**
     * Example Parquet schema covering primitive, decimal, array, map and
     * struct fields.
     */
    public MessageType parquetSchema() {
        List<Type> types = new ArrayList<>();
        types.add(ParquetDataSchema.stringTypeInfo("name", Type.Repetition.REQUIRED));
        types.add(ParquetDataSchema.intTypeInfo("age", Type.Repetition.OPTIONAL));
        types.add(ParquetDataSchema.decimalTypeInfo("deposit", Type.Repetition.REQUIRED, 24, 4));

        // Array type: the element name "array_element" is fixed by convention;
        // only the element's data type should vary.
        Type arrayDataType = ParquetDataSchema.stringTypeInfo("array_element", Type.Repetition.OPTIONAL);
        types.add(ParquetDataSchema.arrayTypeInfo("friends", Type.Repetition.OPTIONAL, arrayDataType));

        // Map type: the "key"/"value" field names are likewise fixed by convention.
        Type keyType = ParquetDataSchema.stringTypeInfo("key", Type.Repetition.OPTIONAL);
        Type valueType = ParquetDataSchema.intTypeInfo("value", Type.Repetition.OPTIONAL);
        types.add(ParquetDataSchema.mapTypeInfo("map", keyType, valueType));

        // Struct type demo. Pass a snapshot copy: the original passed the live
        // list it was about to add the struct to, making the schema reference
        // itself.
        types.add(ParquetDataSchema.structTypeInfo("schema", new ArrayList<>(types)));
        return ParquetDataSchema.convert(types);
    }
}
2.Mapper
- 处理从InputFormat中读取到的数据。
- 其中输入数据类型为InputFormat的数据类型。
- 输出数据类型为Reduce中输入的数据类型。
定义Map示例代码
package com.study.spark.mr.mapper;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Identity mapper for fixed-length records: forwards each
 * (byte offset, record bytes) pair unchanged to the reducer.
 */
public class FixedLengthMapper
        extends Mapper<LongWritable, BytesWritable, LongWritable, BytesWritable> {

    /**
     * Record-level transformations can be done here; if none are needed, the
     * data can instead be transformed in the reducer.
     */
    @Override
    protected void map(LongWritable key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}
3.Reduce
- 处理从Mapper中传输过来的数据。
- 将数据转换为输出文件需要的数据类型
定义Reduce示例代码
package com.study.spark.mr.reduce;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;

import java.io.IOException;

/**
 * Reducer that converts fixed-length byte records into Parquet {@link Group}
 * rows for {@code ParquetOutputFormat}.
 *
 * <p>The original listing imported {@code scala.collection.parallel.ParIterableLike}
 * and {@code Text}, neither of which was used; both imports are removed.
 */
public class ParquetReduce extends Reducer<LongWritable, BytesWritable, Void, Group> {

    private SimpleGroupFactory simpleGroupFactory;

    /**
     * Runs once per task before reduce(): builds the Group factory from the
     * Parquet schema stored in the job configuration.
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        MessageType messageType = GroupWriteSupport.getSchema(context.getConfiguration());
        simpleGroupFactory = new SimpleGroupFactory(messageType);
        // The fixed-length field layout (start/end offsets of each field) can be
        // read here once and parsed into a form that is cheap to use in reduce().
        String struct = context.getConfiguration().get("data.struct");
    }

    /**
     * Converts each buffered record into a Group row and emits it.
     */
    @Override
    protected void reduce(LongWritable key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        for (BytesWritable value : values) {
            // NOTE: getBytes() returns the padded backing array, which may be
            // longer than the record — slice fields using value.getLength().
            byte[] bytes = value.getBytes();
            Group group = simpleGroupFactory.newGroup();
            // Populate the group according to the defined schema, e.g.:
            //   group.add("age", ParquetDataWrite.intWriter(new Integer(10)));
            //   group.add(2, ParquetDataWrite.stringWriter("名称"));
            context.write(null, group);
        }
    }
}
二、Parquet数据格式
1 定义输出Parquet文件格式
因Parquet数据是有格式的,需要定义每行数据的格式
Repetition.REQUIRED
字段必须有对应的值,必填字段可以采用当前格式
Repetition.OPTIONAL
字段可以有对应的值,如果不是必填项可以采用当前格式
Repetition.REPEATED
字段对应值可以重复
package com.study.spark.mr.utils;

import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
import org.apache.parquet.schema.*;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type.Repetition;

import java.util.List;

/**
 * Helpers for building a Parquet schema ({@link MessageType}) field by field.
 *
 * <p>GroupType is a subclass of Type, so group builders below can be mixed
 * freely with the primitive builders in one field list.
 */
public class ParquetDataSchema {

    /** Assembles the collected field types into a named message schema. */
    public static MessageType convert(String schemaName, final List<Type> types) {
        return new MessageType(schemaName, types);
    }

    /** Same as {@link #convert(String, List)} with the Hive default schema name. */
    public static MessageType convert(final List<Type> types) {
        return convert("hive_schema", types);
    }

    /** Supported source types: String, Char, VarChar. */
    public static Type stringTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8).named(name);
    }

    /** Supported source type: int. */
    public static Type intTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT32, repetition).named(name);
    }

    /** Supported source type: short (stored as annotated INT32). */
    public static Type shortTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_16).named(name);
    }

    /** Supported source type: byte (stored as annotated INT32). */
    public static Type byteTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_8).named(name);
    }

    /** Supported source type: long. */
    public static Type longTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT64, repetition).named(name);
    }

    /** Supported source type: double. */
    public static Type doubleTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.DOUBLE, repetition).named(name);
    }

    /** Supported source type: float. */
    public static Type floatTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.FLOAT, repetition).named(name);
    }

    /** Supported source type: boolean. */
    public static Type booleanTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.BOOLEAN, repetition).named(name);
    }

    /** Supported source type: byte[] (raw binary, no UTF8 annotation). */
    public static Type binaryTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.BINARY, repetition).named(name);
    }

    /** Supported source type: timestamp (Hive-style INT96 encoding). */
    public static Type timestampTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT96, repetition).named(name);
    }

    /**
     * Supported source type: decimal, stored as a fixed-length byte array sized
     * from the precision.
     *
     * @param prec  total number of significant digits (precision)
     * @param scale number of digits after the decimal point
     */
    public static Type decimalTypeInfo(String name, Repetition repetition, int prec, int scale) {
        int bytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1];
        // BUG FIX: the original hard-coded Types.optional(...), silently ignoring
        // the repetition argument, so REQUIRED decimals came out OPTIONAL.
        return Types.primitive(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
                .length(bytes)
                .as(OriginalType.DECIMAL)
                .scale(scale)
                .precision(prec)
                .named(name);
    }

    /** Supported source type: date (stored as annotated INT32). */
    public static Type dateTypeInfo(String name, Repetition repetition) {
        return Types.primitive(PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.DATE).named(name);
    }

    /**
     * Supported source types: Array, List.
     *
     * @param subType the element type, e.g. {@code stringTypeInfo("array_element", Repetition.OPTIONAL)};
     *                only the element's data type should vary — the wrapper
     *                structure below follows the Hive Parquet LIST convention.
     */
    public static GroupType arrayTypeInfo(final String name, Repetition repetition, Type subType) {
        return new GroupType(repetition, name, OriginalType.LIST,
                new GroupType(Repetition.REPEATED, ParquetHiveSerDe.ARRAY.toString(), subType));
    }

    /**
     * Supported source type: Map.
     *
     * @param keyType   key type, e.g. {@code stringTypeInfo("key", Repetition.OPTIONAL)}
     * @param valueType value type, e.g. {@code stringTypeInfo("value", Repetition.OPTIONAL)};
     *                  the "key"/"value" field names are fixed by convention.
     */
    public static GroupType mapTypeInfo(String name, Type keyType, Type valueType) {
        return ConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);
    }

    /**
     * Supported source type: Struct.
     *
     * @param types the struct's member fields, e.g. a list of
     *              {@code stringTypeInfo("name", ...)}, {@code intTypeInfo("age", ...)}
     */
    public static GroupType structTypeInfo(final String name, List<Type> types) {
        return new GroupType(Repetition.OPTIONAL, name, types);
    }
}
其中
Category
数据类别
publicstaticenumCategory{
PRIMITIVE,//原始数据或是基础数据类别
LIST,//数组
MAP,//数据 key,value形式的
STRUCT,//数据是
UNION;privateCategory(){}}
测试数据示例
publicMessageTypeparquetSchema(){List<