Catalog table

之前提到过HBase默认有两个namespace,其中一个是hbase(系统空间),一个是default(默认用户空间)。catalog table就是hbase:meta表,在系统namespace里,其本质上也是一张Hbase table。

在HBase 0.96前,是没有hbase:meta表的,而是由-ROOT-表和.META.表共同承担其功能。在0.96版本后,-ROOT-表的寻址功能被zookeeper替代,.META.表改名为hbase:meta。

这组功能对应bigtable论文里的tablet location。

该表的rowkey是:{[table “” not found /]
, [region start key], [region_id]},这里应该是rowkey,而不是key,即会与column、version共同构成key。只有一个column family:info。初始情况有3个qualifier:regioninfo, server, serverstartcode。在region split的过程中,会产生两个临时的column:info:splitA, info:splitB,其value也会存储regioninfo信息;完成split后,这两个column会被删除掉。

Client

client端至少需要配置zookeeper的地址,以便通过zk找到hbase:meta表,以及master等信息。基于zookeeper,client也能及时得知region的变化。对于最精彩使用的Put、Get、Scan等数据操作,client是自己根据hbase:meta寻址RegionServers,并直接与其进行通信,在此过程中无需Master的配合。

TODO:在region变化与client获悉变化间,如果发起请求,会怎样呢?bigtable上是会通过一套机制来保证重新寻址并成功请求的。

在Java版的client里,使用开销较大的ConnectFactory和轻量级的Table、Admin、RegionLocator对象与HBase集群进行通信,需要注意的是在一个线程/进程里,ConnectionFactory最好是复用的,而Table等可以随时生成和销毁。并且,需要特别关注的是,Table是非线程安全的,所以多个线程不要共享该对象。也提供了线程池HConnection,进一步降低开销。

client也提供了writebuf功能,但只针对put有效,delete是不过buf直接发送的。

HMaster

HBase architecture follows the traditional master slave model where you have a master which takes decisions and one or more slaves which does the real task.

可以想象一下,在一个分布式的主从集群里,有哪些事情需要master参与呢?列举一些我能够想到的,一定不全。

集群事务:

  1. scheme的更改,包括column family的参数配置
  2. 对regionserver的调度(增删regionserver、region到regionserver的分配、负载均衡等)
  3. 集群运行状态的汇总和展示

master本身:master自己是一个单点,所以必然需要采取主备。HMaster通过zk的临时锁实现竞争机制。

在HMaster的设计中,功能性需求与非功能性需求并重。

其中External Interfaces既有设计所需的与RegionServer、Client通信的RPC接口,也有用于监控等所需的Info Svr,以便让使用者可以窥其运行状态并及时跟进。

Execute Services是实际功能的实现者,其与Interface等(也许execute过程中还会产生子event)配合,通过队列接受其提交(post)的events。每种事件的handler threads是隔离的,有独立的threads pool。采用这种设计方式的原因,猜测有以下几种:

  1. 通过队列提升吞吐量,在工作线程无法及时响应的时候,可以通过排队进行缓冲
  2. 通过队列的消息交互,实现解耦,使execute services可以透明的进行扩展,而interface等模块基本不用或很少量的修改
  3. 独立的threads pool,也使功能与功能间互不干扰,并可按需分别扩容
  4. 其他我没有想到的优点

HMaster的很多行为依赖Zookeeper,所以对其进行了封装。

可以看到,这里使用了Zookeeper Watch使用了proxy(代理)模式,Active Master Manager等使用了manager(管理器)模式,Region Server Tracker使用了tracker(跟踪器)模式。

其中,Draining server tracker类似于nginx的graceful restart,即admin可以调度server使其不再接受新的connection,而在当前所有conn处理完毕后,予以关闭或重启。

TODO:root region是指-ROOT吗?还是hbase:meta呢?root region server应该是root region所在的server。

File System Interfaces的Log Cleaner,继承自base chore定时运行。一般有TimeToLiveLogCleaner、ReplicationLogCleaner。对应Master里的哪些Log呢?以及HFile Cleaner,hfile不都是存储在tablet节点上的吗?

 RegionServer

HRegionServer里,有一些后台线程,执行CompactSplitThread、MajorCompactionThread、MemStoreFlusher、LogRoller,其中CompactSplit指的是minor compactions,LogRoller处理的是WAL日志。

coprocessor(协程)

引入coprocessor的原因,这段话描述的很清楚:

