bambi6-service-postit

Simple Note-Taking A/D Service for BambiCTF6 in 2021
git clone https://git.sinitax.com/sinitax/bambi6-service-postit
Log | Files | Refs | README | LICENSE | sfeed.txt

checker.py (12351B)


      1#!/usr/bin/env python3
      2
      3import re
      4from asyncio import StreamReader, StreamWriter
      5from asyncio.exceptions import TimeoutError
      6from logging import LoggerAdapter
      7from typing import Any, List, Optional, Tuple, cast
      8
      9from enochecker3 import (
     10    AsyncSocket,
     11    ChainDB,
     12    DependencyInjector,
     13    Enochecker,
     14    ExploitCheckerTaskMessage,
     15    FlagSearcher,
     16    GetflagCheckerTaskMessage,
     17    GetnoiseCheckerTaskMessage,
     18    InternalErrorException,
     19    MumbleException,
     20    PutflagCheckerTaskMessage,
     21    PutnoiseCheckerTaskMessage,
     22)
     23
     24from crypto import RSA, fake_sign, get_key, load_keys, sign
     25from util import gen_noise, gen_username, timed
     26
     27OK = 0
     28FAIL = 1
     29
     30prompt = b"\r$ "
     31
     32
     33class Session:
     34    def __init__(self, socket: AsyncSocket, logger: LoggerAdapter) -> None:
     35        socket_tuple = cast(tuple[StreamReader, StreamWriter], socket)
     36        self.reader = socket_tuple[0]
     37        self.writer = socket_tuple[1]
     38        self.reader._limit = 2 ** 20  # type: ignore
     39        self.logger = logger
     40        self.closed = False
     41
     42    async def __aenter__(self) -> "Session":
     43        self.logger.debug("Preparing session")
     44        await self.prepare()
     45        return self
     46
     47    async def __aexit__(self, *args: list[Any], **kwargs: dict[str, Any]) -> None:
     48        self.logger.debug("Closing session")
     49        await self.close()
     50
     51    async def readuntil(
     52        self, target: bytes, include: bool = True, ctx: Optional[str] = None
     53    ) -> bytes:
     54        try:
     55            ctxstr = f"readuntil {target!r}" if ctx is None else ctx
     56            data = await timed(self.reader.readuntil(target), self.logger, ctx=ctxstr)
     57            msg = f"read:  {data[:100]!r}{'..' if len(data) > 100 else ''}"
     58            self.logger.debug(msg)
     59            if not include:
     60                data = data[: -len(target)]
     61            return data
     62        except TimeoutError:
     63            self.logger.critical(f"Service timed out while waiting for {target!r}")
     64            raise MumbleException("Service took too long to respond")
     65
     66    async def readline(self, ctx: Optional[str] = None) -> bytes:
     67        return await self.readuntil(b"\n", ctx=ctx)
     68
     69    async def read(self, n: int, ctx: Optional[str] = None) -> bytes:
     70        try:
     71            ctxstr = f"reading {n} bytes" if ctx is None else ctx
     72            data = await timed(self.reader.readexactly(n), self.logger, ctx=ctxstr)
     73            msg = f"read:  {data[:60]!r}{'..' if len(data) > 60 else ''}"
     74            self.logger.debug(msg)
     75            return data
     76        except TimeoutError:
     77            self.logger.critical(f"Service timed out while reading {n} bytes")
     78            raise MumbleException("Service took too long to respond")
     79
     80    async def drain(self) -> None:
     81        await self.writer.drain()
     82
     83    def write(self, data: bytes) -> None:
     84        msg = f"write: {data[:60]!r}{'..' if len(data) > 60 else ''}"
     85        self.logger.debug(msg)
     86        self.writer.write(data)
     87
     88    async def prepare(self) -> None:
     89        await self.readuntil(prompt)
     90
     91    async def exit(self) -> None:
     92        if self.closed:
     93            return
     94        self.write(b"exit\n")
     95        await self.drain()
     96        await self.readuntil(b"bye!")
     97        await self.close()
     98
     99    async def close(self) -> None:
    100        if self.closed:
    101            return
    102        self.closed = True
    103        self.writer.close()
    104        await self.writer.wait_closed()
    105
    106
