EUAdvancer

pyspider之scheduler模块集群化

bg
上一篇文章介绍过,scheduler 模块只能启动一个进程,这也就意味着即使是分布式部署 pyspider ,当 scheduler 崩溃时整个系统仍然无法再正常工作,所以将 scheduler 模块集群化是非常有意义的事情

设计目标

pyspider 原生设计中,scheduler 是系统的中心节点,它来调度和管理任务,好处就是数据和管理高度统一集中在一个地方,简单易行。但是缺点也很明显,也就是很容易形成性能瓶颈,而且一旦 scheduler 挂掉,整个系统随之崩溃。而我们对 scheduler 集群化的目标就是形成一个无中心化的集群,节点彼此通信并且彼此合作完成任务,这样可以在保证在任意一个节点崩溃的时候系统仍然能够正常运行。但是由于 scheduler 本身就是一个调度模块,所以如果多个节点同时运行,最重要的彼此同步的数据,哪些数据该同步。这里用到 zookeeper 帮我们管理这些节点。

数据同步

多个 scheduler 节点可选共同维护相同的 project 或者分别负责不同的 project (均分),前者需要保证创建项目,删除项目,暂停项目等操作能够保证实时同步,因为这些操作必然是 rpc 调用,所以可以选择 leader 真正进行操作,其他节点再次 rpc 调用 worker 节点同步到内存即可,实现起来并不复杂,但是不利于统计每个项目的信息。而后者每个项目相互独立,上述操作也只需要相应节点操作即可,其他节点不需要知道,很适合统计每个项目的信息,所以我们选择后者。

首先研究需要同步的数据,scheduler 需要同步的数据主要为 self.projects ,它在内存中保存了所有项目的信息

1
2
3
4
5
6
def _update_project(self, project):
'''update one project'''
if project['name'] not in self.projects:
self.projects[project['name']] = Project(self, project)
else:
self.projects[project['name']].update(project)

源码中它通过定时从 db 检测项目的更新或者强制更新标记来维护 self.projects ,现在 每个 worker 只需维护属于它自己的项目,而不需要保存所有项目信息,只有 leader 需要额外维护一个所有 projectid 以便监控项目的创建和删除来通知相应的节点,所以我们只需在 znode 上存放相应的项目 id 即可 。由于每个 scheduler 的项目并不相同,这意味着,webuischeduler 发起一次调用时必须指向特定的 scheduler 节点,那么还需要一个 znode 节点保存所有每个 scheduler 节点的 ipport 等信息。这样整个同步信息就理顺了,下面的图展示了整个 znode 层次结构

主备切换

上面的图其实也解释了如何实现该功能,当多个 worker 均参与 leader 选举,一旦一个 worker 被确定选为 leader ,那么它需要额外监控根节点(上图的 app 节点),当然监控的不是根节点的内容,而是其子节点的结构变化,诸如新建节点,删除节点。这是因为我们可以使 每个 scheduler 启动时都会在 zookeeper 服务端创建临时 znode 节点(ephemeral node ,当会话结束时,该节点会自动删除),这样任何一个节点崩溃都会导致其对应的 znode 节点自动删除,而监控根节点的 leader 节点可以感知到这个变化并做出相应变化。因此 leader 节点和其他 worker 节点大部分时间是做相同的事情,只在节点变化时重新进行项目分配

数据均匀分发

为了保证每个节点的负载尽量一致,我们需要将项目均匀的分发到每个 scheduler 节点上,这里采用 哈希取模 映射到相应的 znode 上。 它可以解决一个比较麻烦的问题,之前逻辑是每个 task 都是通过 processor 插入到 newtask_queue 消息队列,然后 scheduler读取即可,但是现在我们有很多 scheduler ,并且它们分别负责不同的 project ,所以我们必须为每个 scheduler 创建一个消息队列并以每个 zookeeper 客户端连接 id 作为消息队列的连接名,那么问题就变成 processor 如何知道该往哪个消息队列存放 task 了。由于采用了哈希取模的方式映射到 znode 上,所以这里很简单,processor 在分发 task 前根据 projectid 确定目标 znode 来得到连接 id,这样就可以正常插入了。

