Shared by a user on: Jun 12, 2018 11:51:00 PM    Source: IT货架


The good news is that Big Data is here. The bad news is that we are struggling to store and analyze it.


This, in a nutshell, is what Hadoop provides: a reliable shared storage and analysis system. The storage is provided by HDFS, and analysis by MapReduce.



    Comparison with RDBMS
  • MapReduce is a good fit for problems that need to analyze the whole dataset in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries or updates, where the dataset has been indexed to deliver low-latency retrieval and update times of a relatively small amount of data.
  • MapReduce suits applications where the data is written once and read many times, whereas a relational database is good for datasets that are continually updated.
  • MapReduce works well on unstructured or semistructured data, since it is designed to interpret the data at processing time. In other words, the input keys and values for MapReduce are not an intrinsic property of the data; they are chosen by the person analyzing the data.
  • Over time, the differences between relational databases and MapReduce systems are likely to blur. Relational databases are starting to incorporate some of the ideas from MapReduce (such as Aster Data’s and Greenplum’s databases). From the other direction, higher-level query languages built on MapReduce (such as Pig and Hive) make MapReduce systems more approachable to traditional database programmers.


    Comparison with Grid Computing

    MapReduce tries to colocate the data with the compute node, so data access is fast because it is local. This feature, known as data locality, is at the heart of MapReduce and is the reason for its good performance.

    Grid computing, by contrast, distributes the work across a cluster of machines that access a shared filesystem hosted by a SAN. This works well for predominantly compute-intensive jobs. It becomes a problem when nodes need to access larger data volumes (hundreds of gigabytes, the point at which MapReduce really starts to shine), because network bandwidth becomes the bottleneck and compute nodes sit idle.


    Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used text search library. Hadoop has its origins in Apache Nutch, an open source web search engine, itself a part of the Lucene project.






    The input to the map function is a set of (key, value) pairs. The key is the offset of the line within the file, and the value is the line itself (some columns are elided with "..."):
    (0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
    (106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
    (212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
    (318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
    (424, 0043012650999991949032418004...0500001N9+00781+99999999999...)

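    The job of map is to extract the small amount of data that is actually needed from a large volume of raw data and to form it into (key, value) pairs. Map tasks run locally, on the node where the data is stored, so this step is very efficient: only the small amount of extracted data has to be transferred over the network.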

    The map function merely extracts the year and the air temperature (for example, 1950 and +0000 in the first record) and emits them as its output:

    (1950, 0)
    (1950, 22)
    (1950, -11)
    (1949, 111)
    (1949, 78)

    The output from the map function is processed by the MapReduce framework before being sent to the reduce function. This processing first sorts the key-value pairs by key:
    (1949, 111)
    (1949, 78)
    (1950, 0)
    (1950, 22)
    (1950, -11)
    It then groups the values for each key, so the reduce function sees:
    (1949, [111, 78])
    (1950, [0, 22, -11])


    All the reduce function has to do now is iterate through the list and pick up the maximum reading.
    (1949, 111)
    (1950, 22)
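    The logical data flow above (map, then sort and group by key, then reduce) can be traced with a minimal in-process Python sketch. It starts from the (year, temperature) pairs the map function emits rather than from raw NCDC records. It does nothing that Hadoop actually does to distribute the work. `shuffle` and `reduce_phase` are hypothetical names used only for illustration.

```python
from collections import defaultdict

# Map output from the example above: (year, temperature) pairs.
MAP_OUTPUT = [(1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)]


def shuffle(pairs):
    """Sort pairs by key, then group the values for each key."""
    grouped = defaultdict(list)
    for key, value in sorted(pairs, key=lambda kv: kv[0]):
        grouped[key].append(value)
    return dict(grouped)


def reduce_phase(grouped):
    """Pick the maximum reading for each year."""
    return {year: max(temps) for year, temps in grouped.items()}


grouped = shuffle(MAP_OUTPUT)
print(grouped)                # {1949: [111, 78], 1950: [0, 22, -11]}
print(reduce_phase(grouped))  # {1949: 111, 1950: 22}
```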


    MapReduce Data Flow


    • A MapReduce job is a unit of work that the client wants to be performed. It consists of the input data, the MapReduce program, and configuration information.
    • Hadoop runs the job by dividing it into tasks, of which there are two types: map tasks and reduce tasks.
    • There are two types of nodes that control the job execution process: a jobtracker and a number of tasktrackers. The jobtracker coordinates all the jobs run on the system by scheduling tasks to run on tasktrackers. Tasktrackers run tasks and send progress reports to the jobtracker, which keeps a record of the overall progress of each job. If a task fails, the jobtracker can reschedule it on a different tasktracker.
    • Hadoop divides the input to a MapReduce job into fixed-size pieces called input splits, or just splits. For most jobs, a good split size tends to be the size of an HDFS block, 64 MB by default. Hadoop does its best to run the map task on a node where the input data resides in HDFS; this is called the data locality optimization. This explains why the optimal split size is the same as the block size: it is the largest size of input that can be guaranteed to be stored on a single node.
    • Map tasks write their output to local disk, not to HDFS. Map output is intermediate output: it is processed by reduce tasks to produce the final output, and once the job is complete the map output can be thrown away.
    • Reduce tasks don’t have the advantage of data locality: the input to a single reduce task is normally the output from all mappers. Therefore the sorted map outputs have to be transferred across the network to the node where the reduce task is running. There they are merged and then passed to the user-defined reduce function. The output of the reduce is normally stored in HDFS for reliability.
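    The relationship between splits and blocks can be illustrated with a simplified Python sketch. It cuts a file into fixed-size pieces, assuming the 64 MB default block size mentioned above. `compute_splits` is a hypothetical helper. Real Hadoop input formats apply further rules (for example, for records that straddle a split boundary), so this only shows the basic arithmetic.

```python
BLOCK_SIZE = 64 * 1024 * 1024  # 64 MB, the default HDFS block size cited above


def compute_splits(file_size, split_size=BLOCK_SIZE):
    """Return (offset, length) pairs that cover the file in split_size pieces."""
    splits = []
    offset = 0
    while offset < file_size:
        length = min(split_size, file_size - offset)  # last split may be shorter
        splits.append((offset, length))
        offset += length
    return splits


# A 200 MB file yields three full 64 MB splits and one 8 MB split.
for offset, length in compute_splits(200 * 1024 * 1024):
    print(offset // (1024 * 1024), "MB +", length // (1024 * 1024), "MB")
```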

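    There are usually only a few reducers, and the data from all the mappers has to be sent to them over the network. Mappers, on the other hand, benefit from the data locality optimization: the data they need to process is local. The mapper's job is therefore to extract the needed data from a large volume of raw input and preprocess it. That way, the data sent to the reducers is as small and simple as possible.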

    Combiner Functions

    Many MapReduce jobs are limited by the bandwidth available on the cluster, so it pays to minimize the data transferred between map and reduce tasks.


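    Suppose that, in the maximum temperature example, the readings for 1950 were processed by two maps because they were in different splits. The first map produced the output: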
    (1950, 0)
    (1950, 20)  
    (1950, 10)
    And the second produced:
    (1950, 25)
    (1950, 15)
    Without a combiner, the reduce function is called with a list of all the values:
    (1950, [0, 20, 10, 25, 15])

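    A combiner preprocesses each mapper's output: the maximum is picked out on each map node first, and only that value is sent to the reducer. This can greatly reduce the amount of data transferred over the network. The reduce function is then called with: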
    (1950, [20, 25])
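    The numbers from this example confirm in a few lines of Python that max works as a combiner. The mean counter-example is an addition that is not in the text above. It shows that not every reduce function can also serve as a combiner.

```python
def avg(values):
    """Arithmetic mean of a non-empty list."""
    return sum(values) / len(values)


map1 = [0, 20, 10]  # output of the first map for 1950
map2 = [25, 15]     # output of the second map for 1950

# Without a combiner, the reducer sees every value.
without_combiner = max(map1 + map2)

# With max() as the combiner, each map node sends only its local maximum,
# so the reducer input is [20, 25].
with_combiner = max([max(map1), max(map2)])

assert without_combiner == with_combiner == 25

# Counter-example: a mean of per-map means is not the overall mean.
print(avg(map1 + map2))               # 14.0
print(avg([avg(map1), avg(map2)]))    # 15.0
```

    Max is safe because max(max(a), max(b)) always equals max(a + b) for non-empty lists a and b. The mean has no such property.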



    Hadoop Streaming
    Hadoop provides an API to MapReduce that allows you to write your map and reduce functions in languages other than Java. Hadoop Streaming uses Unix standard streams as the interface between Hadoop and your program. You can therefore write your MapReduce program in any language that can read standard input and write to standard output.

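    This Hadoop module is really interesting. Hadoop itself is written in Java, and you can of course implement a MapReduce job by calling the MapReduce interfaces through the Java API. Streaming, however, lets you write the map and reduce functions in any language you like, as long as it can read standard input and write to standard output. You never have to call the MapReduce interfaces directly; Hadoop Streaming takes care of everything. So cool!!!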

    Example 2-10. Map function for maximum temperature in Python
    #!/usr/bin/env python3
    import re
    import sys

    for line in sys.stdin:
        val = line.strip()
        (year, temp, q) = (val[15:19], val[87:92], val[92:93])
        # Skip missing readings (+9999) and keep only records whose
        # quality code is 0, 1, 4, 5 or 9.
        if temp != "+9999" and re.match("[01459]", q):
            print("%s\t%s" % (year, temp))
    Example 2-11. Reduce function for maximum temperature in Python
    #!/usr/bin/env python3
    import sys

    (last_key, max_val) = (None, None)
    for line in sys.stdin:
        (key, val) = line.strip().split("\t")
        if key != last_key:
            # This logic only works because the framework sorts map output by
            # key before passing it to the reducer: a new key means every
            # value for last_key has already been seen.
            if last_key is not None:
                print("%s\t%s" % (last_key, max_val))
            (last_key, max_val) = (key, int(val))
        else:
            max_val = max(max_val, int(val))
    if last_key is not None:
        print("%s\t%s" % (last_key, max_val))

    % cat input/ncdc/sample.txt | src/main/ch02/python/max_temperature_map.py | \
    sort | src/main/ch02/python/max_temperature_reduce.py
    1949 111
    1950 22

    Run your MapReduce job with Hadoop Streaming:
    % hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
    -input input/ncdc/sample.txt \
    -output output \
    -mapper src/main/ch02/python/max_temperature_map.py \
    -reducer src/main/ch02/python/max_temperature_reduce.py




    The Hadoop Distributed Filesystem


    The Design of HDFS
    Very large files
    “Very large” in this context means files that are hundreds of megabytes, gigabytes, or terabytes in size. There are Hadoop clusters running today that store petabytes of data.

    Streaming data access
    HDFS is built around the idea that the most efficient data processing pattern is a write-once, read-many-times pattern. Each analysis will involve a large proportion, if not all, of the dataset, so the time to read the whole dataset is more important than the latency in reading the first record.

    Commodity hardware
    Hadoop doesn’t require expensive, highly reliable hardware to run on.

    While this may change in the future, these are areas where HDFS is not a good fit today:
    Low-latency data access
    Applications that require low-latency access to data, in the tens of milliseconds range, will not work well with HDFS. Remember HDFS is optimized for delivering a high throughput of data, and this may be at the expense of latency. HBase is currently a better choice for low-latency access.

    Lots of small files
    Since the namenode holds filesystem metadata in memory, the limit to the number of files in a filesystem is governed by the amount of memory on the namenode.
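    The Python sketch below gives a feel for this limit by estimating namenode memory from the number of metadata objects. The figure of roughly 150 bytes per file, directory or block object is a commonly quoted rule of thumb, used here as an assumption. Real usage depends on the Hadoop version and JVM. `namenode_memory_bytes` is a hypothetical helper.

```python
BYTES_PER_OBJECT = 150  # assumed rule-of-thumb cost of one file, directory or block


def namenode_memory_bytes(num_files, blocks_per_file=1, num_dirs=0):
    """Rough namenode heap needed to hold the namespace metadata."""
    objects = num_files + num_files * blocks_per_file + num_dirs
    return objects * BYTES_PER_OBJECT


# One million single-block files: two million objects, about 300 MB.
print(namenode_memory_bytes(1_000_000) / 1_000_000, "MB")  # 300.0 MB
```

    Many small files cost far more metadata than a few large files holding the same data.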

    Multiple writers, arbitrary file modifications
    Files in HDFS may be written to by a single writer. Writes are always made at the end of the file. There is no support for multiple writers, or for modifications at arbitrary offsets in the file.


    HDFS Concepts

    Having a block abstraction for a distributed filesystem brings several benefits.

    • The first benefit is the most obvious: a file can be larger than any single disk in the network. There’s nothing that requires the blocks from a file to be stored on the same disk, so they can take advantage of any of the disks in the cluster.
    • Second, making the unit of abstraction a block rather than a file simplifies the storage subsystem.
    • Furthermore, blocks fit well with replication for providing fault tolerance and availability.

    Namenodes and Datanodes
    An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the master) and a number of datanodes (workers).
    The namenode manages the filesystem namespace. It maintains the filesystem tree and the metadata for all the files and directories in the tree. This information is stored persistently on the local disk in the form of two files: the namespace image and the edit log. The namenode also knows the datanodes on which all the blocks for a given file are located. However, it does not store block locations persistently, since this information is reconstructed from datanodes when the system starts.

    Datanodes are the workhorses of the filesystem. They store and retrieve blocks when they are told to (by clients or the namenode), and they report back to the namenode periodically with lists of blocks that they are storing.

    Without the namenode, the filesystem cannot be used. For this reason, it is important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this.

    • The first way is to back up the files that make up the persistent state of the filesystem metadata.
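    • It is also possible to run a secondary namenode, which despite its name does not act as a namenode. Its main role is to periodically merge the namespace image with the edit log to prevent the edit log from becoming too large.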

    Hadoop Filesystems
    Hadoop has an abstract notion of filesystem, of which HDFS is just one implementation.


    Filesystem          URI scheme   Java implementation (all under org.apache.hadoop)
    Local               file         fs.LocalFileSystem
    HDFS                hdfs         hdfs.DistributedFileSystem
    KFS (CloudStore)    kfs          fs.kfs.KosmosFileSystem
    FTP                 ftp          fs.ftp.FTPFileSystem
    S3 (native)         s3n          fs.s3native.NativeS3FileSystem
    S3 (block-based)    s3           fs.s3.S3FileSystem


    KFS (CloudStore): CloudStore (formerly the Kosmos filesystem) is a distributed filesystem like HDFS or Google’s GFS, written in C++. Find more information about it at http://kosmosfs.sourceforge.net/.

    S3 (native): A filesystem backed by Amazon S3. See http://wiki.apache.org/hadoop/AmazonS3.

    S3 (block-based): A filesystem backed by Amazon S3, which stores files in blocks (much like HDFS) to overcome S3’s 5 GB file size limit.

    It is possible (and sometimes very convenient) to run MapReduce programs that access any of these filesystems. When you are processing large volumes of data, however, you should choose a distributed filesystem that has the data locality optimization, such as HDFS or KFS.
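    The table amounts to a lookup from URI scheme to implementation class. The Python sketch below mimics that dispatch with `urllib.parse` and returns class names as strings only. `implementation_for` is a hypothetical helper, and its `default_scheme` parameter is a simplification standing in for Hadoop's configured default filesystem. This is not Hadoop's own lookup code.

```python
from urllib.parse import urlparse

# Scheme -> implementation class, copied from the table above
# (all names are relative to org.apache.hadoop).
FILESYSTEMS = {
    "file": "fs.LocalFileSystem",
    "hdfs": "hdfs.DistributedFileSystem",
    "kfs": "fs.kfs.KosmosFileSystem",
    "ftp": "fs.ftp.FTPFileSystem",
    "s3n": "fs.s3native.NativeS3FileSystem",
    "s3": "fs.s3.S3FileSystem",
}


def implementation_for(uri, default_scheme="file"):
    """Return the class name that would serve this URI, per the table above.

    default_scheme stands in for the configured default filesystem,
    used when the URI has no scheme of its own.
    """
    scheme = urlparse(uri).scheme or default_scheme
    if scheme not in FILESYSTEMS:
        raise ValueError("no filesystem registered for scheme %r" % scheme)
    return "org.apache.hadoop." + FILESYSTEMS[scheme]


print(implementation_for("hdfs://namenode/user/tom/input"))
# org.apache.hadoop.hdfs.DistributedFileSystem
```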

    HDFS Data Flow

    Anatomy of a File Read

    1. The client opens the file it wishes to read by calling open() on the FileSystem object, which for HDFS is an instance of DistributedFileSystem.
    2. DistributedFileSystem calls the namenode, using RPC, to determine the locations of the first few blocks in the file. DistributedFileSystem then returns an FSDataInputStream (an input stream that supports file seeks) to the client for it to read data from. FSDataInputStream in turn wraps a DFSInputStream, which manages the datanode and namenode I/O.
    3. The client then calls read() on the stream. DFSInputStream has stored the datanode addresses for the first few blocks in the file, and it connects to the first (closest) datanode for the first block. Data is streamed from the datanode back to the client, which calls read() repeatedly on the stream. When the end of the block is reached, DFSInputStream closes the connection to the datanode and finds the best datanode for the next block. This happens transparently to the client, which from its point of view is just reading a continuous stream.
    4. When the client has finished reading, it calls close() on the FSDataInputStream.

    During reading, if the client encounters an error while communicating with a datanode, it will try the next closest one for that block. It will also remember datanodes that have failed so that it doesn’t needlessly retry them for later blocks.
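    The client's read loop, including the failover behavior just described, can be sketched in Python. Everything here is simulated and hypothetical. Datanodes are plain dicts, and `read_block` stands in for streaming a block over the network. Each block's replica list is assumed to arrive already sorted closest-first, as the namenode returns it.

```python
class DatanodeError(Exception):
    """Raised when a (simulated) datanode cannot serve a block."""


def read_block(datanode, block_id):
    """Stand-in for streaming one block from a datanode."""
    if datanode["down"]:
        raise DatanodeError(datanode["name"])
    return datanode["blocks"][block_id]


def read_file(block_locations):
    """block_locations: list of (block_id, [datanodes sorted closest first])."""
    failed = set()  # remembered so later blocks skip datanodes known to be bad
    data = []
    for block_id, replicas in block_locations:
        for node in replicas:
            if node["name"] in failed:
                continue
            try:
                data.append(read_block(node, block_id))
                break  # got this block; move on to the next one
            except DatanodeError:
                failed.add(node["name"])  # try the next closest replica
        else:
            raise IOError("could not read block %s from any replica" % block_id)
    return b"".join(data)


dn1 = {"name": "dn1", "down": True, "blocks": {}}
dn2 = {"name": "dn2", "down": False, "blocks": {"b1": b"hello ", "b2": b"world"}}
print(read_file([("b1", [dn1, dn2]), ("b2", [dn1, dn2])]))  # b'hello world'
```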

    Anatomy of a File Write

    1. The client creates the file by calling create() on DistributedFileSystem.
    2. DistributedFileSystem makes an RPC call to the namenode to create a new file in the filesystem’s namespace, with no blocks associated with it. The namenode performs various checks to make sure the file doesn’t already exist and that the client has the right permissions to create the file. If these checks pass, the namenode makes a record of the new file. Otherwise, file creation fails and the client is thrown an IOException. DistributedFileSystem returns an FSDataOutputStream for the client to start writing data to. As in the read case, FSDataOutputStream wraps a DFSOutputStream, which handles communication with the datanodes and namenode.
    3. As the client writes data, DFSOutputStream splits it into packets, which it writes to an internal queue called the data queue. The data queue is consumed by the DataStreamer, which asks the namenode to allocate new blocks by picking a list of suitable datanodes to store the replicas. The list of datanodes forms a pipeline. The DataStreamer streams the packets to the first datanode in the pipeline, which stores each packet and forwards it to the next datanode in the pipeline.
    4. DFSOutputStream also maintains an internal queue of packets that are waiting to be acknowledged by datanodes, called the ack queue. A packet is removed from the ack queue only when it has been acknowledged by all the datanodes in the pipeline.
    5. When the client has finished writing data, it calls close() on the stream. This action flushes all the remaining packets to the datanode pipeline and waits for acknowledgments before contacting the namenode to signal that the file is complete.

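    If a datanode fails while data is being written to it, HDFS has a way to handle that too.
    First, the pipeline is closed. Any packets in the ack queue are added to the front of the data queue so that datanodes downstream of the failed node do not miss any packets. Next, the current block on the good datanodes in the pipeline is given a new identity, so that the partial block on the failed datanode will be deleted if that datanode recovers later. The failed datanode is then removed from the pipeline, and the remainder of the block's data is written to the remaining good datanodes. Meanwhile, the namenode notices that the block is under-replicated and arranges for a further replica to be created on another datanode. This lets the write and replication complete normally. The HDFS property dfs.replication.min (default 1) sets how many such failures can be tolerated. Even if several datanodes fail while a block is being written, the write succeeds as long as at least dfs.replication.min replicas are written.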
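    The recovery steps above can be followed in a simplified, single-threaded Python simulation. It is a hypothetical model, not HDFS code. `write_block` and its failure-injection parameters are invented for illustration. Sending and acknowledgment are synchronous here, whereas in HDFS they are asynchronous. The block's new identity and the namenode's re-replication are not modelled.

```python
from collections import deque


def write_block(packets, pipeline, failing_node=None, fail_on_packet=None,
                min_replication=1):
    """Simulate writing one block through a datanode pipeline.

    failing_node/fail_on_packet inject a single failure: that datanode fails
    while receiving that packet. Returns {datanode: packets stored} for the
    datanodes still in the pipeline when the block is complete.
    """
    pipeline = list(pipeline)
    stored = {dn: {} for dn in pipeline}    # datanode -> {packet index: payload}
    data_queue = deque(enumerate(packets))  # packets waiting to be sent
    ack_queue = deque()                     # sent but not yet fully acknowledged
    while data_queue:
        index, payload = data_queue.popleft()
        ack_queue.append((index, payload))
        failed = None
        for dn in pipeline:                 # each datanode stores, then forwards
            if dn == failing_node and index == fail_on_packet:
                failed = dn
                break
            stored[dn][index] = payload
        if failed is None:
            ack_queue.popleft()             # acknowledged by every datanode
            continue
        # Recovery: unacknowledged packets go back to the front of the data
        # queue, and the failed datanode (with its partial block) is dropped.
        data_queue.extendleft(reversed(ack_queue))
        ack_queue.clear()
        pipeline.remove(failed)
        del stored[failed]
        failing_node = None                 # inject the failure only once
        if len(pipeline) < min_replication:
            raise IOError("cannot write %d replica(s)" % min_replication)
    return {dn: [stored[dn][i] for i in sorted(stored[dn])] for dn in pipeline}


result = write_block([b"p0", b"p1", b"p2"], ["dn1", "dn2", "dn3"],
                     failing_node="dn2", fail_on_packet=1)
print(sorted(result))  # ['dn1', 'dn3']
```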

    Replica Placement
    How does the namenode choose which datanodes to store replicas on?

    There’s a tradeoff between reliability and write bandwidth and read bandwidth here.
    Hadoop’s strategy is to place the first replica on the same node as the client. For clients running outside the cluster, a node is chosen at random, although the system tries not to pick nodes that are too full or too busy. The second replica is placed on a random node on a different rack from the first (off-rack). The third replica is placed on the same rack as the second, but on a different node chosen at random. Further replicas are placed on random nodes on the cluster, although the system tries to avoid placing too many replicas on the same rack.
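    The placement policy can be expressed as a short Python sketch. It takes the topology as a hypothetical `{rack: [datanodes]}` dict and assumes at least two racks, each with at least two datanodes. It ignores the checks for nodes that are too full or too busy, and the per-rack limit for further replicas. `choose_replicas` is an illustrative name, not a Hadoop API.

```python
import random


def choose_replicas(topology, client_node=None, replication=3, rng=random):
    """Pick datanodes following the placement policy described above."""
    rack_of = {dn: rack for rack, nodes in topology.items() for dn in nodes}
    all_nodes = list(rack_of)
    # 1st replica: the client's own node, or a random node for outside clients.
    first = client_node if client_node in rack_of else rng.choice(all_nodes)
    # 2nd replica: a random node on a different rack (off-rack).
    second = rng.choice([dn for dn in all_nodes if rack_of[dn] != rack_of[first]])
    # 3rd replica: same rack as the second, but a different node.
    third = rng.choice([dn for dn in topology[rack_of[second]] if dn != second])
    chosen = [first, second, third]
    # Further replicas: random unused nodes (rack limits are not modelled).
    remaining = [dn for dn in all_nodes if dn not in chosen]
    chosen += rng.sample(remaining, max(0, replication - 3))
    return chosen[:replication]


topology = {"rack1": ["n1", "n2"], "rack2": ["n3", "n4"]}
print(choose_replicas(topology, client_node="n1", rng=random.Random(0)))
```

    The first replica is written cheaply on the local node. The second and third spread the block across two racks, which trades some write bandwidth for reliability.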


    Hadoop Archives
    HDFS stores small files inefficiently, since each file is stored in a block, and block metadata is held in memory by the namenode.
    Hadoop Archives, or HAR files, are a file archiving facility that packs files into HDFS blocks more efficiently. This reduces namenode memory usage while still allowing transparent access to files. In particular, Hadoop Archives can be used as input to MapReduce.


    Anatomy of a MapReduce Job Run

    You can run a MapReduce job with a single line of code: JobClient.runJob(conf). It’s very short, but it conceals a great deal of processing behind the scenes. At the highest level, there are four independent entities:

    • The client, which submits the MapReduce job.
    • The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
    • The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
    • The distributed filesystem, which is used for sharing job files between the other entities.

    Each step of the job run is described in detail below.

    Job Submission
    The job submission process implemented by JobClient’s submitJob() method does the following:

    • Asks the jobtracker for a new job ID (by calling getNewJobId() on JobTracker).
    • Checks the output specification of the job. For example, if the output directory has not been specified or it already exists, the job is not submitted and an error is thrown to the MapReduce program.
    • Computes the input splits for the job. If the splits cannot be computed, because the input paths don’t exist, for example, then the job is not submitted and an error is thrown to the MapReduce program.
    • Copies the resources needed to run the job, including the job JAR file, the configuration file, and the computed input splits, to the jobtracker’s filesystem in a directory named after the job ID.
    • Tells the jobtracker that the job is ready for execution (by calling submitJob() on JobTracker).
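    The two validation steps above (output specification and input paths) can be mimicked on a local filesystem with a small Python sketch. `check_submission` and `SubmissionError` are hypothetical. The real checks are performed by JobClient against the job's configured filesystem. The remaining steps (job ID, resource copying, final submission) are not modelled.

```python
import os


class SubmissionError(Exception):
    """Raised when a job would be rejected before submission."""


def check_submission(input_paths, output_dir):
    """Hypothetical pre-submission checks mirroring the steps above."""
    # Output specification: the directory must be given and must not exist yet.
    if not output_dir:
        raise SubmissionError("output directory not specified")
    if os.path.exists(output_dir):
        raise SubmissionError("output directory %s already exists" % output_dir)
    # Input splits cannot be computed if an input path does not exist.
    missing = [p for p in input_paths if not os.path.exists(p)]
    if missing:
        raise SubmissionError("input path does not exist: %s" % ", ".join(missing))


try:
    check_submission(["input/ncdc/sample.txt"], "output")
except SubmissionError as err:
    print("job not submitted:", err)
```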

    Job Initialization
    When the JobTracker receives a call to its submitJob() method, it puts the job into an internal queue, from which the job scheduler will pick it up and initialize it. Initialization involves creating an object to represent the job being run, which encapsulates its tasks, and bookkeeping information to keep track of the tasks’ status and progress.


    Task Assignment
    Tasktrackers run a simple loop that periodically sends heartbeat method calls to the jobtracker. Heartbeats tell the jobtracker that a tasktracker is alive, but they also double as a channel for messages. As part of the heartbeat, a tasktracker indicates whether it is ready to run a new task. If it is, the jobtracker allocates it a task and communicates it to the tasktracker using the heartbeat return value.


    Tasktrackers have a fixed number of slots for map tasks and for reduce tasks. The default scheduler fills empty map task slots before reduce task slots. If the tasktracker has at least one empty map task slot, the jobtracker will select a map task; otherwise, it will select a reduce task.

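    Choosing a reduce task is simple for the jobtracker, because data locality does not need to be considered. For a map task, however, it takes account of the tasktracker’s network location and picks a task whose input split is as close as possible to the tasktracker.
    In other words, it tries to make the map task data-local and, failing that, rack-local. In short, when choosing a map task, the jobtracker tries to minimize the network traffic for map input.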
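    The task-selection rules described above can be sketched in Python. Tasktrackers and split locations are modelled as hypothetical (host, rack) tuples, and `pick_task` and `locality` are illustrative names. Before this step, the real jobtracker first has to choose which job to take a task from, which is not shown. Falling back to a reduce task when no map tasks are pending is an assumption of the sketch.

```python
def locality(task, host, rack):
    """0 = data-local, 1 = rack-local, 2 = off-rack, for one map task."""
    _, split_locations = task
    if any(h == host for h, _ in split_locations):
        return 0
    if any(r == rack for _, r in split_locations):
        return 1
    return 2


def pick_task(free_map_slots, free_reduce_slots, tracker, map_tasks, reduce_tasks):
    """Choose the next task for a tasktracker located at tracker = (host, rack).

    map_tasks: list of (task_id, [(host, rack), ...]) input split locations.
    reduce_tasks: list of task IDs. Chosen tasks are removed from the lists.
    """
    host, rack = tracker
    if free_map_slots > 0 and map_tasks:
        # Map slots are filled first, preferring the closest input split.
        best = min(map_tasks, key=lambda t: locality(t, host, rack))
        map_tasks.remove(best)
        return ("map", best[0])
    if free_reduce_slots > 0 and reduce_tasks:
        return ("reduce", reduce_tasks.pop(0))  # no locality to consider
    return None


maps = [("m1", [("h9", "r2")]), ("m2", [("h5", "r1")]), ("m3", [("h1", "r1")])]
print(pick_task(1, 1, ("h1", "r1"), maps, ["red0"]))  # ('map', 'm3')
```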


    Task Execution
    First, the tasktracker localizes the job JAR by copying it from the shared filesystem to the tasktracker’s filesystem.
    Second, it creates a local working directory for the task, and un-jars the contents of the JAR into this directory.
    Third, it creates an instance of TaskRunner to run the task.

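    In short, the task's code (for Java, the JAR file) is first copied to the server where the tasktracker runs, and then a child process is created to execute it. For Java, this means launching a new JVM, so that tasks don't interfere with one another. Creating JVMs over and over is costly, so this could be optimized with a pool of JVMs, saving the time spent repeatedly creating and destroying them.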


    Progress and Status Updates
    MapReduce jobs are long-running batch jobs, taking anything from minutes to hours to run, so it is important for the user to get feedback on how the job is progressing. A job and each of its tasks have a status. The status includes the state of the job or task (e.g., running, successfully completed, failed), the progress of maps and reduces, the values of the job’s counters, and a status message or description (which may be set by user code).


    Job Completion
    When the jobtracker receives a notification that the last task for a job is complete, it changes the status for the job to “successful.” Then, when the JobClient polls for status, it learns that the job has completed successfully, so it prints a message to tell the user, and then returns from the runJob() method.

    Last, the jobtracker cleans up its working state for the job, and instructs tasktrackers to do the same (so intermediate output is deleted, for example).



    Task Failure

    • The most common way that this happens is when user code in the map or reduce task throws a runtime exception. If this happens, the child JVM reports the error back to its parent tasktracker before it exits. The error ultimately makes it into the user logs. The tasktracker marks the task attempt as failed, freeing up a slot to run another task.
    • Another failure mode is the sudden exit of the child JVM. Perhaps there is a JVM bug that causes the JVM to exit for a particular set of circumstances exposed by the MapReduce user code. In this case, the tasktracker notices that the process has exited and marks the attempt as failed.
    • Hanging tasks are dealt with differently. The tasktracker notices that it hasn’t received a progress update for a while, and proceeds to mark the task as failed. The child JVM process will be automatically killed after this period.
    • When the jobtracker is notified of a task attempt that has failed (by the tasktracker’s heartbeat call), it will reschedule execution of the task. The jobtracker will try to avoid rescheduling the task on a tasktracker where it has previously failed. Furthermore, if a task fails four times, it will not be retried further.
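    The retry rules can be expressed as a short Python sketch. `run_with_retries` and the `run_attempt` callback are hypothetical, and the limit of four attempts is taken from the description above. The choice of tracker is deliberately naive. Any tracker where the task has not yet failed is preferred, with a fallback to all trackers once every one of them has failed.

```python
def run_with_retries(task_id, trackers, run_attempt, max_attempts=4):
    """Run a task, retrying failed attempts on other tasktrackers.

    run_attempt(task_id, tracker) returns True on success, False on failure.
    Returns the tracker that succeeded, or None once max_attempts have failed.
    """
    failed_on = set()
    for _ in range(max_attempts):
        # Avoid trackers where this task has already failed, if possible.
        candidates = [t for t in trackers if t not in failed_on] or list(trackers)
        tracker = candidates[0]
        if run_attempt(task_id, tracker):
            return tracker
        failed_on.add(tracker)
    return None  # the task is not retried further


print(run_with_retries("m1", ["tt1", "tt2", "tt3"], lambda task, tt: tt != "tt1"))  # tt2
```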


    Tasktracker Failure
    Failure of a tasktracker is another failure mode. If a tasktracker fails by crashing or running very slowly, it will stop sending heartbeats to the jobtracker (or send them very infrequently). The jobtracker notices a tasktracker that has stopped sending heartbeats and removes it from its pool of tasktrackers to schedule tasks on. The timeout is 10 minutes without a heartbeat, configured in milliseconds via the mapred.tasktracker.expiry.interval property.
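    The expiry check amounts to comparing each tasktracker's last heartbeat time with the configured interval. The Python sketch below assumes timestamps in milliseconds, like the property. `expire_tasktrackers` is a hypothetical helper. The real jobtracker also reschedules the tasks that ran on an expired tasktracker, which is not shown.

```python
EXPIRY_INTERVAL_MS = 10 * 60 * 1000  # 10 minutes (mapred.tasktracker.expiry.interval)


def expire_tasktrackers(last_heartbeat_ms, now_ms, interval_ms=EXPIRY_INTERVAL_MS):
    """Return (live, expired) tasktracker names based on their last heartbeat."""
    live, expired = [], []
    for tracker, last_seen in sorted(last_heartbeat_ms.items()):
        if now_ms - last_seen > interval_ms:
            expired.append(tracker)  # would be removed from the scheduling pool
        else:
            live.append(tracker)
    return live, expired


heartbeats = {"tt1": 650_000, "tt2": 50_000}
print(expire_tasktrackers(heartbeats, now_ms=700_000))  # (['tt1'], ['tt2'])
```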

    Jobtracker Failure
    Failure of the jobtracker is the most serious failure mode. Currently, Hadoop has no mechanism for dealing with failure of the jobtracker (it is a single point of failure), so in this case the job fails.


    Shuffle and Sort
    MapReduce guarantees that the input to every reducer is sorted by key. The shuffle is the process by which the system performs the sort and transfers the map outputs to the reducers as inputs.

    The Map Side
    Each map task has a circular memory buffer that it writes the output to. The buffer is 100 MB by default, a size that can be tuned by changing the io.sort.mb property. When the contents of the buffer reach a certain threshold size (io.sort.spill.percent, default 0.80, or 80%), a background thread will start to spill the contents to disk.

    Before it writes to disk, the thread first divides the data into partitions corresponding to the reducers that they will ultimately be sent to. Within each partition, the background thread performs an in-memory sort by key, and if there is a combiner function, it is run on the output of the sort.


    Each time the memory buffer reaches the spill threshold, a new spill file is created, so after the map task has written its last output record there could be several spill files. Before the task is finished, the spill files are merged into a single partitioned and sorted output file. The configuration property io.sort.factor controls the maximum number of streams to merge at once; the default is 10.
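    The order of operations during a spill (partition, sort each partition by key, then run the combiner) can be illustrated in Python. This is a simplified sketch. It works on Python objects rather than a circular byte buffer, and it runs in the foreground rather than in a background thread. The hash partitioner only imitates the spirit of Hadoop's default. `spill` and `max_combiner` are hypothetical names.

```python
from itertools import groupby

IO_SORT_MB = 100      # io.sort.mb default
SPILL_PERCENT = 0.80  # io.sort.spill.percent default
SPILL_THRESHOLD_MB = IO_SORT_MB * SPILL_PERCENT  # spilling starts at about 80 MB


def spill(buffer, num_reducers, combiner=None):
    """Turn buffered (key, value) pairs into one spill: a list of partitions.

    Each partition is sorted by key, and the combiner (if any) runs on the
    sorted output of each partition.
    """
    partitions = [[] for _ in range(num_reducers)]
    for key, value in buffer:
        # Hash partitioning: each key always goes to the same reducer.
        partitions[hash(key) % num_reducers].append((key, value))
    spill_file = []
    for part in partitions:
        part.sort(key=lambda kv: kv[0])  # in-memory sort by key
        if combiner is not None:
            part = combiner(part)
        spill_file.append(part)
    return spill_file


def max_combiner(sorted_pairs):
    """Keep only the maximum value for each key (the max temperature combiner)."""
    return [(key, max(v for _, v in group))
            for key, group in groupby(sorted_pairs, key=lambda kv: kv[0])]


buffer = [(1950, 0), (1950, 20), (1949, 111), (1950, 10)]
print(spill(buffer, num_reducers=2, combiner=max_combiner))
# [[(1950, 20)], [(1949, 111)]]
```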

    The Reduce Side
    The map output file sits on the local disk of the tasktracker that ran the map task. Before the reduce can run, the reduce task's partition of every map output must therefore be copied to the machine where the reduce task runs. This is known as the copy phase of the reduce task.

    When all the map outputs have been copied, the reduce task moves into the sort phase. This phase should properly be called the merge phase, as the sorting was carried out on the map side. It merges the map outputs, maintaining their sort ordering, and this is done in rounds.
    For example, if there were 50 map outputs and the merge factor was 10 (the default, controlled by the io.sort.factor property), there would be 5 rounds. Each round would merge 10 files into one, so at the end there would be five intermediate files.

    The merge does not have a final round that merges these five files into a single sorted file. Instead, it saves a trip to disk by feeding them directly into the reduce function in the last phase: the reduce phase.
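    The merge rounds in the example can be reproduced with `heapq.merge` from Python's standard library, which performs a k-way merge of already-sorted inputs. The sketch follows the simplified description above. Runs are merged in groups of io.sort.factor, and the survivors are streamed straight into the reduce rather than merged into one file. Hadoop's actual choice of how many files to merge in each round may differ. `merge_rounds` and `reduce_input` are hypothetical names.

```python
import heapq


def merge_rounds(sorted_runs, factor=10):
    """Merge sorted runs in groups of `factor` (one pass of rounds)."""
    return [list(heapq.merge(*sorted_runs[i:i + factor]))
            for i in range(0, len(sorted_runs), factor)]


def reduce_input(sorted_runs, factor=10):
    """Lazily yield records in key order: the final merge feeds the reduce
    function directly instead of writing one big sorted file to disk."""
    return heapq.merge(*merge_rounds(sorted_runs, factor))


runs = [[i, i + 50] for i in range(50)]  # 50 sorted map outputs
intermediate = merge_rounds(runs)
print(len(intermediate))              # 5
print(list(reduce_input(runs))[:5])   # [0, 1, 2, 3, 4]
```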

