python与C结构体之间数据转换
前言
在实际应用中,可能会遇到直接和C进行二进制字节流协议通信,这时要把数据解包成python数据,如果可能,最好与C定义的结构体完全对应上.
python中有2种方式,可处理二进制数据转换- 用ctypes包的
Structure
直接定义结构体 - 用struct包的
pack/unpack
函数组装转换
在转换时一定要注意==字节序==,这两种方式都有各自的方法标志字节序.
使用ctypes包
ctypes中有许多C中的操作接口,如sizeof
,memmove
等,也提供近似C结构体的模拟类Structure
,BigEndianStructure
,Union
,显然的是BigEndianStructure
是网络字节序(大端),方便直接用于网络传输,Union
和Structure
是主机序(可能是大端,也可能是小端,和本机有关).
Structure/BigEndianStructure使用
from ctypes import *class SSHead(BigEndianStructure): _pack_ = 1 _fields_ = [ #(字段名, c类型 ) ('nTotalSize', c_uint32), ('nSourceID', c_int32), ('sourceType', c_uint8), ('destType', c_uint8), ('transType', c_uint8), ('nDestID', c_int32), ('nFlag', c_uint8), ('nOptionalLength', c_uint16), ('arrOptional', c_char * 20), ] def encode(self): return string_at(addressof(self), sizeof(self)) def decode(self, data): memmove(addressof(self), data, sizeof(self)) return len(data)# -------------------# 使用sshead = SSHead()sshead.nSourceID = 20 #省略其他赋值buf = sshead.encode()ss = SSHead()ss.decode(buf)print(ss.nSourceID)
以上就是一个简单协议结构体定义,对应的C版本如下
struct SSHead{ uint32_t nTotalSize; int32_t nSourceID; uint8_t sourceType; uint8_t destType; uint8_t transType; int32_t nDestID; int8_t nFlag; uint16_t nOptionalLength; char arrOptional[20]; //简单模拟python的打包解包 int encode(char* buf, size_t max_len) { memmove(buf, this, sizeof(this)); return 0; } int decode(char* buf, size_t len) { memmove(this, buf, len); return 0; }}// c中对应的 打包/解包流程(假设本机字节序为大端)SSHead sshead = {0};sshead.nSourceID = 20;char buf[1024];sshead.encode(buf);SSHead ss = {0};ss.decode(buf, sizeof(ss));
其中_pack_ = 1
表示1字节对齐,不然可能会被填充,导致结构体实际所占字节数与表面上的不一样.
_fields_
定义C结构体中相对应的字段名和类型,C中每种基础类型在ctypes都有与之对应的类型,如c_uint32
对应uint32_t
,占4个字节.数组就是后面乘以对应的长度即可,如c_uint8 * 20
.另外还支持嵌套定义结构体.在实例化后,字段名会成为成员变量,可直接赋值. encode
会直接得到该对象的二进制数据,如果不考虑字节序,则与C中相同对象的二进制数据是一样的
decode
相反,直接解包二进制数据为python数据 这样python和c就可以直接通过结构体定义协议通信了. 注意
- python中的二进制数据是==bytes==类型,不是==str==类型
- 在python3.6及之前的版本,是没有
BigEndianUnion
类型 - 用来网络传输一定要用
BigEndianStructure
,不然会有字节序问题
缺点
此方法只能适用于结构体固定打解包的情况,如果协议中有大数组,但数组中的数据只有前几个是有效的,后面都是无效的,一般在打包的时候只打包有效数据,这种情况用Structure
就不合适了.
使用struct包
struct模块是专门用来处理python与C之间的二进制数据转换,总共只有几个函数
下面在原有的SSHead定义中增加2个使用struct打包解包的函数
from ctypes import *import structclass SSHead(BigEndianStructure): _pack_ = 1 _fields_ = [ #(字段名, c类型 ) ('nTotalSize', c_uint32), ('nSourceID', c_int32), ('sourceType', c_uint8), ('destType', c_uint8), ('transType', c_uint8), ('nDestID', c_int32), ('nFlag', c_uint8), ('nOptionalLength', c_uint16), ('arrOptional', c_char * 20), ] def encode(self): return string_at(addressof(self), sizeof(self)) def decode(self, data): memmove(addressof(self), data, sizeof(self)) return len(data) def pack(self): buffer = struct.pack("!IIBBBIBH20s", self.nTotalSize, self.nSourceID, self.sourceType , self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional) return buffer def unpack(self, data): (self.nTotalSize, self.nSourceID, self.sourceType, self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional) = struct.unpack("!IIBBBIBH20s", data)# ---------------------------# 测试 s = SSHead()s.arrOptional = b'hello'ss = SSHead()ss.unpack(s.encode())print(ss.arrOptional)
pack/unpack的fmt(格式化串)说明
"!IIBBBIBH20B"
:!
表示按照网络序处理,I
表示后面的第一变量为4字节的int
型,接着的B
表示为下一个变量为1字节的uint8_t
型,以此类推,20s
表示后面是长度20的字节数组
缺点
上面的例子中如果使用pakc/unpack
方法,是不用继承BigEndianStructure
,只需自定义相应字段变量.
Structure
相比,比较灵活,可以自由组合怎么打包,比如在nOptionalLength=0
时,不打包arrOptional
字段.缺点就是,定义pack/unpack
函数时,协议多起来会非常繁琐且容易出错.所以最好是自动化生成pack/unpack
函数. 自动化生成pack/unpack
定义结构体成员列表
显然,我们需要知道结构体成员的变量名和类型,参考Structure
,有如下定义
class BaseCode(object): _type_map_index_pack_tag = 1 _type_map_index_pack_size = 2 _type_map = { # C类型:(说明, 编码标志) 'char': ('int', 'B'), 'uint32_t': ('int', 'I'), 'string': ('str', 'B'), 'int32_t': ('int', 'i'), 'int64_t': ('int', 'q'), 'uint64_t': ('int', 'Q'), 'float': ('float', 'f'), 'double': ('double', 'd'), } # 每种基础类型所占字节数 _ctype_size_map = {'I': 4, 'B': 1, 'i': 4, 'b': 1, 'Q': 8, 'q': 8, 'f': 4, 'd': 8} _fields_index_ctype = 0 _fields_index_value_name = 1 _fields_index_array_length = 2 # 测试 _fields = [ # (C类型, 变量名) ('uint32_t', 'nUint'), ('string', 'szString', '_Const.enmMaxAccountIDLength'), ('int32_t', 'nInt3'), ('uint32_t', 'nUintArray', 4), ]
按序遍历_fields中的字段
对_fields中的每个元素,进行编码,通过变量名可获得实际变量值,通过C类型利用struct.pack/unpack
可获得实际编码
def encode(self, nest=1): data = b'' tmp = b'' debug_log("&" * nest, self.__class__.__name__, "encode struct start :") for one in self._fields: debug_log("#" * nest, "encode one element:", one) ctype = one[self._fields_index_ctype] value = getattr(self, one[self._fields_index_value_name]) if len(one) == 3: length = one[self._fields_index_array_length] if type(length) == str: length = eval(length) tmp = self._encode_array(ctype, value, length) else: # 不是基础类型,即嵌套定义 if ctype not in BaseCode._type_map: tmp = value.encode(nest+1) else: fmt = '!' + self._type_map[ctype][self._type_map_index_pack_tag] tmp = struct.pack(fmt, value) # debug_log(fmt, type(value), value) debug_log("#" * nest,"encode one element:", len(tmp), tmp) data += tmp debug_log("&" * nest, self.__class__.__name__, "encode end: len=", len(data), data) return data def _encode_array(self, ctype, value, max_length): """ 打包数组 如果是字符串类型 需要做下特殊处理 :param ctype: :param value: :param max_length: :return: """ debug_log('ctype:', ctype, type(ctype)) if ctype == 'string': max_length -= 1 # 字符串长度需要减一 value = bytes(value, encoding='utf8') #print(value) if len(value) > max_length: raise EncodeError('the length of array is too long') # pack长度 data = struct.pack('!H', len(value)) debug_log("array count:", len(value), "value:", value, type(value)) # pack数组内容 for one in value: #debug_log("self._type_map[ctype][1]=", self._type_map[ctype][self._type_map_index_pack_tag], one) if ctype not in BaseCode._type_map: data += one.encode() else: data += struct.pack('!' + self._type_map[ctype][self._type_map_index_pack_tag], one) return data
数组类型在python中使用list
表示,在打包数组类型之前会添加==2字节表示数组长度==
bytes
类型,然后就和普通数组一样,一个元素一个元素处理(实际在for遍历中,一个元素是一个int
,和C中一样,所以用B
标志打包) 当==c类型==不是_type_map
中的基础类型,那就是自定义的结构体类型,然后嵌套调用encode就可以了 目前没有考虑union
的处理 解码,反向处理
def decode(self, data, offset=0, nest=1): """ :param data: :return: """ debug_log("&" * nest, self.__class__.__name__, "decode struct start :") for one in self._fields: debug_log("#" * nest, "decode one element:", one) ctype = one[self._fields_index_ctype] if len(one) == 3: offset = self._decode_array(one, data, offset, nest) else: ctype_attr = self._type_map[ctype] if ctype not in BaseCode._type_map: value = eval(ctype + '()') offset = value.decode(data, offset, nest) setattr(self, one[self._fields_index_value_name], value) else: fmt = '!' + ctype_attr[self._type_map_index_pack_tag] value, = struct.unpack_from(fmt, data, offset) offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]] debug_log(one, one[self._fields_index_value_name]) setattr(self, one[self._fields_index_value_name], value) debug_log("#" * nest, "decode one element end:", offset, one) return offset def _decode_array(self, field, data, offset, nest): ctype = field[self._fields_index_ctype] array_num, = struct.unpack_from('!H', data, offset) offset += 2 value = [] ctype_attr = self._type_map[ctype] debug_log("$" * nest, "decode array count", array_num, field) while array_num > 0: array_num -= 1 if ctype not in BaseCode._type_map: one = eval(ctype + '()') offset = one.decode(data, offset, nest) value.append(one) else: one, = struct.unpack_from('!' + ctype_attr[self._type_map_index_pack_tag], data, offset) value.append(one) offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]] if ctype == 'string': # 这里是因为字符串是按照单个字符解包,会解成python的int,通过chr()转化为字符型 # value = [97,98] # list(map(chr,value)) 后等于 ['a','b'] # ''.join() 就转成'ab' value = ''.join(list(map(chr, value))) value = bytes(value, encoding='latin1').decode('utf8') setattr(self, field[self._fields_index_value_name], value) debug_log("$" * nest, "decode array ok", array_num, field) return offset
最后
完整代码:
包含简单测试和转成字典结构
在python3.5下运行成功希望帮到各位!!