nbd-fault-injector.py (8638B)
#!/usr/bin/env python3
# NBD server - fault injection utility
#
# Configuration file syntax:
#   [inject-error "disconnect-neg1"]
#   event=neg1
#   io=readwrite
#   when=before
#
# Note that Python's ConfigParser squashes together all sections with the same
# name, so give each [inject-error] a unique name.
#
# inject-error options:
#   event - name of the trigger event
#           "neg-classic" - classic negotiation struct
#           "neg1" - first part of negotiation struct
#           "export" - export struct
#           "neg2" - second part of negotiation struct
#           "request" - NBD request struct
#           "reply" - NBD reply struct
#           "data" - request/reply data
#   io    - I/O direction that triggers this rule:
#           "read", "write", or "readwrite"
#           default: readwrite
#   when  - after how many bytes to inject the fault
#           -1 - inject error after I/O
#           0 - inject error before I/O
#           integer - inject error after integer bytes
#           "before" - alias for 0
#           "after" - alias for -1
#           default: before
#
# Currently the only error injection action is to terminate the server process.
# This resets the TCP connection and thus forces the client to handle
# unexpected connection termination.
#
# Other error injection actions could be added in the future.
#
# Copyright Red Hat, Inc. 2014
#
# Authors:
#   Stefan Hajnoczi <stefanha@redhat.com>
#
# This work is licensed under the terms of the GNU GPL, version 2 or later.
# See the COPYING file in the top-level directory.

import sys
import socket
import struct
import collections
import configparser

FAKE_DISK_SIZE = 8 * 1024 * 1024 * 1024 # 8 GB

# Protocol constants
NBD_CMD_READ = 0
NBD_CMD_WRITE = 1
NBD_CMD_DISC = 2
NBD_REQUEST_MAGIC = 0x25609513
NBD_SIMPLE_REPLY_MAGIC = 0x67446698
NBD_PASSWD = 0x4e42444d41474943
NBD_OPTS_MAGIC = 0x49484156454F5054
NBD_CLIENT_MAGIC = 0x0000420281861253
NBD_OPT_EXPORT_NAME = 1 << 0

# Protocol structs
neg_classic_struct = struct.Struct('>QQQI124x')
neg1_struct = struct.Struct('>QQH')
export_tuple = collections.namedtuple('Export', 'reserved magic opt len')
export_struct = struct.Struct('>IQII')
neg2_struct = struct.Struct('>QH124x')
request_tuple = collections.namedtuple('Request', 'magic type handle from_ len')
request_struct = struct.Struct('>IIQQI')
reply_struct = struct.Struct('>IIQ')

def err(msg):
    '''Write msg to stderr and terminate the process with exit status 1'''
    sys.stderr.write(msg + '\n')
    sys.exit(1)

def recvall(sock, bufsize):
    '''Receive exactly bufsize bytes from sock and return them

    Raises Exception('unexpected disconnect') if the peer closes the
    connection before bufsize bytes have arrived.
    '''
    received = 0
    chunks = []
    while received < bufsize:
        chunk = sock.recv(bufsize - received)
        if not chunk:
            raise Exception('unexpected disconnect')
        chunks.append(chunk)
        received += len(chunk)
    return b''.join(chunks)

class Rule(object):
    '''A single [inject-error] rule from the configuration file'''

    def __init__(self, name, event, io, when):
        self.name = name    # config section name, used in log output
        self.event = event  # trigger event name, e.g. 'neg1' or 'data'
        self.io = io        # 'read', 'write' or 'readwrite'
        self.when = when    # 0 = before I/O, -1 = after I/O, N = after N bytes

    def match(self, event, io):
        '''Return True if this rule applies to the given event and direction'''
        if event != self.event:
            return False
        if io != self.io and self.io != 'readwrite':
            return False
        return True

