Lost(本blog所有图片都放在picasa,如不能访问,请翻墙)

July 21, 2013

一个互联网级别流控工具的介绍

最近要做的一个项目,可能要抗大促的流量,所以调研了一下厂内的一个流控工具,该流控工具我觉得在最近的大促活动中,功绩显赫,所以趁机也学习一把。

符合逻辑,既然介绍流控工具,我们先定义一下什么叫流控工具,或者说流控工具需要解决什么问题(这里只针对互联网这个行业来下定义,其他行业我也不懂,想讲也讲不清楚)。那我们先来看看互联网企业会碰到什么样的问题需要流控工具来解决。

在互联网企业里,伴随着业务的发展,有2件事可能是避免不了的:1.访问量无准备的或者有准备的成倍或者几十倍的增加;无准备的情形比如企业的流量推广部门某天花了大价钱,在外面买了很多流量引到系统来,但是可能沟通的时候忘了告诉系统的技术负责人,这个系统这个时候就有可能悲剧了;有准备的增加,比如天猫和淘宝的大促活动,提前几个月各个部门就会预估到时候自己的流量会涨多少,这个就是有准备的,当然,这个准备也不是面面俱到,因为谁都没法非常精准的预估高峰时期真正的QPS会是多少,可能估个大概,然后上下30%左右浮动。2.第二件避免不了的是业务发展后,业务变复杂了,系统也变的复杂了,为了便于维护,系统必然会进行拆分;而系统拆分后,就会存在这样的问题,即各个系统不是同等重要的,有些是核心系统,有些是非核心系统,同时在依赖上,也会存才核心系统依赖非核心系统,或者一个系统既依赖核心系统,又依赖非核心系统的状况。

上面的2件事,第一件事其实是流控工具要解决的问题;第二件事情,则是和流控工具如何解决问题有一定关系。先说要解决的问题,如果系统遇到自己没有准备好,也就是超出自己处理能力范围之外的流量时,该怎么办?这个就是流控工具要解决的问题。

可以思考一下如果读者你要来解决这个问题,你会怎么解决?我先说说我的思路:

1,做一个懒惰的程序员,啥都不管,流量大扛不住就扛不住吧;如果做大促的话,系统一旦超过自己的处理范围,会瞬间垮掉,而且在整个大促期间,该系统都起不来,哪怕系统重启,重启后新来的流量也会瞬间压垮系统(这里我们假设用户不会因为系统前面的不可用而离去,而是一直在等待重试);ok,作为一个程序员,这样的结果只能是工作不努力,只能努力找工作了。

2,做一个身体上比较勤奋的程序员,之所以身体比较勤奋,就是懒的思考,既然流量要增加(不考虑那种突然增加的那种),那好办,我们增加机器吧;其实平心而论,这种方法也能解决问题,如果对流量估计的很准,其实是能解决问题的,而且如果在流量增加的时候,但是这里面有2个不好的地方;一是流量很难估的准,二如果每个系统都靠增加机器来抗这个流量,那么会浪费钱,比如可能在大促期间,最经济节约的办法就是核心流程的系统都要提供正常服务,但是一些非关键系统,其可以进行降级服务,即对于容量内的请求,非关键系统可以按正常处理,但是一旦某些请求处理不过来,则可以让这些请求不走该非关键系统(比如对于淘宝,选择商品和支付是核心流程,但是商品如何优惠,可以认为不是核心流程)。作为一个程序员,只能说这种方法能够让你免于第一种的悲惨遭遇,但是你自己的存在感和价值感可能也不太强。

3,好吧,我们的英雄程序员是第3种,首先,他想解决问题,其次,他想创造性的,优雅的解决问题。特别是为了实现第二个目标,他开始认真思考了,首先,要进行流控的本质原因是什么?其实这个本质原因就是当前的系统的处理能力不满足增加后的流量续期。那么不满足需求后,导致的问题或者现象是什么呢?为了简单,这个英雄程序员简化了系统依赖,假设这个互联网企业就是有2个系统,一个系统叫 FrontWeb,是最前端的web系统,直接接收用户的访问,然后该系统就依赖了一个系统,比如叫BackBusinessSystem,叫后端业务系统。假设FW系统和BBS系统平时的处理能力都是100 QPS,现在突然要处理的QPS是200.这里分2种情形,一种是FW访问BBS时,没有设置请求线程上限,这种情况下,当QPS涨到200后,在FW中,新来的请求会通过使用新线程的方式来访问BBS,从而导致BBS中同时处理的请求变多(工作线程也变多),每个请求在BBS中的处理时长变长;还有一种方式是FW中设定了能够同时访问BBS的线程最大数目(比如线程池,连接池,都可以起到这个作用),比如就是100,当QPS为100时,每个请求进入FW后,都可以活动一个线程来请求BBS,但是当QPS到200后,会有100个请求先抢到线程,访问BBS,其他100个请求要等线程处理完成后,才能处理自己的请求。第一种和第二种对于外界用户的感知来说,宏观上来说,请求的平均响应时间都会变长,但是从微观角度来讲,是不一样的,第一种情形,每个用户的请求响应时间都会变成;但是第二种在理想情况下,会是200个用户中,有100个用户的响应时间是不受影响的,另外100个用户的响应时间则是变慢了。这2种结果那种好?是不是会想当然认为第2种好,在现实世界里,确实是第2种好,因为这种情况下,有一半的用户是可以正常访问的,这也是当系统扛不住时,服务优雅降级的一种体现,能服务一半的用户,比每个用户都服务不好要好。不过非要抬扛的话,我是觉得第一种会更公平一点;第二种的公平性略差;不深入了,再深入就牵扯到公平和效率的哲学问题了。

