cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

139 (15071B)


      1#!/usr/bin/env python3
      2# group: rw quick
      3#
      4# Test cases for the QMP 'blockdev-del' command
      5#
      6# Copyright (C) 2015 Igalia, S.L.
      7# Author: Alberto Garcia <berto@igalia.com>
      8#
      9# This program is free software; you can redistribute it and/or modify
     10# it under the terms of the GNU General Public License as published by
     11# the Free Software Foundation; either version 2 of the License, or
     12# (at your option) any later version.
     13#
     14# This program is distributed in the hope that it will be useful,
     15# but WITHOUT ANY WARRANTY; without even the implied warranty of
     16# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     17# GNU General Public License for more details.
     18#
     19# You should have received a copy of the GNU General Public License
     20# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     21#
     22
     23import os
     24import iotests
     25import time
     26
     27base_img = os.path.join(iotests.test_dir, 'base.img')
     28new_img = os.path.join(iotests.test_dir, 'new.img')
     29
     30class TestBlockdevDel(iotests.QMPTestCase):
     31
     32    def setUp(self):
     33        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
     34        self.vm = iotests.VM()
     35        self.vm.add_device("{},id=virtio-scsi".format('virtio-scsi'))
     36        self.vm.launch()
     37
     38    def tearDown(self):
     39        self.vm.shutdown()
     40        os.remove(base_img)
     41        if os.path.isfile(new_img):
     42            os.remove(new_img)
     43
     44    # Check whether a BlockDriverState exists
     45    def checkBlockDriverState(self, node, must_exist = True):
     46        result = self.vm.qmp('query-named-block-nodes')
     47        nodes = [x for x in result['return'] if x['node-name'] == node]
     48        self.assertLessEqual(len(nodes), 1)
     49        self.assertEqual(must_exist, len(nodes) == 1)
     50
     51    # Add a BlockDriverState without a BlockBackend
     52    def addBlockDriverState(self, node):
     53        file_node = '%s_file' % node
     54        self.checkBlockDriverState(node, False)
     55        self.checkBlockDriverState(file_node, False)
     56        opts = {'driver': iotests.imgfmt,
     57                'node-name': node,
     58                'file': {'driver': 'file',
     59                         'node-name': file_node,
     60                         'filename': base_img}}
     61        result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
     62        self.assert_qmp(result, 'return', {})
     63        self.checkBlockDriverState(node)
     64        self.checkBlockDriverState(file_node)
     65
     66    # Add a BlockDriverState that will be used as overlay for the base_img BDS
     67    def addBlockDriverStateOverlay(self, node):
     68        self.checkBlockDriverState(node, False)
     69        iotests.qemu_img('create', '-u', '-f', iotests.imgfmt,
     70                         '-b', base_img, '-F', iotests.imgfmt, new_img, '1M')
     71        opts = {'driver': iotests.imgfmt,
     72                'node-name': node,
     73                'backing': None,
     74                'file': {'driver': 'file',
     75                         'filename': new_img}}
     76        result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
     77        self.assert_qmp(result, 'return', {})
     78        self.checkBlockDriverState(node)
     79
     80    # Delete a BlockDriverState
     81    def delBlockDriverState(self, node, expect_error = False):
     82        self.checkBlockDriverState(node)
     83        result = self.vm.qmp('blockdev-del', node_name = node)
     84        if expect_error:
     85            self.assert_qmp(result, 'error/class', 'GenericError')
     86        else:
     87            self.assert_qmp(result, 'return', {})
     88        self.checkBlockDriverState(node, expect_error)
     89
     90    # Add a device model
     91    def addDeviceModel(self, device, backend, driver = 'virtio-blk'):
     92        result = self.vm.qmp('device_add', id = device,
     93                             driver = driver, drive = backend)
     94        self.assert_qmp(result, 'return', {})
     95
     96    # Delete a device model
     97    def delDeviceModel(self, device, is_virtio_blk = True):
     98        result = self.vm.qmp('device_del', id = device)
     99        self.assert_qmp(result, 'return', {})
    100
    101        result = self.vm.qmp('system_reset')
    102        self.assert_qmp(result, 'return', {})
    103
    104        if is_virtio_blk:
    105            device_path = '/machine/peripheral/%s/virtio-backend' % device
    106            event = self.vm.event_wait(name="DEVICE_DELETED",
    107                                       match={'data': {'path': device_path}})
    108            self.assertNotEqual(event, None)
    109
    110        event = self.vm.event_wait(name="DEVICE_DELETED",
    111                                   match={'data': {'device': device}})
    112        self.assertNotEqual(event, None)
    113
    114    # Remove a BlockDriverState
    115    def ejectDrive(self, device, node, expect_error = False,
    116                   destroys_media = True):
    117        self.checkBlockDriverState(node)
    118        result = self.vm.qmp('eject', id = device)
    119        if expect_error:
    120            self.assert_qmp(result, 'error/class', 'GenericError')
    121            self.checkBlockDriverState(node)
    122        else:
    123            self.assert_qmp(result, 'return', {})
    124            self.checkBlockDriverState(node, not destroys_media)
    125
    126    # Insert a BlockDriverState
    127    def insertDrive(self, device, node):
    128        self.checkBlockDriverState(node)
    129        result = self.vm.qmp('blockdev-insert-medium',
    130                             id = device, node_name = node)
    131        self.assert_qmp(result, 'return', {})
    132        self.checkBlockDriverState(node)
    133
    134    # Create a snapshot using 'blockdev-snapshot-sync'
    135    def createSnapshotSync(self, node, overlay):
    136        self.checkBlockDriverState(node)
    137        self.checkBlockDriverState(overlay, False)
    138        opts = {'node-name': node,
    139                'snapshot-file': new_img,
    140                'snapshot-node-name': overlay,
    141                'format': iotests.imgfmt}
    142        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
    143        self.assert_qmp(result, 'return', {})
    144        self.checkBlockDriverState(node)
    145        self.checkBlockDriverState(overlay)
    146
    147    # Create a snapshot using 'blockdev-snapshot'
    148    def createSnapshot(self, node, overlay):
    149        self.checkBlockDriverState(node)
    150        self.checkBlockDriverState(overlay)
    151        result = self.vm.qmp('blockdev-snapshot',
    152                             node = node, overlay = overlay)
    153        self.assert_qmp(result, 'return', {})
    154        self.checkBlockDriverState(node)
    155        self.checkBlockDriverState(overlay)
    156
    157    # Create a mirror
    158    def createMirror(self, node, new_node):
    159        self.checkBlockDriverState(new_node, False)
    160        opts = {'device': node,
    161                'job-id': node,
    162                'target': new_img,
    163                'node-name': new_node,
    164                'sync': 'top',
    165                'format': iotests.imgfmt}
    166        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
    167        self.assert_qmp(result, 'return', {})
    168        self.checkBlockDriverState(new_node)
    169
    170    # Complete an existing block job
    171    def completeBlockJob(self, id, node_before, node_after):
    172        result = self.vm.qmp('block-job-complete', device=id)
    173        self.assert_qmp(result, 'return', {})
    174        self.wait_until_completed(id)
    175
    176    # Add a BlkDebug node
    177    # Note that the purpose of this is to test the blockdev-del
    178    # sanity checks, not to create a usable blkdebug drive
    179    def addBlkDebug(self, debug, node):
    180        self.checkBlockDriverState(node, False)
    181        self.checkBlockDriverState(debug, False)
    182        image = {'driver': iotests.imgfmt,
    183                 'node-name': node,
    184                 'file': {'driver': 'file',
    185                          'filename': base_img}}
    186        opts = {'driver': 'blkdebug',
    187                'node-name': debug,
    188                'image': image}
    189        result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
    190        self.assert_qmp(result, 'return', {})
    191        self.checkBlockDriverState(node)
    192        self.checkBlockDriverState(debug)
    193
    194    # Add a BlkVerify node
    195    # Note that the purpose of this is to test the blockdev-del
    196    # sanity checks, not to create a usable blkverify drive
    197    def addBlkVerify(self, blkverify, test, raw):
    198        self.checkBlockDriverState(test, False)
    199        self.checkBlockDriverState(raw, False)
    200        self.checkBlockDriverState(blkverify, False)
    201        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
    202        node_0 = {'driver': iotests.imgfmt,
    203                  'node-name': test,
    204                  'file': {'driver': 'file',
    205                           'filename': base_img}}
    206        node_1 = {'driver': iotests.imgfmt,
    207                  'node-name': raw,
    208                  'file': {'driver': 'file',
    209                           'filename': new_img}}
    210        opts = {'driver': 'blkverify',
    211                'node-name': blkverify,
    212                'test': node_0,
    213                'raw': node_1}
    214        result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
    215        self.assert_qmp(result, 'return', {})
    216        self.checkBlockDriverState(test)
    217        self.checkBlockDriverState(raw)
    218        self.checkBlockDriverState(blkverify)
    219
    220    # Add a Quorum node
    221    def addQuorum(self, quorum, child0, child1):
    222        self.checkBlockDriverState(child0, False)
    223        self.checkBlockDriverState(child1, False)
    224        self.checkBlockDriverState(quorum, False)
    225        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
    226        child_0 = {'driver': iotests.imgfmt,
    227                   'node-name': child0,
    228                   'file': {'driver': 'file',
    229                            'filename': base_img}}
    230        child_1 = {'driver': iotests.imgfmt,
    231                   'node-name': child1,
    232                   'file': {'driver': 'file',
    233                            'filename': new_img}}
    234        opts = {'driver': 'quorum',
    235                'node-name': quorum,
    236                'vote-threshold': 1,
    237                'children': [ child_0, child_1 ]}
    238        result = self.vm.qmp('blockdev-add', conv_keys = False, **opts)
    239        self.assert_qmp(result, 'return', {})
    240        self.checkBlockDriverState(child0)
    241        self.checkBlockDriverState(child1)
    242        self.checkBlockDriverState(quorum)
    243
    244    ########################
    245    # The tests start here #
    246    ########################
    247
    248    def testBlockDriverState(self):
    249        self.addBlockDriverState('node0')
    250        # You cannot delete a file BDS directly
    251        self.delBlockDriverState('node0_file', expect_error = True)
    252        self.delBlockDriverState('node0')
    253
    254    def testDeviceModel(self):
    255        self.addBlockDriverState('node0')
    256        self.addDeviceModel('device0', 'node0')
    257        self.ejectDrive('device0', 'node0', expect_error = True)
    258        self.delBlockDriverState('node0', expect_error = True)
    259        self.delDeviceModel('device0')
    260        self.delBlockDriverState('node0')
    261
    262    def testAttachMedia(self):
    263        # This creates a BlockBackend and removes its media
    264        self.addBlockDriverState('node0')
    265        self.addDeviceModel('device0', 'node0', 'scsi-cd')
    266        self.ejectDrive('device0', 'node0', destroys_media = False)
    267        self.delBlockDriverState('node0')
    268
    269        # This creates a new BlockDriverState and inserts it into the device
    270        self.addBlockDriverState('node1')
    271        self.insertDrive('device0', 'node1')
    272        # The node can't be removed: the new device has an extra reference
    273        self.delBlockDriverState('node1', expect_error = True)
    274        # The BDS still exists after being ejected, but now it can be removed
    275        self.ejectDrive('device0', 'node1', destroys_media = False)
    276        self.delBlockDriverState('node1')
    277        self.delDeviceModel('device0', False)
    278
    279    def testSnapshotSync(self):
    280        self.addBlockDriverState('node0')
    281        self.addDeviceModel('device0', 'node0')
    282        self.createSnapshotSync('node0', 'overlay0')
    283        # This fails because node0 is now being used as a backing image
    284        self.delBlockDriverState('node0', expect_error = True)
    285        self.delBlockDriverState('overlay0', expect_error = True)
    286        # This succeeds because device0 only has the backend reference
    287        self.delDeviceModel('device0')
    288        # FIXME Would still be there if blockdev-snapshot-sync took a ref
    289        self.checkBlockDriverState('overlay0', False)
    290        self.delBlockDriverState('node0')
    291
    292    def testSnapshot(self):
    293        self.addBlockDriverState('node0')
    294        self.addDeviceModel('device0', 'node0', 'scsi-cd')
    295        self.addBlockDriverStateOverlay('overlay0')
    296        self.createSnapshot('node0', 'overlay0')
    297        self.delBlockDriverState('node0', expect_error = True)
    298        self.delBlockDriverState('overlay0', expect_error = True)
    299        self.ejectDrive('device0', 'overlay0', destroys_media = False)
    300        self.delBlockDriverState('node0', expect_error = True)
    301        self.delBlockDriverState('overlay0')
    302        self.delBlockDriverState('node0')
    303
    304    def testMirror(self):
    305        self.addBlockDriverState('node0')
    306        self.addDeviceModel('device0', 'node0', 'scsi-cd')
    307        self.createMirror('node0', 'mirror0')
    308        # The block job prevents removing the device
    309        self.delBlockDriverState('node0', expect_error = True)
    310        self.delBlockDriverState('mirror0', expect_error = True)
    311        self.wait_ready('node0')
    312        self.completeBlockJob('node0', 'node0', 'mirror0')
    313        self.assert_no_active_block_jobs()
    314        # This succeeds because the device now points to mirror0
    315        self.delBlockDriverState('node0')
    316        self.delBlockDriverState('mirror0', expect_error = True)
    317        self.delDeviceModel('device0', False)
    318        # FIXME mirror0 disappears, drive-mirror doesn't take a reference
    319        #self.delBlockDriverState('mirror0')
    320
    321    @iotests.skip_if_unsupported(['blkdebug'])
    322    def testBlkDebug(self):
    323        self.addBlkDebug('debug0', 'node0')
    324        # 'node0' is used by the blkdebug node
    325        self.delBlockDriverState('node0', expect_error = True)
    326        # But we can remove the blkdebug node directly
    327        self.delBlockDriverState('debug0')
    328        self.checkBlockDriverState('node0', False)
    329
    330    @iotests.skip_if_unsupported(['blkverify'])
    331    def testBlkVerify(self):
    332        self.addBlkVerify('verify0', 'node0', 'node1')
    333        # We cannot remove the children of a blkverify device
    334        self.delBlockDriverState('node0', expect_error = True)
    335        self.delBlockDriverState('node1', expect_error = True)
    336        # But we can remove the blkverify node directly
    337        self.delBlockDriverState('verify0')
    338        self.checkBlockDriverState('node0', False)
    339        self.checkBlockDriverState('node1', False)
    340
    341    @iotests.skip_if_unsupported(['quorum'])
    342    def testQuorum(self):
    343        self.addQuorum('quorum0', 'node0', 'node1')
    344        # We cannot remove the children of a Quorum device
    345        self.delBlockDriverState('node0', expect_error = True)
    346        self.delBlockDriverState('node1', expect_error = True)
    347        # But we can remove the Quorum node directly
    348        self.delBlockDriverState('quorum0')
    349        self.checkBlockDriverState('node0', False)
    350        self.checkBlockDriverState('node1', False)
    351
    352
    353if __name__ == '__main__':
    354    iotests.main(supported_fmts=["qcow2"],
    355                 supported_protocols=["file"])