cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

155 (12158B)


      1#!/usr/bin/env python3
      2# group: rw
      3#
      4# Test whether the backing BDSs are correct after completion of a
      5# mirror block job; in "existing" modes (drive-mirror with
      6# mode=existing and blockdev-mirror) the backing chain should not be
      7# overridden.
      8#
      9# Copyright (C) 2016 Red Hat, Inc.
     10#
     11# This program is free software; you can redistribute it and/or modify
     12# it under the terms of the GNU General Public License as published by
     13# the Free Software Foundation; either version 2 of the License, or
     14# (at your option) any later version.
     15#
     16# This program is distributed in the hope that it will be useful,
     17# but WITHOUT ANY WARRANTY; without even the implied warranty of
     18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     19# GNU General Public License for more details.
     20#
     21# You should have received a copy of the GNU General Public License
     22# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     23#
     24
     25import os
     26import iotests
     27from iotests import qemu_img
     28
     29back0_img = os.path.join(iotests.test_dir, 'back0.' + iotests.imgfmt)
     30back1_img = os.path.join(iotests.test_dir, 'back1.' + iotests.imgfmt)
     31back2_img = os.path.join(iotests.test_dir, 'back2.' + iotests.imgfmt)
     32source_img = os.path.join(iotests.test_dir, 'source.' + iotests.imgfmt)
     33target_img = os.path.join(iotests.test_dir, 'target.' + iotests.imgfmt)
     34
     35
# Class variables that control the behavior of the test classes below:
#
# existing: If True, explicitly create the target image and (for
#           blockdev-mirror only) blockdev-add it
# target_backing: If existing is True: Use this filename as the backing file
#                 of the target image
#                 (None: no backing file)
# target_blockdev_backing: If existing is True: Pass this dict as "backing"
#                          for the blockdev-add command
#                          (None: do not pass "backing")
# target_real_backing: If existing is True: The real filename of the backing
#                      image during runtime, only makes sense if
#                      target_blockdev_backing is not None
#                      (None: same as target_backing)
# target_open_with_backing: If True, the target image is added with its backing
#                           chain opened right away. If False, blockdev-add
#                           opens it without a backing file and job completion
#                           is supposed to open the backing chain.
# use_iothread: If True, an iothread is configured for the virtio-scsi device
#               that the disk using the mirrored image is attached to
     55
     56class BaseClass(iotests.QMPTestCase):
     57    target_blockdev_backing = None
     58    target_real_backing = None
     59    target_open_with_backing = True
     60    use_iothread = False
     61
     62    def setUp(self):
     63        qemu_img('create', '-f', iotests.imgfmt, back0_img, '1440K')
     64        qemu_img('create', '-f', iotests.imgfmt, '-b', back0_img,
     65                 '-F', iotests.imgfmt, back1_img)
     66        qemu_img('create', '-f', iotests.imgfmt, '-b', back1_img,
     67                 '-F', iotests.imgfmt, back2_img)
     68        qemu_img('create', '-f', iotests.imgfmt, '-b', back2_img,
     69                 '-F', iotests.imgfmt, source_img)
     70
     71        self.vm = iotests.VM()
     72        # Add the BDS via blockdev-add so it stays around after the mirror block
     73        # job has been completed
     74        blockdev = {'node-name': 'source',
     75                    'driver': iotests.imgfmt,
     76                    'file': {'driver': 'file',
     77                             'filename': source_img}}
     78        self.vm.add_blockdev(self.vm.qmp_to_opts(blockdev))
     79
     80        if self.use_iothread:
     81            self.vm.add_object('iothread,id=iothread0')
     82            iothread = ",iothread=iothread0"
     83        else:
     84            iothread = ""
     85
     86        self.vm.add_device('virtio-scsi%s' % iothread)
     87        self.vm.add_device('scsi-hd,id=qdev0,drive=source')
     88
     89        self.vm.launch()
     90
     91        self.assertIntactSourceBackingChain()
     92
     93        if self.existing:
     94            if self.target_backing:
     95                qemu_img('create', '-f', iotests.imgfmt,
     96                         '-b', self.target_backing, '-F', 'raw',
     97                         target_img, '1440K')
     98            else:
     99                qemu_img('create', '-f', iotests.imgfmt, target_img, '1440K')
    100
    101            if self.cmd == 'blockdev-mirror':
    102                options = { 'node-name': 'target',
    103                            'driver': iotests.imgfmt,
    104                            'file': { 'driver': 'file',
    105                                      'node-name': 'target-file',
    106                                      'filename': target_img } }
    107
    108                if not self.target_open_with_backing:
    109                        options['backing'] = None
    110                elif self.target_blockdev_backing:
    111                        options['backing'] = self.target_blockdev_backing
    112
    113                result = self.vm.qmp('blockdev-add', **options)
    114                self.assert_qmp(result, 'return', {})
    115
    116    def tearDown(self):
    117        self.vm.shutdown()
    118        os.remove(source_img)
    119        os.remove(back2_img)
    120        os.remove(back1_img)
    121        os.remove(back0_img)
    122        try:
    123            os.remove(target_img)
    124        except OSError:
    125            pass
    126
    127    def findBlockNode(self, node_name, qdev=None):
    128        if qdev:
    129            result = self.vm.qmp('query-block')
    130            for device in result['return']:
    131                if device['qdev'] == qdev:
    132                    if node_name:
    133                        self.assert_qmp(device, 'inserted/node-name', node_name)
    134                    return device['inserted']
    135        else:
    136            result = self.vm.qmp('query-named-block-nodes')
    137            for node in result['return']:
    138                if node['node-name'] == node_name:
    139                    return node
    140
    141        self.fail('Cannot find node %s/%s' % (qdev, node_name))
    142
    143    def assertIntactSourceBackingChain(self):
    144        node = self.findBlockNode('source')
    145
    146        self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename',
    147                        source_img)
    148        self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename',
    149                        back2_img)
    150        self.assert_qmp(node, 'image' + '/backing-image' * 2 + '/filename',
    151                        back1_img)
    152        self.assert_qmp(node, 'image' + '/backing-image' * 3 + '/filename',
    153                        back0_img)
    154        self.assert_qmp_absent(node, 'image' + '/backing-image' * 4)
    155
    156    def assertCorrectBackingImage(self, node, default_image):
    157        if self.existing:
    158            if self.target_real_backing:
    159                image = self.target_real_backing
    160            else:
    161                image = self.target_backing
    162        else:
    163            image = default_image
    164
    165        if image:
    166            self.assert_qmp(node, 'image/backing-image/filename', image)
    167        else:
    168            self.assert_qmp_absent(node, 'image/backing-image')
    169
    170
