EUAdvancer

分布式爬虫pyspider架构解析

bg
有项目对 pyspider 二次开发了很久,所以对它源码比较了解,这里记录下它的原理

pyspider模块介绍

pyspider主要有以下几大模块

  • scheduler
  • fetcher
  • processor
  • result_worker
  • webui

它们分工十分明确,scheduler 负责调度任务,fetcher 负责抓取网页内容,processor 负责处理网页内容,result_worker 负责将处理结果存储到db中,而 webui 则提供了可视化操作,每个模块之间通过消息队列通信。该爬虫同时支持分布式部署和创建子线程/进程方式部署

子线程/进程部署

首先先简单介绍下这个模式,可以定位到 pyspider/run.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
"""
Run all the components in subprocess or thread
"""


...

# result worker
result_worker_config = g.config.get('result_worker', {})
for i in range(result_worker_num):
threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))

# processor
processor_config = g.config.get('processor', {})
for i in range(processor_num):
threads.append(run_in(ctx.invoke, processor, **processor_config))

# fetcher
fetcher_config = g.config.get('fetcher', {})
fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
for i in range(fetcher_num):
threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))

# scheduler
scheduler_config = g.config.get('scheduler', {})
scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))

# running webui in main thread to make it exitable
webui_config = g.config.get('webui', {})
webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
% g.config.get('scheduler', {}).get('xmlrpc_port', 23333))

...

这种模式下,每个模块的通信是通过跨进程通信队列

1
2
3
def connect_message_queue():
from pyspider.libs.multiprocessing_queue import Queue
return Queue(maxsize=maxsize)

分布式部署

这才是主流的部署方式。我们需要分别启动每个模块

1
2
3
4
5
$ python run.py -c config.json scheduler
$ python run.py -c config.json fetcher
$ python run.py -c config.json processor
$ python run.py -c config.json result_worker
$ python run.py -c config.json webui

由于它们之间都是独立的进程,所以通过消息队列传递信息

1
2
3
4
5
6
7
8
for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
'fetcher2processor', 'processor2result'):
if kwargs.get('message_queue'):
kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
else:
kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
kwargs['queue_maxsize'])

整个设计思路如下:

除了 scheduler 模块,其它模块都可以分布式部署(实际上 scheduler 模块也能设计成集群化,这在之后的文章会单独针对它分析)

scheduler & webui 模块

scheduler 是整个系统里最重要的部分,它主要功能其实就是以下几个函数构成:

1
2
3
4
5
6
7
8
9
10
11
def run_once(self):
'''comsume queues and feed tasks to fetcher, once'''

self._update_projects()
self._check_task_done()
self._check_request()
while self._check_cronjob():
pass
self._check_select()
self._check_delete()
self._try_dump_cnt()
  • _update_projects():负责定时检查并更新 db 中每个项目的状态,并将 project 信息同步到内存中
  • _check_task_done():用于处理 status_queue 传回来的 task(包含 fetcherprocessortask 的处理结果)
  • _check_request():用于处理 newtask_queue(包含新的task任务),步骤如下:
    • 判断 task 是否已经在 task_queue 队列(该队列仅在 scheduler 内部使用)中,如果在且不强制更新,则忽略该 task ,函数结束
    • 根据 taskdb 判重,如果是新 task ,则将该 task 插入到 taskdb ;如果是已存在的 task ,则更新 tasktaskdb
    • task 插入到 task_queue 队列
  • _check_select():用于处理 task_queue,从 task_queue 中取出每个 taskid ,并从 taskdb 中读取该 task ,将该 task 插入 out_queue(scheduler2fetcher)
  • _check_delete() :由于 pyspider 删除项目并不是立刻删除的,而是在置标记为删除后,隔一段时间才会删。所以这里就是定时检测置删除标记的项目并将其删除
  • _try_dump_cnt():用于统计
  • _check_cronjob():用于检测定期重新调度启动的项目(比如一个网站设置为每15分钟重新爬取一次)

除了上述的主逻辑以外,scheduler 还注册了 rpc 服务以供 webui 调用。我们可以通过 webui 的调用了解到每个函数的作用。

1
def xmlrpc_run(self, port=23333, bind='127.0.0.1', logRequests=False):

项目启动

1
2
3
4
5
6
def new_task(task):
if self.task_verify(task):
self.newtask_queue.put(task)
return True
return False
application.register_function(new_task, 'newtask')

webui 启动项目调用了上述的函数,而从前面 scheduler 主逻辑我们可以看到,scheduler 会通过 _check_request()__check_select() 两个函数不断的读取 newtask_queue 队列的数据然后将该 task 分发出去,所以启动项目是将第一个 task 直接放入 newtask_queue 队列(一般是 processor 放进去的)以便 scheduler 分发。

项目更新

1
2
3
def update_project():
self._force_update_project = True
application.register_function(update_project, 'update_project')

对项目的配置更新后会直接 update 数据库中相应项目的内容,那么我们如何通知 scheduler 更新到内存呢(_update_projects 函数具有该功能,但是它间隔时间较长,不能立刻知道),因此通过 webui 通过 rpc 调用强制置更新标记,而它是如何真正做到更新项目的呢,其实 scheduler 主逻辑函数 _update_projects 已经体现出来了

1
2
3
4
5
6
7
8
9
10
11
12
13
def _update_projects(self):
'''Check project update'''
now = time.time()
if (
not self._force_update_project
and self._last_update_project + self.UPDATE_PROJECT_INTERVAL > now
):
return
for project in self.projectdb.check_update(self._last_update_project):
self._update_project(project)
logger.debug("project: %s updated.", project['name'])
self._force_update_project = False
self._last_update_project = now

通过置强制更新标记 _force_update_project 来从 db 中同步项目信息。webui 中还有很多一些 rpc 调用,但比较容易理解,所以就不一一列举了。

fetcher & processor & result_worker 模块

这几个模块思想比较简单,不需要特别的介绍了

  • fetcher:获取 scheduler2fetcher 传出的 task ,并获取页面的内容(可选通过 phantomjs 渲染页面获取异步加载的页面)
  • processor:处理 fetcher 传出的 task ,并且将 fetchprocess 的处理状态通过 status_queue 传回 scheduler
  • result_worker:保存到 db