频道栏目
IT货架 > > 正文
dbms_lock统制串行详解 1          使用dbms_lock包控制串行 FtpClient 种来操作FTP的上传和下载,带进度条 Hadoop入门1:Hadoop简介
网友分享于:Jun 12, 2018 10:44:04 PM    来源: IT货架   
dbms_lock控制串行详解

1          使用dbms_lock包控制串行

    在pl/sql代码块中,有些操作代码块不能被多个会话同时进行执行,比如生成中间数据表(如先清除,后插入中间数据),

并且此表的数据在后续业务处理总需要使用,如果此部分代码块被另个会话调用,则会造成中间数据表的数据在同一个会话中不完整。

因此当有类似这样的需求时,就可能需要在pl/sql块中使用dbms_lock包控制来控制此部分代码块只能进行串行调用。

1.1         锁定原理

1.1.1        用户锁概念

    1、通过dbms_lock获取的锁类型可以看成是oracle内部的一种队列锁,用户申请时通过指定的锁ID或通过指定需要锁定的名称

(此时会返回一个锁ID给用户)来获取锁定并独占此ID代表的信号量,从而达到控制并发的,因此称为用户锁(PL/SQL用户锁)

    2、用户锁有别于物理概念上的锁(如DML lockdata lock),DDL lockdictionary lock)internal lock/latch),可以看成是逻辑上的一种锁定,

且他们两者之间也是不相冲突的。比如如果会话1基于某个物理表的表明定义了一用户锁,此时会话2实际上是可以对此物理表做任何DDL,DML操作的。

1.1.2        锁定模式

    用户申请锁时可以指定锁定模式(默认为x_mode),这里的锁定模式逻辑上可以对应到TM锁中的模式,有以下六种模式。

锁定模式

说明

TM锁中的模式

 

nl_mode

nl锁定模式

Null

并发度从上到下依次减小

ss_mode

subshared锁定模式

row share(RS)

sx_mode

subexclusive锁定模式

row exclusive(RX)

s_mode

shared锁定模式

share mode (S)

ssx_mode

subshared exclusive锁定模式

share row exclusive mode(SRX)

x_mode

exclusive锁定模式

exclusive mode (X)

 

1.1.2.1       锁定模式与并发度

    需要注意的是,用户锁的锁定模式仅仅来用控制当其他会话试图获取自身会话所占有的锁ID(信号量)时,获取操作是成功、阻塞,

还是失败(如果没有指定阻塞时间或超时)。

关系

其他会话试图以某种锁定模式获取会话1所申请的用户锁时

NL

SS

SX

S

SSX

X

会话1以某种锁

定模

锁定

NL

SUCC

SUCC

SUCC

SUCC

SUCC

SUCC

SS

SUCC

SUCC

SUCC

SUCC

SUCC

fail

SX

SUCC

SUCC

SUCC

fail

fail

fail

S

SUCC

SUCC

fail

SUCC

fail

fail

SSX

SUCC

SUCC

fail

fail

fail

fail

X

SUCC

fail

fail

fail

fail

fail

 

1.1.3        查看

    SELECT * FROM v$lock where type='UL';

1.2         申请用户锁

    申请用户锁时有两种方式,分别通过指定的锁ID或通过指定锁名称(此时会返回一个锁ID给用户)来获取锁定并独占此ID,其返 值为integer, 义如下,其中14代表申请成功:

0 申请锁定成功
  1 申请锁定时超时
  2 申请锁定时发生死锁
  3 传入参数错误
  4 已经获得了锁定,重复申请了锁
  5 传入的锁定句柄错误

 

1.2.1        通过指定锁ID

    此种方式不建议,因实际中如果指定的锁ID不一样,是无法达到对代码进行串行调用控制的,且指定的锁ID可能会与其他不相关业务冲突,从而造成没必要的并发控制。指定的锁ID需要位于0 到的1073741823区间。