class FaultInjectionSocket(object):
    '''Socket wrapper that terminates the server when a rule fires'''

    def __init__(self, sock, rules):
        self.sock = sock
        self.rules = rules

    def check(self, event, io, bufsize=None):
        '''Apply the rules to an I/O operation

        Called with bufsize before the I/O, it returns how many bytes should
        actually be transferred.  Called without bufsize after the I/O, it
        only decides whether to terminate.  When a matching rule fires the
        connection is closed and the process exits with status 0.
        '''
        for rule in self.rules:
            if rule.match(event, io):
                if rule.when == 0 or bufsize is None:
                    print('Closing connection on rule match %s' % rule.name)
                    self.sock.close()
                    sys.stdout.flush()
                    sys.exit(0)
                if rule.when != -1:
                    # Never transfer more than the caller asked for: on the
                    # receive path a larger count would wait for bytes the
                    # client is never going to send.
                    return min(rule.when, bufsize)
        return bufsize

    def send(self, buf, event):
        '''Send buf, truncated or aborted as dictated by the rules'''
        bufsize = self.check(event, 'write', bufsize=len(buf))
        self.sock.sendall(buf[:bufsize])
        self.check(event, 'write')

    def recv(self, bufsize, event):
        '''Receive bufsize bytes unless a rule cuts the transfer short'''
        bufsize = self.check(event, 'read', bufsize=bufsize)
        data = recvall(self.sock, bufsize)
        self.check(event, 'read')
        return data

    def close(self):
        '''Close the underlying socket'''
        self.sock.close()

def negotiate_classic(conn):
    '''Send the single classic negotiation struct to the client'''
    buf = neg_classic_struct.pack(NBD_PASSWD, NBD_CLIENT_MAGIC,
                                  FAKE_DISK_SIZE, 0)
    conn.send(buf, event='neg-classic')

def negotiate_export(conn):
    '''Run the export-name negotiation sequence with the client

    Raises Exception if the client's option header carries an unexpected
    magic or option value.
    '''
    # Send negotiation part 1
    buf = neg1_struct.pack(NBD_PASSWD, NBD_OPTS_MAGIC, 0)
    conn.send(buf, event='neg1')

    # Receive export option
    buf = conn.recv(export_struct.size, event='export')
    export = export_tuple._make(export_struct.unpack(buf))
    # Explicit checks instead of assert so they survive "python -O"
    if export.magic != NBD_OPTS_MAGIC:
        raise Exception('unexpected export option magic %#x' % export.magic)
    if export.opt != NBD_OPT_EXPORT_NAME:
        raise Exception('unexpected export option %#x' % export.opt)
    # The export name must be consumed from the stream but is otherwise unused
    conn.recv(export.len, event='export-name')

    # Send negotiation part 2
    buf = neg2_struct.pack(FAKE_DISK_SIZE, 0)
    conn.send(buf, event='neg2')

def negotiate(conn, use_export):
    '''Negotiate export with client'''
    if use_export:
        negotiate_export(conn)
    else:
        negotiate_classic(conn)

def read_request(conn):
    '''Parse NBD request from client

    Raises Exception if the request magic is wrong.
    '''
    buf = conn.recv(request_struct.size, event='request')
    req = request_tuple._make(request_struct.unpack(buf))
    # Explicit check instead of assert so it survives "python -O"
    if req.magic != NBD_REQUEST_MAGIC:
        raise Exception('unexpected request magic %#x' % req.magic)
    return req

def write_reply(conn, error, handle):
    '''Send a simple reply header with the given error code and handle'''
    buf = reply_struct.pack(NBD_SIMPLE_REPLY_MAGIC, error, handle)
    conn.send(buf, event='reply')

