aboutsummaryrefslogtreecommitdiffstats
path: root/checker/src/checker.py
diff options
context:
space:
mode:
Diffstat (limited to 'checker/src/checker.py')
-rw-r--r--checker/src/checker.py292
1 files changed, 191 insertions, 101 deletions
diff --git a/checker/src/checker.py b/checker/src/checker.py
index 8818214..85c3a91 100644
--- a/checker/src/checker.py
+++ b/checker/src/checker.py
@@ -1,65 +1,96 @@
#!/usr/bin/env python3
from enochecker import BaseChecker, BrokenServiceException, EnoException, run
from enochecker.utils import SimpleSocket, assert_equals, assert_in
-import random, string, struct, logging
+import random, string, struct, logging, selectors, time, socket
+import pwnlib
import numpy as np
logging.getLogger("faker").setLevel(logging.WARNING)
+logging.getLogger("pwnlib").setLevel(logging.WARNING)
from faker import Faker
-fake = Faker(["en_US"])
+def ensure_bytes(v):
+ if type(v) == bytes:
+ return v
+ elif type(v) == str:
+ return v.encode("latin-1")
+ else:
+ raise BrokenServiceException("Tried to pass non str/bytes to bytes arg")
class STLDoctorChecker(BaseChecker):
- flag_variants = 1
- noise_variants = 1
+ flag_variants = 2
+ noise_variants = 2
havoc_variants = 4
service_name = "stldoctor"
port = 9000
- def login_user(self, conn: SimpleSocket, password: str):
- self.debug("Sending command to login.")
- conn.write(f"login\n{password}\n")
- conn.readline_expect(b"logged in!", read_until=b"$", exception_message="Failed to log in")
-
- def binwrite(conn: SimpleSocket, buf: bytes):
- conn.sock.sendall(buf)
-
- def fake_filename(self):
- allowed = "abcdefghijklmopqrstuvwxyz0123456789-+.!"
- return "".join([c for c in fake.name().lower().replace(" ", "-") if c in allowed][:60]).ljust(5, "!")
+ debuglog = True
- def generate_ascii_file(self, solidname: str):
- if solidname != "":
- content = f"solid {solidname}\n"
+ def login_user(self, conn, password):
+ self.debug("Sending command to login.")
+ self.write(conn, f"login\n{password}\n")
+ conn.readline_expect(b"logged in!", recvuntil=b"$", exception_message="Failed to log in")
+
+ def write(self, conn, buf):
+ if self.debuglog:
+ print("SEND: " + str(ensure_bytes(buf)))
+ conn.send(ensure_bytes(buf))
+
+ def fakeid(self):
+ fake = Faker(["en_US"])
+ allowed = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!"
+ return "".join([c for c in fake.name().lower().replace(" ", "-") if c in allowed][:60]).ljust(10, "!")
+
+ def havocid(self):
+ return "".join([chr(int(random.randint(ord(' '), 255))) for i in range(60)])
+
+ def do_auth(self, conn: SimpleSocket, authstr: str):
+ self.write(conn, f"auth {authstr}\n")
+ resp = conn.recvuntil("$")
+ assert_in(b"Success!", resp, f"Login with pass '{authstr}' failed!");
+
+ def check_listed(self, conn, modelid):
+ modelid = ensure_bytes(modelid)
+ self.write(conn, "list\n")
+ resp = conn.recvuntil("$")
+ assert_in(modelid, resp, f"Uploaded model is missing from list command")
+
+ def generate_ascii_file(self, solidname):
+ solidname = ensure_bytes(solidname)
+
+ if len(solidname) != 0:
+ content = b"solid " + solidname + b"\n"
else:
- content = "solid\n"
- facet_count = int(random.random() * 30) + 4
+ content = b"solid\n"
+ facet_count = random.randint(4, 30)
for fi in range(facet_count):
- content += "facet normal "
+ content += b"facet normal "
vs = [[random.random() for i in range(3)] for k in range(3)]
norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0]))
norm = norm / np.linalg.norm(norm)
- content += " ".join([f"{v:.2f}" for v in norm]) + "\n"
- content += "outer loop\n"
+ content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n"
+ content += b"outer loop\n"
for i in range(3):
- content += "vertex " + " ".join([f"{v:.2f}" for v in vs[i]]) + "\n"
- content += "endloop\n"
- content += "endfacet\n"
+ content += b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n"
+ content += b"endloop\n"
+ content += b"endfacet\n"
if solidname != b"":
- content += f"endsolid {solidname}\n"
+ content += b"endsolid " + solidname + b"\n"
else:
- content += "endsolid\n"
+ content += b"endsolid\n"
- return content.encode("latin1")
+ return content
+
+ def generate_bin_file(self, solidname):
+ solidname = ensure_bytes(solidname)
- def generate_bin_file(self, solidname: str):
- if len(solidname.encode()) > 78:
+ if len(solidname) > 78:
raise EnoException("Solidname to embed in header is larger than header itself")
if solidname != "":
- content = b"#" + solidname.encode("ascii").ljust(78, b"\x00") + b"\x00"
+ content = b"#" + solidname.ljust(78, b"\x00") + b"\x00"
else:
content = b"#" + b"\x00" * 79
- facet_count = int(random.random() * 30) + 4
+ facet_count = random.randint(4, 30)
content += struct.pack("<I", facet_count)
for fi in range(facet_count):
vs = [[random.random() for i in range(3)] for k in range(3)]
@@ -72,7 +103,7 @@ class STLDoctorChecker(BaseChecker):
content += b"\x00\x00"
return content
- def generate_file(self, filetype: str, solidname: str):
+ def generate_file(self, filetype, solidname):
if filetype == "ascii":
return self.generate_ascii_file(solidname = solidname)
elif filetype == "bin":
@@ -80,39 +111,68 @@ class STLDoctorChecker(BaseChecker):
else:
raise EnoException("Invalid file type supplied");
- def putfile(self, conn: SimpleSocket, solidname: str, modelname: str, filetype: str):
+ def putfile(self, conn, solidname, modelname, filetype):
+ solidname = ensure_bytes(solidname)
+ modelname = ensure_bytes(modelname)
+
# Generate file contents
stlfile = self.generate_file(filetype = filetype, solidname = solidname)
# Upload file
self.debug("Sending command to submit file")
- conn.write("submit\n")
- conn.write(f"{len(stlfile)}\n")
- binwrite(conn, stlfile)
- conn.write(f"{modelname}\n")
- self.debug(b":::RESPONSE:::\n" + conn.read_until(b"with ID "))
+ self.write(conn, "upload\n")
+ self.write(conn, f"{len(stlfile)}\n")
+ self.write(conn, stlfile)
+ self.write(conn, modelname + b"\n")
# Parse ID
- fileid = conn.read_until(b"!")
- if fileid == b"":
+ print(conn.recvuntil("with ID "))
+ modelid = conn.recvuntil(b"!")[:-1]
+ if modelid == b"":
raise BrokenServiceException("Unable to upload file!")
- self.debug(f"Got ID {fileid}")
+ self.debug(f"Got ID {modelid}")
+
+ conn.recvuntil(b"$")
+
+ if self.debuglog:
+ print(f"PUT FILE: {modelid}")
- conn.read_until(b"$")
+ return stlfile, modelid
- return stlfile, fileid
+ def getfile(self, conn, modelname):
+ modelname = ensure_bytes(modelname)
- def getfile(self, conn: SimpleSocket, modelname: str):
if modelname != "":
- self.debug(f"Sending command to retrieve file with '{modelname}'")
- conn.write(f"query\n{modelname}\n0\ny\n")
+ self.debug(f"Sending command to retrieve file with name '{modelname}'")
+ self.write(conn, "search\n")
+ self.write(conn, modelname + b"\n")
+ self.write(conn, "0\n") # first result
+ self.write(conn, "y\n") # yes download
else:
self.debug(f"Sending command to retrieve file")
- conn.write(f"query\n0\ny\n")
+ self.write(conn, f"search\n0\ny\n")
- resp = conn.read_until(b"$")
+ resp = conn.recvuntil(b"Here you go.. (")
+ try:
+ size = int(conn.recvuntil(b"B)\n")[:-3])
+ except:
+ raise BrokenServiceException("Returned file content size for download is not a valid integer")
- return resp
+ print("FILE SIZE:", size)
+ contents = conn.recvn(size)
+ if self.debuglog:
+ print("GOT FILE:\n" + str(contents))
+
+ conn.recvuntil("$") # clean up rest
+ return resp + contents
+
+ def check_getfile(self, conn, modelname, solidname, contents, modelid = None):
+ resp = self.getfile(conn, modelname = modelname)
+ if modelid:
+ assert_in(ensure_bytes(modelid), resp, f"Model id {modelid} not returned / correctly parsed")
+ assert_in(ensure_bytes(modelname), resp, f"Model name {modelname} not returned / correctly parsed")
+ assert_in(ensure_bytes(solidname), resp, f"Solid name {solidname} not returned / correctly parsed")
+ assert_in(ensure_bytes(contents), resp, f"STL File contents not returned / correctly parsed")
def querydb(self, *args):
self.debug("Querying db contents");
@@ -128,107 +188,137 @@ class STLDoctorChecker(BaseChecker):
def postdb(self, vdict):
self.chain_db = vdict
- def havoc_upload(self, filetype: str, registered: bool):
+ def havoc_upload(self, filetype, register):
+ solidname = self.havocid()
+ # these should not be havoc, since they are hashed
+ # and this could trigger the buffer overflow: part of exploit 2
+ modelname = self.fakeid()
+ authstr = self.fakeid()
+
+ # create new session and user and upload file
conn = self.openconn()
- if registered:
- pass # TODO: auth
- modelname = self.fake_filename()[:50]
- modelname += "".join([chr(127 + int(random.random() * 128)) for i in range(10)]) # noise
- contents, mid = self.putfile(conn, filetype = filetype, modelname = modelname, solidname = self.fake_filename())
- resp = self.getfile(conn, modelname = modelname)
- assert_in(modelname.encode(), resp, f"Model name '{modelname}' not returned / correctly parsed")
- assert_in(solidname.encode(), resp, f"Solid name '{modelname}' not returned / correctly parsed")
- assert_in(contents, resp, f"STL File contents not returned / correctly parsed")
+ if register:
+ self.do_auth(conn, authstr)
+ contents, modelid = self.putfile(conn, solidname, modelname, filetype)
+ self.check_getfile(conn, modelname, solidname, contents)
+ if register:
+ self.check_listed(conn, modelid)
self.closeconn(conn)
+ # try getting file from a new session
conn = self.openconn()
- resp = self.getfile(conn, modelname = modelname)
- assert_in(modelname.encode(), resp, f"Model name '{modelname}' not returned / correctly parsed")
- assert_in(solidname.encode(), resp, f"Solid name '{modelname}' not returned / correctly parsed")
- assert_in(contents, resp, f"STL File contents not returned / correctly parsed")
+ if register:
+ self.do_auth(conn, authstr)
+ self.check_getfile(conn, modelname, solidname, contents)
+ if register:
+ self.check_listed(conn, modelid)
self.closeconn(conn)
def openconn(self):
self.debug("Connecting to service")
- conn = self.connect()
- conn.read_until("$") # ignore welcome
+ conn = pwnlib.tubes.remote.remote(self.address, self.port)
+ conn.recvuntil("$") # ignore welcome
+ if self.debuglog:
+ self.write(conn, "echo\n")
+ conn.recvuntil("$")
return conn
- def closeconn(self, conn: SimpleSocket):
+ def closeconn(self, conn):
self.debug("Sending exit command")
- conn.write("exit\n")
+ self.write(conn, "exit\n")
conn.close()
def putflag(self): # type: () -> None
if self.variant_id == 0:
conn = self.openconn()
- modelname = self.fake_filename()
- stlfile, fileid = self.putfile(conn, solidname = self.flag, modelname = modelname, filetype = "ascii")
+ modelname = self.fakeid()
+ stlfile, modelid = self.putfile(conn, self.flag, modelname, filetype = "ascii")
+ self.closeconn(conn)
+ self.postdb({ "modelid": modelid, "modelname": modelname })
+ elif self.variant_id == 1:
+ conn = self.openconn()
+ modelname = self.fakeid()
+ authstr = self.fakeid()
+ self.do_auth(conn, authstr)
+ stlfile, modelid = self.putfile(conn, self.flag, modelname, filetype = "bin")
self.closeconn(conn)
- self.postdb({ "fileid": fileid, "modelname": modelname })
+ self.postdb({ "modelid": modelid, "modelname": modelname, "auth": authstr })
else:
raise EnoException("Invalid variant_id provided")
def getflag(self): # type: () -> None
if self.variant_id == 0:
- fileid, modelname = self.querydb("fileid", "modelname")
+ modelid, modelname = self.querydb("modelid", "modelname")
conn = self.openconn()
- resp = self.getfile(conn, modelname)
+ resp = self.getfile(conn, modelname.encode())
+ self.debug(resp)
+ assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect")
+ self.closeconn(conn)
+ elif self.variant_id == 1:
+ modelid, modelname, authstr = self.querydb("modelid", "modelname", "auth")
+ conn = self.openconn()
+ self.do_auth(conn, authstr)
+ resp = self.getfile(conn, modelname.encode())
assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect")
self.closeconn(conn)
else:
raise EnoException("Invalid variant_id provided")
-
def putnoise(self): # type: () -> None
if self.variant_id == 0:
conn = self.openconn()
- modelname = self.fake_filename()
- solidname = self.fake_filename()
- contents, fileid = self.putfile(conn, modelname = modelname, solidname = solidname, filetype = "bin")
+ modelname = self.fakeid()
+ solidname = self.fakeid()
+ contents, modelid = self.putfile(conn, solidname, modelname, filetype = "bin")
self.closeconn(conn)
- self.postdb({ "fileid": fileid, "modelname": modelname, "solidname": solidname, "contents": contents })
+ self.postdb({ "modelid": modelid, "modelname": modelname, "solidname": solidname, "contents": contents })
+ elif self.variant_id == 1:
+ conn = self.openconn()
+ authstr = self.fakeid()
+ modelname = self.fakeid()
+ solidname = self.fakeid()
+ self.do_auth(conn, authstr)
+ contents, modelid = self.putfile(conn, solidname, modelname, filetype = "ascii")
+ self.closeconn(conn)
+ self.postdb({ "modelid": modelid, "modelname": modelname, "solidname": solidname, "contents": contents, "auth": authstr })
else:
raise EnoException("Invalid variant_id provided")
def getnoise(self): # type: () -> None
if self.variant_id == 0:
- fileid, modelname, solidname, contents = self.querydb("fileid", "modelname", "solidname", "contents")
+ modelid, modelname, solidname, contents = self.querydb("modelid", "modelname", "solidname", "contents")
conn = self.openconn()
- resp = self.getfile(conn, modelname)
- # assert_in(contents, resp, "File content returned by service found to be incorrect")
- assert_in(solidname.encode(), resp, "Solid name returned by service found to be incorrect")
- assert_in(modelname.encode(), resp, "Model name returned by service found to be incorrect")
+ self.check_getfile(conn, modelname, solidname, contents, modelid)
+ self.closeconn(conn)
+ elif self.variant_id == 1:
+ modelid, modelname, solidname, contents, authstr = self.querydb("modelid", "modelname", "solidname", "contents", "auth")
+ conn = self.openconn()
+ self.do_auth(conn, authstr)
+ self.check_getfile(conn, modelname, solidname, contents, modelid)
self.closeconn(conn)
else:
raise EnoException("Invalid variant_id provided")
def havoc(self): # type: () -> None
if self.variant_id == 0:
- self.havoc_upload(filetype = 'ascii', registered = False)
+ self.havoc_upload('ascii', False)
elif self.variant_id == 1:
- self.havoc_upload(filetype = 'bin', registered = False)
+ self.havoc_upload('bin', False)
elif self.variant_id == 2:
- self.havoc_upload(filetype = 'ascii', registered = True)
+ self.havoc_upload('ascii', True)
elif self.variant_id == 3:
- self.havoc_upload(filetype = 'bin', registered = True)
+ self.havoc_upload('bin', True)
else:
raise EnoException("Invalid variant_id provided");
- # TODO!
- conn = self.openconn()
- self.closeconn(conn)
-
def exploit(self):
- """
- This method was added for CI purposes for exploits to be tested.
- Will (hopefully) not be called during actual CTF.
- :raises EnoException on Error
- :return This function can return a result if it wants
- If nothing is returned, the service status is considered okay.
- The preferred way to report Errors in the service is by raising an appropriate EnoException
- """
- # TODO: We still haven't decided if we want to use this function or not. TBA
+ if self.variant_id == 0:
+ pass
+ elif self.variant_id == 1:
+ pass
+ else:
+ raise EnoException("Invalid variant_id provided")
+
pass