cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

296 (9060B)


      1#!/usr/bin/env python3
      2# group: rw
      3#
      4# Test case for encryption key management versus image sharing
      5#
      6# Copyright (C) 2019 Red Hat, Inc.
      7#
      8# This program is free software; you can redistribute it and/or modify
      9# it under the terms of the GNU General Public License as published by
     10# the Free Software Foundation; either version 2 of the License, or
     11# (at your option) any later version.
     12#
     13# This program is distributed in the hope that it will be useful,
     14# but WITHOUT ANY WARRANTY; without even the implied warranty of
     15# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16# GNU General Public License for more details.
     17#
     18# You should have received a copy of the GNU General Public License
     19# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     20#
     21
     22import iotests
     23import os
     24import time
     25import json
     26
     27test_img = os.path.join(iotests.test_dir, 'test.img')
     28
     29class Secret:
     30    def __init__(self, index):
     31        self._id = "keysec" + str(index)
     32        # you are not supposed to see the password...
     33        self._secret = "hunter" + str(index)
     34
     35    def id(self):
     36        return self._id
     37
     38    def secret(self):
     39        return self._secret
     40
     41    def to_cmdline_object(self):
     42        return  [ "secret,id=" + self._id + ",data=" + self._secret]
     43
     44    def to_qmp_object(self):
     45        return { "qom_type" : "secret", "id": self.id(),
     46                 "data": self.secret() }
     47
     48################################################################################
     49
     50class EncryptionSetupTestCase(iotests.QMPTestCase):
     51
     52    # test case startup
     53    def setUp(self):
     54
     55        # start the VMs
     56        self.vm1 = iotests.VM(path_suffix = 'VM1')
     57        self.vm2 = iotests.VM(path_suffix = 'VM2')
     58        self.vm1.launch()
     59        self.vm2.launch()
     60
     61        # create the secrets and load 'em into the VMs
     62        self.secrets = [ Secret(i) for i in range(0, 4) ]
     63        for secret in self.secrets:
     64            result = self.vm1.qmp("object-add", **secret.to_qmp_object())
     65            self.assert_qmp(result, 'return', {})
     66            result = self.vm2.qmp("object-add", **secret.to_qmp_object())
     67            self.assert_qmp(result, 'return', {})
     68
     69    # test case shutdown
     70    def tearDown(self):
     71        # stop the VM
     72        self.vm1.shutdown()
     73        self.vm2.shutdown()
     74
     75    ###########################################################################
     76    # create the encrypted block device using qemu-img
     77    def createImg(self, file, secret):
     78
     79        output = iotests.qemu_img_pipe(
     80            'create',
     81            '--object', *secret.to_cmdline_object(),
     82            '-f', iotests.imgfmt,
     83            '-o', 'key-secret=' + secret.id(),
     84            '-o', 'iter-time=10',
     85            file,
     86            '1M')
     87
     88        iotests.log(output, filters=[iotests.filter_test_dir])
     89
     90    # attempts to add a key using qemu-img
     91    def addKey(self, file, secret, new_secret):
     92
     93        image_options = {
     94            'key-secret' : secret.id(),
     95            'driver' : iotests.imgfmt,
     96            'file' : {
     97                'driver':'file',
     98                'filename': file,
     99                }
    100            }
    101
    102        output = iotests.qemu_img_pipe(
    103            'amend',
    104            '--object', *secret.to_cmdline_object(),
    105            '--object', *new_secret.to_cmdline_object(),
    106
    107            '-o', 'state=active',
    108            '-o', 'new-secret=' + new_secret.id(),
    109            '-o', 'iter-time=10',
    110
    111            "json:" + json.dumps(image_options)
    112            )
    113
    114        iotests.log(output, filters=[iotests.filter_test_dir])
    115
    116    ###########################################################################
    117    # open an encrypted block device
    118    def openImageQmp(self, vm, id, file, secret,
    119                     readOnly = False, reOpen = False):
    120
    121        command = 'blockdev-reopen' if reOpen else 'blockdev-add'
    122
    123        opts = {
    124                'driver': iotests.imgfmt,
    125                'node-name': id,
    126                'read-only': readOnly,
    127                'key-secret' : secret.id(),
    128                'file': {
    129                    'driver': 'file',
    130                    'filename': test_img,
    131                }
    132            }
    133
    134        if reOpen:
    135            result = vm.qmp(command, options=[opts])
    136        else:
    137            result = vm.qmp(command, **opts)
    138        self.assert_qmp(result, 'return', {})
    139
    140
    141    ###########################################################################
    142    # add virtio-blk consumer for a block device
    143    def addImageUser(self, vm, id, disk_id, share_rw=False):
    144        result = vm.qmp('device_add', **
    145            {
    146                'driver': 'virtio-blk',
    147                'id': id,
    148                'drive': disk_id,
    149                'share-rw' : share_rw
    150            }
    151        )
    152
    153        iotests.log(result)
    154
    155    # close the encrypted block device
    156    def closeImageQmp(self, vm, id):
    157        result = vm.qmp('blockdev-del', **{ 'node-name': id })
    158        self.assert_qmp(result, 'return', {})
    159
    160    ###########################################################################
    161
    162    # add a key to an encrypted block device
    163    def addKeyQmp(self, vm, id, new_secret):
    164
    165        args = {
    166            'node-name': id,
    167            'job-id' : 'job0',
    168            'options' : {
    169                'state'     : 'active',
    170                'driver'    : iotests.imgfmt,
    171                'new-secret': new_secret.id(),
    172                'iter-time' : 10
    173            },
    174        }
    175
    176        result = vm.qmp('x-blockdev-amend', **args)
    177        assert result['return'] == {}
    178        vm.run_job('job0')
    179
    180    # test that when the image opened by two qemu processes,
    181    # neither of them can update the encryption keys
    182    def test1(self):
    183        self.createImg(test_img, self.secrets[0]);
    184
    185        # VM1 opens the image and adds a key
    186        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    187        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[1])
    188
    189
    190        # VM2 opens the image
    191        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    192
    193
    194        # neither VMs now should be able to add a key
    195        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    196        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    197
    198
    199        # VM 1 closes the image
    200        self.closeImageQmp(self.vm1, "testdev")
    201
    202
    203        # now VM2 can add the key
    204        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    205
    206
    207        # qemu-img should also not be able to add a key
    208        self.addKey(test_img, self.secrets[0], self.secrets[2])
    209
    210        # cleanup
    211        self.closeImageQmp(self.vm2, "testdev")
    212        os.remove(test_img)
    213
    214
    215    # test that when the image opened by two qemu processes,
    216    # even if first VM opens it read-only, the second can't update encryption
    217    # keys
    218    def test2(self):
    219        self.createImg(test_img, self.secrets[0]);
    220
    221        # VM1 opens the image readonly
    222        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
    223                          readOnly = True)
    224
    225        # VM2 opens the image
    226        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    227
    228        # VM1 can't add a key since image is readonly
    229        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    230
    231        # VM2 can't add a key since VM is has the image opened
    232        self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    233
    234
    235        #VM1 reopens the image read-write
    236        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
    237                          reOpen = True, readOnly = False)
    238
    239        # VM1 still can't add the key
    240        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    241
    242        # VM2 gets away
    243        self.closeImageQmp(self.vm2, "testdev")
    244
    245        # VM1 now can add the key
    246        self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    247
    248        self.closeImageQmp(self.vm1, "testdev")
    249        os.remove(test_img)
    250
    251    # test that two VMs can't open the same luks image by default
    252    # and attach it to a guest device
    253    def test3(self):
    254        self.createImg(test_img, self.secrets[0]);
    255
    256        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    257        self.addImageUser(self.vm1, "testctrl", "testdev")
    258
    259        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    260        self.addImageUser(self.vm2, "testctrl", "testdev")
    261
    262
    263    # test that two VMs can attach the same luks image to a guest device,
    264    # if both use share-rw=on
    265    def test4(self):
    266        self.createImg(test_img, self.secrets[0]);
    267
    268        self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    269        self.addImageUser(self.vm1, "testctrl", "testdev", share_rw=True)
    270
    271        self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    272        self.addImageUser(self.vm2, "testctrl", "testdev", share_rw=True)
    273
    274
    275
    276if __name__ == '__main__':
    277    # support only raw luks since luks encrypted qcow2 is a proper
    278    # format driver which doesn't allow any sharing
    279    iotests.activate_logging()
    280    iotests.main(supported_fmts = ['luks'])