## ## This file is part of the libsigrokdecode project. ## ## Copyright (C) 2015 Google, Inc ## Copyright (C) 2018 davidanger ## Copyright (C) 2018 Peter Hazenberg ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## import os import json import sigrokdecode as srd import struct import zlib # for crc32 PERSISTED_PDO_CACHE = { 'source': { 'stored_pdos': {}, 'pdo_meta': {}, }, 'sink': { 'stored_pdos': {}, 'pdo_meta': {}, }, } # BMC encoding with a 600kHz datarate UI_US = 1000000/600000.0 # Threshold to discriminate half-1 from 0 in Binary Mark Conding THRESHOLD_US = (UI_US + 2 * UI_US) / 2 # Control Message type(0-24) CTRL_TYPES = { 0: 'reserved', 1: 'GOOD CRC', 2: 'GOTO MIN', 3: 'ACCEPT', 4: 'REJECT', 5: 'PING', 6: 'PS RDY', 7: 'GET SOURCE CAP', 8: 'GET SINK CAP', 9: 'DR_Swap', 10: 'PR_Swap', 11: 'VCONN_Swap', 12: 'Wait', 13: 'Soft_Reset', 14: 'Data_Reset', 15: 'Data_Reset_Complete', 16: 'Not_Supported', 17: 'Get_Source_Cap_Extended', 18: 'Get_Status', 19: 'FR_Swap', 20: 'Get_PPS_Status', 21: 'Get_Country_Codes', 22: 'Get_Sink_Cap_Extended', 23: 'Get_Source_Info', 24: 'Get_Revision' } # Data message type(1-15) DATA_TYPES = { 1: 'SOURCE CAP', 2: 'REQUEST', 3: 'BIST', 4: 'SINK CAP', 5: 'Battery_Status', 6: 'Alert', 7: 'Get_Country_Info', 8: 'Enter_USB', 9: 'EPR_Request', 10: 'EPR_Mode', 11: 'Source_Info', 12: 'Revision', 15: 'VDM' } # Extended message type(1-30) EXT_TYPES = { 1: 'Source_Capabilities_Extended', 2: 'Status', 3: 'Get_Battery_Cap', 4: 'Get_Battery_Status', 5: 'Battery_Capabilities', 6: 'Get_Manufacturer_Info', 7: 'Manufacturer_Info', 8: 'Security_Request', 9: 'Security_Response', 10: 'Firmware_Update_Request', 11: 'Firmware_Update_Response', 12: 'PPS_Status', 13: 'Country_Info', 14: 'Country_Codes', 15: 'Sink_Capabilities_Extended',# 16: 'Extended_Control', 17: 'EPR_Source_Capabilities', 18: 'EPR_Sink_Capabilities',# 30: 'Vendor_Defined_Extended' } # 4b5b encoding of the symbols DEC4B5B = [ 0x10, # Error 00000 0x10, # Error 00001 0x10, # Error 00010 0x10, # Error 00011 0x10, # Error 00100 0x10, # Error 00101 0x13, # Sync-3 00110 0x14, # RST-1 00111 0x10, # Error 01000 0x01, # 1 = 0001 01001 0x04, # 4 = 0100 01010 0x05, # 5 = 0101 01011 0x10, # Error 01100 0x16, # EOP 01101 0x06, # 6 = 0110 01110 0x07, # 7 = 0111 01111 0x10, # Error 10000 0x12, # Sync-2 10001 0x08, # 8 = 1000 10010 0x09, # 9 = 1001 10011 0x02, # 2 = 0010 10100 0x03, # 3 = 0011 10101 0x0A, # A = 1010 10110 0x0B, # B = 1011 10111 0x11, # Sync-1 11000 0x15, # RST-2 11001 0x0C, # C = 1100 11010 0x0D, # D = 1101 11011 0x0E, # E = 1110 11100 0x0F, # F = 1111 11101 0x00, # 0 = 0000 11110 0x10, # Error 11111 ] SYM_ERR = 0x10 SYNC1 = 0x11 SYNC2 = 0x12 SYNC3 = 0x13 RST1 = 0x14 RST2 = 0x15 EOP = 0x16 SYNC_CODES = [SYNC1, SYNC2, SYNC3] HRST_CODES = [RST1, RST1, RST1, RST2] SOP_SEQUENCES = [ (SYNC1, SYNC1, SYNC1, SYNC2), (SYNC1, SYNC1, SYNC3, SYNC3), (SYNC1, SYNC3, SYNC1, SYNC3), (SYNC1, RST2, RST2, SYNC3), (SYNC1, RST2, SYNC3, SYNC2), (RST1, SYNC1, RST1, SYNC3), (RST1, RST1, RST1, RST2), ] START_OF_PACKETS = { SOP_SEQUENCES[0]: 'SOP', SOP_SEQUENCES[1]: "SOP1", SOP_SEQUENCES[2]: 'SOP2', SOP_SEQUENCES[3]: "SOP1 Debug", SOP_SEQUENCES[4]: 'SOP2 Debug', SOP_SEQUENCES[5]: 'Cable Reset', SOP_SEQUENCES[6]: 'Hard Reset', } SYM_NAME = [ ['0x0', '0'], ['0x1', '1'], ['0x2', '2'], ['0x3', '3'], ['0x4', '4'], ['0x5', '5'], ['0x6', '6'], ['0x7', '7'], ['0x8', '8'], ['0x9', '9'], ['0xA', 'A'], ['0xB', 'B'], ['0xC', 'C'], ['0xD', 'D'], ['0xE', 'E'], ['0xF', 'F'], ['ERROR', 'X'], ['SYNC-1', 'S1'], ['SYNC-2', 'S2'], ['SYNC-3', 'S3'], ['RST-1', 'R1'], ['RST-2', 'R2'], ['EOP', '#'], ] RDO_FLAGS = { (1 << 23): 'unchunked', (1 << 24): 'no_suspend', (1 << 25): 'comm_cap', (1 << 26): 'cap_mismatch', (1 << 27): 'give_back' } BIST_MODES = { 0: 'Receiver', 1: 'Transmit', 2: 'Counters', 3: 'Carrier 0', 4: 'Carrier 1', 5: 'Carrier 2', 6: 'Carrier 3', 7: 'Eye', } VDM_CMDS = { 1: 'Disc Ident', 2: 'Disc SVID', 3: 'Disc Mode', 4: 'Enter Mode', 5: 'Exit Mode', 6: 'Attention', # 16..31: SVID Specific Commands # DisplayPort Commands 16: 'DP Status', 17: 'DP Configure', } VDM_ACK = ['REQ', 'ACK', 'NAK', 'BSY'] EPR_MODE_ACTION = { 1: 'Enter', 2: 'Enter Acknowledged', 3: 'Enter Succeeded', 4: 'Enter Failed', 5: 'Exit' } EPR_MODE_DATA = { 0: 'Unknown cause', 1: 'Cable not EPR capable', 2: 'Source failed to become Vconn source', 3: 'EPR Mode Capable bit not set in RDO', 4: 'Source unable to enter EPR Mode at this time', 5: 'EPR Mode Capable bit not set in PDO' } EXT_CONTROL_MSG_TYPES = { 1: 'EPR Get Source_Cap', 2: 'EPR Get Sink Cap', 3: 'EPR KeepAlive', 4: 'EPR KeepAlive Ack' } PEAK_CURRENT_DETAILS = { 0: { 'code': '00b', 'summary': 'IoC only / see Source_Cap_Extended', 'steps': [], }, 1: { 'code': '01b', 'summary': '150/125/110% IoC overload profile', 'steps': [ '150% IoC for 1ms @ 5%', '125% IoC for 2ms @ 10%', '110% IoC for 10ms @ 50%', ], }, 2: { 'code': '10b', 'summary': '200/150/125% IoC overload profile', 'steps': [ '200% IoC for 1ms @ 5%', '150% IoC for 2ms @ 10%', '125% IoC for 10ms @ 50%', ], }, 3: { 'code': '11b', 'summary': '200/175/150% IoC overload profile', 'steps': [ '200% IoC for 1ms @ 5%', '175% IoC for 2ms @ 10%', '150% IoC for 10ms @ 50%', ], }, } class SamplerateError(Exception): pass class Decoder(srd.Decoder): api_version = 3 id = 'usb_power_delivery' name = 'USB PD' longname = 'USB Power Delivery' desc = 'USB Power Delivery protocol.' license = 'gplv2+' inputs = ['logic'] outputs = ['usb_pd'] tags = ['PC'] channels = ( {'id': 'cc1', 'name': 'CC1', 'desc': 'Configuration Channel 1'}, ) optional_channels = ( {'id': 'cc2', 'name': 'CC2', 'desc': 'Configuration Channel 2'}, ) options = ( {'id': 'fulltext', 'desc': 'Full text decoding of packets', 'default': 'no', 'values': ('yes', 'no')}, ) annotations = ( ('type', 'Packet Type'), ('preamble', 'Preamble'), ('sop', 'Start of Packet'), ('header', 'Header'), ('data', 'Data'), ('crc', 'Checksum'), ('eop', 'End Of Packet'), ('sym', '4b5b symbols'), ('warning', 'Warning'), ('src', 'Source Message'), ('snk', 'Sink Message'), ('payload', 'Payload'), ('text', 'Plain text'), ('VDM','vdm'), #13 ('GOOD CRC','good crc'), ('SOURCE CAP','source cap'), ('REQUEST','request'), ('ACCEPT','accept'), ('PS RDY','ps rdy'), ('DR_Swap','dr swap'), ('Not_Supported','not supported'), ('EPR_Mode','epr mode'), ('EPR_Source_Capabilities','EPRSourceCapabilities'), ('EPR_Request','epr_request'), ('Extended_Control','extended_control'), #24 ('GET SOURCE CAP','get source cap'), ('GET SINK CAP','get sink cap'), ('SINK CAP','sink cap'), ('Source_Info','source info'), ('Sink_Capabilities_Extended','sink capabilities extended'), ('EPR_Sink_Capabilities','epr sink capabilities'), #30 ('REJECT','reject'), ('Soft_Reset','soft reset'), ('Source_Capabilities_Extended','source capabilities extended'), #33 ('VCONN_Swap','vconn swap'), #34 ('Get_PPS_Status','get pps status'), ('Revision','revision'), ('Get_Status','get status'), ('PPS_Status','pps status'), ('Country_Codes','country codes'), ('Get_Sink_Cap_Extended','get_sink_cap_extended'), ('BIST','bist') , ('Battery_Status','battery_status'), ('Get_Country_Info','get_country_info'), ('Alert','alert'), ('Enter_USB','enter_usb'), ('Get_Source_Cap_Extended','get_source_cap_extended'), ('Wait','Wait'), ('Get_Manufacturer_Info','get_manufacturer_info'), ('Manufacturer_Info','manufacturer_info'), ('Get_Battery_Cap','get_battery_cap'), ('Get_Battery_Status','get_battery_status'), ('Vendor_Defined_Extended','vendor_defined_extended'), ('Battery_Capabilities','battery_capabilities'), ('Security_Request','security_request'), ('Security_Response','security_response'), ('PR_Swap','pr_swap'), ('FR_Swap','fr_swap'), ('Data_Reset','data_reset'), ('Data_Reset_Complete','data_reset_complete'), ('Firmware_Update_Request','firmware_update_request'), ) annotation_rows = ( ('4b5b', 'Symbols', (7,)), ('parts', 'Parts', (1, 2, 3, 4, 5, 6)), ('payloads', 'Payloads', (11,)), ('types', 'Types', (0, 9, 10,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,)), #类型的颜色 ('warnings', 'Warnings', (8,)), ('texts', 'Full text', (12,)), ) binary = ( ('raw-data', 'RAW binary data'), ) stored_pdos = {} def get_source_sink_cap(self, pdo, idx, is_epr=False): t1 = (pdo >> 30) & 0x3 self.cap_mark[idx] = t1 self.voltage_multiplier = 0.1 if is_epr else 0.05 self.current_multiplier = 0.05 if is_epr else 0.01 self.power_multiplier = 0.5 if is_epr else 0.25 # Fixed Supply PDO(固定电压) if t1 == 0: t_name = "EPR_Fixed" if is_epr else "Fixed" mv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * self.current_multiplier p = '%gV %gA (%gW)' % (mv, ma, mv*ma) self.stored_pdos[idx] = f"{t_name} {mv}V" # Battery PDO(电池) elif t1 == 1: t_name = "EPR_Battery" if is_epr else "Battery" flags = {} # No flags defined for Battery PDO in PD 3.0 spec minv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * self.voltage_multiplier mw = ((pdo >> 0) & 0x3ff) * self.power_multiplier p = '%g/%gV %gW' % (minv, maxv, mw) self.stored_pdos[idx] = f"{t_name} {minv}/{maxv}V" # Variable Supply PDO(可变电压) elif t1 == 2: t_name = "EPR_Variable" if is_epr else "Variable" flags = {} # No flags defined for Variable PDO in PD 3.0 spec minv = ((pdo >> 10) & 0x3ff) * self.voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * self.voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * self.current_multiplier p = '%g/%gV %gA' % (minv, maxv, ma) self.stored_pdos[idx] = f"{t_name} {minv}/{maxv}V" # Augmented PDO(扩展类型,如 PPS、AVS) elif t1 == 3: t2 = (pdo >> 28) & 0x3 # 子类型(Bits 28-29) if t2 == 0: t_name = "EPR_PPS" if is_epr else "PPS" pps_current_multiplier = 0.125 if is_epr else 0.05 minv = ((pdo >> 8) & 0xff) * 0.1 # 最小电压 maxv = ((pdo >> 17) & 0xff) * 0.1 # 最大电压 ma = ((pdo >> 0) & 0xff) * pps_current_multiplier # 电流(A) p = '%.1f/%gV %gA' % (minv, maxv, ma) if (pdo >> 27) & 0x1: p += ' [EPR_MODE]' if is_epr else ' [limited]' self.stored_pdos[idx] = f"{t_name} {minv:.1f}/{maxv}V" elif t2 == 1: # EPR AVS(可调电压功率) t_name = "EPR_AVS" minv = ((pdo >> 8) & 0xff) * 0.1 # 最小电压 maxv = ((pdo >> 17) & 0x1ff) * 0.1 # 最大电压 pdp = (pdo & 0xFF) * 1 # 最大功率 ma = pdp / maxv if maxv > 0 else 0 # 解析峰值电流能力(Bits [21:20]) peak_current_mode = (pdo >> 20) & 0x3 peak_info = self._parse_epr_avs_peak_current(peak_current_mode, ma) p = '%g~%gV (%gW)' % (minv, maxv, pdp) # p = '%g~%gV (%gW) %s' % (minv, maxv, pdp,peak_info) self.stored_pdos[idx] = f"{t_name} {minv}-{maxv} {maxv*ma}W" else: t_name = 'EPR_Reserved' if is_epr else 'Reserved' p = '[raw: %08X]' % (pdo) self.stored_pdos[idx] = f"{t_name} {p}" return f"[{t_name}] {p}" def _parse_epr_avs_peak_current(self, mode, nominal_current): peak_modes = { 0: "Peak=Ioc (or check Extended Capabilities)", 1: "Overload: 150%/125%/110% Ioc", 2: "Overload: 200%/150%/125% Ioc", 3: "Overload: 200%/175%/150% Ioc", } return peak_modes.get(mode, "Unknown peak mode") def get_request(self, rdo): pos = (rdo >> 28) & 0x0F if pos == 0 or pos >= 0x0E: return f"(RDO Invalid Position {pos})" mark = self.cap_mark[pos] give_back = (rdo >> 27) & 0x1 if mark == 3: op_v = ((rdo >> 9) & 0x7ff) * 0.02 op_a = (rdo & 0x7f) * 0.05 t_settings = ') / (%gV %gA)' % (op_v, op_a) elif mark == 2: if give_back: op_w = ((rdo >> 10) & 0x3ff) * 0.25 minp_w = (rdo & 0x3ff) * 0.25 t_settings = '%gW (operating) %gW(min)' % (op_w,minp_w) else: op_w = ((rdo >> 10) & 0x3ff) * 0.25 maxp_w = (rdo & 0x3ff) * 0.25 t_settings = '%gW (operating) %gW(max)' % (op_w,maxp_w) else: if give_back: op_a = ((rdo >> 10) & 0x3ff) * 0.01 min_a = (rdo & 0x3ff) * 0.01 t_settings = '%gA) / %gA (op current)' % (min_a, op_a) else: op_a = ((rdo >> 10) & 0x3ff) * 0.01 max_a = (rdo & 0x3ff) * 0.01 t_settings = '%gA) / %gA (op current)' % (max_a, op_a) if pos in self.stored_pdos.keys(): t_rdo = '%d: %s' % (pos, self.stored_pdos[pos]) else: t_rdo = '%d:' % (pos) return '(RDO %s %s' % (t_rdo, t_settings) def _parse_peak_current(self, mode): peak_modes = { 0: "Ioc only", 1: "150%/125%/110% Ioc", 2: "200%/150%/125% Ioc", 3: "200%/175%/150% Ioc", } return peak_modes.get(mode, "Unknown peak mode") def _fmt_num(self, value): return '%g' % value def _lang(self): return os.environ.get('ATK_LANG', 'zh_CN') def _tr(self, zh_text, en_text): return en_text if self._lang() == 'en_US' else zh_text def _role_text(self, role): return 'Source' if role == 'source' else 'Sink' def _ref_state_text(self, state): return { 'live': self._tr('实时', 'Live'), 'cache': self._tr('缓存', 'Cache'), 'none': self._tr('无', 'None'), }.get(state, state) def _detail_item(self, key, value): return {'key': key, 'value': str(value)} def _detail_payload(self, kind, idx, raw, summary, items): return json.dumps({ 'format': 'pd_detail_v1', 'kind': kind, 'index': idx, 'raw': '0x%08X' % (raw & 0xffffffff), 'summary': summary, 'items': items, }, separators=(',', ':')) def _text_detail_payload(self, kind, idx, raw, summary, items): return summary, self._detail_payload(kind, idx, raw, summary, items) def _normalize_payload(self, result): if isinstance(result, tuple): return result return result, result def _flag_value(self, enabled, true_text='Supported', false_text='Not supported'): return '1 (%s)' % (true_text) if enabled else '0 (%s)' % (false_text) def _peak_current_items(self, field_name, mode, bit_range='B21-20'): info = PEAK_CURRENT_DETAILS.get(mode, PEAK_CURRENT_DETAILS[0]) items = [ self._detail_item('%s[%s]' % (field_name, bit_range), '%s %s' % (info['code'], info['summary'])) ] for idx, step in enumerate(info['steps'], start=1): items.append(self._detail_item('Peak Profile %d' % (idx), step)) return items def _frs_current_text(self, value): return { 0: '00b Fast Role Swap not supported', 1: '01b Default USB Port', 2: '10b 1.5A @ 5V', 3: '11b 3.0A @ 5V', }.get(value, 'Unknown') def _byte_list_from_u32(self, data): return [ data & 0xFF, (data >> 8) & 0xFF, (data >> 16) & 0xFF, (data >> 24) & 0xFF, ] def _upper_half_bytes_from_shifted_u32(self, data): return [ (data >> 16) & 0xFF, (data >> 24) & 0xFF, ] def _bytes_to_u32(self, data_bytes): value = 0 for index, byte in enumerate(data_bytes[:4]): value |= (byte & 0xFF) << (8 * index) return value def _mark_detail_as_supplemental(self, summary, detail_json, note=None): if note is None: note = self._tr('前包补全', 'Merged') try: payload = json.loads(detail_json) except Exception: return '*Merged ' + summary, detail_json marked_items = [self._detail_item('*Merged', note)] for item in payload.get('items', []): key = item.get('key', '') value = item.get('value', '') if key and key != 'Raw': marked_items.append(self._detail_item('*' + key, value)) payload['summary'] = '*Merged ' + summary payload['items'] = marked_items return payload['summary'], json.dumps(payload, separators=(',', ':')) def _pending_epr_cap_payload(self, idx, raw, role): summary = '[PDO #%d] %s' % (idx, self._tr('不完整', 'incomplete')) items = [ self._detail_item('*Pending', self._tr('本包仅收到2字节', 'Only 2 bytes received in this packet')), self._detail_item('Object', 'PDO #%d' % (idx)), self._detail_item('Role', self._role_text(role)), self._detail_item('Received Bytes', self._tr('2字节', '2 bytes')), ] return self._text_detail_payload('epr_cap_pending', idx, raw, summary, items) def _zero_fill_pdo_payload(self, idx, raw, role): summary = '[SPR_Padding] PDO #%d %s' % (idx, self._tr('零填充', 'zero filled')) items = [ self._detail_item('Object', 'PDO #%d' % (idx)), self._detail_item('Role', self._role_text(role)), self._detail_item('Padding', self._tr('未使用的SPR槽位为零填充', 'Zero filled unused SPR slot')), ] return self._text_detail_payload('pdo_padding', idx, raw, summary, items) def _present_temp_flag_text(self, value): return { 0: 'Not supported', 1: 'Normal', 2: 'Warning', 3: 'Over temperature', }.get(value, 'Reserved') def get_source_sink_cap(self, pdo, idx, is_epr=False, role='source'): t1 = (pdo >> 30) & 0x3 voltage_multiplier = 0.05 current_multiplier = 0.01 power_multiplier = 0.25 power_range = 'EPR' if is_epr else 'SPR' role_name = self._role_text(role) self.cap_mark[idx] = t1 items = [ self._detail_item('Object', 'PDO #%d' % (idx)), self._detail_item('Raw', '0x%08X' % (pdo & 0xffffffff)), self._detail_item('Power Range', power_range), self._detail_item('Role', role_name), ] meta = { 'type': t1, 'role': role, 'is_epr': is_epr, 'kind': 'unknown', } if t1 == 0: t_name = 'EPR_Fixed' if is_epr else 'Fixed' mv = ((pdo >> 10) & 0x3ff) * voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * current_multiplier power = mv * ma summary = '[%s] %sV %sA (%sW)' % ( t_name, self._fmt_num(mv), self._fmt_num(ma), self._fmt_num(power)) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Fixed Supply PDO' % (t1)), self._detail_item('Voltage[B19-10]', '%s V' % (self._fmt_num(mv))), ]) if role == 'source' and not is_epr: items.extend([ self._detail_item('Maximum Current[B9-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Maximum Power', '%s W' % (self._fmt_num(power))), self._detail_item('Dual-Role Power[B29]', self._flag_value((pdo >> 29) & 0x1)), self._detail_item('USB Suspend Supported[B28]', self._flag_value((pdo >> 28) & 0x1)), self._detail_item('Unconstrained Power[B27]', self._flag_value((pdo >> 27) & 0x1)), self._detail_item('USB Communications Capable[B26]', self._flag_value((pdo >> 26) & 0x1)), self._detail_item('Dual-Role Data[B25]', self._flag_value((pdo >> 25) & 0x1)), self._detail_item('Unchunked Ext Msg[B24]', self._flag_value((pdo >> 24) & 0x1)), self._detail_item('EPR Capable[B23]', self._flag_value((pdo >> 23) & 0x1)), self._detail_item('Reserved[B22]', '%d' % ((pdo >> 22) & 0x1)), ]) items.extend(self._peak_current_items('Peak Current', (pdo >> 20) & 0x3)) elif role == 'source' and is_epr: items.extend([ self._detail_item('Maximum Current[B9-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Maximum Power', '%s W' % (self._fmt_num(power))), self._detail_item('Reserved[B29-23]', '0x%02X' % ((pdo >> 23) & 0x7F)), ]) items.extend(self._peak_current_items('Peak Current', (pdo >> 20) & 0x3)) elif role == 'sink' and not is_epr: items.extend([ self._detail_item('Operational Current[B9-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Dual-Role Power[B29]', self._flag_value((pdo >> 29) & 0x1)), self._detail_item('Higher Capability[B28]', self._flag_value((pdo >> 28) & 0x1)), self._detail_item('Unconstrained Power[B27]', self._flag_value((pdo >> 27) & 0x1)), self._detail_item('USB Communications Capable[B26]', self._flag_value((pdo >> 26) & 0x1)), self._detail_item('Dual-Role Data[B25]', self._flag_value((pdo >> 25) & 0x1)), self._detail_item('FRS Current[B24-23]', self._frs_current_text((pdo >> 23) & 0x3)), self._detail_item('Reserved[B22-20]', '%d' % ((pdo >> 20) & 0x7)), ]) elif role == 'sink' and is_epr: items.extend([ self._detail_item('Operational Current[B9-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Reserved[B29-20]', '0x%03X' % ((pdo >> 20) & 0x3FF)), ]) else: items.extend([ self._detail_item('Current[B9-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Power', '%s W' % (self._fmt_num(power))), self._detail_item('Upper Bits[B29-20]', '0x%03X' % ((pdo >> 20) & 0x3ff)), ]) self.stored_pdos[idx] = '%s %sV' % (t_name, self._fmt_num(mv)) meta['kind'] = 'fixed' elif t1 == 1: t_name = 'EPR_Battery' if is_epr else 'Battery' minv = ((pdo >> 10) & 0x3ff) * voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * voltage_multiplier mw = ((pdo >> 0) & 0x3ff) * power_multiplier summary = '[%s] %s/%sV %sW' % ( t_name, self._fmt_num(minv), self._fmt_num(maxv), self._fmt_num(mw)) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Battery PDO' % (t1)), self._detail_item('Maximum Voltage[B29-20]', '%s V' % (self._fmt_num(maxv))), self._detail_item('Minimum Voltage[B19-10]', '%s V' % (self._fmt_num(minv))), self._detail_item( 'Maximum Allowable Power[B9-0]' if role == 'source' else 'Operational Power[B9-0]', '%s W' % (self._fmt_num(mw))), ]) self.stored_pdos[idx] = '%s %s/%sV' % (t_name, self._fmt_num(minv), self._fmt_num(maxv)) meta['kind'] = 'battery' elif t1 == 2: t_name = 'EPR_Variable' if is_epr else 'Variable' minv = ((pdo >> 10) & 0x3ff) * voltage_multiplier maxv = ((pdo >> 20) & 0x3ff) * voltage_multiplier ma = ((pdo >> 0) & 0x3ff) * current_multiplier summary = '[%s] %s/%sV %sA' % ( t_name, self._fmt_num(minv), self._fmt_num(maxv), self._fmt_num(ma)) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Variable Supply PDO' % (t1)), self._detail_item('Maximum Voltage[B29-20]', '%s V' % (self._fmt_num(maxv))), self._detail_item('Minimum Voltage[B19-10]', '%s V' % (self._fmt_num(minv))), self._detail_item( 'Maximum Current[B9-0]' if role == 'source' else 'Operational Current[B9-0]', '%s A' % (self._fmt_num(ma))), ]) self.stored_pdos[idx] = '%s %s/%sV' % (t_name, self._fmt_num(minv), self._fmt_num(maxv)) meta['kind'] = 'variable' elif t1 == 3: t2 = (pdo >> 28) & 0x3 meta['apdo_type'] = t2 if t2 == 0: t_name = 'PPS' minv = ((pdo >> 8) & 0xff) * 0.1 maxv = ((pdo >> 17) & 0xff) * 0.1 ma = (pdo & 0x7f) * 0.05 limited = (pdo >> 27) & 0x1 summary = '[%s] %s/%sV %sA%s' % ( t_name, self._fmt_num(minv), self._fmt_num(maxv), self._fmt_num(ma), ' [limited]' if limited else '') items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Augmented PDO' % (t1)), self._detail_item('APDO Type[B29-28]', '00b SPR PPS'), self._detail_item('PPS Power Limited[B27]', self._flag_value(limited)), self._detail_item('Reserved[B26-25]', '%d' % ((pdo >> 25) & 0x3)), self._detail_item('Maximum Voltage[B24-17]', '%s V' % (self._fmt_num(maxv))), self._detail_item('Reserved[B16]', '%d' % ((pdo >> 16) & 0x1)), self._detail_item('Minimum Voltage[B15-8]', '%s V' % (self._fmt_num(minv))), self._detail_item('Reserved[B7]', '%d' % ((pdo >> 7) & 0x1)), self._detail_item( 'Maximum Current[B6-0]' if role == 'source' else 'Required Current[B6-0]', '%s A' % (self._fmt_num(ma))), self._detail_item('Approx Power @ Vmax', '%s W' % (self._fmt_num(maxv * ma))), ]) self.stored_pdos[idx] = '%s %s/%sV' % (t_name, self._fmt_num(minv), self._fmt_num(maxv)) meta['kind'] = 'pps' elif t2 == 1: t_name = 'EPR_AVS' minv = ((pdo >> 8) & 0xff) * 0.1 maxv = ((pdo >> 17) & 0x1ff) * 0.1 pdp = (pdo & 0xff) * 1.0 current_at_max = (pdp / maxv) if maxv > 0 else 0 summary = '[%s] %s~%sV (%sW)' % ( t_name, self._fmt_num(minv), self._fmt_num(maxv), self._fmt_num(pdp)) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Augmented PDO' % (t1)), self._detail_item('APDO Type[B29-28]', '01b EPR AVS'), ]) items.extend(self._peak_current_items('Peak Current', (pdo >> 26) & 0x3, 'B27-26')) items.extend([ self._detail_item('Maximum Voltage[B25-17]', '%s V' % (self._fmt_num(maxv))), self._detail_item('Reserved[B16]', '%d' % ((pdo >> 16) & 0x1)), self._detail_item('Minimum Voltage[B15-8]', '%s V' % (self._fmt_num(minv))), self._detail_item('PDP[B7-0]', '%s W' % (self._fmt_num(pdp))), self._detail_item('Approx Current @ Vmax', '%s A' % (self._fmt_num(current_at_max))), ]) self.stored_pdos[idx] = '%s %s-%sV %sW' % ( t_name, self._fmt_num(minv), self._fmt_num(maxv), self._fmt_num(pdp)) meta['kind'] = 'epr_avs' elif t2 == 2: t_name = 'SPR_AVS' current_15v = ((pdo >> 10) & 0x3ff) * 0.01 current_20v = (pdo & 0x3ff) * 0.01 summary = '[%s] 9~20V 15V:%sA 20V:%sA' % ( t_name, self._fmt_num(current_15v), self._fmt_num(current_20v)) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Augmented PDO' % (t1)), self._detail_item('APDO Type[B29-28]', '10b SPR AVS'), ]) if role == 'source': items.extend(self._peak_current_items('Peak Current', (pdo >> 26) & 0x3, 'B27-26')) items.append(self._detail_item('Reserved[B25-20]', '%d' % ((pdo >> 20) & 0x3f))) else: items.append(self._detail_item('Reserved[B27-20]', '%d' % ((pdo >> 20) & 0xff))) items.extend([ self._detail_item('Current @ 9V-15V[B19-10]', '%s A' % (self._fmt_num(current_15v))), self._detail_item('Current @ 15V-20V[B9-0]', '%s A' % (self._fmt_num(current_20v))), ]) self.stored_pdos[idx] = '%s 9-20V' % (t_name) meta['kind'] = 'spr_avs' else: t_name = 'Reserved_APDO' summary = '[%s] [raw: %08X]' % (t_name, pdo) items.extend([ self._detail_item('Supply Type[B31-30]', '%02db Augmented PDO' % (t1)), self._detail_item('APDO Type[B29-28]', '%02db Reserved' % (t2)), ]) self.stored_pdos[idx] = '%s [raw:%08X]' % (t_name, pdo) meta['kind'] = 'reserved_apdo' else: t_name = 'Reserved' summary = '[%s] [raw: %08X]' % (t_name, pdo) self.stored_pdos[idx] = '%s [raw:%08X]' % (t_name, pdo) self.pdo_meta[idx] = meta self._remember_role_pdo(role, idx, self.stored_pdos[idx], meta) return summary, self._detail_payload('pdo', idx, pdo, summary, items) def _parse_epr_cap_pdo(self, pdo, obj_pos, role): if obj_pos <= 7 and pdo == 0: return self._zero_fill_pdo_payload(obj_pos, pdo, role) supply_type = (pdo >> 30) & 0x3 fixed_voltage = ((pdo >> 10) & 0x3ff) * 0.05 is_epr_object = obj_pos >= 8 if not is_epr_object: if supply_type == 0 and fixed_voltage > 20.0: is_epr_object = True elif supply_type == 3 and ((pdo >> 28) & 0x3) == 1: is_epr_object = True return self.get_source_sink_cap(pdo, obj_pos, is_epr=is_epr_object, role=role) def _get_epr_cap_state(self, msg_type): if msg_type not in self.epr_cap_state: self.epr_cap_state[msg_type] = { 'pending_bytes': None, 'pending_index': None, 'next_index': 1, 'msg_id': None, 'direction_signature': None, } return self.epr_cap_state[msg_type] def _current_direction_signature(self, role): return ( role, self.head_power_role(), self.head_data_role(), ) def _remember_role_pdo(self, role, idx, ref_text, meta): if role not in self.role_stored_pdos: self.role_stored_pdos[role] = {} if role not in self.role_pdo_meta: self.role_pdo_meta[role] = {} meta_copy = dict(meta) self.role_stored_pdos[role][idx] = ref_text self.role_pdo_meta[role][idx] = meta_copy PERSISTED_PDO_CACHE[role]['stored_pdos'][idx] = ref_text PERSISTED_PDO_CACHE[role]['pdo_meta'][idx] = meta_copy def _get_role_pdo_ref(self, pos, role='source'): if pos in self.role_stored_pdos.get(role, {}): return ( self.role_stored_pdos[role][pos], self.role_pdo_meta.get(role, {}).get(pos, {'kind': 'unknown'}), False, ) if pos in PERSISTED_PDO_CACHE.get(role, {}).get('stored_pdos', {}): return ( PERSISTED_PDO_CACHE[role]['stored_pdos'][pos], PERSISTED_PDO_CACHE[role]['pdo_meta'].get(pos, {'kind': 'unknown'}), True, ) return 'Unknown PDO', {'kind': 'unknown'}, False def _current_ext_chunk_payload_size(self): payload_capacity = max(self.head_count() * 4 - 2, 0) if self.req_chunk: return 0 if self.chunked: chunk_offset = self.chunk_num * 26 remaining = max(self.data_size - chunk_offset, 0) return min(remaining, payload_capacity) return min(self.data_size, payload_capacity) def _get_ext_data_bytes(self, idx): size = self.extdata_sizes[idx] if idx < len(self.extdata_sizes) else 4 data_bytes = self._byte_list_from_u32(self.extdata[idx])[:size] return data_bytes, size < 4 def _get_epr_cap_payload(self, msg_type, idx, role): state = self._get_epr_cap_state(msg_type) current_direction = self._current_direction_signature(role) if self.chunk_num == 0 and idx == 0: state['pending_bytes'] = None state['pending_index'] = None state['next_index'] = 1 state['msg_id'] = self.head_id() state['direction_signature'] = current_direction raw_bytes, is_partial = self._get_ext_data_bytes(idx) if self.chunk_num == 0: if is_partial: pending_index = state['next_index'] state['pending_bytes'] = list(raw_bytes) state['pending_index'] = pending_index return self._pending_epr_cap_payload(pending_index, self.extdata[idx], role) obj_pos = state['next_index'] state['next_index'] += 1 return self._parse_epr_cap_pdo(self.extdata[idx], obj_pos, role) if idx == 0: pending_bytes = state.get('pending_bytes') pending_index = state.get('pending_index') if pending_bytes and len(pending_bytes) == 2 and pending_index is not None and state.get('direction_signature') == current_direction: full_pdo = self._bytes_to_u32(pending_bytes + list(raw_bytes)) state['pending_bytes'] = None state['pending_index'] = None state['next_index'] = pending_index + 1 summary, detail = self._parse_epr_cap_pdo(full_pdo, pending_index, role) return self._mark_detail_as_supplemental(summary, detail) summary = self._tr('[续包] 缺少前包2字节尾部', '[Chunk continuation] missing previous 2-byte tail') items = [ self._detail_item('*Merged', self._tr('前包尾部缺失或方向变化', 'Previous packet tail not found or direction changed')), self._detail_item('Role', self._role_text(role)), self._detail_item('Chunk Number', '%d' % (self.chunk_num)), ] return self._text_detail_payload('epr_cap_error', idx + 1, self.extdata[idx], summary, items) obj_pos = state['next_index'] state['next_index'] += 1 return self._parse_epr_cap_pdo(self.extdata[idx], obj_pos, role) def get_request(self, rdo): pos = (rdo >> 28) & 0x0F if pos == 0 or pos >= 0x0E: summary = '(RDO Invalid Position %d)' % (pos) items = [ self._detail_item('Object Position[B31-28]', '%d (invalid)' % (pos)), self._detail_item('Raw', '0x%08X' % (rdo & 0xffffffff)), ] return summary, self._detail_payload('rdo', 1, rdo, summary, items) ref_pdo, meta, restored_from_cache = self._get_role_pdo_ref(pos, 'source') kind = meta.get('kind', 'unknown') give_back = (rdo >> 27) & 0x1 if ref_pdo == 'Unknown PDO' and kind == 'unknown': missing_source_cap = self._tr('未识别到SOURCE CAP包,无法解析', 'SOURCE CAP missing') summary = '(RDO %d: %s)' % (pos, missing_source_cap) items = [ self._detail_item('Object Position[B31-28]', '%d' % (pos)), self._detail_item('Referenced PDO', self._tr('未识别到SOURCE CAP包', 'SOURCE CAP missing')), self._detail_item('*Ref', self._ref_state_text('none')), self._detail_item('Raw', '0x%08X' % (rdo & 0xffffffff)), ] return summary, self._detail_payload('rdo', 1, rdo, summary, items) items = [ self._detail_item('Object Position[B31-28]', '%d' % (pos)), self._detail_item('Referenced PDO', ref_pdo), self._detail_item('*Ref', self._ref_state_text('cache' if restored_from_cache else 'live')), self._detail_item('Raw', '0x%08X' % (rdo & 0xffffffff)), self._detail_item('GiveBack[B27]', self._flag_value(give_back, 'Deprecated bit set', 'Deprecated bit clear')), self._detail_item('Capability Mismatch[B26]', self._flag_value((rdo >> 26) & 0x1)), self._detail_item('USB Communications Capable[B25]', self._flag_value((rdo >> 25) & 0x1)), self._detail_item('No USB Suspend[B24]', self._flag_value((rdo >> 24) & 0x1)), self._detail_item('Unchunked Ext Msg[B23]', self._flag_value((rdo >> 23) & 0x1)), self._detail_item('EPR Capable[B22]', self._flag_value((rdo >> 22) & 0x1)), ] if kind == 'pps': op_v = ((rdo >> 9) & 0x0fff) * 0.02 op_a = (rdo & 0x7f) * 0.05 summary = '(RDO %d: %s) %sV %sA' % ( pos, ref_pdo, self._fmt_num(op_v), self._fmt_num(op_a)) items.extend([ self._detail_item('Reserved[B21]', '%d' % ((rdo >> 21) & 0x1)), self._detail_item('Output Voltage[B20-9]', '%s V' % (self._fmt_num(op_v))), self._detail_item('Reserved[B8-7]', '%d' % ((rdo >> 7) & 0x3)), self._detail_item('Operating Current[B6-0]', '%s A' % (self._fmt_num(op_a))), ]) elif kind in ('spr_avs', 'epr_avs'): op_v = ((rdo >> 9) & 0x0fff) * 0.025 op_a = (rdo & 0x7f) * 0.05 summary = '(RDO %d: %s) %sV %sA' % ( pos, ref_pdo, self._fmt_num(op_v), self._fmt_num(op_a)) items.extend([ self._detail_item('Reserved[B21]', '%d' % ((rdo >> 21) & 0x1)), self._detail_item('Output Voltage[B20-9]', '%s V' % (self._fmt_num(op_v))), self._detail_item('Reserved[B8-7]', '%d' % ((rdo >> 7) & 0x3)), self._detail_item('Operating Current[B6-0]', '%s A' % (self._fmt_num(op_a))), ]) elif kind == 'battery': op_w = ((rdo >> 10) & 0x3ff) * 0.25 max_w = (rdo & 0x3ff) * 0.25 summary = '(RDO %d: %s) op %sW max %sW' % ( pos, ref_pdo, self._fmt_num(op_w), self._fmt_num(max_w)) items.extend([ self._detail_item('Reserved[B21-20]', '%d' % ((rdo >> 20) & 0x3)), self._detail_item('Operating Power[B19-10]', '%s W' % (self._fmt_num(op_w))), self._detail_item('Maximum Operating Power[B9-0]', '%s W' % (self._fmt_num(max_w))), ]) else: op_a = ((rdo >> 10) & 0x3ff) * 0.01 max_a = (rdo & 0x3ff) * 0.01 summary = '(RDO %d: %s) op %sA max %sA' % ( pos, ref_pdo, self._fmt_num(op_a), self._fmt_num(max_a)) items.extend([ self._detail_item('Reserved[B21-20]', '%d' % ((rdo >> 20) & 0x3)), self._detail_item('Operating Current[B19-10]', '%s A' % (self._fmt_num(op_a))), self._detail_item('Maximum Operating Current[B9-0]', '%s A' % (self._fmt_num(max_a))), ]) return summary, self._detail_payload('rdo', 1, rdo, summary, items) def get_battery_status(self, data): capacity = (data >> 16) & 0xFFFF battery_info = (data >> 8) & 0xFF invalid_ref = battery_info & 0x1 present = (battery_info >> 1) & 0x1 charge_state = (battery_info >> 2) & 0x3 charge_text = { 0: 'Charging', 1: 'Discharging', 2: 'Idle', 3: 'Reserved', }.get(charge_state, 'Reserved') capacity_text = 'Unknown' if capacity == 0xFFFF else '%s Wh' % (self._fmt_num(capacity * 0.1)) summary = 'Capacity:%s Present:%s Status:%s' % ( capacity_text, 'Y' if present else 'N', charge_text) items = [ self._detail_item('Battery Present Capacity[B31-16]', capacity_text), self._detail_item('Invalid Battery Ref[B8]', self._flag_value(invalid_ref, 'Invalid', 'Valid')), self._detail_item('Battery Present[B9]', self._flag_value(present)), self._detail_item('Charging Status[B11-10]', charge_text), self._detail_item('Reserved Info[B15-12]', '0x%X' % ((battery_info >> 4) & 0xF)), self._detail_item('Reserved[B7-0]', '0x%02X' % (data & 0xFF)), ] return self._text_detail_payload('battery_status', 1, data, summary, items) def get_alert(self, data): alert_bits = (data >> 24) & 0xFF fixed = (data >> 20) & 0xF hot = (data >> 16) & 0xF enabled = [] names = { 1: 'Battery Status Change', 2: 'OCP', 3: 'OTP', 4: 'Operating Condition Change', 5: 'Source Input Change', 6: 'OVP', 7: 'Extended Alert', } for bit, name in names.items(): if alert_bits & (1 << bit): enabled.append(name) summary = 'Alerts:' + (', '.join(enabled) if enabled else ' None') items = [ self._detail_item('Type of Alert[B31-24]', '0x%02X' % (alert_bits)), self._detail_item('Battery Status Change[B25]', self._flag_value(alert_bits & (1 << 1))), self._detail_item('OCP Event[B26]', self._flag_value(alert_bits & (1 << 2))), self._detail_item('OTP Event[B27]', self._flag_value(alert_bits & (1 << 3))), self._detail_item('Operating Condition Change[B28]', self._flag_value(alert_bits & (1 << 4))), self._detail_item('Source Input Change[B29]', self._flag_value(alert_bits & (1 << 5))), self._detail_item('OVP Event[B30]', self._flag_value(alert_bits & (1 << 6))), self._detail_item('Extended Alert[B31]', self._flag_value(alert_bits & (1 << 7))), self._detail_item('Fixed Batteries[B23-20]', '0x%X' % (fixed)), self._detail_item('Hot Swappable Batteries[B19-16]', '0x%X' % (hot)), self._detail_item('Reserved[B15-0]', '0x%04X' % (data & 0xFFFF)), ] return self._text_detail_payload('alert', 1, data, summary, items) def get_enter_usb(self, data): usb_mode = (data >> 28) & 0x7 cable_speed = (data >> 21) & 0x7 cable_type = (data >> 19) & 0x3 cable_current = (data >> 17) & 0x3 mode_text = { 0: 'USB 2.0', 1: 'USB 3.2', 2: 'USB4', }.get(usb_mode, 'Reserved') speed_text = { 0: 'USB 2.0 only', 1: 'USB 3.2 Gen1', 2: 'USB 3.2 Gen2 / USB4 Gen2', 3: 'USB4 Gen3', 4: 'USB4 Gen4', }.get(cable_speed, 'Reserved') type_text = { 0: 'Passive', 1: 'Active Re-timer', 2: 'Active Re-driver', 3: 'Optically Isolated', }.get(cable_type, 'Reserved') current_text = { 0: 'VBUS not supported', 1: 'Reserved', 2: '3A', 3: '5A', }.get(cable_current, 'Reserved') summary = '%s %s %s' % (mode_text, speed_text, type_text) items = [ self._detail_item('USB Mode[B30-28]', mode_text), self._detail_item('USB4 DRD[B26]', self._flag_value((data >> 26) & 0x1)), self._detail_item('USB3 DRD[B25]', self._flag_value((data >> 25) & 0x1)), self._detail_item('Cable Speed[B23-21]', speed_text), self._detail_item('Cable Type[B20-19]', type_text), self._detail_item('Cable Current[B18-17]', current_text), self._detail_item('PCIe Support[B16]', self._flag_value((data >> 16) & 0x1)), self._detail_item('DP Support[B15]', self._flag_value((data >> 15) & 0x1)), self._detail_item('TBT Support[B14]', self._flag_value((data >> 14) & 0x1)), self._detail_item('Host Present[B13]', self._flag_value((data >> 13) & 0x1)), self._detail_item('Reserved', '0x%08X' % (data & 0x90001FFF)), ] return self._text_detail_payload('enter_usb', 1, data, summary, items) def get_source_info(self, data): port_type = (data >> 31) & 0x1 type_text = 'Guaranteed Capability Port' if port_type else 'Managed Capability Port' max_pdp = (data >> 16) & 0xFF present_pdp = (data >> 8) & 0xFF reported_pdp = data & 0xFF summary = '%s Max:%dW Present:%dW Reported:%dW' % (type_text, max_pdp, present_pdp, reported_pdp) items = [ self._detail_item('Port Type[B31]', type_text), self._detail_item('Reserved[B30-24]', '0x%02X' % ((data >> 24) & 0x7F)), self._detail_item('Port Maximum PDP[B23-16]', '%d W' % (max_pdp)), self._detail_item('Port Present PDP[B15-8]', '%d W' % (present_pdp)), self._detail_item('Port Reported PDP[B7-0]', '%d W' % (reported_pdp)), ] return self._text_detail_payload('source_info', 1, data, summary, items) def get_revision(self, data): rev_major = (data >> 28) & 0xF rev_minor = (data >> 24) & 0xF ver_major = (data >> 20) & 0xF ver_minor = (data >> 16) & 0xF summary = 'Revision %d.%d Version %d.%d' % (rev_major, rev_minor, ver_major, ver_minor) items = [ self._detail_item('Revision.major[B31-28]', '%d' % (rev_major)), self._detail_item('Revision.minor[B27-24]', '%d' % (rev_minor)), self._detail_item('Version.major[B23-20]', '%d' % (ver_major)), self._detail_item('Version.minor[B19-16]', '%d' % (ver_minor)), self._detail_item('Reserved[B15-0]', '0x%04X' % (data & 0xFFFF)), ] return self._text_detail_payload('revision', 1, data, summary, items) def parse_status_extended(self, extdata_val, idx): data_bytes = self._byte_list_from_u32(extdata_val) if idx == 0: internal_temp = data_bytes[0] present_input = data_bytes[1] present_battery = data_bytes[2] power_state = data_bytes[3] temp_text = 'Not supported' if internal_temp == 0 else ('<2C' if internal_temp == 1 else '%dC' % (internal_temp)) power_state_text = { 0: 'Status not supported', 1: 'S0', 2: 'Modern Standby', 3: 'S3', 4: 'S4', 5: 'S5', 6: 'G3', }.get(power_state & 0x7, 'Reserved') indicator_text = { 0: 'Off LED', 1: 'On LED', 2: 'Blinking LED', 3: 'Breathing LED', }.get((power_state >> 3) & 0x7, 'Reserved') summary = 'Temp:%s Input:0x%02X State:%s' % (temp_text, present_input, power_state_text) items = [ self._detail_item('Internal Temp[Byte0]', temp_text), self._detail_item('External Power[Byte1.Bit1]', self._flag_value((present_input >> 1) & 0x1)), self._detail_item('External Power Type[Byte1.Bit2]', 'AC' if ((present_input >> 2) & 0x1) else 'DC'), self._detail_item('Internal Battery[Byte1.Bit3]', self._flag_value((present_input >> 3) & 0x1)), self._detail_item('Internal Non-Battery[Byte1.Bit4]', self._flag_value((present_input >> 4) & 0x1)), self._detail_item('Present Battery Input[Byte2]', '0x%02X' % (present_battery)), self._detail_item('New Power State[Byte3.B2-0]', power_state_text), self._detail_item('State Indicator[Byte3.B5-3]', indicator_text), ] return self._text_detail_payload('status_sop', idx + 1, extdata_val, summary, items) event_flags = data_bytes[0] temp_status = data_bytes[1] & 0x3 power_status = data_bytes[2] summary = 'Events:0x%02X Temp:%s PowerStatus:%d' % ( event_flags, ['Normal', 'Warning', 'Over temperature', 'Reserved'][temp_status], power_status) items = [ self._detail_item('Event Flags[Byte4]', '0x%02X' % (event_flags)), self._detail_item('Temperature Status[Byte5.B1-0]', ['Normal', 'Warning', 'Over temperature', 'Reserved'][temp_status]), self._detail_item('Power Status[Byte6]', '%d' % (power_status)), self._detail_item('Reserved', '0x%02X' % (data_bytes[3])), ] return self._text_detail_payload('status_sop', idx + 1, extdata_val, summary, items) def parse_pps_status_extended(self, extdata_val, idx): if idx != 0: return self._text_detail_payload('pps_status', idx + 1, extdata_val, 'Reserved', [self._detail_item('Raw', '0x%08X' % (extdata_val))]) output_voltage = extdata_val & 0xFFFF output_current = (extdata_val >> 16) & 0xFF flags = (extdata_val >> 24) & 0xFF voltage_text = 'Not supported' if output_voltage == 0xFFFF else '%s V' % (self._fmt_num(output_voltage * 0.02)) current_text = 'Not supported' if output_current == 0xFF else '%s A' % (self._fmt_num(output_current * 0.05)) summary = 'V:%s I:%s PTF:%s' % ( voltage_text, current_text, self._present_temp_flag_text((flags >> 1) & 0x3)) items = [ self._detail_item('Output Voltage[Byte1-0]', voltage_text), self._detail_item('Output Current[Byte2]', current_text), self._detail_item('PTF[Byte3.B2-1]', self._present_temp_flag_text((flags >> 1) & 0x3)), self._detail_item('OMF[Byte3.B3]', 'Current Limit' if ((flags >> 3) & 0x1) else 'Constant Voltage'), self._detail_item('Reserved Flags', '0x%X' % (flags & 0xF1)), ] return self._text_detail_payload('pps_status', 1, extdata_val, summary, items) def parse_country_codes_extended(self, extdata_val, idx): data_bytes = self._byte_list_from_u32(extdata_val) if idx == 0: length = data_bytes[0] codes = [] for i in range(2, 4, 2): if len(codes) >= length: break codes.append(chr(data_bytes[i]) + chr(data_bytes[i + 1])) summary = 'Count:%d %s' % (length, ' '.join(codes)) items = [ self._detail_item('Length[Byte0]', '%d' % (length)), self._detail_item('Reserved[Byte1]', '0x%02X' % (data_bytes[1])), ] for code_idx, code in enumerate(codes, start=1): items.append(self._detail_item('Country %d' % (code_idx), code)) return self._text_detail_payload('country_codes', idx + 1, extdata_val, summary, items) codes = [] for i in range(0, 4, 2): if data_bytes[i] == 0 or data_bytes[i + 1] == 0: continue codes.append(chr(data_bytes[i]) + chr(data_bytes[i + 1])) summary = ' '.join(codes) if codes else 'No codes' items = [self._detail_item('Country %d' % (idx * 2 + offset + 1), code) for offset, code in enumerate(codes)] if not items: items = [self._detail_item('Raw', '0x%08X' % (extdata_val))] return self._text_detail_payload('country_codes', idx + 1, extdata_val, summary, items) def parse_sink_capabilities_extended(self, extdata_val, idx): data_bytes = self._byte_list_from_u32(extdata_val) def pdp_text(value): return '%d W' % (value) if idx == 0: vid = data_bytes[0] | (data_bytes[1] << 8) pid = data_bytes[2] | (data_bytes[3] << 8) summary = 'VID:0x%04X PID:0x%04X' % (vid, pid) items = [ self._detail_item('Vendor ID[Byte1-0]', '0x%04X' % (vid)), self._detail_item('Product ID[Byte3-2]', '0x%04X' % (pid)), ] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) if idx == 1: xid = extdata_val & 0xFFFFFFFF summary = 'XID:0x%08X' % (xid) items = [self._detail_item('XID[Byte7-4]', '0x%08X' % (xid))] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) if idx == 2: skedb_ver = data_bytes[2] load_step = data_bytes[3] & 0x03 load_step_text = { 0: '150 mA/us (default)', 1: '500 mA/us', 2: 'Reserved', 3: 'Reserved', }.get(load_step, 'Reserved') version_text = 'Version 1.0' if skedb_ver == 1 else 'Reserved' summary = 'FW:%d HW:%d SKEDB:%d Load:%s' % ( data_bytes[0], data_bytes[1], skedb_ver, load_step_text) items = [ self._detail_item('FW Version[Byte8]', '%d' % (data_bytes[0])), self._detail_item('HW Version[Byte9]', '%d' % (data_bytes[1])), self._detail_item('SKEDB Version[Byte10]', '%s (%d)' % (version_text, skedb_ver)), self._detail_item('Load Step[Byte11.B1-0]', load_step_text), self._detail_item('Reserved[Byte11.B7-2]', '0x%02X' % ((data_bytes[3] >> 2) & 0x3F)), ] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) if idx == 3: sink_load = data_bytes[0] | (data_bytes[1] << 8) overload = sink_load & 0x1F overload_period = (sink_load >> 5) & 0x3F duty_cycle = (sink_load >> 11) & 0x0F can_tolerate_droop = (sink_load >> 15) & 0x01 compliance = data_bytes[2] touch_temp = data_bytes[3] touch_temp_text = { 0: 'Not applicable', 1: 'IEC 60950-1', 2: 'IEC 62368-1 TS1', 3: 'IEC 62368-1 TS2', }.get(touch_temp, 'Reserved') overload_percent = min(overload, 25) * 10 overload_period_text = '%d ms' % (overload_period * 20) if overload else 'Ignored when overload=0' duty_cycle_text = '%d %%' % (duty_cycle * 5) if overload else 'Ignored when overload=0' summary = 'Load:%d%% Touch:%s' % (overload_percent, touch_temp_text) items = [ self._detail_item('Sink Load Characteristics[Byte13-12]', '0x%04X' % (sink_load)), self._detail_item('Percent Overload[Byte12-13.B4-0]', '%d %%' % (overload_percent)), self._detail_item('Overload Period[Byte12-13.B10-5]', overload_period_text), self._detail_item('Duty Cycle[Byte12-13.B14-11]', duty_cycle_text), self._detail_item('Can Tolerate VBUS Droop[Byte13.B15]', self._flag_value(can_tolerate_droop)), self._detail_item('Compliance Requires LPS[Byte14.B0]', self._flag_value(compliance & 0x01)), self._detail_item('Compliance Requires PS1[Byte14.B1]', self._flag_value((compliance >> 1) & 0x01)), self._detail_item('Compliance Requires PS2[Byte14.B2]', self._flag_value((compliance >> 2) & 0x01)), self._detail_item('Compliance Reserved[Byte14.B7-3]', '0x%02X' % ((compliance >> 3) & 0x1F)), self._detail_item('Touch Temp[Byte15]', '%s (%d)' % (touch_temp_text, touch_temp)), ] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) if idx == 4: hot_swappable = (data_bytes[0] >> 4) & 0x0F fixed_batteries = data_bytes[0] & 0x0F sink_modes = data_bytes[1] summary = 'Batt:%d fixed/%d hot PPS:%s AVS:%s SPR:%d/%dW' % ( fixed_batteries, hot_swappable, 'Y' if (sink_modes & 0x01) else 'N', 'Y' if (sink_modes & 0x20) else 'N', data_bytes[2], data_bytes[3]) items = [ self._detail_item('Hot Swappable Battery Slots[Byte16.B7-4]', '%d' % (hot_swappable)), self._detail_item('Fixed Batteries[Byte16.B3-0]', '%d' % (fixed_batteries)), self._detail_item('PPS Charging Supported[Byte17.B0]', self._flag_value(sink_modes & 0x01)), self._detail_item('VBUS Powered[Byte17.B1]', self._flag_value((sink_modes >> 1) & 0x01)), self._detail_item('AC Supply Powered[Byte17.B2]', self._flag_value((sink_modes >> 2) & 0x01)), self._detail_item('Battery Powered[Byte17.B3]', self._flag_value((sink_modes >> 3) & 0x01)), self._detail_item('Battery Essentially Unlimited[Byte17.B4]', self._flag_value((sink_modes >> 4) & 0x01)), self._detail_item('AVS Supported[Byte17.B5]', self._flag_value((sink_modes >> 5) & 0x01)), self._detail_item('Sink Modes Reserved[Byte17.B7-6]', '0x%X' % ((sink_modes >> 6) & 0x03)), self._detail_item('SPR Sink Minimum PDP[Byte18]', pdp_text(data_bytes[2] & 0x7F)), self._detail_item('SPR Sink Operational PDP[Byte19]', pdp_text(data_bytes[3] & 0x7F)), ] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) if idx == 5: summary = 'SPR Max:%dW EPR Min/Op/Max:%d/%d/%dW' % ( data_bytes[0] & 0x7F, data_bytes[1], data_bytes[2], data_bytes[3]) items = [ self._detail_item('SPR Sink Maximum PDP[Byte20]', pdp_text(data_bytes[0] & 0x7F)), self._detail_item('SPR Sink Maximum Reserved[Byte20.B7]', '%d' % ((data_bytes[0] >> 7) & 0x01)), self._detail_item('EPR Sink Minimum PDP[Byte21]', pdp_text(data_bytes[1])), self._detail_item('EPR Sink Operational PDP[Byte22]', pdp_text(data_bytes[2])), self._detail_item('EPR Sink Maximum PDP[Byte23]', pdp_text(data_bytes[3])), ] return self._text_detail_payload('sink_cap_ext', idx + 1, extdata_val, summary, items) return self._text_detail_payload( 'sink_cap_ext', idx + 1, extdata_val, 'Reserved', [self._detail_item('Raw', '0x%08X' % (extdata_val & 0xFFFFFFFF))]) def _parse_peak_current(self, mode): return PEAK_CURRENT_DETAILS.get(mode, PEAK_CURRENT_DETAILS[0])['summary'] def get_vdm(self, idx, data): if idx == 0: # VDM header vid = data >> 16 struct = data & (1 << 15) txt = 'VDM' items = [ self._detail_item('Object', 'VDO #%d' % (idx + 1)), self._detail_item('SVID[B31-16]', '0x%04X' % (vid)), self._detail_item('Structured[B15]', self._flag_value(struct)), ] if struct: # Structured VDM cmd = data & 0x1f src = data & (1 << 5) ack = (data >> 6) & 3 pos = (data >> 8) & 7 ver_major = (data >> 13) & 3 ver_minor = (data >> 11) & 3 major_text = { 0: 'Version 1.0 (Deprecated)', 1: 'Version 2.x', 2: 'Reserved', 3: 'Reserved', }.get(ver_major, 'Reserved') if cmd <= 15: minor_text = { 0: 'Version 2.0', 1: 'Version 2.1', 2: 'Reserved', 3: 'Reserved', }.get(ver_minor, 'Reserved') else: minor_text = 'Defined by SVID (%02db)' % (ver_minor) txt = VDM_ACK[ack] + ' ' txt += VDM_CMDS[cmd] if cmd in VDM_CMDS else 'cmd?' txt += ' pos %d' % (pos) if pos else ' ' items.extend([ self._detail_item('Structured VDM Version Major[B14-13]', '%s (%02db)' % (major_text, ver_major)), self._detail_item('Structured VDM Version Minor[B12-11]', '%s' % (minor_text)), self._detail_item('Object Position[B10-8]', '%d' % (pos)), self._detail_item('Command Type[B7-6]', '%s' % (VDM_ACK[ack])), self._detail_item('Reserved[B5]', '%d' % (1 if src else 0)), self._detail_item('Command[B4-0]', '%s (%d)' % (VDM_CMDS[cmd] if cmd in VDM_CMDS else 'Unknown', cmd)), ]) else: # Unstructured VDM txt = 'unstruct [%04x]' % (data & 0x7fff) items.append(self._detail_item('Payload[B14-0]', '0x%04X' % (data & 0x7fff))) txt += ' SVID:%04x' % (vid) return self._text_detail_payload('vdm_header', idx + 1, data, txt, items) else: # VDM payload txt = 'VDO:%08x' % (data) items = [ self._detail_item('Object', 'VDO #%d' % (idx + 1)), self._detail_item('Payload', '0x%08X' % (data & 0xffffffff)), ] return self._text_detail_payload('vdo', idx + 1, data, txt, items) def get_bist(self, idx, data): mode = data >> 28 counter = data & 0xffff mode_name = BIST_MODES[mode] if mode in BIST_MODES else 'INVALID' if mode == 2: mode_name = 'Counter[= %d]' % (counter) # TODO: Check all 0 bits are 0 / emit warnings. summary = 'mode %s' % (mode_name) if idx == 0 else 'invalid BRO' items = [ self._detail_item('Object', 'BIST Data Object #%d' % (idx + 1)), self._detail_item('Mode[B31-28]', '%s (%d)' % (mode_name, mode)), self._detail_item('Reserved[B27-16]', '0x%03X' % ((data >> 16) & 0xFFF)), self._detail_item('Counter[B15-0]', '%d' % (counter)), ] return self._text_detail_payload('bist', idx + 1, data, summary, items) def get_hex(self, idx, data): txt = '%02x' % ((data >> 8)&0xFF) txt += ' %02x' % ((data >> 0)&0xFF) txt += ' %02x' % ((data >> 24)&0xFF) txt += ' %02x' % ((data >> 16)&0xFF) return txt def get_hex16(self, data): txt = '%02x' % ((data >> 8)&0xFF) txt += ' %02x' % ((data >> 0)&0xFF) return txt def get_epr_mode(self, idx, data): action = (data >> 24) & 0xFF data_field = (data >> 16) & 0xFF txt = EPR_MODE_ACTION.get(action, 'Unknown') if action == 4: txt += ' ' txt += EPR_MODE_DATA.get(data_field, 'Unknown cause') items = [ self._detail_item('Object', 'EPR Mode Data Object #%d' % (idx + 1)), self._detail_item('Action[B31-24]', '%s (%d)' % (EPR_MODE_ACTION.get(action, 'Unknown'), action)), self._detail_item('Data[B23-16]', '%s (%d)' % (EPR_MODE_DATA.get(data_field, 'Unknown cause'), data_field)), self._detail_item('Reserved[B15-0]', '0x%04X' % (data & 0xFFFF)), ] return self._text_detail_payload('epr_mode', idx + 1, data, txt, items) def get_ext_control_type(self, idx, data): data_bytes, is_partial = self._get_ext_data_bytes(idx) type_byte = data_bytes[0] if len(data_bytes) >= 1 else 0 data_byte = data_bytes[1] if len(data_bytes) >= 2 else 0 if type_byte not in EXT_CONTROL_MSG_TYPES: txt = f'Unknown Extended Control Type {type_byte}' else: txt = EXT_CONTROL_MSG_TYPES[type_byte] items = [ self._detail_item('Object', 'Extended Control Data Object #%d' % (idx + 1)), self._detail_item('Control Type[Byte0]', '%s (%d)' % (EXT_CONTROL_MSG_TYPES.get(type_byte, 'Unknown'), type_byte)), self._detail_item('Data Byte[Byte1]', '0x%02X' % (data_byte)), ] if is_partial: items.append(self._detail_item('*Pending', 'ECDB is truncated in this packet')) return self._text_detail_payload('extended_control', idx + 1, data, txt, items) def put_ext_head(self, s0, s1): txt = 'Chunked: {} '. format(self.chunked) txt += 'Chunk Num: {} '.format(self.chunk_num) txt += 'Req Chunk: {} '.format(self.req_chunk) txt += 'Data Size: {} '.format(self.data_size) self.putx(s0, s1, [11, [txt, txt]]) def putpayload(self, s0, s1, idx): # Extended Message Processing if self.head_ext() == 1: if idx >= len(self.extdata): self.putwarn(f'Index {idx} out of range for extdata','IDX!') return # TODO: to decode Extended Message Types t = self.head_type() if self.extdata[idx] == 0 and t not in (15, 17, 18): return txt = f'[{idx+1}] ' detail = txt extdata_val = self.extdata[idx] if idx < len(self.extdata) else 0 if t == 16: # Extended_Control payload_text, detail = self._normalize_payload(self.get_ext_control_type(idx, extdata_val)) txt += payload_text elif t == 17 or t == 18: # EPR_Source_Capabilities/EPR_Sink_Capabilities role = 'source' if t == 17 else 'sink' payload_text, detail = self._normalize_payload( self._get_epr_cap_payload(t, idx, role)) txt += payload_text elif t == 1: #Source_Capabilities_Extended payload_text, detail = self._normalize_payload( self.parse_source_capabilities_extended(extdata_val,idx)) txt += payload_text elif t == 2: # Status payload_text, detail = self._normalize_payload( self.parse_status_extended(extdata_val, idx)) txt += payload_text elif t == 12: # PPS_Status payload_text, detail = self._normalize_payload( self.parse_pps_status_extended(extdata_val, idx)) txt += payload_text elif t == 14: # Country_Codes payload_text, detail = self._normalize_payload( self.parse_country_codes_extended(extdata_val, idx)) txt += payload_text elif t == 15: # Sink_Capabilities_Extended payload_text, detail = self._normalize_payload( self.parse_sink_capabilities_extended(extdata_val, idx)) txt += payload_text else: txt +=f"Unhandled ext type: {t}" detail = txt self.putx(s0, s1, [11, [txt, detail]]) self.text += ' - ' + txt return # Non-Extended Message processing else: if self.head_count() == 0: # Control Messages t = self.head_type() txt = '['+str(idx+1)+'] ' detail = txt else: # Data Message t = self.head_type() txt = '['+str(idx+1)+'] ' detail = txt data_val = self.data[idx] if t == 2: #Request payload_text, detail = self._normalize_payload(self.get_request(data_val)) txt += payload_text elif t == 9: #EPR_Request if idx == 0: payload_text, detail = self._normalize_payload(self.get_request(data_val)) txt += payload_text else: payload_text, detail = self._normalize_payload( self.get_source_sink_cap(data_val, idx+1)) txt += payload_text elif t == 1 or t == 4: # SOURCE CAP / SINK CAP role = 'source' if t == 1 else 'sink' payload_text, detail = self._normalize_payload( self.get_source_sink_cap(data_val, idx+1, role=role)) txt += payload_text elif t == 15: # VDM payload_text, detail = self._normalize_payload(self.get_vdm(idx, data_val)) txt += payload_text elif t == 3: # BIST payload_text, detail = self._normalize_payload(self.get_bist(idx, data_val)) txt += payload_text elif t == 10: # EPR_Mode payload_text, detail = self._normalize_payload(self.get_epr_mode(idx, data_val)) txt += payload_text elif t == 5: # Battery_Status payload_text, detail = self._normalize_payload(self.get_battery_status(data_val)) txt += payload_text elif t == 6: # Alert payload_text, detail = self._normalize_payload(self.get_alert(data_val)) txt += payload_text elif t == 8: # Enter_USB payload_text, detail = self._normalize_payload(self.get_enter_usb(data_val)) txt += payload_text elif t == 11: # Source_Info payload_text, detail = self._normalize_payload(self.get_source_info(data_val)) txt += payload_text elif t == 12: # Revision payload_text, detail = self._normalize_payload(self.get_revision(data_val)) txt += payload_text else: detail = txt self.putx(s0, s1, [11, [txt, detail]]) self.text += ' - ' + txt def parse_source_capabilities_extended(self, extdata_val, idx): fixed_batteries = (extdata_val >> 16) & 0x0F # 解析 Source_Capabilities_Extended 消息 if idx == 0: vid = (extdata_val >> 0) & 0xFFFF pid = (extdata_val >> 16) & 0xFFFF summary = f"VID:0x{vid:04X} PID:0x{pid:04X}" items = [ self._detail_item('Vendor ID[B15-0]', '0x%04X' % (vid)), self._detail_item('Product ID[B31-16]', '0x%04X' % (pid)), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 1: summary = f"XID:0x{extdata_val:08X}" items = [ self._detail_item('XID[B31-0]', '0x%08X' % (extdata_val)), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 2: # 字节8: FW Version (固件版本) fw_ver = (extdata_val >> 0) & 0xFF # 字节9: HW Version (硬件版本) hw_ver = (extdata_val >> 8) & 0xFF # 字节10: Voltage Regulation (电压调节) vreg = (extdata_val >> 16) & 0xFF # 字节11: Holdup Time (保持时间) holdup_time = (extdata_val >> 24) & 0xFF # 解析负载阶跃和IOC load = "150mA" if (vreg & 0b11) == 0 else "500mA" if (vreg & 0b11) == 1 else "RSVD" ioc = "25%" if (vreg >> 2) & 0b1 == 0 else "90%" # Holdup Time解释(0x00表示不支持) holdup_str = "Not supported" if holdup_time == 0 else f"{holdup_time}ms" summary = f"FW:{fw_ver} HW:{hw_ver} Load:{load} IOC:{ioc} Hold:{holdup_str}" items = [ self._detail_item('FW Version[7-0]', '%d' % (fw_ver)), self._detail_item('HW Version[15-8]', '%d' % (hw_ver)), self._detail_item('Load Step[17-16]', load), self._detail_item('IOC Percent[18]', ioc), self._detail_item('Reserved Vreg[23-19]', '0x%02X' % ((vreg >> 3) & 0x1F)), self._detail_item('Holdup Time[31-24]', holdup_str), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 3: # 字节12: Compliance (合规性) compliance = (extdata_val >> 0) & 0xFF # 字节13: Touch Current (接触电流) touch_current = (extdata_val >> 8) & 0xFF # 字节14-15: Peak Current1 (峰值电流1) peak_current = (extdata_val >> 16) & 0xFFFF # 解析Compliance lps = "Y" if compliance & 0b1 else "N" ps1 = "Y" if compliance & 0b10 else "N" ps2 = "Y" if compliance & 0b100 else "N" # 解析Touch Current low_current = "Y" if touch_current & 0b1 else "N" ground_pin = "Y" if touch_current & 0b10 else "N" protective_earth = "Y" if touch_current & 0b100 else "N" # 解析Peak Current1 overload = min((peak_current & 0x1F) * 10, 250) # 0-250% period = ((peak_current >> 5) & 0x3F) * 20 # 单位ms duty = ((peak_current >> 11) & 0xF) * 5 # 占空比5-75% droop = "Y" if (peak_current >> 15) & 0b1 else "N" summary = ( f"Comp:LPS={lps} PS1={ps1} PS2={ps2} | " f"Touch:Low={low_current} Ground={ground_pin} PE={protective_earth} | " f"Peak1:OL={overload}%/{period}ms/Duty={duty}% Droop={droop}" ) items = [ self._detail_item('Compliance LPS[0]', lps), self._detail_item('Compliance PS1[1]', ps1), self._detail_item('Compliance PS2[2]', ps2), self._detail_item('Touch Low Current[8]', low_current), self._detail_item('Touch Ground Pin[9]', ground_pin), self._detail_item('Touch PE[10]', protective_earth), self._detail_item('PeakCurrent1 Overload', '%d%%' % (overload)), self._detail_item('PeakCurrent1 Period', '%d ms' % (period)), self._detail_item('PeakCurrent1 Duty', '%d%%' % (duty)), self._detail_item('PeakCurrent1 Vdroop', droop), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 4: # 字节16-17: Peak Current2 (16位) pc2 = (extdata_val >> 0) & 0xFFFF # 字节18-19: Peak Current3 (16位) pc3 = (extdata_val >> 16) & 0xFFFF # 解析Peak Current2 ol2 = min((pc2 & 0x1F) * 10, 250) # 过载百分比 (0-250%) per2 = ((pc2 >> 5) & 0x3F) * 20 # 过载周期 (单位ms) duty2 = ((pc2 >> 11) & 0xF) * 5 # 占空比 (5-75%) droop2 = "Y" if (pc2 >> 15) & 0b1 else "N" # 电压跌落 # 解析Peak Current3 ol3 = min((pc3 & 0x1F) * 10, 250) per3 = ((pc3 >> 5) & 0x3F) * 20 duty3 = ((pc3 >> 11) & 0xF) * 5 droop3 = "Y" if (pc3 >> 15) & 0b1 else "N" summary = ( f"Peak2:OL={ol2}%/{per2}ms/Duty={duty2}% Droop={droop2} | " f"Peak3:OL={ol3}%/{per3}ms/Duty={duty3}% Droop={droop3}" ) items = [ self._detail_item('PeakCurrent2 Overload', '%d%%' % (ol2)), self._detail_item('PeakCurrent2 Period', '%d ms' % (per2)), self._detail_item('PeakCurrent2 Duty', '%d%%' % (duty2)), self._detail_item('PeakCurrent2 Vdroop', droop2), self._detail_item('PeakCurrent3 Overload', '%d%%' % (ol3)), self._detail_item('PeakCurrent3 Period', '%d ms' % (per3)), self._detail_item('PeakCurrent3 Duty', '%d%%' % (duty3)), self._detail_item('PeakCurrent3 Vdroop', droop3), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 5: # 字节20: Touch Temp (温度标准) touch_temp = (extdata_val >> 0) & 0xFF # 字节21: Source Inputs (电源输入配置) source_inputs = (extdata_val >> 8) & 0xFF # 字节22: Battery Slots (电池槽数量) batteries = (extdata_val >> 16) & 0xFF # 字节23: SPR Rating (标准功率范围评级) spr_rating = (extdata_val >> 24) & 0x7F # 注意bit7保留 # 解析Touch Temp temp_map = {0: "60950", 1: "62368-1", 2: "62368-2"} temp_str = temp_map.get(touch_temp, "RSVD") # 解析Source Inputs ext_supply = "Y" if source_inputs & 0b1 else "N" ext_constrained = "" if source_inputs & 0b1: ext_constrained = "Const" if not (source_inputs & 0b10) else "Unconst" int_battery = "Y" if source_inputs & 0b100 else "N" # 解析Battery Slots hot_swappable = (batteries >> 4) & 0xF # 高4位 fixed_batteries = batteries & 0xF # 低4位 summary = ( f"Temp:{temp_str} | " f"Ext:{ext_supply}{'('+ext_constrained+')' if ext_constrained else ''} " f"Batt:{int_battery} | " f"Slots:H{hot_swappable}/F{fixed_batteries} | " f"SPR:{spr_rating}" ) items = [ self._detail_item('Touch Temp Standard[7-0]', temp_str), self._detail_item('External Supply[8]', ext_supply), self._detail_item('External Supply Type[9]', ext_constrained if ext_constrained else 'N/A'), self._detail_item('Internal Battery[10]', int_battery), self._detail_item('Hot Swappable Batteries[19-16]', '%d' % (hot_swappable)), self._detail_item('Fixed Batteries[23-20]', '%d' % (fixed_batteries)), self._detail_item('SPR PDP Rating[30-24]', '%d W' % (spr_rating)), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) elif idx == 6: # 字节24: EPR Source PDP Rating (扩展功率范围评级) epr_rating = (extdata_val >> 0) & 0x7F # 注意bit7保留 # 字节25-27: 保留或其他扩展字段(根据协议补充) summary = f"EPR:{epr_rating}" items = [ self._detail_item('EPR PDP Rating[6-0]', '%d W' % (epr_rating)), self._detail_item('Reserved[31-7]', '0x%06X' % ((extdata_val >> 7) & 0x1FFFFFF)), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) summary = "N/A" items = [ self._detail_item('Raw', '0x%08X' % (extdata_val)), ] return self._text_detail_payload('source_cap_ext', idx + 1, extdata_val, summary, items) def puthead(self): t = self.head_type() if self.head_ext() == 1: shortm = EXT_TYPES[t] if t in EXT_TYPES else 'EXT???' elif self.head_count() == 0: shortm = CTRL_TYPES[t] if t in CTRL_TYPES else 'CTR???' else: shortm = DATA_TYPES[t] if t in DATA_TYPES else 'DAT???' if self.packet_sop == 'SOP': role = 'SRC' if self.head_power_role() else 'SNK' self.last_sop_power_role = role if self.head_data_role() != self.head_power_role(): role += '/DFP' if self.head_data_role() else '/UFP' elif self.packet_sop in ('SOP1', 'SOP2', 'SOP1 Debug', 'SOP2 Debug'): role = 'Plug' if self.head_power_role() else self.last_sop_power_role else: role = self.last_sop_power_role longm = '(r{:d}) {:s}[{:d}]: {:s}'.format(self.head_rev(), role, self.head_id(), shortm) ann_type = self._ann_type_map.get(shortm.upper(), 0) if ann_type == 0: ann_type = 13 if self.head_power_role() else 14 self.putx(0, -1, [ann_type, [longm, shortm]]) self.text += longm def head_ext(self): if self.head_rev() == 3: return (self.head >> 15) & 1 return 0 def head_id(self): return (self.head >> 9) & 7 def head_power_role(self): return (self.head >> 8) & 1 def head_data_role(self): return (self.head >> 5) & 1 def head_rev(self): return ((self.head >> 6) & 3) + 1 def head_type(self): if self.head_rev() == 3: return self.head & 0x1F return self.head & 0xF def head_count(self): return (self.head >> 12) & 7 def exthead_size(self): return (self.exthead) & 511 def putx(self, s0, s1, data): self.put(self.edges[s0], self.edges[s1], self.out_ann, data) def putwarn(self, longm, shortm): self.putx(0, -1, [8, [longm, shortm]]) def compute_crc32(self): if self.head_ext() == 1: bdata = struct.pack(' 0xF: self.putwarn(f'Invalid symbol: {symbol}', 'SYM!') return ERROR_CODE # TODO: Check bad symbols. val |= (symbol << (4 * i)) self.idx += SHORT_BITS return val def get_word(self, rec=True): ERROR_CODE = 0x0BAD0BAD lo = self.get_short(rec) hi = self.get_short(rec) if lo == 0x0BAD or hi == 0x0BAD: self.putwarn('Failed to read word', 'WORD!') return ERROR_CODE return (hi << 16) | lo def find_corrupted_sop(self, k): # Start of packet are valid even if they have only 3 correct symbols # out of 4. for seq in SOP_SEQUENCES: if [k[i] == seq[i] for i in range(len(k))].count(True) >= 3: return START_OF_PACKETS[seq] return None def scan_eop(self): for i in range(len(self.bits) - 19): k = (self.get_sym(i, rec=False), self.get_sym(i+5, rec=False), self.get_sym(i+10, rec=False), self.get_sym(i+15, rec=False)) sym = START_OF_PACKETS.get(k, None) if not sym: sym = self.find_corrupted_sop(k) # We have an interesting symbol sequence. if sym: # Annotate the preamble. self.putx(0, i, [1, ['Preamble', '...']]) # Annotate each symbol. self.rec_sym(i, k[0]) self.rec_sym(i+5, k[1]) self.rec_sym(i+10, k[2]) self.rec_sym(i+15, k[3]) if sym == 'Hard Reset': # self.text += 'HRST' self.text = 'Receive Hard Reset' return -1 # Hard reset elif sym == 'Cable Reset': self.text = 'Receive Cable Reset' # self.text += 'CRST' return -1 # Cable reset else: self.packet_sop = sym self.putx(i, i+20, [2, [sym, 'S']]) return i+20 self.putx(0, len(self.bits), [1, ['Junk???', 'XXX']]) self.text += 'Junk???' self.putwarn('No start of packet found', 'XXX') return -1 # No Start Of Packet def __init__(self): self.reset() def reset(self): self.samplerate = None self.idx = 0 self.packet_seq = 0 self.previous = 0 self.startsample = None self.bits = [] self.edges = [] self.bad = [] self.half_one = False self.start_one = 0 self.stored_pdos = {} self.pdo_meta = {} self.role_stored_pdos = { 'source': dict(PERSISTED_PDO_CACHE['source']['stored_pdos']), 'sink': dict(PERSISTED_PDO_CACHE['sink']['stored_pdos']), } self.role_pdo_meta = { 'source': dict(PERSISTED_PDO_CACHE['source']['pdo_meta']), 'sink': dict(PERSISTED_PDO_CACHE['sink']['pdo_meta']), } self.epr_cap_state = {} self.packet_sop = None self.last_sop_power_role = 'SRC' # self.cap_mark = [0, 0, 0, 0, 0, 0, 0, 0] self.cap_mark = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] # 支持pos 1-13 def metadata(self, key, value): if key == srd.SRD_CONF_SAMPLERATE: self.samplerate = value # 0 is 2 UI, space larger than 1.5x 0 is definitely wrong. self.maxbit = self.us2samples(3 * UI_US) ## 9600 波特率下,1 比特 = 104.16us # Duration threshold between half 1 and 0. self.threshold = self.us2samples(THRESHOLD_US) def start(self): self.out_ann = self.register(srd.OUTPUT_ANN) self.out_binary = self.register(srd.OUTPUT_BINARY) self.out_bitrate = self.register( srd.OUTPUT_META, meta=(int, 'Bitrate', 'Bitrate during the packet') ) self._ann_type_map = { name.upper(): idx for idx, (name, desc) in enumerate(self.annotations) } def us2samples(self, us): return int(us * self.samplerate / 1000000) def _legacy_decode_packet(self): self.data = [] self.extcrc = [] self.extdata = [] self.idx = 0 self.text = '' if len(self.edges) < 50: # self.putwarn('Packet too short', 'SHORT!') return # Not a real PD packet self.packet_seq += 1 tstamp = float(self.startsample) / self.samplerate self.text += '#%-4d (%8.6fms): ' % (self.packet_seq, tstamp*1000) self.idx = self.scan_eop() if self.idx < 0: # Full text trace of the issue. self.putx(0, self.idx, [12, [self.text, '...']]) return # No real packet: ABORT. # Packet header self.head = self.get_short() self.putx(self.idx-20, self.idx, [3, ['H:%04x' % (self.head), 'HD']]) self.puthead() # Packet header if self.head_ext() == 1: # Extend Header self.exthead = self.get_short() self.chunked = bool((self.exthead >> 15) & 0x01) self.chunk_num = (self.exthead >> 11) & 0x0F self.req_chunk = bool((self.exthead >> 10) & 0x01) self.data_size = self.exthead & 0x01FF self.putx(self.idx-20, self.idx, [3, ['EH:%04X' % (self.exthead), 'EHD']]) self.idx -= 20 # Save Data For Calculate CRC for i in range(self.head_count()): self.extcrc.append(self.get_word(False)) # Rewind to the beginning of the chunk payload bytes. # Each data object contributes 4 payload/padding bytes after the # extended header has been removed above. self.idx -= self.head_count() * 40 # Extended Message if self.chunk_num == 0: # First Part:source和sink两端支持不分块 for i in range(self.head_count()): if i == self.head_count() - 1: Data = self.get_short() self.extdata.append(Data<<16) # 将Data数据存入extdata列表中 self.putx(self.idx-20, self.idx, [4, ['[%d]%04x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-20, self.idx, i) else: Data = self.get_word() self.extdata.append(Data) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) else: for i in range(self.head_count()): if i == 0: Data = self.get_short() self.extdata.append(Data<<16) self.putx(self.idx-20, self.idx, [4, ['[%d]%04x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-20, self.idx, i) else: Data = self.get_word() self.extdata.append(Data) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) # Control Message or Data Message else: for i in range(self.head_count()): Data_1 = self.get_word() self.data.append(Data_1) self.putx(self.idx-40, self.idx, [4, ['[%d]%08x' % (i, Data_1), 'D%d' % (i)]]) self.putpayload(self.idx-40, self.idx, i) # CRC check self.crc = self.get_word() ccrc = self.compute_crc32() if self.crc != ccrc: self.putwarn('Bad CRC %08x != %08x' % (self.crc, ccrc), 'CRC!') self.putx(self.idx-40, self.idx, [5, ['CRC:%08x' % (self.crc), 'CRC']]) # End of Packet if len(self.bits) >= self.idx + 5 and self.get_sym(self.idx) == EOP: self.putx(self.idx, self.idx + 5, [6, ['EOP', 'E']]) self.idx += 5 else: self.putwarn('No EOP', 'EOP!') # Full text trace if self.options['fulltext'] == 'yes': self.putx(0, self.idx, [12, [self.text, '...']]) # Meta data for bitrate ss, es = self.edges[0], self.edges[-1] bitrate = self.samplerate*len(self.bits) / float(es - ss) self.put(es, ss, self.out_bitrate, int(bitrate)) # Raw binary data (BMC decoded) self.put(es, ss, self.out_binary, [0, bytes(self.bits)]) def decode_packet(self): self.data = [] self.extcrc = [] self.extdata = [] self.extdata_sizes = [] self.idx = 0 self.text = '' if len(self.edges) < 50: return self.packet_seq += 1 tstamp = float(self.startsample) / self.samplerate self.text += '#%-4d (%8.6fms): ' % (self.packet_seq, tstamp * 1000) self.idx = self.scan_eop() if self.idx < 0: self.putx(0, self.idx, [12, [self.text, '...']]) return self.head = self.get_short() self.putx(self.idx - 20, self.idx, [3, ['H:%04x' % (self.head), 'HD']]) self.puthead() if self.head_ext() == 1: self.exthead = self.get_short() self.chunked = bool((self.exthead >> 15) & 0x01) self.chunk_num = (self.exthead >> 11) & 0x0F self.req_chunk = bool((self.exthead >> 10) & 0x01) self.data_size = self.exthead & 0x01FF self.putx(self.idx - 20, self.idx, [3, ['EH:%04X' % (self.exthead), 'EHD']]) payload_capacity = max(self.head_count() * 4 - 2, 0) payload_size = self._current_ext_chunk_payload_size() chunk_offset = self.chunk_num * 26 if self.chunked else 0 payload_bytes = [] payload_ranges = [] for _ in range(payload_size): byte_start = self.idx payload_bytes.append(self.get_byte()) payload_ranges.append((byte_start, self.idx)) pad_bytes = max(payload_capacity - payload_size, 0) for _ in range(pad_bytes): self.get_byte() crc_bytes = [ self.exthead & 0xFF, (self.exthead >> 8) & 0xFF, ] + payload_bytes + ([0] * pad_bytes) for word_index in range(0, len(crc_bytes), 4): self.extcrc.append(self._bytes_to_u32(crc_bytes[word_index:word_index + 4])) group_sizes = [] if payload_size > 0: first_group_size = 4 - (chunk_offset % 4) if first_group_size == 0: first_group_size = 4 first_group_size = min(first_group_size, payload_size) consumed = 0 while consumed < payload_size: group_size = first_group_size if consumed == 0 else min(4, payload_size - consumed) group_sizes.append(group_size) consumed += group_size byte_index = 0 for i, group_size in enumerate(group_sizes): group_bytes = payload_bytes[byte_index:byte_index + group_size] start_idx = payload_ranges[byte_index][0] end_idx = payload_ranges[byte_index + group_size - 1][1] data_value = self._bytes_to_u32(group_bytes) self.extdata.append(data_value) self.extdata_sizes.append(group_size) if group_size == 4: label = '[%d]%08x' % (i, data_value) else: label_mask = (1 << (group_size * 8)) - 1 label = '[%d]%0*x' % (i, group_size * 2, data_value & label_mask) self.putx(start_idx, end_idx, [4, [label, 'D%d' % (i)]]) self.putpayload(start_idx, end_idx, i) byte_index += group_size else: for i in range(self.head_count()): data_word = self.get_word() self.data.append(data_word) self.putx(self.idx - 40, self.idx, [4, ['[%d]%08x' % (i, data_word), 'D%d' % (i)]]) self.putpayload(self.idx - 40, self.idx, i) self.crc = self.get_word() ccrc = self.compute_crc32() if self.crc != ccrc: self.putwarn('Bad CRC %08x != %08x' % (self.crc, ccrc), 'CRC!') self.putx(self.idx - 40, self.idx, [5, ['CRC:%08x' % (self.crc), 'CRC']]) if len(self.bits) >= self.idx + 5 and self.get_sym(self.idx) == EOP: self.putx(self.idx, self.idx + 5, [6, ['EOP', 'E']]) self.idx += 5 else: self.putwarn('No EOP', 'EOP!') if self.options['fulltext'] == 'yes': self.putx(0, self.idx, [12, [self.text, '...']]) ss, es = self.edges[0], self.edges[-1] bitrate = self.samplerate * len(self.bits) / float(es - ss) self.put(es, ss, self.out_bitrate, int(bitrate)) self.put(es, ss, self.out_binary, [0, bytes(self.bits)]) def decode(self): if not self.samplerate: raise SamplerateError('Cannot decode without samplerate.') while True: # This decoder only uses edge timing from the active CC signal. # Waiting on an unbound optional channel can crash some bundled # libsigrokdecode/atk_decoder builds on macOS ARM64. pins = self.wait([{0: 'e'}, {'skip': int(self.samplerate/1e3)}]) # First sample of the packet, just record the start date. if not self.startsample: self.startsample = self.samplenum self.previous = self.samplenum continue diff = self.samplenum - self.previous # Large idle: use it as the end of packet. if diff > self.maxbit: # The last edge of the packet. self.edges.append(self.previous) # Export the packet. self.decode_packet() # Reset for next packet. self.startsample = self.samplenum self.bits = [] self.edges = [] self.bad = [] self.half_one = False self.start_one = 0 else: # Add the bit to the packet. is_zero = diff > self.threshold if is_zero and not self.half_one: self.bits.append(0) self.edges.append(self.previous) elif not is_zero and self.half_one: self.bits.append(1) self.edges.append(self.start_one) self.half_one = False elif not is_zero and not self.half_one: self.half_one = True self.start_one = self.previous else: # Invalid BMC sequence self.bad.append((self.start_one, self.previous)) # TODO: Try to recover. self.bits.append(0) self.edges.append(self.previous) self.half_one = False self.previous = self.samplenum