1.2.1.1       api

    dbms_lock中提供了函数request用来通过指定锁ID申请用户锁,

  function  request(id in integer,
                    lockmode in integer default x_mode,
                    timeout in integer default maxwait,
                    release_on_commit in boolean default FALSE)
    return integer;

1.2.1.2       实例

DECLARE
  v_def_lock_id    INTEGER;
  v_request_status INTEGER; --返回申请标识
  v_sid            NUMBER;
  v_relase_status  INTEGER;
BEGIN
  v_def_lock_id    := 100; --指定申请id为100的用户锁
  v_request_status := DBMS_LOCK.request(v_def_lock_id,
                                        DBMS_LOCK.ssx_mode, --锁定模式为5
                                        100, --最多等待时间
                                        release_on_commit => FALSE);
  --会话提交时也不释放release_on_commit=false,此时只有等待会话显示释放或会话结束后自动释放
  DBMS_OUTPUT.put_line('request_status=' || v_request_status);
  IF v_request_status IN (0, 4) THEN
    SELECT SID
      INTO v_sid
      FROM v$lock
     WHERE TYPE = 'UL'
       AND ID1 = v_def_lock_id;
    DBMS_OUTPUT.put_line('current session id=' || v_sid ||
                         ',request sucess');
  ELSE
    DBMS_OUTPUT.put_line('request fail');
  END IF;
  --这里暂且不释放,实际中一定要释放,且最好在异常代码中重复释放
 /* v_relase_status := DBMS_LOCK.release(v_def_lock_id);
EXCEPTION
  WHEN OTHERS THEN
    v_relase_status := DBMS_LOCK.release(v_def_lock_id);
    RAISE;*/
END;

  

1.2.2        通过指定需要锁定的名称

    通过锁名称申请用户锁时相对更常用,常用的场景为指定一个表名称,名称区分大小写,不能以ORA$开头,最大支持128bytes

1.2.2.1       api

    1、首先调用procedure allocate_unique(lockname in varchar2,
               lockhandle out varchar2,
               expiration_secs in integer default
864000);

获取生成的lockid (相同的lockname,expiration_secs时间范围内生成的lockid总是相同,大小为[1073741824,1999999999])

    2、然后调用另一个重载的函数request用来通过lockid申请用户锁。

 function  request(lockhandle in varchar2,
                    lockmode in integer default x_mode,
                    timeout in integer default maxwait,
                    release_on_commit in boolean default FALSE)
    return integer;

1.2.2.2       实例

DECLARE
  v_def_lock_name      VARCHAR2(30);
  v_requset_lockhandle VARCHAR2(100);
  v_request_status     INTEGER; --返回申请标识
  v_sid                NUMBER;
  v_relase_status      INTEGER;
BEGIN
  v_def_lock_name := 'TEST_TABLE'; --指定申请名称为TEST_TABLE的用户锁
  DBMS_LOCK.ALLOCATE_UNIQUE(v_def_lock_name, v_requset_lockhandle, 30);
  DBMS_OUTPUT.put_line('lockhandle=' || v_requset_lockhandle);
  v_request_status := DBMS_LOCK.request(v_requset_lockhandle,
                                        DBMS_LOCK.ssx_mode, --锁定模式为5
                                        100, --最多等待时间
                                        release_on_commit => FALSE);
  --会话提交时也不释放release_on_commit=false,此时只有等待会话显示释放或会话结束后自动释放
  DBMS_OUTPUT.put_line('request_status=' || v_request_status);
  IF v_request_status IN (0, 4) THEN
    DBMS_OUTPUT.put_line('current session id' || ',request sucess');
  ELSE
    DBMS_OUTPUT.put_line('request fail');
  END IF;
  --这里暂且不释放,实际中一定要释放,且最好在异常代码中重复释放
  /* v_relase_status := DBMS_LOCK.release(v_requset_lockhandle);
  EXCEPTION
    WHEN OTHERS THEN
      v_relase_status := DBMS_LOCK.release(v_requset_lockhandle);
      RAISE;*/
