posts - 200, comments - 8, trackbacks - 0, articles - 0

     摘要: 转自:http://blog.oddfoo.net/2011/04/17/mapreduce-partition%E5%88%86%E6%9E%90-2/Partition所处的位置Partition位置Partition主要作用就是将map的结果发送到相应的reduce。这就对partition有两个要求:1)均衡负载,尽量的将工作均匀的分配给不同的reduce。2)效率,分配速度一定要快。Ma...  阅读全文

posted @ 2013-04-01 21:10 鑫龙 阅读(380) | 评论 (0)编辑 收藏

NameNode中几个关键的数据结构

FSImage

Namenode会将HDFS的文件和目录元数据存储在一个叫fsimage的二进制文件中,每次保存fsimage之后到下次保存之间的所有hdfs操作,将会记录在editlog文件中,当editlog达到一定的大小(bytes,由fs.checkpoint.size参数定义)或从上次保存过后一定时间段过后(sec,由fs.checkpoint.period参数定义),namenode会重新将内存中对整个HDFS的目录树和文件元数据刷到fsimage文件中。Namenode就是通过这种方式来保证HDFS中元数据信息的安全性。

Fsimage是一个二进制文件,当中记录了HDFS中所有文件和目录的元数据信息,在我的hadoop的HDFS版中,该文件的中保存文件和目录的格式如下:

 

当namenode重启加载fsimage时,就是按照如下格式协议从文件流中加载元数据信息。从fsimag的存储格式可以看出,fsimage保存有如下信息:

1.         首先是一个image head,其中包含:

a)         imgVersion(int):当前image的版本信息

b)        namespaceID(int):用来确保别的HDFS instance中的datanode不会误连上当前NN。

c)         numFiles(long):整个文件系统中包含有多少文件和目录

d)        genStamp(long):生成该image时的时间戳信息。

2.         接下来便是对每个文件或目录的源数据信息,如果是目录,则包含以下信息:

a)         path(String):该目录的路径,如”/user/build/build-index”

b)        replications(short):副本数(目录虽然没有副本,但这里记录的目录副本数也为3)

c)         mtime(long):该目录的修改时间的时间戳信息

d)        atime(long):该目录的访问时间的时间戳信息

e)         blocksize(long):目录的blocksize都为0

f)         numBlocks(int):实际有多少个文件块,目录的该值都为-1,表示该item为目录

g)        nsQuota(long):namespace Quota值,若没加Quota限制则为-1

h)        dsQuota(long):disk Quota值,若没加限制则也为-1

i)          username(String):该目录的所属用户名

j)          group(String):该目录的所属组

k)        permission(short):该目录的permission信息,如644等,有一个short来记录。

3.         若从fsimage中读到的item是一个文件,则还会额外包含如下信息:

a)         blockid(long):属于该文件的block的blockid,

b)        numBytes(long):该block的大小

c)         genStamp(long):该block的时间戳

当该文件对应的numBlocks数不为1,而是大于1时,表示该文件对应有多个block信息,此时紧接在该fsimage之后的就会有多个blockid,numBytes和genStamp信息。

因此,在namenode启动时,就需要对fsimage按照如下格式进行顺序的加载,以将fsimage中记录的HDFS元数据信息加载到内存中。

BlockMap

从以上fsimage中加载如namenode内存中的信息中可以很明显的看出,在fsimage中,并没有记录每一个block对应到哪几个datanodes的对应表信息,而只是存储了所有的关于namespace的相关信息。而真正每个block对应到datanodes列表的信息在hadoop中并没有进行持久化存储,而是在所有datanode启动时,每个datanode对本地磁盘进行扫描,将本datanode上保存的block信息汇报给namenode,namenode在接收到每个datanode的块信息汇报后,将接收到的块信息,以及其所在的datanode信息等保存在内存中。HDFS就是通过这种块信息汇报的方式来完成 block -> datanodes list的对应表构建。Datanode向namenode汇报块信息的过程叫做blockReport,而namenode将block -> datanodes list的对应表信息保存在一个叫BlocksMap的数据结构中。

BlocksMap的内部数据结构如下:   

              

 