扯太远了,一句话,当系统的处理能力不够时,导致的后果就是请求的响应时间变长,当请求的响应时间变得很长,用户已经没有耐心去等待结果的返回时,其实就意味着该系统已经不可用了。

因此,一个好的流控工具,我们其实希望它能做到如下几点:1.能够实现服务和系统的优雅降级,即如果流量实在扛不住,可以正常服务其中的部分请求,而不是所有的请求都不能服务。2.如果服务出现过载的情况,能够逐渐恢复到正常状态。而不是让服务和系统一直过载,一直没法响应。第一,二个程序员的处理办法都可能导致这种状况。

我们前面说了,系统处理能力不够时,其一定会表现出请求的响应时间过长,所以作为最简单的流控工具,可以通过给依赖的系统设定响应超时时间来解决部分问题,其实大部分的系统也是这样做的,比如访问数据库,会有获取连接的超时时间,会有查询的超时时间;不过我们来仔细review 一下设置超时时间的问题:1.超时时间设置多少合适?比如一个BBS系统,当其QPS为100时,其平均响应时间为10ms,但是其99.99%的请求的响应时间为50ms;而系统平时的请求的服务质量要达到4个9,所以FW中需要把BBS的响应超时设置为50ms,但是想一下,如果当BBS真的是因为QPS超过处理范围而变慢,FW中每个请求等50ms才返回,FW中会积压多少请求?这个问题的本质原因是由于BBS对于不同的请求其响应时间具有波动性(比如一个请求只要从数据库捞一条数据;一个请求需要从数据库捞1000条数据),如果没有这种波动性,也即每个请求的响应时间都是一样的,都等于平均响应处理时间,那么则不存在这个难题。2.超时时间还有一个更重要的问题,就是你明明知道BBS系统已经过载了,根本处理不了这么多请求了,每个进入FW的请求,还是会去BBS里面走一遭,而不说”我知道你不行了,我就不来麻烦你了”。这2个问题,基本上宣告设置超时时间这种办法,对于流控根本不可行。

那么有没有其他办法?其实从上面的描述中,我们发现,做流控,做服务的降级,最好的办法就是如果发现后端一个系统的处理能力已经不够的时候,能够让剩下的请求直接不去访问该服务。后端服务处理能力不够可以通过记录最近一段时间每个请求的平均响应时间来进行判断,那么当发现后端处理能力不够的时候,怎么来让后续的请求直接掉头就走,不继续去访问呢?这个其实比较简单,因为对其他系统和服务的访问肯定是在一个方法里进行,土一点的办法,可以在这个方法里做一个判断,当一个新的请求调用该方法时,看一下该方法对应的后端系统是否还有足够的处理能力,如果有,则让该请求继续访问后端服务;如果没有的话,则直接返回一个默认值,不去请求后端服务了。原理就是这样的,当然,为了解耦,维护性等角度考虑,可以将这个判断和实施抽取到单独的模块里来,然后通过aop的方式apply到要进行流控的与后端系统交互的方法上去。

基本原理就是这样,不过我厂的流控,有2种策略,一种是控制进入某个方法的线程数(就是有多少线程正在处于和后端系统交互的过程中,如果后端响应慢,一定会有很多线程堆积在这里),这个有点类似于排队,就是在性能的瓶颈点上,采取排队策略,最多让n个线程进入该瓶颈点,n个之外后来的请求,就直接挡掉;还有一种策略是通过控制线程数+响应时间的方式,大概机制就是设定一个线程数的阀值和响应时间的阀值,如果响应时间超过阀值了,那么说明在该线程数下,后端系统是处理不过来的,于是在这种情况下,会逐渐减低线程数的阀值,比如每次递减3,知道平均响应时间达标为止。

总结下来,我厂的流控工具其实主要nb的地方就是运用了这个排队原理,而不是通过单纯控制响应时间来做流控。当然,就我个人来说,看完代码后,我觉得该流控工具还nb在其产品化的思路,其不是只做一个流控工具,而是将周边的需求都考虑到了,比如统计流控工具的运行状况,通过类似于restful方式动态调整流控工具的阀值等。

July 18, 2013

jdbc 中如何实现mysql相关的statement的超时控制

Filed under: 性能相关 — admin @ 1:38 pm

