def tcp_listener(address, backlog=50):
"""
Listen on the given (ip, port) *address* with a TCP socket.
Returns a socket object on which one should call ``accept()`` to
accept a connection on the newly bound socket.
Generally, the returned socket will be passed to ``tcp_server()``,
which accepts connections forever and spawns greenlets for
each incoming connection.
"""
sock = GreenSocket() # 创建GreenSocket
socket_bind_and_listen(sock, address, backlog=backlog) # 绑定并监听端口
return sock # 返回连接
此时查看GreenSocket类,
class GreenSocket(object):
is_secure = False
timeout = None
def __init__(self, family_or_realsock=_socket.AF_INET, *args, **kwargs):
if isinstance(family_or_realsock, (int, long)): # 判断传入的是否是文件socket连接
fd = _original_socket(family_or_realsock, *args, **kwargs)
else:
fd = family_or_realsock # 如果不是,则是一个文件描述符
assert not args, args
assert not kwargs, kwargs
set_nonblocking(fd) # 将该文件描述符设置成非阻塞
self.fd = fd
self._fileno = fd.fileno() # 获取文件描述符值
self.recvbuffer = ''
self.closed = False
self.timeout = _socket.getdefaulttimeout() # 获取超时时间
# when client calls setblocking(0) or settimeout(0) the socket must
# act non-blocking
self.act_non_blocking = False # 是否启动阻塞与非阻塞标志位
def get_hub():
global _threadlocal
try:
hub = _threadlocal.hub # 检查该全局变量是否有hub属性
except AttributeError:
# do not import anything that can be monkey-patched at top level
import threading # 没有的话,则设置成线程安全的值
_threadlocal = threading.local()
hub = _threadlocal.hub = Hub() # 设置为Hub类的实例
return hub
继续查看Hub类,
class Hub(object):
def __init__(self):
self.greenlet = Greenlet(self.run) # 将self.run包装成协程
self.keyboard_interrupt_signal = None
def switch(self):
cur = getcurrent() # 获取当前协程
assert cur is not self.greenlet, 'Cannot switch to MAINLOOP from MAINLOOP'
switch_out = getattr(cur, 'switch_out', None) # 如果当前协程有switch_out属性则直接执行该方法
if switch_out is not None:
try:
switch_out()
except:
traceback.print_exception(*sys.exc_info())
if self.greenlet.dead: # 检查当前协程是否死亡
self.greenlet = Greenlet(self.run) # 如果死亡则重新初始化
return self.greenlet.switch() # 调用switch函数切换协程
def run(self, *args, **kwargs):
if self.keyboard_interrupt_signal is None:
self.keyboard_interrupt_signal = signal(2, MAIN.throw, KeyboardInterrupt)
while True:
result = core.dispatch() # 调用libevent的事件调度函数,去检查是否有注册的事件发生
if result>0:
return 'Hub.run() has finished because there are no events registered'
elif result<0:
return 'Hub.run() has finished because there was an error'
return result
def execute(self, func, *args, **kwargs):
"""Execute func in one of the coroutines maintained
by the pool, when one is free.
Immediately returns a Proc object which can be queried
for the func's result.
>>> pool = Pool()
>>> task = pool.execute(lambda a: ('foo', a), 1)
>>> task.wait()
('foo', 1)
"""
# if reentering an empty pool, don't try to wait on a coroutine freeing
# itself -- instead, just execute in the current coroutine
if self.sem.locked() and gevent.getcurrent() in self.procs: # 检查缓冲池中是否存在
p = spawn(func, *args, **kwargs)
try:
p.wait()
except:
pass
else:
self.sem.acquire() # 缓冲池可以立马使用
p = self.procs.spawn(func, *args, **kwargs) # 生成协程并执行
# assuming the above line cannot raise
p.link(lambda p: self.sem.release())
return p
此时调用了spawn方法,
def spawn(self, func, *args, **kwargs):
p = spawn(func, *args, **kwargs) # 生成协程
self.add(p)
return p
此时的spawn就是调用了,
def spawn(function, *args, **kwargs):
"""Create a new greenlet that will run `function(*args)'.
The current greenlet won't be unscheduled. Keyword arguments aren't
supported (limitation of greenlet), use spawn() to work around that.
"""
g = Greenlet(lambda : function(*args, **kwargs)) # 生成协程
g.parent = get_hub().greenlet # 设置生成协程的parent
timer(0, g.switch) # 注册0秒后执行
return g # 返回该协程
def handle(self):
"""Handle multiple requests if necessary."""
self.close_connection = 1
self.handle_one_request()
while not self.close_connection:
self.handle_one_request()
def handle_one_response(self):
start = time.time()
headers_set = []
headers_sent = []
# set of lowercase header names that were sent
header_dict = {}
wfile = self.wfile
result = None
use_chunked = [False]
length = [0]
status_code = [200]
...
try:
try:
result = self.application(self.environ, start_response) # 调用具体的业务处理函数,例子中的hello_world
if not headers_sent and hasattr(result, '__len__'):
headers_set[1].append(('content-length', str(sum(map(len, result)))))
towrite = []
for data in result: # 将处理返回数据写入
if data:
write(data) # 调用write函数写入data
if not headers_sent:
write('')
if use_chunked[0]:
wfile.write('0\r\n\r\n')
except Exception, e:
self.close_connection = 1
exc = traceback.format_exc()
print exc
if not headers_set:
start_response("500 Internal Server Error", [('Content-type', 'text/plain')])
write(exc)
def write(data, _writelines=wfile.writelines): # 写入文件的方法
...
try:
_writelines(towrite) # 写入数据
length[0] = length[0] + sum(map(len, towrite))
except UnicodeEncodeError:
print "Encountered unicode while attempting to write wsgi response: ", [x for x in towrite if isinstance(x, unicode)]
traceback.print_exc()
_writelines(
["HTTP/1.0 500 Internal Server Error\r\n",
"Connection: close\r\n",
"Content-type: text/plain\r\n",
"Content-length: 98\r\n",
"\r\n",
"Internal Server Error: wsgi application passed a unicode object to the server instead of a string."])
def writelines(self, list):
# XXX We could do better here for very long lists
# XXX Should really reject non-string non-buffers
lines = filter(None, map(str, list))
self._wbuf_len += sum(map(len, lines))
self._wbuf.extend(lines)
if (self._wbufsize <= 1 or
self._wbuf_len >= self._wbufsize):
self.flush()
调用了flush方法,
def flush(self):
if self._wbuf:
data = "".join(self._wbuf)
self._wbuf = []
self._wbuf_len = 0
buffer_size = max(self._rbufsize, self.default_bufsize)
data_size = len(data)
write_offset = 0
view = memoryview(data)
try:
while write_offset < data_size:
self._sock.sendall(view[write_offset:write_offset+buffer_size])
write_offset += buffer_size
finally:
if write_offset < data_size:
remainder = data[write_offset:]
del view, data # explicit free
self._wbuf.append(remainder)
self._wbuf_len = len(remainder)