Archive for 十二月, 2014

Catalog table

之前提到过HBase默认有两个namespace,其中一个是hbase(系统空间),一个是default(默认用户空间)。catalog table就是hbase:meta表,在系统namespace里,其本质上也是一张Hbase table。

在HBase 0.96前,是没有hbase:meta表的,而是由-ROOT-表和.META.表共同承担其功能。在0.96版本后,-ROOT-表的寻址功能被zookeeper替代,.META.表改名为hbase:meta。

这组功能对应bigtable论文里的tablet location。

该表的rowkey是:{[table “” not found /]
, [region start key], [region_id]},这里应该是rowkey,而不是key,即会与column、version共同构成key。只有一个column family:info。初始情况有3个qualifier:regioninfo, server, serverstartcode。在region split的过程中,会产生两个临时的column:info:splitA, info:splitB,其value也会存储regioninfo信息;完成split后,这两个column会被删除掉。

Client

client端至少需要配置zookeeper的地址,以便通过zk找到hbase:meta表,以及master等信息。基于zookeeper,client也能及时得知region的变化。对于最精彩使用的Put、Get、Scan等数据操作,client是自己根据hbase:meta寻址RegionServers,并直接与其进行通信,在此过程中无需Master的配合。

TODO:在region变化与client获悉变化间,如果发起请求,会怎样呢?bigtable上是会通过一套机制来保证重新寻址并成功请求的。

在Java版的client里,使用开销较大的ConnectFactory和轻量级的Table、Admin、RegionLocator对象与HBase集群进行通信,需要注意的是在一个线程/进程里,ConnectionFactory最好是复用的,而Table等可以随时生成和销毁。并且,需要特别关注的是,Table是非线程安全的,所以多个线程不要共享该对象。也提供了线程池HConnection,进一步降低开销。

client也提供了writebuf功能,但只针对put有效,delete是不过buf直接发送的。

HMaster

HBase architecture follows the traditional master slave model where you have a master which takes decisions and one or more slaves which does the real task.

可以想象一下,在一个分布式的主从集群里,有哪些事情需要master参与呢?列举一些我能够想到的,一定不全。

集群事务:

  1. scheme的更改,包括column family的参数配置
  2. 对regionserver的调度(增删regionserver、region到regionserver的分配、负载均衡等)
  3. 集群运行状态的汇总和展示

master本身:master自己是一个单点,所以必然需要采取主备。HMaster通过zk的临时锁实现竞争机制。

在HMaster的设计中,功能性需求与非功能性需求并重。

其中External Interfaces既有设计所需的与RegionServer、Client通信的RPC接口,也有用于监控等所需的Info Svr,以便让使用者可以窥其运行状态并及时跟进。

Execute Services是实际功能的实现者,其与Interface等(也许execute过程中还会产生子event)配合,通过队列接受其提交(post)的events。每种事件的handler threads是隔离的,有独立的threads pool。采用这种设计方式的原因,猜测有以下几种:

  1. 通过队列提升吞吐量,在工作线程无法及时响应的时候,可以通过排队进行缓冲
  2. 通过队列的消息交互,实现解耦,使execute services可以透明的进行扩展,而interface等模块基本不用或很少量的修改
  3. 独立的threads pool,也使功能与功能间互不干扰,并可按需分别扩容
  4. 其他我没有想到的优点

HMaster的很多行为依赖Zookeeper,所以对其进行了封装。

可以看到,这里使用了Zookeeper Watch使用了proxy(代理)模式,Active Master Manager等使用了manager(管理器)模式,Region Server Tracker使用了tracker(跟踪器)模式。

其中,Draining server tracker类似于nginx的graceful restart,即admin可以调度server使其不再接受新的connection,而在当前所有conn处理完毕后,予以关闭或重启。

TODO:root region是指-ROOT吗?还是hbase:meta呢?root region server应该是root region所在的server。

File System Interfaces的Log Cleaner,继承自base chore定时运行。一般有TimeToLiveLogCleaner、ReplicationLogCleaner。对应Master里的哪些Log呢?以及HFile Cleaner,hfile不都是存储在tablet节点上的吗?

 RegionServer

HRegionServer里,有一些后台线程,执行CompactSplitThread、MajorCompactionThread、MemStoreFlusher、LogRoller,其中CompactSplit指的是minor compactions,LogRoller处理的是WAL日志。

coprocessor(协程)

引入coprocessor的原因,这段话描述的很清楚:

HBase has very effective MapReduce integration for distributed computation over data stored within its tables, but in many cases – for example simple additive or aggregating operations like summing, counting, and the like – pushing the computation up to the server where it can operate on the data directly without communication overheads can give a dramatic performance improvement over HBase’s already good scanning performance.

究其思想,就是把非cpu密集型的简单计算,例如sum、count等,推到hbase server端进行,从而降低网络IO,以得到极大的性能提升。coprocessor就是为了给hbase的client提供这个扩展功能的,其初衷是通过coprocessor,使HBase可以方便的支持secondary indexing、complex filtering and access control。

值得关注的一个区别是,HBase的实现里,coprocessor的user code是在regionservers或masterserver的进程空间里运行的,而Google的实现里,它们是运行在同一台服务器但不同的进程空间里。而HBase也想往Google模式迁移。这是为什么呢?我猜测一个原因是,后者相当于提供了一个沙箱环境,user code的崩溃,理论上不会影响到HBase主体进程。但后者的实现复杂度可能也要高于前者(这个是看代码设计功底了,也不一定)

