checker.py (12351B)
1#!/usr/bin/env python3 2 3import re 4from asyncio import StreamReader, StreamWriter 5from asyncio.exceptions import TimeoutError 6from logging import LoggerAdapter 7from typing import Any, List, Optional, Tuple, cast 8 9from enochecker3 import ( 10 AsyncSocket, 11 ChainDB, 12 DependencyInjector, 13 Enochecker, 14 ExploitCheckerTaskMessage, 15 FlagSearcher, 16 GetflagCheckerTaskMessage, 17 GetnoiseCheckerTaskMessage, 18 InternalErrorException, 19 MumbleException, 20 PutflagCheckerTaskMessage, 21 PutnoiseCheckerTaskMessage, 22) 23 24from crypto import RSA, fake_sign, get_key, load_keys, sign 25from util import gen_noise, gen_username, timed 26 27OK = 0 28FAIL = 1 29 30prompt = b"\r$ " 31 32 33class Session: 34 def __init__(self, socket: AsyncSocket, logger: LoggerAdapter) -> None: 35 socket_tuple = cast(tuple[StreamReader, StreamWriter], socket) 36 self.reader = socket_tuple[0] 37 self.writer = socket_tuple[1] 38 self.reader._limit = 2 ** 20 # type: ignore 39 self.logger = logger 40 self.closed = False 41 42 async def __aenter__(self) -> "Session": 43 self.logger.debug("Preparing session") 44 await self.prepare() 45 return self 46 47 async def __aexit__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None: 48 self.logger.debug("Closing session") 49 await self.close() 50 51 async def readuntil( 52 self, target: bytes, include: bool = True, ctx: Optional[str] = None 53 ) -> bytes: 54 try: 55 ctxstr = f"readuntil {target!r}" if ctx is None else ctx 56 data = await timed(self.reader.readuntil(target), self.logger, ctx=ctxstr) 57 msg = f"read: {data[:100]!r}{'..' if len(data) > 100 else ''}" 58 self.logger.debug(msg) 59 if not include: 60 data = data[: -len(target)] 61 return data 62 except TimeoutError: 63 self.logger.critical(f"Service timed out while waiting for {target!r}") 64 raise MumbleException("Service took too long to respond") 65 66 async def readline(self, ctx: Optional[str] = None) -> bytes: 67 return await self.readuntil(b"\n", ctx=ctx) 68 69 async def read(self, n: int, ctx: Optional[str] = None) -> bytes: 70 try: 71 ctxstr = f"reading {n} bytes" if ctx is None else ctx 72 data = await timed(self.reader.readexactly(n), self.logger, ctx=ctxstr) 73 msg = f"read: {data[:60]!r}{'..' if len(data) > 60 else ''}" 74 self.logger.debug(msg) 75 return data 76 except TimeoutError: 77 self.logger.critical(f"Service timed out while reading {n} bytes") 78 raise MumbleException("Service took too long to respond") 79 80 async def drain(self) -> None: 81 await self.writer.drain() 82 83 def write(self, data: bytes) -> None: 84 msg = f"write: {data[:60]!r}{'..' if len(data) > 60 else ''}" 85 self.logger.debug(msg) 86 self.writer.write(data) 87 88 async def prepare(self) -> None: 89 await self.readuntil(prompt) 90 91 async def exit(self) -> None: 92 if self.closed: 93 return 94 self.write(b"exit\n") 95 await self.drain() 96 await self.readuntil(b"bye!") 97 await self.close() 98 99 async def close(self) -> None: 100 if self.closed: 101 return 102 self.closed = True 103 self.writer.close() 104 await self.writer.wait_closed() 105 106 107class _Enochecker(Enochecker): 108 async def _init(self) -> None: 109 load_keys() 110 await super()._init() 111 112 113checker = _Enochecker("postit", 9337) 114app = lambda: checker.app 115 116 117@checker.register_dependency 118def _get_session(socket: AsyncSocket, logger: LoggerAdapter) -> Session: 119 return Session(socket, logger) 120 121 122async def getdb(db: ChainDB, key: str) -> tuple[Any, ...]: 123 try: 124 return await db.get(key) 125 except KeyError: 126 raise MumbleException( 127 "Could not retrieve necessary info for service interaction" 128 ) 129 130 131async def get_users( 132 session: Session, expect: int = OK 133) -> Tuple[int, Optional[List[bytes]]]: 134 session.write(b"users\n") 135 await session.drain() 136 resp = await session.readuntil(prompt, include=False) 137 try: 138 return OK, [ 139 line.split(b"- ", 1)[1] for line in resp.split(b"\n") if line != b"" 140 ] 141 except IndexError: 142 if expect == FAIL: 143 return FAIL, None 144 session.logger.critical(f"Unexpected response when retrieving users:\n{resp!r}") 145 raise MumbleException("Failed to retrieve user list") 146 147 148async def register( 149 session: Session, username: bytes, rsa: "RSA", expect: int = OK 150) -> int: 151 session.write(b"register %b\n%i\n%i\n" % (username, rsa.e, rsa.n)) 152 await session.drain() 153 resp = await session.readuntil(prompt, include=False) 154 if not resp.endswith(b"Enter RSA modulus: "): 155 if expect == FAIL: 156 return FAIL 157 session.logger.critical( 158 f"Unexpected response during registration of user {username!r}:\n{resp!r}" 159 ) 160 raise MumbleException("Registration not working properly") 161 162 _, users = await get_users(session) 163 assert users is not None 164 if username not in users: 165 if expect == FAIL: 166 return FAIL 167 session.logger.critical( 168 f"Registered user {username!r} missing from user list\n" 169 ) 170 raise MumbleException("Registration not working properly") 171 172 return OK 173 174 175async def login(session: Session, username: bytes, rsa: "RSA", expect: int = OK) -> int: 176 session.write(b"login %b\n" % username) 177 await session.drain() 178 179 resp = await session.readuntil(b"Signature: ") 180 match = re.search(b"Sign this message: (.+)\n", resp) 181 assert match is not None 182 challenge = match.group(1) 183 184 try: 185 signature = sign(challenge, rsa) 186 except ValueError as e: 187 session.logger.critical( 188 f"Failed to generate signature: {e}\n\ 189 Public exponent: {rsa.e!r}\n\ 190 Private exponent: {rsa.d!r}\n\ 191 Modulus: {rsa.n!r}" 192 ) 193 raise InternalErrorException("Failed to sign message") 194 195 session.write(b"%i\n" % signature) 196 await session.drain() 197 198 resp = await session.readuntil(prompt, include=False) 199 if resp != b"": 200 if expect == FAIL: 201 return FAIL 202 session.logger.critical( 203 f"Unexpected response during registration of user {username!r}:\n\ 204 Public exponent: {rsa.e!r}\n\ 205 Private exponent: {rsa.d!r}\n\ 206 Modulus: {rsa.n!r}\n\ 207 Challenge: {challenge!r}\n\ 208 Signature: {signature!r}\n\ 209 Response: {resp!r}" 210 ) 211 raise MumbleException("Authentication not working properly") 212 213 return OK 214 215 216async def userinfo( 217 session: Session, username: bytes, expect: int = OK 218) -> Tuple[int, Optional[Tuple[int, int]]]: 219 session.write(b"info %b\n" % username) 220 await session.drain() 221 resp = await session.readuntil(prompt, include=False) 222 match = re.search(b"Username: (.+)\nRSA Exponent: (.+)\nRSA Modulus: (.+)\n", resp) 223 if match is None: 224 if expect == FAIL: 225 return FAIL, None 226 session.logger.critical( 227 f"Unable to retrieve info for user {username!r}\n\ 228 Received instead:\n{resp!r}" 229 ) 230 raise MumbleException("User info not returned properly") 231 232 return OK, (int(match.group(2)), int(match.group(3))) 233 234 235async def get_posts( 236 session: Session, expect: int = OK 237) -> Tuple[int, Optional[List[bytes]]]: 238 session.write(b"posts\n") 239 await session.drain() 240 resp = await session.readuntil(prompt, include=False) 241 242 try: 243 return OK, [ 244 line.split(b"- ", 1)[1] for line in resp.split(b"\n") if line != b"" 245 ] 246 except IndexError: 247 if expect == FAIL: 248 return FAIL, None 249 session.logger.critical( 250 f"Unexpected response while retrieving posts:\n{resp!r}" 251 ) 252 raise MumbleException("Post listing not working properly") 253 254 255async def add_post(session: Session, msg: bytes, expect: int = OK) -> int: 256 session.write(b"post %b\n" % msg) 257 await session.drain() 258 resp = await session.readuntil(prompt, include=False) 259 if resp != b"": 260 if expect == FAIL: 261 return FAIL 262 session.logger.critical(f"Unexpected response for post creation:\n{resp!r}") 263 raise MumbleException("Post creation not working properly") 264 265 _, posts = await get_posts(session) 266 assert posts is not None 267 if msg not in posts: 268 if expect == FAIL: 269 return FAIL 270 session.logger.critical( 271 f"Previously added post {msg!r} is missing from post list {posts!r}\n" 272 ) 273 raise MumbleException("Post creation not working properly") 274 275 return OK 276 277 278@checker.putflag(0) 279async def putflag(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> str: 280 db = await di.get(ChainDB) 281 282 rsa = get_key() 283 username = gen_username() 284 285 session = await di.get(Session) 286 await register(session, username, rsa) 287 await login(session, username, rsa) 288 await add_post(session, task.flag.encode()) 289 await session.exit() 290 291 await db.set("info", (username, rsa.encode())) 292 return "User '{}' just posted a secret".format(username.decode()) 293 294 295@checker.getflag(0) 296async def getflag(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None: 297 db = await di.get(ChainDB) 298 username, rsa_params = await getdb(db, "info") 299 rsa = RSA.decode(rsa_params) 300 301 session = await di.get(Session) 302 await login(session, username, rsa) 303 _, posts = await get_posts(session) 304 await session.exit() 305 306 assert posts is not None 307 if task.flag.encode() not in posts: 308 session.logger.critical(f"Flag is missing from posts:\n{posts!r}") 309 raise MumbleException("Failed to retrieve flag") 310 311 312@checker.putnoise(0) 313async def putnoise(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None: 314 db = await di.get(ChainDB) 315 316 rsa = get_key() 317 username = gen_username() 318 noise = gen_noise() 319 320 session = await di.get(Session) 321 await register(session, username, rsa) 322 await login(session, username, rsa) 323 await add_post(session, noise) 324 await session.exit() 325 326 await db.set("info", (username, rsa.encode(), noise)) 327 328 329@checker.getnoise(0) 330async def getnoise(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None: 331 db = await di.get(ChainDB) 332 username, rsa_params, noise = await getdb(db, "info") 333 rsa = RSA.decode(rsa_params) 334 335 session = await di.get(Session) 336 await login(session, username, rsa) 337 _, posts = await get_posts(session) 338 await session.exit() 339 340 assert posts is not None 341 if noise not in posts: 342 session.logger.critical("Noise is missing from posts") 343 raise MumbleException("Failed to retrieve noise") 344 345 346@checker.exploit(0) 347async def exploit(task: ExploitCheckerTaskMessage, di: DependencyInjector) -> None: 348 assert task.attack_info is not None 349 victim = task.attack_info[len("User '") : -len("' just posted a secret")].encode() 350 searcher = await di.get(FlagSearcher) 351 352 session = await di.get(Session) 353 _, res = await userinfo(session, victim) 354 assert res is not None 355 exp, mod = res 356 357 session.write(b"login %b\n" % victim) 358 await session.drain() 359 resp = await session.readuntil(b"Signature: ") 360 361 match = re.search(b"Sign this message: (.+)\n", resp) 362 assert match is not None 363 challenge = match.group(1) 364 365 try: 366 signature = fake_sign(challenge, exp, mod) 367 except (ValueError, AssertionError) as e: 368 session.logger.critical( 369 f"Failed to sign message: {e}\n\ 370 Public exponent: {exp!r}\n\ 371 Modulus: {mod!r}" 372 ) 373 raise InternalErrorException("Failed generate fake signature") 374 session.write(b"%i\n" % signature) 375 await session.drain() 376 377 resp = await session.readuntil(prompt, include=False) 378 if resp != b"": 379 raise MumbleException("Fake signature was rejected by service") 380 381 _, posts = await get_posts(session) 382 assert posts is not None 383 if flag := searcher.search_flag(b"\n".join(posts)): 384 return flag 385 386 raise MumbleException("Exploit failed to find flag in posts") 387 388 389if __name__ == "__main__": 390 checker.run(port=9338)