checker.py (10915B)
1from enochecker3 import ( 2 ChainDB, 3 DependencyInjector, 4 Enochecker, 5 ExploitCheckerTaskMessage, 6 GetflagCheckerTaskMessage, 7 GetnoiseCheckerTaskMessage, 8 HavocCheckerTaskMessage, 9 InternalErrorException, 10 MumbleException, 11 PutflagCheckerTaskMessage, 12 PutnoiseCheckerTaskMessage, 13) 14from enochecker3.utils import FlagSearcher, assert_in, assert_equals 15 16from typing import Optional, Callable 17 18from httpx import AsyncClient, Response 19 20from bs4 import BeautifulSoup 21 22from hashlib import md5 23 24from logging import LoggerAdapter 25 26from subprocess import Popen, PIPE 27 28import crypto 29import dateutil.parser 30import json 31import os 32import random 33import string 34import traceback 35 36checker = Enochecker("Fireworx", 1812) 37app = lambda: checker.app 38 39random.seed(int.from_bytes(os.urandom(16), "little")) 40 41noise_alph = string.ascii_letters + string.digits 42def noise(nmin: int, nmax: int) -> str: 43 n = random.randint(nmin, nmax) 44 return "".join(random.choice(noise_alph) for _ in range(n)) 45 46def divmod(a: int, b: int, n: int) -> int: 47 return (a * pow(b, n - 2, n)) % n 48 49def assert_status_code(logger: LoggerAdapter, r: Response, code: int = 200, 50 parse: Optional[Callable[str, str]] = None) -> None: 51 if r.status_code == code: 52 return 53 errlog = r.text 54 if parse is not None: 55 errlog = parse(errlog) 56 logger.error(f"Bad status code during {r.request.method} {r.request.url.path}: " \ 57 + f"({r.status_code} != {code})\n{errlog}") 58 raise MumbleException(f"{r.request.method} {r.request.url.path} failed") 59 60def parse_html(logger: LoggerAdapter, r: Response) -> BeautifulSoup: 61 try: 62 return BeautifulSoup(r.text, "html.parser") 63 except: 64 logger.error(f"Invalid html from {r.request.method} {r.request.url.path}\n" \ 65 + r.text) 66 raise MumbleException(f"Invalid html ({r.request.method} {r.request.url.path})") 67 68def parse_notice(text: str) -> str: 69 try: 70 soup = BeautifulSoup(text, "html.parser") 71 error = soup.select_one("meta#notice").get("content") 72 except: 73 raise MumbleException("Missing error from response") 74 return error 75 76async def do_register(logger: LoggerAdapter, 77 client: AsyncClient, username: str) -> None: 78 r = await client.get("/register") 79 assert_status_code(logger, r, code=200) 80 soup = parse_html(logger, r) 81 82 privkey = crypto.DSAKey.gen() 83 84 data = { 85 "username": username, 86 "p": privkey.p, 87 "q": privkey.q, 88 "g": privkey.g, 89 "x": privkey.x, 90 "y": privkey.y 91 } 92 r = await client.post("/register", data=data) 93 assert_status_code(logger, r, code=200) 94 95 return privkey 96 97async def do_login(logger: LoggerAdapter, client: AsyncClient, 98 username: str, privkey: crypto.DSAKey) -> None: 99 100 r = await client.get("/challenge") 101 assert_status_code(logger, r, code=200) 102 try: 103 challenge = int(r.text) 104 except ValueError: 105 raise MumbleException("Invalid challenge received") 106 107 sig_r, sig_s = privkey.sign(challenge) 108 109 data = { 110 "username": username, 111 "challenge": challenge, 112 "signature": f"{sig_r},{sig_s}" 113 } 114 r = await client.post("/login", data=data) 115 assert_status_code(logger, r, code=200) 116 117async def do_login_bad(logger: LoggerAdapter, client: AsyncClient, 118 username: str, privkey: crypto.DSAKey) -> None: 119 120 r = await client.get("/challenge") 121 assert_status_code(logger, r, code=200) 122 try: 123 challenge = int(r.text) 124 except ValueError: 125 raise MumbleException("Invalid challenge received") 126 127 sig_r, sig_s = privkey.sign(challenge) 128 sig_bad_r = random.randint(2, pow(10, 49)) 129 sig_bad_s = random.randint(2, pow(10, 49)) 130 131 data = { 132 "username": username, 133 "challenge": challenge, 134 "signature": f"{sig_bad_r},{sig_bad_s}" 135 } 136 r = await client.post("/login", data=data) 137 assert(r.status_code != 200) 138 139 try: 140 sig = r.text.split("\n")[-1] 141 r,s = (int(v) for v in sig.split(",")) 142 assert(sig_r == r and sig_s == s) 143 except (KeyError, ValueError): 144 raise MumbleException("Correct sig missing from login error") 145 146async def do_launch(logger: LoggerAdapter, 147 client: AsyncClient, wish: str) -> None: 148 data = { 149 "type": "firework", 150 "x": str(random.uniform(0, 1)), 151 "y": str(random.uniform(0, 1)), 152 "wish": wish 153 } 154 r = await client.post("/launch", data=data) 155 assert_status_code(logger, r, code=200) 156 157async def do_profile(logger: LoggerAdapter, client: AsyncClient, 158 username: Optional[str] = None) -> None: 159 if username is not None: 160 r = await client.get(f"/profile/{username}") 161 else: 162 r = await client.get("/profile") 163 assert_status_code(logger, r, code=200) 164 soup = parse_html(logger, r) 165 166 data = {} 167 data["profile"] = {} 168 data["events"] = [] 169 170 for row in soup.select("#proplist table tr"): 171 key = row.select_one("th").text 172 value = row.select_one("td").text 173 data["profile"][key] = value 174 175 for i, row in enumerate(soup.select("#eventlog table tr")): 176 if i == 0: 177 keys = [v.text for v in row.select("th")] 178 else: 179 event = {} 180 vals = [v.text for v in row.select("td")] 181 for k,v in zip(keys, vals): 182 event[k] = v 183 data["events"].append(event) 184 185 return data 186 187@checker.putflag(0) 188async def putflag(task: PutflagCheckerTaskMessage, logger: LoggerAdapter, 189 client: AsyncClient, db: ChainDB) -> str: 190 username = noise(10, 20) 191 privkey = await do_register(logger, client, username) 192 193 await do_launch(logger, client, task.flag) 194 195 await db.set("info", client.cookies["AIOHTTP_SESSION"]) 196 197 return username 198 199@checker.getflag(0) 200async def getflag(task: GetflagCheckerTaskMessage, 201 logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None: 202 try: 203 session_cookie = await db.get("info") 204 except KeyError: 205 raise MumbleException("Database info missing") 206 client.cookies["AIOHTTP_SESSION"] = session_cookie 207 208 r = await client.get("/profile") 209 assert_status_code(logger, r, code=200) 210 211 assert_in(task.flag, r.text, "Flag missing") 212 213@checker.putnoise(0) 214async def putnoise(task: PutnoiseCheckerTaskMessage, 215 logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None: 216 username = noise(10, 20) 217 privkey = await do_register(logger, client, username) 218 219 wish = noise(20, 50) 220 await do_launch(logger, client, wish) 221 222 keyvals = [str(v) for v in privkey.vals()] 223 await db.set("info", (username, wish, keyvals)) 224 225@checker.getnoise(0) 226async def getnoise(task: GetnoiseCheckerTaskMessage, 227 logger: LoggerAdapter, client: AsyncClient, 228 db: ChainDB, di: DependencyInjector) -> None: 229 try: 230 username, wish, keyvals = await db.get("info") 231 except KeyError: 232 raise MumbleException("Database info missing") 233 234 keyvals = [int(v) for v in keyvals] 235 privkey = crypto.DSAKey(*keyvals) 236 await do_login_bad(logger, client, username, privkey) 237 await do_login(logger, client, username, privkey) 238 239 data = await do_profile(logger, client) 240 241 try: 242 for k,v in privkey.pubkey().dict().items(): 243 assert data["profile"][k] == str(v) 244 except Exception as e: 245 trace = traceback.format_exc() 246 logger.error(f"Invalid public key info in profile\n{trace}") 247 raise MumbleException("Invalid public key info in profile") 248 249 try: 250 assert data["events"][0]["wish"] == wish 251 except: 252 raise MumbleException("Wish is missing from events logs") 253 254@checker.havoc(0) 255async def havoc(task: HavocCheckerTaskMessage, logger: LoggerAdapter, 256 client: AsyncClient, db: ChainDB, di: DependencyInjector) -> None: 257 await do_register(logger, client, noise(10, 20)) 258 259 for i in range(random.randint(1, 3)): 260 await do_launch(logger, client, noise(20, 50)) 261 262@checker.exploit(0) 263async def exploit_trivial_sig(task: ExploitCheckerTaskMessage, 264 logger: LoggerAdapter, searcher: FlagSearcher, 265 client: AsyncClient) -> Optional[str]: 266 if task.attack_info == "": 267 raise InternalErrorException("Missing attack info") 268 username = task.attack_info 269 270 data = await do_profile(logger, client, username) 271 try: 272 q = data["profile"]["q"] 273 except KeyError: 274 raise MumbleException("Missing pubkey q for profile") 275 276 r = await client.get("/challenge") 277 assert_status_code(logger, r, code=200) 278 challenge = r.text 279 280 data = { 281 "username": username, 282 "challenge": challenge, 283 "signature": f"1,{q}" 284 } 285 r = await client.post("/login", data=data) 286 assert_status_code(logger, r, code=200) 287 288 r = await client.get("/profile") 289 assert_status_code(logger, r, code=200) 290 291 return searcher.search_flag(r.text) 292 293@checker.exploit(1) 294async def exploit_nonce_reuse(task: ExploitCheckerTaskMessage, 295 logger: LoggerAdapter, searcher: FlagSearcher, 296 client: AsyncClient) -> Optional[str]: 297 if task.attack_info == "": 298 raise InternalErrorException("Missing attack info") 299 username = task.attack_info 300 301 data = await do_profile(logger, client, username) 302 try: 303 p, q, g, y = [int(data["profile"][k]) for k in ("p", "q", "g", "y")] 304 except KeyError as e: 305 raise MumbleException(f"Missing pubkey components {e} in profile") 306 except KeyError: 307 raise MumbleException("Invalid pubkey components in profile") 308 309 sigpairs = [] 310 for i in range(2): 311 r = await client.get("/challenge") 312 assert_status_code(logger, r, code=200) 313 challenge = int(r.text) 314 315 data = { 316 "username": username, 317 "challenge": challenge, 318 "signature": f"1337,1337" 319 } 320 r = await client.post("/login", data=data) 321 assert_status_code(logger, r, code=400) 322 323 try: 324 sig = r.text.split("\n")[-1] 325 r,s = (int(v) for v in sig.split(",")) 326 except (KeyError, ValueError): 327 raise MumbleException("Correct sig missing from login error") 328 329 sigpairs.append((crypto.H(challenge), (r, s))) 330 331 z1, (r1, s1) = sigpairs[0] 332 z2, (r2, s2) = sigpairs[1] 333 334 if r1 != r2: 335 raise MumbleException("Signatures do not have same r, exploit fixed?") 336 337 k = divmod(z1 - z2, s1 - s2, q) 338 x = divmod(k * s1 - z1, r1, q) 339 privkey = crypto.DSAKey(p, q, g, x, y) 340 341 r = await client.get("/challenge") 342 assert_status_code(logger, r, code=200) 343 z3 = int(r.text) 344 r3, s3 = privkey.sign(z3) 345 assert privkey.pubkey().verify(z3, (r3, s3)) 346 347 await do_login(logger, client, username, privkey) 348 349 r = await client.get("/profile") 350 assert_status_code(logger, r, code=200) 351 352 return searcher.search_flag(r.text) 353 354if __name__ == "__main__": 355 checker.run()