In order to support sufficient flexibility for potential coprocessor behaviors, two different aspects of extension are provided by the framework. One is the observer, which are like triggers in conventional databases, and the other is the endpoint, dynamic RPC endpoints that resemble stored procedures.

HBase coprocessor提供了两种扩展方式:observer和endpoints。前者类似trigger,后者类似stored procedures。

当前有3种observers:

  • regionObserver:运行在regionserver上,可以hook到数据的增删改查里。
  • WALObserver:也是运行在regionServer上,但是作用于write-ahead-log的写入或重放里(reconstruction)
  • masterObserver:运行在hmaster上,可以hook到table schema的变化里。

observer大多提供成对的preXXX和postXXX方法,分别作用于某事件发生前后,几乎每一种HBase相关的事件都可以被hook。同一事件上注册的不同hook间,先执行的(高优先级的)可以通过exception的方式,停止后面待执行的(低优先级的)hooks。

不同于Observer,仅由事件触发,运行在server端;endpoints需要 client和server端通过RPC交互调用,并协同工作。一般涉及多个regions(注意:不是region server,而是region)的操作,会先在每个region上进行计算(类似map),并将所有region计算的结果分别返回给client,由client进行聚合(类似reduce)。在HBase的发行版里,应该已有一些集成的endpoints,例如org.apache.hadoop.hbase.coprocessor.AggregateImplementation。

Due to Java’s lack of multiple inheritance this required extension plus base code to be refactored into a single class providing the full implementation, which quickly becomes brittle when considering multiple extensions. Who inherits from whom? Coprocessors allow a much more flexible mixin extension model.

