diff options
Diffstat (limited to 'checker/src/checker.py')
| -rw-r--r-- | checker/src/checker.py | 396 |
1 files changed, 124 insertions, 272 deletions
diff --git a/checker/src/checker.py b/checker/src/checker.py index 88729a9..145cc60 100644 --- a/checker/src/checker.py +++ b/checker/src/checker.py @@ -5,17 +5,38 @@ import random import string #### Checker Tenets -# A checker SHOULD not be easily identified by the examination of network traffic => This one is not satisfied, because our usernames and notes are simple too random and easily identifiable. -# A checker SHOULD use unusual, incorrect or pseudomalicious input to detect network filters => This tenet is not satisfied, because we do not send common attack strings (i.e. for SQL injection, RCE, etc.) in our notes or usernames. +# A checker SHOULD not be easily identified by the examination of network traffic +# => satisfied, because checker uses regular user interface and picks strings from a wordlist +# to appear more human (TODO) +# A checker SHOULD use unusual, incorrect or pseudomalicious input to detect network filters +# => satisfied, send various garbage bytes for model name and file contents (TODO) #### - -class N0t3b00kChecker(BaseChecker): +samplestl = """ +solid + facet normal 1.0 0 0 + outer loop + vertex 0 1 0 + vertex 0 1 1 + vertex 0 0 1 + endloop + endfacet + facet normal 0 0 1.0 + outer loop + vertex 1 0 0 + vertex 1 1 0 + vertex 0 1 0 + endloop + endfacet +endsolid +""" + +class STLDoctorChecker(BaseChecker): """ Change the methods given here, then simply create the class and .run() it. Magic. A few convenient methods and helpers are provided in the BaseChecker. - ensure_bytes ans ensure_unicode to make sure strings are always equal. + ensure_bytes and ensure_unicode to make sure strings are always equal. As well as methods: self.connect() connects to the remote server. self.get and self.post request from http. @@ -28,30 +49,81 @@ class N0t3b00kChecker(BaseChecker): ##### EDIT YOUR CHECKER PARAMETERS flag_variants = 1 noise_variants = 1 - havoc_variants = 3 - service_name = "n0t3b00k" - port = 2323 # The port will automatically be picked up as default by self.connect and self.http. + havoc_variants = 0 + service_name = "stldoctor" + port = 9000 ##### END CHECKER PARAMETERS - def register_user(self, conn: SimpleSocket, username: str, password: str): - self.debug( - f"Sending command to register user: {username} with password: {password}" - ) - conn.write(f"reg {username} {password}\n") - conn.readline_expect( - b"User successfully registered", - read_until=b">", - exception_message="Failed to register user", - ) + def login_user(self, conn: SimpleSocket, password: str): + self.debug("Sending command to login.") + conn.write(f"login\n{password}\n") + conn.readline_expect(b"logged in!", read_until=b"$", exception_message="Failed to log in") + + def generate_file(self, filetype: str = "ascii", extra: str = ""): + if filetype == "ascii": + # TODO handle extra as solidname and gen randomly + return samplestl + elif filetype == "bin": + # TODO handle extra as header + return samplestl # TODO: this is not a binary STL! + else: + raise EnoException("Invalid file type supplied"); + + def putfile(self, conn: SimpleSocket, solidname = "TODO", modelname = "TODO"): + # Generate file contents + stlfile = samplestl + + # Upload file + self.debug("Sending command to submit file") + conn.write(f"submit\n{len(stlfile)}\n{stlfile}{modelname}\n") + conn.read_until(b"with ID ") + + # Parse ID + fileid = conn.read_until(b"!") + if fileid == b"": + raise BrokenServiceException("Unable to upload file!") + self.debug(f"Got ID {fileid}") + + conn.read_until(b"$") + + return stlfile, fileid + + def getfile(self, conn: SimpleSocket, modelname: str): + if modelname != "": + self.debug(f"Sending command to retrieve file with '{modelname}'") + conn.write(f"query\n{modelname}\n0\ny\n") + else: + self.debug(f"Sending command to retrieve file") + conn.write(f"query\n0\ny\n") + + resp = conn.read_until(b"$") + + return resp + + def querydb(self, *args): + self.debug("Querying db contents"); + vals = [] + for arg in args: + try: + val: str = self.chain_db[arg] + except IndexError as ex: + raise BrokenServiceException("Invalid db contents") + vals.append(val) + return vals - def login_user(self, conn: SimpleSocket, username: str, password: str): - self.debug(f"Sending command to login.") - conn.write(f"log {username} {password}\n") - conn.readline_expect( - b"Successfully logged in!", - read_until=b">", - exception_message="Failed to log in", - ) + def postdb(self, vdict): + self.chain_db = vdict + + def openconn(self): + self.debug("Connecting to service") + conn = self.connect() + conn.read_until("$") # ignore welcome + return conn + + def closeconn(self, conn: SimpleSocket): + self.debug("Sending exit command") + conn.write("exit\n") + conn.close() def putflag(self): # type: () -> None """ @@ -65,55 +137,11 @@ class N0t3b00kChecker(BaseChecker): the preferred way to report errors in the service is by raising an appropriate enoexception """ if self.variant_id == 0: - # First we need to register a user. So let's create some random strings. (Your real checker should use some funny usernames or so) - username: str = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password: str = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - - # Log a message before any critical action that could raise an error. - self.debug(f"Connecting to service") - # Create a TCP connection to the service. - conn = self.connect() - welcome = conn.read_until(">") - - # Register a new user - self.register_user(conn, username, password) - - # Now we need to login - self.login_user(conn, username, password) - - # Finally, we can post our note! - self.debug(f"Sending command to set the flag") - conn.write(f"set {self.flag}\n") - conn.read_until(b"Note saved! ID is ") - - try: - # Try to retrieve the resulting noteId. Using rstrip() is hacky, you should probably want to use regular expressions or something more robust. - noteId = conn.read_until(b"!\n>").rstrip(b"!\n>").decode() - except Exception as ex: - self.debug(f"Failed to retrieve note: {ex}") - raise BrokenServiceException("Could not retrieve NoteId") - - assert_equals(len(noteId) > 0, True, message="Empty noteId received") - - self.debug(f"Got noteId {noteId}") - - # Exit! - self.debug(f"Sending exit command") - conn.write(f"exit\n") - conn.close() - - # Save the generated values for the associated getflag() call. - # This is not a real dictionary! You cannot update it (i.e., self.chain_db["foo"] = bar) and some types are converted (i.e., bool -> str.). See: https://github.com/enowars/enochecker/issues/27 - self.chain_db = { - "username": username, - "password": password, - "noteId": noteId, - } - + conn = self.openconn() + modelname = self.flag + stlfile, fileid = self.putfile(conn, modelname = modelname) + self.closeconn(conn) + self.chain_db = { "fileid": fileid, "modelname": modelname } else: raise EnoException("Wrong variant_id provided") @@ -128,34 +156,11 @@ class N0t3b00kChecker(BaseChecker): the preferred way to report errors in the service is by raising an appropriate enoexception """ if self.variant_id == 0: - # First we check if the previous putflag succeeded! - try: - username: str = self.chain_db["username"] - password: str = self.chain_db["password"] - noteId: str = self.chain_db["noteId"] - except IndexError as ex: - self.debug(f"error getting notes from db: {ex}") - raise BrokenServiceException("Previous putflag failed.") - - self.debug(f"Connecting to the service") - conn = self.connect() - welcome = conn.read_until(">") - - # Let's login to the service - self.login_user(conn, username, password) - - # Let´s obtain our note. - self.debug(f"Sending command to retrieve note: {noteId}") - conn.write(f"get {noteId}\n") - note = conn.read_until(">") - assert_in( - self.flag.encode(), note, "Resulting flag was found to be incorrect" - ) - - # Exit! - self.debug(f"Sending exit command") - conn.write(f"exit\n") - conn.close() + fileid, modelname = self.querydb("fileid", "modelname") + conn = self.openconn() + resp = self.getfile(conn, modelname) + assert_in(modelname.encode(), resp, "Resulting flag was found to be incorrect") + self.closeconn(conn) else: raise EnoException("Wrong variant_id provided") @@ -172,53 +177,11 @@ class N0t3b00kChecker(BaseChecker): the preferred way to report errors in the service is by raising an appropriate enoexception """ if self.variant_id == 0: - self.debug(f"Connecting to the service") - conn = self.connect() - welcome = conn.read_until(">") - - # First we need to register a user. So let's create some random strings. (Your real checker should use some better usernames or so [i.e., use the "faker¨ lib]) - username = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - randomNote = "".join( - random.choices(string.ascii_uppercase + string.digits, k=36) - ) - - # Register another user - self.register_user(conn, username, password) - - # Now we need to login - self.login_user(conn, username, password) - - # Finally, we can post our note! - self.debug(f"Sending command to save a note") - conn.write(f"set {randomNote}\n") - conn.read_until(b"Note saved! ID is ") - - try: - noteId = conn.read_until(b"!\n>").rstrip(b"!\n>").decode() - except Exception as ex: - self.debug(f"Failed to retrieve note: {ex}") - raise BrokenServiceException("Could not retrieve NoteId") - - assert_equals(len(noteId) > 0, True, message="Empty noteId received") - - self.debug(f"{noteId}") - - # Exit! - self.debug(f"Sending exit command") - conn.write(f"exit\n") - conn.close() - - self.chain_db = { - "username": username, - "password": password, - "noteId": noteId, - "note": randomNote, - } + conn = self.openconn() + modelname = "NOISE" # TODO + contents, fileid = self.putfile(conn, modelname = modelname) + self.closeconn(conn) + self.postdb({ "fileid": fileid, "modelname": modelname, "contents": contents }) else: raise EnoException("Wrong variant_id provided") @@ -235,35 +198,11 @@ class N0t3b00kChecker(BaseChecker): the preferred way to report errors in the service is by raising an appropriate enoexception """ if self.variant_id == 0: - try: - username: str = self.chain_db["username"] - password: str = self.chain_db["password"] - noteId: str = self.chain_db["noteId"] - randomNote: str = self.chain_db["note"] - except Exception as ex: - self.debug("Failed to read db {ex}") - raise BrokenServiceException("Previous putnoise failed.") - - self.debug(f"Connecting to service") - conn = self.connect() - welcome = conn.read_until(">") - - # Let's login to the service - self.login_user(conn, username, password) - - # Let´s obtain our note. - self.debug(f"Sending command to retrieve note: {noteId}") - conn.write(f"get {noteId}\n") - conn.readline_expect( - randomNote.encode(), - read_until=b">", - exception_message="Resulting flag was found to be incorrect" - ) - - # Exit! - self.debug(f"Sending exit command") - conn.write(f"exit\n") - conn.close() + fileid, modelname, contents = self.querydb("fileid", "modelname", "contents") + conn = self.openconn() + resp = self.getfile(conn, modelname) + assert_in(contents.encode(), resp, "Noise file content was found to be incorrect") + self.closeconn(conn) else: raise EnoException("Wrong variant_id provided") @@ -276,98 +215,11 @@ class N0t3b00kChecker(BaseChecker): If nothing is returned, the service status is considered okay. The preferred way to report Errors in the service is by raising an appropriate EnoException """ - self.debug(f"Connecting to service") - conn = self.connect() - welcome = conn.read_until(">") - - if self.variant_id == 0: - # In variant 1, we'll check if the help text is available - self.debug(f"Sending help command") - conn.write(f"help\n") - is_ok = conn.read_until(">") + return - for line in [ - "This is a notebook service. Commands:", - "reg USER PW - Register new account", - "log USER PW - Login to account", - "set TEXT..... - Set a note", - "user - List all users", - "list - List all notes", - "exit - Exit!", - "dump - Dump the database", - "get ID", - ]: - assert_in(line.encode(), is_ok, "Received incomplete response.") - - elif self.variant_id == 1: - # In variant 2, we'll check if the `user` command still works. - username = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - - # Register and login a dummy user - self.register_user(conn, username, password) - self.login_user(conn, username, password) - - self.debug(f"Sending user command") - conn.write(f"user\n") - ret = conn.readline_expect( - "User 0: ", - read_until=b">", - exception_message="User command does not return any users", - ) - - if username: - assert_in(username.encode(), ret, "Flag username not in user output") - - elif self.variant_id == 2: - # In variant 2, we'll check if the `list` command still works. - username = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - password = "".join( - random.choices(string.ascii_uppercase + string.digits, k=12) - ) - randomNote = "".join( - random.choices(string.ascii_uppercase + string.digits, k=36) - ) - - # Register and login a dummy user - self.register_user(conn, username, password) - self.login_user(conn, username, password) - - self.debug(f"Sending command to save a note") - conn.write(f"set {randomNote}\n") - conn.read_until(b"Note saved! ID is ") - - try: - noteId = conn.read_until(b"!\n>").rstrip(b"!\n>").decode() - except Exception as ex: - self.debug(f"Failed to retrieve note: {ex}") - raise BrokenServiceException("Could not retrieve NoteId") - - assert_equals(len(noteId) > 0, True, message="Empty noteId received") - - self.debug(f"{noteId}") - - self.debug(f"Sending list command") - conn.write(f"list\n") - conn.readline_expect( - noteId.encode(), - read_until=b'>', - exception_message="List command does not work as intended" - ) - - else: - raise EnoException("Wrong variant_id provided") - - # Exit! - self.debug(f"Sending exit command") - conn.write(f"exit\n") - conn.close() + # TODO! + conn = self.openconn() + self.closeconn(conn) def exploit(self): """ @@ -382,6 +234,6 @@ class N0t3b00kChecker(BaseChecker): pass -app = N0t3b00kChecker.service # This can be used for uswgi. +app = STLDoctorChecker.service # This can be used for uswgi. if __name__ == "__main__": - run(N0t3b00kChecker) + run(STLDoctorChecker) |
