Looyao's Blog

记录一些点滴

Beanstalkd - 一个简单、高性能的消息队列系统

| Comments

消息队列适用于很多业务场景,比如发邮件、短信,这种业务逻辑需要放到后端异步处理,而不需要阻塞前端,消息队列就排上用场,前端负责把任务put到消息队列中,后端服务从消息队列get任务处理。这种也可以成为生产者/消费者模型,这种模型试用范围很广,包括写多线程逻辑也经常会用到,主线程负责生产任务,工作线程负责消费任务。利用消息队列,这种生产者/消费者模型可以很容易扩展,提高整个系统的吞吐率。

在介绍Beanstalkd之前先说下其他。目前市面上有N多消息队列系统,这里有个网站专门介绍:http://queues.io/,尝试过搭建RabbitMQ,使用Erlang实现的,配置略复杂,最主要是,开放N个端口,我希望所有端口都绑定到内网,但是其中一个端口google了N久都没有找到如何绑定到内网,都是0.0.0.0这种,当然采用iptable限制一下也可以,但是略纠结,也可能是本人不是很懂Erlang,也不是很明白他的分布式原理,总之最后放弃不准备用它。当然RabbitMQ很功能很完善,只是我不想用==,我只是想找一个轻量级\好维护的消息队列。Redis也可以当作队列来用,使用list,或者pub/sub,但是毕竟Redis不是专业做消息队列的。下面Beanstalkd登场。

Beanstalkd,使用纯c实现,安装\配置\使用都非常简单,支持持久化,但是不支持分布式,不过我感觉这个还好,可以启动多个实例,前端逻辑做下支持也没关系。

安装

去github下载源码,https://github.com/kr/beanstalkd,下载下来make即可。beanstalkd就是可执行文件,为了方便,我们可以把可执行文件拷贝到指定目录。

1
2
3
mkdir -p /usr/local/beanstalkd/bin
cp beanstalkd /usr/local/beanstalkd/bin
ln -sv /usr/local/beanstalkd/bin/beanstalkd /usr/local/bin/

配置

配置非常简单,不需要配置文件,只有几个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[looyao@vm ~]$ beanstalkd -h
Use: beanstalkd [OPTIONS]

Options:
 -b DIR   wal directory
 -f MS    fsync at most once every MS milliseconds (use -f0 for "always fsync")
 -F       never fsync (default)
 -l ADDR  listen on address (default is 0.0.0.0)
 -p PORT  listen on port (default is 11300)
 -u USER  become user and group
 -z BYTES set the maximum job size in bytes (default is 65535)
 -s BYTES set the size of each wal file (default is 10485760)
            (will be rounded up to a multiple of 512 bytes)
 -c       compact the binlog (default)
 -n       do not compact the binlog
 -v       show version information
 -V       increase verbosity
 -h       show this help

例子:

1
2
3
4
5
#绑定内网IP:10.10.0.100,端口11300,不持久化
((beanstalkd -l 10.10.0.100 -p 11300 &) &)

#绑定内网IP:10.10.0.100,端口11300,开启持久化,持久化文件保存在/data/beanstalkd/data,默认10兆一个文件,可以根据自己情况修改
beanstalkd -l 10.10.0.100 -p 11300 -b /data/beanstalkd/data -s 10485760

如果开启持久化,性能会下降好多,主要是磁盘I/O导致,每一次数据变化都会写磁盘。

使用

首先要理解下Beanstalk的几个概念。tube:队列,job:消息任务,job存放在tube中。在看下job的状态说明,https://github.com/kr/beanstalkd/blob/master/doc/protocol.txt

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Here is a picture of the typical job lifecycle:


   put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*



Here is a picture with more possibilities:



   put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                        | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

这张图比较简单明了, DELAYED:当put时设置了延迟时间,在延迟时间内就是DELAYED状态,过了就自动进入READY状态。 READY:就是可以被消费的状态。 RESERVED:从READY中取任务,任务自动进入RESERVED,处理完成可以删除掉,如果需要重新放入队列可以手动release,则任务重新进入READY状态。补充:RESERVED是针对连接来的,当任务进入RESERVED状态是不能被其他连接(消费者)获取的,当连接断掉且没有被delete的任务会自动重新进入READY状态。 BURIED:这个状态可以理解为操作系统的废纸篓,消费者获取到任务后,可以将任务放到这里,后续可以delete或者kick重新把它置为READY状态。

还有一个比较重要的就是可以设置任务优先级,值越小越先被执行。

语言支持,https://github.com/kr/beanstalkd/wiki/client-libraries,基本上主流语言都有支持。

下边介绍下PHP使用Beanstalkd。

  • 下载pheanstalkd

https://github.com/pda/pheanstalk,最好通过composer下载,省心省力。

  • 几个例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
<?php

require '/opt/composer/vendor/autoload.php';

$b = new Pheanstalk\Pheanstalk('127.0.0.1', 11300);

$b->useTube('firstTube');

$b->put('hello world . ' . time());

$b->watch('firstTube');
$b->ignore('default');

//如果无任务,这里会阻塞
$job = $b->reserve();

echo $job->getData() . "\n";

$b->delete($job);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<?php

require '/opt/composer/vendor/autoload.php';

$b = new Pheanstalk\Pheanstalk('127.0.0.1', 11300);

$b->useTube('firstTube');

$b->put('hello world . ' . time());

$b->watch('firstTube');
$b->ignore('default');

$job = $b->reserve();

echo $job->getData() . "\n";


$b->bury($job);

$job = $b->peekBuried();

//$b->kickJob($job);

$b->delete($job);
  • 状态查看

官方有一些介绍,https://github.com/kr/beanstalkd/wiki/Tools

最简单是使用telnet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
telnet localhost 11300
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
list-tubes
OK 26
---
- default
- firstTube

stats-tube firstTube
OK 267
---
name: firstTube
current-jobs-urgent: 0
current-jobs-ready: 0
current-jobs-reserved: 0
current-jobs-delayed: 0
current-jobs-buried: 1
total-jobs: 1
current-using: 0
current-watching: 0
current-waiting: 0
cmd-delete: 0
cmd-pause-tube: 0
pause: 0
pause-time-left: 0

quit
Connection closed by foreign host.

最后,可以读一下FAQ,https://github.com/kr/beanstalkd/wiki/faq,很多疑问这里可能会有答案。

我们目前使用Beanstalk做集中统计日志收集,前端将统计数据放入队列,后端负责记录,解析挖掘,目前来看性能和稳定性都非常好。

Comments