欢迎您光临本小站。希望您在这里可以找到自己想要的信息。。。

Beanstalkd 高性能分布式内存队列系统(学习总结)

Java Web water 7914℃ 0评论

之前在微博上调查过大家正在使用的分布式内存队列系统,反馈有Memcacheq,FqueueRabbitMQBeanstalkd以及linkedin的kafka
RabbitMQ使用比较广泛,Beanstalkd是后起之秀。Beanstalkd之于RabbitMQ,就好比Nginx之于
Apache,Varnish之于Squid。后面在项目中使用Beanstalkd的过程中,更发现其简单、轻量级、高性能、易使用等特点,以及优先
级、多队列、持久化、分布式容错、超时控制等特性。下面就简单介绍一下Beanstalkd,不足之处请大家指正。

———————————————–正文分割线————————————————-

设计思想

高性能离不开异步,异步离不开队列,而其内部都是Producer-Comsumer模式的原理。

图1 Producer-Comsumer模式

应用

Beanstalkd,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延
迟,支持过有9.5 million用户的Facebook
Causes应用。后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协
议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。

核心概念

Beanstalkd设计里面的核心概念:

  • job

一个需要异步处理的任务,是Beanstalkd中的基本单元,需要放在一个tube中。

  • tube

一个有名的任务队列,用来存储统一类型的job,是producer和consumer操作的对象。

  • producer

Job的生产者,通过put命令来将一个job放到一个tube中。

  • consumer

Job的消费者,通过reserve/release/bury/delete命令来获取job或改变job的状态。

Beanstalkd中一个job的生命周期如图2所示。一个job有READY, RESERVED, DELAYED,
BURIED四种状态。当producer直接put一个job时,job就处于READY状态,等待consumer来处理,如果选择延迟
put,job就先到DELAYED状态,等待时间过后才迁移到READY状态。consumer获取了当前READY的job后,该job的状态就迁移
到RESERVED,这样其他的consumer就不能再操作该job。当consumer完成该job后,可以选择delete,

release或者bury操作;delete之后,job从系统消亡,之后不能再获取;release操作可以重新把该job状态迁移回READY(也
可以延迟该状态迁移操作),使其他的consumer可以继续获取和执行该job;有意思的是bury操作,可以把该job休眠,等到需要的时候,再将休
眠的job kick回READY状态,也可以delete
BURIED状态的job。正是有这些有趣的操作和状态,才可以基于此做出很多意思的应用,比如要实现一个循环队列,就可以将RESERVED状态的
job休眠掉,等没有READY状态的job时再将BURIED状态的job一次性kick回READY状态。

图2 Beanstalkd中job的生命周期

特性

Beanstalkd基于的源码安装和使用很简单,在此略过。这里重点介绍一下其几个很nice的特性。

  • 优先级

支持0到2**32的优先级,值越小,优先级越高,默认优先级为1024。

  • 持久化

可以通过binlog将job及其状态记录到文件里面,在Beanstalkd下次启动时可以通过读取binlog来恢复之前的job及状态。

  • 分布式容错

分布式设计和Memcached类似,beanstalkd各个server之间并不知道彼此的存在,都是通过client来实现分布式以及根据tube名称去特定server获取job。

  • 超时控制

为了防止某个consumer长时间占用任务但不能处理的情况,Beanstalkd为reserve操作设置了timeout时间,如果该consumer不能在指定时间内完成job,job将被迁移回READY状态,供其他consumer执行。

不足

在使用中发现一个Beanstalkd尚无提供删除一个tube的操作,只能将tube的job依次删除,并让Beanstalkd来自动删除空tube。还有就是Beanstalkd不支持客户端认证机制(开发者将应用场景定位在局域网)。

后续工作

  1. 介绍Beanstalkd的命令和使用

  2. 翻译Beanstalkd协议

  3. 分析Beanstalkd源码

参考文献

http://kr.github.com/beanstalkd/

http://adam.heroku.com/past/2010/4/24/beanstalk_a_simple_and_fast_queueing_backend/

http://nubyonrails.com/articles/about-this-blog-beanstalk-messaging-queue

http://www.igvita.com/2010/05/20/scalable-work-queues-with-beanstalk/

附一篇

Beanstalkd 是最近出现的一个轻量级消息中间件,他的最大特点是将自己定位为基于管道  (tube) 和任务 (job) 的工作队列 (work-queue):

 