当然还有一个思路,因为 processor 生成的 task 实际是 scheduler 分发出来的 task 产生出来的(也就是谁产生的写回谁),所以可以在 scheduler 传出 task 时增加一个 zkclientid 的标识,这样 processor 可以根据这个标识重新传回 scheduler ,但是这个方法的坏处是,因为 task 会存在 taskdb 中,所以 zkclientid 标识也会存在 db 中,那么当系统重启时,zkclientid 必然会改变,这就导致 db 中的 task 全部不能正确插入到消息队列,所以不推荐该方式。

分配代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def zk_distribute_projects(self, nodes, projects=None, mode=None):
"""distribute projects to all exist znodes"""
if mode == 'add' or mode == 'delete':
assert projects is not None, "please set 'projects' to use 'add' mode"

if projects is None:
projects = self.zk_projects

if len(projects) == 0:
logging.error('no project')
return

nodes_projects = defaultdict(list)
for index, projectid in enumerate(projects):
path = self.get_project_node(projectid, nodes)
nodes_projects[path].append(projectid)

# 分别为每个worker赋值
for path in nodes_projects:
if mode == 'add':
logging.error(path)
olddata = self.zkcli.get(path)[0]
if olddata == "":
data = json.dumps(nodes_projects[path])
else:
data = json.dumps(json.loads(olddata) + nodes_projects[path])
logger.error(path + ': (' + str(len(json.loads(data))) + ') ' + data)
elif mode == 'delete':
data = json.loads(self.zkcli.get(path)[0])
for projectid in nodes_projects[path]:
data.remove(projectid)
data = json.dumps(data)
else:
data = json.dumps(nodes_projects[path])
logger.error(path + ': (' + str(len(nodes_projects[path])) + ') ' + json.dumps(nodes_projects[path]))
self.zkcli.set(path, data)

def get_project_node(self, projectid, nodes):
"""哈希映射"""
num_nodes = len(nodes)
assert num_nodes is not 0
aimnode = hash(projectid) % num_nodes
return nodes[aimnode]

函数解释:

  • 该函数仅由 leader 执行,由于 leader 也作为一个 worker 执行,所以 self.projects 只有部分项目,所以需要额外维护一个 self.zk_projects 来保存所有 projectid 用于检测项目的删除和创建来分配
  • 为了防止频繁的改变 project 的映射位置,在项目增加(add)或删除(delete)模式下仅对相应的项目分配,其他项目不受影响,在新增节点或者删除节点的时候才会对全部项目进行统一分配

RPC调用

该章节可忽略,其中部分内容非原生 pyspider 内容,而是属于对 pyspider 功能进行二次开发后的集群化内容

项目启动 & 批量启动

启动一个项目需要两步,首先对项目状态进行修改(RUNNING),接着将 task 直接插入 newtask_queue 队列, 均由 rpc 调用实现。集群化比较重要的是得到项目对应的 scheduer 节点监听的 ipport。思路为: 通过 hash 映射得到 projectid 对应的 znodepath ,然后根据 pathleader 节点得到目标 scheduler 监听的 ipport 以供 rpc 调用