# Additional class variable that controls the behavior of the mirror tests:
#
# cmd: Mirroring command to execute, either drive-mirror or blockdev-mirror
    174
    175class MirrorBaseClass(BaseClass):
    176    def openBacking(self):
    177        pass
    178
    179    def runMirror(self, sync):
    180        if self.cmd == 'blockdev-mirror':
    181            result = self.vm.qmp(self.cmd, job_id='mirror-job', device='source',
    182                                 sync=sync, target='target',
    183                                 auto_finalize=False)
    184        else:
    185            if self.existing:
    186                mode = 'existing'
    187            else:
    188                mode = 'absolute-paths'
    189            result = self.vm.qmp(self.cmd, job_id='mirror-job', device='source',
    190                                 sync=sync, target=target_img,
    191                                 format=iotests.imgfmt, mode=mode,
    192                                 node_name='target', auto_finalize=False)
    193
    194        self.assert_qmp(result, 'return', {})
    195
    196        self.vm.run_job('mirror-job', auto_finalize=False,
    197                        pre_finalize=self.openBacking, auto_dismiss=True)
    198
    199    def testFull(self):
    200        self.runMirror('full')
    201
    202        node = self.findBlockNode('target', 'qdev0')
    203        self.assertCorrectBackingImage(node, None)
    204        self.assertIntactSourceBackingChain()
    205
    206    def testTop(self):
    207        self.runMirror('top')
    208
    209        node = self.findBlockNode('target', 'qdev0')
    210        self.assertCorrectBackingImage(node, back2_img)
    211        self.assertIntactSourceBackingChain()
    212
    213    def testNone(self):
    214        self.runMirror('none')
    215
    216        node = self.findBlockNode('target', 'qdev0')
    217        self.assertCorrectBackingImage(node, source_img)
    218        self.assertIntactSourceBackingChain()
    219
    220
    221class TestDriveMirrorAbsolutePaths(MirrorBaseClass):
    222    cmd = 'drive-mirror'
    223    existing = False
    224
    225class TestDriveMirrorExistingNoBacking(MirrorBaseClass):
    226    cmd = 'drive-mirror'
    227    existing = True
    228    target_backing = None
    229
    230class TestDriveMirrorExistingBacking(MirrorBaseClass):
    231    cmd = 'drive-mirror'
    232    existing = True
    233    target_backing = 'null-co://'
    234
    235class TestBlockdevMirrorNoBacking(MirrorBaseClass):
    236    cmd = 'blockdev-mirror'
    237    existing = True
    238    target_backing = None
    239
    240class TestBlockdevMirrorBacking(MirrorBaseClass):
    241    cmd = 'blockdev-mirror'
    242    existing = True
    243    target_backing = 'null-co://'
    244
    245class TestBlockdevMirrorForcedBacking(MirrorBaseClass):
    246    cmd = 'blockdev-mirror'
    247    existing = True
    248    target_backing = None
    249    target_blockdev_backing = { 'driver': 'null-co' }
    250    target_real_backing = 'null-co://'
    251
