编写 UART 解码器

正点原子逻辑分析仪协议解码器 python 脚本源码在上位机安装目录的 decoders 文件夹下, 以协议名进行命名, __init____pd.py__ 两个文件就是解码器的源码。

小技巧

  1. 在写解码器时,如果上位机无法识别自定义的协议,大家可以打开上位机的 log 查看错误原因, log 位置: C:\Users\用户名\AppData\Local\Temp\ATK-Logic, 其中 用户名 为电脑的用户名

  2. 如果自定义的协议出现BUG, 大家可以将调试信息输出到解码的数据上

这里我们以 UART 协议 (8位数据, 无奇偶校验为, 1位停止位) 波特率可变为例来一步步的教大家如何新增一个解码器, 时序图如下 :

无法显示图片
  • 1. 新建解码器文件夹和 python 脚本文件

    打开 ATK-Logic 软件的安装目录, 在 decoders 文件夹下新建一个文件夹并命名为 alientek 。 接着在新建的 alientek 文件夹下新建两个以 .py 结尾的文件 : __init__.pypd.py

    警告

    由于 ATK-Logic 已经支持 uart 协议, 我们这里以 alientek 命名作为区分。

  • 2. 编写 __init__.py 文件内容

    __init__.py 文件包含两个信息 : 解码器的说明信息 和 导入 pd.py 文件

    '''
    alientek 协议说明信息
    可以编写协议相关的一些介绍内容
    '''
    
    # 导入 pd.py 文件
    from .pd import Decoder
    
  • 3. 编写 pd.py 文件内容

    pd.py 文件包含两个信息 : 导入 sigrokdecode 模块和定义一个 Decoder 类, 以下是一个通用模板, 直接复制到 pd.py 文件中即可

    # 导入 sigrokdecode 模块, 并重名为 srd
    import sigrokdecode as srd
    
    # 定义一个 Decoder 类
    class Decoder(srd.Decoder):
    
        api_version = 3
        id = 'alientek'
        name = 'ALIENTEK'
        longname = 'The (long) name of the decoder.'
        desc = 'A freeform one-line description of the decoder.'
        license = 'gplv2+'
        inputs = ['logic']
        outputs = ['output']
        tags = ['A list of strings that make this protocol decoder be listed in the same categories', 'temp']
    
        channels = (
        )
    
        optional_channels = (
        )
    
        options = (
        )
    
        annotations = (
        )
    
        annotation_rows = (
        )
    
        binary = (
        )
    
        def __init__(self):
            self.reset()
    
        def start(self):
            pass
    
        def reset(self):
            pass
    
        def decode(self):
            pass
    
        def metadata(self, key, value):
            pass
    
            if key == srd.SRD_CONF_SAMPLERATE:
                print("SRD_CONF_SAMPLERATE key = ", key);
    

__init__.pypd.py 两个文件创建成功后, 重新打开上位机, 在协议解码列表中可以找到 ALIENTEK 名字的协议, 说明以上步骤正确无误。

警告

