From 1109a88447e1c5cefe6ed93eccc8dcf8cd595d0e Mon Sep 17 00:00:00 2001 From: Louis Burda Date: Thu, 20 May 2021 02:49:09 +0200 Subject: implemented rest of checker functionality --- checker/src/checker.py | 292 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 191 insertions(+), 101 deletions(-) (limited to 'checker/src/checker.py') diff --git a/checker/src/checker.py b/checker/src/checker.py index 8818214..85c3a91 100644 --- a/checker/src/checker.py +++ b/checker/src/checker.py @@ -1,65 +1,96 @@ #!/usr/bin/env python3 from enochecker import BaseChecker, BrokenServiceException, EnoException, run from enochecker.utils import SimpleSocket, assert_equals, assert_in -import random, string, struct, logging +import random, string, struct, logging, selectors, time, socket +import pwnlib import numpy as np logging.getLogger("faker").setLevel(logging.WARNING) +logging.getLogger("pwnlib").setLevel(logging.WARNING) from faker import Faker -fake = Faker(["en_US"]) +def ensure_bytes(v): + if type(v) == bytes: + return v + elif type(v) == str: + return v.encode("latin-1") + else: + raise BrokenServiceException("Tried to pass non str/bytes to bytes arg") class STLDoctorChecker(BaseChecker): - flag_variants = 1 - noise_variants = 1 + flag_variants = 2 + noise_variants = 2 havoc_variants = 4 service_name = "stldoctor" port = 9000 - def login_user(self, conn: SimpleSocket, password: str): - self.debug("Sending command to login.") - conn.write(f"login\n{password}\n") - conn.readline_expect(b"logged in!", read_until=b"$", exception_message="Failed to log in") - - def binwrite(conn: SimpleSocket, buf: bytes): - conn.sock.sendall(buf) - - def fake_filename(self): - allowed = "abcdefghijklmopqrstuvwxyz0123456789-+.!" - return "".join([c for c in fake.name().lower().replace(" ", "-") if c in allowed][:60]).ljust(5, "!") + debuglog = True - def generate_ascii_file(self, solidname: str): - if solidname != "": - content = f"solid {solidname}\n" + def login_user(self, conn, password): + self.debug("Sending command to login.") + self.write(conn, f"login\n{password}\n") + conn.readline_expect(b"logged in!", recvuntil=b"$", exception_message="Failed to log in") + + def write(self, conn, buf): + if self.debuglog: + print("SEND: " + str(ensure_bytes(buf))) + conn.send(ensure_bytes(buf)) + + def fakeid(self): + fake = Faker(["en_US"]) + allowed = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmopqrstuvwxyz0123456789-+.!" + return "".join([c for c in fake.name().lower().replace(" ", "-") if c in allowed][:60]).ljust(10, "!") + + def havocid(self): + return "".join([chr(int(random.randint(ord(' '), 255))) for i in range(60)]) + + def do_auth(self, conn: SimpleSocket, authstr: str): + self.write(conn, f"auth {authstr}\n") + resp = conn.recvuntil("$") + assert_in(b"Success!", resp, f"Login with pass '{authstr}' failed!"); + + def check_listed(self, conn, modelid): + modelid = ensure_bytes(modelid) + self.write(conn, "list\n") + resp = conn.recvuntil("$") + assert_in(modelid, resp, f"Uploaded model is missing from list command") + + def generate_ascii_file(self, solidname): + solidname = ensure_bytes(solidname) + + if len(solidname) != 0: + content = b"solid " + solidname + b"\n" else: - content = "solid\n" - facet_count = int(random.random() * 30) + 4 + content = b"solid\n" + facet_count = random.randint(4, 30) for fi in range(facet_count): - content += "facet normal " + content += b"facet normal " vs = [[random.random() for i in range(3)] for k in range(3)] norm = np.cross(np.subtract(vs[1], vs[0]), np.subtract(vs[2],vs[0])) norm = norm / np.linalg.norm(norm) - content += " ".join([f"{v:.2f}" for v in norm]) + "\n" - content += "outer loop\n" + content += " ".join([f"{v:.2f}" for v in norm]).encode() + b"\n" + content += b"outer loop\n" for i in range(3): - content += "vertex " + " ".join([f"{v:.2f}" for v in vs[i]]) + "\n" - content += "endloop\n" - content += "endfacet\n" + content += b"vertex " + " ".join([f"{v:.2f}" for v in vs[i]]).encode() + b"\n" + content += b"endloop\n" + content += b"endfacet\n" if solidname != b"": - content += f"endsolid {solidname}\n" + content += b"endsolid " + solidname + b"\n" else: - content += "endsolid\n" + content += b"endsolid\n" - return content.encode("latin1") + return content + + def generate_bin_file(self, solidname): + solidname = ensure_bytes(solidname) - def generate_bin_file(self, solidname: str): - if len(solidname.encode()) > 78: + if len(solidname) > 78: raise EnoException("Solidname to embed in header is larger than header itself") if solidname != "": - content = b"#" + solidname.encode("ascii").ljust(78, b"\x00") + b"\x00" + content = b"#" + solidname.ljust(78, b"\x00") + b"\x00" else: content = b"#" + b"\x00" * 79 - facet_count = int(random.random() * 30) + 4 + facet_count = random.randint(4, 30) content += struct.pack(" None if self.variant_id == 0: conn = self.openconn() - modelname = self.fake_filename() - stlfile, fileid = self.putfile(conn, solidname = self.flag, modelname = modelname, filetype = "ascii") + modelname = self.fakeid() + stlfile, modelid = self.putfile(conn, self.flag, modelname, filetype = "ascii") + self.closeconn(conn) + self.postdb({ "modelid": modelid, "modelname": modelname }) + elif self.variant_id == 1: + conn = self.openconn() + modelname = self.fakeid() + authstr = self.fakeid() + self.do_auth(conn, authstr) + stlfile, modelid = self.putfile(conn, self.flag, modelname, filetype = "bin") self.closeconn(conn) - self.postdb({ "fileid": fileid, "modelname": modelname }) + self.postdb({ "modelid": modelid, "modelname": modelname, "auth": authstr }) else: raise EnoException("Invalid variant_id provided") def getflag(self): # type: () -> None if self.variant_id == 0: - fileid, modelname = self.querydb("fileid", "modelname") + modelid, modelname = self.querydb("modelid", "modelname") conn = self.openconn() - resp = self.getfile(conn, modelname) + resp = self.getfile(conn, modelname.encode()) + self.debug(resp) + assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect") + self.closeconn(conn) + elif self.variant_id == 1: + modelid, modelname, authstr = self.querydb("modelid", "modelname", "auth") + conn = self.openconn() + self.do_auth(conn, authstr) + resp = self.getfile(conn, modelname.encode()) assert_in(self.flag.encode(), resp, "Resulting flag was found to be incorrect") self.closeconn(conn) else: raise EnoException("Invalid variant_id provided") - def putnoise(self): # type: () -> None if self.variant_id == 0: conn = self.openconn() - modelname = self.fake_filename() - solidname = self.fake_filename() - contents, fileid = self.putfile(conn, modelname = modelname, solidname = solidname, filetype = "bin") + modelname = self.fakeid() + solidname = self.fakeid() + contents, modelid = self.putfile(conn, solidname, modelname, filetype = "bin") self.closeconn(conn) - self.postdb({ "fileid": fileid, "modelname": modelname, "solidname": solidname, "contents": contents }) + self.postdb({ "modelid": modelid, "modelname": modelname, "solidname": solidname, "contents": contents }) + elif self.variant_id == 1: + conn = self.openconn() + authstr = self.fakeid() + modelname = self.fakeid() + solidname = self.fakeid() + self.do_auth(conn, authstr) + contents, modelid = self.putfile(conn, solidname, modelname, filetype = "ascii") + self.closeconn(conn) + self.postdb({ "modelid": modelid, "modelname": modelname, "solidname": solidname, "contents": contents, "auth": authstr }) else: raise EnoException("Invalid variant_id provided") def getnoise(self): # type: () -> None if self.variant_id == 0: - fileid, modelname, solidname, contents = self.querydb("fileid", "modelname", "solidname", "contents") + modelid, modelname, solidname, contents = self.querydb("modelid", "modelname", "solidname", "contents") conn = self.openconn() - resp = self.getfile(conn, modelname) - # assert_in(contents, resp, "File content returned by service found to be incorrect") - assert_in(solidname.encode(), resp, "Solid name returned by service found to be incorrect") - assert_in(modelname.encode(), resp, "Model name returned by service found to be incorrect") + self.check_getfile(conn, modelname, solidname, contents, modelid) + self.closeconn(conn) + elif self.variant_id == 1: + modelid, modelname, solidname, contents, authstr = self.querydb("modelid", "modelname", "solidname", "contents", "auth") + conn = self.openconn() + self.do_auth(conn, authstr) + self.check_getfile(conn, modelname, solidname, contents, modelid) self.closeconn(conn) else: raise EnoException("Invalid variant_id provided") def havoc(self): # type: () -> None if self.variant_id == 0: - self.havoc_upload(filetype = 'ascii', registered = False) + self.havoc_upload('ascii', False) elif self.variant_id == 1: - self.havoc_upload(filetype = 'bin', registered = False) + self.havoc_upload('bin', False) elif self.variant_id == 2: - self.havoc_upload(filetype = 'ascii', registered = True) + self.havoc_upload('ascii', True) elif self.variant_id == 3: - self.havoc_upload(filetype = 'bin', registered = True) + self.havoc_upload('bin', True) else: raise EnoException("Invalid variant_id provided"); - # TODO! - conn = self.openconn() - self.closeconn(conn) - def exploit(self): - """ - This method was added for CI purposes for exploits to be tested. - Will (hopefully) not be called during actual CTF. - :raises EnoException on Error - :return This function can return a result if it wants - If nothing is returned, the service status is considered okay. - The preferred way to report Errors in the service is by raising an appropriate EnoException - """ - # TODO: We still haven't decided if we want to use this function or not. TBA + if self.variant_id == 0: + pass + elif self.variant_id == 1: + pass + else: + raise EnoException("Invalid variant_id provided") + pass -- cgit v1.2.3-71-gd317