1、hadoop文本词频排序实验报告 大数据技术概论实验报告 文 本 词 频 排 序 姓名: 郭利强 专业: 工程管理专业 学号: 2015E8009064028 1. 实验要求 3 2. 环境说明 3 2.1 系统硬件 3 2.2 系统软件 3 2.3 安装及配置 3 3. 实验设计 10 3.1 设计思路 10 3.2 算法设计 10 3.3 程序
2、和类的设计 11 4. 程序代码 16 4.1 WordCount.java代码 16 4.2 Pair.java代码 19 5. 实验输入和结果 20 5.1 实验输入 20 5.2 实验输出 21 5.3 实验结果分析 23 1. 实验要求 在Eclipse环境下编写WordCount程序,统计所有出现次数k次以上的单词计数,最后的结果按照词频从高到低排序输出。 2. 环境说明 2.1 系统硬件 处理器:Intel Core i3-2350M CPU@2.3GHz×4 内存:2GB 磁盘:60GB 2.2 系统软件 操作系统:Ub
3、untu 14.04 LTS
操作系统类型:32位
Java版本:1.7.0_85
Eclipse版本:3.8
Hadoop插件:hadoop-eclipse-plugin-2.6.0.jar
Hadoop:2.6.1
2.3 安装及配置
1.Hadoop配置
1)core-site.xml
4、mporary directories.
5、
6、0.0.1:50090
7、ty>
3)maprd-site.xml
8、rver IPC host:port
9、lue>127.0.0.1:50030
10、ption>The hostname of the RM.
11、er.aux-services.mapreduce_shuffle.class
12、2
13、lue>inspiron:8031
14、y>
5)slaves
inspiron
2.eclipse配置
1)安装hadoop开发插件。由于本次实验所使用的hadoop版本较新,编译过程中出现问题太多,所以直接使用了官方发布的2.6.0版本的插件,经过测试可以正常使用。将插件复制至eclipse安装目录下的plugins目录下。
2)进入eclipse->window->preferences配置hadoop安装路径
3.新建Hadoop Location
4.配置完成后在Project Explore及Map/Reduce Location窗口可看到如下界面
15、
3. 实验设计
3.1 设计思路
利用MapReduce框架设计,在Map过程将输入文本拆分成单个的单词,并对单词进行初步统计,将单词及词频组合作为Map过程输出的value值,将Map过程的Key值设为统一固定值。在Reduce过程获取Map过程的输出,拆分value值,获取并汇总统计出所有单词的词频,根据设定值k对统计单词进行筛选,将词频高于设定值k的单词和词频以键值对的形式存入某个容器中,然后将容器的对象按照词频从高到低的顺序排序后以单词和词频键值对的形式输出。如此设计,只需要一个MapReduce过程即可完成词频统计并筛选排序输出。
16、3.2 算法设计
1.在Map过程中,重写map类,利用StringTokenizer类,将map方法中的value值中存储的文本,拆分成一个个的单词,将单词进行初步统计,统计得到的结果存入一个Map集合中。遍历Map集合,将所得单词和词频组成一个字符串,作为Map过程输出的value值,并以 17、历存有单词和词频键值对的Map集合,将其中词频大于设定值k的单词和词频存入一个List集合中。
5.利用Collect.sort()重载方法对List集合进行按照词频由高到低顺序的排序。
6.遍历List集合,将经过排序的List集合中存储的单词和词频写入reduce方法的context变量,以单词和词频键值对的形式输出。
3.3 程序和类的设计
1.定义TokenizerMapper类继承org.apache.hadoop.mapreduce包中Mapper类,并重写map方法。然后利用StringTokenizer类,将map方法中的value值中存储的文本,拆分成一 18、个个的单词,进行初步统计后放入Map集合中,遍历Map集合取出单词及对应词频,将单词和词频组合后,以 19、blic void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
Map 20、eNonLetters( itr.nextToken());
if (!word2count.containsKey(nextToken))
word2count.put(nextToken, 0);
word2count.put(nextToken, word2count.get(nextToken) + 1);
}
for (Entry 21、alue());
context.write(mapKey, mapValue);
}
}
//去除拆分后字符串中所含非字母字符
public static String removeNonLetters(String original){
StringBuffer aBuffer=new StringBuffer(original.length());
char aCharacter;
for(int i=0;i 22、 if(Character.isLetter(aCharacter)){
aBuffer.append(aCharacter);
}
}
return new String(aBuffer);
}
}
2.定义IntSumReducer类继承org.apache.hadoop.mapreduce包中Reducer类,对Map过程中发送过来的键值对,拆分value值取出单词及对应词频,进行词频统计,筛选出词频高于设定值的单词,并按照词频从高到低的顺序排序后输出。
public static class IntSumReducer ex 23、tends
Reducer 24、ruptedException {
super.setup(context);
this.k = Integer.parseInt(context.getConfiguration().get("k"));
}
public void reduce(Text key, Iterable 25、eger>();
for (Text val : values) {
String valStr = val.toString();
String[] records = valStr.split("\001");
String word = records[0];
int cnt = Integer.parseInt(records[1]);
if (!word2count.containsKey(word))
word2count.put(word, 0);
word2count.put(word, wor 26、d2count.get(word) + cnt);
}
List 27、ist, new Comparator 28、}
1)重写setup方法,获取设定词频,并将其值赋给已声明的变量k。
protected void setup(
Reducer 29、>中key为统一设定值,舍去不用,而values是[word+split+count]形式的集合,重写reduce方法,遍历values按照设定分隔符拆分后,汇总进行统计,得到某个单词的词频。声明一个Map变量word2count,将统计得到的单词及其词频,以键值对的形式存入word2count中。
Map 30、 records = valStr.split("\001");
String word = records[0];
int cnt = Integer.parseInt(records[1]);
if (!word2count.containsKey(word))
word2count.put(word, 0);
word2count.put(word, word2count.get(word) + cnt);
}
3)声明一个List集合变量list,遍历word2count,根据词频进行筛选,用词频大于k的单词和词频的值 31、初始化新定义的类Pair的对象,然后将对象存入list中。使用Collect.sort()重载方法对list进行排序。
List 32、
Collections.sort(list, new Comparator 33、set(p.getV());
context.write(outputKey, outputValue);
}
3.主方法main,定义Job对象负责管理和运行一个计算任务,并通过Job的一些方法对任务的参数进行相关的设置。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String master="127.0.0.1";
conf.set("fs.defaultFS", "hdfs://12 34、7.0.0.1:9000");
conf.set("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name","yarn");
conf.set("yarn.resourcemanager.address", master+":8032");
conf.set("yarn.resourcemanager.scheduler.address", master+":8030");
conf.set("mapred.jar","wordcount.jar");
S 35、tring[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("Usage: wordcount 36、etReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 获取设定值K
for (int i = 0; i < otherArgs.length - 2; ++i) {
(job, new Path(otherArgs[i]));
}
job.getConfiguration().set("k", otherArgs[otherArgs.length - 1]);
(job, ne 37、w Path(
otherArgs[otherArgs.length - 2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
1)定义Configuration对象conf,设置配置信息后,使用conf对象初始化Job对象。
Configuration conf = new Configuration();
String master="127.0.0.1";
conf.set("fs.defaultFS", "hdfs://127.0.0.1:9000");
conf.s 38、et("hadoop.job.user", "hadoop");
conf.set("mapreduce.framework.name","yarn");
conf.set("yarn.resourcemanager.address", master+":8032");
conf.set("yarn.resourcemanager.scheduler.address", master+":8030");
conf.set("mapreduce.jobhistory.address", master+":10020");
conf.se 39、t("mapred.jar","wordcount.jar");
String[] otherArgs = new GenericOptionsParser(conf, args)
.getRemainingArgs();
if (otherArgs.length < 3) {
System.err.println("Usage: wordcount 40、lass(TokenizerMapper.class)方法设置TokenizerMapper.class作为map过程处理类,调用setReducerClass(IntSumReducer.class)方法设置IntSumReducer.class作为reduce过程处理类。
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
3)调用setOutputKeyClass(Text.class)方法和setOutputValueClass(Text.class)方法设 41、置Job输出结果数据类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
4)调用job.getConfiguration().set()方法获取词频设定参数k
job.getConfiguration().set("k", otherArgs[otherArgs.length - 1]);
5)调用()方法和()分别设置获取并设置输入输出路径。
(job, new Path(otherArgs[i]));
(job, new Path( otherArgs[otherAr 42、gs.length - 2]));
6)调用job.waitForCompletion()方法执行任务
System.exit(job.waitForCompletion(true) ? 0 : 1);
4. 程序代码
4.1 WordCount.java代码
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this 43、 work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this in compliance
* with the License. You may obtain a copy of the License at
*
*
*
* Unless require 44、d by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the L 45、icense.
*/
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringTokenizer;
import org.apache.hadoop.co 46、nf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.;
import org.apache.hadoop.m 47、apreduce.lib.output.;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.Path;
public class WordCount {
public static class TokenizerMapper extends
Mapper 48、y = new Text("key");
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
Map 49、
String nextToken =removeNonLetters( itr.nextToken());
if (!word2count.containsKey(nextToken))
word2count.put(nextToken, 0);
word2count.put(nextToken, word2count.get(nextToken) + 1);
}
for (Entry 50、tKey() + "\001" + entry.getValue());
context.write(mapKey, mapValue);
}
}
public static String removeNonLetters(String original){
StringBuffer aBuffer=new StringBuffer(original.length());
char aCharacter;
for(int i=0;i






