From a01da8057d48fe69885ec37116c20482df6b2aa9 Mon Sep 17 00:00:00 2001 From: dreamsourcelabTAI Date: Tue, 21 Feb 2023 09:52:07 +0800 Subject: [PATCH] A new protocol decoder for china fast power --- libsigrokdecode4DSL/decoders/ufcs/__init__.py | 28 + libsigrokdecode4DSL/decoders/ufcs/pd.py | 530 ++++++++++++++++++ 2 files changed, 558 insertions(+) create mode 100644 libsigrokdecode4DSL/decoders/ufcs/__init__.py create mode 100644 libsigrokdecode4DSL/decoders/ufcs/pd.py diff --git a/libsigrokdecode4DSL/decoders/ufcs/__init__.py b/libsigrokdecode4DSL/decoders/ufcs/__init__.py new file mode 100644 index 00000000..ceece92d --- /dev/null +++ b/libsigrokdecode4DSL/decoders/ufcs/__init__.py @@ -0,0 +1,28 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2014 Bart de Waal +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see . +## + +''' +This decoder stacks on top of the 'uart' PD and decodes UFCS, +a protocol of Universal fast charging specification for mobile devices in China. + + +by edison ren 2023.1.25 +''' + +from .pd import Decoder diff --git a/libsigrokdecode4DSL/decoders/ufcs/pd.py b/libsigrokdecode4DSL/decoders/ufcs/pd.py new file mode 100644 index 00000000..eab5b904 --- /dev/null +++ b/libsigrokdecode4DSL/decoders/ufcs/pd.py @@ -0,0 +1,530 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2023 edison ren +## ref from uart & usb_power_delivery +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see . +## + +import sigrokdecode as srd + +# Control Message type +CTRL_TYPES = { + 0: 'PING', + 1: 'ACK', + 2: 'NCK', + 3: 'ACCEPT', + 4: 'SOFT RESET', + 5: 'POWER READY', + 6: 'GET OUTPUT CAP', + 7: 'GET SOURCE INFO', + 8: 'GET SINK INFO', + 9: 'GET CABLE INFO', + 10: 'GET DEVICE INFO', + 11: 'GET ERROR INFO', + 12: 'DETECT CABLE INFO', + 13: 'START CABLE DETECT', + 14: 'END CABLE DETECT', + 15: 'EXIT UFCS MODE', +} + +# Data message type +DATA_TYPES = { + 1: 'OUTPUT CAP', + 2: 'REQUEST', + 3: 'SOURCE INFO', + 4: 'SINK INFO', + 5: 'CABLE INFO', + 6: 'DEVICE INFO', + 7: 'ERROR INFO', + 8: 'CONFIG WATCHDOG', + 9: 'REFUSE', + 10: 'Verify_Request', + 11: 'Verify_Response', + 255: 'Test Request' +} + +class Decoder(srd.Decoder): + api_version = 3 + id = 'ufcs' + name = 'UFCS' + longname = 'Universal Fast Charging Specification' + desc = 'Universal fast charging specification for mobile devices. T/TAF 083-2021. Coding by edison ren 2023.1.25 ' + license = 'gplv2+' + inputs = ['uart'] + outputs = [] + tags = ['PC/Mobile'] + + options = ( + {'id': 'fulltext', 'desc': 'Full text decoding of packets', + 'default': 'no', 'values': ('yes', 'no')}, + ) + annotations = ( + ('type', 'Packet Type'), + ('training', 'Training'), + ('header', 'Header'), + ('data', 'Data'), + ('crc', 'Checksum'), + ('warnings', 'Warnings'), + ('src', 'Source Message'), + ('snk', 'Sink Message'), + ('payload', 'Payload'), + ('text', 'Plain text'), + ('cable', 'Cable Message'), + ('reserved', 'Reserved'), + + ) + annotation_rows = ( + ('phase', 'Parts', (1, 2, 3, 4,)), + ('payload', 'Payload', (8,)), + ('type', 'Type', (0, 6, 7, 10, 11)), + ('warnings', 'Warnings', (5,)), + ('text', 'Full text', (9,)), + ) + + def __init__(self): + self.reset() + + def reset(self): + self.ss_block = None # start pos with samles for show + self.es_block = None # end pos with samples for show + self.dataidx = 0 # data pos with u8 in packet + self.datapkt = [] # data buffer for packet, 4:128 + self.head = [] # header 2*u8 + self.bytepos = [] # (s, e) pos of byte (start, end) + self.text = '' # info string + self.plen = 0; + + def start(self): + self.out_ann = self.register(srd.OUTPUT_ANN) + + def head_id(self): + return (self.head[0] >> 1) & 15 + + def head_power_role(self): + return (self.head[0] >> 5) & 7 + + def head_rev(self): + return (self.head[1] >> 3) & 31 + + def head_type(self): + return self.head[2] + + def data_len(self): + # data_msg count = data[3], ctrl_msg count = 0 + return self.head[3] if (self.head[1] & 7) == 1 else 0 + + def putx(self, s0, s1, data): + self.put(s0, s1, self.out_ann, data) + + def putwarn(self, longm, shortm): + self.putx(0, -1, [8, [longm, shortm]]) + + def compute_crc8(self): + CRC_8_POLYNOMIAL = 0x29 + rCRC = 0 + + for s in range(self.plen - 1): + rCRC ^= self.datapkt[s] + for i in range(8): + if (rCRC & 0x80): + rCRC = (rCRC << 1) ^ CRC_8_POLYNOMIAL + else: + rCRC = (rCRC << 1) + + return (rCRC & 0xff) + + def get_short(self, i): + k = [self.datapkt[i], self.datapkt[i+1]] + val = k[0] <<8 | (k[1]) + return val + + def get_word(self, i): + hi = self.get_short(i) + lo = self.get_short(i+2) + return lo | (hi << 16) + + def get_dword(self, i): + hi = self.get_word(i) + lo = self.get_word(i+4) + return lo | (hi << 32) + + def puthead(self): + # message reviever, SRC, SNK, Cable + pwr_role = self.head_power_role() + if pwr_role == 1: + ann_type = 6 + role = 'SRC' + elif pwr_role == 2: + ann_type = 7 + role = 'SNK' + elif pwr_role == 3: + ann_type = 10 + role = 'Cable' + else: + ann_type = 11 + role = 'Reserved' + + t = self.head_type() + + if self.data_len() == 0: + if t > 15: + shortm = 'reserved cmd' + else: + shortm = CTRL_TYPES[t] + else: + shortm = DATA_TYPES[t] if t in DATA_TYPES else 'DAT???' + + longm = '(r{:d}) {:s}[{:d}]: {:s}'.format(self.head_rev(), role, self.head_id(), shortm) + self.putx(self.bytepos[0][0], self.bytepos[2][1], [ann_type, [longm, shortm]]) + self.text += longm + + def get_source_cap(self): + numbpdo = int(self.data_len()) // 8 + strpdo = '' + + for numb in range(numbpdo): + pdo = self.get_dword(4 + 8 * numb) + mode = (pdo >> 60) & 15 + step_ma = ((pdo >> 57) & 7) * 10 + 10 + step_mv = ((pdo >> 56) & 1) * 10 + 10 + max_mv = ((pdo >> 40) & 0xffff) * 0.01 + min_mv = ((pdo >> 24) & 0xffff) * 0.01 + max_ma = ((pdo >> 8) & 0xffff) * 0.01 + min_ma = (pdo & 0xff) * 0.01 + strpdo = '[%d] %g/%gV *%gmv %g/%gA *%gma ' \ + % (mode, min_mv, max_mv, step_mv, min_ma, max_ma, step_ma) + + s0 = self.bytepos[4 + 8 * numb][0] + s1 = self.bytepos[4 + 8 * numb + 7][1] + # show data in hex + self.putx(s0, s1, [3, ['[%d]%08x' % (numb, pdo), 'D%d' % (numb)]]) + # show data in comment + txt = '(PDO %s)' % strpdo + self.putx(s0, s1, [8, [txt, txt]]) + # show data in full text string + self.text += ' - ' + txt + + return self.text + + def get_request(self): + rdo = self.get_dword(4) + + mode = (rdo >> 60) & 15 + + curr = (rdo & 0xffff) * 0.01 + volt = ((rdo >> 16) & 0xffff) * 0.01 + + s = '%gV %gA' % (volt, curr) + + return rdo, '(PDO #%d) %s' % (mode, s) + + def get_src_info(self): + d = self.get_dword(4) + #self.putwarn('bbbbbb%08x' % self.get_dword(4), 'aaa') + + it = ((d >> 40 ) & 0xff) - 50 + its = ',internal temp %s' % ('%dC' % it if it > -50 else 'no data') + + pt = ((d >> 32 ) & 0xff) - 50 + pts = ',usb port temp %s' % ('%dC' % pt if pt > -50 else 'no data') + + curr = (d & 0xffff) * 0.01 + volt = ((d >> 16) & 0xffff) * 0.01 + + return d, '(SRC info: %gV %gA %s %s)' % (volt, curr, its, pts) + + def get_snk_info(self): + d = self.get_dword(4) + + it = ((d >> 40 ) & 0xff) - 50 + its = ',battery temp %s' % ('%dC' % it if it > -50 else 'no data') + + pt = ((d >> 32 ) & 0xff) - 50 + pts = ',usb port temp %s' % ('%dC' % pt if pt > -50 else 'no data') + + curr = (d & 0xffff) * 0.01 + volt = ((d >> 16) & 0xffff) * 0.01 + + return d, '(SNK info: %gV %gA %s %s)' % (volt, curr, its, pts) + + def get_cable_info(self): + d = self.get_dword(4) + + id = ((d >> 48 ) & 0xffff) + ids = 'Cable VID %04X' % (id) + + emk = ((d >> 32 ) & 0xffff) + emks = 'Emark VID %04X' % emk + + imp = ((d >> 16) & 0xffff) + imps = 'IMP %d' % imp + + curr = (d & 0xff) + volt = ((d >> 8) & 0xff) + + return d, '(%s %s %s %gV %gA)' % (ids, emks, imps, volt, curr) + + def get_device_info(self): + d = self.get_dword(4) + + id = ((d >> 48 ) & 0xffff) + ids = 'Device VID %04X' % (id) + + pid = ((d >> 32 ) & 0xffff) + pids = 'Protocol IC VID %04X' % emk + + hwv = ((d >> 16) & 0xffff) + hwvs = 'HW rev%04X' % imp + + swv = (d & 0xffff) + swvs = 'SW rev%04X' % imp + + return d, '(%s %s %s %s)' % (ids, pids, hwv, swv) + + def get_error_info(self): + d = self.get_word(4) + + id = ((d >> 15 ) & 0x1ffff) + ids = 'Reversed %04X' % (id) + + e14 = ((d >> 14 ) & 1) + e14s = 'Watchdog %s' % ('OK' if e14 == 0 else 'NG!') + + e13 = ((d >> 13 ) & 1) + e13s = 'CRC %s' % ('OK' if e13 == 0 else 'NG!') + + e12 = ((d >> 12 ) & 1) + e12s = 'Input %s' % ('OK' if e12 == 0 else 'NG!') + + e11 = ((d >> 11 ) & 1) + e11s = 'Current loss %s' % ('OK' if e11 == 0 else 'NG!') + + e10 = ((d >> 10 ) & 1) + e10s = 'UVP %s' % ('OK' if e10 == 0 else 'NG!') + + e9 = ((d >> 9 ) & 1) + e9s = 'OVP %s' % ('OK' if e9 == 0 else 'NG') + + e8 = ((d >> 8 ) & 1) + e8s = 'D+OVP %s' % ('OK' if e8 == 0 else 'NG') + + e7 = ((d >> 7 ) & 1) + e7s = 'D-OVP %s' % ('OK' if e7 == 0 else 'NG') + + e6 = ((d >> 6 ) & 1) + e6s = 'CCOVP %s' % ('OK' if e6 == 0 else 'NG') + + e5 = ((d >> 5 ) & 1) + e5s = 'Battery OTP %s' % ('OK' if e5 == 0 else 'NG') + + e4 = ((d >> 4 ) & 1) + e4s = 'USB port OTP %s' % ('OK' if e4 == 0 else 'NG') + + e3 = ((d >> 3 ) & 1) + e3s = 'SCP %s' % ('OK' if e3 == 0 else 'NG') + + e2 = ((d >> 2 ) & 1) + e2s = 'OCP %s' % ('OK' if e2 == 0 else 'NG') + + e1 = ((d >> 1 ) & 1) + e1s = 'UVP %s' % ('OK' if e1 == 0 else 'NG') + + e0 = ((d >> 0 ) & 1) + e0s = 'OVP %s' % ('OK' if e0 == 0 else 'NG') + + return d, '(%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s)' % (ids, \ + e14, e13, e12, e11, e10, e9, e8, e7, e6, e5, e4, e3, e2, e1, e0) + + def config_watchdog(self): + d = self.get_short(4) + + return d, 'Watchdog overflow time %dms' % d + + def refuse(self): + d = self.get_word(4) + + rev = ((d >> 28 ) & 0xf) + revs = ('Reversed %01X' % rev) if rev else '' + rev2 = ((d >> 19 ) & 0xf) + revs2 = ('Reversed %01X' % rev2) if rev2 else '' + + id = ((d >> 24 ) & 0xf) + ids = 'MSG #%d' % (id) + + t = ((d >> 16 ) & 7) + ts = ', Type %d' % ts + + c = ((d >> 8) & 0xff) + cs = ', CMD %d' % c + + r1 = ((d >> 0) & 0x7) + rs1 = ', unknow' if r1 == 1 else '' + rs1 += ', not support' if r1 == 2 else '' + rs1 += ', busy' if r1 == 3 else '' + rs1 += ', over range' if r1 == 4 else '' + rs1 += ', other' if r1 == 5 else '' + + return d, 'Refuse why? $s' % (ids + ts + cs + revs +revs2 + rs1) + + def verify_request(self): + id = self.datapkt[4] + dhi = self.get_dword(5) + dlo = self.get_dword(5 + 8) + + return id, 'Verify request key ID %d, random %08x%08x' % (id, dhi, dhl) + + def verify_response(self): + key = self.get_dword(4) + rand = self.get_word(4+8) + + return id, 'Verify request data %08x, random %04x' % (key, rand) + + def decode_data_msg(self): + dlen = self.data_len() + + # no data, do nothing + if dlen == 0: + return + + txt = '' + t = self.head_type() + + # SRC CAP + if t == 1: + self.get_source_cap() + # src cap has n(1-15) PDO, putx will be run n times within function, + # so difference here + return + + # request + elif t == 2: + d, txt = self.get_request() + + + # SRC info + elif t == 3: + d, txt = self.get_src_info() + + # SNK info + elif t == 4: + d, txt = self.get_snk_info() + + # cable info + elif t == 5: + d, txt = self.get_cable_info() + + # device info + elif t == 6: + d, txt = self.get_device_info() + + # error info + elif t == 7: + d, txt = self.get_error_info() + + # config watchdog + elif t == 8: + d, txt = self.config_watchdog() + + # refuse + elif t == 9: + d, txt = self.refuse() + + # verify request + elif t == 10: + d, txt = self.verify_request() + + # verify response + elif t == 11: + d, txt = self.verify_response() + + # test request + else: + d, txt = 0xffffffff, 'TODO...' + + s0 = self.bytepos[4][0] + s1 = self.bytepos[dlen+3][1] + # show data in hex + self.putx(s0, s1, [3, ['H:%08x' % d, 'DATA']]) + # show data in comment + self.putx(s0, s1, [8, [txt, txt]]) + # show data in full text string + self.text += ' - ' + txt + + def decode_pkt(self): + # Packet header + self.head = self.datapkt[0:4] + + self.putx(self.bytepos[0][0], self.bytepos[1][1], + [2, ['HEAD:%04x' % ((self.head[0] << 8) | self.head[1]), 'HD']]) + self.puthead() + + # Cmd + self.putx(self.bytepos[2][0], self.bytepos[2][1], + [2, ['CMD:%02x' % (self.head[2]), 'CMD']]) + + # Data length + if self.data_len(): + self.putx(self.bytepos[3][0], self.bytepos[3][1], + [2, ['LEN:%02x' % (self.head[3]), 'LEN']]) + + # Decode data payload + self.decode_data_msg() + + # CRC check + self.crc = self.datapkt[self.plen-1] + ccrc = self.compute_crc8() + if self.crc != ccrc: + self.putwarn('Bad CRC %02x != %02x' % (self.crc, ccrc), 'CRC!') + + self.putx(self.ss_block, self.es_block, + [4, ['CRC:%02x' % (self.crc), 'CRC']]) + + # Full text trace + if self.options['fulltext'] == 'yes': + self.putx(self.bytepos[0][0], self.es_block, [9, [self.text, '...']]) + + def decode(self, ss, es, data): + ptype, rxtx, pdata = data + self.ss_block, self.es_block = ss, es + + # just keep DATA + if ptype != 'DATA': + return + + # '0xAA' is the SOP of a ufcs package + if pdata[0] == 0xaa: + self.putx(ss, es, [3, ['SOP:0xaa', 'SOP']]) + self.reset() + return + + # append data to packet + self.datapkt.append(pdata[0]) + # append byte pos (start byte, end byte ) + self.bytepos.append((ss, es)) + + # ctrl_msg or data_msg? + if self.dataidx == 3 : + if (self.datapkt[1] & 1) == 1: + self.plen = self.datapkt[3] + 5 + else: + self.plen = 4 + + self.dataidx += 1 + + # packet is full + if self.dataidx == self.plen: + self.decode_pkt() +