## ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2015 Google, Inc ## Copyright (C) 2018 davidanger ## Copyright (C) 2018 Peter Hazenberg ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## import sigrokdecode as srd import struct import zlib # for crc32 # BMC encoding with a 600kHz datarate UI_US = 1000000/600000.0 # Threshold to discriminate half-1 from 0 in Binary Mark Conding THRESHOLD_US = (UI_US + 2 * UI_US) / 2 # Control Message type(0-24) CTRL_TYPES = { 0: 'reserved', 1: 'GOOD CRC', 2: 'GOTO MIN', 3: 'ACCEPT', 4: 'REJECT', 5: 'PING', 6: 'PS RDY', 7: 'GET SOURCE CAP', 8: 'GET SINK CAP', 9: 'DR_Swap', 10: 'PR_Swap', 11: 'VCONN_Swap', 12: 'Wait', 13: 'Soft_Reset', 14: 'Data_Reset', 15: 'Data_Reset_Complete', 16: 'Not_Supported', 17: 'Get_Source_Cap_Extended', 18: 'Get_Status', 19: 'FR_Swap', 20: 'Get_PPS_Status', 21: 'Get_Country_Codes', 22: 'Get_Sink_Cap_Extended', 23: 'Get_Source_Info', 24: 'Get_Revision' } # Data message type(1-15) DATA_TYPES = { 1: 'SOURCE CAP', 2: 'REQUEST', 3: 'BIST', 4: 'SINK CAP', 5: 'Battery_Status', 6: 'Alert', 7: 'Get_Country_Info', 8: 'Enter_USB', 9: 'EPR_Request', 10: 'EPR_Mode', 11: 'Source_Info', 12: 'Revision', 15: 'VDM' } # Extended message type(1-30) EXT_TYPES = { 1: 'Source_Capabilities_Extended', 2: 'Status', 3: 'Get_Battery_Cap', 4: 'Get_Battery_Status', 5: 'Battery_Capabilities', 6: 'Get_Manufacturer_Info', 7: 'Manufacturer_Info', 8: 'Security_Request', 9: 'Security_Response', 10: 'Firmware_Update_Request', 11: 'Firmware_Update_Response', 12: 'PPS_Status', 13: 'Country_Info', 14: 'Country_Codes', 15: 'Sink_Capabilities_Extended',# 16: 'Extended_Control', 17: 'EPR_Source_Capabilities', 18: 'EPR_Sink_Capabilities',# 30: 'Vendor_Defined_Extended' } # 4b5b encoding of the symbols DEC4B5B = [ 0x10, # Error 00000 0x10, # Error 00001 0x10, # Error 00010 0x10, # Error 00011 0x10, # Error 00100 0x10, # Error 00101 0x13, # Sync-3 00110 0x14, # RST-1 00111 0x10, # Error 01000 0x01, # 1 = 0001 01001 0x04, # 4 = 0100 01010 0x05, # 5 = 0101 01011 0x10, # Error 01100 0x16, # EOP 01101 0x06, # 6 = 0110 01110 0x07, # 7 = 0111 01111 0x10, # Error 10000 0x12, # Sync-2 10001 0x08, # 8 = 1000 10010 0x09, # 9 = 1001 10011 0x02, # 2 = 0010 10100 0x03, # 3 = 0011 10101 0x0A, # A = 1010 10110 0x0B, # B = 1011 10111 0x11, # Sync-1 11000 0x15, # RST-2 11001 0x0C, # C = 1100 11010 0x0D, # D = 1101 11011 0x0E, # E = 1110 11100 0x0F, # F = 1111 11101 0x00, # 0 = 0000 11110 0x10, # Error 11111 ] SYM_ERR = 0x10 SYNC1 = 0x11 SYNC2 = 0x12 SYNC3 = 0x13 RST1 = 0x14 RST2 = 0x15 EOP = 0x16 SYNC_CODES = [SYNC1, SYNC2, SYNC3] HRST_CODES = [RST1, RST1, RST1, RST2] SOP_SEQUENCES = [ (SYNC1, SYNC1, SYNC1, SYNC2), (SYNC1, SYNC1, SYNC3, SYNC3), (SYNC1, SYNC3, SYNC1, SYNC3), (SYNC1, RST2, RST2, SYNC3), (SYNC1, RST2, SYNC3, SYNC2), (RST1, SYNC1, RST1, SYNC3), (RST1, RST1, RST1, RST2), ] START_OF_PACKETS = { SOP_SEQUENCES[0]: 'SOP', SOP_SEQUENCES[1]: "SOP'", SOP_SEQUENCES[2]: 'SOP"', SOP_SEQUENCES[3]: "SOP' Debug", SOP_SEQUENCES[4]: 'SOP" Debug', SOP_SEQUENCES[5]: 'Cable Reset', SOP_SEQUENCES[6]: 'Hard Reset', } SYM_NAME = [ ['0x0', '0'], ['0x1', '1'], ['0x2', '2'], ['0x3', '3'], ['0x4', '4'], ['0x5', '5'], ['0x6', '6'], ['0x7', '7'], ['0x8', '8'], ['0x9', '9'], ['0xA', 'A'], ['0xB', 'B'], ['0xC', 'C'], ['0xD', 'D'], ['0xE', 'E'], ['0xF', 'F'], ['ERROR', 'X'], ['SYNC-1', 'S1'], ['SYNC-2', 'S2'], ['SYNC-3', 'S3'], ['RST-1', 'R1'], ['RST-2', 'R2'], ['EOP', '#'], ] RDO_FLAGS = { (1 << 23): 'unchunked', (1 << 24): 'no_suspend', (1 << 25): 'comm_cap', (1 << 26): 'cap_mismatch', (1 << 27): 'give_back' } BIST_MODES = { 0: 'Receiver', 1: 'Transmit', 2: 'Counters', 3: 'Carrier 0', 4: 'Carrier 1', 5: 'Carrier 2', 6: 'Carrier 3', 7: 'Eye', } VDM_CMDS = { 1: 'Disc Ident', 2: 'Disc SVID', 3: 'Disc Mode', 4: 'Enter Mode', 5: 'Exit Mode', 6: 'Attention', # 16..31: SVID Specific Commands # DisplayPort Commands 16: 'DP Status', 17: 'DP Configure', } VDM_ACK = ['REQ', 'ACK', 'NAK', 'BSY'] EPR_MODE_ACTION = { 1: 'Enter', 2: 'Enter Acknowledged', 3: 'Enter Succeeded', 4: 'Enter Failed', 5: 'Exit' } EPR_MODE_DATA = { 0: 'Unknown cause', 1: 'Cable not EPR capable', 2: 'Source failed to become Vconn source', 3: 'EPR Mode Capable bit not set in RDO', 4: 'Source unable to enter EPR Mode at this time', 5: 'EPR Mode Capable bit not set in PDO' } EXT_CONTROL_MSG_TYPES = { 1: 'EPR Get Source_Cap', 2: 'EPR Get Sink Cap', 3: 'EPR KeepAlive', 4: 'EPR KeepAlive Ack' } class SamplerateError(Exception): pass class Decoder(srd.Decoder): api_version = 3 id = 'usb_power_delivery' name = 'USB PD' longname = 'USB Power Delivery' desc = 'USB Power Delivery protocol.' license = 'gplv2+' inputs = ['logic'] outputs = ['usb_pd'] tags = ['PC'] channels = ( {'id': 'cc1', 'name': 'CC1', 'desc': 'Configuration Channel 1'}, ) optional_channels = ( {'id': 'cc2', 'name': 'CC2', 'desc': 'Configuration Channel 2'}, ) options = ( {'id': 'fulltext', 'desc': 'Full text decoding of packets', 'default': 'no', 'values': ('yes', 'no')}, ) annotations = ( ('type', 'Packet Type'), ('preamble', 'Preamble'), ('sop', 'Start of Packet'), ('header', 'Header'), ('data', 'Data'), ('crc', 'Checksum'), ('eop', 'End Of Packet'), ('sym', '4b5b symbols'), ('warning', 'Warning'), ('src', 'Source Message'), ('snk', 'Sink Message'), ('payload', 'Payload'), ('text', 'Plain text'), ('VDM','vdm'), #13 ('GOOD CRC','good crc'), ('SOURCE CAP','source cap'), ('REQUEST','request'), ('ACCEPT','accept'), ('PS RDY','ps rdy'), ('DR_Swap','dr swap'), ('Not_Supported','not supported'), ('EPR_Mode','epr mode'), ('EPR_Source_Capabilities','EPRSourceCapabilities'), ('EPR_Request','epr_request'), ('Extended_Control','extended_control'), #24 ('GET SOURCE CAP','get source cap'), ('GET SINK CAP','get sink cap'), ('SINK CAP','sink cap'), ('Source_Info','source info'), ('Sink_Capabilities_Extended','sink capabilities extended'), ('EPR_Sink_Capabilities','epr sink capabilities'), #30 ('REJECT','reject'), ('Soft_Reset','soft reset'), ('Source_Capabilities_Extended','source capabilities extended') #33 ) annotation_rows = ( ('4b5b', 'Symbols', (7,)), ('parts', 'Parts', (1, 2, 3, 4, 5, 6)), ('payloads', 'Payloads', (11,)), ('types', 'Types', (0, 9, 10,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33)), #类型的颜色 ('warnings', 'Warnings', (8,)), ('texts', 'Full text', (12,)), ) binary = ( ('raw-data', 'RAW binary data'), ) stored_pdos = {} def get_source_sink_cap(self, pdo, idx, is_epr=False): t1 = (pdo >> 30) & 0x3 self.cap_mark[idx] = t1 self.voltage_multiplier = 0.1 if is_epr else 0.05 self.current_multiplier = 0.05 if is_epr else 0.01 self.power_multiplier = 0.5 if is_epr else 0.25 # Fixed Supply PDO(固定电压) if t1 == 0: t_name = "EPR_Fixed" if is_epr else "Fixed" mv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * self.current_multiplier p = '%gV %gA (%gW)' % (mv, ma, mv*ma) self.stored_pdos[idx] = f"{t_name} {mv}V" # Battery PDO(电池) elif t1 == 1: t_name = "EPR_Battery" if is_epr else "Battery" flags = {} # No flags defined for Battery PDO in PD 3.0 spec minv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * self.voltage_multiplier mw = ((pdo >> 0) & 0x3ff) * self.power_multiplier p = '%g/%gV %gW' % (minv, maxv, mw) self.stored_pdos[idx] = f"{t_name} {minv}/{maxv}V" # Variable Supply PDO(可变电压) elif t1 == 2: t_name = "EPR_Variable" if is_epr else "Variable" flags = {} # No flags defined for Variable PDO in PD 3.0 spec minv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * self.voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * self.current_multiplier p = '%g/%gV %gA' % (minv, maxv, ma) self.stored_pdos[idx] = f"{t_name} {minv}/{maxv}V" # Augmented PDO(扩展类型,如 PPS、AVS) elif t1 == 3: t2 = (pdo >> 28) & 0x3 # 子类型(Bits 28-29) if t2 == 0: t_name = "EPR_PPS" if is_epr else "PPS" pps_current_multiplier = 0.125 if is_epr else 0.05 minv = ((pdo >> 8) & 0xff) * 0.1 # 最小电压 maxv = ((pdo >> 17) & 0xff) * 0.1 # 最大电压 ma = ((pdo >> 0) & 0xff) * pps_current_multiplier # 电流(A) p = '%.1f/%gV %gA' % (minv, maxv, ma) if (pdo >> 27) & 0x1: p += ' [EPR_MODE]' if is_epr else ' [limited]' self.stored_pdos[idx] = f"{t_name} {minv:.1f}/{maxv}V" elif t2 == 1: # EPR AVS(可调电压功率) t_name = "EPR_AVS" minv = ((pdo >> 8) & 0xff) * 0.1 # 最小电压 maxv = ((pdo >> 17) & 0x1ff) * 0.1 # 最大电压 pdp = (pdo & 0xFF) * 1 # 最大功率 ma = pdp / maxv if maxv > 0 else 0 # 解析峰值电流能力(Bits [21:20]) peak_current_mode = (pdo >> 20) & 0x3 peak_info = self._parse_epr_avs_peak_current(peak_current_mode, ma) p = '%g~%gV (%gW)' % (minv, maxv, pdp) # p = '%g~%gV (%gW) %s' % (minv, maxv, pdp,peak_info) self.stored_pdos[idx] = f"{t_name} {minv}-{maxv} {maxv*ma}W" else: t_name = 'EPR_Reserved' if is_epr else 'Reserved' p = '[raw: %08X]' % (pdo) self.stored_pdos[idx] = f"{t_name} {p}" return f"[{t_name}] {p}" def _parse_epr_avs_peak_current(self, mode, nominal_current): peak_modes = { 0: "Peak=Ioc (or check Extended Capabilities)", 1: "Overload: 150%/125%/110% Ioc", 2: "Overload: 200%/150%/125% Ioc", 3: "Overload: 200%/175%/150% Ioc", } return peak_modes.get(mode, "Unknown peak mode") def get_request(self, rdo): pos = (rdo >> 28) & 0x0F if pos == 0 or pos >= 0x0E: return f"(RDO Invalid Position {pos})" mark = self.cap_mark[pos] give_back = (rdo >> 27) & 0x1 if mark == 3: op_v = ((rdo >> 9) & 0x7ff) * 0.02 op_a = (rdo & 0x7f) * 0.05 t_settings = ') / (%gV %gA)' % (op_v, op_a) elif mark == 2: if give_back: op_w = ((rdo >> 10) & 0x3ff) * 0.25 minp_w = (rdo & 0x3ff) * 0.25 t_settings = '%gW (operating) %gW(min)' % (op_w,minp_w) else: op_w = ((rdo >> 10) & 0x3ff) * 0.25 maxp_w = (rdo & 0x3ff) * 0.25 t_settings = '%gW (operating) %gW(max)' % (op_w,maxp_w) else: if give_back: op_a = ((rdo >> 10) & 0x3ff) * 0.01 min_a = (rdo & 0x3ff) * 0.01 t_settings = '%gA) / %gA (op current)' % (min_a, op_a) else: op_a = ((rdo >> 10) & 0x3ff) * 0.01 max_a = (rdo & 0x3ff) * 0.01 t_settings = '%gA) / %gA (op current)' % (max_a, op_a) if pos in self.stored_pdos.keys(): t_rdo = '%d: %s' % (pos, self.stored_pdos[pos]) else: t_rdo = '%d:' % (pos) return '(RDO %s %s' % (t_rdo, t_settings) def _parse_peak_current(self, mode): peak_modes = { 0: "Ioc only", 1: "150%/125%/110% Ioc", 2: "200%/150%/125% Ioc", 3: "200%/175%/150% Ioc", } return peak_modes.get(mode, "Unknown peak mode") def get_vdm(self, idx, data): if idx == 0: # VDM header vid = data >> 16 struct = data & (1 << 15) txt = 'VDM' if struct: # Structured VDM cmd = data & 0x1f src = data & (1 << 5) ack = (data >> 6) & 3 pos = (data >> 8) & 7 ver = (data >> 13) & 3 txt = VDM_ACK[ack] + ' ' txt += VDM_CMDS[cmd] if cmd in VDM_CMDS else 'cmd?' txt += ' pos %d' % (pos) if pos else ' ' else: # Unstructured VDM txt = 'unstruct [%04x]' % (data & 0x7fff) txt += ' SVID:%04x' % (vid) else: # VDM payload txt = 'VDO:%08x' % (data) return txt def get_bist(self, idx, data): mode = data >> 28 counter = data & 0xffff mode_name = BIST_MODES[mode] if mode in BIST_MODES else 'INVALID' if mode == 2: mode_name = 'Counter[= %d]' % (counter) # TODO: Check all 0 bits are 0 / emit warnings. return 'mode %s' % (mode_name) if idx == 0 else 'invalid BRO' def get_hex(self, idx, data): txt = '%02x' % ((data >> 8)&0xFF) txt += ' %02x' % ((data >> 0)&0xFF) txt += ' %02x' % ((data >> 24)&0xFF) txt += ' %02x' % ((data >> 16)&0xFF) return txt def get_hex16(self, data): txt = '%02x' % ((data >> 8)&0xFF) txt += ' %02x' % ((data >> 0)&0xFF) return txt def get_epr_mode(self, idx, data): txt = EPR_MODE_ACTION[data >> 24] if data >> 24 == 4: txt += ' ' txt += EPR_MODE_DATA[(data >> 16) & 0xFF] return txt def get_ext_control_type(self, idx, data): txt = EXT_CONTROL_MSG_TYPES[data >> 16] return txt def put_ext_head(self, s0, s1): txt = 'Chunked: {} '. format(self.chunked) txt += 'Chunk Num: {} '.format(self.chunk_num) txt += 'Req Chunk: {} '.format(self.req_chunk) txt += 'Data Size: {} '.format(self.data_size) self.putx(s0, s1, [11, [txt, txt]]) def putpayload(self, s0, s1, idx): # Extended Message Processing if self.head_ext() == 1: if idx >= len(self.extdata): self.putwarn(f'Index {idx} out of range for extdata','IDX!') return if self.extdata[idx] == 0: return # TODO: to decode Extended Message Types t = self.head_type() txt = f'[{idx+1}] ' extdata_val = self.extdata[idx] if idx < len(self.extdata) else 0 if t == 16: # Extended_Control txt += self.get_ext_control_type(idx, extdata_val) elif t == 17 or t == 18: # EPR_Source_Capabilities/EPR_Sink_Capabilities txt += self.get_source_sink_cap(extdata_val, idx+1, is_epr=False) elif t == 1: #Source_Capabilities_Extended txt += self.parse_source_capabilities_extended(extdata_val,idx) else: txt +=f"Unhandled ext type: {t}" self.putx(s0, s1, [11, [txt, txt]]) self.text += ' - ' + txt return # Non-Extended Message processing else: if self.head_count() == 0: # Control Messages t = self.head_type() txt = '['+str(idx+1)+'] ' else: # Data Message t = self.head_type() txt = '['+str(idx+1)+'] ' data_val = self.data[idx] if t == 2: #Request txt += self.get_request(data_val) elif t == 9: #EPR_Request if idx == 0: txt += self.get_request(data_val) else: txt += self.get_source_sink_cap(data_val, idx+1) elif t == 1 or t == 4: # SOURCE CAP / SINK CAP txt += self.get_source_sink_cap(data_val, idx+1) elif t == 15: # VDM txt += self.get_vdm(idx, data_val) elif t == 3: # BIST txt += self.get_bist(idx, data_val) elif t == 10: # EPR_Mode txt += self.get_epr_mode(idx, data_val) else: pass self.putx(s0, s1, [11, [txt, txt]]) self.text += ' - ' + txt def parse_source_capabilities_extended(self, extdata_val, idx): # 解析 Source_Capabilities_Extended 消息 if idx == 0: vid = (extdata_val >> 0) & 0xFFFF pid = (extdata_val >> 16) & 0xFFFF return f"VID:0x{vid:04X} PID:0x{pid:04X}" elif idx == 1: return f"XID:0x{extdata_val:08X}" elif idx == 2: # 字节8: FW Version (固件版本) fw_ver = (extdata_val >> 0) & 0xFF # 字节9: HW Version (硬件版本) hw_ver = (extdata_val >> 8) & 0xFF # 字节10: Voltage Regulation (电压调节) vreg = (extdata_val >> 16) & 0xFF # 字节11: Holdup Time (保持时间) holdup_time = (extdata_val >> 24) & 0xFF # 解析负载阶跃和IOC load = "150mA" if (vreg & 0b11) == 0 else "500mA" if (vreg & 0b11) == 1 else "RSVD" ioc = "25%" if (vreg >> 2) & 0b1 == 0 else "90%" # Holdup Time解释(0x00表示不支持) holdup_str = "Not supported" if holdup_time == 0 else f"{holdup_time}ms" return f"FW:{fw_ver} HW:{hw_ver} Load:{load} IOC:{ioc} Hold:{holdup_str}" elif idx == 3: # 字节12: Compliance (合规性) compliance = (extdata_val >> 0) & 0xFF # 字节13: Touch Current (接触电流) touch_current = (extdata_val >> 8) & 0xFF # 字节14-15: Peak Current1 (峰值电流1) peak_current = (extdata_val >> 16) & 0xFFFF # 解析Compliance lps = "Y" if compliance & 0b1 else "N" ps1 = "Y" if compliance & 0b10 else "N" ps2 = "Y" if compliance & 0b100 else "N" # 解析Touch Current low_current = "Y" if touch_current & 0b1 else "N" ground_pin = "Y" if touch_current & 0b10 else "N" protective_earth = "Y" if touch_current & 0b100 else "N" # 解析Peak Current1 overload = min((peak_current & 0x1F) * 10, 250) # 0-250% period = ((peak_current >> 5) & 0x3F) * 20 # 单位ms duty = ((peak_current >> 11) & 0xF) * 5 # 占空比5-75% droop = "Y" if (peak_current >> 15) & 0b1 else "N" return ( f"Comp:LPS={lps} PS1={ps1} PS2={ps2} | " f"Touch:Low={low_current} Ground={ground_pin} PE={protective_earth} | " f"Peak1:OL={overload}%/{period}ms/Duty={duty}% Droop={droop}" ) elif idx == 4: # 字节16-17: Peak Current2 (16位) pc2 = (extdata_val >> 0) & 0xFFFF # 字节18-19: Peak Current3 (16位) pc3 = (extdata_val >> 16) & 0xFFFF # 解析Peak Current2 ol2 = min((pc2 & 0x1F) * 10, 250) # 过载百分比 (0-250%) per2 = ((pc2 >> 5) & 0x3F) * 20 # 过载周期 (单位ms) duty2 = ((pc2 >> 11) & 0xF) * 5 # 占空比 (5-75%) droop2 = "Y" if (pc2 >> 15) & 0b1 else "N" # 电压跌落 # 解析Peak Current3 ol3 = min((pc3 & 0x1F) * 10, 250) per3 = ((pc3 >> 5) & 0x3F) * 20 duty3 = ((pc3 >> 11) & 0xF) * 5 droop3 = "Y" if (pc3 >> 15) & 0b1 else "N" return ( f"Peak2:OL={ol2}%/{per2}ms/Duty={duty2}% Droop={droop2} | " f"Peak3:OL={ol3}%/{per3}ms/Duty={duty3}% Droop={droop3}" ) elif idx == 5: # 字节20: Touch Temp (温度标准) touch_temp = (extdata_val >> 0) & 0xFF # 字节21: Source Inputs (电源输入配置) source_inputs = (extdata_val >> 8) & 0xFF # 字节22: Battery Slots (电池槽数量) batteries = (extdata_val >> 16) & 0xFF # 字节23: SPR Rating (标准功率范围评级) spr_rating = (extdata_val >> 24) & 0x7F # 注意bit7保留 # 解析Touch Temp temp_map = {0: "60950", 1: "62368-1", 2: "62368-2"} temp_str = temp_map.get(touch_temp, "RSVD") # 解析Source Inputs ext_supply = "Y" if source_inputs & 0b1 else "N" ext_constrained = "" if source_inputs & 0b1: ext_constrained = "Const" if not (source_inputs & 0b10) else "Unconst" int_battery = "Y" if source_inputs & 0b100 else "N" # 解析Battery Slots hot_swappable = (batteries >> 4) & 0xF # 高4位 fixed_batteries = batteries & 0xF # 低4位 return ( f"Temp:{temp_str} | " f"Ext:{ext_supply}{'('+ext_constrained+')' if ext_constrained else ''} " f"Batt:{int_battery} | " f"Slots:H{hot_swappable}/F{fixed_batteries} | " f"SPR:{spr_rating}" ) elif idx == 6: # 字节24: EPR Source PDP Rating (扩展功率范围评级) epr_rating = (extdata_val >> 0) & 0x7F # 注意bit7保留 # 字节25-27: 保留或其他扩展字段(根据协议补充) return f"EPR:{epr_rating}" return "N/A" def puthead(self): t = self.head_type() if self.head_ext() == 1: shortm = EXT_TYPES[t] if t in EXT_TYPES else 'EXT???' elif self.head_count() == 0: shortm = CTRL_TYPES[t] if t in CTRL_TYPES else 'CTR???' else: shortm = DATA_TYPES[t] if t in DATA_TYPES else 'DAT???' role = 'SRC' if self.head_power_role() else 'SNK' if self.head_data_role() != self.head_power_role(): role += '/DFP' if self.head_data_role() else '/UFP' longm = '(r{:d}) {:s}[{:d}]: {:s}'.format(self.head_rev(), role, self.head_id(), shortm) ann_type = self._ann_type_map.get(shortm.upper(), 0) if ann_type == 0: ann_type = 13 if self.head_power_role() else 14 self.putx(0, -1, [ann_type, [longm, shortm]]) self.text += longm def head_ext(self): if self.head_rev() == 3: return (self.head >> 15) & 1 return 0 def head_id(self): return (self.head >> 9) & 7 def head_power_role(self): return (self.head >> 8) & 1 def head_data_role(self): return (self.head >> 5) & 1 def head_rev(self): return ((self.head >> 6) & 3) + 1 def head_type(self): if self.head_rev() == 3: return self.head & 0x1F return self.head & 0xF def head_count(self): return (self.head >> 12) & 7 def exthead_size(self): return (self.exthead) & 511 def putx(self, s0, s1, data): self.put(self.edges[s0], self.edges[s1], self.out_ann, data) def putwarn(self, longm, shortm): self.putx(0, -1, [8, [longm, shortm]]) def compute_crc32(self): if self.head_ext() == 1: bdata = struct.pack(' 0xF: self.putwarn(f'Invalid symbol: {symbol}', 'SYM!') return ERROR_CODE # TODO: Check bad symbols. val |= (symbol << (4 * i)) self.idx += SHORT_BITS return val def get_word(self, rec=True): ERROR_CODE = 0x0BAD0BAD lo = self.get_short(rec) hi = self.get_short(rec) if lo == 0x0BAD or hi == 0x0BAD: self.putwarn('Failed to read word', 'WORD!') return ERROR_CODE return (hi << 16) | lo def find_corrupted_sop(self, k): # Start of packet are valid even if they have only 3 correct symbols # out of 4. for seq in SOP_SEQUENCES: if [k[i] == seq[i] for i in range(len(k))].count(True) >= 3: return START_OF_PACKETS[seq] return None def scan_eop(self): for i in range(len(self.bits) - 19): k = (self.get_sym(i, rec=False), self.get_sym(i+5, rec=False), self.get_sym(i+10, rec=False), self.get_sym(i+15, rec=False)) sym = START_OF_PACKETS.get(k, None) if not sym: sym = self.find_corrupted_sop(k) # We have an interesting symbol sequence. if sym: # Annotate the preamble. self.putx(0, i, [1, ['Preamble', '...']]) # Annotate each symbol. self.rec_sym(i, k[0]) self.rec_sym(i+5, k[1]) self.rec_sym(i+10, k[2]) self.rec_sym(i+15, k[3]) if sym == 'Hard Reset': # self.text += 'HRST' self.text = 'Receive Hard Reset' return -1 # Hard reset elif sym == 'Cable Reset': self.text = 'Receive Cable Reset' # self.text += 'CRST' return -1 # Cable reset else: self.putx(i, i+20, [2, [sym, 'S']]) return i+20 self.putx(0, len(self.bits), [1, ['Junk???', 'XXX']]) self.text += 'Junk???' self.putwarn('No start of packet found', 'XXX') return -1 # No Start Of Packet def __init__(self): self.reset() def reset(self): self.samplerate = None self.idx = 0 self.packet_seq = 0 self.previous = 0 self.startsample = None self.bits = [] self.edges = [] self.bad = [] self.half_one = False self.start_one = 0 self.stored_pdos = {} # self.cap_mark = [0, 0, 0, 0, 0, 0, 0, 0] self.cap_mark = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # 支持pos 1-13 def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value # 0 is 2 UI, space larger than 1.5x 0 is definitely wrong. self.maxbit = self.us2samples(3 * UI_US) ## 9600 波特率下,1 比特 = 104.16us # Duration threshold between half 1 and 0. self.threshold = self.us2samples(THRESHOLD_US) def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) self.out_binary = self.register(srd.OUTPUT_BINARY) self.out_bitrate = self.register( srd.OUTPUT_META, meta=(int, 'Bitrate', 'Bitrate during the packet') ) self._ann_type_map = { name.upper(): idx for idx, (name, desc) in enumerate(self.annotations) } def us2samples(self, us): return int(us * self.samplerate / 1000000) def decode_packet(self): self.data = [] self.extcrc = [] self.extdata = [] self.idx = 0 self.text = '' if len(self.edges) < 50: # self.putwarn('Packet too short', 'SHORT!') return # Not a real PD packet self.packet_seq += 1 tstamp = float(self.startsample) / self.samplerate self.text += '#%-4d (%8.6fms): ' % (self.packet_seq, tstamp*1000) self.idx = self.scan_eop() if self.idx < 0: # Full text trace of the issue. self.putx(0, self.idx, [12, [self.text, '...']]) return # No real packet: ABORT. # Packet header self.head = self.get_short() self.putx(self.idx-20, self.idx, [3, ['H:%04x' % (self.head), 'HD']]) self.puthead() # Packet header if self.head_ext() == 1: # Extend Header self.exthead = self.get_short() self.chunked = bool((self.exthead >> 15) & 0x01) self.chunk_num = (self.exthead >> 11) & 0x0F self.req_chunk = bool((self.exthead >> 10) & 0x01) self.data_size = self.exthead & 0x01FF self.putx(self.idx-20, self.idx, [3, ['EH:%04X' % (self.exthead), 'EHD']]) self.idx -= 20 # Save Data For Calculate CRC for i in range(self.head_count()): self.extcrc.append(self.get_word(False)) self.idx -= (self.head_count()-1) * 40 + 20 # Extended Message if self.chunk_num == 0: # First Part:source和sink两端支持不分块 for i in range(self.head_count()): if i == self.head_count() - 1: Data = self.get_short() self.extdata.append(Data<<16) # 将Data数据存入extdata列表中 self.putx(self.idx-20, self.idx, [4, ['[%d]%04x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-20, self.idx, i) else: Data = self.get_word() self.extdata.append(Data) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) else: for i in range(self.head_count()): if i == 0: Data = self.get_short() self.extdata.append(Data<<16) self.putx(self.idx-20, self.idx, [4, ['[%d]%04x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-20, self.idx, i) else: Data = self.get_word() self.extdata.append(Data) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) # Control Message or Data Message else: for i in range(self.head_count()): Data_1 = self.get_word() self.data.append(Data_1) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data_1), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) # CRC check self.crc = self.get_word() ccrc = self.compute_crc32() if self.crc != ccrc: self.putwarn('Bad CRC %08x != %08x' % (self.crc, ccrc), 'CRC!') self.putx(self.idx-40, self.idx, [5, ['CRC:%08x' % (self.crc), 'CRC']]) # End of Packet if len(self.bits) >= self.idx + 5 and self.get_sym(self.idx) == EOP: self.putx(self.idx, self.idx + 5, [6, ['EOP', 'E']]) self.idx += 5 else: self.putwarn('No EOP', 'EOP!') # Full text trace if self.options['fulltext'] == 'yes': self.putx(0, self.idx, [12, [self.text, '...']]) # Meta data for bitrate ss, es = self.edges[0], self.edges[-1] bitrate = self.samplerate*len(self.bits) / float(es - ss) self.put(es, ss, self.out_bitrate, int(bitrate)) # Raw binary data (BMC decoded) self.put(es, ss, self.out_binary, [0, bytes(self.bits)]) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') while True: pins = self.wait([{0: 'e'}, {1: 'e'}, {'skip': int(self.samplerate/1e3)}]) # First sample of the packet, just record the start date. if not self.startsample: self.startsample = self.samplenum self.previous = self.samplenum continue diff = self.samplenum - self.previous # Large idle: use it as the end of packet. if diff > self.maxbit: # The last edge of the packet. self.edges.append(self.previous) # Export the packet. self.decode_packet() # Reset for next packet. self.startsample = self.samplenum self.bits = [] self.edges = [] self.bad = [] self.half_one = False self.start_one = 0 else: # Add the bit to the packet. is_zero = diff > self.threshold if is_zero and not self.half_one: self.bits.append(0) self.edges.append(self.previous) elif not is_zero and self.half_one: self.bits.append(1) self.edges.append(self.start_one) self.half_one = False elif not is_zero and not self.half_one: self.half_one = True self.start_one = self.previous else: # Invalid BMC sequence self.bad.append((self.start_one, self.previous)) # TODO: Try to recover. self.bits.append(0) self.edges.append(self.previous) self.half_one = False self.previous = self.samplenum