MapReduce读定长文件写入Parquet文件

2022-09-01 14:45:32

前言

MapReduce中读写文件一般都存储在HDFS上,所以需要读取的文件需要上传到HDFS文件系统中。


一、定义MapReduce

1.定义Job示例

定义时需要注意:

  1. InputFormat类的数据格式。其数据格式为Mapper中的输入格式。
  2. OutputFormat类的数据格式。其数据格式为Reduce中输出的格式。

定义Job代码如下(示例):

packagecom.study.spark.mr;importcom.study.spark.mr.mapper.FixedLengthMapper;importcom.study.spark.mr.utils.ParquetDataSchema;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.BytesWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FixedLengthInputFormat;importorg.apache.parquet.example.data.Group;importorg.apache.parquet.hadoop.ParquetOutputFormat;importorg.apache.parquet.hadoop.example.GroupWriteSupport;importorg.apache.parquet.schema.MessageType;importorg.apache.parquet.schema.Type;importjava.util.ArrayList;importjava.util.List;publicclassFileParquetExample{publicvoidmr()throwsException{Configuration configuration=newConfiguration();Job job=Job.getInstance(configuration);//设置执行的
        job.setJarByClass(FileParquetExample.class);
        job.setJobName("FileParquetExample");String struct="当前这个文件的需要转出的数据格式,可以从文件中读取";
        configuration.set("data.struct", struct);

        job.setInputFormatClass(FixedLengthInputFormat.class);int recordLength=100;//定长行长度FixedLengthInputFormat.setRecordLength(configuration, recordLength);FixedLengthInputFormat.setInputPaths(job,newPath("hdfs://input/data/text.dat.gz"));
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setMapperClass(FixedLengthMapper.class);

        job.setOutputFormatClass(ParquetOutputFormat.class);ParquetOutputFormat.setWriteSupportClass(job,GroupWriteSupport.class);GroupWriteSupport.setSchema(parquetSchema(), configuration);ParquetOutputFormat.setOutputPath(job,newPath("hdfs://output/data/text"));
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(Group.class);//从文件提交到结束boolean isFinsh= job.waitForCompletion(true);//执行成功或是失败的处理}/**
	* 示例Parquet数据格式
	*/publicMessageTypeparquetSchema(){List<Type> types=newArrayList<>();
        types.add(ParquetDataSchema.stringTypeInfo("name",Type.Repetition.REQUIRED));
        types.add(ParquetDataSchema.intTypeInfo("age",Type.Repetition.OPTIONAL));
        types.add(ParquetDataSchema.decimalTypeInfo("deposit",Type.Repetition.REQUIRED,24,4));//数组类型,()里面参数时固定的Type arrayDataType=ParquetDataSchema.stringTypeInfo("array_element",Type.Repetition.OPTIONAL);
        types.add(ParquetDataSchema.arrayTypeInfo("friends",Type.Repetition.OPTIONAL, arrayDataType));//数组类型,()里面参数时固定的Type keyType=ParquetDataSchema.stringTypeInfo("key",Type.Repetition.OPTIONAL);Type valueType=ParquetDataSchema.intTypeInfo("value",Type.Repetition.OPTIONAL);
        types.add(ParquetDataSchema.mapTypeInfo("map", keyType, valueType));//测试直接使用,
        types.add(ParquetDataSchema.structTypeInfo("schema", types));returnParquetDataSchema.convert(types);}}

2.Mapper

  1. 处理从InputFormat中读取到的数据。
  2. 其中输入数据类型为InputFormat的数据类型。
  3. 输出数据类型为Reduce中输入的数据类型。

定义Map示例代码

packagecom.study.spark.mr.mapper;importorg.apache.hadoop.io.BytesWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassFixedLengthMapperextendsMapper<LongWritable,BytesWritable,LongWritable,BytesWritable>{/**
     * 在这里完成,对数据的修改。如果不错修改也可以放到Reduce中进行修改
     */protectedvoidmap(LongWritable key,BytesWritable value,Context context)throwsIOException,InterruptedException{
        context.write(key, value);}}

3.Reduce

  1. 处理从Mapper中传输过来的数据。
  2. 将数据转换为输出文件需要的数据类型

定义Reduce示例代码

packagecom.study.spark.mr.reduce;importorg.apache.hadoop.io.BytesWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.parquet.example.data.Group;importorg.apache.parquet.example.data.simple.SimpleGroupFactory;importorg.apache.parquet.hadoop.example.GroupWriteSupport;importorg.apache.parquet.schema.MessageType;importscala.collection.parallel.ParIterableLike;importjava.io.IOException;publicclassParquetReduceextendsReducer<LongWritable,BytesWritable,Void,Group>{privateSimpleGroupFactory simpleGroupFactory;/**
     * 在任务开始时,初始化。用于定义reduce方法中需要用到的数据。
     */protectedvoidsetup(Context context)throwsIOException,InterruptedException{MessageType messageType=GroupWriteSupport.getSchema(context.getConfiguration());
       simpleGroupFactory=newSimpleGroupFactory(messageType);//可以在这里获取定长文件,每个字段的起始位和结束位。转为在reduce方法中容易读取的对象String struct= context.getConfiguration().get("data.struct");}/**
     * 循环Mapper中读取到的数据
     */@SuppressWarnings("unchecked")protectedvoidreduce(LongWritable key,Iterable<BytesWritable> values,Context context)throwsIOException,InterruptedException{for(BytesWritable value: values){byte[] bytes= value.getBytes();//根据自己需要,截取定长文件。Group group=  simpleGroupFactory.newGroup();//截取到的数据。根据定义的Schema。指定以输出//如:group.add("age",ParquetDataWrite.intWriter(new Integer(10)))//group.add(2,ParquetDataWrite.stringWriter("名称"));
            context.write(null,group);}}}

二、Parquet数据格式

1 定义输出Parquet文件格式

因Parquet数据是有格式的,需要定义每行数据的格式。
Repetition.REQUIRED 字段必须有对应的值必填字段可以采用当前格式
Repetition.OPTIONAL 字段可以有对应的值如果不是必填项可以采用当前格式
Repetition.REPEATED 字段对应值可以重复

/**
* 定义Parquet格式
*/packagecom.study.spark.mr.utils;importorg.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;importorg.apache.parquet.schema.*;importorg.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;importorg.apache.parquet.schema.Type.Repetition;importjava.util.List;/**
 * 定义Parquet Schema格式
 * GroupType是Type的子类
 */publicclassParquetDataSchema{publicstaticMessageTypeconvert(String schemaName,finalList<Type> types){finalMessageType schema=newMessageType(schemaName, types);return schema;}publicstaticMessageTypeconvert(finalList<Type> types){returnconvert("hive_schema", types);}/**
     * 支持数据类型:String,Char, VarChar
     */publicstaticTypestringTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.BINARY, repetition).as(OriginalType.UTF8).named(name);}/**
     * 支持数据类型:int
     */publicstaticTypeintTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT32, repetition).named(name);}/**
     * 支持数据类型:short
     */publicstaticTypeshortTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT32, repetition).as(OriginalType.INT_16).named(name);}/**
     * 支持数据类型:byte
     */publicstaticTypebyteTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT32, repetition).as(OriginalType.INT_8).named(name);}/**
     * 支持数据类型:long
     */publicstaticTypelongTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT64, repetition).named(name);}/**
     * 支持数据类型:double
     */publicstaticTypedoubleTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.DOUBLE, repetition).named(name);}/**
     * 支持数据类型:float
     */publicstaticTypefloatTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.FLOAT, repetition).named(name);}/**
     * 支持数据类型:boolean
     */publicstaticTypebooleanTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.BOOLEAN, repetition).named(name);}/**
     * 支持数据类型:byte[]
     */publicstaticTypebinaryTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.BINARY, repetition).named(name);}/**
     * 支持数据类型:timestamp
     */publicstaticTypetimestampTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT96, repetition).named(name);}/**
     * 支持数据类型:decimal
     *
     * @param prec  数据长度
     * @param scale 小数点数
     */publicstaticTypedecimalTypeInfo(String name,Repetition repetition,int prec,int scale){int bytes=ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec-1];returnTypes.optional(PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY).length(bytes).as(OriginalType.DECIMAL).scale(scale).precision(prec).named(name);}/**
     * 支持数据类型:date
     */publicstaticTypedateTypeInfo(String name,Repetition repetition){returnTypes.primitive(PrimitiveTypeName.INT32, repetition).as(OriginalType.DATE).named(name);}/**
     * 支持数据类型:Array,List
     *
     * @param subType: 表示数据类型如 stringTypeInfo("array_element",Repetition.OPTIONAL)
     *                 在使用的时候只要修改一下数组的数据类型即可,后面参数保持一致
     */publicstaticGroupTypearrayTypeInfo(finalString name,Repetition repetition,Type subType){returnnewGroupType(repetition, name,OriginalType.LIST,newGroupType(Repetition.REPEATED,ParquetHiveSerDe.ARRAY.toString(), subType));}/**
     * 支持数据类型:Map
     *
     * @param keyType   表示键数据类型如 stringTypeInfo("key",Repetition.OPTIONAL)
     * @param valueType 表示值数据类型如 stringTypeInfo("value",Repetition.OPTIONAL)
     *                  在使用的时候只要修改一下数组的数据类型即可,后面参数保持一致
     */publicstaticGroupTypemapTypeInfo(String name,Type keyType,Type valueType){returnConversionPatterns.mapType(Repetition.OPTIONAL, name, keyType, valueType);}/**
     * 支持数据类型:Struct
     *
     * @param types 如:new ArrayList(stringTypeInfo("name",Repetition.OPTIONAL), intTypeInfo("age",Repetition.OPTIONAL));
     */publicstaticGroupTypestructTypeInfo(finalString name,List<Type> types){returnnewGroupType(Repetition.OPTIONAL, name, types);}}

其中Category数据类别

// Data category of a schema field (nested-enum fragment; `static` implies it is
// declared inside an enclosing class not shown here).
public static enum Category {
        PRIMITIVE, // primitive / basic data category
        LIST,      // array
        MAP,       // key/value data
        STRUCT,    // nested record data — NOTE(review): original comment was truncated here
        UNION;     // union of several possible types

        private Category() {
        }
}

测试数据示例

public MessageType parquetSchema() { List<…(此处原文截断,完整代码见上文 FileParquetExample 中的 parquetSchema 方法示例)
  • 作者:swg321321
  • 原文链接:https://blog.csdn.net/swg321321/article/details/124998364
    更新时间:2022-09-01 14:45:32