From 4b240082cc9edf4156a9000cb83a42d91959a01d Mon Sep 17 00:00:00 2001 From: ghecko Date: Mon, 11 Oct 2021 09:59:16 +0200 Subject: [PATCH 1/2] adding tpm_spi stacked protocol decoder --- .../decoders/spi_tpm/RangeDict.py | 299 ++++++++++++++++++ .../decoders/spi_tpm/__init__.py | 25 ++ libsigrokdecode4DSL/decoders/spi_tpm/lists.py | 118 +++++++ libsigrokdecode4DSL/decoders/spi_tpm/pd.py | 270 ++++++++++++++++ 4 files changed, 712 insertions(+) create mode 100644 libsigrokdecode4DSL/decoders/spi_tpm/RangeDict.py create mode 100644 libsigrokdecode4DSL/decoders/spi_tpm/__init__.py create mode 100644 libsigrokdecode4DSL/decoders/spi_tpm/lists.py create mode 100644 libsigrokdecode4DSL/decoders/spi_tpm/pd.py diff --git a/libsigrokdecode4DSL/decoders/spi_tpm/RangeDict.py b/libsigrokdecode4DSL/decoders/spi_tpm/RangeDict.py new file mode 100644 index 00000000..a2ecabca --- /dev/null +++ b/libsigrokdecode4DSL/decoders/spi_tpm/RangeDict.py @@ -0,0 +1,299 @@ +# Source: https://raw.githubusercontent.com/WKPlus/rangedict/master/rangedict.py +__all__ = ['RangeDict'] + + +class Color(object): + BLACK = 0 + RED = 1 + + +class Node(object): + __slots__ = ('r', 'left', 'right', 'value', 'color', 'parent') + + def __init__(self, r, value, parent=None, color=Color.RED): + self.r = r + self.value = value + self.parent = parent + self.color = color + self.left = None + self.right = None + + def value_copy(self, other): + self.r = other.r + self.value = other.value + + +class RangeDict(dict): + + def __init__(self): + self._root = None + + def __setitem__(self, r, v): + if r[1] < r[0]: + raise KeyError + node = self._insert(r, v) + self._insert_adjust(node) + + def _insert(self, r, v): + if not self._root: + self._root = Node(r, v) + return self._root + cur = self._root + while True: + if r[1] < cur.r[0]: + if not cur.left: + cur.left = Node(r, v, cur) + return cur.left + cur = cur.left + elif r[0] > cur.r[1]: + if not cur.right: + cur.right = Node(r, v, cur) + return cur.right + cur = cur.right + else: + raise KeyError # overlap not supported + + def _insert_adjust(self, node): + ''' adjust to make the tree still a red black tree ''' + if not node.parent: + node.color = Color.BLACK + return + if node.parent.color == Color.BLACK: + return + uncle = self.sibling(node.parent) + if node_color(uncle) == Color.RED: + node.parent.color = Color.BLACK + uncle.color = Color.BLACK + node.parent.parent.color = Color.RED + return self._insert_adjust(node.parent.parent) + + #parent is red and uncle is black + # since parent is red, grandparent must exists and be black + parent = node.parent + grandparent = parent.parent + if self.is_left_son(parent, grandparent): + if self.is_left_son(node, parent): + self.right_rotate(grandparent) + grandparent.color = Color.RED + parent.color = Color.BLACK + else: + self.left_rotate(parent) + self.right_rotate(grandparent) + grandparent.color = Color.RED + node.color = Color.BLACK + else: + if self.is_left_son(node, parent): + self.right_rotate(parent) + self.left_rotate(grandparent) + grandparent.color = Color.RED + node.color = Color.BLACK + else: + self.left_rotate(grandparent) + grandparent.color = Color.RED + parent.color = Color.BLACK + + def _find_key(self, key): + cur = self._root + while cur: + if key > cur.r[1]: + cur = cur.right + elif key < cur.r[0]: + cur = cur.left + else: + break + return cur + + def _find_range(self, r): + cur = self._root + while cur: + if r[1] < cur.r[0]: + cur = cur.left + elif r[0] > cur.r[1]: + cur = cur.right + elif r[0] == cur.r[0] and r[1] == cur.r[1]: + return cur + else: + raise KeyError + raise KeyError + + def __getitem__(self, key): + tar = self._find_key(key) + if tar: + return tar.value + raise KeyError + + def __contains__(self, key): + return bool(self._find_key(key)) + + def __delitem__(self, r): + node = self._find_range(r) + if node.left and node.right: + left_rightest_child = self.find_rightest(node.left) + node.value_copy(left_rightest_child) + node = left_rightest_child + self._delete(node) + + def _delete(self, node): + # node has at most one child + child = node.left if node.left else node.right + if not node.parent: # node is root + self._root = child + if self._root: + self._root.parent = None + self._root.color = Color.BLACK + return + + parent = node.parent + if not child: + child = Node(None, None, parent, Color.BLACK) + if self.is_left_son(node, parent): + parent.left = child + else: + parent.right = child + child.parent = parent + + if node.color == Color.RED: + # no need to adjust when deleting a red node + return + if node_color(child) == Color.RED: + child.color = Color.BLACK + return + self._delete_adjust(child) + if not child.r: + # mock a None node for adjust, need to delete it after that + parent = child.parent + if self.is_left_son(child, parent): + parent.left = None + else: + parent.right = None + + def _delete_adjust(self, node): + if not node.parent: + node.color = Color.BLACK + return + + parent = node.parent + sibling = self.sibling(node) + if node_color(sibling) == Color.RED: + if self.is_left_son(node, parent): + self.left_rotate(parent) + else: + self.right_rotate(parent) + parent.color = Color.RED + sibling.color = Color.BLACK + sibling = self.sibling(node) # must be black + + # sibling must be black now + if not self.is_black(parent) and self.is_black(sibling.left) and \ + self.is_black(sibling.right): + parent.color = Color.BLACK + sibling.color = Color.RED + return + + if self.is_black(parent) and self.is_black(sibling.left) and \ + self.is_black(sibling.right): + sibling.color = Color.RED + return self._delete_adjust(parent) + + if self.is_left_son(node, parent): + if not self.is_black(sibling.left) and \ + self.is_black(sibling.right): + sibling.left.color = Color.BLACK + sibling.color = Color.RED + self.right_rotate(sibling) + sibling = sibling.parent + + # sibling.right must be red + sibling.color = parent.color + parent.color = Color.BLACK + sibling.right.color = Color.BLACK + self.left_rotate(parent) + else: + if not self.is_black(sibling.right) and \ + self.is_black(sibling.left): + sibling.right.color = Color.BLACK + sibling.color = Color.RED + self.left_rotate(parent) + sibling = sibling.parent + + # sibling.left must be red + sibling.color = parent.color + parent.color = Color.BLACK + sibling.left.color = Color.RED + self.right_rotate(parent) + + def left_rotate(self, node): + right_son = node.right + + if not node.parent: + self._root = right_son + elif self.is_left_son(node, node.parent): + node.parent.left = right_son + else: + node.parent.right = right_son + right_son.parent = node.parent + + node.parent = right_son + node.right = right_son.left + right_son.left = node + + def right_rotate(self, node): + left_son = node.left + if not node.parent: + self._root = left_son + elif self.is_left_son(node, node.parent): + node.parent.left = left_son + else: + node.parent.right = left_son + left_son.parent = node.parent + + node.parent = left_son + node.left = left_son.right + left_son.right = node + + @staticmethod + def sibling(node): + if node.parent.left == node: + return node.parent.right + else: + return node.parent.left + + @staticmethod + def is_left_son(child, parent): + if parent.left == child: + return True + else: + return False + + @staticmethod + def find_rightest(node): + while node.right: + node = node.right + return node + + @staticmethod + def is_black(node): + return node_color(node) == Color.BLACK + + +def node_color(node): + if not node: + return Color.BLACK + return node.color + + +def in_order(root): + ret = [] + if not root: + return [] + return in_order(root.left) + [root.value] + in_order(root.right) + + +def height(root): + if not root: + return 0 + return 1 + max(height(root.left), height(root.right)) + + +if __name__ == '__main__': + pass diff --git a/libsigrokdecode4DSL/decoders/spi_tpm/__init__.py b/libsigrokdecode4DSL/decoders/spi_tpm/__init__.py new file mode 100644 index 00000000..e3bee356 --- /dev/null +++ b/libsigrokdecode4DSL/decoders/spi_tpm/__init__.py @@ -0,0 +1,25 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2021 ghecko - Jordan Ovrè +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see . +## + +''' +This decoder stacks on top of the 'spi' PD and decodes TPM transactions. +It automatically extract BitLocker Volume Master Key (VMK) +''' + +from .pd import Decoder diff --git a/libsigrokdecode4DSL/decoders/spi_tpm/lists.py b/libsigrokdecode4DSL/decoders/spi_tpm/lists.py new file mode 100644 index 00000000..40481ccc --- /dev/null +++ b/libsigrokdecode4DSL/decoders/spi_tpm/lists.py @@ -0,0 +1,118 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2021 ghecko - Jordan Ovrè +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see . +## + +from .RangeDict import RangeDict + +# RangeDict which maps range of FIFO Register space to their names. +# Register space Addresses is defined on the paragraph 6.3.2 of the following document: https://trustedcomputinggroup.org/wp-content/uploads/PC-Client-Specific-Platform-TPM-Profile-for-TPM-2p0-v1p05p_r14_pub.pdf +# Credit: https://github.com/FSecureLABS/bitlocker-spi-toolkit + +fifo_registers = RangeDict() +fifo_registers[(0x0000, 0x0000)] = "TPM_ACCESS_0" +fifo_registers[(0x0001, 0x0007)] = "Reserved" +fifo_registers[(0x0008, 0x000b)] = "TPM_INT_ENABLE_0" +fifo_registers[(0x000c, 0x000c)] = "TPM_INT_VECTOR_0" +fifo_registers[(0x000d, 0x000f)] = "Reserved" +fifo_registers[(0x0010, 0x0013)] = "TPM_INT_STATUS_0" +fifo_registers[(0x0014, 0x0017)] = "TPM_INTF_CAPABILITY_0" +fifo_registers[(0x0018, 0x001b)] = "TPM_STS_0" +fifo_registers[(0x001c, 0x0023)] = "Reserved" +fifo_registers[(0x0024, 0x0027)] = "TPM_DATA_FIFO_0" +fifo_registers[(0x0028, 0x002f)] = "Reserved" +fifo_registers[(0x0030, 0x0033)] = "TPM_INTERFACE_ID_0" +fifo_registers[(0x0034, 0x007f)] = "Reserved" +fifo_registers[(0x0080, 0x0083)] = "TPM_XDATA_FIFO_0" +fifo_registers[(0x0084, 0x0881)] = "Reserved" +fifo_registers[(0x0f00, 0x0f03)] = "TPM_DID_VID_0" +fifo_registers[(0x0f04, 0x0f04)] = "TPM_RID_0" +fifo_registers[(0x0f90, 0x0fff)] = "Reserved" +fifo_registers[(0x1000, 0x1000)] = "TPM_ACCESS_1" +fifo_registers[(0x1001, 0x1007)] = "Reserved" +fifo_registers[(0x1008, 0x100b)] = "TPM_INT_ENABLE_1" +fifo_registers[(0x100c, 0x100c)] = "TPM_INT_VECTOR_1" +fifo_registers[(0x100d, 0x100f)] = "Reserved" +fifo_registers[(0x1010, 0x1013)] = "TPM_INT_STATUS_1" +fifo_registers[(0x1014, 0x1017)] = "TPM_INTF_CAPABILITY_1" +fifo_registers[(0x1018, 0x101b)] = "TPM_STS_1" +fifo_registers[(0x101c, 0x1023)] = "Reserved" +fifo_registers[(0x1024, 0x1027)] = "TPM_DATA_FIFO_1" +fifo_registers[(0x1028, 0x102f)] = "Reserved" +fifo_registers[(0x1030, 0x1030)] = "TPM_INTERFACE_ID_1" +fifo_registers[(0x1037, 0x107f)] = "Reserved" +fifo_registers[(0x1080, 0x1083)] = "TPM_XDATA_FIFO_1" +fifo_registers[(0x1084, 0x1eff)] = "Reserved" +fifo_registers[(0x1f00, 0x1f03)] = "TPM_DID_VID_1" +fifo_registers[(0x1f04, 0x1f04)] = "TPM_RID_1" +fifo_registers[(0x1f05, 0x1fff)] = "Reserved" +fifo_registers[(0x2000, 0x2000)] = "TPM_ACCESS_2" +fifo_registers[(0x2001, 0x2007)] = "Reserved" +fifo_registers[(0x2008, 0x200b)] = "TPM_INT_ENABLE_2" +fifo_registers[(0x200c, 0x200c)] = "TPM_INT_VECTOR_2" +fifo_registers[(0x200d, 0x200f)] = "Reserved" +fifo_registers[(0x2010, 0x2013)] = "TPM_INT_STATUS_2" +fifo_registers[(0x2014, 0x2017)] = "TPM_INTF_CAPABILITY_2" +fifo_registers[(0x2018, 0x201b)] = "TPM_STS_2" +fifo_registers[(0x201c, 0x2023)] = "Reserved" +fifo_registers[(0x2024, 0x2027)] = "TPM_DATA_FIFO_2" +fifo_registers[(0x2028, 0x202f)] = "Reserved" +fifo_registers[(0x2030, 0x2033)] = "TPM_INTERFACE_ID_2" +fifo_registers[(0x2034, 0x207f)] = "Reserved" +fifo_registers[(0x2080, 0x2083)] = "TPM_XDATA_FIFO_2" +fifo_registers[(0x2084, 0x2eff)] = "Reserved" +fifo_registers[(0x2f00, 0x2f03)] = "TPM_DID_VID_2" +fifo_registers[(0x2f04, 0x2f04)] = "TPM_RID_2" +fifo_registers[(0x2f05, 0x2fff)] = "Reserved" +fifo_registers[(0x3000, 0x3000)] = "TPM_ACCESS_3" +fifo_registers[(0x3001, 0x3007)] = "Reserved" +fifo_registers[(0x3008, 0x300b)] = "TPM_INT_ENABLE_3" +fifo_registers[(0x300c, 0x300c)] = "TPM_INT_VECTOR_3" +fifo_registers[(0x300d, 0x300f)] = "Reserved" +fifo_registers[(0x3010, 0x3013)] = "TPM_INT_STATUS_3" +fifo_registers[(0x3014, 0x3017)] = "TPM_INTF_CAPABILITY_3" +fifo_registers[(0x3018, 0x301b)] = "TPM_STS_3" +fifo_registers[(0x301c, 0x3023)] = "Reserved" +fifo_registers[(0x3024, 0x3027)] = "TPM_DATA_FIFO_3" +fifo_registers[(0x3028, 0x302f)] = "Reserved" +fifo_registers[(0x3030, 0x3033)] = "TPM_INTERFACE_ID_3" +fifo_registers[(0x3034, 0x307f)] = "Reserved" +fifo_registers[(0x3080, 0x3083)] = "TPM_XDATA_FIFO_3" +fifo_registers[(0x3084, 0x3eff)] = "Reserved" +fifo_registers[(0x3f00, 0x3f03)] = "TPM_DID_VID_3" +fifo_registers[(0x3f04, 0x3f04)] = "TPM_RID_3" +fifo_registers[(0x3f05, 0x3fff)] = "Reserved" +fifo_registers[(0x4000, 0x4000)] = "TPM_ACCESS_4" +fifo_registers[(0x4001, 0x4007)] = "Reserved" +fifo_registers[(0x4008, 0x400b)] = "TPM_INT_ENABLE_4" +fifo_registers[(0x400c, 0x400c)] = "TPM_INT_VECTOR_4" +fifo_registers[(0x400d, 0x400f)] = "Reserved" +fifo_registers[(0x4010, 0x4013)] = "TPM_INT_STATUS_4" +fifo_registers[(0x4014, 0x4017)] = "TPM_INTF_CAPABILITY_4" +fifo_registers[(0x4018, 0x401b)] = "TPM_STS_4" +fifo_registers[(0x401c, 0x401f)] = "Reserved" +fifo_registers[(0x4020, 0x4023)] = "TPM_HASH_END" +fifo_registers[(0x4024, 0x4027)] = "TPM_DATA_FIFO_4" +fifo_registers[(0x4028, 0x402f)] = "TPM_HASH_START" +fifo_registers[(0x4030, 0x4033)] = "TPM_INTERFACE_ID_4" +fifo_registers[(0x4034, 0x407f)] = "Reserved" +fifo_registers[(0x4080, 0x4083)] = "TPM_XDATA_FIFO_4" +fifo_registers[(0x4084, 0x4eff)] = "Reserved" +fifo_registers[(0x4f00, 0x4f03)] = "TPM_DID_VID_4" +fifo_registers[(0x4f04, 0x4f04)] = "TPM_RID_4" +fifo_registers[(0x4f05, 0x4fff)] = "Reserved" +fifo_registers[(0x5000, 0x5fff)] = "Reserved" diff --git a/libsigrokdecode4DSL/decoders/spi_tpm/pd.py b/libsigrokdecode4DSL/decoders/spi_tpm/pd.py new file mode 100644 index 00000000..301826f1 --- /dev/null +++ b/libsigrokdecode4DSL/decoders/spi_tpm/pd.py @@ -0,0 +1,270 @@ +## +## This file is part of the libsigrokdecode project. +## +## Copyright (C) 2021 ghecko - Jordan Ovrè +## +## This program is free software; you can redistribute it and/or modify +## it under the terms of the GNU General Public License as published by +## the Free Software Foundation; either version 2 of the License, or +## (at your option) any later version. +## +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. +## +## You should have received a copy of the GNU General Public License +## along with this program; if not, see . +## + +import sigrokdecode as srd +import re +from collections import deque +from .lists import fifo_registers + + +class Ann: + """ Annotation ID """ + READ, WRITE, ADDRESS, DATA, VMK = range(5) + + +class Operation: + """ TPM transaction type """ + READ = 0x80 + WRITE = 0x00 + + +class TransactionState: + """ State of the transaction """ + READ = 0 + WRITE = 1 + READ_ADDRESS = 2 + WAIT = 3 + TRANSFER_DATA = 4 + + +class Transaction: + """Represent one TPM SPI transaction + Args: + start_sample: The absolute samplenumber of the first sample of this transaction. + operation: Transaction type. + size: The number of data bytes. + Attributes: + start_sample (int): The absolute samplenumber of the first sample of this transaction. + end_sample (int): The absolute samplenumber of the last sample of this transaction. + operation (Operation): Transaction type. + address (bytearray): The register address in the transaction. (big-endian). + data (bytearray): Data in the transaction. + size (int): The number of data bytes. + wait_count (int): Holds the number of wait states between the address and data . + """ + + def __init__(self, start_sample, operation): + self.start_sample = start_sample + self.end_sample_op = None + self.end_sample_addr = None + self.end_sample_data = None + self.operation = operation + self.address = bytearray() + self.data = bytearray() + self.size = 0 + self.wait_count = 0 + + def is_complete(self): + """ + Check if the transaction is complete. + A transaction is complete when all address and data bytes are captured. + """ + return self.is_address_complete() and self.is_data_complete() + + def is_data_complete(self): + """ Check if all data bytes are captured """ + return len(self.data) == self.size + + def is_address_complete(self): + """ Check if all address bytes are captured. """ + return len(self.address) == 3 + + def frame(self): + """ Return address and data annotation if the transaction is complete. """ + if self.is_complete(): + register_name = "" + try: + register_name = fifo_registers[int.from_bytes(self.address, "big") & 0xffff] + except KeyError: + register_name = "Unknown" + wait_str = '' if self.wait_count == 0 else ' (Waits: {})'.format(self.wait_count) + data_str = ''.join('{:02x}'.format(x) for x in self.data) + op_ann = ['Read', 'Rd'] if self.operation == Operation.READ else ['Write', 'Wr'] + addr_ann = ['Register: {}'.format(register_name), '{}'.format(register_name)] + data_ann = ['{}{}'.format(data_str, wait_str), '{}'.format(data_str), data_str] + return ((self.start_sample, self.end_sample_op, op_ann), + (self.end_sample_op, self.end_sample_addr, addr_ann), + (self.end_sample_addr, self.end_sample_data, data_ann)) + return None + + +class Decoder(srd.Decoder): + api_version = 3 + id = 'spi_tpm' + name = 'SPI TPM' + longname = 'SPI TPM transactions' + desc = 'Parses TPM transactions from SPI bus with automatic VMK extraction for BitLocker.' + license = 'gplv2+' + inputs = ['spi'] + outputs = [] + tags = ['IC', 'TPM', 'BitLocker'] + annotations = ( + ('Read', 'Read register operation'), + ('Write', 'Write register operation'), + ('Address', 'Register address'), + ('Data', 'Data'), + ('VMK', 'Extracted BitLocker VMK'), + ) + annotation_rows = ( + ('Transactions', 'TPM transactions', (0, 1, 2, 3)), + ('B-VMK', 'BitLocker Volume Master Key', (4,)), + ) + options = ( + {'id': 'wait_mask', 'desc': 'TPM Wait transfer Mask', 'default': '0x00', + 'values': ('0x00', '0xFE')}, + ) + + def __init__(self): + self.end_wait = 0x01 + # Circular buffer to detect VMK header on transaction data + self.queue = deque(maxlen=12) + self.vmk_meta = {"s_queue": deque(maxlen=12), "vmk_ss": 0, "vmk_es": 0} + self.saving_vmk = False + self.vmk = [] + self.reset() + self.state_machine = None + self.init_state_machine() + + def reset(self): + self.ss = self.es = 0 + self.state = None + self.current_transaction = None + + def end_current_transaction(self): + self.reset() + + def start(self): + self.out_ann = self.register(srd.OUTPUT_ANN) + + def init_state_machine(self): + self.state_machine = { + TransactionState.READ: self._transaction_read, + TransactionState.WRITE: self._transaction_write, + TransactionState.READ_ADDRESS: self._transaction_read_addr, + TransactionState.WAIT: self._transaction_wait, + TransactionState.TRANSFER_DATA: self._transaction_data, + None: self._return + } + + def _return(self, mosi, miso): + return + + @staticmethod + def transaction_state(mosi): + if mosi == Operation.READ: + return TransactionState.READ + elif mosi == Operation.WRITE: + return TransactionState.WRITE + else: + return None + + def _transaction_wait(self, mosi, miso): + self.current_transaction.wait_count += 1 + if miso == self.end_wait: + self.state = TransactionState.TRANSFER_DATA + + def _transaction_read(self, mosi, miso): + # TPM operation is one byte long (0x80: Read, 0x00: Write) + self.current_transaction = Transaction(self.ss, Operation.READ) + self.current_transaction.end_sample_op = self.es + self.state = TransactionState.READ_ADDRESS + + def _transaction_write(self, mosi, miso): + # TPM operation is one byte long (0x80: Read, 0x00: Write) + self.current_transaction = Transaction(self.ss, Operation.WRITE) + self.current_transaction.end_sample_op = self.es + self.state = TransactionState.READ_ADDRESS + + def _transaction_read_addr(self, mosi, miso): + # Get address bytes + # Address is 3 bytes long + self.current_transaction.address.extend(mosi.to_bytes(1, byteorder='big')) + # The transfer size byte is sent at the same time than the last byte address + if self.current_transaction.is_address_complete(): + self.current_transaction.end_sample_addr = self.es + self.current_transaction.size = miso + self.state = TransactionState.TRANSFER_DATA + return + + def _transaction_data(self, mosi, miso): + self.current_transaction.end_sample_data = self.es + if miso == self.wait_mask: + self.state = TransactionState.WAIT + return + if self.current_transaction.operation == Operation.READ: + self.current_transaction.data.extend(miso.to_bytes(1, byteorder='big')) + self.recover_vmk(miso) + elif self.current_transaction.operation == Operation.WRITE: + self.current_transaction.data.extend(mosi.to_bytes(1, byteorder='big')) + # Check if the transaction is complete + annotation = self.current_transaction.frame() + if annotation: + (op_ss, op_es, op_ann), (addr_ss, addr_es, addr_ann), (data_ss, data_es, data_ann) = annotation + self.put(op_ss, op_es, self.out_ann, + [Ann.READ if self.current_transaction.operation == Operation.READ else Ann.WRITE, op_ann]) + self.put(addr_ss, addr_es, self.out_ann, [Ann.ADDRESS, addr_ann]) + self.put(data_ss, data_es, self.out_ann, [Ann.DATA, data_ann]) + self.end_current_transaction() + + def check_vmk_header(self): + """ Check for VMK header """ + if self.queue[0] == 0x2c: + potential_header = ''.join('{:02x}'.format(x) for x in self.queue) + if re.findall(r'2c000[0-6]000[1-9]000[0-1]000[0-5]200000', potential_header): + self.put(self.vmk_meta["s_queue"][0], self.es, self.out_ann, + [Ann.VMK, ['VMK header: {}'.format(potential_header)]]) + self.saving_vmk = True + + def recover_vmk(self, miso): + """ Check if VMK is releasing """ + if not self.saving_vmk: + # Add data to the circular buffer + self.queue.append(miso) + # Add sample number to meta queue + self.vmk_meta["s_queue"].append(self.ss) + # Check if VMK header retrieved + self.check_vmk_header() + else: + if len(self.vmk) == 0: + self.vmk_meta["vmk_ss"] = self.ss + if len(self.vmk) < 32: + self.vmk.append(miso) + self.vmk_meta["vmk_es"] = self.es + else: + self.saving_vmk = False + self.put(self.vmk_meta["vmk_ss"], self.vmk_meta["vmk_es"], self.out_ann, + [Ann.VMK, ['VMK: {}'.format(''.join('{:02x}'.format(x) for x in self.vmk))]]) + + def decode(self, ss, es, data): + self.wait_mask = bytes.fromhex(self.options['wait_mask'].strip("0x")) + + self.ss, self.es = ss, es + + ptype, mosi, miso = data + + if ptype == 'CS-CHANGE': + self.end_current_transaction() + + if ptype != 'DATA': + return + + if self.state is None: + self.state = self.transaction_state(mosi) + + self.state_machine[self.state](mosi, miso) From 4198b30240e4dbf35978e40d84965a5aa60b5c53 Mon Sep 17 00:00:00 2001 From: jovre Date: Fri, 15 Oct 2021 15:50:54 +0200 Subject: [PATCH 2/2] Rectifying transaction size extraction method --- libsigrokdecode4DSL/decoders/spi_tpm/pd.py | 28 ++++++++++++---------- 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/libsigrokdecode4DSL/decoders/spi_tpm/pd.py b/libsigrokdecode4DSL/decoders/spi_tpm/pd.py index 301826f1..167d5c9e 100644 --- a/libsigrokdecode4DSL/decoders/spi_tpm/pd.py +++ b/libsigrokdecode4DSL/decoders/spi_tpm/pd.py @@ -48,7 +48,7 @@ class Transaction: Args: start_sample: The absolute samplenumber of the first sample of this transaction. operation: Transaction type. - size: The number of data bytes. + transfer_size: The number of data bytes. Attributes: start_sample (int): The absolute samplenumber of the first sample of this transaction. end_sample (int): The absolute samplenumber of the last sample of this transaction. @@ -59,7 +59,7 @@ class Transaction: wait_count (int): Holds the number of wait states between the address and data . """ - def __init__(self, start_sample, operation): + def __init__(self, start_sample, operation, transfer_size): self.start_sample = start_sample self.end_sample_op = None self.end_sample_addr = None @@ -67,7 +67,7 @@ class Transaction: self.operation = operation self.address = bytearray() self.data = bytearray() - self.size = 0 + self.size = transfer_size self.wait_count = 0 def is_complete(self): @@ -132,6 +132,8 @@ class Decoder(srd.Decoder): def __init__(self): self.end_wait = 0x01 + self.operation_mask = 0x80 + self.address_mask = 0x3f # Circular buffer to detect VMK header on transaction data self.queue = deque(maxlen=12) self.vmk_meta = {"s_queue": deque(maxlen=12), "vmk_ss": 0, "vmk_es": 0} @@ -165,11 +167,10 @@ class Decoder(srd.Decoder): def _return(self, mosi, miso): return - @staticmethod - def transaction_state(mosi): - if mosi == Operation.READ: + def transaction_state(self, mosi): + if (mosi & self.operation_mask) == Operation.READ: return TransactionState.READ - elif mosi == Operation.WRITE: + elif (mosi & self.operation_mask) == Operation.WRITE: return TransactionState.WRITE else: return None @@ -180,14 +181,18 @@ class Decoder(srd.Decoder): self.state = TransactionState.TRANSFER_DATA def _transaction_read(self, mosi, miso): - # TPM operation is one byte long (0x80: Read, 0x00: Write) - self.current_transaction = Transaction(self.ss, Operation.READ) + # TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write) + # transfer size is defined on bits 0 to 5 of the first byte of the transaction + transfer_size = (mosi & 0x3f) + 1 + self.current_transaction = Transaction(self.ss, Operation.READ, transfer_size) self.current_transaction.end_sample_op = self.es self.state = TransactionState.READ_ADDRESS def _transaction_write(self, mosi, miso): - # TPM operation is one byte long (0x80: Read, 0x00: Write) - self.current_transaction = Transaction(self.ss, Operation.WRITE) + # TPM operation is defined on the 7th bit of the first byte of the transaction (1=read / 0=write) + # transfer size is defined on bits 0 to 5 of the first byte of the transaction + transfer_size = (mosi & 0x3f) + 1 + self.current_transaction = Transaction(self.ss, Operation.WRITE, transfer_size) self.current_transaction.end_sample_op = self.es self.state = TransactionState.READ_ADDRESS @@ -198,7 +203,6 @@ class Decoder(srd.Decoder): # The transfer size byte is sent at the same time than the last byte address if self.current_transaction.is_address_complete(): self.current_transaction.end_sample_addr = self.es - self.current_transaction.size = miso self.state = TransactionState.TRANSFER_DATA return