盒子
盒子
Posts List
  1. 0x00 websocket的快速使用
  2. 0x00 连接过程解析
    1. 1、SocketServer
    2. 2、websocket连接:Handshake
    3. 2、websocket接收数据
    4. 3、websocket发送数据

websocket学习总结

基于python的websocet服务端代码阅读

0x00 websocket的快速使用

WebSocket是一个基于HTTP协议的持久化的协议,用于服务端与客户端的长连接
快速上手的资源来源:
https://pypi.python.org/pypi?%3Aaction=search&term=websocket&submit=search
or
https://github.com/search?l=Python&q=websocket&type=Repositories&utf8=%E2%9C%93

推荐:
https://github.com/Pithikos/python-websocket-server
https://pypi.python.org/pypi/websocket-server/0.4

websocket_server的使用:
超方便,超喜欢!
提供了三个功能性函数和设定函数的接口:

  • new_client (新客户端握手成功) :set_fn_new_client()
  • client_left (客户端关闭连接) :set_fn_client_left()
  • message_received (收到客户端的消息) :set_fn_message_received()

两个server类的方法:

  • send_message(client,message)
  • send_message_to_all(msg)
from websocket_server import WebsocketServer

# Called for every client connecting (after handshake)
def new_client(client, server):
    print("New client connected and was given id %d" % client['id'])
    server.send_message_to_all("Hey all, a new client has joined us")


# Called for every client disconnecting
def client_left(client, server):
    print("Client(%d) disconnected" % client['id'])


# Called when a client sends a message
def message_received(client, server, message):
    if len(message) > 200:
        message = message[:200]+'..'
    print("Client(%d) said: %s" % (client['id'], message))


PORT=9001
server = WebsocketServer(PORT)
server.set_fn_new_client(new_client)
server.set_fn_client_left(client_left)
server.set_fn_message_received(message_received)
server.run_forever()

0x00 连接过程解析

websocket_server代码阅读
取出相应模块打造一个简单websocket服务端

1、SocketServer

server:

SocketServer是python的网络服务框架模块,提供了4种基础服务器类:

  • BaseServer:所有服务器类的父类,定义了服务器类的所有方法接口,不直接使用
  • TCPServer/UDPServer:同步TCP(服务器和客户端之间的连续数据流)/UDP服务器(无序间断的数据包)
  • UnixStreamServer/UnixDatagramServer:使用Unix域套接字,适用于Unix平台

参数:
server_address(服务器地址), RequestHandlerClass(请求处理器类), bind_and_activate=True(绑定和监听服务器)
属性:
daemon_threads:指示服务器是否要等待线程终止,要是线程互相独立,必须要设置为True,默认是False
allow_reuse_address:地址的重用,设置allow_reuse_address = True(默认为False)
基础服务器类的派生图

以上4种服务器同步传递数据,每个请求必须等到上一个请求完成之后才处理,为了实现进程和线程化,提供了一下几类:

  • ForkingMixIn/ ThreadingMixIn:实现进程化/线程化,提供异步特性,不直接实例化
  • ForkingTCPServer/ ForkingUDPServer:ForkingMixIn和TCPServer/UDPServer的组合类,继承了ForkingMixIn和TCPServer/UDPServer
  • ThreadingTCPServer/ThreadingUDPServer:ThreadingMixIn和TCPServer/UDPServer的组合类,继承了ThreadingMixIn和TCPServer/UDPServer

异步服务器类派生图

handler:

请求处理类,提供一下三种:

  • BaseRequestHandler:提供处理服务请求的核心功能,不能直接实例化
  • StreamRequestHandler/ DatagramRequestHandler:用于TCP/UDP服务器的服务请求处理,派生于BaseRequestHandler

根据上面的特性,在程序中使用 ThreadingTCPServer和StreamRequestHandler

class MyStreamRequestHandler(StreamRequestHandler):  
    allow_reuse_address = True
    daemon_threads = True
    #……other settings

    #重写handle函数
    def handle(self): 
    #……some codes

2、websocket连接:Handshake

获取连接的http请求包:

GET / HTTP/1.1
Host: localhost:9999
Connection: Upgrade
Pragma: no-cache
Cache-Control: no-cache
Upgrade: websocket
Origin: file://
Sec-WebSocket-Version: 13
User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.75 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,zh-TW;q=0.6
Sec-WebSocket-Key: YtMH+mxpOjnwpBwj+hYVgg==
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

需要的参数:Upgrade,Sec-WebSocket-Key

  1. 根据Upgrade判断是否是websocket请求
  2. 生成Sec-WebSocket-Accept:
    将Sec-WebSocket-Key和GUID( GUID是一个固定值 )合并,依次经过Sha1加密和Base64加密,生成Sec-WebSocket-Accept,返回给客户端
  3. 返回给客户端
    HTTP/1.1 101 Switching Protocols
    Upgrade: websocket
    Connection: Upgrade
    Sec-WebSocket-Accept: %s

