学习目标
理解mapreduce执行原理
掌握mapreduce程序开发技术
熟悉mapreduce作业提交流程
知识点
1、准备数据文件
2、mapreduce程序编写
3、程序测试及运行
知识回顾
MapReducer的主要过程主要分为map阶段与Reduce阶段,首先通过Map读取HDFS中的数据,然后经过拆分,将每个文件中的每行数据分拆成键值对,最后输出作为Reduce的输入,通过Reduce进行数据逻辑上的处理。
编写一个mapreduce程序进行wordcount统计,其中一个map类继承了Mapper类,一个reduce类继承了Reducer类,还有一个主类用来提交程序
对原始数据进行处理,把文档中所有的英文单词进行统计所有单词的个数。
首先对待处理的信息进行拆分,拆分之后在map阶段,拆分后计算出单词个数并作为map方法的输出值,而map的方法输出键作为NullWritable即可,最后在reduce阶段对每个键的值集合进行遍历并把遍历的值进行相加,输出结果即可。
环境资源
硬件:Ubuntu16.04学习目标
掌握去重的原理并使用MapReduce进行编程
知识点
1、启动Hadoop服务并查看处理数据
2、程序编写
知识回顾
目标:原始数据中出现次数超过一次的数据在输出文件中只出现一次。
算法思想:根据reduce的过程特性,会自动根据key来计算输入的value集合,把数据作为key输出给reduce,无论这个数据出现多少次,reduce最终结果中key只能输出一次。
1.实例中每个数据代表输入文件中的一行内容,map阶段采用Hadoop默认的作业输入方式。将value设置为key,并直接输出。 map输出数据的key为数据,将value设置成空值
2.在MapReduce流程中,map的输出<key,value>
经过shuffle过程聚集成<key,value-list>
后会交给reduce
3.reduce阶段不管每个key有多少个value,它直接将输入的key复制为输出的key,并输出(输出中的value被设置成空)。
环境资源
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
操作步骤
启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/
$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
准备数据文件
1、查看源数据文件内容。在终端窗口中,执行如下命令:
$ cat /data/dataset/Deduplicationinfo.txt
可以看到,文件内容如下:
2012-3-1 a
2012-3-2 b
2012-3-3 c
2012-3-4 d
2012-3-5 a
2012-3-6 b
2012-3-7 c
2012-3-3 c
2012-3-1 b
2012-3-2 a
2012-3-3 b
2012-3-4 d
2012-3-5 a
2012-3-7 d
2012-3-3 c
2、将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/Deduplicationinfo.txt /
创建Map/Reduce项目
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
编写MapReduce程序
1、在项目src目录下,右键点击,选择【New】->【Class】创建一个类文件名称为”com.simple.DeduplicationMapper”。
2、让类【DeduplicationMapper】继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下。
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class DeduplicationMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//按行读取信息并作为mapper的输出键,mapper的输出值置为空文本即可
Text line = value;
context.write(line, new Text(""));
}
}
3、在项目src目录下右键点击,新建一个类名为”com.simple.DeduplicationReducer”并继承Reducer类,然后添加该类中的代码内容如下所示。
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class DeduplicationReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> value, Context context)
throws IOException, InterruptedException {
//Reducer阶段直接按键输出即可,键直接可以实现去重
context.write(key, new Text(""));
}
}
4、在项目src目录下右键点击,新建一个测试主类名为”com.simple.TestDeduplication”并指定main主方法,测试代码如下所示:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestDeduplication {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
//获取作业对象
Job job = Job.getInstance(conf);
//设置主类
job.setJarByClass(TestDeduplication.class);
//设置job参数
job.setMapperClass(DeduplicationMapper.class);
job.setReducerClass(DeduplicationReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//设置job输入输出
FileInputFormat.addInputPath(job, new Path("/Deduplicationinfo.txt"));
FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
程序测试及运行
1、在”WordCountDriver”类文件的任意空白处,单击右键,在弹出的环境菜单中,选择”【Run As】->【Java Application】”菜单项,运行程序。操作如下图所示:
程序运行后,控制台打印如下图所示,且无错误日志产生,程序运行完毕。
2、程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装位置:/opt
实验设计创建文件:/data/resource
操作步骤
启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/
$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
准备数据文件
1、编辑数据文件。在终端窗口中,执行如下命令,编辑数据文件”word.txt”:
$ cd /data/dataset/
$ vi word.txt
在”word.txt”文件中,输入如下内容,单词间以空格分隔:
good good study
day day up
保存并退出文件编辑。
2、将数据文件”word.txt”上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/word.txt /
创建Map/Reduce项目
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
4、把lib下所有的jar包导入到环境变量,首先全选【lib】文件夹下的jar包文件,右键点击,选择【build path】->【add to build path】,添加后,发现在项目下多一个列表项【Referenced Libraries】。如下图所示:
编写MapReduce程序,统计”word.txt”文本中的单词出现频率
1、编写Maper类,完成对单词的切分处理,并以(k,v)的形式输出到Reduce阶段
在项目【src】目录上,单击右键,创建名为”com.simple.WordCountMapper”的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer token = new StringTokenizer(line);
while (token.hasMoreTokens()) {
word.set(token.nextToken());
context.write(word, one);
}
}
}
2、编写WordCountReducer类代码,实现对单词个数的统计。
在项目【src】目录上,单击右键,创建名为”com.simple.WordCountReducer”的Java类,并编辑源代码如下:
package com.simple;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
context.write(key, new IntWritable(sum));
}
}
3、创建驱动程序类WordCountDriver,提交和运行作业。
在项目【src】目录上,单击右键,创建名为”com.simple.WordCountDriver”的Java类,并编辑源代码如下:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.WordCount.TokenizerMapper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
final String hdfsurl = "hdfs://localhost:9000";
// 组织一个job,并提交
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
// 如果map输出的中间结果类型,与reduce输出的结果类型相同时,可省略map的输出类型设置
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定要处理的输入数据文件的路径,执行时传入的第一个参数指定
FileInputFormat.addInputPath(job, new Path(hdfsurl+"/word.txt"));
// 指定最后reducer输出结果保存的文件路径,执行时传入的第二个参数指定
FileOutputFormat.setOutputPath(job, new Path(hdfsurl+"/word-output"));
// 参数true:是否在控制台打印执行过程的详细信息
boolean flag = job.waitForCompletion(false);
System.exit(flag?0:1);
}
}
程序测试及运行
1、在”WordCountDriver”类文件的任意空白处,单击右键,在弹出的环境菜单中,选择”【Run As】->【Java Application】”菜单项,运行程序。操作如下图所示:
2、如果一切正常,则可以在HDFS上查看统计的结果文件。在终端窗口中,执行如下命令:
$ hdfs dfs -cat /word-output/part-r-00000
可以看到单词计数的结果如下:
day 2
good 2
study 1
up 1