## ## Copyright (C) 2020 Sven Soltermann ## Copyright (C) 2023 ALIENTEK(正点原子) <39035605@qq.com> ## ## This program is free software; you can redistribute it and/or modify ## it under the terms of the GNU General Public License as published by ## the Free Software Foundation; either version 2 of the License, or ## (at your option) any later version. ## ## This program is distributed in the hope that it will be useful, ## but WITHOUT ANY WARRANTY; without even the implied warranty of ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the ## GNU General Public License for more details. ## ## You should have received a copy of the GNU General Public License ## along with this program; if not, see . ## import sigrokdecode as srd import struct import sys import codecs import ctypes class pcap_udp_pkt(): # GSM TAP h = b'' # layer_2 h += b'\x00\x00\x00\x00\x00\x00' # destination_mac h += b'\x00\x00\x00\x00\x00\x00' # source_mac h += b'\x08\x00' # layer_3_protocol # layer_3 h += b'\x45' # version h += b'\x00' # DiffServField h += b'\xFF\xFF' # total_length h += b'\x2B\x0D' # Identification h += b'\x40\x00' # Flags h += b'\x40' # TTL h += b'\x11' # layer_4_protocol h += b'\x00\x00' # header_checksum h += b'\x7F\x00\x00\x01' # source_ip h += b'\x7F\x00\x00\x01' # dest_ip # layer_4 h += b'\xcc\x46' # source_port h += b'\x12\x79' # dest_port h += b'\x00\x00' # datagram_length h += b'\x00\x00' # checksum def __init__(self, ts, data): self.header = bytearray(pcap_udp_pkt.h) self.data = b'' self.set_timestamp(ts) self.set_data(data) def set_timestamp(self, ts): self.timestamp = ts def set_data(self, data): self.data = list(b'\x02\x04\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + bytes(data)); self.header[16:18] = struct.pack('>H', len(self.data) + 28) self.header[38:40] = struct.pack('>H', len(self.data) + 8) def packet(self): return bytes(self.header) + bytes(self.data) def record_header(self): # See https://wiki.wireshark.org/Development/LibpcapFileFormat. (secs, usecs) = self.timestamp h = struct.pack(' 0 ): self.sampleOverflowCount += 1 self.lastSamplePositive = True if (self.peeked_byte != None): byte =self.peeked_byte self.peeked_byte = None self.peeked_samplenum = -1 return byte; self.wait({1: 'f'}) self.sleep_cycles() return self.read_byte_no_wait(); def peek_byte(self): self.wait({1: 'f'}) self.sleep_cycles() self.peeked_samplenum = self.samplenum self.peeked_byte = self.read_byte_no_wait(); return self.peeked_byte; def read_byte_no_wait(self): self.bits = [] self.ss = self.samplenum for x in range(10): pins = self.wait({'skip': 0}) #if (x != 0 and x < 9): #self.log("read_bit: " + str(pins[1]) + " sample: "+ str(self.samplenum)) self.bits.append(pins[1]) if (self.sample_as_clock): self.wait({'skip': self.clock_skip - 4}) else: for c in range(int(self.clock_skip)): self.wait({0: 'r'}) self.es = self.samplenum if (self.bits.count(1) % 2 != 0): self.log(self.samplenum, "CHKSUM ERROR: ", pins[1], "bits: ", self.bits) self.put(self.ss, self.samplenum, self.out_ann, [0, ["CHKSUM ERROR bits={bits}".format(bits=self.bits)]]) byte = self.get_bytes(self.bits[1:9]) self.log(self.samplenum, self.bits[1:9], " : ", "0x{:02x}".format(byte)) self.put(self.ss, self.samplenum, self.out_ann, [1, [hex(byte)]]) return byte def sleep_cycles(self): if (self.sample_as_clock): self.wait({'skip': int(self.clock_skip / 3)}) else: for c in range(int(self.clock_skip / 3)): self.wait({0: 'r'}) def handle_atr(self, pins): atr_start = self.samplenum; self.log("START ATR:", self.samplenum); self.state = 'ATR' tA = [] tB = [] tC = [] tD = [] historicalBytes = [] self.ATR = [] if (self.peeked_byte != None): atr_start = self.peeked_samplenum byte = self.read_byte() else: byte = self.read_first_byte() self.ATR.append(byte) t0 = self.read_byte() self.ATR.append(t0) firstT0 = t0 self.hasT0 = False self.hasT1 = False self.hasT15 = False while (firstT0 & 0b11110000): if (firstT0 & 0b00010000): byte = self.read_byte() tA.append(byte) self.ATR.append(byte) if (firstT0 & 0b00100000): byte = self.read_byte() tB.append(byte) self.ATR.append(byte) if (firstT0 & 0b01000000): byte = self.read_byte() tC.append(byte) self.ATR.append(byte) if (firstT0 & 0b10000000): byte = self.read_byte() if ((byte & 0x0F) == 0): self.hasT0 = True elif ((byte & 0x0F) == 1): self.hasT1 = True elif ((byte & 0x0F) == 15): self.hasT15 = True else: self.log("Invalid Protocol in ATR: ", "T=",(byte & 0x0F)) self.put(atr_start, self.samplenum, self.out_ann, [0, ["Invalid Protocol in ATR T={protocol}".format(protocol=(byte & 0x0F))]]) tD.append(byte) self.ATR.append(byte) firstT0 = byte self.log("TD("+str(len(tD))+"): ", hex(firstT0), "T=",(byte & 0x0F)) else: firstT0 = 0; for _ in range(0, t0 & 0x0F): byte = self.read_byte() self.ATR.append(byte) if (self.hasT1 == False and self.hasT0 == False): self.hasT0 = True if (self.hasT1 == True or self.hasT15): byte = self.read_byte() self.ATR.append(byte) xor = 0 for i in range(1, len(self.ATR)): xor = xor ^ self.ATR[i] if (xor != 0): self.put(atr_start, self.samplenum, self.out_ann, [0, ["Invalid TCK in ATR, got={tck:02x} expected={xor:02x}".format(tck=byte,xor=xor)]]) self.log("Invalid TCK in ATR", hex(byte), hex(xor)) self.put(atr_start, self.samplenum, self.out_ann, [2, ["ATR", "ATR={atr}".format(atr=codecs.encode(bytes(self.ATR), 'hex'))]]) self.put(atr_start, self.samplenum, self.out_python, [0, self.ATR]) self.log("ENDATR", codecs.encode(bytes(self.ATR), 'hex')) self.state = 'DATA' if (self.options['protocol'] == "T=0"): self.hasT0 = True self.hasT1 = False elif (self.options['protocol'] == "T=1"): self.hasT0 = False self.hasT1 = True def t1_parse_block(self, es): packet = [] lrc = 0; nad = self.read_byte() lrc = lrc ^ nad self.put(es, self.samplenum, self.out_ann, [1, "NAD:" + hex(nad)]) sad = nad & 0x70 dad = nad & 0x07 self.log("T=1 NAD=", hex(nad), "SAD=", hex(sad), "DAD=", hex(dad)) pcb = self.read_byte() lrc = lrc ^ pcb isSBlock = False isRBlock = False isIBlock = False if (pcb & 0b11000000 == 0b11000000): # S-Block isSBlock = True self.put(es, self.samplenum, self.out_ann, [3, "PCB S " + hex(pcb)]) self.log("PCB S-Block", hex(pcb & 0b00111111)) elif (pcb & 0b10000000 == 0b10000000): # R-Block isRBlock = True self.put(es, self.samplenum, self.out_ann, [3, "PCB R " + hex(pcb)]) self.log("PCB R-Block", hex(pcb & 0b00111111)) else: # I-Block isIBlock = True self.put(es, self.samplenum, self.out_ann, [3, "PCB I " + hex(pcb)]) self.log("PCB I-Block", hex(pcb)) bLen = self.read_byte() lrc = lrc ^ bLen if (bLen > 0): for _ in range(bLen): byte = self.read_byte() lrc = lrc ^ byte if (isIBlock): packet.append(byte) # CRC to implement bLrc = self.read_byte() lrc = lrc ^ bLrc if (lrc != 0): self.put(es, self.samplenum, self.out_ann, [0, ["Invalid checksum on T=1 block, , got={got:02x} expected={expected:02x}".format(got=lrc,expected=bLrc)]]) self.log("Invalid checksum on T=1 block", hex(lrc), hex(bLrc)) self.log("block_content", codecs.encode(bytes(packet), 'hex')) if (isIBlock): self.put(es, self.samplenum, self.out_ann, [6, ["I-Block", "I-Block len={len} isMultiBlock={multi}".format(len=bLen,multi=(pcb & 0b00100000 > 0))]]) if (isRBlock): self.put(es, self.samplenum, self.out_ann, [7, ["R-Block", "R-Block flag={flag:02x}".format(flag=pcb & 0b00111111)]]) if (isSBlock): self.put(es, self.samplenum, self.out_ann, [8, ["S-Block", "S-Block flag={flag:02x}".format(flag=pcb & 0b00111111)]]) if (isIBlock and pcb & 0b00100000): # m-flag self.log("T=1 Multiblock flag", hex(pcb)) while (True): isIBlock2,packet2 = self.t1_parse_block(self.samplenum) if (isIBlock2): packet = packet + packet2 break return isIBlock,packet; def handle_pps(self): lrc = 0 ss = self.peeked_samplenum pps = self.read_byte() pps0 = self.read_byte() pps1 = 0; pps2 = 0; pps3 = 0 if (pps0 & 0b00010000): pps1 = self.read_byte() lrc = lrc ^ pps1 if (pps0 & 0b00100000): pps2 = self.read_byte() lrc = lrc ^ pps2 if (pps0 & 0b01000000): pps3 = self.read_byte() lrc = lrc ^ pps3 pck = self.read_byte() lrc = lrc ^ pps ^ pps0 ^ pck if (lrc != 0): self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID Checksum on PPS Request, got={got:02x} expected={expected:02x}".format(got=pck,expected=(lrc ^ pps ^ pps0))]]) self.log("INVALID Checksum on PPS Request", hex(lrc)) r_lrc = 0 r_pps = self.read_byte() r_pps1 = 0; r_pps2 = 0; r_pps3 = 0 if (r_pps != 0xFF): self.put(ss, self.samplenum, self.out_ann, [0, ["PPS Request not confirmed"]]) self.log("PPS Request not confirmed", r_pps) r_pps0 = self.read_byte() if (r_pps0 & 0b00010000): r_pps1 = self.read_byte() r_lrc = r_lrc ^ r_pps1 if (r_pps0 & 0b00100000): r_pps2 = self.read_byte() r_lrc = r_lrc ^ r_pps2 if (r_pps0 & 0b01000000): r_pps3 = self.read_byte() r_lrc = r_lrc ^ r_pps3 r_pck = self.read_byte() r_lrc = r_lrc ^ r_pps ^ r_pps0 ^ r_pck if (r_lrc != 0): self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID Checksum on PPS Response, got={got:02x} expected={expected:02x}".format(got=r_pck,expected=(r_lrc ^ r_pps ^ r_pps0))]]) self.log("INVALID Checksum on PPS Response", hex(r_lrc)) if (pps0 == r_pps0 and pps1 == r_pps1 and pps2 == r_pps2 and pps3 == r_pps3): if (self.detect_clock or self.sample_as_clock): tmp_fi = self.clock_rate[int(pps1 >> 4)] tmp_di = self.baud_rate[int(pps1 & 0x0F)] tmp_clock_skip = int(tmp_fi / tmp_di) self.log("Received PPS change: FI", tmp_fi, "DI", tmp_di, "clock_skip", tmp_clock_skip) self.clock_skip = int(tmp_clock_skip * self.detected_clock_skip / 372) self.fi = tmp_fi self.di = tmp_di self.log("PPS Success new settings (calculated): FI", self.fi, "DI", self.di, "clock_skip", self.clock_skip) else: self.fi = self.clock_rate[int(pps1 >> 4)] self.di = self.baud_rate[int(pps1 & 0x0F)] self.clock_skip = int(self.fi / self.di) self.log("PPS Success new settings: FI", self.fi, "DI", self.di, "clock_skip", self.clock_skip) else: self.log("INVALID PPS. Request & Response not matching.", hex(r_lrc)) self.put(ss, self.samplenum, self.out_ann, [0, ["INVALID PPS. Request & Response not matching"]]) self.put(ss, self.samplenum, self.out_ann, [3, ["PPS", "PPS DI={di} FI={fi} clock_skip={clock_skip}".format(di=self.di,fi=self.fi,clock_skip=self.clock_skip)]]) def decode(self): self.write_pcap_header(); while True: # State machine. if self.state == 'FIND START': self.wait({1: 'h'}) self.handle_atr(self.wait({1: 'f'})) elif self.state == 'DATA': packet = []; firstByte = self.peek_byte(); if (firstByte == 0xFF): # PPS Request self.handle_pps(); continue; elif (firstByte == 0x3b): # Probably ATR self.handle_atr(self.wait({'skip': 0})) continue; es = self.peeked_samplenum; if (self.hasT0): bClass = self.read_byte() packet.append(bClass); # class bIns = self.read_byte(); packet.append(bIns); # instruction packet.append(self.read_byte()); # param1 packet.append(self.read_byte()); # param2 dataLen = self.read_byte(); self.log("DATALEN: ", dataLen) packet.append(dataLen); # param3 procedureByte = self.read_byte(); if (procedureByte == bIns): for _ in range(0,dataLen): packet.append(self.read_byte()); # payload packet.append(self.read_byte()); # status0 packet.append(self.read_byte()); # status1 elif (procedureByte == 0x60): packet.append(procedureByte); # status0 packet.append(self.read_byte()); # status1 elif (procedureByte & 0xF0 == 0x60 or procedureByte & 0xF0 == 0x90): packet.append(procedureByte); # status0 packet.append(self.read_byte()); # status1 else: self.put(es, self.samplenum, self.out_ann, [0, ["INVALID Procedure Byte"]]) self.log("INVALID Procedure Byte", hex(procedureByte)) self.put(es, self.samplenum, self.out_ann, [4, ["T=0"]]) self.put(es, self.samplenum, self.out_ann, [9, ["APDU", "APDU cls={cls:02x} ins={ins:02x}".format(cls=bClass,ins=bIns), "APDU cls={cls:02x} ins={ins:02x} p1={p1:02x} p2={p2:02x} p3={p3:02x} len={len} status={sw1:02x}{sw2:02x}".format(cls=bClass,ins=bIns,p1=packet[2],p2=packet[3],p3=packet[4],len=dataLen,sw1=packet[-2],sw2=packet[-1])]]) elif (self.hasT1): isIBlock,packet = self.t1_parse_block(es) if (isIBlock): while (True): isIBlock,packet2 = self.t1_parse_block(es) if (isIBlock): packet = packet + packet2 break self.put(es, self.samplenum, self.out_ann, [4, ["T=1", "T=1 (reassembled)"]]) if (len(packet) >= 8): self.put(es, self.samplenum, self.out_ann, [9, ["APDU", "APDU cls={cls:02x} ins={ins:02x}".format(cls=packet[0],ins=packet[1]), "APDU cls={cls:02x} ins={ins:02x} p1={p1:02x} p2={p2:02x} p3={p3:02x} len={len} status={sw1:02x}{sw2:02x}".format(cls=packet[0],ins=packet[1],p1=packet[2],p2=packet[3],p3=packet[4],len=len(packet) - 7,sw1=packet[-2],sw2=packet[-1])]]) if (len(packet) > 0): ts = self.ts_from_samplenum(es) pkt = pcap_udp_pkt(ts, packet) self.put(es, self.samplenum, self.out_binary, [0, pkt.record_header()]) self.put(es, self.samplenum, self.out_binary, [0, pkt.packet()]) self.log("PACKETEND", codecs.encode(bytes(packet), 'hex')) else: break;