tips:
老版的websocket的握手方式不同,key由请求包中的Sec-WebSocket-Key1、Sec-WebSocket-Key2和客户端握手请求的最后8个字节合并加密生成

handshake代码:

def handshake(self):
    try: 
        #接收request包
        data = self.request.recv(1024).decode('unicode-escape','ignore').strip()
        print "receive from (%r):%r" % (self.client_address, data) 
    except:  
        traceback.print_exc()  
    #匹配upgrade,判断是否websocket请求
    upgrade = re.search('\nupgrade[\s]*:[\s]*websocket', data.lower())
    if not upgrade:
        self.keep_alive = False
        return
    #获取Sec-WebSocket-Accept
    key = re.search('\n[sS]ec-[wW]eb[sS]ocket-[kK]ey[\s]*:[\s]*(.*)\r\n', data)
    if key:
        key = key.group(1)
        key_return = self.generate_token(key)
        response = \
      'HTTP/1.1 101 Switching Protocols\r\n'\
      'Upgrade: websocket\r\n'              \
      'Connection: Upgrade\r\n'             \
      'Sec-WebSocket-Accept: %s\r\n'        \
      '\r\n' % key_return
        #发送返回包
        self.handshake_status = self.request.send(response)
        #设置client连接状态为true
        self.valid_client = True

    else:
        print("Client tried to connect but was missing a key")
        self.keep_alive = False 

2、websocket接收数据

数据帧结构

  • 0位,FIN,表示消息是否是最后一帧,1:最后一帧;0:非最后一帧
  • 1-3位,RSV1,RSV2,RSV3,预留空间
  • 4-7位,Opcode,表示数据类型,0x8:关闭连接, 0x0 为继续,0x1 文本 (以 utf-8 编码),0x2 为二进制数据

第一个字节b1处理:

fin    = b1 & 0x80  
opcode = b1 & 0x0f 
  • 8位,MASK,定义payload数据是否进行了掩码处理,1:进行了掩码处理
  • 9-15位,Payload len,payload数据的长度,126:payload长度位为7+16;127:payload长度位为7+64

第二个字节b2处理:

mask = b2 & 0x80
payload_length = b2 & 0x7f 
  • 16-79位 Extend payload length,payload长度扩展位
  • 80-111位 Masking-key,MASK=1时,掩码解密密钥,MASK=0:无Masking-key

第三~十四字节处理:

payload_length == 126:
    payload_length = struct.unpack(">H", self.rfile.read(2))[0]
elif payload_length == 127:
    payload_length = struct.unpack(">Q", self.rfile.read(8))[0]

获取数据:

  • 112-最后一位 Payload,传送的数据

继续从数据帧读取相应长度的字节,按位循环读取数据,并用掩码的已读取的数据长度对4取模位组来和当前获取到的数据异或

masks = map(ord,self.rfile.read(4))
decoded = ""
for char in map(ord,self.rfile.read(payload_length)):
    char ^= masks[len(decoded) % 4]
    decoded += chr(char)   

3、websocket发送数据

服务端像客户端发送数据,不需要掩码加密
消息头:

  • FIN:0x80 ,OPCODE:0x01(表示文本类型数据),或操作
header.append(FIN | OPCODE_TEXT) 
  • PAYLOAD_LENGTH:
# Normal payload 占7位
if payload_length <= 125:
header.append(payload_length)

# Extended payload 占7位+2个字节
elif payload_length >= 126 and payload_length <= 65535:
header.append(0x7e)
header.extend(struct.pack(">H", payload_length))

# Huge extended payload 占7位+8个字节
elif payload_length < 18446744073709551616:
header.append(0x7f)
header.extend(struct.pack(">Q", payload_length))
  • PAYLOAD:
#payload编码
payload = bytearray(payload,'utf-8')
#发送给客户端
self.request.send(header + payload)

简单服务端结构:

class MyStreamRequestHandler(StreamRequestHandler):  
    allow_reuse_address = True
    daemon_threads = True
    #自定义参数
    keep_alive = True
    handshake_status = False
    valid_client = False

    def handle(self): 
    #自定义方法
    def handshake(self):  
    def generate_token(self, key1):
    def read_message(self):        
    def send_message(self,message):

if __name__ == "__main__":  
    host = "127.0.0.1"       
    port = 9999     
    addr = (host, port)  
    server = ThreadingTCPServer(addr, MyStreamRequestHandler)  
    server.serve_forever() 

最后,简单websocket服务端全代码:

# -*- coding: utf-8 -*-
from SocketServer import ThreadingTCPServer, StreamRequestHandler,TCPServer  
import traceback  
import re
import hashlib
import base64
import sys
import struct


FIN    = 0x80
OPCODE = 0x0f
MASKED = 0x80
PAYLOAD_LEN = 0x7f
PAYLOAD_LEN_EXT16 = 0x7e
PAYLOAD_LEN_EXT64 = 0x7f

OPCODE_TEXT = 0x01
CLOSE_CONN  = 0x8

class MyStreamRequestHandler(StreamRequestHandler):  
    allow_reuse_address = True
    daemon_threads = True
    keep_alive = True
    handshake_status = False
    valid_client = False


    def handle(self): 
        while self.keep_alive:
            if not self.handshake_status:
                self.handshake()
            elif self.valid_client:
                self.read_message()

    def handshake(self):
        try:  
            data = self.request.recv(1024).decode('unicode-escape','ignore').strip()
            print "receive from (%r):%r" % (self.client_address, data) 
        except:  
            traceback.print_exc()  
        upgrade = re.search('\nupgrade[\s]*:[\s]*websocket', data.lower())
        if not upgrade:
            self.keep_alive = False
            return
        key = re.search('\n[sS]ec-[wW]eb[sS]ocket-[kK]ey[\s]*:[\s]*(.*)\r\n', data)
        if key:
            key = key.group(1)
            print key
            key_return = self.generate_token(key)
            print key_return
            response = \
          'HTTP/1.1 101 Switching Protocols\r\n'\
          'Upgrade: websocket\r\n'              \
          'Connection: Upgrade\r\n'             \
          'Sec-WebSocket-Accept: %s\r\n'        \
          '\r\n' % key_return
            self.handshake_status = self.request.send(response)
            self.valid_client = True

        else:
            print("Client tried to connect but was missing a key")
            self.keep_alive = False 

    def generate_token(self, key1):
        key1 = key1.strip()
        GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
        str = key1+GUID
        print str
        str1 = hashlib.sha1(str).digest()
        str2 = base64.b64encode(str1)
        return str2

    def read_message(self):
        bytes = self.rfile.read(2)
        b1,b2 = map(ord, bytes)
        print b1,b2
        fin    = b1 & FIN
        opcode = b1 & OPCODE
        masked = b2 & MASKED

        payload_length = b2&PAYLOAD_LEN 

        if not fin:
            print("Client closed connection.")
            self.keep_alive = 0
            return
        if opcode == CLOSE_CONN:
            print("Client asked to close connection.")
            self.keep_alive = 0
            return
        if not masked:
            print("Client must always be masked.")
            self.keep_alive = 0
            return

        if payload_length == 126:
            payload_length = struct.unpack(">H", self.rfile.read(2))[0]
        elif payload_length == 127:
            payload_length = struct.unpack(">Q", self.rfile.read(8))[0]

        masks = map(ord,self.rfile.read(4))
        decoded = ""
        for char in map(ord,self.rfile.read(payload_length)):
            char ^= masks[len(decoded) % 4]
            decoded += chr(char)   
        print decoded
        if decoded =='123':
            self.send_message('234')

    def send_message(self,message):
        if isinstance(message, bytes):
            message = message.decode('utf-8') # this is slower but assures we have UTF-8
            if not message:
                print("Can\'t send message, message is not valid UTF-8")
                return False
        elif isinstance(message, str) or isinstance(message, unicode):
            pass
        else:
            print('Can\'t send message, message has to be a string or bytes. Given type is %s' % type(message))
            return False

        header  = bytearray()
        payload = message.decode('utf-8') 
        payload_length = len(payload)

        # Normal payload
        if payload_length <= 125:
            header.append(FIN | OPCODE_TEXT)
            header.append(payload_length)

        # Extended payload
        elif payload_length >= 126 and payload_length <= 65535:
            header.append(FIN | OPCODE_TEXT)
            header.append(PAYLOAD_LEN_EXT16)
            header.extend(struct.pack(">H", payload_length))

        # Huge extended payload
        elif payload_length < 18446744073709551616:
            header.append(FIN | OPCODE_TEXT)
            header.append(PAYLOAD_LEN_EXT64)
            header.extend(struct.pack(">Q", payload_length))

        else:
            raise Exception("Message is too big. Consider breaking it into chunks.")
            return
        print payload
        payload = bytearray(payload,'utf-8')
#         发送消息
#         self.request.send(header + payload)

if __name__ == "__main__":  
    host = ""       
    port = 9999     
    addr = (host, port)  
    server = ThreadingTCPServer(addr, MyStreamRequestHandler)  
    server.serve_forever()  
支持一下
扫一扫,支持forsigner