qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

295 (8710B)


      1 #!/usr/bin/env python3
      2 # group: rw
      3 #
      4 # Test case QMP's encrypted key management
      5 #
      6 # Copyright (C) 2019 Red Hat, Inc.
      7 #
      8 # This program is free software; you can redistribute it and/or modify
      9 # it under the terms of the GNU General Public License as published by
     10 # the Free Software Foundation; either version 2 of the License, or
     11 # (at your option) any later version.
     12 #
     13 # This program is distributed in the hope that it will be useful,
     14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
     15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
     16 # GNU General Public License for more details.
     17 #
     18 # You should have received a copy of the GNU General Public License
     19 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
     20 #
     21 
     22 import iotests
     23 import os
     24 import time
     25 import json
     26 
     27 test_img = os.path.join(iotests.test_dir, 'test.img')
     28 
     29 class Secret:
     30     def __init__(self, index):
     31         self._id = "keysec" + str(index)
     32         # you are not supposed to see the password...
     33         self._secret = "hunter" + str(index)
     34 
     35     def id(self):
     36         return self._id
     37 
     38     def secret(self):
     39         return self._secret
     40 
     41     def to_cmdline_object(self):
     42         return  [ "secret,id=" + self._id + ",data=" + self._secret]
     43 
     44     def to_qmp_object(self):
     45         return { "qom_type" : "secret", "id": self.id(),
     46                  "data": self.secret() }
     47 
     48 ################################################################################
     49 class EncryptionSetupTestCase(iotests.QMPTestCase):
     50 
     51     # test case startup
     52     def setUp(self):
     53         # start the VM
     54         self.vm = iotests.VM()
     55         self.vm.launch()
     56 
     57         # create the secrets and load 'em into the VM
     58         self.secrets = [ Secret(i) for i in range(0, 6) ]
     59         for secret in self.secrets:
     60             result = self.vm.qmp("object-add", **secret.to_qmp_object())
     61             self.assert_qmp(result, 'return', {})
     62 
     63         if iotests.imgfmt == "qcow2":
     64             self.pfx = "encrypt."
     65             self.img_opts = [ '-o', "encrypt.format=luks" ]
     66         else:
     67             self.pfx = ""
     68             self.img_opts = []
     69 
     70     # test case shutdown
     71     def tearDown(self):
     72         # stop the VM
     73         self.vm.shutdown()
     74 
     75     ###########################################################################
     76     # create the encrypted block device
     77     def createImg(self, file, secret):
     78 
     79         iotests.qemu_img(
     80             'create',
     81             '--object', *secret.to_cmdline_object(),
     82             '-f', iotests.imgfmt,
     83             '-o', self.pfx + 'key-secret=' + secret.id(),
     84             '-o', self.pfx + 'iter-time=10',
     85             *self.img_opts,
     86             file,
     87             '1M')
     88 
     89     ###########################################################################
     90     # open an encrypted block device
     91     def openImageQmp(self, id, file, secret, read_only = False):
     92 
     93         encrypt_options = {
     94             'key-secret' : secret.id()
     95         }
     96 
     97         if iotests.imgfmt == "qcow2":
     98             encrypt_options = {
     99                 'encrypt': {
    100                     'format':'luks',
    101                     **encrypt_options
    102                 }
    103             }
    104 
    105         result = self.vm.qmp('blockdev-add', **
    106             {
    107                 'driver': iotests.imgfmt,
    108                 'node-name': id,
    109                 'read-only': read_only,
    110 
    111                 **encrypt_options,
    112 
    113                 'file': {
    114                     'driver': 'file',
    115                     'filename': test_img,
    116                 }
    117             }
    118         )
    119         self.assert_qmp(result, 'return', {})
    120 
    121     # close the encrypted block device
    122     def closeImageQmp(self, id):
    123         result = self.vm.qmp('blockdev-del', **{ 'node-name': id })
    124         self.assert_qmp(result, 'return', {})
    125 
    126     ###########################################################################
    127     # add a key to an encrypted block device
    128     def addKeyQmp(self, id, new_secret, secret = None,
    129                   slot = None, force = False):
    130 
    131         crypt_options = {
    132             'state'      : 'active',
    133             'new-secret' : new_secret.id(),
    134             'iter-time' : 10
    135         }
    136 
    137         if slot != None:
    138             crypt_options['keyslot'] = slot
    139 
    140 
    141         if secret != None:
    142             crypt_options['secret'] = secret.id()
    143 
    144         if iotests.imgfmt == "qcow2":
    145             crypt_options['format'] = 'luks'
    146             crypt_options = {
    147                 'encrypt': crypt_options
    148             }
    149 
    150         args = {
    151             'node-name': id,
    152             'job-id' : 'job_add_key',
    153             'options' : {
    154                     'driver' : iotests.imgfmt,
    155                     **crypt_options
    156                 },
    157         }
    158 
    159         if force == True:
    160             args['force'] = True
    161 
    162         #TODO: check what jobs return
    163         result = self.vm.qmp('x-blockdev-amend', **args)
    164         assert result['return'] == {}
    165         self.vm.run_job('job_add_key')
    166 
    167     # erase a key from an encrypted block device
    168     def eraseKeyQmp(self, id, old_secret = None, slot = None, force = False):
    169 
    170         crypt_options = {
    171             'state'      : 'inactive',
    172         }
    173 
    174         if slot != None:
    175             crypt_options['keyslot'] = slot
    176         if old_secret != None:
    177             crypt_options['old-secret'] = old_secret.id()
    178 
    179         if iotests.imgfmt == "qcow2":
    180             crypt_options['format'] = 'luks'
    181             crypt_options = {
    182                 'encrypt': crypt_options
    183             }
    184 
    185         args = {
    186             'node-name': id,
    187             'job-id' : 'job_erase_key',
    188             'options' : {
    189                     'driver' : iotests.imgfmt,
    190                     **crypt_options
    191                 },
    192         }
    193 
    194         if force == True:
    195             args['force'] = True
    196 
    197         result = self.vm.qmp('x-blockdev-amend', **args)
    198         assert result['return'] == {}
    199         self.vm.run_job('job_erase_key')
    200 
    201     ###########################################################################
    202     # create image, and change its key
    203     def testChangeKey(self):
    204 
    205         # create the image with secret0 and open it
    206         self.createImg(test_img, self.secrets[0]);
    207         self.openImageQmp("testdev", test_img, self.secrets[0])
    208 
    209         # add key to slot 1
    210         self.addKeyQmp("testdev", new_secret = self.secrets[1])
    211 
    212         # add key to slot 5
    213         self.addKeyQmp("testdev", new_secret = self.secrets[2], slot=5)
    214 
    215         # erase key from slot 0
    216         self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
    217 
    218         #reopen the image with secret1
    219         self.closeImageQmp("testdev")
    220         self.openImageQmp("testdev", test_img, self.secrets[1])
    221 
    222         # close and erase the image for good
    223         self.closeImageQmp("testdev")
    224         os.remove(test_img)
    225 
    226     # test that if we erase the old password,
    227     # we can still change the encryption keys using 'old-secret'
    228     def testOldPassword(self):
    229 
    230         # create the image with secret0 and open it
    231         self.createImg(test_img, self.secrets[0]);
    232         self.openImageQmp("testdev", test_img, self.secrets[0])
    233 
    234         # add key to slot 1
    235         self.addKeyQmp("testdev", new_secret = self.secrets[1])
    236 
    237         # erase key from slot 0
    238         self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
    239 
    240         # this will fail as the old password is no longer valid
    241         self.addKeyQmp("testdev", new_secret = self.secrets[2])
    242 
    243         # this will work
    244         self.addKeyQmp("testdev", new_secret = self.secrets[2], secret = self.secrets[1])
    245 
    246         # close and erase the image for good
    247         self.closeImageQmp("testdev")
    248         os.remove(test_img)
    249 
    250     def testUseForceLuke(self):
    251 
    252         self.createImg(test_img, self.secrets[0]);
    253         self.openImageQmp("testdev", test_img, self.secrets[0])
    254 
    255         # Add bunch of secrets
    256         self.addKeyQmp("testdev", new_secret = self.secrets[1], slot=4)
    257         self.addKeyQmp("testdev", new_secret = self.secrets[4], slot=2)
    258 
    259         # overwrite an active secret
    260         self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2)
    261         self.addKeyQmp("testdev", new_secret = self.secrets[5], slot=2, force=True)
    262 
    263         self.addKeyQmp("testdev", new_secret = self.secrets[0])
    264 
    265         # Now erase all the secrets
    266         self.eraseKeyQmp("testdev", old_secret = self.secrets[5])
    267         self.eraseKeyQmp("testdev", slot=4)
    268 
    269         # erase last keyslot
    270         self.eraseKeyQmp("testdev", old_secret = self.secrets[0])
    271         self.eraseKeyQmp("testdev", old_secret = self.secrets[0], force=True)
    272 
    273         self.closeImageQmp("testdev")
    274         os.remove(test_img)
    275 
    276 
    277 if __name__ == '__main__':
    278     iotests.verify_working_luks()
    279     # Encrypted formats support
    280     iotests.activate_logging()
    281     iotests.main(supported_fmts = ['qcow2', 'luks'])