如上图显示,BlocksMap实际上就是一个Block对象对BlockInfo对象的一个Map表,其中Block对象中只记录了blockid,block大小以及时间戳信息,这些信息在fsimage中都有记录。而BlockInfo是从Block对象继承而来,因此除了Block对象中保存的信息外,还包括代表该block所属的HDFS文件的INodeFile对象引用以及该block所属datanodes列表的信息(即上图中的DN1,DN2,DN3,该数据结构会在下文详述)。

因此在namenode启动并加载fsimage完成之后,实际上BlocksMap中的key,也就是Block对象都已经加载到BlocksMap中,每个key对应的value(BlockInfo)中,除了表示其所属的datanodes列表的数组为空外,其他信息也都已经成功加载。所以可以说:fsimage加载完毕后,BlocksMap中仅缺少每个块对应到其所属的datanodes list的对应关系信息。所缺这些信息,就是通过上文提到的从各datanode接收blockReport来构建。当所有的datanode汇报给namenode的blockReport处理完毕后,BlocksMap整个结构也就构建完成。

BlockMap中datanode列表数据结构

在BlockInfo中,将该block所属的datanodes列表保存在一个Object[]数组中,但该数组不仅仅保存了datanodes列表,还包含了额外的信息。实际上该数组保存了如下信息:

 

上图表示一个block包含有三个副本,分别放置在DN1,DN2和DN3三个datanode上,每个datanode对应一个三元组,该三元组中的第二个元素,即上图中prev block所指的是该block在该datanode上的前一个BlockInfo引用。第三个元素,也就是上图中next Block所指的是该block在该datanode上的下一个BlockInfo引用。每个block有多少个副本,其对应的BlockInfo对象中就会有多少个这种三元组。

       Namenode采用这种结构来保存block->datanode list的目的在于节约namenode内存。由于namenode将block->datanodes的对应关系保存在了内存当中,随着HDFS中文件数的增加,block数也会相应的增加,namenode为了保存block->datanodes的信息已经耗费了相当多的内存,如果还像这种方式一样的保存datanode->block list的对应表,势必耗费更多的内存,而且在实际应用中,要查一个datanode上保存的block list的应用实际上非常的少,大部分情况下是要根据block来查datanode列表,所以namenode中通过上图的方式来保存block->datanode list的对应关系,当需要查询datanode->block list的对应关系时,只需要沿着该数据结构中next Block的指向关系,就能得出结果,而又无需保存datanode->block list在内存中。

NameNode启动过程

fsimage加载过程

Fsimage加载过程完成的操作主要是为了:

1.         从fsimage中读取该HDFS中保存的每一个目录和每一个文件

2.         初始化每个目录和文件的元数据信息

3.         根据目录和文件的路径,构造出整个namespace在内存中的镜像

4.         如果是文件,则读取出该文件包含的所有blockid,并插入到BlocksMap中。

整个加载流程如下图所示:

 

如上图所示,namenode在加载fsimage过程其实非常简单,就是从fsimage中不停的顺序读取文件和目录的元数据信息,并在内存中构建整个namespace,同时将每个文件对应的blockid保存入BlocksMap中,此时BlocksMap中每个block对应的datanodes列表暂时为空。当fsimage加载完毕后,整个HDFS的目录结构在内存中就已经初始化完毕,所缺的就是每个文件对应的block对应的datanode列表信息。这些信息需要从datanode的blockReport中获取,所以加载fsimage完毕后,namenode进程进入rpc等待状态,等待所有的datanodes发送blockReports。

blockReport阶段

每个datanode在启动时都会扫描其机器上对应保存hdfs block的目录下(dfs.data.dir)所保存的所有文件块,然后通过namenode的rpc调用将这些block信息以一个long数组的方式发送给namenode,namenode在接收到一个datanode的blockReport rpc调用后,从rpc中解析出block数组,并将这些接收到的blocks插入到BlocksMap表中,由于此时BlocksMap缺少的仅仅是每个block对应的datanode信息,而namenoe能从report中获知当前report上来的是哪个datanode的块信息,所以,blockReport过程实际上就是namenode在接收到块信息汇报后,填充BlocksMap中每个block对应的datanodes列表的三元组信息的过程。其流程如下图所示:

 

