
需要通過Java程序提交Yarn的MapReduce的計算任務。與一般的通過Jar包提交MapReduce任務不同,通過程序提交MapReduce任務需要有點小變動,詳見以下代碼。
以下為MapReduce主程序,有幾點需要提一下:
1、在程序中,我將文件讀入格式設定為WholeFileInputFormat,即不對文件進行切分。
2、為了控制reduce的處理過程,map的輸出鍵的格式為組合鍵格式。與常規的<key,value>不同,這里變為了<textpair,value>,TextPair的格式為<key1,key2>。
3、為了適應組合鍵,重新設定了分組函數,即GroupComparator。分組規則為,只要TextPair中的key1相同(不要求key2相同),則數據被分配到一個reduce容器中。這樣,當相同key1的數據進入reduce容器后,key2起到了一個數據標識的作用。
package web.Hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import util.Utils;
public class GEMIMain {
public GEMIMain(){
job = null;
}
public Job job;
public static class NamePartitioner extends
Partitioner<textpair, byteswritable=""> {
@Override
public int getPartition(TextPair key, BytesWritable value,
int numPartitions) {
return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
}
}
/**
* 分組設置類,只要兩個TextPair的第一個key相同,他們就屬于同一組。他們的Value就放到一個Value迭代器中,
* 然后進入Reducer的reduce方法中。
*
* @author hduser
*
*/
public static class GroupComparator extends WritableComparator {
public GroupComparator() {
super(TextPair.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
TextPair t1 = (TextPair) a;
TextPair t2 = (TextPair) b;
// 比較相同則返回0,比較不同則返回-1
return t1.getFirst().compareTo(t2.getFirst()); // 只要是第一個字段相同的就分成為同一組
}
}
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 在conf中設置outputath變量,以在reduce函數中可以獲取到該參數的值
conf.set("outputPath", args[args.length - 1].toString());
//設置HDFS中,每次任務生成產品的質量文件所在文件夾。args數組的倒數第二個原數為質量文件所在文件夾
conf.set("qualityFolder", args[args.length - 2].toString());
//如果在Server中運行,則需要獲取web項目的根路徑;如果以java應用方式調試,則讀取/opt/hadoop-2.5.0/etc/hadoop/目錄下的配置文件
//MapReduceProgress mprogress = new MapReduceProgress();
//String rootPath= mprogress.rootPath;
String rootPath="/opt/hadoop-2.5.0/etc/hadoop/";
conf.addResource(new Path(rootPath+"yarn-site.xml"));
conf.addResource(new Path(rootPath+"core-site.xml"));
conf.addResource(new Path(rootPath+"hdfs-site.xml"));
conf.addResource(new Path(rootPath+"mapred-site.xml"));
this.job = new Job(conf);
job.setJobName("Job name:" + args[0]);
job.setJarByClass(GEMIMain.class);
job.setMapperClass(GEMIMapper.class);
job.setMapOutputKeyClass(TextPair.class);
job.setMapOutputValueClass(BytesWritable.class);
// 設置partition
job.setPartitionerClass(NamePartitioner.class);
// 在分區之后按照指定的條件分組
job.setGroupingComparatorClass(GroupComparator.class);
job.setReducerClass(GEMIReducer.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
// job.setOutputKeyClass(NullWritable.class);
// job.setOutputValueClass(Text.class);
job.setNumReduceTasks(8);
// 設置計算輸入數據的路徑
for (int i = 1; i < args.length - 2; i++) {
FileInputFormat.addInputPath(job, new Path(args[i]));
}
// args數組的最后一個元素為輸出路徑
FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));
boolean flag = job.waitForCompletion(true);
return flag;
}
@SuppressWarnings("static-access")
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException {
String[] inputPaths = new String[] { "normalizeJob",
"hdfs://192.168.168.101:9000/user/hduser/red1/",
"hdfs://192.168.168.101:9000/user/hduser/nir1/","quality11111",
"hdfs://192.168.168.101:9000/user/hduser/test" };
GEMIMain test = new GEMIMain();
boolean result = test.runJob(inputPaths);
}
}
以下為TextPair類
public class TextPair implements WritableComparable {
private Text first;
private Text second;
public TextPair() {
set(new Text(), new Text());
}
public TextPair(String first, String second) {
set(new Text(first), new Text(second));
}
public TextPair(Text first, Text second) {
set(first, second);
}
public void set(Text first, Text second) {
this.first = first;
this.second = second;
}
public Text getFirst() {
return first;
}
public Text getSecond() {
return second;
}
@Override
public void write(DataOutput out) throws IOException {
first.write(out);
second.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
first.readFields(in);
second.readFields(in);
}
@Override
public int hashCode() {
return first.hashCode() * 163 + second.hashCode();
}
@Override
public boolean equals(Object o) {
if (o instanceof TextPair) {
TextPair tp = (TextPair) o;
return first.equals(tp.first) && second.equals(tp.second);
}
return false;
}
@Override
public String toString() {
return first + "\t" + second;
}
@Override
/**A.compareTo(B)
* 如果比較相同,則比較結果為0
* 如果A大于B,則比較結果為1
* 如果A小于B,則比較結果為-1
*
*/
public int compareTo(TextPair tp) {
int cmp = first.compareTo(tp.first);
if (cmp != 0) {
return cmp;
}
//此時實現的是升序排列
return second.compareTo(tp.second);
}
}
以下為WholeFileInputFormat,其控制數據在mapreduce過程中不被切分
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class WholeFileInputFormat extends FileInputFormat<text, byteswritable=""> {
@Override
public RecordReader<text, byteswritable=""> createRecordReader(
InputSplit arg0, TaskAttemptContext arg1) throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return new WholeFileRecordReader();
}
@Override
protected boolean isSplitable(JobContext context, Path filename) {
// TODO Auto-generated method stub
return false;
}
}
以下為WholeFileRecordReader類
package web.hadoop;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class WholeFileRecordReader extends RecordReader<text, byteswritable=""> {
private FileSplit fileSplit;
private FSDataInputStream fis;
private Text key = null;
private BytesWritable value = null;
private boolean processed = false;
@Override
public void close() throws IOException {
// TODO Auto-generated method stub
// fis.close();
}
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return this.key;
}
@Override
public BytesWritable getCurrentValue() throws IOException,
InterruptedException {
// TODO Auto-generated method stub
return this.value;
}
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext tacontext)
throws IOException, InterruptedException {
fileSplit = (FileSplit) inputSplit;
Configuration job = tacontext.getConfiguration();
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(job);
fis = fs.open(file);
}
@Override
public boolean nextKeyValue() {
if (key == null) {
key = new Text();
}
if (value == null) {
value = new BytesWritable();
}
if (!processed) {
byte[] content = new byte[(int) fileSplit.getLength()];
Path file = fileSplit.getPath();
System.out.println(file.getName());
key.set(file.getName());
try {
IOUtils.readFully(fis, content, 0, content.length);
// value.set(content, 0, content.length);
value.set(new BytesWritable(content));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
IOUtils.closeStream(fis);
}
processed = true;
return true;
}
return false;
}
@Override
public float getProgress() throws IOException, InterruptedException {
// TODO Auto-generated method stub
return processed ? fileSplit.getLength() : 0;
}
}
數據分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
CDA數據分析師證書考試體系(更新于2025年05月22日)
2025-05-26解碼數據基因:從數字敏感度到邏輯思維 每當看到超市貨架上商品的排列變化,你是否會聯想到背后的銷售數據波動?三年前在零售行 ...
2025-05-23在本文中,我們將探討 AI 為何能夠加速數據分析、如何在每個步驟中實現數據分析自動化以及使用哪些工具。 數據分析中的AI是什么 ...
2025-05-20當數據遇見人生:我的第一個分析項目 記得三年前接手第一個數據分析項目時,我面對Excel里密密麻麻的銷售數據手足無措。那些跳動 ...
2025-05-20在數字化運營的時代,企業每天都在產生海量數據:用戶點擊行為、商品銷售記錄、廣告投放反饋…… 這些數據就像散落的拼圖,而相 ...
2025-05-19在當今數字化營銷時代,小紅書作為國內領先的社交電商平臺,其銷售數據蘊含著巨大的商業價值。通過對小紅書銷售數據的深入分析, ...
2025-05-16Excel作為最常用的數據分析工具,有沒有什么工具可以幫助我們快速地使用excel表格,只要輕松幾步甚至輸入幾項指令就能搞定呢? ...
2025-05-15數據,如同無形的燃料,驅動著現代社會的運轉。從全球互聯網用戶每天產生的2.5億TB數據,到制造業的傳感器、金融交易 ...
2025-05-15大數據是什么_數據分析師培訓 其實,現在的大數據指的并不僅僅是海量數據,更準確而言是對大數據分析的方法。傳統的數 ...
2025-05-14CDA持證人簡介: 萬木,CDA L1持證人,某電商中廠BI工程師 ,5年數據經驗1年BI內訓師,高級數據分析師,擁有豐富的行業經驗。 ...
2025-05-13CDA持證人簡介: 王明月 ,CDA 數據分析師二級持證人,2年數據產品工作經驗,管理學博士在讀。 學習入口:https://edu.cda.cn/g ...
2025-05-12CDA持證人簡介: 楊貞璽 ,CDA一級持證人,鄭州大學情報學碩士研究生,某上市公司數據分析師。 學習入口:https://edu.cda.cn/g ...
2025-05-09CDA持證人簡介 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度、美團、阿里等 ...
2025-05-07相信很多做數據分析的小伙伴,都接到過一些高階的數據分析需求,實現的過程需要用到一些數據獲取,數據清洗轉換,建模方法等,這 ...
2025-05-06以下的文章內容來源于劉靜老師的專欄,如果您想閱讀專欄《10大業務分析模型突破業務瓶頸》,點擊下方鏈接 https://edu.cda.cn/g ...
2025-04-30CDA持證人簡介: 邱立峰 CDA 數據分析師二級持證人,數字化轉型專家,數據治理專家,高級數據分析師,擁有豐富的行業經驗。 ...
2025-04-29CDA持證人簡介: 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度,美團,阿里等 ...
2025-04-28CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-27數據分析在當今信息時代發揮著重要作用。單因素方差分析(One-Way ANOVA)是一種關鍵的統計方法,用于比較三個或更多獨立樣本組 ...
2025-04-25CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-25