class _Enochecker(Enochecker):
    """Enochecker subclass that calls ``load_keys()`` during initialisation."""

    async def _init(self) -> None:
        # Runs before the base initialisation; presumably prepares the key
        # material that get_key() hands out later (verify in crypto.py).
        load_keys()
        await super()._init()
    111
    112
    113checker = _Enochecker("postit", 9337)
    114app = lambda: checker.app
    115
    116
    117@checker.register_dependency
    118def _get_session(socket: AsyncSocket, logger: LoggerAdapter) -> Session:
    119    return Session(socket, logger)
    120
    121
    122async def getdb(db: ChainDB, key: str) -> tuple[Any, ...]:
    123    try:
    124        return await db.get(key)
    125    except KeyError:
    126        raise MumbleException(
    127            "Could not retrieve necessary info for service interaction"
    128        )
    129
    130
    131async def get_users(
    132    session: Session, expect: int = OK
    133) -> Tuple[int, Optional[List[bytes]]]:
    134    session.write(b"users\n")
    135    await session.drain()
    136    resp = await session.readuntil(prompt, include=False)
    137    try:
    138        return OK, [
    139            line.split(b"- ", 1)[1] for line in resp.split(b"\n") if line != b""
    140        ]
    141    except IndexError:
    142        if expect == FAIL:
    143            return FAIL, None
    144        session.logger.critical(f"Unexpected response when retrieving users:\n{resp!r}")
    145        raise MumbleException("Failed to retrieve user list")
    146
    147
    148async def register(
    149    session: Session, username: bytes, rsa: "RSA", expect: int = OK
    150) -> int:
    151    session.write(b"register %b\n%i\n%i\n" % (username, rsa.e, rsa.n))
    152    await session.drain()
    153    resp = await session.readuntil(prompt, include=False)
    154    if not resp.endswith(b"Enter RSA modulus: "):
    155        if expect == FAIL:
    156            return FAIL
    157        session.logger.critical(
    158            f"Unexpected response during registration of user {username!r}:\n{resp!r}"
    159        )
    160        raise MumbleException("Registration not working properly")
    161
    162    _, users = await get_users(session)
    163    assert users is not None
    164    if username not in users:
    165        if expect == FAIL:
    166            return FAIL
    167        session.logger.critical(
    168            f"Registered user {username!r} missing from user list\n"
    169        )
    170        raise MumbleException("Registration not working properly")
    171
    172    return OK
    173
    174
    175async def login(session: Session, username: bytes, rsa: "RSA", expect: int = OK) -> int:
    176    session.write(b"login %b\n" % username)
    177    await session.drain()
    178
    179    resp = await session.readuntil(b"Signature: ")
    180    match = re.search(b"Sign this message: (.+)\n", resp)
    181    assert match is not None
    182    challenge = match.group(1)
    183
    184    try:
    185        signature = sign(challenge, rsa)
    186    except ValueError as e:
    187        session.logger.critical(
    188            f"Failed to generate signature: {e}\n\
    189                Public exponent: {rsa.e!r}\n\
    190                Private exponent: {rsa.d!r}\n\
    191                Modulus: {rsa.n!r}"
    192        )
    193        raise InternalErrorException("Failed to sign message")
    194
    195    session.write(b"%i\n" % signature)
    196    await session.drain()
    197
    198    resp = await session.readuntil(prompt, include=False)
    199    if resp != b"":
    200        if expect == FAIL:
    201            return FAIL
    202        session.logger.critical(
    203            f"Unexpected response during registration of user {username!r}:\n\
    204                Public exponent: {rsa.e!r}\n\
    205                Private exponent: {rsa.d!r}\n\
    206                Modulus: {rsa.n!r}\n\
    207                Challenge: {challenge!r}\n\
    208                Signature: {signature!r}\n\
    209                Response: {resp!r}"
    210        )
    211        raise MumbleException("Authentication not working properly")
    212
    213    return OK
    214
    215
    216async def userinfo(
    217    session: Session, username: bytes, expect: int = OK
    218) -> Tuple[int, Optional[Tuple[int, int]]]:
    219    session.write(b"info %b\n" % username)
    220    await session.drain()
    221    resp = await session.readuntil(prompt, include=False)
    222    match = re.search(b"Username: (.+)\nRSA Exponent: (.+)\nRSA Modulus: (.+)\n", resp)
    223    if match is None:
    224        if expect == FAIL:
    225            return FAIL, None
    226        session.logger.critical(
    227            f"Unable to retrieve info for user {username!r}\n\
    228                Received instead:\n{resp!r}"
    229        )
    230        raise MumbleException("User info not returned properly")
    231
    232    return OK, (int(match.group(2)), int(match.group(3)))
    233
    234
    235async def get_posts(
    236    session: Session, expect: int = OK
    237) -> Tuple[int, Optional[List[bytes]]]:
    238    session.write(b"posts\n")
    239    await session.drain()
    240    resp = await session.readuntil(prompt, include=False)
    241
    242    try:
    243        return OK, [
    244            line.split(b"- ", 1)[1] for line in resp.split(b"\n") if line != b""
    245        ]
    246    except IndexError:
    247        if expect == FAIL:
    248            return FAIL, None
    249        session.logger.critical(
    250            f"Unexpected response while retrieving posts:\n{resp!r}"
    251        )
    252        raise MumbleException("Post listing not working properly")
    253
    254
    255async def add_post(session: Session, msg: bytes, expect: int = OK) -> int:
    256    session.write(b"post %b\n" % msg)
    257    await session.drain()
    258    resp = await session.readuntil(prompt, include=False)
    259    if resp != b"":
    260        if expect == FAIL:
    261            return FAIL
    262        session.logger.critical(f"Unexpected response for post creation:\n{resp!r}")
    263        raise MumbleException("Post creation not working properly")
    264
    265    _, posts = await get_posts(session)
    266    assert posts is not None
    267    if msg not in posts:
    268        if expect == FAIL:
    269            return FAIL
    270        session.logger.critical(
    271            f"Previously added post {msg!r} is missing from post list {posts!r}\n"
    272        )
    273        raise MumbleException("Post creation not working properly")
    274
    275    return OK
    276
    277
    278@checker.putflag(0)
    279async def putflag(task: PutflagCheckerTaskMessage, di: DependencyInjector) -> str:
    280    db = await di.get(ChainDB)
    281
    282    rsa = get_key()
    283    username = gen_username()
    284
    285    session = await di.get(Session)
    286    await register(session, username, rsa)
    287    await login(session, username, rsa)
    288    await add_post(session, task.flag.encode())
    289    await session.exit()
    290
    291    await db.set("info", (username, rsa.encode()))
    292    return "User '{}' just posted a secret".format(username.decode())
    293
    294
    295@checker.getflag(0)
    296async def getflag(task: GetflagCheckerTaskMessage, di: DependencyInjector) -> None:
    297    db = await di.get(ChainDB)
    298    username, rsa_params = await getdb(db, "info")
    299    rsa = RSA.decode(rsa_params)
    300
    301    session = await di.get(Session)
    302    await login(session, username, rsa)
    303    _, posts = await get_posts(session)
    304    await session.exit()
    305
    306    assert posts is not None
    307    if task.flag.encode() not in posts:
    308        session.logger.critical(f"Flag is missing from posts:\n{posts!r}")
    309        raise MumbleException("Failed to retrieve flag")
    310
    311
    312@checker.putnoise(0)
    313async def putnoise(task: PutnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
    314    db = await di.get(ChainDB)
    315
    316    rsa = get_key()
    317    username = gen_username()
    318    noise = gen_noise()
    319
    320    session = await di.get(Session)
    321    await register(session, username, rsa)
    322    await login(session, username, rsa)
    323    await add_post(session, noise)
    324    await session.exit()
    325
    326    await db.set("info", (username, rsa.encode(), noise))
    327
    328
    329@checker.getnoise(0)
    330async def getnoise(task: GetnoiseCheckerTaskMessage, di: DependencyInjector) -> None:
    331    db = await di.get(ChainDB)
    332    username, rsa_params, noise = await getdb(db, "info")
    333    rsa = RSA.decode(rsa_params)
    334
    335    session = await di.get(Session)
    336    await login(session, username, rsa)
    337    _, posts = await get_posts(session)
    338    await session.exit()
    339
    340    assert posts is not None
    341    if noise not in posts:
    342        session.logger.critical("Noise is missing from posts")
    343        raise MumbleException("Failed to retrieve noise")
    344
    345
    346@checker.exploit(0)
    347async def exploit(task: ExploitCheckerTaskMessage, di: DependencyInjector) -> None:
    348    assert task.attack_info is not None
    349    victim = task.attack_info[len("User '") : -len("' just posted a secret")].encode()
    350    searcher = await di.get(FlagSearcher)
    351
    352    session = await di.get(Session)
    353    _, res = await userinfo(session, victim)
    354    assert res is not None
    355    exp, mod = res
    356
    357    session.write(b"login %b\n" % victim)
    358    await session.drain()
    359    resp = await session.readuntil(b"Signature: ")
    360
    361    match = re.search(b"Sign this message: (.+)\n", resp)
    362    assert match is not None
    363    challenge = match.group(1)
    364
    365    try:
    366        signature = fake_sign(challenge, exp, mod)
    367    except (ValueError, AssertionError) as e:
    368        session.logger.critical(
    369            f"Failed to sign message: {e}\n\
    370                Public exponent: {exp!r}\n\
    371                Modulus: {mod!r}"
    372        )
    373        raise InternalErrorException("Failed generate fake signature")
    374    session.write(b"%i\n" % signature)
    375    await session.drain()
    376
    377    resp = await session.readuntil(prompt, include=False)
    378    if resp != b"":
    379        raise MumbleException("Fake signature was rejected by service")
    380
    381    _, posts = await get_posts(session)
    382    assert posts is not None
    383    if flag := searcher.search_flag(b"\n".join(posts)):
    384        return flag
    385
    386    raise MumbleException("Exploit failed to find flag in posts")
    387
    388
# When executed as a script, run the checker itself on port 9338.
if __name__ == "__main__":
    checker.run(port=9338)