当所有的datanode汇报完block,namenode针对每个datanode的汇报进行过处理后,namenode的启动过程到此结束。此时BlocksMap中block->datanodes的对应关系已经初始化完毕。如果此时已经达到安全模式的推出阈值,则hdfs主动退出安全模式,开始提供服务。

启动过程数据采集和瓶颈分析

对namenode的整个启动过程有了详细了解之后,就可以对其启动过程中各阶段各函数的调用耗时进行profiling的采集,数据的profiling仍然分为两个阶段,即fsimage加载阶段和blockReport阶段。

fsimage加载阶段性能数据采集和瓶颈分析

以下是对建库集群真实的fsimage加载过程的的性能采集数据:

 

从上图可以看出,fsimage的加载过程那个中,主要耗时的操作分别分布在FSDirectory.addToParentFSImage.readString,以及PermissionStatus.read三个操作,这三个操作分别占用了加载过程的73%,15%以及8%,加起来总共消耗了整个加载过程的96%。而其中FSImage.readStringPermissionStatus.read操作都是从fsimage的文件流中读取数据(分别是读取String和short)的操作,这种操作优化的空间不大,但是通过调整该文件流的Buffer大小来提高少许性能。而FSDirectory.addToParent的调用却占用了整个加载过程的73%,所以该调用中的优化空间比较大。

       以下是addToParent调用中的profiling数据:

 

从以上数据可以看出addToParent调用占用的73%的耗时中,有66%都耗在了INode.getPathComponents调用上,而这66%分别有36%消耗在INode.getPathNames调用,30%消耗在INode.getPathComponents调用。这两个耗时操作的具体分布如以下数据所示:

 

可以看出,消耗了36%的处理时间的INode.getPathNames操作,全部都是在通过String.split函数调用来对文件或目录路径进行切分。另外消耗了30%左右的处理时间在INode.getPathComponents中,该函数中最终耗时都耗在获取字符串的byte数组的java原生操作中。

blockReport阶段性能数据采集和瓶颈分析

由于blockReport的调用是通过datanode调用namenode的rpc调用,所以在namenode进入到等待blockreport阶段后,会分别开启rpc调用的监听线程和rpc调用的处理线程。其中rpc处理和rpc鉴定的调用耗时分布如下图所示:

 

而其中rpc的监听线程的优化是另外一个话题,在其他的issue中再详细讨论,且由于blockReport的操作实际上是触发的rpc处理线程,所以这里只关心rpc处理线程的性能数据。

       在namenode处理blockReport过程中的调用耗时性能数据如下:

 

可以看出,在namenode启动阶段,处理从各个datanode汇报上来的blockReport耗费了整个rpc处理过程中的绝大部分时间(48/49),blockReport处理逻辑中的耗时分布如下图:

 

 

从上图数据中可以发现,blockReport阶段中耗时分布主要耗时在FSNamesystem.addStoredBlock调用以及DatanodeDescriptor.reportDiff过程中,分别耗时37/48和10/48,其中FSNamesystem.addStoredBlock所进行的操作时对每一个汇报上来的block,将其于汇报上来的datanode的对应关系初始化到namenode内存中的BlocksMap表中。所以对于每一个block就会调用一次该方法。所以可以看到该方法在整个过程中调用了774819次,而另一个耗时的操作,即DatanodeDescriptor.reportDiff,该操作的过程在上文中有详细介绍,主要是为了将该datanode汇报上来的blocks跟namenode内存中的BlocksMap中进行对比,以决定那个哪些是需要添加到BlocksMap中的block,哪些是需要添加到toRemove队列中的block,以及哪些是添加到toValidate队列中的block。由于这个操作需要针对每一个汇报上来的block去查询BlocksMap,以及namenode中的其他几个map,所以该过程也非常的耗时。而且从调用次数上可以看出,reportDiff调用在启动过程中仅调用了14次(有14个datanode进行块汇报),却耗费了10/48的时间。所以reportDiff也是整个blockReport过程中非常耗时的瓶颈所在。

       同时可以看到,出了reportDiff,addStoredBlock的调用耗费了37%的时间,也就是耗费了整个blockReport时间的37/48,该方法的调用目的是为了将从datanode汇报上来的每一个block插入到BlocksMap中的操作。从该方法调用的运行数据如下图所示:

 