END;

  

 

 

 

1.3         释放用户锁

如果在申请锁时为定义release_on_commit=true, 会话提交时也不释放此锁,此时只有会话必须显示释放,否则只能等待会话结束后由数据库自动清理自动清理,需要注意的时,最好在业务代码退出中捕获异常的代码中也显示调用释放用户锁代码。

相对于前面的两个申请方式,oracle中有两个对应的释放方法:

1.3.1        对应锁ID的释放方法

    当以锁ID申请用户锁时,对应的方法为function release(id in integer) return integer; 入参为锁ID,返回值为04,说明释放成功。

返回值说明:

0 – 成功

 3 – 参数错误
         4 –
当前会话不再拥有指定的锁

1.3.2        对应名称的释放方法

   当以名称指定申请用户锁时,对应的方法为function release(lockhandle in varchar2) return integer;,其中入参为allocate_unique过程产生的lockid,返回值为04,说明释放成功。

返回值说明:

   0 – 成功

   3 – 参数错误
    4 –
当前会话不再拥有指定的锁
    5 –
不合法的lockhandle

1.4         转换用户锁模式

如果在会话中以某种锁定模式获得锁后,如果对锁定模式升级或降级时,以应对并发度的减少或增加。则需要用到所动模式转换

相对于前面的两个申请方式,oracle中有两个对应的释放方法:

1.4.1        对应锁ID的转换方法

当以锁ID申请用户锁时,对应的方法为function convert(id in integer,
                   lockmode in integer,
                   timeout in number default maxwait)
    return integer;
