qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

296 (9264B)


      1 #!/usr/bin/env python3
      2 # group: rw
      3 #
      4 # Test case for encryption key management versus image sharing
      5 #
      6 # Copyright (C) 2019 Red Hat, Inc.
      7 #
      8 # This program is free software; you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation; either version 2 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     20 #
     21 
     22 import iotests
     23 import os
     24 import time
     25 import json
     26 
     27 test_img = os.path.join(iotests.test_dir, 'test.img')
     28 
     29 class Secret:
     30     def __init__(self, index):
     31         self._id = "keysec" + str(index)
     32         # you are not supposed to see the password...
     33         self._secret = "hunter" + str(index)
     34 
     35     def id(self):
     36         return self._id
     37 
     38     def secret(self):
     39         return self._secret
     40 
     41     def to_cmdline_object(self):
     42         return  [ "secret,id=" + self._id + ",data=" + self._secret]
     43 
     44     def to_qmp_object(self):
     45         return { "qom_type" : "secret", "id": self.id(),
     46                  "data": self.secret() }
     47 
     48 ################################################################################
     49 
     50 class EncryptionSetupTestCase(iotests.QMPTestCase):
     51 
     52     # test case startup
     53     def setUp(self):
     54 
     55         # start the VMs
     56         self.vm1 = iotests.VM(path_suffix = 'VM1')
     57         self.vm2 = iotests.VM(path_suffix = 'VM2')
     58         self.vm1.launch()
     59         self.vm2.launch()
     60 
     61         # create the secrets and load 'em into the VMs
     62         self.secrets = [ Secret(i) for i in range(0, 4) ]
     63         for secret in self.secrets:
     64             result = self.vm1.qmp("object-add", **secret.to_qmp_object())
     65             self.assert_qmp(result, 'return', {})
     66             result = self.vm2.qmp("object-add", **secret.to_qmp_object())
     67             self.assert_qmp(result, 'return', {})
     68 
     69     # test case shutdown
     70     def tearDown(self):
     71         # stop the VM
     72         self.vm1.shutdown()
     73         self.vm2.shutdown()
     74 
     75     ###########################################################################
     76     # create the encrypted block device using qemu-img
     77     def createImg(self, file, secret):
     78 
     79         iotests.qemu_img(
     80             'create',
     81             '--object', *secret.to_cmdline_object(),
     82             '-f', iotests.imgfmt,
     83             '-o', 'key-secret=' + secret.id(),
     84             '-o', 'iter-time=10',
     85             file,
     86             '1M')
     87         iotests.log('')
     88 
     89     # attempts to add a key using qemu-img
     90     def addKey(self, file, secret, new_secret):
     91 
     92         image_options = {
     93             'key-secret' : secret.id(),
     94             'driver' : iotests.imgfmt,
     95             'file' : {
     96                 'driver':'file',
     97                 'filename': file,
     98                 }
     99             }
    100 
    101         output = iotests.qemu_img(
    102             'amend',
    103             '--object', *secret.to_cmdline_object(),
    104             '--object', *new_secret.to_cmdline_object(),
    105 
    106             '-o', 'state=active',
    107             '-o', 'new-secret=' + new_secret.id(),
    108             '-o', 'iter-time=10',
    109 
    110             "json:" + json.dumps(image_options),
    111             check=False  # Expected to fail. Log output.
    112         ).stdout
    113 
    114         iotests.log(output, filters=[iotests.filter_test_dir])
    115 
    116     ###########################################################################
    117     # open an encrypted block device
    118     def openImageQmp(self, vm, id, file, secret,
    119                      readOnly = False, reOpen = False):
    120 
    121         command = 'blockdev-reopen' if reOpen else 'blockdev-add'
    122 
    123         opts = {
    124                 'driver': iotests.imgfmt,
    125                 'node-name': id,
    126                 'read-only': readOnly,
    127                 'key-secret' : secret.id(),
    128                 'file': {
    129                     'driver': 'file',
    130                     'filename': test_img,
    131                 }
    132             }
    133 
    134         if reOpen:
    135             result = vm.qmp(command, options=[opts])
    136         else:
    137             result = vm.qmp(command, **opts)
    138         self.assert_qmp(result, 'return', {})
    139 
    140 
    141     ###########################################################################
    142     # add virtio-blk consumer for a block device
    143     def addImageUser(self, vm, id, disk_id, share_rw=False):
    144         result = vm.qmp('device_add', **
    145             {
    146                 'driver': 'virtio-blk',
    147                 'id': id,
    148                 'drive': disk_id,
    149                 'share-rw' : share_rw
    150             }
    151         )
    152 
    153         iotests.log(result)
    154 
    155     # close the encrypted block device
    156     def closeImageQmp(self, vm, id):
    157         result = vm.qmp('blockdev-del', **{ 'node-name': id })
    158         self.assert_qmp(result, 'return', {})
    159 
    160     ###########################################################################
    161 
    162     # add a key to an encrypted block device
    163     def addKeyQmp(self, vm, id, new_secret):
    164 
    165         args = {
    166             'node-name': id,
    167             'job-id' : 'job0',
    168             'options' : {
    169                 'state'     : 'active',
    170                 'driver'    : iotests.imgfmt,
    171                 'new-secret': new_secret.id(),
    172                 'iter-time' : 10
    173             },
    174         }
    175 
    176         result = vm.qmp('x-blockdev-amend', **args)
    177         iotests.log(result)
    178         # Run the job only if it was created
    179         event = ('JOB_STATUS_CHANGE',
    180                  {'data': {'id': 'job0', 'status': 'created'}})
    181         if vm.events_wait([event], timeout=0.0) is not None:
    182             vm.run_job('job0')
    183 
    184     # test that when the image opened by two qemu processes,
    185     # neither of them can update the encryption keys
    186     def test1(self):
    187         self.createImg(test_img, self.secrets[0]);
    188 
    189         # VM1 opens the image and adds a key
    190         self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    191         self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[1])
    192 
    193 
    194         # VM2 opens the image
    195         self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    196 
    197 
    198         # neither VMs now should be able to add a key
    199         self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    200         self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    201 
    202 
    203         # VM 1 closes the image
    204         self.closeImageQmp(self.vm1, "testdev")
    205 
    206 
    207         # now VM2 can add the key
    208         self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    209 
    210 
    211         # qemu-img should also not be able to add a key
    212         self.addKey(test_img, self.secrets[0], self.secrets[2])
    213 
    214         # cleanup
    215         self.closeImageQmp(self.vm2, "testdev")
    216         os.remove(test_img)
    217 
    218 
    219     # test that when the image opened by two qemu processes,
    220     # even if first VM opens it read-only, the second can't update encryption
    221     # keys
    222     def test2(self):
    223         self.createImg(test_img, self.secrets[0]);
    224 
    225         # VM1 opens the image readonly
    226         self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
    227                           readOnly = True)
    228 
    229         # VM2 opens the image
    230         self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    231 
    232         # VM1 can't add a key since image is readonly
    233         self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    234 
    235         # VM2 can't add a key since VM is has the image opened
    236         self.addKeyQmp(self.vm2, "testdev", new_secret = self.secrets[2])
    237 
    238 
    239         #VM1 reopens the image read-write
    240         self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0],
    241                           reOpen = True, readOnly = False)
    242 
    243         # VM1 still can't add the key
    244         self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    245 
    246         # VM2 gets away
    247         self.closeImageQmp(self.vm2, "testdev")
    248 
    249         # VM1 now can add the key
    250         self.addKeyQmp(self.vm1, "testdev", new_secret = self.secrets[2])
    251 
    252         self.closeImageQmp(self.vm1, "testdev")
    253         os.remove(test_img)
    254 
    255     # test that two VMs can't open the same luks image by default
    256     # and attach it to a guest device
    257     def test3(self):
    258         self.createImg(test_img, self.secrets[0]);
    259 
    260         self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    261         self.addImageUser(self.vm1, "testctrl", "testdev")
    262 
    263         self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    264         self.addImageUser(self.vm2, "testctrl", "testdev")
    265 
    266 
    267     # test that two VMs can attach the same luks image to a guest device,
    268     # if both use share-rw=on
    269     def test4(self):
    270         self.createImg(test_img, self.secrets[0]);
    271 
    272         self.openImageQmp(self.vm1, "testdev", test_img, self.secrets[0])
    273         self.addImageUser(self.vm1, "testctrl", "testdev", share_rw=True)
    274 
    275         self.openImageQmp(self.vm2, "testdev", test_img, self.secrets[0])
    276         self.addImageUser(self.vm2, "testctrl", "testdev", share_rw=True)
    277 
    278 
    279 
    280 if __name__ == '__main__':
    281     # support only raw luks since luks encrypted qcow2 is a proper
    282     # format driver which doesn't allow any sharing
    283     iotests.activate_logging()
    284     iotests.main(supported_fmts = ['luks'])