因为最近在做一个项目,可能需要抗双十一的压力,由于后端接了一个集团自己研发的存储,但是该存储宣称支持和mysql一样的协议,所以需要调研一下该存储如何实现查询超时的控制,翻了一下mysql的jdbc实现,发现mysql是这样做的:
1.由于query是绑定在statement上执行的,所以在statement上设定querytimeout
2.然后statment执行的时候,会生成一个TimerTask,这个task的作用就是如果执行的时间已经超过了设定的querytimeout,则该task会拿到statment引用的connection,创建一个和该conneciton类似的connection,所以类似,就是新创建的connection在诸如ip,port,databasename等关键属性上和原有connection保持一致。
3.创建好该connection后,会给该connection发送KILL QUERY connectionID,其中connectionID就是执行的query对应的thread ID,意思就是告诉mysql在服务端把该执行线程kill掉。(不过我很好奇mysql kill这个执行语句是如何实现的)
4.发送了该语句,正常情况下,mysql在服务端会把该query kill掉,从而客户端等待该query返回的statement就能快速的感知到异常,从而实现返回。

April 14, 2013

HSF发布服务流程

Filed under: java 开源项目 — admin @ 5:55 am

对于一个服务框架来说,其必须要做的一件事情就是将现有的所有服务的信息能够放在一个地方(我们暂且称其为信息中心),这样当有消费者想查看某个服务是否存在,或者其已经确定想用某个服务,但是不太清楚一些细节信息时(比如实际服务提供者的网络地址),其可以在信息中心进行查询。

这个时候大家可以想一下,对于这个信息中心,如果是你来设计,你首先想到其应该具有哪些功能?我个人觉得基本功能有(从基础到高级排序):

1.信息的写入(服务提供者可以向其写入自己提供的服务相关信息,术语叫服务发布)和读取(服务消费者需要能够查询相关服务的信息)

2.信息安全存储,确保信息不丢失。

3.主动检测服务提供者的情况并及时推送给服务消费者。比如迅速检测到某个服务提供者down机了,然后把信息通告给相关的消费者,让这些消费者不要再向该机器发送请求了(如果服务提供者有多个实例,可以向其他实例发送请求)

4.分布式,我个人觉得最好的模式应该是主备+集群模式,信息根据hash算法hash到不同的机器,每台机器的信息除了自己保留一份外,还在其他机器上有一个备份。这样的好处是:1.hash能够提供扩展性和避免单点(所有的信息不是都存放在一台机器上了)2.主备的话,可以当一台机器挂掉的时候,基本不受影响。 当然,这一块我觉得可以根据实际情况在实现的复杂度和实际情况之间做折衷,比如如果信息中心承载的信息很少,一台单机存放绰绰有余,那我觉得就没必要搞这么复杂,可以考虑一台主机+2台备机模式,主备之间的数据一致性,再根据实际的要求进行实现即可。

当然,信息中心的服务信息的源头来自于服务提供者,服务提供者应该在自己初始化阶段,就向信息中心发布自己的服务信息。架构如下图所示:

 

在淘宝内部,HSF使用的信息中心是ConfigServer,由于这个系统我自己还没学习过,所以对于信息中心应该具有的功能,我上面的思考可能和ConfigServer实际提供的并不一致,这个可以以后再专门进行分析,这篇我们就重点来看看HSF里服务的发布流程。

所谓发布,在服务框架里其实就是发布服务的元信息,那么,第一不想到的就是这些元信息是如何表达的,其由谁生成,怎么方便的生成等问题,这里就不兜圈子了,直接上图:

HSF里的服务,到目前为止,就是把Srping的Bean当成服务进行暴露,让用户可以远程调用,所以这些服务信息也直接在spring的bean里进行配置。

具体的发布流程如下:

1.应用启动的时候,会初始化HSFSpringProviderBean,初始化的时候,会先根据xml里面该bean的配置信息,来生成ServiceMeta,也就是服务元信息。

2.生成好服务元信息后,在HSFSpringProveriderBean的inti初始化方法里,会调用ProcessService(该Service控制了HSF服务的发布和消费主流程)进行服务信息发布,此ProcessService是静态变量,全局惟一。

3.在ProcessService里,调用MetaService来进行发布,在这个步骤,系统进行了可扩展性的设计,也就是定义了prePublish Hook 和 postPublish Hook,在发布前会调用注册进来的所有的发布前钩子函数,在发布后会调用注册进来的所有postPublish钩子函数。

4.MetaService中,首先会根据service信息来检查此service是否已经发布过了,如果已经发布过了,那么就不发布,也就是同一个服务在同一个jvm里只会被发布一次。

5.然后根据服务信息生成发布者登记表(这个登记表具体是用来干什么的还没想明白),然后根据发布者登记表来生成一个发布者。然后调用发布者(Publisher)来发布服务信息。

6.对于DefaultPublisher来说,其发布时,首先检查承载服务信息的类是否是可序列化的,这点很好理解,毕竟这些信息都是要通过网络传送给信息中心的,所以中间一定是需要进行序列化的。而其检查对象是否可序列化,其实就是把该对象往ObjectOutputStream里写一遍,看是否会抛异常(在这里貌似和大家宣扬的不要用异常来判断业务逻辑向违背,不过我猜测可能也确实没有其他更优雅的方法来判断一个对象是否可以被序列化了,所以才出此下策)