项目批量启动时比较特殊,多个项目有可能在不同 scheduler 上,所以可以先获取所有 scheduler 的信息,然后再根据上述思路确定每一个 projectrpc 地址,以免频繁访问 leader 节点(znode

项目停止

和项目启动原理相同

项目重启

项目重启要经历删除内存中的项目 self.projects ,删除 taskdb 等持久化数据,强制从 db 中重新更新 project 到内存三个步骤。所以我们仍然可以延续项目启动的思路,rpc 调用直接指向目标 scheduler 节点

项目删除 & 批量删除

项目删除不同于项目重启,因为在删除内存中该项目后,并不会再次强制恢复了,所以在项目重启的思路的基础上,我们还需要考虑 scheduler 对应的 znode 节点上的数据的同步,保证它们的一致性。这里我们可以交给 leader 做,在 数据同步 小节中已经介绍了, leader 会监控项目的创建和删除,它的作用就在此,一旦项目删除,它会立即同步信息到相应的 znode 节点上。批量删除是一样的道理,原理和项目删除一样

多线程锁

对,不是分布式锁。这里讲的是由于每个 zookeeper 客户端需要参与 leader 选举,那么也就是说除了主节点,其他节点都将处于阻塞状态,所以每启动一个 scheduler 都会开启一个线程来运行 zookeeper 客户端,而 zookeeper 客户端会创建并监控对应的临时 znode 节点,只要 znodeprojectid 列表发生变化(由 leader 分配),那么它会立刻将项目列表同步回来(也就是同步到 self.projects 变量上),这意味着涉及到 self.projects 的操作需要加锁,比如:

1
2
3
4
5
6
def _update_project(self, project):
'''update one project'''
if project['name'] not in self.projects:
self.projects[project['name']] = Project(self, project)
else:
self.projects[project['name']].update(project)

但是集群化的 scheduler 已经不需要这个操作了(因为它的 projectid 的变化只会从 znode 节点同步回来,而不是从 db 中实时读取更新)。这里强调的是我们不需要为每个地方都加锁,对于用于遍历 self.projects 的地方只需要 try-catch ,因为它在下一轮仍然会刷新到,不需要加锁导致性能缺失,当然对于会改变 self.projects 结构的地方加锁是必须的了。以下是同步代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def zk_synchronize_projects(self, projects):
"""synchronize projects with watched znode"""
assert type(projects) is list

exist_projects = set(projects) & set(self.projects.keys())
delete_projects = [projectid for projectid in self.projects if projectid not in projects]
add_projects = set(projects) - exist_projects

# 涉及到修改变量结构的地方均需要加锁
self.lock.acquire()
# 删除需要删除的项目
for projectid in delete_projects:
del self.projects[projectid]
# 加载新增的项目(从projectdb中读取项目信息)
for projectid in add_projects:
self.projects[projectid] = Project(self, self.projectdb.get(name=projectid))
# 从taskdb中加载每个项目的task
for projectid in self.projects:
self._load_project_tasks(projectid)
self.lock.release()
logger.error('synchronize projects done: ' + str(len(self.projects)))

zookeeper客户端操作

客户端的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from kazoo.client import KazooClient
import socket
import json
import logging
import os

logging.basicConfig()


zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

root = '/app'


class ZKops(object):
def __init__(self, createNode=True):
self.scheduler = None
self.zk = zk
self.root = root
self.id = str(self.zk.client_id[0])
self.rpcport = None
if createNode:
self.path = self._create_node('worker', ephemeral=True, append_id=True, haveRoot=True)
else:
self.path = None

def forScheduler(self, scheduler):
"""传入scheduler对象,用于监控其对应的znode节点和leader节点,并同步回信息"""
self.scheduler = scheduler

@zk.DataWatch('/leader')
def watch_leader(data, stat):
"""监控leader的信息(ip, hostname, client_id)"""
if data:
logging.error("[Watch Leader Data] leader information is %s [Version is %s]" % (data, stat.version))
else:
# logging.warning("[Watch leader] data is not available")
pass

@zk.DataWatch(self.path)
def watch_node(data, stat):
if data:
# logging.error("[Watch Node Data] Node data is %s [Version is %s]" % (data, stat.version))
try:
assert self.scheduler is not None, "please set scheduler first"
self.scheduler.zk_synchronize_projects(json.loads(data))
except Exception, e:
logging.error(e)
else:
# logging.warning("[Watch current node] data is not available")
pass

def forWebui(self, app):
"""webui模块不需要创建znode节点,但需要监控leader节点,用于rpc调用"""
@zk.DataWatch('/leader')
def webui_watch_leader(data, stat):
"""监控leader的信息(ip, hostname, client_id)"""
if data:
logging.error("[Watch Leader Data] leader information is %s [Version is %s]" % (data, stat.version))
connect_info = json.loads(data)['/leader']
self.connect_scheduler_rpc_by_address(app, "http://%s:%s" % (connect_info['ip'], connect_info['port']))
else:
# logging.warning("[Watch leader] data is not available")
pass

def forProcessor(self, zk_synchronize_projects):
"""processor模块不需要创建znode节点,但需要监控leader节点,用于判断每个task的输出消息队列"""
@zk.DataWatch('/leader')
def processor_watch_leader(data, stat):
"""监控leader的信息(ip, hostname, client_id)"""
if data:
logging.error("[Watch Leader Data] leader information is %s [Version is %s]" % (data, stat.version))
self.force_synchronize_projectslocation(zk_synchronize_projects)
else:
# logging.warning("[Watch leader] data is not available")
pass

def force_synchronize_projectslocation(self, zk_synchronize_projects):
"""用于processor,当processor中没有正确同步每个scheduler的消息队列信息时强制更新"""
info = self.get_projects_info_of_every_node()
projectslocation = dict()
connect_info = json.loads(self.zk.get('/leader')[0])
for nodepath in info:
projectslocation[connect_info[nodepath]['client_id']] = info[nodepath]
zk_synchronize_projects(projectslocation)

@staticmethod
def connect_rpc(value):
"""连接rpc"""
if not value:
return
try:
from six.moves import xmlrpc_client
except ImportError:
import xmlrpclib as xmlrpc_client
return xmlrpc_client.ServerProxy(value, allow_none=True)

@staticmethod
def connect_scheduler_rpc_by_address(app, address, key='scheduler_rpc'):
"""给定rpc地址连接rpc"""
try:
app.config[key] = ZKops.connect_rpc(address)
logging.error('[rpc] connect to ' + address)
except Exception, e:
logging.error(e)

def get_all_scheduler_rpc_address(self):
"""得到所有可选的rpc连接地址"""
conns = dict()
connect_info = json.loads(self.zk.get('/leader')[0])
for nodepath in connect_info:
if nodepath == '/leader':
continue

address = 'http://%s:%s' % (connect_info[nodepath]['ip'], connect_info[nodepath]['port'])
conns[nodepath] = address
return conns

def connect_scheduler_rpc_by_specify_project(self, app, projectid):
"""得到指定的项目的对应rpc调用地址"""
info = self.get_projects_info_of_every_node()

aimnode = None
logging.error(info)
for nodepath in info:
if projectid in info[nodepath]:
aimnode = nodepath
logging.error(aimnode)
connect_info = json.loads(self.zk.get('/leader')[0])
aiminfo = connect_info.get(aimnode, None)

if aiminfo is not None:
address = 'http://%s:%s' % (aiminfo['ip'], aiminfo['port'])
self.connect_scheduler_rpc_by_address(app, address, key='scheduler_rpc_specify_project')
else:
app.config['scheduler_rpc_specify_project'] = None
logging.error('scheduler_rpc set error: ' + projectid)

def connect_leader_scheduler_rpc(self, app):
"""连接到leader(scheduler节点)"""
connect_info = json.loads(self.zk.get('/leader')[0])
if '/leader' in connect_info:
leaderinfo = connect_info['/leader']
address = 'http://%s:%s' % (leaderinfo['ip'], leaderinfo['port'])
self.connect_scheduler_rpc_by_address(app, address, key='scheduler_rpc_leader')
else:
app.config['scheduler_rpc_leader'] = None
logging.error("can't get the leader info to connect")

def create_node_later(self, port):
"""为了获取端口而设置,直接初始化类无法获得scheduler端口(因为单机测试需要改端口)"""
self.rpcport = port
self.path = self._create_node('worker', ephemeral=True, append_id=True, haveRoot=True)

def _create_node(self, node, ephemeral=False, append_id=False, haveRoot=False):
"""创建znode节点,可选临时,增加后缀,路径增加根节点选项"""
root = self.root if haveRoot else ''
self.zk.ensure_path(self.root)
if append_id:
path = root + '/' + node + self.id
else:
path = root + '/' + node

if not self.zk.exists(path):
if ephemeral:
self.zk.create(path, ephemeral=True)
else:
self.zk.create(path)
else:
pass

port = self.rpcport if self.rpcport is not None else None
hostname, ip = self.get_ip_hostname()
olddata = self.zk.get('/leader')[0]
if olddata != "":
data = json.loads(olddata)
assert type(data) is dict
data[path] = {
'ip': ip,
'hostname': hostname,
'client_id': self.id,
'port': port
}
else:
data = dict()
data[path] = {
'ip': ip,
'hostname': hostname,
'client_id': self.id,
'port': port
}
self.zk.set('/leader', json.dumps(data))
# logging.error('[create_ephemeral_node Ops] Current children: ' + str(self.zk.get_children(self.root)))
return path

def election_leader(self):
"""领导选举"""
election = self.zk.Election("/electionpath")
election.run(self.leader_func)

@staticmethod
def get_ip_hostname():
"""得到系统ip和hostname"""
hostname = socket.gethostname()
ip = socket.gethostbyname(hostname)
return hostname, ip

def leader_func(self):
"""leader向/leader路径传入本机ip和hostname"""
hostname, ip = self.get_ip_hostname()
leader_path = self._create_node('leader', ephemeral=False, append_id=False, haveRoot=False)

assert self.scheduler is not None
port = self.rpcport

olddata = self.zk.get('/leader')[0]
if olddata != "":
data = json.loads(olddata)
assert type(data) is dict
data[leader_path] = {
'ip': ip,
'hostname': hostname,
'client_id': self.id,
'port': port
}
else:
data = dict()
data[leader_path] = {
'ip': ip,
'hostname': hostname,
'client_id': self.id,
'port': port
}

self.zk.set(leader_path, json.dumps(data))
self.scheduler.zk_init_projects()
logging.error('[I am the leader] ' + json.dumps(data))

@zk.ChildrenWatch(root)
def watch_children(children):
"""leader监控所有子节点的创建,删除"""
if children:
logging.error('[Watch Children changed] ' + str(children))
olddata = self.zk.get('/leader')[0]
if olddata != "":
data = json.loads(olddata)
nodes = self.get_nodes_list()
for path in data.keys():
if path not in nodes and path != '/leader':
del data[path]
self.zk.set('/leader', json.dumps(data))

assert self.scheduler is not None, "please set scheduler first"
children = [root + '/' + path for path in children]
self.scheduler.zk_distribute_projects(children)
else:
# logging.warning("[Watch children] children is not available")
pass

while True:
pass

def get_client_id(self):
return self.id

@staticmethod
def get_root():
return root

def set(self, path, data):
self.zk.set(path, data)

def get(self, path):
return self.zk.get(path)

def getNodePath(self):
return self.path

def isLeader(self):
"""判断自己是否为leader"""
leader_info, _ = self.zk.get('/leader')
client_id = json.loads(leader_info)['/leader']['client_id']
if client_id == self.id:
# logging.error("make sure I'm the leader")
return True
else:
# logging.error("make sure I'm not the leader")
return False

def get_nodes_list(self):
"""得到所有znode的path"""
return [root + '/' + path for path in self.zk.get_children(root)]

def get_projects_info_of_every_node(self):
"""得到每个znode上项目信息,返回字典,key为znode的path,value为list"""
nodes = self.zk.get_children(root)
path = [root + '/' + node for node in nodes]
info = dict()

for nodepath in path:
data = self.zk.get(nodepath)[0]
if data == "":
projects = []
else:
projects = json.loads(data)
info[nodepath] = projects
return info

小结

上面介绍了整个设计流程和关键代码,实现完成后可支持 scheduler 集群化部署,并且支持 scheduler 节点动态增加,删除(项目会自动重新分配同步到可用节点)