2
0
forked from Ivasoft/DSView

A new protocol decoder for china fast power

This commit is contained in:
dreamsourcelabTAI
2023-02-21 09:52:07 +08:00
parent 28d1a7caea
commit a01da8057d
2 changed files with 558 additions and 0 deletions

View File

@@ -0,0 +1,28 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2014 Bart de Waal <bart@waalamo.com>
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
'''
This decoder stacks on top of the 'uart' PD and decodes UFCS,
a protocol of Universal fast charging specification for mobile devices in China.
<T/TAF 083-2021>
by edison ren 2023.1.25 <i2tv@qq.com>
'''
from .pd import Decoder

View File

@@ -0,0 +1,530 @@
##
## This file is part of the libsigrokdecode project.
##
## Copyright (C) 2023 edison ren <i2tv@qq.com>
## ref from uart & usb_power_delivery
##
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with this program; if not, see <http://www.gnu.org/licenses/>.
##
import sigrokdecode as srd
# Control Message type
CTRL_TYPES = {
0: 'PING',
1: 'ACK',
2: 'NCK',
3: 'ACCEPT',
4: 'SOFT RESET',
5: 'POWER READY',
6: 'GET OUTPUT CAP',
7: 'GET SOURCE INFO',
8: 'GET SINK INFO',
9: 'GET CABLE INFO',
10: 'GET DEVICE INFO',
11: 'GET ERROR INFO',
12: 'DETECT CABLE INFO',
13: 'START CABLE DETECT',
14: 'END CABLE DETECT',
15: 'EXIT UFCS MODE',
}
# Data message type
DATA_TYPES = {
1: 'OUTPUT CAP',
2: 'REQUEST',
3: 'SOURCE INFO',
4: 'SINK INFO',
5: 'CABLE INFO',
6: 'DEVICE INFO',
7: 'ERROR INFO',
8: 'CONFIG WATCHDOG',
9: 'REFUSE',
10: 'Verify_Request',
11: 'Verify_Response',
255: 'Test Request'
}
class Decoder(srd.Decoder):
api_version = 3
id = 'ufcs'
name = 'UFCS'
longname = 'Universal Fast Charging Specification'
desc = 'Universal fast charging specification for mobile devices. T/TAF 083-2021. Coding by edison ren 2023.1.25 <i2tv@qq.com>'
license = 'gplv2+'
inputs = ['uart']
outputs = []
tags = ['PC/Mobile']
options = (
{'id': 'fulltext', 'desc': 'Full text decoding of packets',
'default': 'no', 'values': ('yes', 'no')},
)
annotations = (
('type', 'Packet Type'),
('training', 'Training'),
('header', 'Header'),
('data', 'Data'),
('crc', 'Checksum'),
('warnings', 'Warnings'),
('src', 'Source Message'),
('snk', 'Sink Message'),
('payload', 'Payload'),
('text', 'Plain text'),
('cable', 'Cable Message'),
('reserved', 'Reserved'),
)
annotation_rows = (
('phase', 'Parts', (1, 2, 3, 4,)),
('payload', 'Payload', (8,)),
('type', 'Type', (0, 6, 7, 10, 11)),
('warnings', 'Warnings', (5,)),
('text', 'Full text', (9,)),
)
def __init__(self):
self.reset()
def reset(self):
self.ss_block = None # start pos with samles for show
self.es_block = None # end pos with samples for show
self.dataidx = 0 # data pos with u8 in packet
self.datapkt = [] # data buffer for packet, 4:128
self.head = [] # header 2*u8
self.bytepos = [] # (s, e) pos of byte (start, end)
self.text = '' # info string
self.plen = 0;
def start(self):
self.out_ann = self.register(srd.OUTPUT_ANN)
def head_id(self):
return (self.head[0] >> 1) & 15
def head_power_role(self):
return (self.head[0] >> 5) & 7
def head_rev(self):
return (self.head[1] >> 3) & 31
def head_type(self):
return self.head[2]
def data_len(self):
# data_msg count = data[3], ctrl_msg count = 0
return self.head[3] if (self.head[1] & 7) == 1 else 0
def putx(self, s0, s1, data):
self.put(s0, s1, self.out_ann, data)
def putwarn(self, longm, shortm):
self.putx(0, -1, [8, [longm, shortm]])
def compute_crc8(self):
CRC_8_POLYNOMIAL = 0x29
rCRC = 0
for s in range(self.plen - 1):
rCRC ^= self.datapkt[s]
for i in range(8):
if (rCRC & 0x80):
rCRC = (rCRC << 1) ^ CRC_8_POLYNOMIAL
else:
rCRC = (rCRC << 1)
return (rCRC & 0xff)
def get_short(self, i):
k = [self.datapkt[i], self.datapkt[i+1]]
val = k[0] <<8 | (k[1])
return val
def get_word(self, i):
hi = self.get_short(i)
lo = self.get_short(i+2)
return lo | (hi << 16)
def get_dword(self, i):
hi = self.get_word(i)
lo = self.get_word(i+4)
return lo | (hi << 32)
def puthead(self):
# message reviever, SRC, SNK, Cable
pwr_role = self.head_power_role()
if pwr_role == 1:
ann_type = 6
role = 'SRC'
elif pwr_role == 2:
ann_type = 7
role = 'SNK'
elif pwr_role == 3:
ann_type = 10
role = 'Cable'
else:
ann_type = 11
role = 'Reserved'
t = self.head_type()
if self.data_len() == 0:
if t > 15:
shortm = 'reserved cmd'
else:
shortm = CTRL_TYPES[t]
else:
shortm = DATA_TYPES[t] if t in DATA_TYPES else 'DAT???'
longm = '(r{:d}) {:s}[{:d}]: {:s}'.format(self.head_rev(), role, self.head_id(), shortm)
self.putx(self.bytepos[0][0], self.bytepos[2][1], [ann_type, [longm, shortm]])
self.text += longm
def get_source_cap(self):
numbpdo = int(self.data_len()) // 8
strpdo = ''
for numb in range(numbpdo):
pdo = self.get_dword(4 + 8 * numb)
mode = (pdo >> 60) & 15
step_ma = ((pdo >> 57) & 7) * 10 + 10
step_mv = ((pdo >> 56) & 1) * 10 + 10
max_mv = ((pdo >> 40) & 0xffff) * 0.01
min_mv = ((pdo >> 24) & 0xffff) * 0.01
max_ma = ((pdo >> 8) & 0xffff) * 0.01
min_ma = (pdo & 0xff) * 0.01
strpdo = '[%d] %g/%gV *%gmv %g/%gA *%gma ' \
% (mode, min_mv, max_mv, step_mv, min_ma, max_ma, step_ma)
s0 = self.bytepos[4 + 8 * numb][0]
s1 = self.bytepos[4 + 8 * numb + 7][1]
# show data in hex
self.putx(s0, s1, [3, ['[%d]%08x' % (numb, pdo), 'D%d' % (numb)]])
# show data in comment
txt = '(PDO %s)' % strpdo
self.putx(s0, s1, [8, [txt, txt]])
# show data in full text string
self.text += ' - ' + txt
return self.text
def get_request(self):
rdo = self.get_dword(4)
mode = (rdo >> 60) & 15
curr = (rdo & 0xffff) * 0.01
volt = ((rdo >> 16) & 0xffff) * 0.01
s = '%gV %gA' % (volt, curr)
return rdo, '(PDO #%d) %s' % (mode, s)
def get_src_info(self):
d = self.get_dword(4)
#self.putwarn('bbbbbb%08x' % self.get_dword(4), 'aaa')
it = ((d >> 40 ) & 0xff) - 50
its = ',internal temp %s' % ('%dC' % it if it > -50 else 'no data')
pt = ((d >> 32 ) & 0xff) - 50
pts = ',usb port temp %s' % ('%dC' % pt if pt > -50 else 'no data')
curr = (d & 0xffff) * 0.01
volt = ((d >> 16) & 0xffff) * 0.01
return d, '(SRC info: %gV %gA %s %s)' % (volt, curr, its, pts)
def get_snk_info(self):
d = self.get_dword(4)
it = ((d >> 40 ) & 0xff) - 50
its = ',battery temp %s' % ('%dC' % it if it > -50 else 'no data')
pt = ((d >> 32 ) & 0xff) - 50
pts = ',usb port temp %s' % ('%dC' % pt if pt > -50 else 'no data')
curr = (d & 0xffff) * 0.01
volt = ((d >> 16) & 0xffff) * 0.01
return d, '(SNK info: %gV %gA %s %s)' % (volt, curr, its, pts)
def get_cable_info(self):
d = self.get_dword(4)
id = ((d >> 48 ) & 0xffff)
ids = 'Cable VID %04X' % (id)
emk = ((d >> 32 ) & 0xffff)
emks = 'Emark VID %04X' % emk
imp = ((d >> 16) & 0xffff)
imps = 'IMP %d' % imp
curr = (d & 0xff)
volt = ((d >> 8) & 0xff)
return d, '(%s %s %s %gV %gA)' % (ids, emks, imps, volt, curr)
def get_device_info(self):
d = self.get_dword(4)
id = ((d >> 48 ) & 0xffff)
ids = 'Device VID %04X' % (id)
pid = ((d >> 32 ) & 0xffff)
pids = 'Protocol IC VID %04X' % emk
hwv = ((d >> 16) & 0xffff)
hwvs = 'HW rev%04X' % imp
swv = (d & 0xffff)
swvs = 'SW rev%04X' % imp
return d, '(%s %s %s %s)' % (ids, pids, hwv, swv)
def get_error_info(self):
d = self.get_word(4)
id = ((d >> 15 ) & 0x1ffff)
ids = 'Reversed %04X' % (id)
e14 = ((d >> 14 ) & 1)
e14s = 'Watchdog %s' % ('OK' if e14 == 0 else 'NG!')
e13 = ((d >> 13 ) & 1)
e13s = 'CRC %s' % ('OK' if e13 == 0 else 'NG!')
e12 = ((d >> 12 ) & 1)
e12s = 'Input %s' % ('OK' if e12 == 0 else 'NG!')
e11 = ((d >> 11 ) & 1)
e11s = 'Current loss %s' % ('OK' if e11 == 0 else 'NG!')
e10 = ((d >> 10 ) & 1)
e10s = 'UVP %s' % ('OK' if e10 == 0 else 'NG!')
e9 = ((d >> 9 ) & 1)
e9s = 'OVP %s' % ('OK' if e9 == 0 else 'NG')
e8 = ((d >> 8 ) & 1)
e8s = 'D+OVP %s' % ('OK' if e8 == 0 else 'NG')
e7 = ((d >> 7 ) & 1)
e7s = 'D-OVP %s' % ('OK' if e7 == 0 else 'NG')
e6 = ((d >> 6 ) & 1)
e6s = 'CCOVP %s' % ('OK' if e6 == 0 else 'NG')
e5 = ((d >> 5 ) & 1)
e5s = 'Battery OTP %s' % ('OK' if e5 == 0 else 'NG')
e4 = ((d >> 4 ) & 1)
e4s = 'USB port OTP %s' % ('OK' if e4 == 0 else 'NG')
e3 = ((d >> 3 ) & 1)
e3s = 'SCP %s' % ('OK' if e3 == 0 else 'NG')
e2 = ((d >> 2 ) & 1)
e2s = 'OCP %s' % ('OK' if e2 == 0 else 'NG')
e1 = ((d >> 1 ) & 1)
e1s = 'UVP %s' % ('OK' if e1 == 0 else 'NG')
e0 = ((d >> 0 ) & 1)
e0s = 'OVP %s' % ('OK' if e0 == 0 else 'NG')
return d, '(%s %s %s %s %s %s %s %s %s %s %s %s %s %s %s %s)' % (ids, \
e14, e13, e12, e11, e10, e9, e8, e7, e6, e5, e4, e3, e2, e1, e0)
def config_watchdog(self):
d = self.get_short(4)
return d, 'Watchdog overflow time %dms' % d
def refuse(self):
d = self.get_word(4)
rev = ((d >> 28 ) & 0xf)
revs = ('Reversed %01X' % rev) if rev else ''
rev2 = ((d >> 19 ) & 0xf)
revs2 = ('Reversed %01X' % rev2) if rev2 else ''
id = ((d >> 24 ) & 0xf)
ids = 'MSG #%d' % (id)
t = ((d >> 16 ) & 7)
ts = ', Type %d' % ts
c = ((d >> 8) & 0xff)
cs = ', CMD %d' % c
r1 = ((d >> 0) & 0x7)
rs1 = ', unknow' if r1 == 1 else ''
rs1 += ', not support' if r1 == 2 else ''
rs1 += ', busy' if r1 == 3 else ''
rs1 += ', over range' if r1 == 4 else ''
rs1 += ', other' if r1 == 5 else ''
return d, 'Refuse why? $s' % (ids + ts + cs + revs +revs2 + rs1)
def verify_request(self):
id = self.datapkt[4]
dhi = self.get_dword(5)
dlo = self.get_dword(5 + 8)
return id, 'Verify request key ID %d, random %08x%08x' % (id, dhi, dhl)
def verify_response(self):
key = self.get_dword(4)
rand = self.get_word(4+8)
return id, 'Verify request data %08x, random %04x' % (key, rand)
def decode_data_msg(self):
dlen = self.data_len()
# no data, do nothing
if dlen == 0:
return
txt = ''
t = self.head_type()
# SRC CAP
if t == 1:
self.get_source_cap()
# src cap has n(1-15) PDO, putx will be run n times within function,
# so difference here
return
# request
elif t == 2:
d, txt = self.get_request()
# SRC info
elif t == 3:
d, txt = self.get_src_info()
# SNK info
elif t == 4:
d, txt = self.get_snk_info()
# cable info
elif t == 5:
d, txt = self.get_cable_info()
# device info
elif t == 6:
d, txt = self.get_device_info()
# error info
elif t == 7:
d, txt = self.get_error_info()
# config watchdog
elif t == 8:
d, txt = self.config_watchdog()
# refuse
elif t == 9:
d, txt = self.refuse()
# verify request
elif t == 10:
d, txt = self.verify_request()
# verify response
elif t == 11:
d, txt = self.verify_response()
# test request
else:
d, txt = 0xffffffff, 'TODO...'
s0 = self.bytepos[4][0]
s1 = self.bytepos[dlen+3][1]
# show data in hex
self.putx(s0, s1, [3, ['H:%08x' % d, 'DATA']])
# show data in comment
self.putx(s0, s1, [8, [txt, txt]])
# show data in full text string
self.text += ' - ' + txt
def decode_pkt(self):
# Packet header
self.head = self.datapkt[0:4]
self.putx(self.bytepos[0][0], self.bytepos[1][1],
[2, ['HEAD:%04x' % ((self.head[0] << 8) | self.head[1]), 'HD']])
self.puthead()
# Cmd
self.putx(self.bytepos[2][0], self.bytepos[2][1],
[2, ['CMD:%02x' % (self.head[2]), 'CMD']])
# Data length
if self.data_len():
self.putx(self.bytepos[3][0], self.bytepos[3][1],
[2, ['LEN:%02x' % (self.head[3]), 'LEN']])
# Decode data payload
self.decode_data_msg()
# CRC check
self.crc = self.datapkt[self.plen-1]
ccrc = self.compute_crc8()
if self.crc != ccrc:
self.putwarn('Bad CRC %02x != %02x' % (self.crc, ccrc), 'CRC!')
self.putx(self.ss_block, self.es_block,
[4, ['CRC:%02x' % (self.crc), 'CRC']])
# Full text trace
if self.options['fulltext'] == 'yes':
self.putx(self.bytepos[0][0], self.es_block, [9, [self.text, '...']])
def decode(self, ss, es, data):
ptype, rxtx, pdata = data
self.ss_block, self.es_block = ss, es
# just keep DATA
if ptype != 'DATA':
return
# '0xAA' is the SOP of a ufcs package
if pdata[0] == 0xaa:
self.putx(ss, es, [3, ['SOP:0xaa', 'SOP']])
self.reset()
return
# append data to packet
self.datapkt.append(pdata[0])
# append byte pos (start byte, end byte )
self.bytepos.append((ss, es))
# ctrl_msg or data_msg?
if self.dataidx == 3 :
if (self.datapkt[1] & 1) == 1:
self.plen = self.datapkt[3] + 5
else:
self.plen = 4
self.dataidx += 1
# packet is full
if self.dataidx == self.plen:
self.decode_pkt()