7.确认对象是可序列化的后,会首先把要发布的服务信息构建成一个PublishingRequestedEvent,然后塞到一个eventStore里去,这一步的操作和功能无关,而是为了出现问题时,定位问题方便。该eventStore会存储最近的一些Event信息,这样当万一发生问题时,可以输出这些最近的Event信息,看在发生问题之前,到底进行过哪些操作,相当于是帮助了解上下文信息。便于定位问题。接下来进入deliver流程。

8.deliver中,会调用worker来scheudle自己(也就是DefaultPublisher).这里的流程比较绕,我们来仔细讲一下。

9.这里的worker,是ConfigClientWorker的对象,其本身是隶属于淘宝的configserver系统,而非hsf。我们来看一下ConfigClientWorker的实现,ConfigClientWorker拥有一个WorkThread,ConfigClientWorker接收了task后,只是将task放到一个队列里,然后告诉workthread,有新的task进来了,你需要处理一下。而WorkThread为了能够处理这些任务,说白了,这些任务就是要把信息通过网络发送给远端的信息中心,然后收到远端的返回消息。所以WorkThread需要有网络通信能力,这个通信能力封装在ConfigClientConnection中,ConfigClientConnection提供了网络连接等基础功能,同时,其还封装了2个方法,sendReceive和send方法,前者代表了一个网络来回,即客户端发送,服务端响应;而后者则只是实现客户端往服务端发送,发送后不用关心处理服务端的响应消息。通过ConfigClientConnection,WorkThread可以不用处理网络的底层细节,其只需要将task封装成ConfigClientConnection要求的ProtocolPackage对象,然后调用其的sendReceive方法,就能实现网络收发。

10.我们现在回过头来关注一下WorkThread的整个处理流程,由于其是一个线程,所以其实现了run方法,在run方法里可以理解为无限循环,在循环里,其首先检查ConfigClientWorker的mailbox是否有消息,这里的消息代表者服务端发过来的消息,如果有消息的话,则调用相应的处理类将其处理掉。

11.接下来开始处理task,其先构建一个TaskContext,这个TaskContext会持有一个ProtocolPackage,同时还会持有ConfigClientWorker,然后遍历task队列,对每个task,调用其run

方法,task的run方法执行的时候,会接收TaskContext作为参数,其实这些task的run方法不会真正处理task,而只是把task里包含的服务信息写道context的ProtocolPackage包里去,这样当所有的task都遍历完后,context的ProtocolPackage对象会有所有这些task需要发送给服务端的信息,因此这个时候,ConfigClientWorker只需要将这个ProtocolPackage的信息进行发送即可,实际上它也确实是这样做的,通过调用ConfigClientConnection的sendReceive方法,其将这些信息发送给服务端,然后接收服务端的响应信息,并进行处理。这里比较巧的地方就是其会把task进行聚合,让多个task聚合成一个网络包一起发送,这样的好处是节省网络带宽,提高性能;缺点就是我觉得发送的报文和返回的报文格式会更复杂一点,毕竟可能要表示哪个task的发送是成功的,哪个task的发送结果是失败的。

January 21, 2013

win7下使用cygwin 安装hadoop的一些问题记录

Filed under: hadoop — admin @ 3:23 pm

安装的主体教程可以在网上搜一下”在Windows上安装Hadoop教程”这个pdf文档,里面讲的比较详细了,可以根据其一步步来,我讲一下自己碰到的一些特殊问题。

1.安装cygwin的时候,由于使用的是公司域帐号,而当时由于是在家里,没有远程登录公司的域,导致了sshd服务安装后,没法进行连接,折腾了好久,如果也有我这种情况,请确保在安装cygwin的时候自己域帐号和对应的域是连接上的。
2.在启动sshd时有可能遇到“Privilege separation user sshd does not exist”的错误信息,解决方案:
往etc/passwd里添加如下内容
sshd:x:74:74:Privilege-separated SSH:/var/empty/sshd:/sbin/nologin
3.hadoop启动过程中,如果碰到dfs的permission的问题,则可以通过conf/hdfs-site.xml,找到dfs.permissions属性修改为false
4.启动jobtracker过程中,如果碰到obtracker.info could only be replicated to 0 nodes, instead of 1
说明找不到datanode,看是否还没有启动datanode,或者启动有异常。
5.使用非0.20.2版本的haoop,有可能在启动tasktracker的时候,出现:
13/01/21 23:12:53 ERROR mapred.TaskTracker: Can not start task tracker because java.io.IOException: Failed to set permissions of path: /tmp/hadoop-shouru.hw/mapred/local/ttprivate to 0700
这个异常可以去查一下hadoop的jira,据说和是否在cygwin上无关,而且貌似也无解,当碰到这个问题的时候,还是退回到使用0.20.2版本比较好。

January 14, 2013

hive新手碰到的一个诡异问题

Filed under: hive — admin @ 4:00 pm

问题背景:
1.可以简单认为是在hive上建立了一个表,一开始为了自己开发,使用hive的load语法,从一个文件中导入数据到该表
2.文件中因为含有中文,由于hive默认是支持UTF-8编码,所以在windows下生成了文件,并且使用UTF-8编码方式进行保存。

问题表现:
假设在该UTF-8文件中只有一行数据,该行数据对应表的唯一列(假设列名为column_a),假设数据内容为”123″,则当你select全表的时候,能显示该记录,但是当你要用该column来做条件匹配的时候,比如select * from table a where column_a=’123′,这样死活不能匹配记录。