由于提供的模板没有任何参数, 所以在参数界面看不到任何配置信息。

  • 4. 补充 pd.py 文件中 Decoder 类的变量信息

    pd.py 文件中的 Decoder 类包含以下内容,详细信息请参考备注说明

    # 必须为3
    api_version = 3
    
    # 协议的标识符, 必须唯一
    id = 'alientek'
    
    # 协议的名字, 不一定要求跟 id 一致
    name = 'ALIENTEK'
    
    # 协议的长名字
    longname = 'Universal Asynchronous Receiver/Transmitter'
    
    # 协议简介
    desc = 'Asynchronous, serial bus.'
    
    # 开源协议信息
    license = 'gplv2+'
    
    # 协议输入数据源, 默认为逻辑分析仪输的数据
    inputs = ['logic']
    
    # 协议输出数据源,在叠加协议时会用到
    outputs = ['alientek_uart']
    
    # 协议适用范围标签
    tags = ['Embedded/industrial']
    

    为协议编写通道信息,由于串口接收与发送数据线格式一样,我们这里共用一根数据线

    # 必须提供给协议的通道,没有他们解码器无法正常工作
    # id : 通道标识
    # name : 通道名字
    # desc :通道描述
    channels = (
        {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'},
    )
    
    # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样
    optional_channels = (
    )
    

    为协议编写可选参数信息,串口参数为 (8位数据, 无奇偶校验为, 1位停止位) 波特率可自由设置,源码如下:

    # 与协议相关的可选参数,可在协议配置界面自由设置的参数
    # Python脚本中可通过 self.options[id] 取值
    # id : 标识符
    # desc : 描述信息
    # default : 默认值
    # valuse  : 可选值(如果不提供,用户可自由配置)
    options = (
        {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
        {'id': 'data_bits', 'desc': 'Data bits', 'default': 8,
            'values': (8)},
        {'id': 'parity', 'desc': 'Parity', 'default': 'none',
            'values': ('none')},
        {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0,
            'values': (1.0)},
    )
    

    为协议编写解析结果信息

    # 协议解析结果,有两个属性
    # 由标识符字符串和人可度的描述字符串组成
    annotations = (
        ('start', 'Start'),
        ('data', 'Data'),
        ('stop', 'Stop'),
        ('warning', 'Warning'),
    )
    
    # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性
    # 参数一:标识符,不能与 annotations 元组中的标识符冲突
    # 参数二:人可读的描述字符串
    # 参数三:一个元组, 包含 annotations 元组中类的索引
    annotation_rows = (
        ('Data', 'RX/TX', (0, 1, 2,)),
        ('Warnings', 'Warnings', (3,)),
    )
    
    # 二进制数据解析结果列表,可以为空
    binary = (
    )
    
  • 4. 编写 uart 协议解析逻辑

    这里根据源码实例讲解一下该解码器的工作流程:

    • 上位机获取解码器时会自动实例化 Decoder类 ,如果存在 __init__(self) 函数将会自动调用该函数,源码中对变量进行初始化操作

    • 调用 metadata(self, key, value) 函数,根据当前采样率计算串口协议中的位宽,以便后续使用

    • 调用解码器中的 start(self) 函数,注册解码数据类型,这里只注册:OUTPUT_ANN

    • 调用 decode(self) 函数,该函数会循环分析所有样本数据,该函数中最重要的一个函数就是 wait 函数,底层会根据它的输入条件 判断样本数据是否与条件匹配,如果匹配成功,则会立即返回,如果匹配失败, decode 函数将直接退出

    示例源码提供的串口数据解码思路如下:

    • 根据下降沿识别起始位的位置

    • 以一半位宽确定起始位的电平值,且电平值必须为低电平

    • 以一半位宽确定8位数据位的电平值,并将其组合成一个字节数据,以十六进制字符串输出

    • 以一半位宽确定停止位的电平值,且电平值必须为高电平

    # 导入 sigrokdecode 模块, 并重名为 srd
    import sigrokdecode as srd
    
    # 导入其他需要用到的模块
    from common.srdhelper import bitpack
    from math import floor, ceil
    
    # 定义一个异常类
    class SamplerateError(Exception):
        pass
    
    # 定义一个 Decoder 类
    class Decoder(srd.Decoder):
    
        # 必须为3
        api_version = 3
    
        # 协议的标识符, 必须唯一
        id = 'alientek'
    
        # 协议的名字, 不一定要求跟 id 一致
        name = 'ALIENTEK'
    
        # 协议的长名字
        longname = 'Universal Asynchronous Receiver/Transmitter'
    
        # 协议简介
        desc = 'Asynchronous, serial bus.'
    
        # 开源协议信息
        license = 'gplv2+'
    
        # 协议输入数据源, 默认为逻辑分析仪输的数据
        inputs = ['logic']
    
        # 协议输出数据源,在叠加协议时会用到
        outputs = ['alientek_uart']
    
        # 协议适用范围标签
        tags = ['Embedded/industrial']
    
        # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空
        # id : 通道标识
        # name : 通道名字
        # desc :通道描述
        channels = (
            {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'},
        )
    
        # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样
        optional_channels = (
        )
    
        # 与协议相关的可选参数,可在协议配置界面自由设置的参数
        # Python脚本中可通过 self.options[id] 取值
        # id : 标识符
        # desc : 描述信息
        # default : 默认值
        # valuse  : 可选值(如果不提供,用户可自由配置)
        options = (
            {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
            {'id': 'data_bits', 'desc': 'Data bits', 'default': 8,
                'values': (8,)},
            {'id': 'parity', 'desc': 'Parity', 'default': 'none',
                'values': ('none',)},
            {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0,
                'values': (1.0,)},
        )
    
        # 协议解析结果,有两个属性
        # 由标识符字符串和人可度的描述字符串组成
        annotations = (
            ('start', 'Start'),
            ('data', 'Data'),
            ('stop', 'Stop'),
            ('warning', 'Warning'),
        )
    
        # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性
        # 参数一:标识符,不能与 annotations 元组中的标识符冲突
        # 参数二:人可读的描述字符串
        # 参数三:一个元组, 包含 annotations 元组中类的索引
        annotation_rows = (
            ('Data', 'RX/TX', (0, 1, 2,)),
            ('Warnings', 'Warnings', (3,)),
        )
    
        # 二进制数据解析结果列表,可以为空
        binary = (
        )
    
        # 在创建类的实例时被调用,并允许你对对象进行初始化设置
        def __init__(self):
            self.reset()
    
        # 注册解码数据类型
        def start(self):
            self.out_ann = self.register(srd.OUTPUT_ANN)
    
        # 重置参数
        def reset(self):
            self.samplerate = None
            self.state = 'WAIT FOR START BIT'
            self.frame_start = 0
            self.cur_data_bit = 0
            self.databits = []
    
        # 可选函数,传递数据流的一些数据
        # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz)
        def metadata(self, key, value):
            if key == srd.SRD_CONF_SAMPLERATE:
                self.samplerate = value
                # 计算串口数据中一个位的位宽
                self.bit_width = float(self.samplerate) / float(self.options['baudrate'])
    
        # 获取采样点的位置, 这里以一半的位宽确定数据为的采样点
        def get_sample_point(self, bitnum):
            bitpos = self.frame_start + (self.bit_width - 1) / 2.0
            bitpos += bitnum * self.bit_width
            return bitpos
    
        # 输出开始位、结束位、解码错误信息
        def putg(self, data):
            s, halfbit = self.samplenum, self.bit_width / 2.0
            self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)
    
        # 输出解码处的有效数据信息
        def putx(self, data):
            halfbit = self.bit_width / 2.0
            self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_ann, data)
    
        def decode(self):
            # 判断采样率是否设置
            if not self.samplerate:
                raise SamplerateError('Cannot decode without samplerate.')
    
            state = 'WAIT FOR START BIT'
    
            # 循环处理协议数据
            while True:
    
                # 1.等待起始位的下降沿
                if state == 'WAIT FOR START BIT':
    
                    # 等待下降沿条件满足,才会返回
                    self.wait({0: 'f'})
    
                    # 条件匹配失败
                    if self.matched[0] != True:
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    state = 'GET START BIT'
                    self.frame_start = self.samplenum
    
                # 2.获取起始位的电平值,必须为低电平
                if state == 'GET START BIT':
                    want_num = ceil(self.get_sample_point(0))
    
                    # 等待条件满足返回, wait 将返回当前位置的引脚电平值
                    rxtx,  = self.wait( {'skip': want_num - self.samplenum } )
    
                    # 条件匹配失败
                    if self.matched[0] != True:
                        state = 'WAIT FOR START BIT'
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    # 起始位电平值必须为低电平
                    if rxtx != 0:
                        state = 'WAIT FOR START BIT'
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    self.cur_data_bit = 0
                    self.databits = []
    
                    state = 'GET DATA BITS'
    
                    # 输出解码信息, 解码信息由数据类型决定 :OUTPUT_ANN / OUTPUT_PYTHON / OUTPUT_BINARY / OUTPUT_META
                    # 当类型为 OUTPUT_ANN 时,数据参数是一个包含两项的 Python 列表
                    # 第一项是注释索引 (由解码器 annotations 中的项顺序确定)
                    # 第二项是注释字符串列表,也就是上位机显示的解码数据,字符串应该是相同注释文本的更长和更短版本 (按长度排序, 最长的在前) , 上位机根据缩放级别显示不同的注释文本。
                    self.putg([0, ['Start bit', 'Start', 'S']])
    
                # 3.获取8位数据位
                if state == 'GET DATA BITS':
                    bitnum = self.cur_data_bit + 1
                    want_num = ceil(self.get_sample_point(bitnum))
                    rxtx,  = self.wait( {'skip': want_num - self.samplenum} )
    
                    # 条件匹配失败
                    if self.matched[0] != True:
                        state = 'WAIT FOR START BIT'
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    # 等待所有数据位接收完毕, 8位数据位
                    self.databits.append(rxtx)
                    self.cur_data_bit += 1
                    if self.cur_data_bit < 8 :
                        continue
    
                    datavalue = bitpack(self.databits)
    
                    state = 'GET STOP BITS'
                    self.putx([1, ['0x%X' % datavalue]])
    
                    self.databits = []
    
                # 4.获取1位停止位
                if state == 'GET STOP BITS':
                    # 停止位的位置 : 1位起始位 + 8位数据位 + 1位停止位
                    want_num = ceil(self.get_sample_point(9))
                    rxtx,  = self.wait( {'skip': want_num - self.samplenum} )
    
                    # 条件匹配失败
                    if self.matched[0] != True:
                        state = 'WAIT FOR START BIT'
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    # 停止位必须为高电平
                    if rxtx != 1:
                        state = 'WAIT FOR START BIT'
                        self.putg([3, ['Frame error', 'Frame err', 'FE']])
                        continue
    
                    state = 'WAIT FOR START BIT'
                    self.putg([2, ['Stop bit', 'Stop', 'T']])