cachepc-linux

Fork of AMDESE/linux with modifications for CachePC side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-linux
Log | Files | Refs | README | LICENSE | sfeed.txt

tpm2_tests.py (10439B)


      1# SPDX-License-Identifier: (GPL-2.0 OR BSD-3-Clause)
      2
      3from argparse import ArgumentParser
      4from argparse import FileType
      5import os
      6import sys
      7import tpm2
      8from tpm2 import ProtocolError
      9import unittest
     10import logging
     11import struct
     12
     13class SmokeTest(unittest.TestCase):
     14    def setUp(self):
     15        self.client = tpm2.Client()
     16        self.root_key = self.client.create_root_key()
     17
     18    def tearDown(self):
     19        self.client.flush_context(self.root_key)
     20        self.client.close()
     21
     22    def test_seal_with_auth(self):
     23        data = ('X' * 64).encode()
     24        auth = ('A' * 15).encode()
     25
     26        blob = self.client.seal(self.root_key, data, auth, None)
     27        result = self.client.unseal(self.root_key, blob, auth, None)
     28        self.assertEqual(data, result)
     29
     30    def determine_bank_alg(self, mask):
     31        pcr_banks = self.client.get_cap_pcrs()
     32        for bank_alg, pcrSelection in pcr_banks.items():
     33            if pcrSelection & mask == mask:
     34                return bank_alg
     35        return None
     36
     37    def test_seal_with_policy(self):
     38        bank_alg = self.determine_bank_alg(1 << 16)
     39        self.assertIsNotNone(bank_alg)
     40
     41        handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
     42
     43        data = ('X' * 64).encode()
     44        auth = ('A' * 15).encode()
     45        pcrs = [16]
     46
     47        try:
     48            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
     49            self.client.policy_password(handle)
     50
     51            policy_dig = self.client.get_policy_digest(handle)
     52        finally:
     53            self.client.flush_context(handle)
     54
     55        blob = self.client.seal(self.root_key, data, auth, policy_dig)
     56
     57        handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
     58
     59        try:
     60            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
     61            self.client.policy_password(handle)
     62
     63            result = self.client.unseal(self.root_key, blob, auth, handle)
     64        except:
     65            self.client.flush_context(handle)
     66            raise
     67
     68        self.assertEqual(data, result)
     69
     70    def test_unseal_with_wrong_auth(self):
     71        data = ('X' * 64).encode()
     72        auth = ('A' * 20).encode()
     73        rc = 0
     74
     75        blob = self.client.seal(self.root_key, data, auth, None)
     76        try:
     77            result = self.client.unseal(self.root_key, blob,
     78                        auth[:-1] + 'B'.encode(), None)
     79        except ProtocolError as e:
     80            rc = e.rc
     81
     82        self.assertEqual(rc, tpm2.TPM2_RC_AUTH_FAIL)
     83
     84    def test_unseal_with_wrong_policy(self):
     85        bank_alg = self.determine_bank_alg(1 << 16 | 1 << 1)
     86        self.assertIsNotNone(bank_alg)
     87
     88        handle = self.client.start_auth_session(tpm2.TPM2_SE_TRIAL)
     89
     90        data = ('X' * 64).encode()
     91        auth = ('A' * 17).encode()
     92        pcrs = [16]
     93
     94        try:
     95            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
     96            self.client.policy_password(handle)
     97
     98            policy_dig = self.client.get_policy_digest(handle)
     99        finally:
    100            self.client.flush_context(handle)
    101
    102        blob = self.client.seal(self.root_key, data, auth, policy_dig)
    103
    104        # Extend first a PCR that is not part of the policy and try to unseal.
    105        # This should succeed.
    106
    107        ds = tpm2.get_digest_size(bank_alg)
    108        self.client.extend_pcr(1, ('X' * ds).encode(), bank_alg=bank_alg)
    109
    110        handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
    111
    112        try:
    113            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
    114            self.client.policy_password(handle)
    115
    116            result = self.client.unseal(self.root_key, blob, auth, handle)
    117        except:
    118            self.client.flush_context(handle)
    119            raise
    120
    121        self.assertEqual(data, result)
    122
    123        # Then, extend a PCR that is part of the policy and try to unseal.
    124        # This should fail.
    125        self.client.extend_pcr(16, ('X' * ds).encode(), bank_alg=bank_alg)
    126
    127        handle = self.client.start_auth_session(tpm2.TPM2_SE_POLICY)
    128
    129        rc = 0
    130
    131        try:
    132            self.client.policy_pcr(handle, pcrs, bank_alg=bank_alg)
    133            self.client.policy_password(handle)
    134
    135            result = self.client.unseal(self.root_key, blob, auth, handle)
    136        except ProtocolError as e:
    137            rc = e.rc
    138            self.client.flush_context(handle)
    139        except:
    140            self.client.flush_context(handle)
    141            raise
    142
    143        self.assertEqual(rc, tpm2.TPM2_RC_POLICY_FAIL)
    144
    145    def test_seal_with_too_long_auth(self):
    146        ds = tpm2.get_digest_size(tpm2.TPM2_ALG_SHA1)
    147        data = ('X' * 64).encode()
    148        auth = ('A' * (ds + 1)).encode()
    149
    150        rc = 0
    151        try:
    152            blob = self.client.seal(self.root_key, data, auth, None)
    153        except ProtocolError as e:
    154            rc = e.rc
    155
    156        self.assertEqual(rc, tpm2.TPM2_RC_SIZE)
    157
    158    def test_too_short_cmd(self):
    159        rejected = False
    160        try:
    161            fmt = '>HIII'
    162            cmd = struct.pack(fmt,
    163                              tpm2.TPM2_ST_NO_SESSIONS,
    164                              struct.calcsize(fmt) + 1,
    165                              tpm2.TPM2_CC_FLUSH_CONTEXT,
    166                              0xDEADBEEF)
    167
    168            self.client.send_cmd(cmd)
    169        except IOError as e:
    170            rejected = True
    171        except:
    172            pass
    173        self.assertEqual(rejected, True)
    174
    175    def test_read_partial_resp(self):
    176        try:
    177            fmt = '>HIIH'
    178            cmd = struct.pack(fmt,
    179                              tpm2.TPM2_ST_NO_SESSIONS,
    180                              struct.calcsize(fmt),
    181                              tpm2.TPM2_CC_GET_RANDOM,
    182                              0x20)
    183            self.client.tpm.write(cmd)
    184            hdr = self.client.tpm.read(10)
    185            sz = struct.unpack('>I', hdr[2:6])[0]
    186            rsp = self.client.tpm.read()
    187        except:
    188            pass
    189        self.assertEqual(sz, 10 + 2 + 32)
    190        self.assertEqual(len(rsp), 2 + 32)
    191
    192    def test_read_partial_overwrite(self):
    193        try:
    194            fmt = '>HIIH'
    195            cmd = struct.pack(fmt,
    196                              tpm2.TPM2_ST_NO_SESSIONS,
    197                              struct.calcsize(fmt),
    198                              tpm2.TPM2_CC_GET_RANDOM,
    199                              0x20)
    200            self.client.tpm.write(cmd)
    201            # Read part of the respone
    202            rsp1 = self.client.tpm.read(15)
    203
    204            # Send a new cmd
    205            self.client.tpm.write(cmd)
    206
    207            # Read the whole respone
    208            rsp2 = self.client.tpm.read()
    209        except:
    210            pass
    211        self.assertEqual(len(rsp1), 15)
    212        self.assertEqual(len(rsp2), 10 + 2 + 32)
    213
    214    def test_send_two_cmds(self):
    215        rejected = False
    216        try:
    217            fmt = '>HIIH'
    218            cmd = struct.pack(fmt,
    219                              tpm2.TPM2_ST_NO_SESSIONS,
    220                              struct.calcsize(fmt),
    221                              tpm2.TPM2_CC_GET_RANDOM,
    222                              0x20)
    223            self.client.tpm.write(cmd)
    224
    225            # expect the second one to raise -EBUSY error
    226            self.client.tpm.write(cmd)
    227            rsp = self.client.tpm.read()
    228
    229        except IOError as e:
    230            # read the response
    231            rsp = self.client.tpm.read()
    232            rejected = True
    233            pass
    234        except:
    235            pass
    236        self.assertEqual(rejected, True)
    237
    238class SpaceTest(unittest.TestCase):
    239    def setUp(self):
    240        logging.basicConfig(filename='SpaceTest.log', level=logging.DEBUG)
    241
    242    def test_make_two_spaces(self):
    243        log = logging.getLogger(__name__)
    244        log.debug("test_make_two_spaces")
    245
    246        space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    247        root1 = space1.create_root_key()
    248        space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    249        root2 = space2.create_root_key()
    250        root3 = space2.create_root_key()
    251
    252        log.debug("%08x" % (root1))
    253        log.debug("%08x" % (root2))
    254        log.debug("%08x" % (root3))
    255
    256    def test_flush_context(self):
    257        log = logging.getLogger(__name__)
    258        log.debug("test_flush_context")
    259
    260        space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    261        root1 = space1.create_root_key()
    262        log.debug("%08x" % (root1))
    263
    264        space1.flush_context(root1)
    265
    266    def test_get_handles(self):
    267        log = logging.getLogger(__name__)
    268        log.debug("test_get_handles")
    269
    270        space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    271        space1.create_root_key()
    272        space2 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    273        space2.create_root_key()
    274        space2.create_root_key()
    275
    276        handles = space2.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_TRANSIENT)
    277
    278        self.assertEqual(len(handles), 2)
    279
    280        log.debug("%08x" % (handles[0]))
    281        log.debug("%08x" % (handles[1]))
    282
    283    def test_invalid_cc(self):
    284        log = logging.getLogger(__name__)
    285        log.debug(sys._getframe().f_code.co_name)
    286
    287        TPM2_CC_INVALID = tpm2.TPM2_CC_FIRST - 1
    288
    289        space1 = tpm2.Client(tpm2.Client.FLAG_SPACE)
    290        root1 = space1.create_root_key()
    291        log.debug("%08x" % (root1))
    292
    293        fmt = '>HII'
    294        cmd = struct.pack(fmt, tpm2.TPM2_ST_NO_SESSIONS, struct.calcsize(fmt),
    295                          TPM2_CC_INVALID)
    296
    297        rc = 0
    298        try:
    299            space1.send_cmd(cmd)
    300        except ProtocolError as e:
    301            rc = e.rc
    302
    303        self.assertEqual(rc, tpm2.TPM2_RC_COMMAND_CODE |
    304                         tpm2.TSS2_RESMGR_TPM_RC_LAYER)
    305
    306class AsyncTest(unittest.TestCase):
    307    def setUp(self):
    308        logging.basicConfig(filename='AsyncTest.log', level=logging.DEBUG)
    309
    310    def test_async(self):
    311        log = logging.getLogger(__name__)
    312        log.debug(sys._getframe().f_code.co_name)
    313
    314        async_client = tpm2.Client(tpm2.Client.FLAG_NONBLOCK)
    315        log.debug("Calling get_cap in a NON_BLOCKING mode")
    316        async_client.get_cap(tpm2.TPM2_CAP_HANDLES, tpm2.HR_LOADED_SESSION)
    317        async_client.close()
    318
    319    def test_flush_invalid_context(self):
    320        log = logging.getLogger(__name__)
    321        log.debug(sys._getframe().f_code.co_name)
    322
    323        async_client = tpm2.Client(tpm2.Client.FLAG_SPACE | tpm2.Client.FLAG_NONBLOCK)
    324        log.debug("Calling flush_context passing in an invalid handle ")
    325        handle = 0x80123456
    326        rc = 0
    327        try:
    328            async_client.flush_context(handle)
    329        except OSError as e:
    330            rc = e.errno
    331
    332        self.assertEqual(rc, 22)
    333        async_client.close()