上面这段话也很精彩,对于设计模式的实际抉择,以及语言层面的分析。(TODO

 Block Cache

毫无疑问,cache是提高HBase吞吐和性能的关键!HRegion有两种block cache:onheap LruBlockCache和BucketCache,后者在hbase-0.98.6之后才可以使用。在region server的 UI界面里,可以看到相关配置和使用情况,以我们一个offline环境为例:

usedHeapMB=842, maxHeapMB=4075, blockCacheSizeMB=13.33, blockCacheFreeMB=1005.49, blockCacheCount=76, blockCacheHitCount=1392277, blockCacheMissCount=2192269514, blockCacheEvictedCount=72270, blockCacheHitRatio=0%, blockCacheHitCachingRatio=94%, hdfsBlocksLocalityIndex=84

简单理解两者的区别是,onheap LruBlockCache是基于Java heap的,即被cache的数据都是在java进程的内存空间里,受Java GC机制的管理。而BucketCache,有onheap和offheap两种modes,默认和一般情况都是用offheap的。region加载初始,bucketCache的查询速度会慢于LruBlockCache,但后者受java GC机制的制约,随着GC次数和内存碎片的增加,性能波动会比较大。所以,给出的结论是:

Also see Comparing BlockCache Deploys which finds that if your dataset fits inside your LruBlockCache deploy, use it , otherwise if you are experiencing cache churn (or you want your cache to exist beyond the vagaries of java GC), use BucketCache.

我的理解是,如果内存足够大,很少需要GC,那么可以使用LRU。否则,如果经常遇到内存使用率的波动(代表经常GC),或者不希望由GC来管理cache的话,那就使用BucketCache。

在实现层面,BucketCache其实是一个两层cache结构。L2层才是真正的offheadp BucketCache,用来存储block data。而L1层就是LruBlockCache,存储了INDEX和BLOOM blocks。所以,它仍然是需要使用heap的(哪个程序不需要heap呢?),但由于索引和bloom blocks的量,相对data而言,是非常小的,所以内存用量也会大大降低,从而使GC频率降低。

LruBlockCache的实现原理

lruBlockCache区分了3个优先级:

  • Single access priority: The first time a block is loaded from HDFS it normally has this priority and it will be part of the first group to be considered during evictions. The advantage is that scanned blocks are more likely to get evicted than blocks that are getting more usage.
  • Mutli access priority: If a block in the previous priority group is accessed again, it upgrades to this priority. It is thus part of the second group considered during evictions.
  • In-memory access priority: If the block’s family was configured to be “in-memory”, it will be part of this priority disregarding the number of times it was accessed. Catalog tables are configured like this. This group is the last one considered during evictions.

这样的好处是,对于由于scan请求而加载的blocks,首先会被放到single access priority里,是最低优先级被内存维护的,也就是最先会被LRU evict掉的。原因是,从使用场景来看,scan blocks比较少被重复使用,一般是一次性的。而如果是多次被访问的blocks,说明是热点数据,那就放到multi access priority里,从而降低其被LRU的概率。Catalog table对应的blocks一般就会是multi access的。同时,由于HBase允许指定Column family是in-memory的,这些cf对应的blocks在被读取后,就会放到in-memory priority里,但需要注意的是,它们仅是降低被evict的可能性,在内存不足的情况下,仍然会被踢掉的。

通过下面的公式可以计算在一个HBase集群中,有多少内存可以用来作为Cache使用:

number of region servers * heap size * hfile.block.cache.size * 0.99

其含义其实就是,整个cluster里的机器数目 × 配置的每个机器的heap大小 × 配置的允许给cache使用的百分比 × 0.99。其中 heapsize的大小在 hbase/conf/conf/hbase-env.sh里配置:

# The maximum amount of heap to use, in MB. Default is 1000.
export HBASE_HEAPSIZE=4096

hfile.block.cache.size在hbase/conf/hbase-size.xml里配置,默认值在src/main/resources/hbase-default.xml ,我这个版本是0.25。

还需要注意的是,并不是所有的block cache都是被data block使用的,还有:

  • catalog,即-ROOT-和hbase:meta,基本上是常驻内存的。前者只有几百字节,后者可能会达到几百MB,依赖于regions的个数
  • HFile indexes,这个是对每个region里HFile的索引,其大小甚至会达到GB量级,不过是可以被LRU的。可以在region server的UI的metrics栏里看到使用情况
  • Keys,因为跟随data一起被cache的,还有keys,包括rowkey、column family、column qualifier、timestamp等信息
  • Bloom Filters,如果开启的话,它也是被cache的,不过也是可以被LRU的。可以在region server的UI的metrics栏里看到使用情况
LRUcache不是万能的,一个完全随机读的table,或者一个mapreduce的input table,开启cache,只会浪费内存和cpu,并且增加GC的负担。

BucketCache的实现原理

如前所述,它由L1的LruBlockCache类和L2的BucketCache类共同组成,并且有CombinedBlockCache类来组装。Meta和Bloom Filters会放到onheap的L1层,而data是放到L2层。BucketCache可以选择使用onheap、offheap和file based 3种方式。如果服务器有SSD硬盘,那么file based也是一种不错的选择。(onheap和offheap的区别参考:http://stackoverflow.com/questions/6091615/difference-between-on-heap-and-off-heap)

两种cache可以组合使用,且具体参数可调。甚至cache的数据,也可以被压缩,这是一把双刃剑。

查看key size占比(TODO)

$ ./bin/hbase org.apache.hadoop.hbase.io.hfile.HFile
14/12/25 13:36:44 INFO util.ChecksumType: Checksum can use java.util.zip.CRC32
usage: HFile [-a] [-b] [-e] [-f <arg>] [-k] [-m] [-p] [-r <arg>] [-s] [-v]
 [-w <arg>]
 -a,--checkfamily Enable family check
 -b,--printblocks Print block index meta data
 -e,--printkey Print keys
 -f,--file <arg> File to scan. Pass full-path; e.g. hdfs://a:9000/hbase/.META./12/34
 -k,--checkrow Enable row order check; looks for out-of-order keys
 -m,--printmeta Print meta data of file
 -p,--printkv Print key/value pairs
 -r,--region <arg> Region to scan. Pass region name; e.g. '.META.,,1'
 -s,--stats Print statistics
 -v,--verbose Verbose output; emits file and meta data delimiters
 -w,--seekToRow <arg> Seek to this row and print all the kvs for this row only

 Write Ahead Log

HBase版本间的兼容性真是头痛……WAL在0.94之前对应的是HLog,存储在/hbase-root/.logs/下面,之后存储在/hbase-root/WALs/下面。

正常情况下,每个region server只有一个WAL log文件,其上所有region的写操作都追加到这个文件里。当需要进行故障恢复时,就需要按region对这个log里的数据进行分组,以便在open 该region的新region server上replay这些log。这个过程叫做WAL log splitting,其触发方式是:1. HMaster在集群启动时;2. 一个region server关闭时。(问题:failover时,是如何触发的呢?

WAL splitting

  1. The /hbase/WALs/<host>,<port>,<startcode> directory is renamed。防止region server假死,虽然被Master判定为down,但仍然在向WAL log写日志。但rename后,可能会导致丢失部分数据?
  2. Each log file is split, one at a time。一个线程依次从WAL log里读取每一个entry,写入到对应region buffer里,多个writter线程,从对应的region buffer里读数据,并写入HDFS文件/hbase/<table_name>/<region_id>/recovered.edits/.temp。当所有写入完成后,rename为/hbase/<table_name>/<region_id>/recovered.edits/<sequence-id>。这时,需要检查是否全部写入成功,会比较rename的sequence-id和该region下最新HFile的last sequence-id。如果rename-sequence-id <= last-sequence-id,则代表写入成功。因为如果splitting过程中,没有新写入,则两者应该相等;而如果有新写入(怎么可能?这时region应该置为down了!除非假死,但这时真的没有问题吗?),那么last的应该较大。
  3. After log splitting is complete, each affected region is assigned to a RegionServer。新的region server会从recovered.edits下replay日志,加载到memstore里,并在完成后flush到disk,删除recovered.edits。

从上面的步骤能看出,log splitting还是比较耗时的,facebook的一位开发者Prakash Khemani提交了distributed log splitting的方法,并已成为0.92版本后的默认方法。说起来很简单,就是把本来一台服务器独立完成的事情,交给多个服务器同时进行,但这就涉及到服务器间如何通信、如何调度、容错、解决短板等概念。Prakash利用zookeeper较好的解决了这个问题!但到底能提速多少呢?每个split log worker都需要完整遍历WAL log里的每一个entry,过滤出属于自己负责的部分,然后写入。区别可能只是在不match的entry就直接跳过了,而不像单机版每一个entry都得处理。

WAL replay也可以做成distributed的。

WAL Deferred log Flush: the default behavior for Puts using the Write Ahead Log (WAL) is that WAL edits will be written immediately. If deferred log flush is used, WAL edits are kept in memory until the flush period. The benefit is aggregated and asynchronous WAL- writes, but the potential downside is that if the RegionServer goes down the yet-to-be-flushed edits are lost. This is safer, however, than not using WAL at all with Puts.

Deferred log flush can be configured on tables via HTableDescriptor. The default value ofhbase.regionserver.optionallogflushinterval is 1000ms.即,可以选择是否打开Deferred Log Flush,并可以修改配置以调整延迟时长。

 

Region

一个Region Server上Regions的个数,需要合理设置,折衷点如下:

  1. Region server角度
    1. MSLAB内存消耗:defaultr equires 2mb per memstore,如果有1k个regions,则MSLAB就需要消耗4GB heap size。
    2. memstore过小:如果有1k个regions,并且给memstore整体分配了5GB,那每个memstore只有5MB,很容易就需要flush到disk(产生HDFS IO),而且多个小HFile文件,又会频繁导致minor compactions,消耗更多资源。
    3. 老版本问题(<=0.90):更多的regions会导致store file index上升,从而消耗更多的heap size。
  2. HMaster角度
    1. 老版本问题(<0.96):更多的regions会导致HMaster消耗更多时间进行region assign和批量moving。由于老版本的这些操作是同步的,所以会导致问题(不太明白老版本的原理)。猜测这些操作可能发生在start up、load balance and failover。
  3. MR的影响
    1. 由于Hadoop的数据就近分配原则,过多的regions会导致过多的task,相互竞争资源,而太少的regions又会导致过少的task,资源不会有效利用(map会排队)。

Region是由HMaster分配给Region Server的,分别发生在集群或master启动时、以及region server的failover时。对于后者,需要特别关注region可用性、数据一致性问题:

  1. The regions immediately become unavailable because the RegionServer is down.
  2. The Master will detect that the RegionServer has failed.
  3. The region assignments will be considered invalid and will be re-assigned just like the startup sequence.
  4. In-flight queries are re-tried, and not lost.
  5. Operations are switched to a new RegionServer within the following amount of time:

    ZooKeeper session timeout + split time + assignment/replay time

TODO:对于第4、5条的理解。

Region状态转移

需要注意的是,所有状态的改变都是Master进行的,可以是它主动发起,或接受到RegionServer的notification后被动修改。但状态的维持者只有master一个。

Region Split一般自动发生,但也可以手动触发。而手动split必定需要明确的理由 ,例如:

  1. 由于rowkey设计不合理,导致了hotspot,从而需要重新split全表或指定的regions
  2. 扩容region server,需要把数据和流量重新均衡
  3. 批量导入数据,可能造成负载的严重不均衡,在这一类的bulk-load前需要split

Region Merge由client手动触发()。Master会在client指定的region里,选择heavy load的那个,作为merge的执行者。

Memstore flushing

flushing的最小粒度是region而不是memstore,这就意味着一旦flush发生,则至少一个region下的所有memstores都会被flush掉,不论大小。发生的时机受hbase.hregion.memstore.flush.size(一个memstore的最大size)、hbase.regionserver.global.memstore.upperLimit(一个RS上所有memstore的总大小)、hbase.regionserver.max.logs(一个RS上WAL logs的数目)的制约,前两个回导致一个region下memstores的flush,后两个导致RS上所有regions下面所有memstore的flush。

Scans

Compactions

由上图可见minor compaction和major compaction之间的区别:minor只是将“一些”小的HFile文件合并,由于看不到全局,也不涉及stombstone marker和version的清理;而major是将一个region下“所有”的HFile合并为1个HFile,会进行各种数据清理。后者对资源的消耗较大,默认情况下,每7天进行一次。

Compaction相对较为消耗资源,所以,需要决定:什么时候compaction、针对哪些HFile、采取什么Compaction方法?这些是由Compaction Policy决定的。

老版本的RatioBasedCompactionPolicy算法:(以下括号里是自己的理解,不保证准确)

  1. 用该Region下的HFiles初始化tobe compaction list,应包括all StoreFiles not already in the compaction queue(之前compaction没处理的Hfiles)和all StoreFiles newer than the newest file that is currently being compacted(在之前compaction后产生的HFiles)
  2. 如果是Stuck状态,则强制major compaction。Stuck状态是指Memstore size和HFile数目都达到上限了,需要赶快处理。在bulk-load批量导入数据时,由于memstore size和HFile数目都会快速上升,故会频繁导致无用的major compaction
  3. 如果是user request的compaction,那就尽量执行它。但不能保证一定会进行major,即使用户要求了。例如all HFiles are not available for compaction(可能在splitting或compaction)或too may StoreFiles exist(这时先minor再major可能会更好)
  4. 排除一些HFiles,例如StoreFiles that are larger than hbase.hstore.compaction.max.size(可能先split会更好)、StoreFiles that were created by a bulk-load operation which explicitly excluded compaction(在load期间可以临时性的设置hbase.mapreduce.hfileoutputformat.compaction.exclude)
  5. 决定是minor or major:if hfiles number > hbase.hstore.compaction.max, then minor; else major. 
  6. 如果step 5决定使用minor,且hfiles number < hbase.hstore.compaction.min, then abort。如果step 5指定major,计算只有一个hfile,也可以执行,从而去除delete和expired version数据
  7. 再次根据文件大小过滤文件:对于HFile X, if x's size > hbase.hstore.compaction.ratio * (sum of other smaller Hfile size),则excluded。针对off-peak,可以单独设置ratio值
  8. 如果很久没有major,则优先major

针对RatioBasedCompactionPolicy的问题,后续默认的policy改变为ExploringCompactionPolicy,尤其解决了bulk-load时的问题。

 

参考资料

http://hbase.apache.org/book/architecture.html

http://blog.zahoor.in/2012/08/hbase-hmaster-architecture/

https://blogs.apache.org/hbase/entry/coprocessor_introduction

https://issues.apache.org/jira/browse/HBASE-5273  // 在src/example/coprocessor下提供了一些Coprocessor例子

http://www.n10k.com/blog/blockcache-101/

http://people.apache.org/~stack/bc/

http://hbase.apache.org/book/regionserver.arch.html#offheap.blockcache

http://hbase.apache.org/devapidocs/org/apache/hadoop/hbase/io/hfile/CacheConfig.html

https://issues.apache.org/jira/browse/HBASE-9857

http://hbase.apache.org/book/important_configurations.html#balancer_config

http://hbase.apache.org/xref/org/apache/hadoop/hbase/io/hfile/LruBlockCache.html

http://en.wikipedia.org/wiki/Working_set_size

http://en.wikipedia.org/wiki/Write-ahead_logging

http://blog.cloudera.com/blog/2012/07/hbase-log-splitting/

https://issues.apache.org/jira/browse/HBASE-2643

http://www.larsgeorge.com/2010/05/hbase-file-locality-in-hdfs.html

http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html

http://hortonworks.com/blog/apache-hbase-region-splitting-and-merging/

http://hadoop.apache.org/docs/current/api/org/apache/hadoop/io/file/tfile/TFile.html

http://cloudepr.blogspot.com/2009/09/hfile-block-indexed-file-format-to.html

http://th30z.blogspot.com/2011/02/hbase-io-hfile.html?spref=tw

今年做的重要项目之一,就是对一个核心Web系统重构,使之达到了99.999% 的高可用性。在此过程中,积累了一些系统架构、自动故障发现与解决、代码健壮性等方面的经验,予以记录。

业务背景介绍

该web系统是一个大型互联网系统商业运营侧的核心web系统,PHP语言实现,之前的可用性方面存在一定问题,在历史上也出过不少事故。由于是商业系统,其PV仅是中等规模,但复杂度相对较高,体现在其所涉及的网络交互、DB调用、Cache交互较多。

需要解决的问题

  1. 所依赖的非核心上游服务不可用时,及时发现并自动降级
  2. 所依赖的上游服务部分节点不可用时,及时发现并自动摘除故障节点
  3. 通过Lib库,封装网络交互中的重试、连接和读写超时、异常日志和以上各种功能,使之对业务层透明

整体架构设计

该系统采用集中式+分布式相结合的异常发现和处理方案。由浅蓝色业务层和深蓝色基础服务设施一起,识别并处理异常攻击、网络、系统和上游服务问题。为了最大程度的解耦,它们交互的方式,就是规范化的文本日志和INI配置。

之所以采用集中+分布的混合方式,是由于所处系统环境的复杂度导致的:

  1. 对于异常流量,由于目前集群规模较大,单点不可能阈值配置过大(规则),也无法收集到全局信息(策略),只有集中式才可以综合判断;
  2. 对于上游服务的整体故障,同样的道理,单点也不可能阈值配置过小,否则很容易产生抖动,而集中式可以全局收集信息,全局联动;
  3. 但集中式的服务降级,无法很好解决以下两种问题,所以需要通过每个业务节点通过节点健康检查来处理。
    1. 不可降级的核心服务交互
    2. 点对点的非核心服务故障,例如上游服务部分机房故障,或某台服务器故障

全局故障监控与降级

故障采集:为了降低侵入性,采取了由业务模块打印warn、error log,各业务节点上的守护进程阻塞read日志文件,并发往多个(目前配置为2个)同构监控与调度服务的方式。之所以采用多写的方式,是由于一期实现时,复用了日志ETL流,走的是公司的消息队列和流式计算平台,而它们都是强一致性的,当网络出现抖动或拥塞时,故障日志甚至会延迟数小时之久!所以,最终我们决定采用最简单的方式,对于这种对实时性要求极高的消息交互,采用CS直接透传的方式。而交叉多写可以保证只要不是所有机房间链路都发生故障,就可以在秒级别完成消息传递。多个监控与调度服务完全同构,都会接收异常日志并予以计算和处理,由于重复降级不会产生副作用,所以没有做maser、slave角色的划分。

故障判别:出于扩展性和复用角度的考虑,故障判别的规则是基于Protobuf格式配置的,基本原理就是配置:某一种类型的异常日志在一段时间内到达指定阈值后,需要将什么配置项改为什么值。由于异常日志的数量一般不会太大,所以每个监控与调度服务都是独立运行的,不存在数据同步的问题,时间窗口、频率等信息都在内存中直接计算。当需要横向扩展时,可以与故障采集配合,根据业务模块、异常日志类型进行纵向划分,映射到不同的监控节点上。仅有一点需要注意的是,为了防止网络等问题导致消息延迟到达,在计算频率时,会过滤掉超时的消息。

故障处理:同样为了降低业务侵入性和耦合度,故障处理是通过修改业务模块的配置文件实现的。通过在PHP业务节点上部署我们的zk_agent,监听zk文件变化并修改业务模块的ini配置对应项,和复用C++进程的基于zk的热加载功能,监控与调度模块只需与zookeeper集群通信,修改指定配置项,而无需知晓业务模块的具体信息。上面提到的多实例方式,也可以保证只要有一个监控与调度模块与zk集群通信成功,就可以成功完成降级指令的发布。这里需要注意的是,由于机器规则总有其不完善处,引入了保险栓的方式,保证在开启保险栓的情况下,人工干预的优先级最高。

当然,这套流程也有其不完善处,例如为了避免网络抖动,我们是通过设置一个较大的阈值来实现的,而没有做更为精细化的处理。同时,其配置恢复是手工的,因为监控与调度模块为了降低耦合度和复杂度,没有主动去探测故障的恢复情况。

单点故障监控与摘除

作为自动降级的补充,该功能着眼于发现点对点的问题,并予以处理。其原理也是:该业务服务器上,在一段时间内,对某上游节点的调用若失败超过阈值,则屏蔽该上游节点。这里直接通过真实请求的失败进行计数,没有开启独立进程,也是为了降低复杂度,提升易用性。

采用策略模式,实现了APC和File两种存储介质的封装,并支持扩展。由于APC也是跨进程的,所以可以在单机所有PHP进程间共享失败次数、故障节点信息。经过测试,APC的性能优于File,所以一般推荐使用APC模式。但为了兼容那些未安装APC module的业务模块,所以也保留了File的方式。

由于大中型系统的点对点故障是频繁发生的,所以这里采用屏蔽超时自动恢复的方式,虽然可能会造成锯齿状的耗时波动,但无需人工干预,是综合而言最优的方式。

由于请求间可能有相互依赖关系,单点是无法handle降级的,所以一旦检查到某服务的上游节点全部不可用,则重新置位,设为全部可用。

PHP基础lib库

以上及其他未提及的稳定性保障,对于业务层而言略显复杂,且由每个业务开发者来保证这些也是不可靠的,所以必须有一套对业务透明的php Lib库,予以封装。我们实现了这样一套Lib,首先细致处理了Mysql、Memcache、Curl、Socket、Redis等网络交互时的重试、连接和读写超时。其次,以上网络组件又是基于代码层的负载均衡、节点健康检查、服务降级、配置组件,从而对上屏蔽稳定性细节。同时,该Lib充分考虑易用性,例如通过对多种Cache的封装,仅暴露一个简单的Call接口和规范化配置,就可以使业务层完成旁路cache的使用。

总结

通过以上方式,该系统在上线3个月内,自动处理了2000多次故障,实现了按请求成功率计算,99.999%以上的可用性。感触最深的是,很多做业务开发的PHPer,简单的认为稳定大多是运维同学的责任,但想要达到高稳定性,工作其实从架构设计就开始了,更深深渗透在每一行代码里!

HBase的table可以认为是一个多维map,其key依次是rowkey、column、version,存储单元是一个cell。

一定要牢记于心的是,增删改在HBase里都是文件的追加操作,以{row, column, timestamp}为key、cell+操作类型为值 组成的一行。在查询时,是需要整合所有HFile和内存里的条目,才可以拼出最后数据的。所以,文件的大小、个数会影响到查询性能。

增删改,对应着column meta里的type。更确定的说,其实只有put、delete×族的操作,逻辑上的update操作也是一个put(不管是API还是oplog)。而delete会形成一个tombstone 记录。

HBase conceptual vs Bigtable

TODO

Namespace

A namespace is a logical grouping of tables analogous to a database in relation database systems.

它是独立进行配额管理、安全、region server groups的单位。其中regions server groups是指将这个namespace指定在一组region servers上,从而实现程序级别的隔离。

默认有hbase和default两个namespace,前者是系统级,后者是默认用户的ns。

存储顺序

逻辑上,一个table里的rows按字母序升序排列。

物理上,一个table的column family里的column数据是存储在一起的。

TODO,到底是怎样存储的呢?逻辑与物理是如何映射的呢?

delete

删除操作不是实时生效的,而是生成一条删除标记,之后在major compaction的时候,真正执行物理删除动作。删除可以指定version范围。也有一些hbase-site.xml的配置和column family的设置会影响delete操作。

Delete markers are purged during the next major compaction of the store, unless the KEEP_DELETED_CELLS option is set in the column family. To keep the deletes for a configurable amount of time, you can set the delete TTL via the hbase.hstore.time.to.purge.deletes property in hbase-site.xml. If hbase.hstore.time.to.purge.deletes is not set, or set to 0, all delete markers, including those with timestamps in the future, are purged during the next major compaction. Otherwise, a delete marker with a timestamp in the future is kept until the major compaction which occurs after the time represented by the marker’s timestamp plus the value of hbase.hstore.time.to.purge.deletes, in milliseconds.

timestamp and version

Hbase默认使用timestamp作为version,但不是必须的,user也可以自己指定version的值,可以使用过去、现在、未来的时间戳,或压根不使用时间(long integer)。

由于HBase使用(row,column,version)作为三维数组的index,所以如果同一时间(version)写入多个拥有相同row、column的数据,那仅有最后处理的那个数据会被留下,其他数据就默默地丢失了!虽然timestamp可以精确到ms,但仍存在一定冲突的可能性!

不过,由于HBase的一些内部机制以来version timestamp,所以,最好使用默认方式。

Caution: the version timestamp is internally by HBase for things like time-to-live calculations. It’s usually best to avoid setting this timestamp yourself. Prefer using a separate timestamp attribute of the row, or have the timestamp a part of the rowkey, or both.

结合delete操作的原理,即设置tombstone而非立即删除,在不是使用默认时间戳的情况下,可能会有问题(即使使用默认时间戳,在小概率下,也会有问题):即delete可能会有条件的影响后续的put操作

Deletes mask puts, even puts that happened after the delete was entered. See HBASE-2256. Remember that a delete writes a tombstone, which only disappears after then next major compaction has run. Suppose you do a delete of everything <= T. After this you do a new put with a timestamp <= T. This put, even if it happened after the delete, will be masked by the delete tombstone. Performing the put will not fail, but when you do a get you will notice the put did have no effect. It will start working again after the major compaction has run. These issues should not be a problem if you use always-increasing versions for new puts to a row. But they can occur even if you do not care about time: just do delete and put immediately after each other, and there is some chance they happen within the same millisecond.

delete的原理,还可能对get multi version造成有条件的影响

…create three cell versions at t1, t2 and t3, with a maximum-versions setting of 2. So when getting all versions, only the values at t2 and t3 will be returned. But if you delete the version at t2 or t3, the one at t1 will appear again. Obviously, once a major compaction has run, such behavior will not be the case anymore…

即,根据delete t3和major compaction顺序的不同,有两个细节分支:

  1. put t1-t3,delete t3,major compaction => t1、t2  or t2?(TODO,待验证)
  2. put t1-t3,major compaction,delete t3 => t2
key里的version/timestamp还用于在跨数据中心同步时的冲突检测,所以如果自己定义version,可能会导致问题。但一种特例情况是,如果针对多个tables建立二级索引,那么会希望能够获取到一致性(consistent)的数据,这样就最好自己指定version,以便在读取的时候,可以通过version来获取特定的cell。

GC

gc中与version相关点之一是,在添加新cell时,可能会使oldest version过期(超过了maxVersions配置),但真正的删除还是发生在major compaction时!所以,结合上面的delete内容,各种错乱就有可能发生。

另一个gc点是TTL(time-to-live)过期,cell过期的清理也是发生在major compaction时。当一个row下所有的cell都过期了,这个row也就不存在了。

动态column

在创建table的时候,只需要指定column family,而column是put时动态扩展的,这就意味着,无法让HBase告诉我们,有哪些column存在。这些信息,只能由使用者自己维护、理解和使用。

对于ACID的影响

Atomicity:保证单行原子性,包括单行、跨column family的写,都是原子的。但是跨行不支持原子,即批量写时可能会部分成功、失败或超时。对超时情况,可能会成功、也可能会失败。

Consistency&Isolation:单行的get保证一致性;但scan操作只保证“read committed”的一致性,可能会读到不同版本的多行数据,但一行数据仍然是一致的。另外需要注意的是,scan多行时,所依赖的是transaction commit time,即操作的时间,而不是version/timestamp,导致的情况是,如果scan的construction发生在时间t,那么假如在t之前提交了一个cell,虽然其version时间是t之后的,那么这个值也可能会读出。

Please note that the guarantees listed above regarding scanner consistency are referring to “transaction commit time”, not the “timestamp” field of each cell. That is to say, a scanner started at time t may see edits with a timestamp value greater than t, if those edits were committed with a “forward dated” timestamp before the scanner was constructed.

visiablity:保证针对一个row的多次操作是有序的,即针对一个row,多次get的结果顺序,一定遵循多次put的顺序。

Durability:可见(get、scan)的数据,都是持久化、不会丢失的。

参考文档

http://hbase.apache.org/book/datamodel.html

http://jimbojw.com/wiki/index.php?title=Understanding_Hbase_and_BigTable

http://0b4af6cdc2f0c5998459-c0245c5c937c5dedcca3f1764ecc9b2f.r43.cf2.rackcdn.com/9353-login1210_khurana.pdf

https://issues.apache.org/jira/browse/HBASE-2406

推荐:http://www.ngdata.com/bending-time-in-hbase/

目前还是笔记,待整理为学习心得。

=======================================

什么是schema design?除了简单的表结构的create和update,更重要的是,如何适当的利用HBase提供的功能以适应复杂的业务需求,同时支持快速的写入/查询、使存储空间尽可能小。

更改表结构是否需要停服,何时生效?

由于table里唯一需要提前设定的就是column family,故这里表结构的更改也就是指column family及其相关属性。

online schema changes are supported in the 0.92.x codebase, but the 0.90.x codebase requires the table to be disabled.

When changes are made to either Tables or ColumnFamilies (e.g., region size, block size), these changes take effect the next time there is a major compaction and the StoreFiles get re-written.

column families的个数

  • HBase stores data in HDFS at $HBASE_HOME/TableName/regionId/ColumnFamily/HFiles.

如上所示,由于目前HBase的flush和compaction是基于region server的,即只要有一个column family需要flush,其相邻的cf也都会被flush,会造成不必要的IO负担。所以,建议一个table只建立一个column family,除非两组数据的读写真是完全的分离的(那为什么他们在一个table里?)

如果HBase后续改变了自己的flush、compaction机制,那么这条设计原则也需要随之改变了。毕竟一个cf,也会带来不少弊端,可能对于性能、压缩都略有影响,表结构的修改也会较为频繁。

当cf个数大于1时,还需要注意由于cf间数据稀疏程度不一,导致数据量相差较大时,由于region的划分是根据rowkey进行的,从而带来scan效率降低的问题。Where multiple ColumnFamilies exist in a single table, be aware of the cardinality (i.e., number of rows). If ColumnFamilyA has 1 million rows and ColumnFamilyB has 1 billion rows, ColumnFamilyA’s data will likely be spread across many, many regions (and RegionServers). This makes mass scans for ColumnFamilyA less efficient.

共用HBase集群的弊端之一

除了正常的资源竞争外,还可能由于其他使用方不恰当的rowkey设计导致的流量过载,使与其部署在同一region server的我们的regions不可用;反之亦然。

keyvalue存储数据结构

The KeyValue format inside a byte array is:

  • keylength
  • valuelength
  • key
  • value

 

The Key is further decomposed as:

  • rowlength
  • row (i.e., the rowkey)
  • columnfamilylength
  • columnfamily
  • columnqualifier
  • timestamp
  • keytype (e.g., Put, Delete, DeleteColumn, DeleteFamily)

 

KeyValue instances are not split across blocks. For example, if there is an 8 MB KeyValue, even if the block-size is 64kb this KeyValue will be read in as a coherent block. For more information, see the KeyValue source code.

 

即,“key”其实是由rowkey、columnfamily、column qualifier、timestamp等组成的!每一个column存储,都会附带这个庞大的“key”。该结构对rowkey设计的理解,至关重要!

rowkey设计原则

To prevent hotspotting on writes, design your row keys such that rows that truly do need to be in the same region are, but in the bigger picture, data is being written to multiple regions across the cluster, rather than one at a time.

全局是均匀分散的,而需要相邻的数据其rowkey是连续的。

过长的rowkey会带来巨大的内存浪费

要深入了解这一点,需要了解HFile和Region Server内存索引的数据结构,我目前还没有看完。但必须记住的是,rowkey尽可能短,与mysql设计不一样,这里不一定要明确表义,例如可以用u代替user。当然也可以适当提高block size,从而使冗余存储的内容和比例下降。

目前猜测,影响到Region server 内存的原因,是由于每个region server都需要知道所有region的分布情况,所以需要将所有HFile的index数据加载到内存里。同时,虽然values是按照column family分别存储的,但每个cell都会保存自己的rowkey等信息,即rowkey会被重复很多很多次。

Block size的配置

Over here we are not talking about hdfs block size but HFile block size which is by default set to 64kb. Each HFile has a byte offset marking its starting and ending position. Smaller the block size the better it is for random gets. Block size should be determine based on the use case, for example If the use case does random gets we should make the block size small but not too small so that one row spans over multiple blocks. Also there is an overhead involved in creating and maintain block indexes.  If the use case involves scans instead of gets then it is advisable to set it to zero such that we have as few as possible blocks.

基于最小化disk访问的原则,较小的block size,较利于random read;相对而言,较大的block size,较利于scan。但同时,由于block的管理也是有成本的,所以不可能把它设置的过小,默认64kb。

rowkey和column family的关系

关系型数据库里,很容易理解,一行里可以包含多列,所以可以认为自增id、列名都是在一个table的范围内的。

但HBase里,其说明是:

Rowkeys are scoped to ColumnFamilies. Thus, the same rowkey could exist in each ColumnFamily that exists in a table without collision.

Rowkeys的有效性是被局限在Columnfamilies里的,这是因为HBase是Column(Family) based存储的。即,同一个table不同CF可以有相同的rowkey,效果其实等同于一个rowkey对应的数据可以由多个CF组成的values组成。

rowkey取值空间与split的方式

Bytes.split是默认的split方法,它是按照ASCII码来分片的,即 ‘0’ is byte 48, and ‘f’ is byte 102。所以,如果rowkey的取值范围是000000 – ffffff之类的,就会导致分片的不均匀以及hotspot问题。因为按照ASCII码表,bytes 58 to 96在这个rowkey取值范围是不会有值的。

为了避免这些问题,在设计rowkey时,需要了解其取值范围和分布情况,并选择或自己开发合适的spilt method。

use timestamp as part of rowkey or version?

TODO

 opentsdb表结构解读

  1. 全站仅适用一个table,即tsdb
  2. 只有一个column family,即t
  3. Row Key – Row keys are byte arrays comprised of the metric UID, a base timestamp and the UID for tagk/v pairs: <metric_uid><timestamp><tagk1><tagv1>[...<tagkN><tagvN>]。采用自己设计的rowkey格式,包含了一些富信息。其中timestamp是采用秒精度计算,但仅映射到hour粒度,从而使相邻时间的数据尽可能相邻存储,从而提高查询效率(猜测其业务场景比较多顺序查找或scan)。
  4. column qualifier:也采用了自己设计的数据结构,包含了一些富信息,分为2bytes和4bytes两种。
  5. column values:按照qualifier format flags指示的,存储1-8bytes。

 

参考文档

http://hbase.apache.org/book/rowkey.design.html

https://communities.intel.com/community/itpeernetwork/datastack/blog/2013/11/10/discussion-on-designing-hbase-tables

http://phoenix.apache.org/salted.html

https://issues.apache.org/jira/browse/HBASE-11682

http://blog.sematext.com/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/

http://opentsdb.net/docs/build/html/user_guide/backends/hbase.html

https://issues.apache.org/jira/browse/HBASE-3551

http://hbase.apache.org/book/regions.arch.html#keyvalue