aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLouis Burda <quent.burda@gmail.com>2022-10-26 19:28:04 +0200
committerLouis Burda <quent.burda@gmail.com>2022-10-26 19:28:04 +0200
commitb4f2da2ca448e173d2ff638d71018c50eca0982d (patch)
tree965872d1b692dacb4191d60c507dca3b0e54d018
parentab549121677c902683acf8887796ae039d7d571e (diff)
downloadbambi7-service-catchbox-b4f2da2ca448e173d2ff638d71018c50eca0982d.tar.gz
bambi7-service-catchbox-b4f2da2ca448e173d2ff638d71018c50eca0982d.zip
Add debug info for unexpected status_codes (untested)
-rw-r--r--checker/src/checker.py81
1 files changed, 45 insertions, 36 deletions
diff --git a/checker/src/checker.py b/checker/src/checker.py
index 0925763..962b073 100644
--- a/checker/src/checker.py
+++ b/checker/src/checker.py
@@ -1,3 +1,5 @@
+from bs4 import BeautifulSoup
+
from enochecker3 import (
ChainDB,
DependencyInjector,
@@ -12,11 +14,9 @@ from enochecker3 import (
)
from enochecker3.utils import FlagSearcher, assert_in, assert_equals
-from typing import Optional
-
-from httpx import AsyncClient
+import dateutil.parser
-from bs4 import BeautifulSoup
+from httpx import AsyncClient, Response
from hashlib import md5
@@ -24,9 +24,12 @@ from logging import LoggerAdapter
from subprocess import Popen, PIPE
-import dateutil.parser
import string
+
+from typing import Optional
+
import random
+
import os
checker = Enochecker("CatchBox", 9090)
@@ -48,18 +51,24 @@ def str2epoch(text: str) -> int:
date = dateutil.parser.parse(text + " UTC")
return int(date.timestamp())
+def assert_status_code(logger: LoggerAdapter, r: Response,
+ status_code: int, action: str) -> None:
+ if (r.status_code != status_code):
+ logger.warn(f"Bad service response for {action}:\n{r.text}")
+ raise MumbleException(status[0].upper() + status[1:] + " failed")
+
@checker.putflag(0)
async def putflag_file(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
client: AsyncClient, db: ChainDB) -> str:
username, password = noise(10, 20), noise(20, 30)
data = { "action": "register", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "register failed")
+ assert_status_code(logger, r, 200, "register")
filename, content = noise(20, 30), task.flag
data = { "action": "upload", "filename": filename, "content": content }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "upload failed")
+ assert_status_code(logger, r, 200, "file upload")
await db.set("info", (username, password, filename))
@@ -75,12 +84,12 @@ async def getflag_file(task: GetflagCheckerTaskMessage,
data = { "action": "login", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "login failed")
+ assert_status_code(logger, r, 200, "login")
r = await client.get(f"/index.php?f={filename}")
- assert_equals(r.status_code, 200, "file download failed")
+ assert_status_code(logger, r, 200, "file download")
- assert_in(task.flag, r.text, "flag missing")
+ assert_in(task.flag, r.text, "Flag missing")
@checker.putflag(1)
async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
@@ -88,11 +97,11 @@ async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
username, password = noise(10, 20), noise(20, 30)
data = { "action": "register", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "register failed")
+ assert_status_code(logger, r, 200, "register")
data = { "action": "report", "content": task.flag }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "upload failed")
+ assert_status_code(logger, r, 200, "upload")
await db.set("info", (username, password))
@@ -100,41 +109,40 @@ async def putflag_report(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
@checker.getflag(1)
async def getflag_report(task: GetflagCheckerTaskMessage,
- client: AsyncClient, db: ChainDB) -> None:
+ logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
try:
username, password = await db.get("info")
except KeyError:
- raise MumbleException("database info missing")
+ raise MumbleException("Database info missing")
data = { "action": "login", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "login failed")
+ assert_status_code(logger, r, 200, "login")
r = await client.get(f"/index.php?r")
- assert_equals(r.status_code, 200, "report download failed")
+ assert_status_code(logger, r, 200, "report download")
- assert_in(task.flag, r.text, "flag missing")
+ assert_in(task.flag, r.text, "Flag missing")
@checker.putnoise(0)
async def putnoise_file(task: PutnoiseCheckerTaskMessage,
- client: AsyncClient, db: ChainDB) -> None:
+ logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
username, password = noise(10, 20), noise(20, 30)
data = { "action": "register", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "register failed")
+ assert_status_code(logger, r, 200, "register")
filename, content = noise(20, 30), noise(20, 30)
data = { "action": "upload", "filename": filename, "content": content }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "upload failed")
+ assert_status_code(logger, r, 200, "file upload")
await db.set("info", (username, password, filename, content))
- return f"User {username} File {filename}"
-
@checker.getnoise(0)
async def getnoise_file(task: GetnoiseCheckerTaskMessage,
- client: AsyncClient, db: ChainDB, di: DependencyInjector) -> None:
+ logger: LoggerAdapter, client: AsyncClient,
+ db: ChainDB, di: DependencyInjector) -> None:
try:
username, password, filename, noise = await db.get("info")
except KeyError:
@@ -142,44 +150,45 @@ async def getnoise_file(task: GetnoiseCheckerTaskMessage,
data = { "action": "login", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "login failed")
+ assert_status_code(logger, r., 200, "login")
r = await client.get("/index.php?q=files")
- assert_equals(r.status_code, 200, "files missing")
+ assert_status_code(logger, r, 200, "files query")
soup = BeautifulSoup(r.text, "html.parser")
files = [v.select("a") for v in soup.select("ul.filelist > li")]
assert_equals(all([len(v) == 2 for v in files]), True, "noise missing")
urls = { a.text.strip(): b.get("href", None) for a,b in files }
- assert_in(filename, urls, "noise missing")
+ assert_in(filename, urls, "Noise missing")
assert_equals(type(urls[filename]), str, "noise missing")
r = await client.get(f"/index.php?f={filename}")
- assert_equals(r.status_code, 200, "file download failed")
- assert_in(noise, r.text, "noise missing")
+ assert_status_code(logger, r, 200, "file download")
+ assert_in(noise, r.text, "Noise missing")
anon = await di.get(AsyncClient)
r = await anon.get(urls[filename])
- assert_equals(r.status_code, 200, "file public url invalid")
- assert_in(noise, r.text, "noise missing")
+ assert_status_code(logger, r, 200, "public file retrieve")
+ assert_in(noise, r.text, "Noise missing")
@checker.exploit(0)
-async def exploit_file_creat(task: ExploitCheckerTaskMessage, logger: LoggerAdapter,
- searcher: FlagSearcher, client: AsyncClient) -> Optional[str]:
+async def exploit_file_creat(task: ExploitCheckerTaskMessage,
+ logger: LoggerAdapter, searcher: FlagSearcher,
+ client: AsyncClient) -> Optional[str]:
assert_equals(type(task.attack_info), str, "attack info missing")
assert_equals(len(task.attack_info.split()), 4)
_, flaguser, _, flagfile = task.attack_info.split()
r = await client.get("/?q=users")
- assert_equals(r.status_code, 200, "users unavailable")
+ assert_status_code(logger, r, 200, "query users")
soup = BeautifulSoup(r.text, "html.parser")
users = [v.children for v in soup.select("ul.userlist > li")]
times = { a.text.strip(): str2epoch(b.text.strip()) for a,b in users }
- assert_in(flaguser, times, "flag user missing")
+ assert_in(flaguser, times, "Flag user missing")
for creat in range(times[flaguser], times[flaguser] + 15):
dirname = filehash(flaguser, flagfile, creat)
r = await client.get(f"/uploads/{dirname}/{flagfile}")
@@ -213,12 +222,12 @@ async def exploit_report_path(task: ExploitCheckerTaskMessage,
username, password = noise(10, 20), noise(20, 30)
data = { "action": "register", "username": username, "password": password }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "register failed")
+ assert_status_code(logger, r, 200, "register")
filepath = f"../../reports/{reportfile}"
data = { "action": "upload", "filename": filepath, "content": "exploit2!" }
r = await client.post("/index.php", data=data)
- assert_equals(r.status_code, 200, "path traversal failed")
+ assert_status_code(logger, r, 200, "path traversal")
r = await client.get(f"/index.php?f={filepath}")
if flag := searcher.search_flag(r.text):