从上图可以看出,addStoredBlock中,主要耗时的两个阶段分别是FSNamesystem.countNode和DatanodeDescriptor.addBlock,后者是java中的插表操作,而FSNamesystem.countNode调用的目的是为了统计在BlocksMap中,每一个block对应的各副本中,有几个是live状态,几个是decommission状态,几个是Corrupt状态。而在namenode的启动初始化阶段,用来保存corrput状态和decommission状态的block的map都还是空状态,并且程序逻辑中要得到的仅仅是出于live状态的block数,所以,这里的countNoes调用在namenode启动初始化阶段并无需统计每个block对应的副本中的corrrput数和decommission数,而仅仅需要统计live状态的block副本数即可,这样countNodes能够在namenode启动阶段变得更轻量,以节省启动时间。

瓶颈分析总结

从profiling数据和瓶颈分歧情况来看,fsimage加载阶段的瓶颈除了在分切路径的过程中不够优以外,其他耗时的地方几乎都是在java原生接口的调用中,如从字节流读数据,以及从String对象中获取byte[]数组的操作。

       而blockReport阶段的耗时其实很大的原因是跟当前的namenode设计以及内存结构有关,比较明显的不优之处就是在namenode启动阶段的countNode和reportDiff的必要性,这两处在namenode初始化时的blockReport阶段有一些不必要的操作浪费了时间。可以针对namenode启动阶段将必要的操作抽取出来,定制成namenode启动阶段才调用的方式,以优化namenode启动性能。


Ref: http://blog.csdn.net/ae86_fc/article/details/5842020

posted @ 2013-03-28 18:52 鑫龙 阅读(435) | 评论 (0)编辑 收藏

1.二次排序概念:

首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果 。

如: 输入文件:

20 21 
50 51 
50 52 
50 53 
50 54 
60 51 
60 53 
60 52 
60 56 
60 57 
70 58 
60 61 
70 54 
70 55 
70 56 
70 57 
70 58 
1 2 
3 4 
5 6 
7 82 
203 21 
50 512 
50 522 
50 53 
530 54 
40 511 
20 53 
20 522 
60 56 
60 57 
740 58 
63 61 
730 54 
71 55 
71 56 
73 57 
74 58 
12 211 
31 42 
50 62 
7 8

输出(需要分割线):

------------------------------------------------ 
1       2 
------------------------------------------------ 
3       4 
------------------------------------------------ 
5       6 
------------------------------------------------ 
7       8 
7       82 
------------------------------------------------ 
12      211 
------------------------------------------------ 
20      21 
20      53 
20      522 
------------------------------------------------ 
31      42 
------------------------------------------------ 
40      511 
------------------------------------------------ 
50      51 
50      52 
50      53 
50      53 
50      54 
50      62 
50      512 
50      522 
------------------------------------------------ 
60      51 
60      52 
60      53 
60      56 
60      56 
60      57 
60      57 
60      61 
------------------------------------------------ 
63      61 
------------------------------------------------ 
70      54 
70      55 
70      56 
70      57 
70      58 
70      58 
------------------------------------------------ 
71      55 
71      56 
------------------------------------------------ 
73      57 
------------------------------------------------ 
74      58 
------------------------------------------------ 
203     21 
------------------------------------------------ 
530     54 
------------------------------------------------ 
730     54 
------------------------------------------------ 
740     58

2.工作原理

使用如下map和reduce:(特别注意输入输出类型, 其中IntPair为自定义类型)

public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable> 
public static class Reduce extends Reducer<IntPair, NullWritable, IntWritable, IntWritable>

     在map阶段,使用job.setInputFormatClass(TextInputFormat)做为输入格式。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则使用key的实现的compareTo方法。在随后的例子中,第一个例子中,使用了IntPair实现的compareTo方法,而在下一个例子中,专门定义了key比较函数类。

     在reduce阶段,reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。然后开始构造一个key对应的value迭代器。这时就要用到分组,使用jobjob.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key。最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

3,具体步骤

(1)自定义key

在mr中,所有的key是需要被比较和排序的,并且是二次,先根据partitione,再根据大小。而本例中也是要比较两次。先按照第一字段排序,然后再对第一字段相同的按照第二字段排序。根据这一点,我们可以构造一个复合类IntPair,他有两个字段,先利用分区对第一字段排序,再利用分区内的比较对第二字段排序。 
所有自定义的key应该实现接口WritableComparable,因为是可序列的并且可比较的。并重载方法:

//反序列化,从流中的二进制转换成IntPair  
public void readFields(DataInput in) throws IOException          
//序列化,将IntPair转化成使用流传送的二进制  
public void write(DataOutput out)  
//key的比较  
public int compareTo(IntPair o)          
//另外新定义的类应该重写的两个方法  
//The hashCode() method is used by the HashPartitioner (the default partitioner in MapReduce)  
public int hashCode()   
public boolean equals(Object right)
(2)由于key是自定义的,所以还需要自定义一下类: 
(2.1)分区函数类。这是key的第一次比较。 
public static class FirstPartitioner extends Partitioner<IntPair,IntWritable>

在job中使用setPartitionerClasss设置Partitioner。 
(2.2)key比较函数类。这是key的第二次比较。这是一个比较器,需要继承WritableComparator(也就是实现RawComprator接口)。

   (这个就是前面说的第二种方法,但是在第三部分的代码中并没有实现此函数,而是直接使用compareTo方法进行比较,所以也就不许下面一行的设置) 
在job中使用setSortComparatorClass设置key比较函数类。

public static class KeyComparator extends WritableComparator
2.3)分组函数类。在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器。这是一个比较器,需要继承WritableComparator。 
public static class GroupingComparator extends WritableComparator
分组函数类也必须有一个构造函数,并且重载 public int compare(WritableComparable w1, WritableComparable w2) 
分组函数类的另一种方法是实现接口RawComparator。 
在job中使用setGroupingComparatorClass设置分组函数类。 
另外注意的是,如果reduce的输入与输出不是同一种类型,则不要定义Combiner也使用reduce,因为Combiner的输出是reduce的输入。除非重新定义一个Combiner。 


转自:http://www.cnblogs.com/dandingyy/archive/2013/03/08/2950703.html

posted @ 2013-03-25 19:38 鑫龙 阅读(2233) | 评论 (0)编辑 收藏

面试hadoop可能被问到的问题,你能回答出几个 ?

1、hadoop运行的原理?

2、mapreduce的原理?

3、HDFS存储的机制?

4、举一个简单的例子说明mapreduce是怎么来运行的 ?

5、面试的人给你出一些问题,让你用mapreduce来实现?

      比如:现在有10个文件夹,每个文件夹都有1000000个url.现在让你找出top1000000url。

6、hadoop中Combiner的作用?

Src: http://p-x1984.javaeye.com/blog/859843


 

Q1. Name the most common InputFormats defined in Hadoop? Which one is default ? 
Following 2 are most common InputFormats defined in Hadoop 
- TextInputFormat
- KeyValueInputFormat
- SequenceFileInputFormat
Q2. What is the difference between TextInputFormatand KeyValueInputFormat class
TextInputFormat: It reads lines of text files and provides the offset of the line as key to the Mapper and actual line as Value to the mapper
KeyValueInputFormat: Reads text file and parses lines into key, val pairs. Everything up to the first tab character is sent as key to the Mapper and the remainder of the line is sent as value to the mapper.
Q3. What is InputSplit in Hadoop
When a hadoop job is run, it splits input files into chunks and assign each split to a mapper to process. This is called Input Split 
Q4. How is the splitting of file invoked in Hadoop Framework 
It is invoked by the Hadoop framework by running getInputSplit()method of the Input format class (like FileInputFormat) defined by the user 
Q5. Consider case scenario: In M/R system,
    - HDFS block size is 64 MB
    - Input format is FileInputFormat
    - We have 3 files of size 64K, 65Mb and 127Mb 
then how many input splits will be made by Hadoop framework?
Hadoop will make 5 splits as follows 
- 1 split for 64K files 
- 2  splits for 65Mb files 
- 2 splits for 127Mb file 
Q6. What is the purpose of RecordReader in Hadoop
The InputSplithas defined a slice of work, but does not describe how to access it. The RecordReaderclass actually loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat 
Q7. After the Map phase finishes, the hadoop framework does "Partitioning, Shuffle and sort". Explain what happens in this phase?
- Partitioning
Partitioning is the process of determining which reducer instance will receive which intermediate keys and values. Each mapper must determine for all of its output (key, value) pairs which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same

