编写 UART 解码器 =================================== 正点原子逻辑分析仪协议解码器 python 脚本源码在上位机安装目录的 ``decoders`` 文件夹下, 以协议名进行命名, ``__init__`` 和 ``__pd.py__`` 两个文件就是解码器的源码。 .. tip:: 1. 在写解码器时,如果上位机无法识别自定义的协议,大家可以打开上位机的 ``log`` 查看错误原因, ``log`` 位置: ``C:\Users\用户名\AppData\Local\Temp\ATK-Logic``, 其中 ``用户名`` 为电脑的用户名 2. 如果自定义的协议出现BUG, 大家可以将调试信息输出到解码的数据上 这里我们以 ``UART`` 协议 ``(8位数据, 无奇偶校验为, 1位停止位)`` 波特率可变为例来一步步的教大家如何新增一个解码器, 时序图如下 : .. image:: /img/uart.svg :alt: 无法显示图片 :align: center .. include:: style.rst - :blue:`1. 新建解码器文件夹和 python 脚本文件` 打开 ``ATK-Logic`` 软件的安装目录, 在 ``decoders`` 文件夹下新建一个文件夹并命名为 ``alientek`` 。 接着在新建的 ``alientek`` 文件夹下新建两个以 ``.py`` 结尾的文件 : ``__init__.py`` 和 ``pd.py``。 .. warning:: 由于 ``ATK-Logic`` 已经支持 ``uart`` 协议, 我们这里以 ``alientek`` 命名作为区分。 - :blue:`2. 编写 __init__.py 文件内容` **__init__.py** 文件包含两个信息 : 解码器的说明信息 和 导入 ``pd.py`` 文件 .. code-block:: python ''' alientek 协议说明信息 可以编写协议相关的一些介绍内容 ''' # 导入 pd.py 文件 from .pd import Decoder - :blue:`3. 编写 pd.py 文件内容` **pd.py** 文件包含两个信息 : 导入 ``sigrokdecode`` 模块和定义一个 ``Decoder`` 类, 以下是一个通用模板, 直接复制到 ``pd.py`` 文件中即可 .. code-block:: python # 导入 sigrokdecode 模块, 并重名为 srd import sigrokdecode as srd # 定义一个 Decoder 类 class Decoder(srd.Decoder): api_version = 3 id = 'alientek' name = 'ALIENTEK' longname = 'The (long) name of the decoder.' desc = 'A freeform one-line description of the decoder.' license = 'gplv2+' inputs = ['logic'] outputs = ['output'] tags = ['A list of strings that make this protocol decoder be listed in the same categories', 'temp'] channels = ( ) optional_channels = ( ) options = ( ) annotations = ( ) annotation_rows = ( ) binary = ( ) def __init__(self): self.reset() def start(self): pass def reset(self): pass def decode(self): pass def metadata(self, key, value): pass if key == srd.SRD_CONF_SAMPLERATE: print("SRD_CONF_SAMPLERATE key = ", key); 将 ``__init__.py`` 和 ``pd.py`` 两个文件创建成功后, 重新打开上位机, 在协议解码列表中可以找到 ``ALIENTEK`` 名字的协议, 说明以上步骤正确无误。 .. warning:: 由于提供的模板没有任何参数, 所以在参数界面看不到任何配置信息。 - :blue:`4. 补充 pd.py 文件中 Decoder 类的变量信息` **pd.py** 文件中的 ``Decoder`` 类包含以下内容,详细信息请参考备注说明 .. code-block:: python # 必须为3 api_version = 3 # 协议的标识符, 必须唯一 id = 'alientek' # 协议的名字, 不一定要求跟 id 一致 name = 'ALIENTEK' # 协议的长名字 longname = 'Universal Asynchronous Receiver/Transmitter' # 协议简介 desc = 'Asynchronous, serial bus.' # 开源协议信息 license = 'gplv2+' # 协议输入数据源, 默认为逻辑分析仪输的数据 inputs = ['logic'] # 协议输出数据源,在叠加协议时会用到 outputs = ['alientek_uart'] # 协议适用范围标签 tags = ['Embedded/industrial'] 为协议编写通道信息,由于串口接收与发送数据线格式一样,我们这里共用一根数据线 .. code-block:: python # 必须提供给协议的通道,没有他们解码器无法正常工作 # id : 通道标识 # name : 通道名字 # desc :通道描述 channels = ( {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'}, ) # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样 optional_channels = ( ) 为协议编写可选参数信息,串口参数为 ``(8位数据, 无奇偶校验为, 1位停止位)`` 波特率可自由设置,源码如下: .. code-block:: python # 与协议相关的可选参数,可在协议配置界面自由设置的参数 # Python脚本中可通过 self.options[id] 取值 # id : 标识符 # desc : 描述信息 # default : 默认值 # valuse : 可选值(如果不提供,用户可自由配置) options = ( {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200}, {'id': 'data_bits', 'desc': 'Data bits', 'default': 8, 'values': (8)}, {'id': 'parity', 'desc': 'Parity', 'default': 'none', 'values': ('none')}, {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0, 'values': (1.0)}, ) 为协议编写解析结果信息 .. code-block:: python # 协议解析结果,有两个属性 # 由标识符字符串和人可度的描述字符串组成 annotations = ( ('start', 'Start'), ('data', 'Data'), ('stop', 'Stop'), ('warning', 'Warning'), ) # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性 # 参数一:标识符,不能与 annotations 元组中的标识符冲突 # 参数二:人可读的描述字符串 # 参数三:一个元组, 包含 annotations 元组中类的索引 annotation_rows = ( ('Data', 'RX/TX', (0, 1, 2,)), ('Warnings', 'Warnings', (3,)), ) # 二进制数据解析结果列表,可以为空 binary = ( ) - :blue:`4. 编写 uart 协议解析逻辑` 这里根据源码实例讲解一下该解码器的工作流程: - 上位机获取解码器时会自动实例化 ``Decoder类`` ,如果存在 ``__init__(self)`` 函数将会自动调用该函数,源码中对变量进行初始化操作 - 调用 ``metadata(self, key, value)`` 函数,根据当前采样率计算串口协议中的位宽,以便后续使用 - 调用解码器中的 ``start(self)`` 函数,注册解码数据类型,这里只注册:OUTPUT_ANN - 调用 ``decode(self)`` 函数,该函数会循环分析所有样本数据,该函数中最重要的一个函数就是 ``wait`` 函数,底层会根据它的输入条件 判断样本数据是否与条件匹配,如果匹配成功,则会立即返回,如果匹配失败, ``decode`` 函数将直接退出 示例源码提供的串口数据解码思路如下: - 根据下降沿识别起始位的位置 - 以一半位宽确定起始位的电平值,且电平值必须为低电平 - 以一半位宽确定8位数据位的电平值,并将其组合成一个字节数据,以十六进制字符串输出 - 以一半位宽确定停止位的电平值,且电平值必须为高电平 .. code-block:: python # 导入 sigrokdecode 模块, 并重名为 srd import sigrokdecode as srd # 导入其他需要用到的模块 from common.srdhelper import bitpack from math import floor, ceil # 定义一个异常类 class SamplerateError(Exception): pass # 定义一个 Decoder 类 class Decoder(srd.Decoder): # 必须为3 api_version = 3 # 协议的标识符, 必须唯一 id = 'alientek' # 协议的名字, 不一定要求跟 id 一致 name = 'ALIENTEK' # 协议的长名字 longname = 'Universal Asynchronous Receiver/Transmitter' # 协议简介 desc = 'Asynchronous, serial bus.' # 开源协议信息 license = 'gplv2+' # 协议输入数据源, 默认为逻辑分析仪输的数据 inputs = ['logic'] # 协议输出数据源,在叠加协议时会用到 outputs = ['alientek_uart'] # 协议适用范围标签 tags = ['Embedded/industrial'] # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空 # id : 通道标识 # name : 通道名字 # desc :通道描述 channels = ( {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'}, ) # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样 optional_channels = ( ) # 与协议相关的可选参数,可在协议配置界面自由设置的参数 # Python脚本中可通过 self.options[id] 取值 # id : 标识符 # desc : 描述信息 # default : 默认值 # valuse : 可选值(如果不提供,用户可自由配置) options = ( {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200}, {'id': 'data_bits', 'desc': 'Data bits', 'default': 8, 'values': (8,)}, {'id': 'parity', 'desc': 'Parity', 'default': 'none', 'values': ('none',)}, {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0, 'values': (1.0,)}, ) # 协议解析结果,有两个属性 # 由标识符字符串和人可度的描述字符串组成 annotations = ( ('start', 'Start'), ('data', 'Data'), ('stop', 'Stop'), ('warning', 'Warning'), ) # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性 # 参数一:标识符,不能与 annotations 元组中的标识符冲突 # 参数二:人可读的描述字符串 # 参数三:一个元组, 包含 annotations 元组中类的索引 annotation_rows = ( ('Data', 'RX/TX', (0, 1, 2,)), ('Warnings', 'Warnings', (3,)), ) # 二进制数据解析结果列表,可以为空 binary = ( ) # 在创建类的实例时被调用,并允许你对对象进行初始化设置 def __init__(self): self.reset() # 注册解码数据类型 def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) # 重置参数 def reset(self): self.samplerate = None self.state = 'WAIT FOR START BIT' self.frame_start = 0 self.cur_data_bit = 0 self.databits = [] # 可选函数,传递数据流的一些数据 # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz) def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value # 计算串口数据中一个位的位宽 self.bit_width = float(self.samplerate) / float(self.options['baudrate']) # 获取采样点的位置, 这里以一半的位宽确定数据为的采样点 def get_sample_point(self, bitnum): bitpos = self.frame_start + (self.bit_width - 1) / 2.0 bitpos += bitnum * self.bit_width return bitpos # 输出开始位、结束位、解码错误信息 def putg(self, data): s, halfbit = self.samplenum, self.bit_width / 2.0 self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data) # 输出解码处的有效数据信息 def putx(self, data): halfbit = self.bit_width / 2.0 self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_ann, data) def decode(self): # 判断采样率是否设置 if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') state = 'WAIT FOR START BIT' # 循环处理协议数据 while True: # 1.等待起始位的下降沿 if state == 'WAIT FOR START BIT': # 等待下降沿条件满足,才会返回 self.wait({0: 'f'}) # 条件匹配失败 if self.matched[0] != True: self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue state = 'GET START BIT' self.frame_start = self.samplenum # 2.获取起始位的电平值,必须为低电平 if state == 'GET START BIT': want_num = ceil(self.get_sample_point(0)) # 等待条件满足返回, wait 将返回当前位置的引脚电平值 rxtx, = self.wait( {'skip': want_num - self.samplenum } ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 起始位电平值必须为低电平 if rxtx != 0: state = 'WAIT FOR START BIT' self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue self.cur_data_bit = 0 self.databits = [] state = 'GET DATA BITS' # 输出解码信息, 解码信息由数据类型决定 :OUTPUT_ANN / OUTPUT_PYTHON / OUTPUT_BINARY / OUTPUT_META # 当类型为 OUTPUT_ANN 时,数据参数是一个包含两项的 Python 列表 # 第一项是注释索引 (由解码器 annotations 中的项顺序确定) # 第二项是注释字符串列表,也就是上位机显示的解码数据,字符串应该是相同注释文本的更长和更短版本 (按长度排序, 最长的在前) , 上位机根据缩放级别显示不同的注释文本。 self.putg([0, ['Start bit', 'Start', 'S']]) # 3.获取8位数据位 if state == 'GET DATA BITS': bitnum = self.cur_data_bit + 1 want_num = ceil(self.get_sample_point(bitnum)) rxtx, = self.wait( {'skip': want_num - self.samplenum} ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 等待所有数据位接收完毕, 8位数据位 self.databits.append(rxtx) self.cur_data_bit += 1 if self.cur_data_bit < 8 : continue datavalue = bitpack(self.databits) state = 'GET STOP BITS' self.putx([1, ['0x%X' % datavalue]]) self.databits = [] # 4.获取1位停止位 if state == 'GET STOP BITS': # 停止位的位置 : 1位起始位 + 8位数据位 + 1位停止位 want_num = ceil(self.get_sample_point(9)) rxtx, = self.wait( {'skip': want_num - self.samplenum} ) # 条件匹配失败 if self.matched[0] != True: state = 'WAIT FOR START BIT' self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue # 停止位必须为高电平 if rxtx != 1: state = 'WAIT FOR START BIT' self.putg([3, ['Frame error', 'Frame err', 'FE']]) continue state = 'WAIT FOR START BIT' self.putg([2, ['Stop bit', 'Stop', 'T']])