def handle_connection(conn, use_export):
    '''Serve one client until it disconnects or sends an unknown command

    Reads return zeroes and written data is discarded.  The connection is
    closed on every exit path, including when an exception propagates.
    '''
    try:
        negotiate(conn, use_export)
        while True:
            req = read_request(conn)
            if req.type == NBD_CMD_READ:
                write_reply(conn, 0, req.handle)
                conn.send(b'\0' * req.len, event='data')
            elif req.type == NBD_CMD_WRITE:
                conn.recv(req.len, event='data')
                write_reply(conn, 0, req.handle)
            elif req.type == NBD_CMD_DISC:
                break
            else:
                print('unrecognized command type %#02x' % req.type)
                break
    finally:
        conn.close()

def run_server(sock, rules, use_export):
    '''Accept and serve client connections one at a time, forever'''
    while True:
        conn, _ = sock.accept()
        handle_connection(FaultInjectionSocket(conn, rules), use_export)

def parse_inject_error(name, options):
    '''Build a Rule from the options of one [inject-error] section

    Exits the process via err() on a missing or invalid option value.
    '''
    if 'event' not in options:
        err('missing \"event\" option in %s' % name)
    event = options['event']
    if event not in ('neg-classic', 'neg1', 'export', 'neg2', 'request', 'reply', 'data'):
        err('invalid \"event\" option value \"%s\" in %s' % (event, name))
    io = options.get('io', 'readwrite')
    if io not in ('read', 'write', 'readwrite'):
        err('invalid \"io\" option value \"%s\" in %s' % (io, name))
    when = options.get('when', 'before')
    try:
        when = int(when)
    except ValueError:
        if when == 'before':
            when = 0
        elif when == 'after':
            when = -1
        else:
            err('invalid \"when\" option value \"%s\" in %s' % (when, name))
    # -1 is the only meaningful negative value ("after"); other negative
    # byte counts would yield nonsensical slice and receive sizes.
    if when < -1:
        err('invalid \"when\" option value \"%s\" in %s' % (when, name))
    return Rule(name, event, io, when)

def parse_config(config):
    '''Convert every section of a parsed config into a list of Rules

    Exits the process via err() on a section that is not an inject-error
    section.
    '''
    rules = []
    for name in config.sections():
        if name.startswith('inject-error'):
            options = dict(config.items(name))
            rules.append(parse_inject_error(name, options))
        else:
            err('invalid config section name: %s' % name)
    return rules

def load_rules(filename):
    '''Read the config file at filename and return its list of Rules'''
    config = configparser.RawConfigParser()
    with open(filename, 'rt') as f:
        config.read_file(f, filename)
    return parse_config(config)

def open_socket(path):
    '''Open a TCP or UNIX domain listen socket'''
    if ':' in path:
        host, port = path.split(':', 1)
        sock = socket.socket()
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, int(port)))

        # If given port was 0 the final port number is now available
        path = '%s:%d' % sock.getsockname()
    else:
        sock = socket.socket(socket.AF_UNIX)
        sock.bind(path)
    sock.listen(0)
    print('Listening on %s' % path)
    sys.stdout.flush() # another process may be waiting, show message now
    return sock

def usage(args):
    '''Print the command-line synopsis to stderr and exit with status 1'''
    sys.stderr.write('usage: %s [--classic-negotiation] <tcp-port>|<unix-path> <config-file>\n' % args[0])
    sys.stderr.write('Run an fault injector NBD server with rules defined in a config file.\n')
    sys.exit(1)

def main(args):
    '''Parse the command line, then run the server

    args is the full argument vector including the program name.  Exits via
    usage() when the argument count does not fit the chosen mode.
    '''
    use_export = '--classic-negotiation' not in args[1:2]
    # The optional flag occupies one extra slot.  Validate the count before
    # touching args[2]/args[3] or opening any socket.
    if len(args) != (3 if use_export else 4):
        usage(args)
    sock = open_socket(args[1 if use_export else 2])
    rules = load_rules(args[2 if use_export else 3])
    run_server(sock, rules, use_export)
    return 0

if __name__ == '__main__':
    sys.exit(main(sys.argv))