入参为锁ID,新的锁定模式,最大等待时长(s,返回值为0,说明释放成功。

   返回值说明:

0 – 成功

2 –deadlock

 3 – 参数错误
        4 –
当前会话不再拥有指定的锁

1.4.2        对应名称的转换方法

 当以名称指定申请用户锁时,对应的方法为function convert(lockhandle in varchar2, lockmode in integer,  timeout in number default maxwait)
    return integer;
,其中入参为allocate_unique过程产生的lockid,新的锁定模式,最大等待时长(s,返回值为0,说明转换成功。

   返回值说明:

0 – 成功

2 –deadlock

3 – 参数错误
          4 –
当前会话不再拥有指定的锁
          5 –
不合法的lockhandle

1.5         当前会话睡眠

   dbms_lock中提供了过程

procedure sleep(seconds in number);入参为睡眠时长(s),用来显示指定当前会话阻塞时长。

 

1.6         全局测试实例

1.6.1        准备函数

这里暂且以名称申请用户锁来作为测试例子,(以锁ID进行申函数大体类似)

1.6.1.1       创建申请锁的函数

 

CREATE OR REPLACE FUNCTION f_request_lock(v_lock_name          VARCHAR2,
                                          v_requset_lockhandle OUT VARCHAR2, --generate lockid
                                          v_lock_mode          INTEGER DEFAULT dbms_lock.x_mode, --锁定模式                                          v_requst_time_out    INTEGER DEFAULT dbms_lock.maxwait,
                                          --请求超时时间(最多等待时间s)
                                          v_release_on_commit BOOLEAN DEFAULT FALSE)
  RETURN BOOLEAN IS
  v_request_status INTEGER DEFAULT - 1; 
  v_request_flag   BOOLEAN := FALSE;
  v_sid            NUMBER;
BEGIN
  DBMS_LOCK.ALLOCATE_UNIQUE(v_lock_name, v_requset_lockhandle);
  DBMS_OUTPUT.put_line('lockhandle=' || v_requset_lockhandle);
  v_request_status := DBMS_LOCK.request(v_requset_lockhandle,
                                        v_lock_mode,
                                        v_requst_time_out,
                                        v_release_on_commit);

  DBMS_OUTPUT.put_line('request_status=' || v_request_status);
  IF v_request_status IN (0, 4) THEN
    DBMS_OUTPUT.put_line('request sucess');
    v_request_flag:=true;
  ELSE
    DBMS_OUTPUT.put_line('request fail');
  END IF;
  RETURN v_request_flag;
END;

  

 

1.6.1.2       创建转换锁的函数

CREATE OR REPLACE FUNCTION f_convert_lock(v_lock_handle VARCHAR2,
                                          ----generate lockid
                                          v_dest_lock_mode   INTEGER DEFAULT dbms_lock.x_mode, --目标转换模式
                                          v_convert_time_out INTEGER DEFAULT dbms_lock.maxwait
                                          --转换超时时间(最多等待时间s)
                                          ) RETURN BOOLEAN IS
  v_convert_status INTEGER;
  v_convert_flag   BOOLEAN := FALSE;
BEGIN
  v_convert_status := DBMS_LOCK.convert(v_lock_handle,
                                        v_dest_lock_mode,
                                        v_convert_time_out);
  DBMS_OUTPUT.put_line('convert_status=' || v_convert_status);
  IF v_convert_status = 0 THEN
    v_convert_flag := TRUE;
  END IF;
  RETURN v_convert_flag;
END;

 

1.6.1.3       创建释放锁的函数

 

CREATE OR REPLACE FUNCTION f_release_lock(v_lock_handle VARCHAR2
                                          ----generate lockid
                                          ) RETURN BOOLEAN IS
  v_release_status INTEGER;
  v_release_flag   BOOLEAN := FALSE;
BEGIN
  v_release_status := DBMS_LOCK.release(v_lock_handle);
  DBMS_OUTPUT.put_line('release_status=' || v_release_status);
  IF v_release_status IN (0, 4) THEN
    v_release_flag := TRUE;
  END IF;
  RETURN v_release_flag;
EXCEPTION
  WHEN OTHERS THEN
    v_release_status := DBMS_LOCK.release(v_lock_handle);
    RAISE;
END;

  

 

 

1.6.2        测试申请、转换、睡眠、释放

1.6.2.1       测试代码

DECLARE
  v_requset_lockhandle VARCHAR2(100); ----generate lockid
  v_lock_name          VARCHAR2(30);
  v_flag               BOOLEAN;
BEGIN
  v_lock_name := 'TEST_TABLE'; --测试基于表TEST_TABLE申请、转换、释放用户锁
  --1、以名称申请用户锁,锁定模式为dbms_lock.ssx_mode
  v_flag := f_request_lock(v_lock_name,
                           v_requset_lockhandle,
                           dbms_lock.ssx_mode);
  IF v_flag THEN
    DBMS_OUTPUT.put_line('--------request_flag=sucess------');
    --2、转换用户锁,目标锁定模式为dbms_lock.x_mode,即升级锁定类型
    v_flag := f_convert_lock(v_requset_lockhandle, dbms_lock.x_mode);
    IF v_flag THEN
      DBMS_OUTPUT.put_line('-----convert_flag=sucess---------');
    END IF;
  END IF;
  --3、测试,当前代码块睡眠5s
  dbms_output.put_line('time before sleep=' || to_char(SYSDATE, 'hh24:mi:ss'));
  DBMS_LOCK.sleep(5);
  dbms_output.put_line('time after sleep=' || to_char(SYSDATE, 'hh24:mi:ss'));
  --4、释放用户锁----------------
  v_flag := f_release_lock(v_requset_lockhandle);
   IF v_flag THEN
      DBMS_OUTPUT.put_line('-----relase_flag=sucess--------');
    END IF;
EXCEPTION
  WHEN OTHERS THEN
    v_flag := f_release_lock(v_requset_lockhandle);
    RAISE;
END;

 

 

1.6.2.2       测试输出



 

 

 

1.6.3        测试控制代码并发调用

    测试指定业务操作的并发性控制:暂以两个会话调用同一代码为例,会话1称为A,会话2称为B

1.6.3.1       测试原则

FtpClient 种来操作FTP的上传和下载,带进度条

FtpClient 类来操作FTP的上传和下载,带进度条

先给出FtpClient 类(还没有做完。明天继续)


再看界面调用这个类

 


 

FtpClient 类来操作FTP的上传和下载,带进度条

先给出FtpClient 类(还没有做完。明天继续)


再看界面调用这个类

 


 

Hadoop入门一:Hadoop简介
  从数据爆炸开始。。。

 1.1 第三次工业革命

     第一次:18世纪60年代,手工工厂向机器大生产过渡,以蒸汽机的发明和使用为标志。
     第二次:19世纪70年代,各种新技术新发明不断被应用于工业生产,以电力的发明使用为标志。
     第三次:20世界四五十年代末,以高新技术为代表的新科学技术革命,以原子能、航天技术和电子计算机
为标志。

1.2 信息技术发展带来的数据爆炸
  • 纽约证券所交易    每天 1TB
  • FaceBook一千亿照片  1PB 
  • 腾讯 每天 300TB
  • 淘宝 每天 pv20亿 数据量 50TB
  •  ......


1.3 数据存储与分析

问题:数据量指数增加,磁盘访问速度未能与时俱进
  • 1990 年 一个磁盘 1370MB 速度4.4MB/s 用时5分钟
  • 2010 年 一个磁盘 1TB  速度 100MB/s 用时两个半
分析:读一个很慢,那么可以同时读多个
  • 如果把1TB存储到100个磁盘,每个存储1%,并行读取,用时不到两分钟。
  • 如果一个我们有100个1TB数据集,100个1TB磁盘,那么我们以磁盘共享的方式把每个数据集分布到100个磁盘中,这样边会大大提高每个数据集的读取速率。
如果实现此类文件系统需要解决哪些问题?
  • 硬盘故障:因为文件系统有多个磁盘,那么任意一个磁盘发生故障的概率就变得很高。(采取数据备份)
  • 数据分析:某些分析任务需要结合大部分数据共同完成,那么我们的文件系统就要保证对来自多个数据源的数据进行分析的准确性。     
1.4 Hadoop-一个可靠的分布式共享存储和分析系统

简要介绍:Hadoop 是Apache基金会下一个开源的分布式计算平台,它以分布式文件系统HDFS和MapReduce算法为核心,为用户提供了系统底层细节透明的分布式基础架构。


     如上图Hadoop集群中有很多并行的机器来存储和分析数据,客户端把任务提交到集群,集群计算返回结果。
     
历史起源:Apache的子项目的子项目
     雄心勃勃的Doug Cutting:他先领导创立了Apache的项目Lucene,然后Lucene又衍生出子项目Nutch,Nutch又衍生了子项目Hadoop。Lucene是一个功能全面的文本搜索和查询库,Nutch目标就是要视图以Lucene为核心建立一个完整的搜索引擎,并且能达到提到Google商业搜索引擎的目标。网络搜索引擎和基本文档搜索区别就在规模上,Lucene目标是索引数百万文档,而Nutch应该能处理数十亿的网页。因此Nutch就面临了一个极大的挑战,即在Nutch中建立一个层,来负责分布式处理、冗余、故障恢复及负载均衡等等一系列问题。。。
     曙光的到来:2004年,Google发表了两篇论文来论述Google文件系统(GFS)和MapReduce框架,并且使用了这两项技术来拓展自己的搜索系统,于是Doug Cutting看到了这两篇论文的价值并带领他的团队便实现了这个框架,并将Nutch移植上去,于是Nutch的可扩展性得到极大的提高。
     Hadoop的诞生:Doug Cutting逐渐认识到急需要成立一个专门的项目来充实这两种技术,于是就诞生了Hadoop。
     2006年1月,雅虎雇佣Doug Cutting,并让他和一个专门的团队来一起改进Hadoop,并将其作为一个开源项目。
     2008年2月19日,雅虎正式宣布,其索引网页的生产系统采用的就是在10000多个核的Linux系统上运行的Hadoop。
     于是,Hadoop真正达到了互联网级。。。
      ps:关于Doug Cutting它三个项目的名字由来,这个人很有意思,三个项目的名字都来源于他家庭,Lucene是他妻子的中间名也是她外祖母的名字,他儿子在很小的时候总是把吃饭的词叫做Nutch,后来,他又把一个黄色大象毛绒玩具叫做Hadoop,这样大家就明白了为何好多关于Hadoop的资料中都能看到个黄色的大象。
     
优点:Hadoop是一个开源框架,可编写和运行分布式应用来处理大规模数据,分布式计算是一个不断变化且宽泛的领域,优点如下:
     1.易用性。Hadoop运行在由一般商用机器构成的大型集群上。
     2.可靠性。Hadoop致力于一般商用机器上,其架构假设硬件会频繁出现失效,它可以从容处理大多数此类故障。
     3.可扩展。Hadoop通过增加集群节点,可以线性地拓展以处理更大数据集。
     4.简单。Hadoop允许用户快速的编写出高效地并行代码。

了解分布式系统和Hadoop:

     摩尔定律: 当价格不变时,集成电路上可容纳的晶体管 数目,约每隔18个月便会增加一倍,性能也将提升一倍,但解决大规模数据计算问题不能单纯依赖制造越来越大型的服务器。
      分布式系统(向外拓展scale-out)与大型服务器(向上拓展scale-up),从IO性价比层面分析:
     一个四IO通道的高端机,每个通道的吞吐量各为100MB/sec,读取4TB数据也要接近3小时,而用Hadoop,同样的数据被划分为较小的块(通常为64MB),通过HDFS分不到群内的多台计算机上,集群可以并行存取数据,这样,一组通用的计算机比一台高端机要便宜。
     Hadoop与其它分布式系统比较,如SETI@home,它的一台中央服务器存储了来自太空的无线电信号,并把这些信号数据发给分布在世界各地的客户端计算,再将计算返回的结果存储起来。
     Hadoop对待数据的理念与其不同。SETI@home需要服务器和客户端重复地传输数据,这种方式在处理密集数据时,会使得数据迁移变得十分困难。而Hadoop则强调把代码向数据迁移,即Hadoop集群中既包含数据又包含运算环境,并且尽可能让一段数据的计算发生在同一台机器上,代码比数据更加容易移动,Hadoop的设计理念即是把要执行的计算代码移动到数据所在的机器上去。
 
比较Hadoop和SQL数据库

      从总体上看,现在大多数数据应用处理的主力是关系型数据库,即SQL面向的是结构化的数据,而Hadoop则针对的是非结构化的数据,从这一角度看,Hadoop提供了对数据处理的一种更为通用的方式。
    下面,我们从特定的视角将Hadoop与SQL数据库做详细比较:
      1. 用scale-out代替scale-up
             拓展商用服务器的代价是非常昂贵的。要运行一个更大的数据库,就要一个更大的服务器,事实上,各服务器厂商往往会把其昂贵的高端机标称为“数据库级服务器”,不过有时候有可能需要处理更大的数据集,但却找不到更大的机器,而更为重要的是,高端机对于许多应用并不经济。
      2.用键值对代替关系表
              关系型数据库需要将数据按照某种模式存放到具有关系型数据结构表中,但是许多当前的数据模型并不能很好的适应这些模型,如文本、图片、xml等,此外,大型数据集往往是非结构化或半结构化的。而Hadoop以键值对作为最基本的数据单元,能够灵活的处理较少结构化的数据类型。
      3.用函数式编程(MapReduce)代替声明式查询(SQL)
              SQL从根本上说是一个高级声明式语言,它的手段是声明你想要的结果,并让数据库引擎判断如何获取数据。而在MapReduce程序中,实际的数据处理步骤是由你指定的。SQL使用查询语句,而MapReduce使用程序和脚本。MapReduce还可以建立复杂的数据统计模型,或者改变图像数据的处理格式。
      4.用离线批量处理代替在线处理
              Hadoop并不适合处理那种对几条记录读写的在线事务处理模式,而适合一次写入多次读取的数据需求。


1.5 理解MapReduce

     也许你知道管道和消息队列数据处理模型,管道有助于进程原语的重用,用已有模块的简单连接就可组成一个新的模块,消息队列则有助于进程原语的同步。
     同样,MapReduce也是一种数据处理模型 。它的最大的特点就是容易拓展到多个计算机节点上处理数据。在MapReduce中,原语通常被称作Mapper和Reducer。也许讲一个数据处理应用分解为一个Mapper和Reducer是非常繁琐的,但是一旦你写好了一个Mapreduce应用程序,仅需通过配置,就可将其拓展到集群的成百上千个节点上运行,这种简单的可拓展性使得Mapreduce吸引了大量程序员。

手动拓展一个简单单词计数程序

     统计一个单词的出现次数,单词只有一句话:"do as i say , not as i do"。如果文档很小,一段简单的代码即可实现,下面是一段伪代码:
               define wordCount as Multiset;
                   for each document in documentSet {
                        T = tokenize(document);
                        for each token in T {
                              wordCount[token]++;
                         }
                   }
                  display(wordCount);

     但是这个程序只适合处理少了文档,我们试着重写程序,使它可以分布在多个计算机上,每台机器处理文档的不同部分,在把这些机器处理的结果放到第二阶段,由第二阶段来合并第一阶段的结果。
     第一阶段要分布到多台机器上的代码为:

               defi ne wordCount as Multiset;
                   for each document in documentSet {
                        T = tokenize(document);
                        for each token in T {
                              wordCount[token]++;
                         }
                   }
                sendToSecondPhase(wordCount);
     第二阶段伪代码:
               defi ne totalWordCount as Multiset;
               for each wordCount received from firstPhase {
                        multisetAdd (totalWordCount, wordCount);
               }
     如果这么设计的话还有什么其他困难吗?一些细节可能会妨碍它按预期工作,
          1.如果数据集很大,中心存储服务器性能可能会跟不上,因此我们需要把文档分不到多台机器上存储。
          2.还有一个缺陷是wordcount被存放在内存当中,同样,如果数据集很大一个wordcount就有可能超过内存容量,因此我们不能将其放在内存中,我们需实现一个基于磁盘的散列表,其中当然涉及大量编码。
          3.第二阶段如果只有一台计算机,显然不太合理,若按照第一阶段的设计把第二阶段的任务也分布到多台计算机上呢?答案当然是可以的,但是我们必须将第一阶段的结果按某种方式分区,使其每个分区可以独立运行在第二阶段的各个计算机上。比如第二阶段的A计算机只统计以a开头的wordcount,计算机B统计wordcount-b分区,依次类推。
     现在这个单词统计程序正在变得复杂,为了使它能够运行在一个分布式计算机集群中,我们发现需要添加以下功能:
          1. 存储文件到多台计算机上
          2.编写一个基于磁盘的散列表,使其不受计算机内存限制
          3.划分来自第一阶段的中间数据
          4.洗牌第一阶段分区到第二阶段合适的计算机上
     仅仅这一个简单的小问题就需要考虑这么多细节处理,这就是我们为什么需要一个Hadoop框架,当我们用MapReduce模型编写程序时,Hadoop框架可以管理所有与可拓展性相关的底层问题。

相同程序在MapReduce中拓展

     Map和Reduce程序必须遵循以下键和值类型的约束
          1.应用的输入必须组织为一个键值对列表List<key1,value1>,输入格式不受约束,例如处理多个文件的输入格式可以使List<String filename,String fileContent>。
          2.含键值对的列表被拆分,进而通过调用Mapper的Map函数对每个键值对<K1,V1>进行处理,Mapper 转换每个<K1,V1>,并将其结果并入<K2,V2>。在上面例子中,Mapper转换成的是一个<String word,Integer count>的列表。
            3.所有Mapper的输出被聚合在一个巨大的<K2,V2>列表中,所有共享K2的对被组织在一起成为一个新的键值对列表<K2,List(V2)>,让reducer来处理每一个聚合起来的<K2,List(V2)>,并将处理转换成<K3,V3>,MapReduce框架自动搜索所有<K3,V3>并将其写入文件中。

1.6 运行第一个Hadoop程序——用Hadoop框架来统计单词

 在linux环境下配置Hadoop运行环境

首先安装JAVA JDK
     Hadoop需要1.6或更高版本
     1.到oracle官网下载Linux版java安装包(rpm包)
     2.查看是否已安装:java or java -version
     3.卸载老版本 rpm -e jdk
     4.安装jdk rpm -ivh jdk
     5.配置环境变量
     
下载一个Hadoop稳定版本
     
# mkdir /usr/hadoop
# cd /usr/hadoop/
# wget http://apache.mesi.com.ar/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz
# tar -xzf hadoop-1.2.1.tar.gz
# mv hadoop-1.2.1 hadoop
# cd /usr/hadoop/hadoop/ 
# bin/hadoop 
配置 Hadoop

First edit hadoop configuration files and make following changes.
 Edit core-site.xml

# vim conf/core-site.xml
#Add the following inside the configuration tag
<property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000/</value>
</property>
<property>
    <name>dfs.permissions</name>
    <value>false</value>
</property>

 Edit hdfs-site.xml

# vim conf/hdfs-site.xml
# Add the following inside the configuration tag
<property>
        <name>dfs.data.dir</name>
        <value>/opt/hadoop/hadoop/dfs/name/data</value>
        <final>true</final>
</property>
<property>
        <name>dfs.name.dir</name>
        <value>/opt/hadoop/hadoop/dfs/name</value>
        <final>true</final>
</property>
<property>
        <name>dfs.replication</name>
        <value>2</value>
</property>

 Edit mapred-site.xml

# vim conf/mapred-site.xml
# Add the following inside the configuration tag
<property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
</property>

 Edit hadoop-env.sh

# vim conf/hadoop-env.sh
export JAVA_HOME=/opt/jdk1.7.0_17
export HADOOP_OPTS=-Djava.net.preferIPv4Stack=true

Set JAVA_HOME path as per your system configuration for java.

Next to format Name Node

$ cd /usr/hadoop/hadoop
$ bin/hadoop namenode -format 
启动 Hadoop Services

Use the following command to start all hadoop services.

$ bin/start-all.sh    

输入网址查看一下效果:

     http://localhost:50030(MapReduce 的web页面)
     http://localhost:50070(HDFS 的web页面)

1.7 Setting up SSH for a Hadoop cluster

       安装Hadoop集群时需要一个服务器做主节点,常驻NameNode和JobTracker进程,也将作为一个基站,负责联络并激活从节点上的DataNode和TaskTracker守护进程。因此我们需要为主节点定制一种手段,使他可以访问到集群中的每个节点。
     为此,Hadoop使用无口令的SSH协议。SSH采用标准公钥加密来生成一对用户验证秘钥,私钥存储到主节点,公钥存储到从节点。
     
定义一个公共账号
     我们所从一个节点访问另一个节点的说法一直被作为通用语言使用,其实更具体的说法应该是从一个账户访问另一个账户。对于在Hadoop集群中的所有节点都应该有相同的账户,为了安全考虑,建议把这些账户级别设置为用户级别。

验证SSH安装
     使用Linux的which命令
          which  ssh
          which  sshd
          which  ssh-keygen
生成ssh秘钥对
     
未完待续。。。
     


              


     
     
     

      

广告服务联系QQ:1134687142 | 网站地图

版权所有: IT货架- 内容来自互联网,仅供用于技术学习,请遵循相关法律法规. 京ICP备11030978号-1