编写上层协议(基于UART协议)

libsigrokdecode 库的功能非常强大,它可以基于底层协议(例如:UART)增加上层协议(例如:Modbus)。 由于它已经支持很多种基础协议,在编写上层协议时可以大大减少我们的工作量,简化协议解码器的难度。

为了示例方便,我们先来定义一个简单的协议:

帧头

功能码

校验

1byte

1byte

1byte

帧结构 = 帧头 + 功能码 + 校验

  • 帧头 : 固定为 0xA1

  • 功能码 : 值为0-255, 表示CMD0 ~ CMD255

  • 校验 : 帧头与功能码的累加和,如果大于255,取低字节

我们基于 UART 基础协议来解码这个报文结构, 为了方便,我们基于上一节 UART 解码器做修改。 具体内容如下:

  • 注册 OUTPUT_PYTHON 数据输出类型,该数据可以将底层协议数据传输给上层协议

  • 规定 OUTPUT_PYTHON 数据格式,数据为一个列表,第一项表示数据类型字符串,第二项表示数据值

  • 添加 OUTPUT_PYTHON 输出信息,源码中只增加了两种数据类型: data , Frame error 其中: data 用来传输串口数据, Frame error 用来提示错误

修改后的 alientek 解码器源码如下:

# 导入 sigrokdecode 模块, 并重名为 srd
import sigrokdecode as srd

# 导入其他需要用到的模块
from common.srdhelper import bitpack
from math import floor, ceil

# 定义一个异常类
class SamplerateError(Exception):
    pass