问题原因:

windows下的utf-8文件,其会在文件头存上3个16进制的字符:’EE BB BF’,这是3个不可见字符,遗憾的是这3个字符也会导入到hive表中,而在显示的时候,由于是不可见字符,所以你看到的只有123,其实在123前面还有这3个字符,因此你用其来和’123′进行相当匹配的时候,不会过滤出记录。

January 13, 2013

利用hbase的coprocessor机制来在hbase上增加sql解析引擎–(二)具体实现

Filed under: 存储 — admin @ 4:23 pm

实现上,需要实现的几个大的功能有:1.提供基于hbase的jdbc接口;2.sql解析引擎(使用druid);3.schema转换,sql感知的schema转换成底层hbase中世纪表的schema

1.基于hbase的jdbc实现,这个不用多说,把jdk里的jdbc相关接口按照hbase方式进行实现即可,由于这里面的接口方法奇多,所以只实现自己能用到的方法,用不到的写个空实现即可。
2.sql解析引擎,从druid中抽取出来的代码封装而成(之所以要抽取出来,是因为这部分代码需要部署在hbase的region server上,因此要尽量精简,不要有过多无用的依赖)
3.schema转换,schema中会存放该表的各个字段的名称和类型,我们会利用类型信息,来将hbase里的byte数组转换成对应的java类型。

在程序组织上,实际上分成了4个模块:

interface模块包含一些公共的基础bean,主要是用来表征一行记录的类(Record),sql-client和sql-server都会依赖它。sql-client和sql-server是2个不同的模块,前者部署在客户端,其封装了对jdbc的实现,同时将用户的sql进行必要的检查,并且转换成对hbase友好的sql。sql-server部署在hbase 的region server上,其主要职责是从传递过来的sql中取出rowkey的区间,根据这个区间来生成scan,通过scan获得返回的result,将这些result转换成record,然后再根据sql语句中的过滤条件,对这些record进行过滤,符合条件的record,会转换成MapWritable,后者能够进行序列化,返回给客户端,客户端再进行反序列化,就能够拿到结果。同时,在sql-client和sql-server之间,传递的String类型的sql语句,也就是客户端和服务端会对sql进行2次parse,这样做的原因是因为使用string进行传递的话,不需要考虑序列化,反序列化的事情,方便一些,缺点就是解析2次,耗cpu,但是这个目前来说问题不大。

流程图:

October 7, 2012

利用hbase的coprocessor机制来在hbase上增加sql解析引擎–(一)原因&架构

自己参与构建的一个产品,其场景是每天凌晨批量导入计算好的前一天相关的业务数据到前端存储(数据库或者nosql),然后供用户调用。业务场景有2个特点:
1.对于前端存储,存在大量的批量导入,数据量比较大,每天导入的数据达到数亿行。可以理解为凌晨集中写,然后白天只进行读。
2.查询来说相对简单,如果是存数据库的话,每个表大概会由3~4个列来构成一个组合索引,然后查询就是根据这个组合索引来进行。

产品刚上线的时候,前端存储是选用的mysql,后来由于mysql在写入时压力确实很大,而且也不想让所有的技术方案都绑定在关系数据库上,套用某知名DBA的话,我们自己需要拥有在不同存储上进行切换的能力,而且业务场景的2个特点,都比较符合hbase,所以决定选用hbase进行尝试。

但是这样问题又来了,产品刚上线的时候,使用的mysql,所以在业务实现上,使用了大量的sql,而且,我们也觉得sql是好东西(不要被nosql 这个名词所糊弄,其实这个名字取的有点不好,从我的理解来讲,nosql的起因是现有关系数据库做了太多的事情,比如事物,B+索引等,导致关系数据库本身代码很复杂,同时写入比较慢;而对于很多业务场景来说,其实它可能只需要用到关系数据库的部分功能,但是没有选择,却要承担关系数据库做了太多事情带来的恶果;而对于sql,一来其语义表达能力强,二来其本身代价也不大,先阶段毕竟还是IO是最耗时的,sql解析不是瓶颈,物美价廉,为啥不用),所以我们想让hbase也支持sql,这样相当于我们对选用的所有的存储都有一个统一的接口要求,即该存储的接口需要能支持sql。

从业务场景上,可以推出我们的需求:

1.数据是批量写的,不需要对写实现sql功能。

2.查询简单,可以理解为查询都可以落到rowkey上。

3.sql处理只读,不需要考虑事物等复杂的问题。

好了,既然需求已经比较清楚了,现在就要来看如何进行实现了,从架构层面来说,这个sql引擎有2个地方可以放,一个是放在客户端,即sql解析后,解析出startkey,endkey后,拼装成一个scan,发送给hbase server,拿到hbase server的返回的数据后,再根据sql里面的条件,对这些数据进行过滤,返回给业务层。还有一个就是将sql解析引擎放在hbase 服务端,sql引擎和hbase之间的数据传递直接在region server 的jvm进程内进行传递,sql引擎将数据过滤后,再返回给业务层。放在server 端相对于放在客户端的好处是如果sql引擎需要过滤大量的数据话,这些数据直接在服务端就被过滤掉,不需要耗费客户端和服务端之间的网络流量,性能会更高一些。so,在我们的实现里,我们选择在服务端放sql 引擎。

