MapReduce
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/*
参数: 数据类型:
KEYIN: 表示当前行数在文件中的偏移量 LongWritable
VALUEIN:表示一行数据 Text
KEYOUT: 表示一个单词 Text
VALUEOUT:针对当前单词的标记 IntWritable
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
private Text outk = new Text();
private IntWritable outv = new IntWritable(1);
// 重写map方法写业务逻辑
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 获取当前行数据
String lineData = value.toString(); // value 没有split方法,字符串有
// 按业务要求切割字符串
String[] datas = lineData.split(" ");
// 输出 key value context.write() 数据类型准话-->封装对象
for (String data : datas) {
outk.set(data);
context.write(outk,outv);
}
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReduce extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable outv = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
// 遍历相同key的一组values 进行累加
for (IntWritable value : values) {
// 注意数据类型的匹配 用 .get() 方法可以获取当前对象的int值
sum += value.get();
}
outv.set(sum);
context.write(key,outv);
}
}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
// 提交job set基本环境
public class WordCountDriver {
// 程序的入口
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = new Configuration();
// 声明job对象
Job job = Job.getInstance(conf);
// 针对job做内容的配置
// 1.指定当前job的mapper和reducer
job.setMapperClass(WordCountMapper.class); // 通过反射获取实例对象
job.setReducerClass(WordCountReduce.class);
// 2.指定当前job的map阶段输出的key 和 value 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 3.指定当前job的最终输出的key 和 value的类型
job.setOutputValueClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 4.指定job的输入输出路径
FileInputFormat.addInputPath(job,new Path(""));
FileOutputFormat.setOutputPath(job,new Path(""));
// 提交job
job.waitForCompletion(true); // 全程监控
}
}
切片(128m)--> MapTask (1.整行读入2.按split切分3.KV形式存入Map) --> 分区(根据业务需求,分区需要将结果溢写到磁盘)
--> Reduce(根据分区去拉取Map中的数据,一个分区对应一个Reduce) --> 各分区聚合数据 -->输出到文件(落盘)
MapTask ReduceTask 分区 排序 衔接 Task即为JAVA进程
一行调用一次:Map(序列号,一行内容)每个<k,v>调用一次
ReduceTask每一组相同的Key调用一次
12.MapReduce代码
Mapper Reduce Driver
施工中。。。