cachepc-qemu

Fork of AMDESE/qemu with changes for cachepc side-channel attack
git clone https://git.sinitax.com/sinitax/cachepc-qemu
Log | Files | Refs | Submodules | LICENSE | sfeed.txt

057 (9785B)


      1#!/usr/bin/env python3
      2# group: rw
      3#
      4# Tests for internal snapshot.
      5#
      6# Copyright (C) 2013 IBM, Inc.
      7#
      8# Based on 055.
      9#
     10# This program is free software; you can redistribute it and/or modify
     11# it under the terms of the GNU General Public License as published by
     12# the Free Software Foundation; either version 2 of the License, or
     13# (at your option) any later version.
     14#
     15# This program is distributed in the hope that it will be useful,
     16# but WITHOUT ANY WARRANTY; without even the implied warranty of
     17# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     18# GNU General Public License for more details.
     19#
     20# You should have received a copy of the GNU General Public License
     21# along with this program.  If not, see <http://www.gnu.org/licenses/>.
     22#
     23
     24import time
     25import os
     26import iotests
     27from iotests import qemu_img, qemu_io
     28
     29test_drv_base_name = 'drive'
     30
     31class ImageSnapshotTestCase(iotests.QMPTestCase):
     32    image_len = 120 * 1024 * 1024 # MB
     33
     34    def __init__(self, *args):
     35        self.expect = []
     36        super(ImageSnapshotTestCase, self).__init__(*args)
     37
     38    def _setUp(self, test_img_base_name, image_num):
     39        self.vm = iotests.VM()
     40        for i in range(0, image_num):
     41            filename = '%s%d' % (test_img_base_name, i)
     42            img = os.path.join(iotests.test_dir, filename)
     43            device = '%s%d' % (test_drv_base_name, i)
     44            qemu_img('create', '-f', iotests.imgfmt, img, str(self.image_len))
     45            self.vm.add_drive(img)
     46            self.expect.append({'image': img, 'device': device,
     47                                'snapshots': [],
     48                                'snapshots_name_counter': 0})
     49        self.vm.launch()
     50
     51    def tearDown(self):
     52        self.vm.shutdown()
     53        for dev_expect in self.expect:
     54            os.remove(dev_expect['image'])
     55
     56    def createSnapshotInTransaction(self, snapshot_num, abort = False):
     57        actions = []
     58        for dev_expect in self.expect:
     59            num = dev_expect['snapshots_name_counter']
     60            for j in range(0, snapshot_num):
     61                name = '%s_sn%d' % (dev_expect['device'], num)
     62                num = num + 1
     63                if abort == False:
     64                    dev_expect['snapshots'].append({'name': name})
     65                    dev_expect['snapshots_name_counter'] = num
     66                actions.append({
     67                    'type': 'blockdev-snapshot-internal-sync',
     68                    'data': { 'device': dev_expect['device'],
     69                              'name': name },
     70                })
     71
     72        if abort == True:
     73            actions.append({
     74                'type': 'abort',
     75                'data': {},
     76            })
     77
     78        result = self.vm.qmp('transaction', actions = actions)
     79
     80        if abort == True:
     81            self.assert_qmp(result, 'error/class', 'GenericError')
     82        else:
     83            self.assert_qmp(result, 'return', {})
     84
     85    def verifySnapshotInfo(self):
     86        result = self.vm.qmp('query-block')
     87
     88        # Verify each expected result
     89        for dev_expect in self.expect:
     90            # 1. Find the returned image value and snapshot info
     91            image_result = None
     92            for device in result['return']:
     93                if device['device'] == dev_expect['device']:
     94                    image_result = device['inserted']['image']
     95                    break
     96            self.assertTrue(image_result != None)
     97            # Do not consider zero snapshot case now
     98            sn_list_result = image_result['snapshots']
     99            sn_list_expect = dev_expect['snapshots']
    100
    101            # 2. Verify it with expect
    102            self.assertTrue(len(sn_list_result) == len(sn_list_expect))
    103
    104            for sn_expect in sn_list_expect:
    105                sn_result = None
    106                for sn in sn_list_result:
    107                    if sn_expect['name'] == sn['name']:
    108                        sn_result = sn
    109                        break
    110                self.assertTrue(sn_result != None)
    111                # Fill in the detail info
    112                sn_expect.update(sn_result)
    113
    114    def deleteSnapshot(self, device, id = None, name = None):
    115        sn_list_expect = None
    116        sn_expect = None
    117
    118        self.assertTrue(id != None or name != None)
    119
    120        # Fill in the detail info include ID
    121        self.verifySnapshotInfo()
    122
    123        #find the expected snapshot list
    124        for dev_expect in self.expect:
    125            if dev_expect['device'] == device:
    126                sn_list_expect = dev_expect['snapshots']
    127                break
    128        self.assertTrue(sn_list_expect != None)
    129
    130        if id != None and name != None:
    131            for sn in sn_list_expect:
    132                if sn['id'] == id and sn['name'] == name:
    133                    sn_expect = sn
    134                    result = \
    135                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    136                                      device = device,
    137                                      id = id,
    138                                      name = name)
    139                    break
    140        elif id != None:
    141            for sn in sn_list_expect:
    142                if sn['id'] == id:
    143                    sn_expect = sn
    144                    result = \
    145                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    146                                      device = device,
    147                                      id = id)
    148                    break
    149        else:
    150            for sn in sn_list_expect:
    151                if sn['name'] == name:
    152                    sn_expect = sn
    153                    result = \
    154                          self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    155                                      device = device,
    156                                      name = name)
    157                    break
    158
    159        self.assertTrue(sn_expect != None)
    160
    161        self.assert_qmp(result, 'return', sn_expect)
    162        sn_list_expect.remove(sn_expect)
    163
    164class TestSingleTransaction(ImageSnapshotTestCase):
    165    def setUp(self):
    166        self._setUp('test_a.img', 1)
    167
    168    def test_create(self):
    169        self.createSnapshotInTransaction(1)
    170        self.verifySnapshotInfo()
    171
    172    def test_error_name_empty(self):
    173        actions = [{'type': 'blockdev-snapshot-internal-sync',
    174                    'data': { 'device': self.expect[0]['device'],
    175                              'name': '' },
    176                  }]
    177        result = self.vm.qmp('transaction', actions = actions)
    178        self.assert_qmp(result, 'error/class', 'GenericError')
    179
    180    def test_error_device(self):
    181        actions = [{'type': 'blockdev-snapshot-internal-sync',
    182                    'data': { 'device': 'drive_error',
    183                              'name': 'a' },
    184                  }]
    185        result = self.vm.qmp('transaction', actions = actions)
    186        self.assert_qmp(result, 'error/class', 'GenericError')
    187
    188    def test_error_exist(self):
    189        self.createSnapshotInTransaction(1)
    190        self.verifySnapshotInfo()
    191        actions = [{'type': 'blockdev-snapshot-internal-sync',
    192                    'data': { 'device': self.expect[0]['device'],
    193                              'name': self.expect[0]['snapshots'][0] },
    194                  }]
    195        result = self.vm.qmp('transaction', actions = actions)
    196        self.assert_qmp(result, 'error/class', 'GenericError')
    197
    198class TestMultipleTransaction(ImageSnapshotTestCase):
    199    def setUp(self):
    200        self._setUp('test_b.img', 2)
    201
    202    def test_create(self):
    203        self.createSnapshotInTransaction(3)
    204        self.verifySnapshotInfo()
    205
    206    def test_abort(self):
    207        self.createSnapshotInTransaction(2)
    208        self.verifySnapshotInfo()
    209        self.createSnapshotInTransaction(3, abort = True)
    210        self.verifySnapshotInfo()
    211
    212class TestSnapshotDelete(ImageSnapshotTestCase):
    213    def setUp(self):
    214        self._setUp('test_c.img', 1)
    215
    216    def test_delete_with_id(self):
    217        self.createSnapshotInTransaction(2)
    218        self.verifySnapshotInfo()
    219        self.deleteSnapshot(self.expect[0]['device'],
    220                            id = self.expect[0]['snapshots'][0]['id'])
    221        self.verifySnapshotInfo()
    222
    223    def test_delete_with_name(self):
    224        self.createSnapshotInTransaction(3)
    225        self.verifySnapshotInfo()
    226        self.deleteSnapshot(self.expect[0]['device'],
    227                            name = self.expect[0]['snapshots'][1]['name'])
    228        self.verifySnapshotInfo()
    229
    230    def test_delete_with_id_and_name(self):
    231        self.createSnapshotInTransaction(4)
    232        self.verifySnapshotInfo()
    233        self.deleteSnapshot(self.expect[0]['device'],
    234                            id = self.expect[0]['snapshots'][2]['id'],
    235                            name = self.expect[0]['snapshots'][2]['name'])
    236        self.verifySnapshotInfo()
    237
    238
    239    def test_error_device(self):
    240        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    241                              device = 'drive_error',
    242                              id = '0')
    243        self.assert_qmp(result, 'error/class', 'GenericError')
    244
    245    def test_error_no_id_and_name(self):
    246        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    247                              device = self.expect[0]['device'])
    248        self.assert_qmp(result, 'error/class', 'GenericError')
    249
    250    def test_error_snapshot_not_exist(self):
    251        self.createSnapshotInTransaction(2)
    252        self.verifySnapshotInfo()
    253        result = self.vm.qmp('blockdev-snapshot-delete-internal-sync',
    254                              device = self.expect[0]['device'],
    255                              id = self.expect[0]['snapshots'][0]['id'],
    256                              name = self.expect[0]['snapshots'][1]['name'])
    257        self.assert_qmp(result, 'error/class', 'GenericError')
    258
    259if __name__ == '__main__':
    260    iotests.main(supported_fmts=['qcow2'],
    261                 supported_protocols=['file'])