When using Parquet in MapReduce, the right choice of input/output format depends on the serialization framework in use. The following takes Avro as an example:
Use AvroParquetInputFormat and AvroParquetOutputFormat:
// Excerpt from a Tool implementation (the enclosing class extends Configured
// and implements Tool); imports match the full example further below.
@Override
public int run(String[] strings) throws Exception {
    Path inputPath = new Path(strings[0]);
    Path outputPath = new Path(strings[1]);

    Job job = Job.getInstance(getConf(), "AvroParquetMapReduce");
    job.setJarByClass(getClass());

    // Read Parquet files, exposing each record as an Avro Stock object.
    job.setInputFormatClass(AvroParquetInputFormat.class);
    AvroParquetInputFormat.setInputPaths(job, inputPath);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    // The map output types differ from the job's final (Void, StockAvg)
    // output types, so they must be declared explicitly for the shuffle.
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(DoubleWritable.class);

    // Write the reducer output back out as Parquet, using the StockAvg schema.
    job.setOutputFormatClass(AvroParquetOutputFormat.class);
    FileOutputFormat.setOutputPath(job, outputPath);
    AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);

    return job.waitForCompletion(true) ? 0 : 1;
}

static class Map extends Mapper<Void, Stock, Text, DoubleWritable> {
    @Override
    protected void map(Void key, Stock value, Context context)
            throws IOException, InterruptedException {
        // Emit (symbol, opening price) pairs.
        context.write(new Text(value.getSymbol().toString()),
                new DoubleWritable(value.getOpen()));
    }
}

static class Reduce extends Reducer<Text, DoubleWritable, Void, StockAvg> {
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        // Average the opening prices for each symbol.
        Mean mean = new Mean();
        for (DoubleWritable val : values) {
            mean.increment(val.get());
        }
        StockAvg avg = new StockAvg();
        avg.setSymbol(key.toString());
        avg.setAvg(mean.getResult());
        context.write(null, avg);
    }
}
Here both the input and the output are Parquet files. If the input is a plain text file instead, simply leave the InputFormatClass unset: Hadoop then falls back to the default TextInputFormat.
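A minimal sketch of that text-input variant, assuming the input is a CSV with one symbol,open pair per line (the TextMap class and the CSV layout are illustrative assumptions, not part of the original example):

// No setInputFormatClass(...) call: TextInputFormat is the default, so the
// mapper receives (LongWritable offset, Text line) pairs.
FileInputFormat.setInputPaths(job, inputPath);

job.setMapperClass(TextMap.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(DoubleWritable.class);

// The output side is still Parquet, exactly as before.
job.setOutputFormatClass(AvroParquetOutputFormat.class);
FileOutputFormat.setOutputPath(job, outputPath);
AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);

static class TextMap extends Mapper<LongWritable, Text, Text, DoubleWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Assumed line layout: symbol,open
        String[] parts = value.toString().split(",");
        context.write(new Text(parts[0]),
                new DoubleWritable(Double.parseDouble(parts[1])));
    }
}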
If the input schema is changed so that Avro can no longer load the corresponding generated class, it falls back to using GenericData records instead.
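In that case the mapper has to be written against Avro's generic API. A sketch of what that looks like, assuming the same "symbol" and "open" field names as the Stock schema above (the GenericMap class name is illustrative, and the cast assumes "open" is declared as a double):

// Fields are looked up by name instead of through generated getters;
// org.apache.avro.generic.GenericRecord is the interface that
// GenericData records implement.
static class GenericMap extends Mapper<Void, GenericRecord, Text, DoubleWritable> {
    @Override
    protected void map(Void key, GenericRecord value, Context context)
            throws IOException, InterruptedException {
        context.write(new Text(value.get("symbol").toString()),
                new DoubleWritable((Double) value.get("open")));
    }
}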
The next example additionally demonstrates predicate pushdown (filtering records inside the Parquet reader) and projection pushdown (reading only the columns that are needed):

import java.io.IOException;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.commons.math3.stat.descriptive.moment.Mean;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.google.common.collect.Lists;

// Parquet classes from the pre-Apache parquet-mr artifacts; newer releases
// ship the same classes under the org.apache.parquet.* namespace.
import parquet.avro.AvroParquetInputFormat;
import parquet.avro.AvroParquetOutputFormat;
import parquet.column.ColumnReader;
import parquet.filter.ColumnPredicates;
import parquet.filter.ColumnRecordFilter;
import parquet.filter.RecordFilter;
import parquet.filter.UnboundRecordFilter;

// Stock and StockAvg are Avro-generated classes (not shown).
public class AvroProjectionParquetMapReduce extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        // Input and output paths are hardcoded for the example.
        args = new String[2];
        args[0] = "hdfs://hadoop:9000/user/madong/parquet-input";
        args[1] = "hdfs://hadoop:9000/user/madong/parquet-output";
        int code = ToolRunner.run(new AvroProjectionParquetMapReduce(), args);
        System.exit(code);
    }

    @Override
    public int run(String[] strings) throws Exception {
        Path inputPath = new Path(strings[0]);
        Path outputPath = new Path(strings[1]);

        Job job = Job.getInstance(getConf(), "AvroProjectionParquetMapReduce");
        job.setJarByClass(AvroProjectionParquetMapReduce.class);

        job.setInputFormatClass(AvroParquetInputFormat.class);
        AvroParquetInputFormat.setInputPaths(job, inputPath);

        // Predicate pushdown: skip records whose symbol is not "GOOG".
        AvroParquetInputFormat.setUnboundRecordFilter(job, GoogleStockFilter.class);

        // Projection pushdown: build a schema containing only the "symbol"
        // and "open" fields so the other columns are never read from disk.
        Schema projection = Schema.createRecord(Stock.SCHEMA$.getName(),
                Stock.SCHEMA$.getDoc(), Stock.SCHEMA$.getNamespace(), false);
        List<Schema.Field> fields = Lists.newArrayList();
        for (Schema.Field field : Stock.SCHEMA$.getFields()) {
            if ("symbol".equals(field.name()) || "open".equals(field.name())) {
                fields.add(new Schema.Field(field.name(), field.schema(),
                        field.doc(), field.defaultValue(), field.order()));
            }
        }
        projection.setFields(fields);
        AvroParquetInputFormat.setRequestedProjection(job, projection);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        job.setOutputFormatClass(AvroParquetOutputFormat.class);
        FileOutputFormat.setOutputPath(job, outputPath);
        AvroParquetOutputFormat.setSchema(job, StockAvg.SCHEMA$);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    // Record filter used for predicate pushdown: keep only GOOG rows.
    public static class GoogleStockFilter implements UnboundRecordFilter {
        private final UnboundRecordFilter filter;

        public GoogleStockFilter() {
            filter = ColumnRecordFilter.column("symbol",
                    ColumnPredicates.equalTo("GOOG"));
        }

        @Override
        public RecordFilter bind(Iterable<ColumnReader> readers) {
            return filter.bind(readers);
        }
    }

    static class Map extends Mapper<Void, Stock, Text, DoubleWritable> {
        @Override
        protected void map(Void key, Stock value, Context context)
                throws IOException, InterruptedException {
            // Filtered-out records arrive as null, so guard against them.
            if (value != null) {
                context.write(new Text(value.getSymbol().toString()),
                        new DoubleWritable(value.getOpen()));
            }
        }
    }

    static class Reduce extends Reducer<Text, DoubleWritable, Void, StockAvg> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // Average the opening prices for each symbol.
            Mean mean = new Mean();
            for (DoubleWritable val : values) {
                mean.increment(val.get());
            }
            StockAvg avg = new StockAvg();
            avg.setSymbol(key.toString());
            avg.setAvg(mean.getResult());
            context.write(null, avg);
        }
    }
}