资源描述
hadoop文本词频排序实验报告
大数据技术概论实验报告
文
本
词
频
排
序
姓名: 郭利强
专业: 工程管理专业
学号: 2015E8009064028
1. 实验要求 3
2. 环境说明 3
2.1 系统硬件 3
2.2 系统软件 3
2.3 安装及配置 3
3. 实验设计 10
3.1 设计思路 10
3.2 算法设计 10
3.3 程序和类的设计 11
4. 程序代码 16
4.1 WordCount.java代码 16
4.2 Pair.java代码 19
5. 实验输入和结果 20
5.1 实验输入 20
5.2 实验输出 21
5.3 实验结果分析 23
1. 实验要求
在Eclipse环境下编写WordCount程序,统计所有出现次数k次以上的单词计数,最后的结果按照词频从高到低排序输出。
2. 环境说明
2.1 系统硬件
处理器:Intel Core i3-2350M CPU@2.3GHz×4
内存:2GB
磁盘:60GB
2.2 系统软件
操作系统:Ubuntu 14.04 LTS
操作系统类型:32位
Java版本:1.7.0_85
Eclipse版本:3.8
Hadoop插件:hadoop-eclipse-plugin-2.6.0.jar
Hadoop:2.6.1
2.3 安装及配置
1.Hadoop配置
1)core-site.xml
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value></value>
<description>Abase for other temporary directories.</description>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://inspiron:9000</value>
</property>
</configuration>
2)hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value></value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value></value>
</property>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>127.0.0.1:50090</value>
<description>
The secondary namenode http server address and port.
</description>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
<description>
Enable WebHDFS (REST API) in Namenodes and Datanodes.
</description>
</property>
</configuration>
3)maprd-site.xml
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>127.0.0.1:10020</value>
<description>MapReduce JobHistory Server IPC host:port</description>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>127.0.0.1:19888</value>
<description>MapReduce JobHistory Server Web UI host:port</description>
</property>
<property>
<name>mapreduce.jobtracker.http.address</name>
<value>127.0.0.1:50030</value>
<description>
The job tracker http server address and port the server will listen on.
If the port is 0 then the server will start on a free port.
</description>
</property>
</configuration>
4)yarn-site.xml
<configuration>
<property>
<description>The hostname of the RM.</description>
<name>yarn.resourcemanager.hostname</name>
<value>inspiron</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce_shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
<property>
<description>The address of the applications manager interface in the RM.</description>
<name>yarn.resourcemanager.address</name>
<value>inspiron:8032</value>
</property>
<property>
<description>The address of the scheduler interface.</description>
<name>yarn.resourcemanager.scheduler.address</name>
<value>inspiron:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>inspiron:8031</value>
</property>
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>
</configuration>
5)slaves
inspiron
2.eclipse配置
1)安装hadoop开发插件。由于本次实验所使用的hadoop版本较新,编译过程中出现问题太多,所以直接使用了官方发布的2.6.0版本的插件,经过测试可以正常使用。将插件复制至eclipse安装目录下的plugins目录下。
2)进入eclipse->window->preferences配置hadoop安装路径
3.新建Hadoop Location
4.配置完成后在Project Explore及Map/Reduce Location窗口可看到如下界面
3. 实验设计
3.1 设计思路
利用MapReduce框架设计,在Map过程将输入文本拆分成单个的单词,并对单词进行初步统计,将单词及词频组合作为Map过程输出的value值,将Map过程的Key值设为统一固定值。在Reduce过程获取Map过程的输出,拆分value值,获取并汇总统计出所有单词的词频,根据设定值k对统计单词进行筛选,将词频高于设定值k的单词和词频以键值对的形式存入某个容器中,然后将容器的对象按照词频从高到低的顺序排序后以单词和词频键值对的形式输出。如此设计,只需要一个MapReduce过程即可完成词频统计并筛选排序输出。
3.2 算法设计
1.在Map过程中,重写map类,利用StringTokenizer类,将map方法中的value值中存储的文本,拆分成一个个的单词,将单词进行初步统计,统计得到的结果存入一个Map集合中。遍历Map集合,将所得单词和词频组成一个字符串,作为Map过程输出的value值,并以<key,word+split+count>形式输出。
2.在Reduce过程中,重写setup方法,获取设定词频。
3.对Map过程输出的<key, word+split+count >形式的键值对,遍历values,拆分并统计出对应单词的词频,以键值对的形式装入一个Map集合中。
4.遍历存有单词和词频键值对的Map集合,将其中词频大于设定值k的单词和词频存入一个List集合中。
5.利用Collect.sort()重载方法对List集合进行按照词频由高到低顺序的排序。
6.遍历List集合,将经过排序的List集合中存储的单词和词频写入reduce方法的context变量,以单词和词频键值对的形式输出。
3.3 程序和类的设计
1.定义TokenizerMapper类继承org.apache.hadoop.mapreduce包中Mapper类,并重写map方法。然后利用StringTokenizer类,将map方法中的value值中存储的文本,拆分成一个个的单词,进行初步统计后放入Map集合中,遍历Map集合取出单词及对应词频,将单词和词频组合后,以<key, word+split+count >的形式作为map方法的结果输出,其余的工作都交由MapReduce框架处理。
public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text> {
private final static Text mapValue = new Text();
private Text mapKey = new Text("key");
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
Map<String, Integer> word2count = new HashMap<String, Integer>();
while (itr.hasMoreTokens()) {
String nextToken =removeNonLetters( itr.nextToken());
if (!word2count.containsKey(nextToken))
word2count.put(nextToken, 0);
word2count.put(nextToken, word2count.get(nextToken) + 1);
}
for (Entry<String, Integer> entry : word2count.entrySet()) {
mapValue.set(entry.getKey() + "\001" + entry.getValue());
context.write(mapKey, mapValue);
}
}
//去除拆分后字符串中所含非字母字符
public static String removeNonLetters(String original){
StringBuffer aBuffer=new StringBuffer(original.length());
char aCharacter;
for(int i=0;i<original.length();i++){
aCharacter=original.charAt(i);
if(Character.isLetter(aCharacter)){
aBuffer.append(aCharacter);
}
}
return new String(aBuffer);
}
}
2.定义IntSumReducer类继承org.apache.hadoop.mapreduce包中Reducer类,对Map过程中发送过来的键值对,拆分value值取出单词及对应词频,进行词频统计,筛选出词频高于设定值的单词,并按照词频从高到低的顺序排序后输出。
public static class IntSumReducer extends
Reducer<Text, Text, Text, IntWritable> {
private IntWritable outputValue = new IntWritable();
private Text outputKey = new Text();
private int k = 0;
@Override
protected void setup(
Reducer<Text, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
this.k = Integer.parseInt(context.getConfiguration().get("k"));
}
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
Map<String, Integer> word2count = new HashMap<String, Integer>();
for (Text val : values) {
String valStr = val.toString();
String[] records = valStr.split("\001");
String word = records[0];
int cnt = Integer.parseInt(records[1]);
if (!word2count.containsKey(word))
word2count.put(word, 0);
word2count.put(word, word2count.get(word) + cnt);
}
List<Pair> list = new ArrayList<Pair>();
for (Map.Entry<String, Integer> entry : word2count.entrySet()) {
if (entry.getValue() > this.k) {
Pair p = new Pair(entry.getKey(), entry.getValue());
list.add(p);
}
}
Collections.sort(list, new Comparator<Pair>() {
@Override
public int compare(Pair o1, Pair o2) {
return o2.getV().compareTo(o1.getV());
}
});
for (Pair p : list) {
outputKey.set(p.getK());
outputValue.set(p.getV());
context.write(outputKey, outputValue);
}
}
}
1)重写setup方法,获取设定词频,并将其值赋给已声明的变量k。
protected void setup(
Reducer<Text, IntWritable, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
super.setup(context);
this.k = Integer.parseInt(context.getConfiguration().get("k"));
}
2)Map过程输出<key,values>中key为统一设定值,舍去不用,而values是[word+split+count]形式的集合,重写reduce方法,遍历values按照设定分隔符拆分后,汇总进行统计,得到某个单词的词频。声明一个Map变量word2count,将统计得到的单词及其词频,以键值对的形式存入word2count中。
Map<String, Integer> word2count = new HashMap<String, Integer>();
for (Text val : values) {
String valStr = val.toString();
String[] records = valStr.split("\001");
String word = records[0];
int cnt = Integer.parseInt(records[1]);
if (!word2count.containsKey(word))
word2count.put(word, 0);
word2count.put(word, word2count.get(word) + cnt);
}
3)声明一个List集合变量list,遍历word2count,根据词频进行筛选,用词频大于k的单词和词频的值初始化新定义的类Pair的对象,然后将对象存入list中。使用Collect.sort()重载方法对list进行排序。
List<Pair> list = new ArrayList<Pair>();
for (Map.Entry<String, Integer> entry : word2count.entrySet()) {
if (entry.getValue() > this.k) {
Pair p = new Pair(entry.getKey(), entry.getValue());
list.add(p);
}
}
Collections.sort(list, new Comparator<Pair>() {
@Override
public int compare(Pair o1, Pair o2) {
return o2.getV().compareTo(o1.getV());
}
});
4)遍历list,将经过排序的集合中存储的单词和词频写入reduce方法的context变量
for (Pair p : list) {
outputKey.set(p.getK());
outputValue.set(p.getV());
context.write(outputKey, outputValue);
}
3.主方法main,定义Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String master="127.0.0.1";
conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
conf.set("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name","yarn");
conf.set("yarn.resourcemanager.address", master+":8032");
conf.set("yarn.resourcemanager.scheduler.address", master+":8030");
conf.set("mapred.jar","wordcount.jar");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 获取设定值K
for (int i = 0; i < otherArgs.length - 2; ++i) {
(job, new Path(otherArgs[i]));
}
job.getConfiguration().set("k", otherArgs[otherArgs.length - 1]);
(job, new Path(
otherArgs[otherArgs.length - 2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
1)定义Configuration对象conf,设置配置信息后,使用conf对象初始化Job对象。
Configuration conf = new Configuration();
String master="127.0.0.1";
conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
conf.set("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name","yarn");
conf.set("yarn.resourcemanager.address", master+":8032");
conf.set("yarn.resourcemanager.scheduler.address", master+":8030");
conf.set("mapreduce.jobhistory.address", master+":10020");
conf.set("mapred.jar","wordcount.jar");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = new Job(conf, "word count");
2)调用setMapperClass(TokenizerMapper.class)方法设置TokenizerMapper.class作为map过程处理类,调用setReducerClass(IntSumReducer.class)方法设置IntSumReducer.class作为reduce过程处理类。
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
3)调用setOutputKeyClass(Text.class)方法和setOutputValueClass(Text.class)方法设置Job输出结果数据类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
4)调用job.getConfiguration().set()方法获取词频设定参数k
job.getConfiguration().set("k", otherArgs[otherArgs.length - 1]);
5)调用()方法和()分别设置获取并设置输入输出路径。
(job, new Path(otherArgs[i]));
(job, new Path( otherArgs[otherArgs.length - 2]));
6)调用job.waitForCompletion()方法执行任务
System.exit(job.waitForCompletion(true) ? 0 : 1);
4. 程序代码
4.1 WordCount.java代码
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this in compliance
* with the License. You may obtain a copy of the License at
*
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.;
import org.apache.hadoop.mapreduce.lib.output.;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.Path;
public class WordCount {
public static class TokenizerMapper extends
Mapper<Object, Text, Text, Text> {
private final static Text mapValue = new Text();
private Text mapKey = new Text("key");
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
Map<String, Integer> word2count = new HashMap<String, Integer>();
while (itr.hasMoreTokens()) {
String nextToken =removeNonLetters( itr.nextToken());
if (!word2count.containsKey(nextToken))
word2count.put(nextToken, 0);
word2count.put(nextToken, word2count.get(nextToken) + 1);
}
for (Entry<String, Integer> entry : word2count.entrySet()) {
mapValue.set(entry.getKey() + "\001" + entry.getValue());
context.write(mapKey, mapValue);
}
}
public static String removeNonLetters(String original){
StringBuffer aBuffer=new StringBuffer(original.length());
char aCharacter;
for(int i=0;i<original.length();i++){
aCharacter=original.charA
展开阅读全文