HBase has very effective MapReduce integration for distributed computation over data stored within its tables, but in many cases – for example simple additive or aggregating operations like summing, counting, and the like – pushing the computation up to the server where it can operate on the data directly without communication overheads can give a dramatic performance improvement over HBase’s already good scanning performance.

究其思想,就是把非cpu密集型的简单计算,例如sum、count等,推到hbase server端进行,从而降低网络IO,以得到极大的性能提升。coprocessor就是为了给hbase的client提供这个扩展功能的,其初衷是通过coprocessor,使HBase可以方便的支持secondary indexing、complex filtering and access control。

值得关注的一个区别是,HBase的实现里,coprocessor的user code是在regionservers或masterserver的进程空间里运行的,而Google的实现里,它们是运行在同一台服务器但不同的进程空间里。而HBase也想往Google模式迁移。这是为什么呢?我猜测一个原因是,后者相当于提供了一个沙箱环境,user code的崩溃,理论上不会影响到HBase主体进程。但后者的实现复杂度可能也要高于前者(这个是看代码设计功底了,也不一定)

In order to support sufficient flexibility for potential coprocessor behaviors, two different aspects of extension are provided by the framework. One is the observer, which are like triggers in conventional databases, and the other is the endpoint, dynamic RPC endpoints that resemble stored procedures.

HBase coprocessor提供了两种扩展方式:observer和endpoints。前者类似trigger,后者类似stored procedures。

当前有3种observers:

  • regionObserver:运行在regionserver上,可以hook到数据的增删改查里。
  • WALObserver:也是运行在regionServer上,但是作用于write-ahead-log的写入或重放里(reconstruction)
  • masterObserver:运行在hmaster上,可以hook到table schema的变化里。

observer大多提供成对的preXXX和postXXX方法,分别作用于某事件发生前后,几乎每一种HBase相关的事件都可以被hook。同一事件上注册的不同hook间,先执行的(高优先级的)可以通过exception的方式,停止后面待执行的(低优先级的)hooks。

不同于Observer,仅由事件触发,运行在server端;endpoints需要 client和server端通过RPC交互调用,并协同工作。一般涉及多个regions(注意:不是region server,而是region)的操作,会先在每个region上进行计算(类似map),并将所有region计算的结果分别返回给client,由client进行聚合(类似reduce)。在HBase的发行版里,应该已有一些集成的endpoints,例如org.apache.hadoop.hbase.coprocessor.AggregateImplementation。

Due to Java’s lack of multiple inheritance this required extension plus base code to be refactored into a single class providing the full implementation, which quickly becomes brittle when considering multiple extensions. Who inherits from whom? Coprocessors allow a much more flexible mixin extension model.

