博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python与C结构体之间二进制数据转换
阅读量:6413 次
发布时间:2019-06-23

本文共 9822 字,大约阅读时间需要 32 分钟。

python与C结构体之间数据转换

前言

在实际应用中,可能会遇到直接和C进行二进制字节流协议通信,这时要把数据解包成python数据,如果可能,最好与C定义的结构体完全对应上.

python中有2种方式,可处理二进制数据转换

  • 用ctypes包的Structure直接定义结构体
  • 用struct包的pack/unpack函数组装转换

在转换时一定要注意==字节序==,这两种方式都有各自的方法标志字节序.

使用ctypes包

ctypes中有许多C中的操作接口,如sizeof,memmove等,也提供近似C结构体的模拟类Structure,BigEndianStructure,Union,显然的是BigEndianStructure是网络字节序(大端),方便直接用于网络传输,UnionStructure是主机序(可能是大端,也可能是小端,和本机有关).

Structure/BigEndianStructure使用

from ctypes import *class SSHead(BigEndianStructure):    _pack_ = 1    _fields_ = [        #(字段名, c类型 )        ('nTotalSize', c_uint32),        ('nSourceID', c_int32),        ('sourceType', c_uint8),        ('destType', c_uint8),        ('transType', c_uint8),        ('nDestID', c_int32),        ('nFlag', c_uint8),        ('nOptionalLength', c_uint16),        ('arrOptional', c_char * 20),    ]        def encode(self):        return string_at(addressof(self), sizeof(self))    def decode(self, data):        memmove(addressof(self), data, sizeof(self))        return len(data)# -------------------# 使用sshead = SSHead()sshead.nSourceID = 20 #省略其他赋值buf = sshead.encode()ss = SSHead()ss.decode(buf)print(ss.nSourceID)

以上就是一个简单协议结构体定义,对应的C版本如下

struct SSHead{    uint32_t nTotalSize;    int32_t nSourceID;    uint8_t sourceType;    uint8_t destType;    uint8_t transType;    int32_t nDestID;    int8_t nFlag;    uint16_t nOptionalLength;    char arrOptional[20];        //简单模拟python的打包解包    int encode(char* buf, size_t max_len)    {        memmove(buf, this, sizeof(this));        return 0;    }    int decode(char* buf, size_t len)    {        memmove(this, buf, len);        return 0;    }}// c中对应的 打包/解包流程(假设本机字节序为大端)SSHead sshead = {0};sshead.nSourceID = 20;char buf[1024];sshead.encode(buf);SSHead ss = {0};ss.decode(buf, sizeof(ss));

其中_pack_ = 1表示1字节对齐,不然可能会被填充,导致结构体实际所占字节数与表面上的不一样.

_fields_定义C结构体中相对应的字段名和类型,C中每种基础类型在ctypes都有与之对应的类型,如c_uint32对应uint32_t,占4个字节.数组就是后面乘以对应的长度即可,如c_uint8 * 20.另外还支持嵌套定义结构体.在实例化后,字段名会成为成员变量,可直接赋值.

encode会直接得到该对象的二进制数据,如果不考虑字节序,则与C中相同对象的二进制数据是一样的

decode相反,直接解包二进制数据为python数据
这样python和c就可以直接通过结构体定义协议通信了.

注意

  • python中的二进制数据是==bytes==类型,不是==str==类型
  • 在python3.6及之前的版本,是没有BigEndianUnion类型
  • 用来网络传输一定要用BigEndianStructure,不然会有字节序问题

缺点

此方法只能适用于结构体固定打解包的情况,如果协议中有大数组,但数组中的数据只有前几个是有效的,后面都是无效的,一般在打包的时候只打包有效数据,这种情况用Structure就不合适了.

使用struct包

struct模块是专门用来处理python与C之间的二进制数据转换,总共只有几个函数

下面在原有的SSHead定义中增加2个使用struct打包解包的函数

from ctypes import *import structclass SSHead(BigEndianStructure):    _pack_ = 1    _fields_ = [        #(字段名, c类型 )        ('nTotalSize', c_uint32),        ('nSourceID', c_int32),        ('sourceType', c_uint8),        ('destType', c_uint8),        ('transType', c_uint8),        ('nDestID', c_int32),        ('nFlag', c_uint8),        ('nOptionalLength', c_uint16),        ('arrOptional', c_char * 20),    ]        def encode(self):        return string_at(addressof(self), sizeof(self))    def decode(self, data):        memmove(addressof(self), data, sizeof(self))        return len(data)            def pack(self):        buffer = struct.pack("!IIBBBIBH20s", self.nTotalSize, self.nSourceID, self.sourceType                             , self.destType, self.transType, self.nDestID, self.nFlag, self.nOptionalLength, self.arrOptional)        return buffer        def unpack(self, data):        (self.nTotalSize, self.nSourceID, self.sourceType, self.destType, self.transType, self.nDestID,        self.nFlag, self.nOptionalLength, self.arrOptional) = struct.unpack("!IIBBBIBH20s", data)# ---------------------------# 测试        s = SSHead()s.arrOptional = b'hello'ss = SSHead()ss.unpack(s.encode())print(ss.arrOptional)

pack/unpack的fmt(格式化串)说明

"!IIBBBIBH20B":!表示按照网络序处理,I表示后面的第一变量为4字节的int型,接着的B表示为下一个变量为1字节的uint8_t型,以此类推,20s表示后面是长度20的字节数组

其他参数可参考官方文档.

缺点

上面的例子中如果使用pakc/unpack方法,是不用继承BigEndianStructure,只需自定义相应字段变量.

可以看到,struct.pack/unpack必须对每个字段代表什么类型,几个字节进行描述.与Structure相比,比较灵活,可以自由组合怎么打包,比如在nOptionalLength=0时,不打包arrOptional字段.缺点就是,定义pack/unpack函数时,协议多起来会非常繁琐且容易出错.所以最好是自动化生成pack/unpack函数.

自动化生成pack/unpack

定义结构体成员列表

显然,我们需要知道结构体成员的变量名和类型,参考Structure,有如下定义

class BaseCode(object):    _type_map_index_pack_tag = 1    _type_map_index_pack_size = 2    _type_map = {        # C类型:(说明, 编码标志)        'char': ('int', 'B'),        'uint32_t': ('int', 'I'),        'string': ('str', 'B'),        'int32_t': ('int', 'i'),        'int64_t': ('int', 'q'),        'uint64_t': ('int', 'Q'),        'float': ('float', 'f'),        'double': ('double', 'd'),    }    # 每种基础类型所占字节数    _ctype_size_map = {'I': 4, 'B': 1, 'i': 4, 'b': 1, 'Q': 8, 'q': 8, 'f': 4, 'd': 8}    _fields_index_ctype = 0    _fields_index_value_name = 1    _fields_index_array_length = 2    # 测试        _fields = [        # (C类型, 变量名)        ('uint32_t', 'nUint'),        ('string', 'szString', '_Const.enmMaxAccountIDLength'),        ('int32_t', 'nInt3'),        ('uint32_t', 'nUintArray', 4),    ]

按序遍历_fields中的字段

对_fields中的每个元素,进行编码,通过变量名可获得实际变量值,通过C类型利用struct.pack/unpack可获得实际编码

下面是添加的类成员函数encode

def encode(self, nest=1):        data = b''        tmp = b''        debug_log("&" * nest, self.__class__.__name__, "encode struct start :")        for one in self._fields:            debug_log("#" * nest, "encode one element:", one)            ctype = one[self._fields_index_ctype]            value = getattr(self, one[self._fields_index_value_name])            if len(one) == 3:                length = one[self._fields_index_array_length]                if type(length) == str:                    length = eval(length)                tmp = self._encode_array(ctype, value, length)            else:                # 不是基础类型,即嵌套定义                if ctype not in BaseCode._type_map:                    tmp = value.encode(nest+1)                else:                    fmt = '!' + self._type_map[ctype][self._type_map_index_pack_tag]                    tmp = struct.pack(fmt, value)                    # debug_log(fmt, type(value), value)            debug_log("#" * nest,"encode one element:", len(tmp), tmp)            data += tmp        debug_log("&" * nest, self.__class__.__name__, "encode end: len=", len(data), data)        return data    def _encode_array(self, ctype, value, max_length):        """        打包数组        如果是字符串类型 需要做下特殊处理        :param ctype:        :param value:        :param max_length:        :return:        """        debug_log('ctype:', ctype, type(ctype))        if ctype == 'string':            max_length -= 1  # 字符串长度需要减一            value = bytes(value, encoding='utf8')            #print(value)        if len(value) > max_length:            raise EncodeError('the length of  array is too long')        # pack长度        data = struct.pack('!H', len(value))        debug_log("array count:", len(value), "value:", value, type(value))        # pack数组内容        for one in value:            #debug_log("self._type_map[ctype][1]=", self._type_map[ctype][self._type_map_index_pack_tag], one)            if ctype not in BaseCode._type_map:                data += one.encode()            else:                data += struct.pack('!' + self._type_map[ctype][self._type_map_index_pack_tag], one)        return data

数组类型在python中使用list表示,在打包数组类型之前会添加==2字节表示数组长度==

字符串类型转换为bytes类型,然后就和普通数组一样,一个元素一个元素处理(实际在for遍历中,一个元素是一个int,和C中一样,所以用B标志打包)
当==c类型==不是_type_map中的基础类型,那就是自定义的结构体类型,然后嵌套调用encode就可以了
目前没有考虑union的处理

解码,反向处理

def decode(self, data, offset=0, nest=1):        """        :param data:        :return:        """        debug_log("&" * nest, self.__class__.__name__, "decode struct start :")        for one in self._fields:            debug_log("#" * nest, "decode one element:", one)            ctype = one[self._fields_index_ctype]            if len(one) == 3:                offset = self._decode_array(one, data, offset, nest)            else:                ctype_attr = self._type_map[ctype]                if ctype not in BaseCode._type_map:                    value = eval(ctype + '()')                    offset = value.decode(data, offset, nest)                    setattr(self, one[self._fields_index_value_name], value)                else:                    fmt = '!' + ctype_attr[self._type_map_index_pack_tag]                    value, = struct.unpack_from(fmt, data, offset)                    offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]                    debug_log(one, one[self._fields_index_value_name])                    setattr(self, one[self._fields_index_value_name], value)            debug_log("#" * nest, "decode one element end:", offset, one)        return offset    def _decode_array(self, field, data, offset, nest):        ctype = field[self._fields_index_ctype]        array_num, = struct.unpack_from('!H', data, offset)        offset += 2        value = []        ctype_attr = self._type_map[ctype]        debug_log("$" * nest, "decode array count", array_num, field)        while array_num > 0:            array_num -= 1            if ctype not in BaseCode._type_map:                one = eval(ctype + '()')                offset = one.decode(data, offset, nest)                value.append(one)            else:                one, = struct.unpack_from('!' + ctype_attr[self._type_map_index_pack_tag], data, offset)                value.append(one)                offset += self._ctype_size_map[ctype_attr[self._type_map_index_pack_tag]]        if ctype == 'string':            # 这里是因为字符串是按照单个字符解包,会解成python的int,通过chr()转化为字符型            # value = [97,98]            # list(map(chr,value)) 后等于 ['a','b']            # ''.join() 就转成'ab'            value = ''.join(list(map(chr, value)))            value = bytes(value, encoding='latin1').decode('utf8')        setattr(self, field[self._fields_index_value_name], value)        debug_log("$" * nest, "decode array ok", array_num, field)        return offset

最后

完整代码:

包含简单测试和转成字典结构

在python3.5下运行成功

希望帮到各位!!

Buy me a coffee

image

转载于:https://www.cnblogs.com/iclodq/p/9216763.html

你可能感兴趣的文章
长平狐 memcached源代码阅读笔记(二):网络处理部分
查看>>
android onNewIntent
查看>>
实战利用腾讯企业邮箱zabbix3.x邮件(微信/QQ/短信)告警详细配置
查看>>
干掉运营商:神奇盒子让你自建GSM 网络
查看>>
配置企业级wlan
查看>>
iOS各种调试技巧豪华套餐(上)
查看>>
MCSegmentedControl
查看>>
开源 免费 java CMS - FreeCMS1.7 选择管理站点
查看>>
开源 java CMS - FreeCMS2.2 模板管理
查看>>
Vagrant介绍
查看>>
XML特殊符号
查看>>
kaptcha可配置项
查看>>
JavaMail邮箱验证用户注册
查看>>
EXECUTORSERVICE线程池讲解
查看>>
rsync通过服务同步、日志文件、screen工具
查看>>
系统时间——ntpd
查看>>
07-利用思维导图梳理JavaSE-包与访问控制权限
查看>>
Javascript中eval函数的使用方法与示例
查看>>
JavaScript进阶1-1
查看>>
反射实现AOP动态代理模式(Spring AOP实现原理)
查看>>