这个决定做了后,其实我们大的架构也基本定了,如下图所示:

 

为了方便理解,其实大家可以将mysql认为是一个sql 解析引擎+一个存储引擎(实际上mysql也确实是这样做的,其架构上分成了server+各种存储引擎的插件,而sql解析就是在server这一层做掉的)。而现有的hbase则是一个存储引擎,你能往里面写入数据,也能从中获取数据,而我们现在要做的,就是要在其上增加一个sql 解析引擎,从而使的其看起来像个关系数据库。

好了,大的架构定了,那么如何将sql引擎部署到hbase上去,以及sql引擎如何与hbase打交道?这一块幸亏0.92的hbase里提供了coprocessor机制,我们可以将自己的sql引擎封装成一个coprocessor,来实现接收客户端的请求,并将请求的sql进行解析,转换成hbase能够识别的scan,利用scan获取数据,然后将获取回来的数据再根据sql进行过滤和计算等,然后返回给客户端。

这里sql的解析,使用的是druid,阿里巴巴B2B开源的一个项目(之所以用druid,因为我们是在淘宝,大家属于一个集团,有问题也好解决,sql解析本身有很多成熟的开源软件,所以如果你自己所在公司不是阿里巴巴集团的,其实可能还是选择其他成熟的开源项目更靠谱一些)。druid我就不详细介绍了,有兴趣的同学可以去看一下该开源项目,加深了解。

这一篇就到此为止,下一篇我们讲一下更细节的实现以及实战过程中碰到的一些问题。

hbase coprocessor的介绍:https://blogs.apache.org/hbase/entry/coprocessor_introduction

druid的连接:http://code.alibabatech.com/wiki/display/Druid/Home

Le Philosophe Du Valais Ou Correspondance Philosophique...

 

 

 

 

May 1, 2012

从存储角度分析hbase

Filed under: 存储 — admin @ 9:27 am

最近在做项目,数据量太大,批量插入的时候,数据库速度相当来说比较慢(单台机器每秒8500行),需要找寻其他存储,结合稳定性以及公司在各种存储积累的经验来看,还是hbase比较靠谱。从存储角度研究了一下,写出这篇blog,权当总结。

对于一个存储,如果不需要其计算的话(当然,完全不进行任何计算的存储,其实是有问题的,因为这样一来会浪费存储服务器的cpu,二来如果不进行计算,一个sum求值,会导致大量的detail数据从存储发送到计算服务器上去,导致浪费网络),那么最需要考察的3个方面,个人认为是其存储的数据模型(是一个二维表还是k-v),插入速度,读取速度(读取速度其实又分为单行读取和区间读取等)。本文就从这3方面来对hbase进行分析。

图1.HBase的架构图

1.数据模型

hbase的理论是基于google的bigtable,所以其实际存储的,是一个松散的二维表,所谓松散,是指其没有很强的schema,是一个稀疏表(该表的schema定义了1000个列,但其实可能实际存储的大部分数据,都只有50列有数据,剩余列的数据都为空)。

在实际存储时,一个hbase table会有几个column family(现在的hbase,一个表有2,3个column family足矣,再多了hbase会有问题),一个column family可以有很多column(成千上万都可以)。column family 里的所有column,都会存储在一个文件里(不同的column family是放在不同的文件里),也即意味着这些column在物理上是相邻的,也即反推出来,当你设计column family时,需要将那些需要经常一起读取的列放在一个column family里。

2.插入数据

我们之所以考察hbase,是因为现在mysql的插入速度不符合我们预期,而mysql插入速度之所以慢,主要是因为为了维护索引(我们是唯一索引,在mysql中通过B tree来实现),而为了维护索引,特别是唯一索引的时候,会产生随机读(为了确定要插入的recored所组成的索引是否已经存在了),同时也会产生随机写(如果插入的数据不是按照索引字段进行排序,那么会产生大量的page split),同时,我个人理解,哪怕插入的数据是按照是按照索引列进行排序,其实还是会产生随机写,因为所谓的顺序写,应该也是逻辑上的顺序,实际在物理上落到磁盘上,还是不相邻的page,因为磁盘上没法为一个表的index预留大量连续的空间,特别是不知道你这个表的索引会变成多大时。

那么hbase的插入速度是怎么样呢?据说比较快(没有实际测试过,所以只能是据说),那么为了实现这种快,从上面的解释来看,它一定需要避免走mysql的老路,mysql的随机读写,可以说是一部血泪史,那么hbase是怎么避免随机读写的呢?我们来看一下hbase的插入流程。