# 定义一个 Decoder 类
class Decoder(srd.Decoder):

    # 必须为3
    api_version = 3

    # 协议的标识符, 必须唯一
    id = 'alientek'

    # 协议的名字, 不一定要求跟 id 一致
    name = 'ALIENTEK'

    # 协议的长名字
    longname = 'Universal Asynchronous Receiver/Transmitter'

    # 协议简介
    desc = 'Asynchronous, serial bus.'

    # 开源协议信息
    license = 'gplv2+'

    # 协议输入数据源, 默认为逻辑分析仪输的数据
    inputs = ['logic']

    # 协议输出数据源,在叠加协议时会用到
    outputs = ['alientek_uart']

    # 协议适用范围标签
    tags = ['Embedded/industrial']

    # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空
    # id : 通道标识
    # name : 通道名字
    # desc :通道描述
    channels = (
        {'id': 'rxtx', 'name': 'RX/TX', 'desc': 'UART transceive line'},
    )

    # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样
    optional_channels = (
    )

    # 与协议相关的可选参数,可在协议配置界面自由设置的参数
    # Python脚本中可通过 self.options[id] 取值
    # id : 标识符
    # desc : 描述信息
    # default : 默认值
    # valuse  : 可选值(如果不提供,用户可自由配置)
    options = (
        {'id': 'baudrate', 'desc': 'Baud rate', 'default': 115200},
        {'id': 'data_bits', 'desc': 'Data bits', 'default': 8,
            'values': (8,)},
        {'id': 'parity', 'desc': 'Parity', 'default': 'none',
            'values': ('none',)},
        {'id': 'stop_bits', 'desc': 'Stop bits', 'default': 1.0,
            'values': (1.0,)},
    )

    # 协议解析结果,有两个属性
    # 由标识符字符串和人可度的描述字符串组成
    annotations = (
        ('start', 'Start'),
        ('data', 'Data'),
        ('stop', 'Stop'),
        ('warning', 'Warning'),
    )

    # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性
    # 参数一:标识符,不能与 annotations 元组中的标识符冲突
    # 参数二:人可读的描述字符串
    # 参数三:一个元组, 包含 annotations 元组中类的索引
    annotation_rows = (
        ('Data', 'RX/TX', (0, 1, 2,)),
        ('Warnings', 'Warnings', (3,)),
    )

    # 二进制数据解析结果列表,可以为空
    binary = (
    )

    # 在创建类的实例时被调用,并允许你对对象进行初始化设置
    def __init__(self):
        self.reset()

    # 注册解码数据类型
    def start(self):
        self.out_ann = self.register(srd.OUTPUT_ANN)
        self.out_python = self.register(srd.OUTPUT_PYTHON)

    # 重置参数
    def reset(self):
        self.samplerate = None
        self.state = 'WAIT FOR START BIT'
        self.frame_start = 0
        self.cur_data_bit = 0
        self.databits = []

    # 可选函数,传递数据流的一些数据
    # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz)
    def metadata(self, key, value):
        if key == srd.SRD_CONF_SAMPLERATE:
            self.samplerate = value
            # 计算串口数据中一个位的位宽
            self.bit_width = float(self.samplerate) / float(self.options['baudrate'])

    # 获取采样点的位置, 这里以一半的位宽确定数据为的采样点
    def get_sample_point(self, bitnum):
        bitpos = self.frame_start + (self.bit_width - 1) / 2.0
        bitpos += bitnum * self.bit_width
        return bitpos

    # 输出开始位、结束位、解码错误信息
    def putg(self, data):
        s, halfbit = self.samplenum, self.bit_width / 2.0
        self.put(s - floor(halfbit), s + ceil(halfbit), self.out_ann, data)

    # 输出解码处的有效数据信息
    def putx(self, data):
        halfbit = self.bit_width / 2.0
        self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_ann, data)

    # 输出解码错误信息给上层协议
    def putp(self, data):
        s, halfbit = self.samplenum, self.bit_width / 2.0
        self.put(s - floor(halfbit), s + ceil(halfbit), self.out_python, data)

    # 输出解码信息给上层协议
    def putpx(self, data):
        halfbit = self.bit_width / 2.0
        self.put(self.frame_start + floor(self.bit_width), self.samplenum + ceil(halfbit), self.out_python, data)

    def decode(self):
        # 判断采样率是否设置
        if not self.samplerate:
            raise SamplerateError('Cannot decode without samplerate.')

        state = 'WAIT FOR START BIT'

        # 循环处理协议数据
        while True:

            # 1.等待起始位的下降沿
            if state == 'WAIT FOR START BIT':

                # 等待下降沿条件满足,才会返回
                self.wait({0: 'f'})

                # 条件匹配失败
                if self.matched[0] != True:
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                state = 'GET START BIT'
                self.frame_start = self.samplenum

            # 2.获取起始位的电平值,必须为低电平
            if state == 'GET START BIT':
                want_num = ceil(self.get_sample_point(0))

                # 等待条件满足返回, wait 将返回当前位置的引脚电平值
                rxtx,  = self.wait( {'skip': want_num - self.samplenum } )

                # 条件匹配失败
                if self.matched[0] != True:
                    state = 'WAIT FOR START BIT'
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                # 起始位电平值必须为低电平
                if rxtx != 0:
                    state = 'WAIT FOR START BIT'
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                self.cur_data_bit = 0
                self.databits = []

                state = 'GET DATA BITS'

                # 输出解码信息, 解码信息由数据类型决定 :OUTPUT_ANN / OUTPUT_PYTHON / OUTPUT_BINARY / OUTPUT_META
                # 当类型为 OUTPUT_ANN 时,数据参数是一个包含两项的 Python 列表
                # 第一项是注释索引 (由解码器 annotations 中的项顺序确定)
                # 第二项是注释字符串列表,也就是上位机显示的解码数据,字符串应该是相同注释文本的更长和更短版本 (按长度排序, 最长的在前) , 上位机根据缩放级别显示不同的注释文本。
                self.putg([0, ['Start bit', 'Start', 'S']])

            # 3.获取8位数据位
            if state == 'GET DATA BITS':
                bitnum = self.cur_data_bit + 1
                want_num = ceil(self.get_sample_point(bitnum))
                rxtx,  = self.wait( {'skip': want_num - self.samplenum} )

                # 条件匹配失败
                if self.matched[0] != True:
                    state = 'WAIT FOR START BIT'
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                # 等待所有数据位接收完毕, 8位数据位
                self.databits.append(rxtx)
                self.cur_data_bit += 1
                if self.cur_data_bit < 8 :
                    continue

                datavalue = bitpack(self.databits)

                state = 'GET STOP BITS'
                self.putpx(["data", datavalue])
                self.putx([1, ['0x%X' % datavalue]])

                self.databits = []

            # 4.获取1位停止位
            if state == 'GET STOP BITS':
                # 停止位的位置 : 1位起始位 + 8位数据位 + 1位停止位
                want_num = ceil(self.get_sample_point(9))
                rxtx,  = self.wait( {'skip': want_num - self.samplenum} )

                # 条件匹配失败
                if self.matched[0] != True:
                    state = 'WAIT FOR START BIT'
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                # 停止位必须为高电平
                if rxtx != 1:
                    state = 'WAIT FOR START BIT'
                    self.putp([3, ['Frame error', 0]])
                    self.putg([3, ['Frame error', 'Frame err', 'FE']])
                    continue

                state = 'WAIT FOR START BIT'
                self.putg([2, ['Stop bit', 'Stop', 'T']])