Beanstalk

“(Beanstalkd) is a simple, fast workqueue service. Its interface is
generic, but was originally designed for reducing the latency of page
views in high-volume web applications by running time-consuming tasks
asynchronously.”

 

Beanstalkd 支持任务优先级 (priority), 延时 (delay), 超时重发 (time-to-run) 和预留 (buried), 能够很好的支持分布式的后台任务和定时任务处理。

 

它的内部实现采用 libevent, 服务器-客户端之间用类似 memcached 的轻量级通讯协议,因此有很高的性能:

 

Beanstalkd

 

尽管是内存队列, beanstalkd 提供了 binlog 机制, 当重启 beanstalkd 时,当前任务状态能够从纪录的本地 binlog 中恢复。

 

管道 (tube):

管道类似于消息主题 (topic), 在一个 Beanstalkd 中可以支持多个管道, 每个管道都有自己的发布者 (producer) 和消费者 (consumer). 管道之间互相不影响。 

 

任务 (job):

Beanstalkd 用任务 (job) 代替消息 (message) 的概念。与消息不同,任务有一系列状态:

 

Beanstalkd

 

READY- 需要立即处理的任务,当延时 (DELAYED) 任务到期后会自动成为当前任务;

DELAYED- 延迟执行的任务, 当消费者处理任务后, 可以用将消息再次放回 DELAYED 队列延迟执行;

RESERVED- 已经被消费者获取, 正在执行的任务。Beanstalkd 负责检查任务是否在 TTR(time-to-run) 内完成;

BURIED- 保留的任务: 任务不会被执行,也不会消失,除非有人把它 "踢" 回队列;

DELETED- 消息被彻底删除。Beanstalkd 不再维持这些消息。

 

任务优先级 (priority):

任务 (job) 可以有 0~2^32 个优先级, 0 代表最高优先级。 beanstalkd 采用最大最小堆 (Min-max heap)
处理任务优先级排序, 任何时刻调用 reserve 命令的消费者总是能拿到当前优先级最高的任务, 时间复杂度为 O(logn). 

 

延时任务 (delay):

有两种方式可以延时执行任务 (job): 生产者发布任务时指定延时;或者当任务处理完毕后, 消费者再次将任务放入队列延时执行 (RELEASE
with <delay>)。这种机制可以实现分布式的
java.util.Timer,这种分布式定时任务的优势是:如果某个消费者节点故障,任务超时重发 (time-to-run)
能够保证任务转移到另外的节点执行。

 

任务超时重发 (time-to-run):

Beanstalkd 把任务返回给消费者以后:消费者必须在预设的 TTR (time-to-run) 时间内发送 delete /
release/ bury 改变任务状态;否则 Beanstalkd 会认为消息处理失败,然后把任务交给另外的消费者节点执行。如果消费者预计在
TTR (time-to-run) 时间内无法完成任务, 也可以发送 touch 命令, 它的作用是让 Beanstalkd 从系统时间重新计算
TTR (time-to-run).

 

任务预留 (buried):

如果任务因为某些原因无法执行, 消费者可以把任务置为 buried 状态让 Beanstalkd 保留这些任务。管理员可以通过 peek
buried 命令查询被保留的任务,并且进行人工干预。简单的, kick <n> 能够一次性把 n 条被保留的任务踢回队列。

 

Beanstalkd 协议:

Beanstalkd 采用类 memcached 协议, 客户端通过文本命令与服务器交互。这些命令可以简单的分成三组: 

 

生产类 – use <tube> / put <priority> <delay> <ttr> [bytes]:  

生产者用 use 选择一个管道 (tube), 然后用 put 命令向管道发布任务 (job).

 

消费类 – watch
<tubes> / reserve / delete <id> / release <id>
<priority> <delay> / bury <id> / touch <id>

消费者用 watch 选择多个管道 (tube), 然后用 reserve
命令获取待执行的任务,这个命令是阻塞的。客户端直到有任务可执行才返回。当任务处理完毕后, 消费者可以彻底删除任务 (DELETE),
释放任务让别人处理 (RELEASE), 或者保留 (BURY) 任务。

 

维护类 – peek job / peek delayed / peek ready / peek buried / kick <n>