# Attach the target's backing chain only during job completion, using
# blockdev-reopen
    253class TestBlockdevMirrorReopen(MirrorBaseClass):
    254    cmd = 'blockdev-mirror'
    255    existing = True
    256    target_backing = 'null-co://'
    257    target_open_with_backing = False
    258
    259    def openBacking(self):
    260        if not self.target_open_with_backing:
    261            result = self.vm.qmp('blockdev-add', node_name="backing",
    262                                 driver="null-co")
    263            self.assert_qmp(result, 'return', {})
    264            result = self.vm.qmp('blockdev-reopen', options=[{
    265                                     'node-name': "target",
    266                                     'driver': iotests.imgfmt,
    267                                     'file': "target-file",
    268                                     'backing': "backing"
    269                                 }])
    270            self.assert_qmp(result, 'return', {})
    271
    272class TestBlockdevMirrorReopenIothread(TestBlockdevMirrorReopen):
    273    use_iothread = True
    274
# Attach the target's backing chain only during job completion, using
# blockdev-snapshot
    276class TestBlockdevMirrorSnapshot(MirrorBaseClass):
    277    cmd = 'blockdev-mirror'
    278    existing = True
    279    target_backing = 'null-co://'
    280    target_open_with_backing = False
    281
    282    def openBacking(self):
    283        if not self.target_open_with_backing:
    284            result = self.vm.qmp('blockdev-add', node_name="backing",
    285                                 driver="null-co")
    286            self.assert_qmp(result, 'return', {})
    287            result = self.vm.qmp('blockdev-snapshot', node="backing",
    288                                 overlay="target")
    289            self.assert_qmp(result, 'return', {})
    290
    291class TestBlockdevMirrorSnapshotIothread(TestBlockdevMirrorSnapshot):
    292    use_iothread = True
    293
    294class TestCommit(BaseClass):
    295    existing = False
    296
    297    def testCommit(self):
    298        result = self.vm.qmp('block-commit', job_id='commit-job',
    299                             device='source', base=back1_img)
    300        self.assert_qmp(result, 'return', {})
    301
    302        self.vm.event_wait('BLOCK_JOB_READY')
    303
    304        result = self.vm.qmp('block-job-complete', device='commit-job')
    305        self.assert_qmp(result, 'return', {})
    306
    307        self.vm.event_wait('BLOCK_JOB_COMPLETED')
    308
    309        node = self.findBlockNode(None, 'qdev0')
    310        self.assert_qmp(node, 'image' + '/backing-image' * 0 + '/filename',
    311                        back1_img)
    312        self.assert_qmp(node, 'image' + '/backing-image' * 1 + '/filename',
    313                        back0_img)
    314        self.assert_qmp_absent(node, 'image' + '/backing-image' * 2 +
    315                               '/filename')
    316
    317        self.assertIntactSourceBackingChain()
    318
    319
    320BaseClass = None
    321MirrorBaseClass = None
    322
    323if __name__ == '__main__':
    324    iotests.main(supported_fmts=['qcow2'],
    325                 supported_protocols=['file'])