bambi7-service-fireworx

ESDSA-signed firework A/D service for BambiCTF7 in 2022
git clone https://git.sinitax.com/sinitax/bambi7-service-fireworx
Log | Files | Refs | README | LICENSE | sfeed.txt

checker.py (10915B)


      1from enochecker3 import (
      2    ChainDB,
      3    DependencyInjector,
      4    Enochecker,
      5    ExploitCheckerTaskMessage,
      6    GetflagCheckerTaskMessage,
      7    GetnoiseCheckerTaskMessage,
      8    HavocCheckerTaskMessage,
      9    InternalErrorException,
     10    MumbleException,
     11    PutflagCheckerTaskMessage,
     12    PutnoiseCheckerTaskMessage,
     13)
     14from enochecker3.utils import FlagSearcher, assert_in, assert_equals
     15
     16from typing import Optional, Callable
     17
     18from httpx import AsyncClient, Response
     19
     20from bs4 import BeautifulSoup
     21
     22from hashlib import md5
     23
     24from logging import LoggerAdapter
     25
     26from subprocess import Popen, PIPE
     27
     28import crypto
     29import dateutil.parser
     30import json
     31import os
     32import random
     33import string
     34import traceback
     35
     36checker = Enochecker("Fireworx", 1812)
     37app = lambda: checker.app
     38
     39random.seed(int.from_bytes(os.urandom(16), "little"))
     40
     41noise_alph = string.ascii_letters + string.digits
     42def noise(nmin: int, nmax: int) -> str:
     43    n = random.randint(nmin, nmax)
     44    return "".join(random.choice(noise_alph) for _ in range(n))
     45
     46def divmod(a: int, b: int, n: int) -> int:
     47    return (a * pow(b, n - 2, n)) % n
     48
     49def assert_status_code(logger: LoggerAdapter, r: Response, code: int = 200,
     50        parse: Optional[Callable[str, str]] = None) -> None:
     51    if r.status_code == code:
     52        return
     53    errlog = r.text
     54    if parse is not None:
     55        errlog = parse(errlog)
     56    logger.error(f"Bad status code during {r.request.method} {r.request.url.path}: " \
     57         + f"({r.status_code} != {code})\n{errlog}")
     58    raise MumbleException(f"{r.request.method} {r.request.url.path} failed")
     59
     60def parse_html(logger: LoggerAdapter, r: Response) -> BeautifulSoup:
     61    try:
     62        return BeautifulSoup(r.text, "html.parser")
     63    except:
     64        logger.error(f"Invalid html from {r.request.method} {r.request.url.path}\n" \
     65            + r.text)
     66        raise MumbleException(f"Invalid html ({r.request.method} {r.request.url.path})")
     67
     68def parse_notice(text: str) -> str:
     69    try:
     70        soup = BeautifulSoup(text, "html.parser")
     71        error = soup.select_one("meta#notice").get("content")
     72    except:
     73        raise MumbleException("Missing error from response")
     74    return error
     75
     76async def do_register(logger: LoggerAdapter,
     77        client: AsyncClient, username: str) -> None:
     78    r = await client.get("/register")
     79    assert_status_code(logger, r, code=200)
     80    soup = parse_html(logger, r)
     81
     82    privkey = crypto.DSAKey.gen()
     83
     84    data = {
     85        "username": username,
     86        "p": privkey.p,
     87        "q": privkey.q,
     88        "g": privkey.g,
     89        "x": privkey.x,
     90        "y": privkey.y
     91    }
     92    r = await client.post("/register", data=data)
     93    assert_status_code(logger, r, code=200)
     94
     95    return privkey
     96
     97async def do_login(logger: LoggerAdapter, client: AsyncClient,
     98        username: str, privkey: crypto.DSAKey) -> None:
     99
    100    r = await client.get("/challenge")
    101    assert_status_code(logger, r, code=200)
    102    try:
    103        challenge = int(r.text)
    104    except ValueError:
    105        raise MumbleException("Invalid challenge received")
    106
    107    sig_r, sig_s = privkey.sign(challenge)
    108
    109    data = {
    110        "username": username,
    111        "challenge": challenge,
    112        "signature": f"{sig_r},{sig_s}"
    113    }
    114    r = await client.post("/login", data=data)
    115    assert_status_code(logger, r, code=200)
    116
    117async def do_login_bad(logger: LoggerAdapter, client: AsyncClient,
    118        username: str, privkey: crypto.DSAKey) -> None:
    119
    120    r = await client.get("/challenge")
    121    assert_status_code(logger, r, code=200)
    122    try:
    123        challenge = int(r.text)
    124    except ValueError:
    125        raise MumbleException("Invalid challenge received")
    126
    127    sig_r, sig_s = privkey.sign(challenge)
    128    sig_bad_r = random.randint(2, pow(10, 49))
    129    sig_bad_s = random.randint(2, pow(10, 49))
    130
    131    data = {
    132        "username": username,
    133        "challenge": challenge,
    134        "signature": f"{sig_bad_r},{sig_bad_s}"
    135    }
    136    r = await client.post("/login", data=data)
    137    assert(r.status_code != 200)
    138
    139    try:
    140        sig = r.text.split("\n")[-1]
    141        r,s = (int(v) for v in sig.split(","))
    142        assert(sig_r == r and sig_s == s)
    143    except (KeyError, ValueError):
    144        raise MumbleException("Correct sig missing from login error")
    145
    146async def do_launch(logger: LoggerAdapter,
    147        client: AsyncClient, wish: str) -> None:
    148    data = {
    149        "type": "firework",
    150        "x": str(random.uniform(0, 1)),
    151        "y": str(random.uniform(0, 1)),
    152        "wish": wish
    153    }
    154    r = await client.post("/launch", data=data)
    155    assert_status_code(logger, r, code=200)
    156
    157async def do_profile(logger: LoggerAdapter, client: AsyncClient,
    158        username: Optional[str] = None) -> None:
    159    if username is not None:
    160        r = await client.get(f"/profile/{username}")
    161    else:
    162        r = await client.get("/profile")
    163    assert_status_code(logger, r, code=200)
    164    soup = parse_html(logger, r)
    165
    166    data = {}
    167    data["profile"] = {}
    168    data["events"] = []
    169
    170    for row in soup.select("#proplist table tr"):
    171        key = row.select_one("th").text
    172        value = row.select_one("td").text
    173        data["profile"][key] = value
    174
    175    for i, row in enumerate(soup.select("#eventlog table tr")):
    176        if i == 0:
    177            keys = [v.text for v in row.select("th")]
    178        else:
    179            event = {}
    180            vals = [v.text for v in row.select("td")]
    181            for k,v in zip(keys, vals):
    182                event[k] = v
    183            data["events"].append(event)
    184
    185    return data
    186
    187@checker.putflag(0)
    188async def putflag(task: PutflagCheckerTaskMessage, logger: LoggerAdapter,
    189        client: AsyncClient, db: ChainDB) -> str:
    190    username = noise(10, 20)
    191    privkey = await do_register(logger, client, username)
    192
    193    await do_launch(logger, client, task.flag)
    194
    195    await db.set("info", client.cookies["AIOHTTP_SESSION"])
    196
    197    return username
    198
    199@checker.getflag(0)
    200async def getflag(task: GetflagCheckerTaskMessage,
    201        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
    202    try:
    203        session_cookie = await db.get("info")
    204    except KeyError:
    205        raise MumbleException("Database info missing")
    206    client.cookies["AIOHTTP_SESSION"] = session_cookie
    207
    208    r = await client.get("/profile")
    209    assert_status_code(logger, r, code=200)
    210
    211    assert_in(task.flag, r.text, "Flag missing")
    212
    213@checker.putnoise(0)
    214async def putnoise(task: PutnoiseCheckerTaskMessage,
    215        logger: LoggerAdapter, client: AsyncClient, db: ChainDB) -> None:
    216    username = noise(10, 20)
    217    privkey = await do_register(logger, client, username)
    218
    219    wish = noise(20, 50)
    220    await do_launch(logger, client, wish)
    221
    222    keyvals = [str(v) for v in privkey.vals()]
    223    await db.set("info", (username, wish, keyvals))
    224
    225@checker.getnoise(0)
    226async def getnoise(task: GetnoiseCheckerTaskMessage,
    227        logger: LoggerAdapter, client: AsyncClient,
    228        db: ChainDB, di: DependencyInjector) -> None:
    229    try:
    230        username, wish, keyvals = await db.get("info")
    231    except KeyError:
    232        raise MumbleException("Database info missing")
    233
    234    keyvals = [int(v) for v in keyvals]
    235    privkey = crypto.DSAKey(*keyvals)
    236    await do_login_bad(logger, client, username, privkey)
    237    await do_login(logger, client, username, privkey)
    238
    239    data = await do_profile(logger, client)
    240
    241    try:
    242        for k,v in privkey.pubkey().dict().items():
    243            assert data["profile"][k] == str(v)
    244    except Exception as e:
    245        trace = traceback.format_exc()
    246        logger.error(f"Invalid public key info in profile\n{trace}")
    247        raise MumbleException("Invalid public key info in profile")
    248
    249    try:
    250        assert data["events"][0]["wish"] == wish
    251    except:
    252        raise MumbleException("Wish is missing from events logs")
    253
    254@checker.havoc(0)
    255async def havoc(task: HavocCheckerTaskMessage, logger: LoggerAdapter,
    256        client: AsyncClient, db: ChainDB, di: DependencyInjector) -> None:
    257    await do_register(logger, client, noise(10, 20))
    258
    259    for i in range(random.randint(1, 3)):
    260        await do_launch(logger, client, noise(20, 50))
    261
    262@checker.exploit(0)
    263async def exploit_trivial_sig(task: ExploitCheckerTaskMessage,
    264        logger: LoggerAdapter, searcher: FlagSearcher,
    265        client: AsyncClient) -> Optional[str]:
    266    if task.attack_info == "":
    267        raise InternalErrorException("Missing attack info")
    268    username = task.attack_info
    269
    270    data = await do_profile(logger, client, username)
    271    try:
    272        q = data["profile"]["q"]
    273    except KeyError:
    274        raise MumbleException("Missing pubkey q for profile")
    275
    276    r = await client.get("/challenge")
    277    assert_status_code(logger, r, code=200)
    278    challenge = r.text
    279
    280    data = {
    281        "username": username,
    282        "challenge": challenge,
    283        "signature": f"1,{q}"
    284    }
    285    r = await client.post("/login", data=data)
    286    assert_status_code(logger, r, code=200)
    287
    288    r = await client.get("/profile")
    289    assert_status_code(logger, r, code=200)
    290
    291    return searcher.search_flag(r.text)
    292
    293@checker.exploit(1)
    294async def exploit_nonce_reuse(task: ExploitCheckerTaskMessage,
    295        logger: LoggerAdapter, searcher: FlagSearcher,
    296        client: AsyncClient) -> Optional[str]:
    297    if task.attack_info == "":
    298        raise InternalErrorException("Missing attack info")
    299    username = task.attack_info
    300
    301    data = await do_profile(logger, client, username)
    302    try:
    303        p, q, g, y = [int(data["profile"][k]) for k in ("p", "q", "g", "y")]
    304    except KeyError as e:
    305        raise MumbleException(f"Missing pubkey components {e} in profile")
    306    except KeyError:
    307        raise MumbleException("Invalid pubkey components in profile")
    308
    309    sigpairs = []
    310    for i in range(2):
    311        r = await client.get("/challenge")
    312        assert_status_code(logger, r, code=200)
    313        challenge = int(r.text)
    314
    315        data = {
    316            "username": username,
    317            "challenge": challenge,
    318            "signature": f"1337,1337"
    319        }
    320        r = await client.post("/login", data=data)
    321        assert_status_code(logger, r, code=400)
    322
    323        try:
    324            sig = r.text.split("\n")[-1]
    325            r,s = (int(v) for v in sig.split(","))
    326        except (KeyError, ValueError):
    327            raise MumbleException("Correct sig missing from login error")
    328
    329        sigpairs.append((crypto.H(challenge), (r, s)))
    330
    331    z1, (r1, s1) = sigpairs[0]
    332    z2, (r2, s2) = sigpairs[1]
    333
    334    if r1 != r2:
    335        raise MumbleException("Signatures do not have same r, exploit fixed?")
    336
    337    k = divmod(z1 - z2, s1 - s2, q)
    338    x = divmod(k * s1 - z1, r1, q)
    339    privkey = crypto.DSAKey(p, q, g, x, y)
    340
    341    r = await client.get("/challenge")
    342    assert_status_code(logger, r, code=200)
    343    z3 = int(r.text)
    344    r3, s3 = privkey.sign(z3)
    345    assert privkey.pubkey().verify(z3, (r3, s3))
    346
    347    await do_login(logger, client, username, privkey)
    348
    349    r = await client.get("/profile")
    350    assert_status_code(logger, r, code=200)
    351
    352    return searcher.search_flag(r.text)
    353
    354if __name__ == "__main__":
    355    checker.run()