shadowsocks源码解读(二):SS Server工作流程

共计 4645 个字符,预计需要花费 12 分钟才能阅读完成。

上一节中写到shadowsocks由SS Local和SS Server组成,因为工作原理相似,且SS Server方便调试跟踪,所以以SS Server来分析其中具体流程。

运行ssserver -p 10001 -k pass -m aes-256-cfb,运行SS Server
* 监听本地10001端口,密码pass,加密方式aes-256-cfb

在server.py的main()下设置断点方便调试。

def main():
# ....省略配置处理相关代码
    tcp_servers = []
    udp_servers = [] 
    dns_resolver = asyncdns.DNSResolver()
    port_password = config['port_password']
    del config['port_password']
    for port, password in port_password.items():
        a_config = config.copy()
        a_config['server_port'] = int(port)
        a_config['password'] = password
        logging.info("starting server at %s:%d" %
                     (a_config['server'], int(port)))
        实例化一个tcprelay.TCPRelay类放入到tcp_servers
        tcp_servers.append(tcprelay.TCPRelay(a_config, dns_resolver, False))
        udp_servers.append(udprelay.UDPRelay(a_config, dns_resolver, False))

程序一开始先实例化一个TCPRelay类,我们看看TCPRelay在初始化的时候干了什么。

def __init__(self, config, dns_resolver, is_local, stat_callback=None):
        # .....省略
        # 判断是SS Local还是SS Server
        if is_local:
            listen_addr = config['local_address']
            listen_port = config['local_port']
        else:
            listen_addr = config['server']
            listen_port = config['server_port']
        self._listen_port = listen_port

        addrs = socket.getaddrinfo(listen_addr, listen_port, 0,
                                   socket.SOCK_STREAM, socket.SOL_TCP)
        if len(addrs) == 0:
            raise Exception("can't get addrinfo for %s:%d" %
                            (listen_addr, listen_port))
        af, socktype, proto, canonname, sa = addrs[0]
        # 创建一个socket
        server_socket = socket.socket(af, socktype, proto)
        server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        # 绑定到监听的端口
        server_socket.bind(sa)
        server_socket.setblocking(False)
        if config['fast_open']:
            try:
                server_socket.setsockopt(socket.SOL_TCP, 23, 5)
            except socket.error:
                logging.error('warning: fast open is not available')
                self._config['fast_open'] = False
        # 监听端口
        server_socket.listen(1024)
        self._server_socket = server_socket

初始化的时候根据配置项新建一个socket并绑定到端口进行监听,之后进入到run_server()函数

    def run_server():
        # ......
        try:
            # 实例化一个EventLoop类事件循环
            # 之后将上面初始化的dns、tcp和udp类加入循环中
            loop = eventloop.EventLoop()
            dns_resolver.add_to_loop(loop)
            list(map(lambda s: s.add_to_loop(loop), tcp_servers + udp_servers))

            daemon.set_user(config.get('user', None))
            loop.run()
        # ...

再来看看EventLoop初始化干了什么

class EventLoop(object):
    def __init__(self):
        # 根据操作系统选择不同的方法,将select、poll都包装成了epoll的用法
        if hasattr(select, 'epoll'):
            self._impl = select.epoll()
            model = 'epoll'
        elif hasattr(select, 'kqueue'):
            self._impl = KqueueLoop()
            model = 'kqueue'
        elif hasattr(select, 'select'):
            self._impl = SelectLoop()
            model = 'select'
        else:
            raise Exception('can not find any available functions in select '
                            'package')
        # 这个很重要,建立一个以fd为key,(f,handler)为值的字典,之后会根据这个映射分发事件到各自的handler
        self._fdmap = {}  # (f, handler)
        self._last_time = time.time()
        self._periodic_callbacks = []
        self._stopping = False
        logging.debug('using event model: %s', model)

然后是add_to_loop()函数

    def add_to_loop(self, loop):
        if self._eventloop:
            raise Exception('already add to loop')
        if self._closed:
            raise Exception('already closed')
        self._eventloop = loop
        # 加入到事件循环中,并监听socket的可读和错误事件
        self._eventloop.add(self._server_socket,
                            eventloop.POLL_IN | eventloop.POLL_ERR, self)
        self._eventloop.add_periodic(self.handle_periodic)

在调用self._eventloop.add()函数时,会在EventLoop类中建立一个映射

    def add(self, f, mode, handler):
        fd = f.fileno()
        # 这个很重要
        self._fdmap[fd] = (f, handler)
        self._impl.register(fd, mode)

到这边总结下程序做的工作,初始化一个TCPRelay类监听本地端口,将它加入事件循环,监听socket的可读和错误事件,事件循环中有一个映射,将fd(文件描述符)映射到对应的(socket,handler),以便之后检测到有请求发生的时候,将请求分发给各自的handler处理,事件循环只管监听事件。

接下来进入到核心的loop.run()函数中。

    def run(self):
        events = []
        while not self._stopping:
            asap = False
            try:
                # 当有新的请求发送到监听的本地端口时,TCPRelay可读
                events = self.poll(TIMEOUT_PRECISION)
            # ......
            for sock, fd, event in events:
                # 查询fd对应的handler,这个handler是最开始初始化的TCPRelay
                handler = self._fdmap.get(fd, None)
                if handler is not None:
                    handler = handler[1]
                    try:
                        直接交给handler处理事件
                        handler.handle_event(sock, fd, event)
                    # ......

然后事件循环继续监听事件。
继续看handler.handle_event()函数怎么处理请求的。

    def handle_event(self, sock, fd, event):
        # 这边注意下
        if sock == self._server_socket:
            try:
                logging.debug('accept')
                # 接受到新请求之后会生成一个conn
                conn = self._server_socket.accept()
                TCPRelayHandler(self, self._fd_to_handlers,
                                self._eventloop, conn[0], self._config,
                                self._dns_resolver, self._is_local)
        else:
            if sock:
                handler = self._fd_to_handlers.get(fd, None)
                if handler:
                    handler.handle_event(sock, event)

然后交给TCPRelayHandler处理请求
TCPRelayHandler在初始化的时候会将conn这个socket加入到loop循环中监听可读或者错误事件

if sock == self._server_socket判断当前socket是客户端第一次请求连接还是客户端连接之后发送的请求。
如果是客户端第一次连接,调用TCPRelayHandler,会将新的socket加入事件循环,并且在TCPRelay._fd_to_handlers添加映射,这个等下再讲用途。
如果是客户端在连接之后发来的请求的话,会在self._fd_to_handlers.get()查找handler,也就是客户端第一次连接创建的映射,此时handler是TCPRelayHandler,进入handle_event()函数后,这时候如果请求正常的话会进入到self._on_local_read()函数

    def _on_local_read(self):
        # ......
        data = None
        try:
            # 读取请求内容
            data = self._local_sock.recv(BUF_SIZE)
        # ......
        if not is_local:
            # 将请求解密
            data = self._encryptor.decrypt(data)
        # ......
            self._handle_stage_addr(data)

之后就是TCPRelayHandler在负责处理数据了。

所以处理的流程大致理清了,我画了一张流程图,更好地理清思路。

shadowsocks源码解读(二):SS Server工作流程

总结

shadowsocks原理并不难,主要是程序一直处在EventLoop的循环中,需要在循环里面观察各个模块的运行情况,而且SS Server和SS Local共用同一个TCPRelay,所以就增加了代码的复杂度,只要耐心看下去,摸清楚哪个模块干了些什么事之后就简单了。

正文完
 
root
版权声明:本站原创文章,由 root 2016-12-22发表,共计4645字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。