Received Date: December 21, 2016; Accepted Date: February 04, 2017; Published Date: February 11, 2017
Citation: Yang T, Ngu AHH (2017) Implementation of Decision Tree Using Hadoop MapReduce. Int J Biomed Data Min 6: 125. doi: 10.4172/2090-4924.1000125
Copyright: © 2017 Yang T, et al. This is an open-access article distributed under the terms of the Creative Commons Attribution License, which permits unrestricted use, distribution, and reproduction in any medium, provided the original author and source are credited.
Hadoop is one of the most popular general-purpose computing platforms for the distributed processing of big data. HDFS is Hadoop's implementation of a distributed file system, designed to store huge amounts of data reliably while serving Hadoop's data processing components at the same time. MapReduce is the main processing engine of Hadoop. In this study, we have used HDFS and MapReduce to implement a well-known learning algorithm, the decision tree, in a fashion that scales to large input problem sizes. Computational performance as a function of node count and problem size is evaluated.
Computing; Processing; Data; Algorithm; Network
MapReduce, as the Hadoop project's principal processing engine, provides a framework for distributed computing [1,2]. The MapReduce model is derived from the combination of the map and reduce concepts in functional programming. A strong characteristic of this model is that it hides the complexity of dealing with a cluster of distributed computing nodes. Hence, developers only need to focus on the implementation of the map and reduce functions.
Decision tree is one of the most popular methods for classification and decision making. A classical decision tree is a directed tree composed of a root node and decision nodes, i.e., all the other nodes, each with exactly one incoming edge.
The procedure of building a decision tree is as follows. Given a set of training data, find the best splitting attribute among all currently available ones by applying a measure function to every attribute. Once the splitting attribute is determined, the instance set is split into multiple partitions. The number of partitions depends on the number of values or ranges of values associated with the splitting attribute. Within each partition, if all samples belong to a single class, the algorithm terminates. Otherwise, the splitting process is performed recursively until every partition belongs to a single class or no attribute is left. Once a decision tree is generated, classification rules are derived from it, which can be applied to classify new instances with unknown class labels.
C4.5 is one of the standard algorithms for decision trees, and it uses the information gain ratio as the splitting criterion. The algorithm is illustrated in Figure 1.
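The recursive procedure described above can be sketched in a few lines of Python. This is a minimal single-machine illustration, not the implementation evaluated in this study: the function names, the dictionary-based row format, and the `majority_purity` measure are all assumptions made for the example. The measure is a simple stand-in and can be replaced by any splitting criterion.

```python
from collections import Counter

def majority_purity(rows, attr, label_key):
    """Stand-in measure (an assumption, not C4.5): fraction of rows whose
    label equals the majority label of their partition after splitting."""
    parts = {}
    for r in rows:
        parts.setdefault(r[attr], []).append(r[label_key])
    hits = sum(Counter(labels).most_common(1)[0][1] for labels in parts.values())
    return hits / len(rows)

def build_tree(rows, attributes, label_key="label", measure=majority_purity):
    labels = [r[label_key] for r in rows]
    # Terminate when the partition is pure or no attribute is left;
    # the leaf stores the majority class label.
    if len(set(labels)) == 1 or not attributes:
        return Counter(labels).most_common(1)[0][0]
    # Pick the attribute with the best measure value (first one wins ties).
    best = max(attributes, key=lambda a: measure(rows, a, label_key))
    remaining = [a for a in attributes if a != best]
    # One partition per distinct value of the splitting attribute.
    partitions = {}
    for r in rows:
        partitions.setdefault(r[best], []).append(r)
    return {best: {v: build_tree(p, remaining, label_key, measure)
                   for v, p in partitions.items()}}

def classify(tree, instance):
    # Walk from the root to a leaf by following the instance's attribute values.
    while isinstance(tree, dict):
        attr = next(iter(tree))
        tree = tree[attr][instance[attr]]
    return tree

# Hypothetical toy data, for illustration only.
rows = [
    {"outlook": "sunny", "windy": "no", "label": "play"},
    {"outlook": "sunny", "windy": "yes", "label": "play"},
    {"outlook": "rainy", "windy": "no", "label": "play"},
    {"outlook": "rainy", "windy": "yes", "label": "stay"},
]
tree = build_tree(rows, ["outlook", "windy"])
```

In this sketch `classify` raises a `KeyError` for an attribute value that never occurred in the training data; a real implementation would need a fallback such as the majority class of the parent node.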
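In the algorithm above, the entropy of an instance set S is

$$\mathrm{Entropy}(S) = -\sum_{j=1}^{C} p(S,j)\,\log p(S,j),$$

where $p(S,j)$ is the ratio of instances in S which have the jth class label, and C denotes the total number of classes.

$$\mathrm{Info}(S,T) = \sum_{v \in \mathrm{Values}(T_S)} \frac{|T_{S,v}|}{|T_S|}\,\mathrm{Entropy}(T_{S,v})$$

is the information needed after splitting by attribute S, in which $T_S$ is the subset of T induced by attribute S, $T_{S,v}$ is the subset of $T_S$ with value v for attribute S, and $\mathrm{Values}(T_S)$ is the set of values of attribute S for records in $T_S$. The absolute value operator denotes set cardinality.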
Information gain is defined as:
Gain(S,T) = Entropy(S) - Info(S,T) ,
which measures the information gain after splitting by attribute S.
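Finally, the information gain ratio is:

$$\mathrm{GainRatio}(S,T) = \frac{\mathrm{Gain}(S,T)}{\mathrm{SplitInfo}(S,T)}, \qquad \mathrm{SplitInfo}(S,T) = -\sum_{v \in \mathrm{Values}(T_S)} \frac{|T_{S,v}|}{|T_S|}\,\log\frac{|T_{S,v}|}{|T_S|},$$

which is the criterion to split the decision tree. The attribute which gives the maximum GainRatio is selected as the splitting attribute.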
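To make the splitting criterion concrete, the following Python sketch computes the gain ratio of one attribute on a small in-memory data set. It follows the standard C4.5 definitions of entropy, information after the split, gain, and split information. The function names, the row format, and the base-2 logarithm are assumptions of this example; the ratio itself does not depend on the logarithm base.

```python
import math
from collections import Counter

def entropy(labels):
    """Entropy of a list of class labels (base-2 logarithm assumed)."""
    total = len(labels)
    return -sum((c / total) * math.log2(c / total)
                for c in Counter(labels).values())

def gain_ratio(rows, attr, label_key="label"):
    total = len(rows)
    # Group class labels by the value of the candidate attribute.
    partitions = {}
    for r in rows:
        partitions.setdefault(r[attr], []).append(r[label_key])
    # Weighted entropy of the partitions: information needed after the split.
    info = sum(len(p) / total * entropy(p) for p in partitions.values())
    gain = entropy([r[label_key] for r in rows]) - info
    # Entropy of the partition sizes themselves.
    split_info = -sum(len(p) / total * math.log2(len(p) / total)
                      for p in partitions.values())
    # An attribute with a single value cannot split; score it as 0.
    return gain / split_info if split_info > 0 else 0.0

# Hypothetical toy data, for illustration only.
rows = [
    {"outlook": "sunny", "windy": "no", "label": "play"},
    {"outlook": "sunny", "windy": "yes", "label": "play"},
    {"outlook": "rainy", "windy": "no", "label": "stay"},
    {"outlook": "rainy", "windy": "yes", "label": "stay"},
]
best = max(["outlook", "windy"], key=lambda a: gain_ratio(rows, a))
```

Here `outlook` separates the two classes perfectly while `windy` carries no information about the label, so `outlook` is selected as the splitting attribute.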
Three requirements, in decreasing priority, have been enforced in our implementation of decision tree using MapReduce and HDFS. 1) Scalability to input data. This rules out any assumption that the memory of the master node can hold the data. Therefore, MapReduce jobs must be launched iteratively. 2) Minimize the use of mappers and reducers unless necessary. We have found that each launch of a MapReduce job takes tens of seconds before the mapper method starts executing. So if the runtime of a section of the program outside MapReduce is less than this launch time, it is not worth implementing an additional set of Mapper and Reducer for any part of that section. 3) Minimize HDFS file I/O wherever computation can fulfill the same purpose. Local computation is much faster than typical hard drive I/O, and faster still than network storage I/O, which is limited by network data transmission speed. Based on these three requirements, one set of mapper and reducer functions is implemented to read data from the source file and generate the tuple (attribute id, attribute value, class label, count).
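The counting job described above can be illustrated with a small local simulation of the map, shuffle, and reduce phases in plain Python. This is only a sketch under stated assumptions: it does not use the Hadoop API, the input format (comma-separated attribute values with the class label as the last field) is assumed for the example, and the names `mapper`, `reducer`, and `run_job` are hypothetical.

```python
from collections import defaultdict

def mapper(line):
    """Emit ((attribute id, attribute value, class label), 1) for each attribute.
    Assumed input format: comma-separated values, class label last."""
    *values, label = line.strip().split(",")
    for attr_id, value in enumerate(values):
        yield (attr_id, value, label), 1

def reducer(key, counts):
    """Sum the counts of one key into (attribute id, value, label, count)."""
    attr_id, value, label = key
    return (attr_id, value, label, sum(counts))

def run_job(lines):
    # Local stand-in for the shuffle phase: group mapper output by key.
    groups = defaultdict(list)
    for line in lines:
        for key, one in mapper(line):
            groups[key].append(one)
    return sorted(reducer(k, v) for k, v in groups.items())

# Hypothetical input records, for illustration only.
lines = ["sunny,no,play", "sunny,yes,play", "rainy,yes,stay"]
tuples = run_job(lines)
```

The resulting tuples are small compared with the raw input, since their number is bounded by the number of distinct (attribute, value, label) combinations rather than by the number of input lines. This is what allows the remaining gain-ratio computation to run outside MapReduce.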
The goal of this study is to evaluate the performance of Hadoop implementation of decision tree. In particular, the compute time vs. the count of processors that perform mapper/reducer function, and compute time vs. input data size are examined.
The infrastructure used in this study is AWS (Amazon Web Services), of which three specific services are utilized: 1) S3 (Simple Storage Service), to which the compiled jar file and input files are transferred from the local computer. S3 further sends these files to the virtual cluster allocated by EC2; 2) EC2 (Elastic Compute Cloud), which creates a virtual cluster according to the user's needs; and 3) EMR (Elastic MapReduce), which runs MapReduce projects in the virtual cluster.
To simulate a typical everyday working environment in which Hadoop MapReduce projects run, our rules for the specification of the virtual cluster are as follows: 1) One compute processor (performing mapper and reducer functions) per node. The rationale is to use a homogeneous hardware layout for the study of scaling behavior. Multi-processor compute nodes and multi-core processors have recently become mainstream. However, data transfer within a single node is much faster than cross-node communication. To make a fair comparison of runtime vs. compute processor count, nodes with a single compute core are chosen; 2) Each processor has the same, medium computing power; 3) Network transfer capability among nodes is medium. Requirements 2) and 3) are meant to mimic a normal Hadoop running environment. Based on the above rules, the AWS m1.medium instance type is selected for both the master node and the core nodes. The former assigns Hadoop tasks to core nodes and monitors their status, but does not participate in the Hadoop jobs. The latter run Hadoop tasks and store data using HDFS. The count of master node processors is always 1, and the count of core processors varies from 1 to 8. The Hadoop distribution version is Amazon 2.4.0. All default Hadoop settings for this version are adopted, e.g., 3 replicas of each HDFS block and a 64 MB block size.
Table 1 shows the experimental results, in which "ML" denotes million lines and "MB" means megabytes. The 6-digit numbers show run time in milliseconds. The run time is counted from the launch of the main function to the completion of decision tree generation.
Table 1: Computing times for different file sizes and numbers of processors.
Figure 2 shows the runtime vs. number of processors, and Figure 3 shows the speedup vs. number of processors. Speedup is defined as the serial runtime, i.e., the runtime for number of processors equal to 1, divided by the parallel runtime for the corresponding number of processors. For relatively small problem sizes, i.e., 1 million and 2 million lines of input data, Hadoop distributed computing does not improve performance at all. On the contrary, it hurts performance. For 4 million lines of input, the speedup goes up from 1 processor to 2 processors, shows a negligible increase from 2 processors to 4, and then goes down from 4 to 8. The curve for 8 million lines rises consistently from 1 to 8 processors; only the rate of increase slows down from 4 to 8 processors. This behavior can be explained using Amdahl's law.
As shown in Figure 5, in the ideal situation (green curve), which can never be achieved in reality, speedup is linear in the number of processors, and the slope of speedup vs. number of processors is 1. Amdahl's law states that for a parallel program, if the parallelizable section occupies a fraction f of the total run time, the speedup is upper bounded by:

$$S(n) \le \frac{1}{(1-f) + f/n}$$

The proof is as follows:

$$T(n) = T_s(n) + T_p(n)$$

in which T is runtime, n refers to the processor count, and the subscript "s" or "p" denotes serial or parallel runtime, respectively. In the special case where n = 1,

$$T_p(1) = f\,T(1)$$

Serial code runtime has no difference in serial or parallel execution, i.e.,

$$T_s(n) = T_s(1) = (1-f)\,T(1)$$

Apply the ideal case to $T_p(n)$:

$$T_p(n) = \frac{T_p(1)}{n} = \frac{f\,T(1)}{n}$$

Substitute the expressions of $T_s(n)$ and $T_p(n)$ in terms of $T(1)$ into the speedup $S(n) = T(1)/T(n)$:

$$S(n) = \frac{T(1)}{(1-f)\,T(1) + f\,T(1)/n} = \frac{1}{(1-f) + f/n}$$

So when the number of processors n approaches infinity, S(n) increases and asymptotically approaches

$$\lim_{n \to \infty} S(n) = \frac{1}{1-f}$$
This is basically what the blue curve shows in Figure 5. Multiple issues can bend the speedup down for large numbers of processors. For example, f may not be a constant but may decrease due to increased communication, imbalanced work load, limited bus speed, limited memory access rate, etc. In addition, the turning point (shown in red in the figure) is not a constant. It typically moves to the right when the problem size goes up. This indicates that the smaller the input size, the earlier the speedup saturates or even deteriorates. This explains why, in Figure 3, the curves for input sizes of 1 and 2 million lines go down earlier (from 1 processor) than the curve for 4 million lines (from 4 processors), and why the latter turns earlier than the curve for 8 million lines, which has not reached its peak at 8 processors but already shows a decreased slope. In our case, the causes that make the speedup of small inputs decrease and keep the speedup of the largest input so small (no greater than 1.3) are probably the following: 1) Overhead of launching MapReduce jobs. As observed from the Hadoop output, it takes tens of seconds each time a MapReduce job is launched before the mapper function starts executing; 2) Communication overhead. The nodes appear to work simultaneously on the data to perform mapper and reducer functions. Due to the network storage nature of HDFS, input data needs to be transferred among datanodes through Ethernet; and 3) Writing reducer output. After the reducer finishes, it takes time to write 3 copies of the output to HDFS.
If the number of processes is increased and the speedup increases linearly without increasing problem size, the problem is strongly scalable. Very few parallel problems fall into this category. On the other hand, if speedup increases linearly by increasing the problem size, the problem is weakly scalable. Apparently, our results show weak scalability.
Figure 4 shows runtime vs. input size for different numbers of processors. Given our choice of hardware layout, the number of processors performing mapper and reducer functions is the same as the number of data nodes. The default block size for HDFS is 64 MB, and each block has 3 replicas in total. Therefore, it is not surprising that for processor counts 1 and 2 the runtime keeps increasing with input size, since each datanode contains a full copy of all data. However, for the two larger processor counts and small data sizes, only some of the nodes hold data. This means data needs to be transferred across the network. For example, for an input size of 1 million lines, only 1 block with 3 replicas exists. If the number of nodes is 8, 3 nodes need to transfer data to the remaining 5 nodes. For a data size of 8 million lines, i.e., 203.8 MB, 4 data blocks exist, and up to 12 nodes can have at least 1 block if the replicas are distributed well. This indicates that all 8 data nodes can have 1 block of data, which makes data transfer unnecessary. This could be the reason for the relatively big jump from 4 ML to 8 ML in the purple curve.
So can we draw the conclusion that, in the general case, a decision tree algorithm benefits from a MapReduce implementation? Probably we cannot. The major reason is as follows:
Decision tree is an irregular program, which means its performance is input dependent. Performance depends on multiple factors, for example: a) Number of attributes. We have studied the impact of input size. However, size is only one of the controlling factors. Even if the number of input lines is the same, each line can contain a different number of attributes, which can cause different complexity in the decision tree; b) Number of class labels; c) Number of values for each attribute. A decision tree whose attributes have 100 values on average has different complexity compared with one whose attributes have 2 values on average; d) Input size, which we have examined; e) Input content, which means the combination of attribute values and class labels. Even if the factors in a) through d) are identical, different combinations can lead to different tree complexity. In our study, in order to make a fair comparison, all the factors are deliberately kept identical except d) input file size. The complexity of the constructed decision tree is therefore always the same, and only the input size varies, which causes the difference in runtime in the MapReduce section of the program. Otherwise, the irregular nature of the algorithm would make the comparison meaningless.
In addition, the runtime difference in the comparison is almost entirely caused by MapReduce. The state of the program before each execution of the Mapper is the same. And because the output of the Reducer differs only in the values of the last column within the same iteration, the complexity of the computation performed on the Reducer output in that iteration is identical. So the runtime measured reflects only the difference in MapReduce execution.
There are multiple other factors that can impact the results. For example, the selection of hardware layout can affect performance. A shared-memory system of 8 processors on a single node is expected to perform better than 8 single-core nodes, because the former uses only local data, while the latter typically involves network data transfer. In addition, Hadoop settings, e.g., data block size and replication count, can also impact performance.
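The block arithmetic in the paragraph above can be checked with a short Python sketch. The function name and signature are hypothetical, and the file size used for 1 million lines (one eighth of the 203.8 MB reported for 8 million lines) is an assumption of linear scaling, not a measured value. The count of nodes holding data is an upper bound that assumes replicas are spread as evenly as possible.

```python
import math

BLOCK_SIZE_MB = 64   # default HDFS block size used in this study
REPLICATION = 3      # default HDFS replication factor used in this study

def block_layout(file_size_mb, nodes,
                 block_size_mb=BLOCK_SIZE_MB, replication=REPLICATION):
    """Return (blocks, block replicas, upper bound on nodes holding data)."""
    blocks = math.ceil(file_size_mb / block_size_mb)
    # A block cannot have more replicas than there are datanodes.
    replicas = blocks * min(replication, nodes)
    # Best case: every replica lands on a different node until nodes run out.
    nodes_with_data = min(nodes, replicas)
    return blocks, replicas, nodes_with_data

# 8 million lines: 203.8 MB, as reported in the text.
large = block_layout(203.8, nodes=8)
# 1 million lines: size assumed to be one eighth of the 8 ML file.
small = block_layout(203.8 / 8, nodes=8)
```

Under these assumptions the 8 ML input yields 4 blocks and 12 replicas, enough for all 8 nodes to hold data, while the 1 ML input yields 1 block with 3 replicas, leaving 5 of the 8 nodes without local data.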
In the ideal situation, is it possible to achieve linear speedup in our implementation of decision tree? The following conditions would be needed. First, the number of attributes, the number of class labels, and the number of values of each attribute must be bounded. This implies that the total number of unique instances is finite. Overheads such as the invocation of MapReduce jobs and hardware bottlenecks must also be bounded. Second, the network data transfer speed must be sufficiently fast. Sufficiently here means in comparison with hard drive data transfer speed, for both downstream/read and upstream/write. This implies that all network data transfer works at the corresponding local hard drive speed, so no additional network communication exists to cause imbalanced work load. Third, the number of instances in the input file approaches infinity. This implies that the parallel execution fraction approaches 100%. For any finite problem size, however, linear or quasi-linear speedup will continue only up to a certain node count. The reason is that MapReduce jobs are only a portion of the entire program. Even if the mapper and reducer functions are perfectly parallelized, the rest is still a serial program. Amdahl's law tells us that speedup is upper bounded by 1/(1-f), where f is the parallelizable fraction of the program. Even if 95% of a program can be perfectly parallelized, the speedup cannot surpass 20.
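The bound quoted above is easy to verify numerically. The sketch below uses the standard form of Amdahl's law, S(n) = 1 / ((1 - f) + f / n), whose limit for large n is 1 / (1 - f). The function names are hypothetical, and f must be strictly less than 1 for the limit to be finite.

```python
def amdahl_speedup(f, n):
    """Ideal speedup on n processors when a fraction f of the runtime is parallelizable."""
    return 1.0 / ((1.0 - f) + f / n)

def amdahl_limit(f):
    """Upper bound on speedup as n grows without limit (requires f < 1)."""
    return 1.0 / (1.0 - f)

# Speedup for f = 0.95 at the processor counts used in this study.
speedups = {n: amdahl_speedup(0.95, n) for n in (1, 2, 4, 8)}
limit = amdahl_limit(0.95)
```

With f = 0.95 the ideal speedup on 8 processors is about 5.9, and no processor count can push it past 20. A measured speedup no greater than 1.3 on 8 processors would, under this idealized model, correspond to a much smaller effective parallel fraction.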
In conclusion, we have implemented the decision tree algorithm using the Hadoop MapReduce computing engine and the HDFS file system. We have observed that the invocation of a MapReduce job takes tens of seconds. For our selection of hardware setup and input data, larger input data sizes show a small performance gain with an increased number of processors/nodes. Weak scalability behavior has been found. However, it is hard to draw a general conclusion on whether decision tree will benefit from a MapReduce implementation, due to the irregular nature of the decision tree problem.
Future study can focus on evaluating performance with more diversified inputs, e.g., different numbers of attributes, class labels, and attribute values, as well as randomized instances, and on generating statistical distributions of performance for the parameters of interest. In addition, different hardware layouts can be studied, e.g., multiprocessor nodes, multi-core processors in a shared memory system, and systems including both shared memory and distributed memory layouts. Further, the impact of changing default Hadoop settings, e.g., block size and replication count in HDFS, can be examined. Finally, the effect of adding an additional mapper/reducer to compute the gain ratio can be evaluated.