用于维护管道内的任务状态, 在不改变任务状态的条件下获取任务。可以用消费类命令改变这些任务的状态。

被保留 (buried) 的任务可以用 kick 命令 "踢" 回队列。

 

协议文档: https://raw.github.com/kr/beanstalkd/master/doc/protocol.txt

 

Beanstalkd 不足:

Beanstalkd 没有提供主备同步 + 故障切换机制, 在应用中有成为单点的风险。实际应用中,可以用数据库为任务 (job) 提供持久化存储。

 

Beanstalkd

 

另外, 和 memcached 类似, Beanstalkd 依赖 libevent 的单线程事件分发机制, 不能有效利用多核 cpu 的性能。这一点可以通过单机部署多个实例克服。

 

官方网站见 http://kr.github.com/beanstalkd/

 

参考资料:

http://adam.heroku.com/past/2010/4/24/beanstalk_a_simple_and_fast_queueing_backend/

http://nubyonrails.com/articles/about-this-blog-beanstalk-messaging-queue

http://abulman.co.uk/presentations/Beanstalkd-2010-05-06/

http://rdc.taobao.com/blog/cs/?p=1201 

via http://in355hz.iteye.com/blog/1395727


二、部署安装:

Beanstalkd 的安装非常简单:

在Ubuntu和debian下使用下面命令:

1
sudo apt-get install beanstalkd

安装后编辑配置文件:

1

vim /etc/default/beanstalkd

把START=NO改为:START=yes即可

更多关于安装可以参考官网


通过命令可以启动、停止Beanstalk

启动后,就可以通过客户端进行调用了:

Beanstalk支持多种客户端语言:

php,java,perl,c,c++,lua,python,go,ruby等等(了解更多可以来官网)。

我们将通过php给大家介绍在生产环境下面的使用。

就拿录视频制程序使用到的Beanstalk来给大家介绍:

先介绍一下程序结构:

视频录制程序分为两个方面,一个是产生录制任务的脚本(生产者),还有一个处理录制任务脚本(消费者)。


java客户端

在官方推荐的几个java的客户端里,我选择了TrendrrBeanstalk,是基于MIT许可证的开源项目,源码一共六个.java文件。用起来很简单,但是没有封装peek相关的操作。如果你有更好的java client的话欢迎交流沟通哈 🙂  随意来个example。

[java] view plaincopy在CODE上查看代码片派生到我的代码片

  1. public class BeanstalkExample {  

  2.       

  3.     protected static Log log = LogFactory.getLog(BeanstalkExample.class);  

  4.       

  5.     public static void main(String[] args) {  

  6.         try {  

  7.             clientExample();  

  8.             //pooledExample();  

  9.         } catch (BeanstalkException e) {  

  10.             // TODO Auto-generated catch block  

  11.             e.printStackTrace();  

  12.         }  

  13.     }  

  14.   

  15.     /** 

  16.      * Example for using an unpooled client 

  17.      *  

  18.      * @throws BeanstalkException 

  19.      */  

  20.     public static void clientExample() throws BeanstalkException {  

  21.         BeanstalkClient client = new BeanstalkClient("ip"8300"example");  

  22.           

  23.         //client.put(1l, 0, 5000, "this is some data".getBytes());  

  24.         log.info(client.tubeStats());  

  25.         //while(client.reserve(60) != null) {  

  26.             BeanstalkJob job = client.reserve(60);  

  27.             log.info("Get job: " + job.getId());  

  28.             client.deleteJob(job);  

  29.         //}  

  30.           

  31.         client.close(); // closes the connection  

  32.     }  

  33.   

  34.     public static void pooledExample() throws BeanstalkException {  

  35.         BeanstalkPool pool = new BeanstalkPool("jx-crm-flare00.jx.baidu.com"830030// poolsize  

  36.                 "example" // tube to use  

  37.         );  

  38.           

  39.         BeanstalkClient client = pool.getClient();  

  40.           

  41.         client.put(1l, 05000"this is some data".getBytes());  

  42.         BeanstalkJob job = client.reserve(60);  

  43.           

  44.         client.deleteJob(job);  

  45.           

  46.         client.close(); // returns the connection to the pool  

  47.     }  

转载请注明:学时网 » Beanstalkd 高性能分布式内存队列系统(学习总结)

喜欢 (0)or分享 (0)

您必须 登录 才能发表评论!