diff options
| author | Louis Burda <quent.burda@gmail.com> | 2022-10-26 19:28:04 +0200 |
|---|---|---|
| committer | Louis Burda <quent.burda@gmail.com> | 2022-10-26 19:28:04 +0200 |
| commit | b4f2da2ca448e173d2ff638d71018c50eca0982d (patch) | |
| tree | 965872d1b692dacb4191d60c507dca3b0e54d018 | |
| parent | ab549121677c902683acf8887796ae039d7d571e (diff) | |
| download | bambi7-service-catchbox-b4f2da2ca448e173d2ff638d71018c50eca0982d.tar.gz bambi7-service-catchbox-b4f2da2ca448e173d2ff638d71018c50eca0982d.zip | |
Add debug info for unexpected status_codes (untested)
| -rw-r--r-- | checker/src/checker.py | 81 |
1 files changed, 45 insertions, 36 deletions
diff --git a/checker/src/checker.py b/checker/src/checker.py index 0925763..962b073 100644 --- a/checker/src/checker.py +++ b/checker/src/checker.py @@ -1,3 +1,5 @@ +from bs4 import BeautifulSoup + from enochecker3 import ( ChainDB, DependencyInjector, @@ -12,11 +14,9 @@ from enochecker3 import ( ) from enochecker3.utils import FlagSearcher, assert_in, assert_equals -from typing import Optional - -from httpx import AsyncClient +import dateutil.parser -from bs4 import BeautifulSoup +from httpx import AsyncClient, Response from hashlib import md5 @@ -24,9 +24,12 @@ from logging import LoggerAdapter from subprocess import Popen, PIPE -import dateutil.parser import string + +from typing import Optional + import random + import os checker = Enochecker("CatchBox", 9090) @@ -48,18 +51,24 @@ def str2epoch(text: str) -> int: date = dateutil.parser.parse(text + " UTC") return int(date.timestamp()) +def assert_status_code(logger: LoggerAdapter, r: Response, + status_code: int, action: str) -> None: + if (r.status_code != status_code): + logger.warn(f"Bad service response for {action}:\n{r.text}") + raise MumbleException(status[0].upper() + status[1:] + " failed") + @checker.putflag(0) async def putflag_file(task: PutflagCheckerTaskMessage, logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> str: username, password = noise(10, 20), noise(20, 30) data = { "action": "register", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "register failed") + assert_status_code(logger, r, 200, "register") filename, content = noise(20, 30), task.flag data = { "action": "upload", "filename": filename, "content": content } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "upload failed") + assert_status_code(logger, r, 200, "file upload") await db.set("info", (username, password, filename)) @@ -75,12 +84,12 @@ async def getflag_file(task: GetflagCheckerTaskMessage, data = { "action": "login", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "login failed") + assert_status_code(logger, r, 200, "login") r = await client.get(f"/index.php?f={filename}") - assert_equals(r.status_code, 200, "file download failed") + assert_status_code(logger, r, 200, "file download") - assert_in(task.flag, r.text, "flag missing") + assert_in(task.flag, r.text, "Flag missing") @checker.putflag(1) async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter, @@ -88,11 +97,11 @@ async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter, username, password = noise(10, 20), noise(20, 30) data = { "action": "register", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "register failed") + assert_status_code(logger, r, 200, "register") data = { "action": "report", "content": task.flag } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "upload failed") + assert_status_code(logger, r, 200, "upload") await db.set("info", (username, password)) @@ -100,41 +109,40 @@ async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter, @checker.getflag(1) async def getflag_report(task: GetflagCheckerTaskMessage, - client: AsyncClient, db: ChainDB) -> None: + logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None: try: username, password = await db.get("info") except KeyError: - raise MumbleException("database info missing") + raise MumbleException("Database info missing") data = { "action": "login", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "login failed") + assert_status_code(logger, r, 200, "login") r = await client.get(f"/index.php?r") - assert_equals(r.status_code, 200, "report download failed") + assert_status_code(logger, r, 200, "report download") - assert_in(task.flag, r.text, "flag missing") + assert_in(task.flag, r.text, "Flag missing") @checker.putnoise(0) async def putnoise_file(task: PutnoiseCheckerTaskMessage, - client: AsyncClient, db: ChainDB) -> None: + logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None: username, password = noise(10, 20), noise(20, 30) data = { "action": "register", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "register failed") + assert_status_code(logger, r, 200, "register") filename, content = noise(20, 30), noise(20, 30) data = { "action": "upload", "filename": filename, "content": content } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "upload failed") + assert_status_code(logger, r, 200, "file upload") await db.set("info", (username, password, filename, content)) - return f"User {username} File {filename}" - @checker.getnoise(0) async def getnoise_file(task: GetnoiseCheckerTaskMessage, - client: AsyncClient, db: ChainDB, di: DependencyInjector) -> None: + logger: LoggerAdapter, client: AsyncClient, + db: ChainDB, di: DependencyInjector) -> None: try: username, password, filename, noise = await db.get("info") except KeyError: @@ -142,44 +150,45 @@ async def getnoise_file(task: GetnoiseCheckerTaskMessage, data = { "action": "login", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "login failed") + assert_status_code(logger, r., 200, "login") r = await client.get("/index.php?q=files") - assert_equals(r.status_code, 200, "files missing") + assert_status_code(logger, r, 200, "files query") soup = BeautifulSoup(r.text, "html.parser") files = [v.select("a") for v in soup.select("ul.filelist > li")] assert_equals(all([len(v) == 2 for v in files]), True, "noise missing") urls = { a.text.strip(): b.get("href", None) for a,b in files } - assert_in(filename, urls, "noise missing") + assert_in(filename, urls, "Noise missing") assert_equals(type(urls[filename]), str, "noise missing") r = await client.get(f"/index.php?f={filename}") - assert_equals(r.status_code, 200, "file download failed") - assert_in(noise, r.text, "noise missing") + assert_status_code(logger, r, 200, "file download") + assert_in(noise, r.text, "Noise missing") anon = await di.get(AsyncClient) r = await anon.get(urls[filename]) - assert_equals(r.status_code, 200, "file public url invalid") - assert_in(noise, r.text, "noise missing") + assert_status_code(logger, r, 200, "public file retrieve") + assert_in(noise, r.text, "Noise missing") @checker.exploit(0) -async def exploit_file_creat(task: ExploitCheckerTaskMessage, logger: LoggerAdapter, - searcher: FlagSearcher, client: AsyncClient) -> Optional[str]: +async def exploit_file_creat(task: ExploitCheckerTaskMessage, + logger: LoggerAdapter, searcher: FlagSearcher, + client: AsyncClient) -> Optional[str]: assert_equals(type(task.attack_info), str, "attack info missing") assert_equals(len(task.attack_info.split()), 4) _, flaguser, _, flagfile = task.attack_info.split() r = await client.get("/?q=users") - assert_equals(r.status_code, 200, "users unavailable") + assert_status_code(logger, r, 200, "query users") soup = BeautifulSoup(r.text, "html.parser") users = [v.children for v in soup.select("ul.userlist > li")] times = { a.text.strip(): str2epoch(b.text.strip()) for a,b in users } - assert_in(flaguser, times, "flag user missing") + assert_in(flaguser, times, "Flag user missing") for creat in range(times[flaguser], times[flaguser] + 15): dirname = filehash(flaguser, flagfile, creat) r = await client.get(f"/uploads/{dirname}/{flagfile}") @@ -213,12 +222,12 @@ async def exploit_report_path(task: ExploitCheckerTaskMessage, username, password = noise(10, 20), noise(20, 30) data = { "action": "register", "username": username, "password": password } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "register failed") + assert_status_code(logger, r, 200, "register") filepath = f"../../reports/{reportfile}" data = { "action": "upload", "filename": filepath, "content": "exploit2!" } r = await client.post("/index.php", data=data) - assert_equals(r.status_code, 200, "path traversal failed") + assert_status_code(logger, r, 200, "path traversal") r = await client.get(f"/index.php?f={filepath}") if flag := searcher.search_flag(r.text): |
