This article shows how to implement a decision tree algorithm with MapReduce. Many readers may not be familiar with the topic, so the sections below walk through a complete C4.5 implementation on Hadoop: a Mapper, a Reducer, and a driver that launches one MapReduce job per tree node. Hopefully you will get something out of it.
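The code below relies on two helper classes that this article does not show: Split, which records the attribute/value choices leading from the root to a tree node, and ObjectSerializable, which passes a Split to the tasks through the job configuration. Here is a minimal sketch of both, assuming plain Java serialization encoded as Base64 via Apache commons-codec (only the class and method names come from the code below; the bodies are assumptions):
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.binary.Base64;
// Sketch of Split: the path of (attribute index, attribute value) decisions
// from the root to this node, plus the node's class label.
public class Split implements Serializable {
private static final long serialVersionUID = 1L;
public List<Integer> attr_index = new ArrayList<Integer>(); // attributes already split on
public List<String> attr_value = new ArrayList<String>(); // values chosen at those splits
public String classLabel; // majority label at this node, set by the driver
}
// Sketch of ObjectSerializable: Java-serialize an object and Base64-encode it
// so it can ride along in the JobConf as a string.
class ObjectSerializable {
public static String serialize(Object obj) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(obj);
oos.close();
return Base64.encodeBase64String(bos.toByteArray());
}
public static Object unSerialize(String s) throws IOException, ClassNotFoundException {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(Base64.decodeBase64(s)));
Object obj = ois.readObject();
ois.close();
return obj;
}
}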
First, implement the Mapper for the C4.5 decision tree algorithm. The code is as follows:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
public class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text attValue = new Text();
private int i;
private String token;
public static int no_Attr;
public Split split = null;
public int size_split_1 = 0;
public void configure(JobConf conf){
try {
split = (Split) ObjectSerializable.unSerialize(conf.get("currentsplit"));
} catch (ClassNotFoundException e) {
// the Split class could not be found during deserialization
e.printStackTrace();
} catch (IOException e) {
// reading the serialized split from the job conf failed
e.printStackTrace();
}
size_split_1 = Integer.parseInt(conf.get("current_index"));
}
public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString(); // convert the input record to a string
StringTokenizer itr = new StringTokenizer(line);
int index = 0;
String attr_value = null;
no_Attr = itr.countTokens() - 1;
String attr[] = new String[no_Attr];
boolean match = true;
for (i = 0; i < no_Attr; i++) {
attr[i] = itr.nextToken(); // read the value of each attribute
}
String classLabel = itr.nextToken();
int size_split = split.attr_index.size();
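// bump a custom counter (mainly useful for debugging the pass in the job UI)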
Counter counter = reporter.getCounter("reporter-"+Main.current_index, size_split+" "+size_split_1);
counter.increment(1l);
for (int count = 0; count < size_split; count++) {
index = (Integer) split.attr_index.get(count);
attr_value = (String) split.attr_value.get(count);
if (!attr[index].equals(attr_value)) {
match = false;
break;
}
}
if (match) {
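// the record lies in the current node's subset: emit one
// (attrIndex attrValue classLabel) -> 1 pair for every attribute not yet split on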
for (int l = 0; l < no_Attr; l++) {
if (!split.attr_index.contains(l)) {
// record that this attribute value co-occurred with this class label once
token = l + " " + attr[l] + " " + classLabel;
attValue.set(token);
output.collect(attValue, one);
}
}
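// all attributes already used on this path: emit a marker record so the
// class distribution at this node is still counted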
if (size_split == no_Attr) {
token = no_Attr + " " + "null" + " " + classLabel;
attValue.set(token);
output.collect(attValue, one);
}
}
}
}
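To make the mapper's contract concrete, here is a hypothetical walk-through (the sample record is invented; the format, whitespace-separated attribute values followed by the class label, is what the code above expects):
// Hypothetical input record with 4 attributes and a class label:
//   sunny hot high weak no
// At the root the Split is empty, so every record matches, and the mapper
// emits one (attrIndex attrValue classLabel) -> 1 pair per attribute:
//   "0 sunny no" -> 1
//   "1 hot no"   -> 1
//   "2 high no"  -> 1
//   "3 weak no"  -> 1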
Then, implement the corresponding Reducer for the C4.5 algorithm. The code is as follows:
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
public class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
static int cnt = 0;
ArrayList<String> ar = new ArrayList<String>();
private static int currentIndex;
public void configure(JobConf conf) {
currentIndex = Integer.valueOf(conf.get("currentIndex"));
}
public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output,
Reporter reporter) throws IOException {
int sum = 0;
// sum is the number of occurrences of a class label in the subset defined by an attribute value
while (values.hasNext()) {
sum += values.next().get();
}
// write the count for this attribute value to the job output
output.collect(key, new IntWritable(sum));
String data = key + " " + sum;
ar.add(data);
// also rewrite the intermediate file with everything accumulated so far
writeToFile(ar);
ar.add("\n");
}
public static void writeToFile(ArrayList<String> text) {
try {
cnt++;
Path input = new Path("C45/intermediate" + currentIndex + ".txt");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(input, true)));
for (String str : text) {
bw.write(str);
}
bw.newLine();
bw.close();
} catch (Exception e) {
System.out.println("File is not creating in reduce");
}
}
}
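Continuing the hypothetical example, the reducer sums the 1s per key, so after a pass the intermediate file holds one line per (attribute index, value, class label) triple, for example `0 sunny no 3`. The driver below also depends on a GainRatio helper that the article never shows. Presumably it loads these counts and applies the standard C4.5 formulas, GainRatio(A) = Gain(A) / SplitInfo(A); here is a hedged sketch under that assumption (only the method names are taken from the driver, everything else is an assumption):
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
// Hedged sketch of GainRatio: the bodies follow the standard C4.5 definitions
// and assume the "attrIndex attrValue classLabel count" lines written above.
public class GainRatio {
// counts[attrIndex][attrValue][classLabel] = number of matching records
private final Map<Integer, Map<String, Map<String, Integer>>> counts = new HashMap<Integer, Map<String, Map<String, Integer>>>();
// load the intermediate counts file for the current pass
public void getcount() throws Exception {
FileSystem fs = FileSystem.get(new Configuration());
Path p = new Path("C45/intermediate" + Main.current_index + ".txt");
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(p)));
String line;
while ((line = br.readLine()) != null) {
StringTokenizer st = new StringTokenizer(line);
if (st.countTokens() < 4) continue; // skip separator lines
int idx = Integer.parseInt(st.nextToken());
String value = st.nextToken();
String label = st.nextToken();
int c = Integer.parseInt(st.nextToken());
if (!counts.containsKey(idx)) counts.put(idx, new HashMap<String, Map<String, Integer>>());
Map<String, Integer> byLabel = counts.get(idx).get(value);
if (byLabel == null) { byLabel = new HashMap<String, Integer>(); counts.get(idx).put(value, byLabel); }
byLabel.put(label, (byLabel.containsKey(label) ? byLabel.get(label) : 0) + c);
}
br.close();
}
// class-label totals at this node, aggregated over any one attribute
// (every record is counted exactly once per remaining attribute)
private Map<String, Integer> labelTotals() {
Map<String, Integer> totals = new HashMap<String, Integer>();
if (counts.isEmpty()) return totals;
for (Map<String, Integer> byLabel : counts.values().iterator().next().values())
for (Map.Entry<String, Integer> e : byLabel.entrySet())
totals.put(e.getKey(), (totals.containsKey(e.getKey()) ? totals.get(e.getKey()) : 0) + e.getValue());
return totals;
}
private static double entropy(Map<String, Integer> dist) {
int n = 0;
for (int c : dist.values()) n += c;
double h = 0;
for (int c : dist.values()) {
if (c == 0) continue;
double pr = (double) c / n;
h -= pr * (Math.log(pr) / Math.log(2));
}
return h;
}
// entropy of the class distribution at the current node
public double currNodeEntophy() {
return entropy(labelTotals());
}
// most frequent class label at the current node
public String majorityLabel() {
String best = null;
int bestCount = -1;
for (Map.Entry<String, Integer> e : labelTotals().entrySet())
if (e.getValue() > bestCount) { best = e.getKey(); bestCount = e.getValue(); }
return best;
}
// gain ratio of splitting on attribute j, given the node's entropy
public double gainratio(int j, double nodeEntropy) {
Map<String, Map<String, Integer>> byValue = counts.get(j);
if (byValue == null) return 0;
int n = 0;
for (Map<String, Integer> byLabel : byValue.values())
for (int c : byLabel.values()) n += c;
double remainder = 0, splitInfo = 0;
for (Map<String, Integer> byLabel : byValue.values()) {
int nv = 0;
for (int c : byLabel.values()) nv += c;
double pr = (double) nv / n;
remainder += pr * entropy(byLabel);
if (pr > 0) splitInfo -= pr * (Math.log(pr) / Math.log(2));
}
double gain = nodeEntropy - remainder;
return splitInfo == 0 ? 0 : gain / splitInfo;
}
// space-separated list of the distinct values attribute j takes
public String getvalues(int j) {
StringBuilder sb = new StringBuilder();
if (counts.get(j) == null) return "";
for (String v : counts.get(j).keySet()) {
if (sb.length() > 0) sb.append(' ');
sb.append(v);
}
return sb.toString();
}
}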
Finally, write the Main driver. It launches one MapReduce job per pass, and multiple passes are needed to grow the tree. The code is as follows:
package com.hackecho.hadoop;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.PropertyConfigurator;
import org.dmg.pmml.MiningFunctionType;
import org.dmg.pmml.Node;
import org.dmg.pmml.PMML;
import org.dmg.pmml.TreeModel;
// here, each MapReduce pass partitions the dataset into subsets according to attribute values
public class Main extends Configured implements Tool {
// the split currently being expanded
public static Split currentsplit = new Split();
// all splits created so far
public static List<Split> splitted = new ArrayList<Split>();
// current_index is the position in splitted of the node being expanded
public static int current_index = 0;
public static ArrayList<String> ar = new ArrayList<String>();
public static List<Split> leafSplits = new ArrayList<Split>();
public static final String PROJECT_HOME = System.getProperty("user.dir");
public static void main(String[] args) throws Exception {
PropertyConfigurator.configure(PROJECT_HOME + "/conf/log/log4j.properties");
// seed splitted with the empty root split, so its initial size is 1
splitted.add(currentsplit);
Path c45 = new Path("C45");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(c45)) {
fs.delete(c45, true);
}
fs.mkdirs(c45);
int res = 0;
int split_index = 0;
// gain ratio of the attribute under consideration
double gainratio = 0;
// best gain ratio found so far
double best_gainratio = 0;
// entropy of the current node
double entropy = 0;
// class label of the current node
String classLabel = null;
// number of attributes; MapClass.no_Attr is only set inside the map tasks,
// so it is hard-coded to 4 here for this sample dataset
int total_attributes = MapClass.no_Attr;
total_attributes = 4;
// number of splits created so far
int split_size = splitted.size();
// gain-ratio calculator
GainRatio gainObj;
// new node produced by a split
Split newnode;
while (split_size > current_index) {
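// each iteration of this loop expands one tree node: run a MapReduce pass
// over the full dataset, keep only records matching the node's split path,
// and count class labels per remaining attribute value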
currentsplit = splitted.get(current_index);
gainObj = new GainRatio();
res = ToolRunner.run(new Configuration(), new Main(), args);
System.out.println("Current NODE INDEX . ::" + current_index);
int j = 0;
int temp_size;
gainObj.getcount();
// compute the entropy of the current node
entropy = gainObj.currNodeEntophy();
// get the majority class label at the current node
classLabel = gainObj.majorityLabel();
currentsplit.classLabel = classLabel;
if (entropy != 0.0 && currentsplit.attr_index.size() != total_attributes) {
System.out.println("");
System.out.println("Entropy NOTT zero SPLIT INDEX:: " + entropy);
best_gainratio = 0;
// compute the information gain ratio of each remaining attribute
for (j = 0; j < total_attributes; j++) {
if (!currentsplit.attr_index.contains(j)) {
// compute attribute j's gain ratio by its index
gainratio = gainObj.gainratio(j, entropy);
// keep the attribute with the best gain ratio
if (gainratio >= best_gainratio) {
split_index = j;
best_gainratio = gainratio;
}
}
}
// split_index is the index of the attribute this node splits on;
// attr_values_split is a space-separated string of that attribute's values
String attr_values_split = gainObj.getvalues(split_index);
StringTokenizer attrs = new StringTokenizer(attr_values_split);
int number_splits = attrs.countTokens(); // number of child splits possible with the selected attribute
String red = "";
System.out.println(" INDEX :: " + split_index);
System.out.println(" SPLITTING VALUES " + attr_values_split);
// use the chosen attribute's values to partition this node's data into child subsets
for (int splitnumber = 1; splitnumber <= number_splits; splitnumber++) {
temp_size = currentsplit.attr_index.size();
newnode = new Split();
for (int y = 0; y < temp_size; y++) {
newnode.attr_index.add(currentsplit.attr_index.get(y));
newnode.attr_value.add(currentsplit.attr_value.get(y));
}
red = attrs.nextToken();
newnode.attr_index.add(split_index);
newnode.attr_value.add(red);
// each value of the chosen attribute yields a new child split appended to the work list
splitted.add(newnode);
}
} else if (entropy == 0.0 && currentsplit.attr_index.size() != total_attributes) {
// a pure leaf: collect it so it can later be persisted into the model file
/**
String rule = "";
temp_size = currentsplit.attr_index.size();
for (int val = 0; val < temp_size; val++) {
rule = rule + " " + currentsplit.attr_index.get(val) + " " + currentsplit.attr_value.get(val);
}
rule = rule + " " + currentsplit.classLabel;
ar.add(rule);
writeRuleToFile(ar);
ar.add("\n");
if (entropy != 0.0) {
System.out.println("Enter rule in file:: " + rule);
} else {
System.out.println("Enter rule in file Entropy zero :: " + rule);
}
System.out.println("persistence model@!!!!");
*/
leafSplits.add(currentsplit);
}
else {
// attributes exhausted: build the PMML tree from the collected leaf splits and persist it
TreeModel tree = PmmlDecisionTree.buildTreeModel(leafSplits);
PMML pmml = new PMML();
pmml.addModels(tree);
PmmlModelFactory.pmmlPersistence("C45/DecisionTree.pmml", pmml);
}
split_size = splitted.size();
System.out.println("TOTAL NODES:: " + split_size);
current_index++;
}
System.out.println("Done!");
System.exit(res);
}
public static void writeRuleToFile(ArrayList<String> text) throws IOException {
Path rule = new Path("C45/rule.txt");
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
try {
BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(rule, true)));
for (String str : text) {
bw.write(str);
}
bw.newLine();
bw.close();
} catch (Exception e) {
e.printStackTrace();
}
}
public int run(String[] args) throws Exception {
System.out.println("In main ---- run");
JobConf conf = new JobConf(getConf(), Main.class);
conf.setJobName("C45");
conf.set("currentsplit",ObjectSerializable.serialize(currentsplit));
conf.set("current_index",String.valueOf(currentsplit.attr_index.size()));
conf.set("currentIndex", String.valueOf(current_index));
// the keys are "attrIndex attrValue classLabel" strings
conf.setOutputKeyClass(Text.class);
// the values are counts (ints)
conf.setOutputValueClass(IntWritable.class);
conf.setMapperClass(MapClass.class);
conf.setReducerClass(Reduce.class);
System.out.println("back to run");
FileSystem fs = FileSystem.get(conf);
Path out = new Path(args[1] + current_index);
if (fs.exists(out)) {
fs.delete(out, true);
}
FileInputFormat.setInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, out);
JobClient.runJob(conf);
return 0;
}
}
Having read the above, you should now have a clearer picture of how to implement a decision tree algorithm with MapReduce: the mappers and reducer count class labels per attribute value, and the driver grows the tree one pass at a time until every branch is pure or runs out of attributes.
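For reference, a run of this driver should leave behind roughly the following files (a hypothetical layout inferred from the paths used above, assuming args[1] = "out"):
// C45/intermediate0.txt, C45/intermediate1.txt, ...  per-pass counts read by GainRatio
// out0/, out1/, ...                                  raw MapReduce output of each pass
// C45/rule.txt                                       leaf rules (only if the commented-out block is re-enabled)
// C45/DecisionTree.pmml                              the final PMML model built from the leaf splits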