- Shuffle
After the first map tasks have completed, the nodes may still be performing several more map tasks each. But they also begin exchanging the intermediate outputs from the map tasks to where they are required by the reducers. This process of moving map outputs to the reducers is known as shuffling.
- Sort
Each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted by Hadoop before they are presented to the Reducer 
Q9. If no custom partitioner is defined in the hadoop then how is data partitioned before its sent to the reducer 
The default partitioner computes a hash value for the key and assigns the partition based on this result 
Q10. What is a Combiner 
The Combiner is a "mini-reduce" process which operates only on data generated by a mapper. The Combiner will receive as input all data emitted by the Mapper instances on a given node. The output from the Combiner is then sent to the Reducers, instead of the output from the Mappers.
Q11. Give an example scenario where a cobiner can be used and where it cannot be used
There can be several examples following are the most common ones
- Scenario where you can use combiner
  Getting list of distinct words in a file
- Scenario where you cannot use a combiner
  Calculating mean of a list of numbers 
Q12. What is job tracker
Job Tracker is the service within Hadoop that runs Map Reduce jobs on the cluster
Q13. What are some typical functions of Job Tracker
The following are some typical tasks of Job Tracker
- Accepts jobs from clients
- It talks to the NameNode to determine the location of the data
- It locates TaskTracker nodes with available slots at or near the data
- It submits the work to the chosen Task Tracker nodes and monitors progress of each task by receiving heartbeat signals from Task tracker 
Q14. What is task tracker
Task Tracker is a node in the cluster that accepts tasks like Map, Reduce and Shuffle operations - from a JobTracker 

Q15. Whats the relationship between Jobs and Tasks in Hadoop
One job is broken down into one or many tasks in Hadoop
Q16. Suppose Hadoop spawned 100 tasks for a job and one of the task failed. What willhadoop do ?
It will restart the task again on some other task tracker and only if the task fails more than 4 (default setting and can be changed) times will it kill the job
Q17. Hadoop achieves parallelism by dividing the tasks across many nodes, it is possible for a few slow nodes to rate-limit the rest of the program and slow down the program. What mechanism Hadoop provides to combat this  
Speculative Execution 
Q18. How does speculative execution works in Hadoop 
Job tracker makes different task trackers process same input. When tasks complete, they announce this fact to the Job Tracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the Task Trackers to abandon the tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully, first. 
Q19. Using command line in Linux, how will you 
- see all jobs running in the hadoop cluster
- kill a job
hadoop job -list
hadoop job -kill jobid 
Q20. What is Hadoop Streaming 
Streaming is a generic API that allows programs written in virtually any language to be used asHadoop Mapper and Reducer implementations 

Q21. What is the characteristic of streaming API that makes it flexible run map reduce jobs in languages like perl, ruby, awk etc. 
Hadoop Streaming allows to use arbitrary programs for the Mapper and Reducer phases of a Map Reduce job by having both Mappers and Reducers receive their input on stdin and emit output (key, value) pairs on stdout.
Q22. Whats is Distributed Cache in Hadoop
Distributed Cache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications during execution of the job. The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node.
Q23. What is the benifit of Distributed cache, why can we just have the file in HDFS and have the application read it 
This is because distributed cache is much faster. It copies the file to all trackers at the start of the job. Now if the task tracker runs 10 or 100 mappers or reducer, it will use the same copy of distributed cache. On the other hand, if you put code in file to read it from HDFS in the MR job then every mapper will try to access it from HDFS hence if a task tracker run 100 map jobs then it will try to read this file 100 times from HDFS. Also HDFS is not very efficient when used like this.

Q.24 What mechanism does Hadoop framework provides to synchronize changes made in Distribution Cache during runtime of the application 
This is a trick questions. There is no such mechanism. Distributed Cache by design is read only during the time of Job execution

Q25. Have you ever used Counters in Hadoop. Give us an example scenario
Anybody who claims to have worked on a Hadoop project is expected to use counters

Q26. Is it possible to provide multiple input to Hadoop? If yes then how can you give multiple directories as input to the Hadoop job 
Yes, The input format class provides methods to add multiple directories as input to a Hadoop job