插入数据时,hbase先顺序写入到HLog里(如果不启用HLog,连这个顺序写入都可以免掉),HLog写完后,再在内存的memstore里进行更新(内存里的数据会根据rowkey进行排序),当内存里积累的数据达到一定量时(64M),会将内存里的数据flush到磁盘(这个时候也可以理解为顺序写,不过应该是逻辑上的顺序,磁盘上可能还是没法找到这么大块连续的空间),刷写成storefile。此时HLog里保存的对应记录可以进行删除,也就是memstore和Hlog里保存了同样的数据,这样如果regionserver 此时宕机了,那么重启后,Hlog里面会保存还没有刷写到storefile里面的记录。可以发现,HBase在插入数据时,不需要磁盘随机读写,而是尽可能利用磁盘顺序写,因而能提高写入速度。

当然,从hbase性能测试人员了解到的数据,如果开启HLog的话,Hlog会成为插入的瓶颈(因为为了确保插入的数据不丢失,Hlog不是一个本地文件,而是HDFS文件,所以除了会在本地写一份外,还会在网络上写备份),现在测试结果大概是单机12000L/S。而从hbase维护团队获得的信息,关闭hlog方式的插入,单机速度可以达到4~60000L/S。

其实hbase采用的这种方式,就是被称为Log Structure Merged的算法,我个人的理解其是将一些修改在内存里聚集起来(同时,为了确保这段期间的修改不会丢失,它也要将这些修改flush到磁盘,但是很巧妙,它将这个期间的修改是append到一个日志文件,这样就是使用了磁盘的顺序写),不是每个修改来了后,都马上去找到磁盘里对应的数据,进行修改;这样的话,当内存里聚集到一定数量时,它可以和磁盘里的数据做merge,进行批量修改。从而达到更快的插入速度。那么,这个时候要是有读的话,该如何处理呢?在hbase中,读的时候,会先检索内存里的memstore,看是否有对应的记录;如果内存里没有,再去建设storefile,可以看出,此时它的读,会比mysql那种方式稍微复杂一点,会有2个数据源需要处理。

3.读取数据

评估hbase,我们是看到它的插入速度,但是,作为一个线上系统,实时读的响应情况,其实比插入写更重要,毕竟我批量插入失败的时候,我有个时间窗口可以重试(我们一般是凌晨开始同步数据,到早上7点这段时间,都可以进行同步数据的工作),但是如果突然发现线上实时读比较慢的话,那么这样会直接影响到最终用户,是一个比较大的问题了。 所以我们还需要评估hbase的读,由于还没有实地测试过,所以还是从其架构上来进行评估。

图2.HFile存储格式

StoreFile是对Hfile的轻量级包装,HFile由block组成,当要读取数据时,会先找Data index,通过data index 找到对应的data block,把block加载到内存里,然后scan block,从中获取指定的K-V。data index一般在打开HFile时,就会被加载到内存,并在block cache中被cache住,所以一般情况下,如果对应的data block没有在内存里的话,查找一条数据一般需要进行一次磁盘IO。同时,针对我们项目里跨时间区间进行查询的场景,在hbase里,如果rowkey设计的好,将时间因素考虑在内,有可能能将不同业务日期的数据聚集在一起,从而可能一次磁盘IO就将需要的所有数据获取到,提高读取效率。当然,从根本上来说,想要读取快,还是需要结果尽量落在内存里。

 

April 29, 2012

mysql-对索引优化的大批量数据插入

Filed under: 存储 — admin @ 9:50 am

做的一个项目,在haoop上计算好业务数据,然后回流到mysql里,每天需要回流的数据量大概有2亿行左右。这2亿行数据在mysql端,分属于9个表,这里面最大的一个表,每天回流1亿数据,这个表在mysql里被分成了1024个分表,也就是基本上每个分表每天要回流10万行数据。

由于牵扯到具体业务,为了避免不必要的纠纷,就不细讲这个表的schema,笼统来讲,这个表采用innodb引擎,使用自增主键,同时使用3个列来做唯一索引(2个列是long,一个列是datetime类型)。整个表60多个列,除了一个列是datetime类型外,其他列都是int,long类型。

测试分成3种情况,一种是在haoop上,不对要插入的数据进行hash和排序(hash是按照一个列进行hash,该列既是构成唯一索引的列,又是构成用来进行分库分表的列),一种是在haoop上,对要插入的数据进行hash以及排序,第三种是在第二种的基础上,将binlog进行关闭。

1.上图纵坐标是每秒插入的条数,横坐标是当数据库累积插入第多少天业务数据时的速度(横坐标应该除以10,即最多不是累积到140,而是14)。
2.由于测试数据库的硬盘容量的关系,最多只能插入14天数据。
3.从上图结果来看,是否去掉binlog对插入速度影响不大(原因应该是我们已经设置了innodb_flush_log_at_trx_commit = 2)。
4.对组成索引的字段进行排序,则可以大幅提高插入速度。如果直接将hadoop产生的数据不做处理,进行插入,当插入第5天
数据时,插入速度已经下降到2000多行/S。

原因分析:

