编写上层协议(基于UART协议) =================================== ``libsigrokdecode`` 库的功能非常强大,它可以基于底层协议(例如:UART)增加上层协议(例如:Modbus)。 由于它已经支持很多种基础协议,在编写上层协议时可以大大减少我们的工作量,简化协议解码器的难度。 为了示例方便,我们先来定义一个简单的协议: .. table:: :align: center +----------+----------+----------+ |帧头 |功能码 |校验 | +==========+==========+==========+ |1byte |1byte |1byte | +----------+----------+----------+ .. include:: style.rst :blue:`帧结构 = 帧头 + 功能码 + 校验` - **帧头** : 固定为 ``0xA1`` - **功能码** : 值为0-255, 表示CMD0 ~ CMD255 - **校验** : 帧头与功能码的累加和,如果大于255,取低字节 我们基于 ``UART`` 基础协议来解码这个报文结构, 为了方便,我们基于上一节 ``UART`` 解码器做修改。 具体内容如下: - 注册 ``OUTPUT_PYTHON`` 数据输出类型,该数据可以将底层协议数据传输给上层协议 - 规定 ``OUTPUT_PYTHON`` 数据格式,数据为一个列表,第一项表示数据类型字符串,第二项表示数据值 - 添加 ``OUTPUT_PYTHON`` 输出信息,源码中只增加了两种数据类型: ``data`` , ``Frame error`` 其中: ``data`` 用来传输串口数据, ``Frame error`` 用来提示错误 修改后的 ``alientek`` 解码器源码如下: .. code-block:: python # 导入 sigrokdecode 模块, 并重名为 srd import sigrokdecode as srd # 导入其他需要用到的模块 from common.srdhelper import bitpack from math import floor, ceil # 定义一个异常类 class SamplerateError(Exception): pass # 定义一个 Decoder 类 class Decoder(srd.Decoder): # 必须为3 api_version = 3 # 协议的标识符, 必须唯一 id = 'alientek' # 协议的名字, 不一定要求跟 id 一致 name = 'ALIENTEK' # 协议的长名字 longname = 'Universal Asynchronous Receiver/Transmitter' # 协议简介 desc = 'Asynchronous, serial bus.' # 开源协议信息 license = 'gplv2+' # 协议输入数据源, 默认为逻辑分析仪输的数据 inputs = ['logic'] # 协议输出数据源,在叠加协议时会用到 outputs = ['alientek_uart'] # 协议适用范围标签 tags = ['Embedded/industrial'] # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空 # id : 通道标识 # name : 通道名字 # desc :通道描述 channels = ( {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'}, ) # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样 optional_channels = ( ) # 与协议相关的可选参数,可在协议配置界面自由设置的参数 # Python脚本中可通过 self.options[id] 取值 # id : 标识符 # desc : 描述信息 # default : 默认值 # valuse : 可选值(如果不提供,用户可自由配置) options = ( {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200}, {'id': 'data_bits', 'desc': 'Data bits', 'default': 8, 'values': (8,)}, {'id': 'parity', 'desc': 'Parity', 'default': 'none', 'values': ('none',)}, {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0, 'values': (1.0,)}, ) # 协议解析结果,有两个属性 # 由标识符字符串和人可度的描述字符串组成 annotations = ( ('start', 'Start'), ('data', 'Data'), ('stop', 'Stop'), ('warning', 'Warning'), ) # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性 # 参数一:标识符,不能与 annotations 元组中的标识符冲突 # 参数二:人可读的描述字符串 # 参数三:一个元组, 包含 annotations 元组中类的索引 annotation_rows = ( ('Data', 'RX/TX', (0, 1, 2,)), ('Warnings', 'Warnings', (3,)), ) # 二进制数据解析结果列表,可以为空 binary = ( ) # 在创建类的实例时被调用,并允许你对对象进行初始化设置 def __init__(self): self.reset() # 注册解码数据类型 def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) self.out_python = self.register(srd.OUTPUT_PYTHON) # 重置参数 def reset(self): self.samplerate = None self.state = 'WAIT FOR START BIT' self.frame_start = 0 self.cur_data_bit = 0 self.databits = [] # 可选函数,传递数据流的一些数据 # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz) def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value # 计算串口数据中一个位的位宽 self.bit_width = float(self.samplerate) / float(self.options['baudrate']) # 获取采样点的位置, 这里以一半的位宽确定数据为的采样点 def get_sample_point(self, bitnum): bitpos = self.frame_start + (self.bit_width - 1) / 2.0 bitpos += bitnum * self.bit_width return bitpos # 输出开始位、结束位、解码错误信息 def putg(self, data): s, halfbit = self.samplenum, self.bit_width / 2.0 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data) # 输出解码处的有效数据信息 def putx(self, data): halfbit = self.bit_width / 2.0 self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_ann, data) # 输出解码错误信息给上层协议 def putp(self, data): s, halfbit = self.samplenum, self.bit_width / 2.0 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data) # 输出解码信息给上层协议 def putpx(self, data): halfbit = self.bit_width / 2.0 self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_python, data) def decode(self): # 判断采样率是否设置 if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') state = 'WAIT FOR START BIT' # 循环处理协议数据 while True: # 1.等待起始位的下降沿 if state == 'WAIT FOR START BIT': # 等待下降沿条件满足,才会返回 self.wait({0: 'f'}) # 条件匹配失败 if self.matched[0] != True: self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue state = 'GET START BIT' self.frame_start = self.samplenum # 2.获取起始位的电平值,必须为低电平 if state == 'GET START BIT': want_num = ceil(self.get_sample_point(0)) # 等待条件满足返回, wait 将返回当前位置的引脚电平值 rxtx, = self.wait( {'skip': want_num - self.samplenum } ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 起始位电平值必须为低电平 if rxtx != 0: state = 'WAIT FOR START BIT' self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue self.cur_data_bit = 0 self.databits = [] state = 'GET DATA BITS' # 输出解码信息, 解码信息由数据类型决定 :OUTPUT_ANN / OUTPUT_PYTHON / OUTPUT_BINARY / OUTPUT_META # 当类型为 OUTPUT_ANN 时,数据参数是一个包含两项的 Python 列表 # 第一项是注释索引 (由解码器 annotations 中的项顺序确定) # 第二项是注释字符串列表,也就是上位机显示的解码数据,字符串应该是相同注释文本的更长和更短版本 (按长度排序, 最长的在前) , 上位机根据缩放级别显示不同的注释文本。 self.putg([0, ['Start bit', 'Start', 'S']]) # 3.获取8位数据位 if state == 'GET DATA BITS': bitnum = self.cur_data_bit + 1 want_num = ceil(self.get_sample_point(bitnum)) rxtx, = self.wait( {'skip': want_num - self.samplenum} ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 等待所有数据位接收完毕, 8位数据位 self.databits.append(rxtx) self.cur_data_bit += 1 if self.cur_data_bit < 8 : continue datavalue = bitpack(self.databits) state = 'GET STOP BITS' self.putpx(["data", datavalue]) self.putx([1, ['0x%X' % datavalue]]) self.databits = [] # 4.获取1位停止位 if state == 'GET STOP BITS': # 停止位的位置 : 1位起始位 + 8位数据位 + 1位停止位 want_num = ceil(self.get_sample_point(9)) rxtx, = self.wait( {'skip': want_num - self.samplenum} ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 停止位必须为高电平 if rxtx != 1: state = 'WAIT FOR START BIT' self.putp([3, ['Frame error', 0]]) self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue state = 'WAIT FOR START BIT' self.putg([2, ['Stop bit', 'Stop', 'T']]) 有了底层协议数据后, 就可以新增一个上层协议了,具体步骤如下: - :blue:`1. 新增上层协议解码器文件夹及文件,在 ATK-Logic 软件安装目录 decoders 文件夹下新建 alientek_top 文件夹, 并在文件夹内新增两个文件: __init__.py 和 pd.py` - :blue:`2. 编写 __init__.py 文件内容` .. code-block:: python ''' alientek_top 协议说明信息 可以编写协议相关的一些介绍内容 这里是alientek上层协议 ''' # 导入 pd.py 文件 from .pd import Decoder - :blue:`3. 编写 __pd__.py 文件内容` ``__pd__.py`` 文件结构与前面介绍的内容基本一致,这里只需要注意以下两点: 1. ``inputs = ['alientek_uart']`` 属性值, 这个值特别重要,上下层协议就是通过该值串起来的,上层协议中的 ``inputs`` 必须等于下层协议中的 ``outputs`` 2. ``decode`` 函数,在不同模式下输入参数不一样,在上层协议中为 ``decode(self, startsample, endsample, data)`` ,详细介绍请参考 **协议解码器 (Python脚本) 格式** 章节内容 **上层协议是如何被调用的?** ``ATK-Logic`` 软件在初始化协议库时,会自动根据 ``inputs`` 与 ``outputs`` 将上层协议与下层协议关联起来。 当下层协议通过 ``OUTPUT_PYTHON`` 输出数据时, ``libsigrokdecode`` 会自动调用上层协议中的 ``decode(self, startsample, endsample, data)`` 函数, 数据通过函数输入参数传给上层协议,我们在上层协议中分析处理数据,再将解码数据输出即可完成解码功能。 源码如下: .. code-block:: python # 导入 sigrokdecode 模块, 并重名为 srd import sigrokdecode as srd # 定义一个 Decoder 类 class Decoder(srd.Decoder): # 必须为3 api_version = 3 # 协议的标识符, 必须唯一 id = 'alientek_top' # 协议的名字, 不一定要求跟 id 一致 name = 'ALIENTEK_TOP' # 协议的长名字 longname = 'ALIENTEK TOP Protocol Example' # 协议简介 desc = 'Asynchronous, serial bus.' # 开源协议信息 license = 'gplv2+' # 协议输入数据源, 必须填下层协议中的 outpus 参数 inputs = ['alientek_uart'] # 协议输出数据源,在叠加协议时会用到 outputs = ['alientek_top'] # 协议适用范围标签 tags = ['Embedded/industrial'] # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空 # id : 通道标识 # name : 通道名字 # desc :通道描述 channels = ( ) # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样 optional_channels = ( ) # 与协议相关的可选参数,可在协议配置界面自由设置的参数 # Python脚本中可通过 self.options[id] 取值 # id : 标识符 # desc : 描述信息 # default : 默认值 # valuse : 可选值(如果不提供,用户可自由配置) options = ( ) # 协议解析结果,有两个属性 # 由标识符字符串和人可度的描述字符串组成 annotations = ( ('data', 'Data'), ('error', 'Errror'), ) # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性 # 参数一:标识符,不能与 annotations 元组中的标识符冲突 # 参数二:人可读的描述字符串 # 参数三:一个元组, 包含 annotations 元组中类的索引 annotation_rows = ( ('cmd data', 'CMD Data', (0,)), ('verify error', 'Verify ERROR', (1,)), ) # 二进制数据解析结果列表,可以为空 binary = ( ) # 在创建类的实例时被调用,并允许你对对象进行初始化设置 def __init__(self): self.reset() # 注册解码数据类型 def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) # 重置参数 def reset(self): self.status = "Header" self.addr_pos = 0 self.data_save = [] # 可选函数,传递数据流的一些数据 # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz) def metadata(self, key, value): pass # 输出解码数据 def putx(self, ss, es, data): self.put(ss, es, self.out_ann, data) # 解码函数,注意上层协议与下层协议该函数的输入参数不一样 # 这里 data 格式为 : ['type', value], 每个协议都不同,需要根据下层协议确定 def decode(self, ss, es, data): ptype, pdata = data if ptype == 'Frame error': self.status = "Header" self.data_save.clear() return if ptype != "data": return # 判断帧头数据 if self.status == "Header": if pdata != 0xA1: self.putx(ss, es, [1, ['Header Error']]) return self.data_save.append(pdata) self.addr_pos = ss self.status = "Function" return # 记录功能码数据 if self.status == "Function": self.data_save.append(pdata) self.cmd = pdata self.status = "Verify" return # 校验和 if self.status == "Verify": sum = 0 for temp in self.data_save: sum += temp sum &= 0xFF if sum != pdata: self.status = "Header" self.data_save.clear() self.putx(self.addr_pos, es, [1, ['Verify Error']]) return self.putx(self.addr_pos, es, [0, ['CMD:%d' % self.cmd]]) self.status = "Header" self.data_save.clear()