Q27. Is it possible to have Hadoop job output in multiple directories. If yes then how 
Yes, by using Multiple Outputs class

Q28. What will a hadoop job do if you try to run it with an output directory that is already present? Will it
- overwrite it
- warn you and continue
- throw an exception and exit

The hadoop job will throw an exception and exit.

Q29. How can you set an arbitary number of mappers to be created for a job in Hadoop 
This is a trick question. You cannot set it

Q30. How can you set an arbitary number of reducers to be created for a job in Hadoop 
You can either do it progamatically by using method setNumReduceTasksin the JobConfclass or set it up as a configuration setting

 

Src:http://xsh8637.blog.163.com/blog/#m=0&t=1&c=fks_084065087084081065083083087095086082081074093080080069

posted @ 2013-03-18 13:03 鑫龙 阅读(1449) | 评论 (0)编辑 收藏

基于Hadoop Sequencefile的小文件解决方案

 

一、 概述

   小文件是指文件size小于HDFSblock大小的文件。这样的文件会给hadoop的扩展性和性能带来严重问题。首先,在HDFS中,任何block,文件或者目录在内存中均以对象的形式存储,每个对象约占150byte,如果有1000 0000个小文件,每个文件占用一个block,则namenode大约需要2G空间。如果存储1亿个文件,则namenode需要20G空间。这样namenode内存容量严重制约了集群的扩展。 其次,访问大量小文件速度远远小于访问几个大文件。HDFS最初是为流式访问大文件开发的,如果访问大量小文件,需要不断的从一个datanode跳到另一个datanode,严重影响性能。最后,处理大量小文件速度远远小于处理同等大小的大文件的速度。每一个小文件要占用一个slot,而task启动将耗费大量时间甚至大部分时间都耗费在启动task和释放task上。

二、Hadoop自带的解决方案

对于小文件问题,Hadoop本身也提供了几个解决方案,分别为:Hadoop ArchiveSequence fileCombineFileInputFormat

1 Hadoop Archive

Hadoop Archive或者HAR,是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样在减少namenode内存使用的同时,仍然允许对文件进行透明的访问。

使用HAR时需要两点,第一,对小文件进行存档后,原文件并不会自动被删除,需要用户自己删除;第二,创建HAR文件的过程实际上是在运行一个mapreduce作业,因而需要有一个hadoop集群运行此命令。

该方案需人工进行维护,适用管理人员的操作,而且har文件一旦创建,Archives便不可改变,不能应用于多用户的互联网操作。

2 Sequence file

sequence file由一系列的二进制key/value组成,如果为key小文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

Hadoop-0.21.0中提供了SequenceFile,包括WriterReaderSequenceFileSorter类进行写,读和排序操作。如果hadoop版本低于0.21.0的版本,实现方法可参见[3]

 

该方案对于小文件的存取都比较自由,不限制用户和文件的多少,但是SequenceFile文件不能追加写入,适用于一次性写入大量小文件的操作。

 

3CombineFileInputFormat

CombineFileInputFormat是一种新的inputformat,用于将多个文件合并成一个单独的split,另外,它会考虑数据的存储位置。

该方案版本比较老,网上资料甚少,从资料来看应该没有第二种方案好。

 

 

三、小文件问题解决方案

在原有HDFS基础上添加一个小文件处理模块,具体操作流程如下:

       1.   当用户上传文件时,判断该文件是否属于小文件,如果是,则交给小文件处理模块处理,否则,交给通用文件处理模块处理。

       2.  在小文件模块中开启一定时任务,其主要功能是当模块中文件总size大于HDFSblock大小的文件时,则通过SequenceFile组件以文件名做key,相应的文件内容为value将这些小文件一次性写入hdfs模块。

       3. 同时删除已处理的文件,并将结果写入数据库。

       4.  当用户进行读取操作时,可根据数据库中的结果标志来读取文件。


转自:http://lxm63972012.iteye.com/blog/1429011

posted @ 2013-03-04 19:28 鑫龙 阅读(791) | 评论 (0)编辑 收藏