被插入的表,有一个唯一索引,构成的列为seller_id,biz_date,area(卖家ID,日期,其实是个datetime类型,地区),一个卖家当天可能有1000条数据需要进行插入,如果不对卖家ID进行hash和排序,那么,这些这1000条数据可能分散在多个云梯文件里,同步某个文件时,可能插入5条数据进去,这个时候为了维护索引,需要将该索引对应的页从硬盘读入内存,插入5条数据后,由于后续数据都不和该卖家相关,此索引页可能被唤出,下一次再碰到5条数据时,可能又被唤入,以此类推,该索引页可能需要被唤入换出20次;而如果按照卖家ID索引和排序,一个卖家的数据都集中在一起,那么这1000条数据的插入,只需要唤入唤出索引页1次。所以才会造成插入速度有如此显著的差异。同时,如果不对seller_id进行排序,插入的seller_id是无序的,这样会造成B tree的page进行split(比如某个page已经存放了11,13,15这3个seller_id,结果后来突然插入了14,那么如果一个page只能存3个seller_id的话,那么这种情况下就会进行split。mysql中,除了hash索引,全文索引外,其他索引都是通过B tree实现的),从而产生随机IO.

 

同时,无论是否hash源数据,插入速度都在下降,这是由于维护索引造成的,当索引大了后,为了维护索引,需要从硬盘上读取索引所在页进入内存,会导致随机读写,其索引越大,随机读写越多,从而造成插入速度一直呈下降趋势。

结论:

现在各大互联网公司,应该都有业务场景涉及到离线计算大量日志或是业务数据,然后需要同步到一个存储里,再对外提供实时读服务。如果这个存储很不幸是mysql,且引擎是innodb的话,那么很不幸,索引的存在会使得插入速度随着数据的累积而下降。那么为了让这个下降速度慢一些,在离线计算时,让产生的数据就按照索引字段排好序,有利于大幅提高插入速度。

附测试环境:

“方案1主备同步”的数据库环境已经搭建好

my031093.sqa.cm4(主) db031175.sqa.cm4(备)
IP 10.232.31.93 10.232.31.167
端口 3306 3306
CPU 8| Intel(R) Xeon(R) CPU E5420 @ 2.50GHz 8| Intel(R) Xeon(R) CPU E5420 @ 2.50GHz
内存 16G=2Gx8 16G=4Gx4
硬盘 SAS 899.6 GB SAS 898.3 GB

 

MySQL为5.1.48版本,innodb的相关配置是:

default-storage-engine = INNODB

innodb_flush_method = O_DIRECT

innodb_file_per_table = 1

innodb_open_files=60000

innodb_flush_log_at_trx_commit = 2

innodb_lock_wait_timeout = 100

innodb_additional_mem_pool_size = 20M

innodb_log_buffer_size= 200M

innodb_log_file_size = 100M

innodb_log_files_in_group = 4

innodb_file_io_threads = 4

innodb_thread_concurrency = 64

innodb_max_dirty_pages_pct = 50

innodb_data_file_path = ibdata1:5G;ibdata2:5G:autoextend

innodb_buffer_pool_size=11G

 

January 15, 2012

观察者模式
The Railway Auditor

Filed under: 设计模式 — admin @ 5:23 am

1.应用场景:当有一对象的改变,需要通知多个接收方时,需要使用这种模式。

2.解决的问题:使用观察者模式,被观察对象和观察者之间耦合比较松(通过实现各自的接口来达到这个目的,被观察者不需要感知具体的观察者,只要知道观察者接口即可,其也不需要了解观察者的数量;观察者其实也可以不用感知具体的被观察对象,只要知道其接口即可),被观察对象可以挂载多个观察者,且添加和删除观察者,对于被观察对象没有任何影响。

场景举例:当发生一个新闻事件(Subject)时,会有多个通讯社(Observer)进行报道。使用jdk自带的Observable,Observer接口。

public class News extends Observable{

	private String content;

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
		this.setChanged();
		this.notifyObservers(content);
	}

}

新闻类,观察者模式中的主题。

public class CCTVReporter implements Observer{

	@Override
	public void update(Observable o, Object arg) {
		System.out.println(this.getClass().getName()+ ":The news's content has changed!");

	}

}

观察者一,这个观察者比较二,不报道新闻内容。

public class BBCReporter implements Observer{

	@Override
	public void update(Observable o, Object arg) {
		System.out.println(this.getClass().getName()+ ":The news's content has changed! The content is:"+(String)arg);

	}
}

观察者二,这个观察者比较靠谱,能够报道新闻的内容。

@Test
	public void oneToMany() {

		News news = new News();
		news.addObserver(new BBCReporter());
		news.addObserver(new CCTVReporter());
		news.setContent("大事不好了!!!");
	}

一个subject上注册了多个Observer。

com.dazhexiang.designpattern.Observer.CCTVReporter:The news's content has changed!
com.dazhexiang.designpattern.Observer.BBCReporter:The news's content has changed! The content is:大事不好了!!!

输出内容

@Test
	public void manyToOne() {

		News goodNews = new News();

		goodNews.addObserver(new BBCReporter());

		goodNews.setContent("太好了!!!");

		News badNews = new News();

		badNews.addObserver(new BBCReporter());

		badNews.setContent("大事不好了!!!");

	}

一个Observer注册了多个Subject

com.dazhexiang.designpattern.Observer.BBCReporter:The news's content has changed! The content is:太好了!!!
com.dazhexiang.designpattern.Observer.BBCReporter:The news's content has changed! The content is:大事不好了!!!

输出内容。

Older Posts »

Powered by WordPress