上面这段话也很精彩,对于设计模式的实际抉择,以及语言层面的分析。(TODO

 Block Cache

毫无疑问,cache是提高HBase吞吐和性能的关键!HRegion有两种block cache:onheap LruBlockCache和BucketCache,后者在hbase-0.98.6之后才可以使用。在region server的 UI界面里,可以看到相关配置和使用情况,以我们一个offline环境为例:

usedHeapMB=842, maxHeapMB=4075, blockCacheSizeMB=13.33, blockCacheFreeMB=1005.49, blockCacheCount=76, blockCacheHitCount=1392277, blockCacheMissCount=2192269514, blockCacheEvictedCount=72270, blockCacheHitRatio=0%, blockCacheHitCachingRatio=94%, hdfsBlocksLocalityIndex=84

简单理解两者的区别是,onheap LruBlockCache是基于Java heap的,即被cache的数据都是在java进程的内存空间里,受Java GC机制的管理。而BucketCache,有onheap和offheap两种modes,默认和一般情况都是用offheap的。region加载初始,bucketCache的查询速度会慢于LruBlockCache,但后者受java GC机制的制约,随着GC次数和内存碎片的增加,性能波动会比较大。所以,给出的结论是:

Also see Comparing BlockCache Deploys which finds that if your dataset fits inside your LruBlockCache deploy, use it , otherwise if you are experiencing cache churn (or you want your cache to exist beyond the vagaries of java GC), use BucketCache.

我的理解是,如果内存足够大,很少需要GC,那么可以使用LRU。否则,如果经常遇到内存使用率的波动(代表经常GC),或者不希望由GC来管理cache的话,那就使用BucketCache。

在实现层面,BucketCache其实是一个两层cache结构。L2层才是真正的offheadp BucketCache,用来存储block data。而L1层就是LruBlockCache,存储了INDEX和BLOOM blocks。所以,它仍然是需要使用heap的(哪个程序不需要heap呢?),但由于索引和bloom blocks的量,相对data而言,是非常小的,所以内存用量也会大大降低,从而使GC频率降低。

LruBlockCache的实现原理

lruBlockCache区分了3个优先级:

  • Single access priority: The first time a block is loaded from HDFS it normally has this priority and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
  • Mutli access priority: If a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
  • In-memory access priority: If the block’s family was configured to be “in-memory”, it will be part of this priority disregarding the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.

这样的好处是,对于由于scan请求而加载的blocks,首先会被放到single access priority里,是最低优先级被内存维护的,也就是最先会被LRU evict掉的。原因是,从使用场景来看,scan blocks比较少被重复使用,一般是一次性的。而如果是多次被访问的blocks,说明是热点数据,那就放到multi access priority里,从而降低其被LRU的概率。Catalog table对应的blocks一般就会是multi access的。同时,由于HBase允许指定Column family是in-memory的,这些cf对应的blocks在被读取后,就会放到in-memory priority里,但需要注意的是,它们仅是降低被evict的可能性,在内存不足的情况下,仍然会被踢掉的。

通过下面的公式可以计算在一个HBase集群中,有多少内存可以用来作为Cache使用:

number of region servers * heap size * hfile.block.cache.size * 0.99

其含义其实就是,整个cluster里的机器数目 × 配置的每个机器的heap大小 × 配置的允许给cache使用的百分比 × 0.99。其中 heapsize的大小在 hbase/conf/conf/hbase-env.sh里配置:

# The maximum amount of heap to use, in MB. Default is 1000.
export HBASE_HEAPSIZE=4096

hfile.block.cache.size在hbase/conf/hbase-size.xml里配置,默认值在src/main/resources/hbase-default.xml ,我这个版本是0.25。

还需要注意的是,并不是所有的block cache都是被data block使用的,还有:

  • catalog,即-ROOT-和hbase:meta,基本上是常驻内存的。前者只有几百字节,后者可能会达到几百MB,依赖于regions的个数
  • HFile indexes,这个是对每个region里HFile的索引,其大小甚至会达到GB量级,不过是可以被LRU的。可以在region server的UI的metrics栏里看到使用情况
  • Keys,因为跟随data一起被cache的,还有keys,包括rowkey、column family、column qualifier、timestamp等信息
  • Bloom Filters,如果开启的话,它也是被cache的,不过也是可以被LRU的。可以在region server的UI的metrics栏里看到使用情况
LRUcache不是万能的,一个完全随机读的table,或者一个mapreduce的input table,开启cache,只会浪费内存和cpu,并且增加GC的负担。

BucketCache的实现原理

如前所述,它由L1的LruBlockCache类和L2的BucketCache类共同组成,并且有CombinedBlockCache类来组装。Meta和Bloom Filters会放到onheap的L1层,而data是放到L2层。BucketCache可以选择使用onheap、offheap和file based 3种方式。如果服务器有SSD硬盘,那么file based也是一种不错的选择。(onheap和offheap的区别参考:http://stackoverflow.com/questions/6091615/difference-between-on-heap-and-off-heap)

两种cache可以组合使用,且具体参数可调。甚至cache的数据,也可以被压缩,这是一把双刃剑。

查看key size占比(TODO)

$ ./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile
14/12/25 13:36:44 INFO util.ChecksumType: Checksum can use java.util.zip.CRC32
usage: HFile [-a] [-b] [-e] [-f <arg>] [-k] [-m] [-p] [-r <arg>] [-s] [-v]
 [-w <arg>]
 -a,--checkfamily Enable family check
 -b,--printblocks Print block index meta data
 -e,--printkey Print keys
 -f,--file <arg> File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow Enable row order check; looks for out-of-order keys
 -m,--printmeta Print meta data of file
 -p,--printkv Print key/value pairs
 -r,--region <arg> Region to scan. Pass region name; e.g. '.META.,,1'
 -s,--stats Print statistics
 -v,--verbose Verbose output; emits file and meta data delimiters
 -w,--seekToRow <arg> Seek to this row and print all the kvs for this row only

 Write Ahead Log

HBase版本间的兼容性真是头痛……WAL在0.94之前对应的是HLog,存储在/hbase-root/.logs/下面,之后存储在/hbase-root/WALs/下面。

正常情况下,每个region server只有一个WAL log文件,其上所有region的写操作都追加到这个文件里。当需要进行故障恢复时,就需要按region对这个log里的数据进行分组,以便在open 该region的新region server上replay这些log。这个过程叫做WAL log splitting,其触发方式是:1. HMaster在集群启动时;2. 一个region server关闭时。(问题:failover时,是如何触发的呢?

WAL splitting

  1. The /hbase/WALs/<host>,<port>,<startcode> directory is renamed。防止region server假死,虽然被Master判定为down,但仍然在向WAL log写日志。但rename后,可能会导致丢失部分数据?
  2. Each log file is split, one at a time。一个线程依次从WAL log里读取每一个entry,写入到对应region buffer里,多个writter线程,从对应的region buffer里读数据,并写入HDFS文件/hbase/<table_name>/<region_id>/recovered.edits/.temp。当所有写入完成后,rename为/hbase/<table_name>/<region_id>/recovered.edits/<sequence-id>。这时,需要检查是否全部写入成功,会比较rename的sequence-id和该region下最新HFile的last sequence-id。如果rename-sequence-id <= last-sequence-id,则代表写入成功。因为如果splitting过程中,没有新写入,则两者应该相等;而如果有新写入(怎么可能?这时region应该置为down了!除非假死,但这时真的没有问题吗?),那么last的应该较大。
  3. After log splitting is complete, each affected region is assigned to a RegionServer。新的region server会从recovered.edits下replay日志,加载到memstore里,并在完成后flush到disk,删除recovered.edits。

从上面的步骤能看出,log splitting还是比较耗时的,facebook的一位开发者Prakash Khemani提交了distributed log splitting的方法,并已成为0.92版本后的默认方法。说起来很简单,就是把本来一台服务器独立完成的事情,交给多个服务器同时进行,但这就涉及到服务器间如何通信、如何调度、容错、解决短板等概念。Prakash利用zookeeper较好的解决了这个问题!但到底能提速多少呢?每个split log worker都需要完整遍历WAL log里的每一个entry,过滤出属于自己负责的部分,然后写入。区别可能只是在不match的entry就直接跳过了,而不像单机版每一个entry都得处理。

WAL replay也可以做成distributed的。

WAL Deferred log Flush: the default behavior for Puts using the Write Ahead Log (WAL) is that WAL edits will be written immediately. If deferred log flush is used, WAL edits are kept in memory until the flush period. The benefit is aggregated and asynchronous WAL- writes, but the potential downside is that if the RegionServer goes down the yet-to-be-flushed edits are lost. This is safer, however, than not using WAL at all with Puts.

Deferred log flush can be configured on tables via HTableDescriptor. The default value ofhbase.regionserver.optionallogflushinterval is 1000ms.即,可以选择是否打开Deferred Log Flush,并可以修改配置以调整延迟时长。

 

Region

一个Region Server上Regions的个数,需要合理设置,折衷点如下:

  1. Region server角度
    1. MSLAB内存消耗:defaultr equires 2mb per memstore,如果有1k个regions,则MSLAB就需要消耗4GB heap size。
    2. memstore过小:如果有1k个regions,并且给memstore整体分配了5GB,那每个memstore只有5MB,很容易就需要flush到disk(产生HDFS IO),而且多个小HFile文件,又会频繁导致minor compactions,消耗更多资源。
    3. 老版本问题(<=0.90):更多的regions会导致store file index上升,从而消耗更多的heap size。
  2. HMaster角度
    1. 老版本问题(<0.96):更多的regions会导致HMaster消耗更多时间进行region assign和批量moving。由于老版本的这些操作是同步的,所以会导致问题(不太明白老版本的原理)。猜测这些操作可能发生在start up、load balance and failover。
  3. MR的影响
    1. 由于Hadoop的数据就近分配原则,过多的regions会导致过多的task,相互竞争资源,而太少的regions又会导致过少的task,资源不会有效利用(map会排队)。

Region是由HMaster分配给Region Server的,分别发生在集群或master启动时、以及region server的failover时。对于后者,需要特别关注region可用性、数据一致性问题:

  1. The regions immediately become unavailable because the RegionServer is down.
  2. The Master will detect that the RegionServer has failed.
  3. The region assignments will be considered invalid and will be re-assigned just like the startup sequence.
  4. In-flight queries are re-tried, and not lost.
  5. Operations are switched to a new RegionServer within the following amount of time:

    ZooKeeper session timeout + split time + assignment/replay time

TODO:对于第4、5条的理解。

Region状态转移

需要注意的是,所有状态的改变都是Master进行的,可以是它主动发起,或接受到RegionServer的notification后被动修改。但状态的维持者只有master一个。

Region Split一般自动发生,但也可以手动触发。而手动split必定需要明确的理由 ,例如:

  1. 由于rowkey设计不合理,导致了hotspot,从而需要重新split全表或指定的regions
  2. 扩容region server,需要把数据和流量重新均衡
  3. 批量导入数据,可能造成负载的严重不均衡,在这一类的bulk-load前需要split

Region Merge由client手动触发()。Master会在client指定的region里,选择heavy load的那个,作为merge的执行者。

Memstore flushing

flushing的最小粒度是region而不是memstore,这就意味着一旦flush发生,则至少一个region下的所有memstores都会被flush掉,不论大小。发生的时机受hbase.hregion.memstore.flush.size(一个memstore的最大size)、hbase.regionserver.global.memstore.upperLimit(一个RS上所有memstore的总大小)、hbase.regionserver.max.logs(一个RS上WAL logs的数目)的制约,前两个回导致一个region下memstores的flush,后两个导致RS上所有regions下面所有memstore的flush。

Scans

Compactions

由上图可见minor compaction和major compaction之间的区别:minor只是将“一些”小的HFile文件合并,由于看不到全局,也不涉及stombstone marker和version的清理;而major是将一个region下“所有”的HFile合并为1个HFile,会进行各种数据清理。后者对资源的消耗较大,默认情况下,每7天进行一次。

Compaction相对较为消耗资源,所以,需要决定:什么时候compaction、针对哪些HFile、采取什么Compaction方法?这些是由Compaction Policy决定的。

老版本的RatioBasedCompactionPolicy算法:(以下括号里是自己的理解,不保证准确)

  1. 用该Region下的HFiles初始化tobe compaction list,应包括all StoreFiles not already in the compaction queue(之前compaction没处理的Hfiles)和all StoreFiles newer than the newest file that is currently being compacted(在之前compaction后产生的HFiles)
  2. 如果是Stuck状态,则强制major compaction。Stuck状态是指Memstore size和HFile数目都达到上限了,需要赶快处理。在bulk-load批量导入数据时,由于memstore size和HFile数目都会快速上升,故会频繁导致无用的major compaction
  3. 如果是user request的compaction,那就尽量执行它。但不能保证一定会进行major,即使用户要求了。例如all HFiles are not available for compaction(可能在splitting或compaction)或too may StoreFiles exist(这时先minor再major可能会更好)
  4. 排除一些HFiles,例如StoreFiles that are larger than hbase.hstore.compaction.max.size(可能先split会更好)、StoreFiles that were created by a bulk-load operation which explicitly excluded compaction(在load期间可以临时性的设置hbase.mapreduce.hfileoutputformat.compaction.exclude)
  5. 决定是minor or major:if hfiles number > hbase.hstore.compaction.max, then minor; else major. 
  6. 如果step 5决定使用minor,且hfiles number < hbase.hstore.compaction.min, then abort。如果step 5指定major,计算只有一个hfile,也可以执行,从而去除delete和expired version数据
  7. 再次根据文件大小过滤文件:对于HFile X, if x's size > hbase.hstore.compaction.ratio * (sum of other smaller Hfile size),则excluded。针对off-peak,可以单独设置ratio值
  8. 如果很久没有major,则优先major

针对RatioBasedCompactionPolicy的问题,后续默认的policy改变为ExploringCompactionPolicy,尤其解决了bulk-load时的问题。

 

参考资料

http://hbase.apache.org/book/architecture.html

http://blog.zahoor.in/2012/08/hbase-hmaster-architecture/

https://blogs.apache.org/hbase/entry/coprocessor_introduction

https://issues.apache.org/jira/browse/HBASE-5273  // 在src/example/coprocessor下提供了一些Coprocessor例子

http://www.n10k.com/blog/blockcache-101/

http://people.apache.org/~stack/bc/

http://hbase.apache.org/book/regionserver.arch.html#offheap.blockcache

http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/io/hfile/CacheConfig.html

https://issues.apache.org/jira/browse/HBASE-9857

http://hbase.apache.org/book/important_configurations.html#balancer_config

http://hbase.apache.org/xref/org/apache/hadoop/hbase/io/hfile/LruBlockCache.html

http://en.wikipedia.org/wiki/Working_set_size

http://en.wikipedia.org/wiki/Write-ahead_logging

http://blog.cloudera.com/blog/2012/07/hbase-log-splitting/

https://issues.apache.org/jira/browse/HBASE-2643

http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html

http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html

http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/

http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/file/tfile/TFile.html

http://cloudepr.blogspot.com/2009/09/hfile-block-indexed-file-format-to.html

http://th30z.blogspot.com/2011/02/hbase-io-hfile.html?spref=tw

Leave a Reply