Parquet 之 MapReduce

2022年9月20日09:14:36

在 MapReduce 中使用 Parquet 时，可根据所采用的序列化方式选择不同的输入/输出格式。下面以 Avro 为例，使用 AvroParquetInputFormat 和 AvroParquetOutputFormat：

/**
 * Configures and runs a job that reads {@code Stock} Avro records from Parquet files,
 * averages the {@code open} value per {@code symbol}, and writes {@code StockAvg} records
 * back out as Parquet.
 *
 * @param strings command-line arguments: {@code strings[0]} is the input path and
 *     {@code strings[1]} is the output path
 * @return {@code 0} if the job succeeds, {@code 1} if it fails, {@code -1} if the
 *     required paths were not supplied
 * @throws Exception if job setup or submission fails
 */
@Override
public int run(String[] strings) throws Exception {
    if (strings == null || strings.length < 2) {
        System.err.println("Usage: AvroParquetMapReduce <input path> <output path>");
        return -1;
    }
    Path inputPath = new Path(strings[0]);
    Path outputPath = new Path(strings[1]);

    Job job = Job.getInstance(getConf(), "AvroParquetMapReduce");
    job.setJarByClass(getClass());

    // Input: Parquet files, deserialized as Avro records.
    job.setInputFormatClass(AvroParquetInputFormat.class);
    AvroParquetInputFormat.setInputPaths(job, inputPath);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // The map emits (Text, DoubleWritable) while the reducer emits (Void, StockAvg).
    // Without these calls Hadoop assumes the map output classes equal the job output
    // classes and the map task fails with a "Type mismatch in key from map" error.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    // Output: StockAvg records written as Parquet.
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    FileOutputFormat.setOutputPath(job, outputPath);
    AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);

    return job.waitForCompletion(true) ? 0 : 1;
}

    staticclassMapextendsMapper<Void,Stock,Text,DoubleWritable>{@Overrideprotected void map(Void key, Stock value, Context context)throws IOException, InterruptedException {
            context.write(new Text(value.getSymbol().toString()),new DoubleWritable(value.getOpen()));
        }
    }

    staticclassReduceextendsReducer<Text,DoubleWritable,Void,StockAvg> {@Overrideprotected void reduce(Text key, Iterable<DoubleWritable> values, Context context)throws IOException, InterruptedException {
            Mean mean =new Mean();for (DoubleWritableval :values){
                mean.increment(val.get());
            }

            StockAvg avg =new StockAvg();
            avg.setSymbol(key.toString());
            avg.setAvg(mean.getResult());
            context.write(null,avg);
        }
    }

这里的输入和输出都是 Parquet 文件。如果想让输入是文本（Text）文件，只需不设置 InputFormatClass（此时默认使用 TextInputFormat）；但相应地，Mapper 的输入类型也要改为 LongWritable/Text，并自行解析每一行。

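如果修改了输入 schema，导致 Avro 无法加载对应的具体（specific）类，Avro 会强制使用 GenericData 代替。下面的例子在投影 schema 中保留了 Stock 原有的记录名和命名空间，并演示了谓词下推（predicate pushdown）与投影下推（projection pushdown）：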

publicclassAvroProjectionParquetMapReduceextendsConfiguredimplementsTool {publicstaticvoid main(String[] args) throws Exception {
        args =new String[2];

        args[0] ="hdfs://hadoop:9000/user/madong/parquet-input";
        args[1] ="hdfs://hadoop:9000/user/madong/parquet-output";int code = ToolRunner.run(new AvroProjectionParquetMapReduce(),args);
        System.exit(code);
    }

    @Overridepublicint run(String[] strings) throws Exception {
        Path inputPath =new Path(strings[0]);
        Path outputPath =new Path(strings[1]);


        Job job = Job.getInstance(getConf(),"AvroProjectionParquetMapReduce");
        job.setJarByClass(AvroProjectionParquetMapReduce.class);

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setInputPaths(job, inputPath);// predicate pushdown
        AvroParquetInputFormat.setUnboundRecordFilter(job, GoogleStockFilter.class);// projection pushdown
        Schema projection = Schema.createRecord(Stock.SCHEMA$.getName(),
                Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(),false);
        List<Schema.Field> fields = Lists.newArrayList();for (Schema.Field field : Stock.SCHEMA$.getFields()) {if ("symbol".equals(field.name()) ||"open".equals(field.name())) {
                fields.add(new Schema.Field(field.name(), field.schema(), field.doc(),
                        field.defaultValue(), field.order()));
            }
        }
        projection.setFields(fields);
        AvroParquetInputFormat.setRequestedProjection(job, projection);


        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outputPath);
        AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);return job.waitForCompletion(true) ?0 :1;
    }publicstaticclassGoogleStockFilterimplementsUnboundRecordFilter {privatefinal UnboundRecordFilter filter;public GoogleStockFilter() {
            filter = ColumnRecordFilter.column("symbol", ColumnPredicates.equalTo("GOOG"));
        }

        @Overridepublic RecordFilter bind(Iterable<ColumnReader> readers) {return filter.bind(readers);
        }
    }staticclassMapextendsMapper<Void,Stock,Text,DoubleWritable> {

        @Overrideprotectedvoid map(Void key, Stock value, Context context) throws IOException, InterruptedException {if (value !=null) {
                context.write(new Text(value.getSymbol().toString()),new DoubleWritable(value.getOpen()));
            }
        }
    }staticclassReduceextendsReducer<Text,DoubleWritable,Void,StockAvg> {

        @Overrideprotectedvoid reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            Mean mean =new Mean();for (DoubleWritable val :values){
                mean.increment(val.get());
            }

            StockAvgavg =new StockAvg();avg.setSymbol(key.toString());avg.setAvg(mean.getResult());
            context.write(null,avg);
        }
    }
}
  • 作者:乄浅醉
  • 原文链接:https://blog.csdn.net/GG584741/article/details/51555847
    更新时间:2022年9月20日09:14:36 ,共 3975 字。