有了底层协议数据后, 就可以新增一个上层协议了,具体步骤如下:

  • 1. 新增上层协议解码器文件夹及文件,在 ATK-Logic 软件安装目录 decoders 文件夹下新建 alientek_top 文件夹, 并在文件夹内新增两个文件: __init__.py 和 pd.py

  • 2. 编写 __init__.py 文件内容

    '''
    alientek_top 协议说明信息
    可以编写协议相关的一些介绍内容
    
    这里是alientek上层协议
    '''
    
    # 导入 pd.py 文件
    from .pd import Decoder
    
  • 3. 编写 __pd__.py 文件内容

    __pd__.py 文件结构与前面介绍的内容基本一致,这里只需要注意以下两点:

    1. inputs = ['alientek_uart'] 属性值, 这个值特别重要,上下层协议就是通过该值串起来的,上层协议中的 inputs 必须等于下层协议中的 outputs

    2. decode 函数,在不同模式下输入参数不一样,在上层协议中为 decode(self, startsample, endsample, data) ,详细介绍请参考 协议解码器 (Python脚本) 格式 章节内容

    上层协议是如何被调用的?

    ATK-Logic 软件在初始化协议库时,会自动根据 inputsoutputs 将上层协议与下层协议关联起来。 当下层协议通过 OUTPUT_PYTHON 输出数据时, libsigrokdecode 会自动调用上层协议中的 decode(self, startsample, endsample, data) 函数, 数据通过函数输入参数传给上层协议,我们在上层协议中分析处理数据,再将解码数据输出即可完成解码功能。

    源码如下:

    # 导入 sigrokdecode 模块, 并重名为 srd
    import sigrokdecode as srd
    
    
    # 定义一个 Decoder 类
    class Decoder(srd.Decoder):
    
        # 必须为3
        api_version = 3
    
        # 协议的标识符, 必须唯一
        id = 'alientek_top'
    
        # 协议的名字, 不一定要求跟 id 一致
        name = 'ALIENTEK_TOP'
    
        # 协议的长名字
        longname = 'ALIENTEK TOP Protocol Example'
    
        # 协议简介
        desc = 'Asynchronous, serial bus.'
    
        # 开源协议信息
        license = 'gplv2+'
    
        # 协议输入数据源, 必须填下层协议中的 outpus 参数
        inputs = ['alientek_uart']
    
        # 协议输出数据源,在叠加协议时会用到
        outputs = ['alientek_top']
    
        # 协议适用范围标签
        tags = ['Embedded/industrial']
    
        # 必须提供给协议的通道,没有他们解码器无法正常工作,允许为空
        # id : 通道标识
        # name : 通道名字
        # desc :通道描述
        channels = (
        )
    
        # 可选通道,如果相应的协议解码器没有可选通道, 则该元组允许为空,参数与 channels 一样
        optional_channels = (
        )
    
        # 与协议相关的可选参数,可在协议配置界面自由设置的参数
        # Python脚本中可通过 self.options[id] 取值
        # id : 标识符
        # desc : 描述信息
        # default : 默认值
        # valuse  : 可选值(如果不提供,用户可自由配置)
        options = (
        )
    
        # 协议解析结果,有两个属性
        # 由标识符字符串和人可度的描述字符串组成
        annotations = (
            ('data', 'Data'),
            ('error', 'Errror'),
        )
    
        # 协议解析结果行,用于将多种解析结果组合在一起,有三个属性
        # 参数一:标识符,不能与 annotations 元组中的标识符冲突
        # 参数二:人可读的描述字符串
        # 参数三:一个元组, 包含 annotations 元组中类的索引
        annotation_rows = (
            ('cmd data', 'CMD Data', (0,)),
            ('verify error', 'Verify ERROR', (1,)),
        )
    
        # 二进制数据解析结果列表,可以为空
        binary = (
        )
    
        # 在创建类的实例时被调用,并允许你对对象进行初始化设置
        def __init__(self):
            self.reset()
    
        # 注册解码数据类型
        def start(self):
            self.out_ann = self.register(srd.OUTPUT_ANN)
    
        # 重置参数
        def reset(self):
            self.status = "Header"
            self.addr_pos = 0
            self.data_save = []
    
        # 可选函数,传递数据流的一些数据
        # 目前 key 只能为 SRD_CONF_SAMPLERATE, 此时 value = 采样率(Hz)
        def metadata(self, key, value):
            pass
    
        # 输出解码数据
        def putx(self, ss, es, data):
            self.put(ss, es, self.out_ann, data)
    
        # 解码函数,注意上层协议与下层协议该函数的输入参数不一样
        # 这里 data 格式为 : ['type', value], 每个协议都不同,需要根据下层协议确定
        def decode(self, ss, es, data):
            ptype, pdata = data
    
            if ptype == 'Frame error':
                self.status = "Header"
                self.data_save.clear()
                return
    
            if ptype != "data":
                return
    
            # 判断帧头数据
            if self.status == "Header":
                if pdata != 0xA1:
                    self.putx(ss, es, [1, ['Header Error']])
                    return
    
                self.data_save.append(pdata)
                self.addr_pos = ss
                self.status = "Function"
                return
    
            # 记录功能码数据
            if self.status == "Function":
                self.data_save.append(pdata)
                self.cmd = pdata
                self.status = "Verify"
                return
    
            # 校验和
            if self.status == "Verify":
    
                sum = 0
                for temp in self.data_save:
                    sum += temp
    
                sum &= 0xFF
    
                if sum != pdata:
                    self.status = "Header"
                    self.data_save.clear()
                    self.putx(self.addr_pos, es, [1, ['Verify Error']])
                    return
    
                self.putx(self.addr_pos, es, [0, ['CMD:%d' % self.cmd]])
    
            self.status = "Header"
            self.data_save.clear()