jar -cvf xxx.jar .
hadopp jar xxx.jar clalss-name [input] [output]
----------------------------------------------------------------------
hadoop jar hadoop-0.20.2-examples.jar [class name]的实质是:
1.利用hadoop这个脚本启动一个jvm进程;
2.jvm进程去运行org.apache.hadoop.util.RunJar这个java类;
3.org.apache.hadoop.util.RunJar解压hadoop-0.20.2-examples.jar到hadoop.tmp.dir/hadoop-unjar*/目录下;
4.org.apache.hadoop.util.RunJar动态的加载并运行Main-Class或指定的Class;
5.Main-Class或指定的Class中设定Job的各项属性
6.提交job到JobTracker上并监视运行情况。
注意:以上都是在jobClient上执行的。
运行jar文件的时候,jar会被解压到hadoop.tmp.dir/hadoop-unjar*/目录下(如:/home/hadoop/hadoop-fs/dfs/temp/hadoop-unjar693919842639653083, 注意:这个目录是JobClient的目录,不是JobTracker的目录)。解压后的文件为:
drwxr-xr-x 2 hadoop hadoop 4096 Jul 30 15:40 META-INF
drwxr-xr-x 3 hadoop hadoop 4096 Jul 30 15:40 org
有图有真相:

提交job的实质是:
生成${job-id}/job.xml文件到hdfs://${mapred.system.dir}/(比如hdfs://bcn152:9990/home/hadoop/hadoop-fs/dfs/temp/mapred/system/job_201007301137_0012/job.xml),job的描述包括jar文件的路径,map|reduce类路径等等.
上传${job-id}/job.jar文件到hdfs://${mapred.system.dir}/(比如hdfs://bcn152:9990/home/hadoop/hadoop-fs/dfs/temp/mapred/system/job_201007301137_0012/job.jar)
有图有真相:

生成job之后,通过static JobClient.runJob()就会向jobTracker提交job:
JobClient jc = new JobClient(job);
RunningJob rj = jc.submitJob(job);
之后JobTracker就会调度此job,
提交job之后,使用下面的代码获取job的进度:
    try {
      if (!jc.monitorAndPrintJob(job, rj)) {
        throw new IOException("Job failed!");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }


posted @ 2013-03-02 17:28 鑫龙 阅读(3961) | 评论 (0)编辑 收藏

     摘要: 译自http://www.wangafu.net/~nickm/libevent-book/Ref8_listener.htmlevconnlistener机制提供了监听和接受TCP连接的方法。本章的所有函数和类型都在event2/listener.h中声明,除非特别说明,它们都在2.0.2-alpha版本中首次出现。1 创建和释放evconnlistener接口Code highlig...  阅读全文

posted @ 2013-02-07 10:48 鑫龙 阅读(6452) | 评论 (0)编辑 收藏

     摘要: 译自http://www.wangafu.net/~nickm/libevent-book/Ref7_evbuffer.htmllibevent的evbuffer实现了为向后面添加数据和从前面移除数据而优化的字节队列。evbuffer用于处理缓冲网络IO的“缓冲”部分。它不提供调度IO或者当IO就绪时触发IO的功能:这是bufferevent的工作。除非特别说明,本章描述的...  阅读全文

posted @ 2013-02-05 18:52 鑫龙 阅读(6017) | 评论 (0)编辑 收藏

     摘要: 译自http://www.wangafu.net/~nickm/libevent-book/Ref6a_advanced_bufferevents.html 本章描述bufferevent的一些对通常使用不必要的高级特征。如果只想学习如何使用bufferevent,可以跳过这一章,直接阅读下一章。1 成对的bufferevent有时候网络程序需要与自身通信。比如说,通过某些协议...  阅读全文

posted @ 2013-02-05 17:39 鑫龙 阅读(6676) | 评论 (0)编辑 收藏

     摘要: 很多时候,除了响应事件之外,应用还希望做一定的数据缓冲。比如说,写入数据的时候,通常的运行模式是:l 决定要向连接写入一些数据,把数据放入到缓冲区中l 等待连接可以写入l 写入尽量多的数据l 记住写入了多少数据,如果还有更多数据要写入,等待连接再次可以写入这种缓冲IO模式很通用,libevent为此提供了一种通用机制,即bufferevent。buffere...  阅读全文

posted @ 2013-02-05 14:38 鑫龙 阅读(9859) | 评论 (1)编辑 收藏

仅列出标题
共20页: First 2 3 4 5 6 7 8 9 10 Last