"""ctask: command-line manager for Claude task files.

Tasks are JSON files stored as ``~/.claude/tasks/<session_id>/<task_id>.json``.
The session a terminal works on is remembered in
``~/.claude/ctask_state.json``, keyed by the terminal's session-leader PID.
"""

import datetime
import json
import os
import shlex
import shutil
import signal
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Optional

import click
import questionary
from questionary import Choice, Style
from rich import box
from rich.console import Console
from rich.table import Table

console = Console()


# ---------------------------------------------------------------------------
# Session state
# ---------------------------------------------------------------------------

def get_tasks_base_dir() -> Path:
    """Return the directory under which every session's task folder lives."""
    return Path.home() / ".claude" / "tasks"


def get_session_state_file() -> Path:
    """Return the path of the JSON file mapping session-leader PIDs to task dirs."""
    return Path(os.path.expanduser("~/.claude/ctask_state.json"))


def get_session_leader_pid() -> int:
    """Return the session ID (``os.getsid(0)``) of the current process."""
    return os.getsid(0)


def load_session_state() -> dict:
    """Load the PID -> session-path mapping.

    Returns:
        The stored mapping, or an empty dict when the state file is missing,
        is not valid JSON, or does not contain a JSON object.
    """
    state_file = get_session_state_file()
    if not state_file.exists():
        return {}
    try:
        with open(state_file, encoding="utf-8") as f:
            state = json.load(f)
    except ValueError:
        # JSONDecodeError/UnicodeDecodeError: the mapping can be rebuilt with
        # 'ctask select', so a damaged file should not break every command.
        console.print(f"[yellow]Ignoring unreadable session state in {state_file}[/yellow]")
        return {}
    return state if isinstance(state, dict) else {}


def save_session_state(state: dict):
    """Write the PID -> session-path mapping, creating the parent dir if needed."""
    state_file = get_session_state_file()
    state_file.parent.mkdir(parents=True, exist_ok=True)
    with open(state_file, "w", encoding="utf-8") as f:
        json.dump(state, f, indent=2)


def get_current_session_path() -> Optional[Path]:
    """Return the task directory selected for this terminal session, if any."""
    state = load_session_state()
    session_path = state.get(str(get_session_leader_pid()))
    return Path(session_path) if session_path else None


def require_session() -> Path:
    """Return the selected task directory, or print an error and exit with 1."""
    session_path = get_current_session_path()
    if session_path is None:
        console.print("[red]No session selected. Run 'ctask select' first.[/red]")
        sys.exit(1)
    return session_path


def set_current_session_path(path: Path):
    """Record ``path`` as the task directory for this terminal session."""
    state = load_session_state()
    state[str(get_session_leader_pid())] = str(path)
    save_session_state(state)


# ---------------------------------------------------------------------------
# Task files
# ---------------------------------------------------------------------------

def load_task(task_file: Path) -> dict:
    """Read and return the JSON content of ``task_file``."""
    with open(task_file, encoding="utf-8") as f:
        return json.load(f)


def task_to_md(task: dict) -> str:
    """Render a task as markdown with a ``---`` delimited frontmatter.

    The frontmatter holds ``subject`` and, when non-empty, ``blocks`` and
    ``blockedBy`` as comma-separated ID lists; the body is the description.
    """
    lines = ["---", f"subject: {task.get('subject', '')}"]
    for key in ("blocks", "blockedBy"):
        ids = task.get(key, [])
        if ids:
            # str() so IDs stored as JSON numbers do not break the join.
            lines.append(f"{key}: {', '.join(str(i) for i in ids)}")
    lines += ["---", "", task.get("description", "")]
    return "\n".join(lines)


def _parse_id_list(value: str) -> list[str]:
    """Split a comma-separated ID string into stripped, non-empty IDs."""
    if not value:
        return []
    return [part.strip() for part in value.split(",") if part.strip()]


def md_to_task(text: str, original: dict) -> dict:
    """Parse markdown produced by :func:`task_to_md` back into a task dict.

    Args:
        text: Markdown with a leading ``---`` frontmatter block.
        original: Task whose other fields (id, status, ...) are carried over.

    Returns:
        A copy of ``original`` with ``subject``, ``blocks``, ``blockedBy`` and
        ``description`` replaced by the parsed values. ``blocks``/``blockedBy``
        become empty lists when absent from the frontmatter.

    Raises:
        ValueError: If the opening or closing ``---`` delimiter is missing.
    """
    lines = text.split("\n")
    if not lines or lines[0].strip() != "---":
        raise ValueError("Missing opening frontmatter delimiter")

    end = next(
        (i for i, line in enumerate(lines[1:], start=1) if line.strip() == "---"),
        None,
    )
    if end is None:
        raise ValueError("Missing closing frontmatter delimiter")

    frontmatter = {}
    for line in lines[1:end]:
        if ":" in line:
            # partition splits on the first colon only, so values may contain ':'.
            key, _, value = line.partition(":")
            frontmatter[key.strip()] = value.strip()

    task = dict(original)
    task["subject"] = frontmatter.get("subject", task.get("subject", ""))
    task["blocks"] = _parse_id_list(frontmatter.get("blocks", ""))
    task["blockedBy"] = _parse_id_list(frontmatter.get("blockedBy", ""))
    task["description"] = "\n".join(lines[end + 1:]).strip()
    return task


def save_task(task_file: Path, task: dict):
    """Write ``task`` to ``task_file`` as indented JSON and set mode 0644."""
    with open(task_file, "w", encoding="utf-8") as f:
        json.dump(task, f, indent=2)
    task_file.chmod(0o644)


def get_all_tasks(task_dir: Path) -> list[tuple[Path, dict]]:
    """Return ``(path, task)`` for every readable ``*.json`` in ``task_dir``.

    Files are visited in sorted path order. Files that cannot be read or
    parsed (e.g. one caught mid-write) are skipped rather than aborting the
    listing. A missing directory yields an empty list.
    """
    if not task_dir.exists():
        return []
    tasks = []
    for task_file in sorted(task_dir.glob("*.json")):
        try:
            tasks.append((task_file, load_task(task_file)))
        except (OSError, ValueError):
            continue
    return tasks


def get_next_task_id(task_dir: Path) -> int:
    """Return one more than the highest task ID in ``task_dir`` (1 if none).

    Both the ``id`` field and the file name stem are considered, so a task
    with a missing or non-numeric ``id`` neither crashes the computation nor
    lets a new task overwrite an existing ``<n>.json`` file.
    """
    highest = 0
    for task_file, task in get_all_tasks(task_dir):
        stored_id = task.get("id") if isinstance(task, dict) else None
        for candidate in (stored_id, task_file.stem):
            try:
                highest = max(highest, int(candidate))
            except (TypeError, ValueError):
                continue
    return highest + 1


def _editor_command(path: Path) -> list[str]:
    """Build the argv that opens ``path`` in the user's ``$EDITOR``.

    ``$EDITOR`` is split with shell rules so values carrying arguments
    (``code --wait``, ``emacs -nw``) work; unset or empty falls back to ``vi``.
    """
    editor = os.environ.get("EDITOR") or "vi"
    return shlex.split(editor) + [str(path)]


# ---------------------------------------------------------------------------
# Running Claude and the Stop hook
# ---------------------------------------------------------------------------

def _run_impl(resume: bool = False):
    """Launch ``claude`` with a temporary settings file that installs our Stop hook.

    Exits the process with claude's return code; the temporary settings file
    is removed on the way out.

    Args:
        resume: When true, pass ``--resume`` to claude.
    """
    ctask_bin = sys.argv[0] if sys.argv[0] != "-c" else (shutil.which("ctask") or "ctask")
    settings = {
        "hooks": {
            "Stop": [
                {
                    "hooks": [
                        {
                            "type": "command",
                            # Quoted so a path containing spaces or shell
                            # metacharacters still runs as a single word.
                            "command": f"{shlex.quote(ctask_bin)} --stop-hook",
                        }
                    ]
                }
            ]
        }
    }

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".json", prefix="ctask_settings_", delete=False
    ) as tmp:
        json.dump(settings, tmp, indent=2)
        tmp_path = Path(tmp.name)

    try:
        cmd = ["claude"]
        if resume:
            cmd.append("--resume")
        cmd += ["--settings", str(tmp_path), "Use the tasks tool to check for tasks and run them"]
        result = subprocess.run(cmd)
        # SystemExit still passes through the finally clause below.
        sys.exit(result.returncode)
    finally:
        tmp_path.unlink(missing_ok=True)


def _stop_hook_impl():
    """Stop-hook entry point: keep the session going while runnable tasks exist.

    Reads the hook payload (JSON) from stdin. If the session has tasks that
    are neither completed nor deleted and whose blockers are all completed,
    prints a ``{"decision": "block", ...}`` JSON object and exits 0.
    Otherwise it polls the session's task directory once per second and
    blocks as soon as a change produces such a task. SIGINT/SIGTERM exit 0.
    """
    try:
        hook_input = json.loads(sys.stdin.read())
    except (ValueError, OSError):
        hook_input = {}

    # Debug dump of the raw payload. Best-effort: an unwritable /tmp/log must
    # not crash the hook.
    # NOTE(review): fixed path in a world-writable directory; consider removing
    # or moving under ~/.claude.
    try:
        with open("/tmp/log", "w") as _log:
            print(json.dumps(hook_input, indent=2), file=_log)
    except OSError:
        pass

    if not isinstance(hook_input, dict):
        hook_input = {}

    session_id = hook_input.get("session_id")
    if not session_id:
        sys.exit(0)

    session_path = get_tasks_base_dir() / session_id

    def get_unblocked_tasks() -> list[dict]:
        """Return open tasks whose every blocker is completed."""
        tasks = [t for _, t in get_all_tasks(session_path) if isinstance(t, dict)]
        completed_ids = {t.get("id") for t in tasks if t.get("status") == "completed"}
        return [
            task
            for task in tasks
            if task.get("status") not in ("completed", "deleted")
            and all(bid in completed_ids for bid in task.get("blockedBy", []))
        ]

    def block_stop(tasks: list[dict]):
        """Emit the 'block' decision listing ``tasks`` and exit 0."""
        task_lines = "\n".join(
            f"- #{t.get('id', '?')}: {t.get('subject', '(no subject)')} [{t.get('status', 'pending')}]"
            for t in tasks
        )
        reason = f"Use the tasks tool to check for tasks and run them\n\nPending tasks:\n{task_lines}"
        print(json.dumps({"decision": "block", "reason": reason}))
        sys.exit(0)

    def dir_snapshot() -> dict:
        """Map each task file path to its mtime, for change detection."""
        if not session_path.exists():
            return {}
        snap = {}
        for f in session_path.glob("*.json"):
            try:
                snap[str(f)] = f.stat().st_mtime
            except OSError:
                # File removed between glob and stat; the next poll sees it gone.
                continue
        return snap

    pending = get_unblocked_tasks()
    if pending:
        block_stop(pending)

    def handle_signal(signum, frame):
        sys.exit(0)

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    snapshot = dir_snapshot()
    while True:
        time.sleep(1)
        new_snapshot = dir_snapshot()
        if new_snapshot == snapshot:
            continue
        snapshot = new_snapshot
        pending = get_unblocked_tasks()
        if pending:
            block_stop(pending)


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

@click.group(invoke_without_command=True, context_settings={"help_option_names": ["-h", "--help"]})
@click.option("--stop-hook", "do_stop_hook", is_flag=True, hidden=True)
@click.pass_context
def main(ctx, do_stop_hook):
    """Manage Claude task files for the selected session."""
    if do_stop_hook:
        _stop_hook_impl()
        return
    # Bare 'ctask' behaves like 'ctask list'.
    if ctx.invoked_subcommand is None:
        ctx.invoke(list_tasks)


def _list_tasks_impl():
    """Print a table of all tasks in the selected session."""
    task_dir = require_session()
    tasks = get_all_tasks(task_dir)

    if not tasks:
        console.print(f"[yellow]No tasks found in {task_dir}[/yellow]")
        return

    table = Table(
        box=box.SIMPLE,
        min_width=min(80, console.width),
        show_header=True,
        header_style="bold",
        padding=(0, 0),
    )
    table.add_column("ID", style="cyan", no_wrap=True)
    table.add_column("Status", style="magenta", no_wrap=True)
    table.add_column("Subject", style="green", ratio=2)
    table.add_column("Description", style="white", ratio=1)
    table.add_column("Blocks", style="yellow", no_wrap=True)
    table.add_column("Blocked By", style="red", no_wrap=True)

    for _, task in tasks:
        # str() throughout: IDs stored as JSON numbers must not break the table.
        blocks = ", ".join(str(i) for i in task.get("blocks", [])) or "-"
        blocked_by = ", ".join(str(i) for i in task.get("blockedBy", [])) or "-"
        description = task.get("description", "")
        if description and len(description) > 50:
            description = description[:47] + "..."
        table.add_row(
            str(task.get("id", "?")),
            task.get("status", "unknown"),
            task.get("subject", ""),
            description,
            blocks,
            blocked_by,
        )

    console.print(table)


@main.command("list")
def list_tasks():
    """List tasks in the selected session."""
    _list_tasks_impl()


@main.command("ls")
def ls_tasks():
    """Alias for 'list'."""
    _list_tasks_impl()


def _create_task_impl(name, description, status, active_form, blocks, blocked_by):
    """Create a task file in the selected session.

    When ``name`` is None the task is composed in ``$EDITOR`` from a markdown
    skeleton; an unchanged file or an empty subject cancels creation, and a
    malformed file exits with status 1.

    Args:
        name: Task subject, or None to compose the task in the editor.
        description: Task description text.
        status: Initial status string.
        active_form: Active-form text; defaults to ``"Working on <name>"``.
        blocks: IDs of tasks this task blocks.
        blocked_by: IDs of tasks blocking this task.
    """
    # Resolve the session first so a missing selection is reported before the
    # user spends time writing a task in the editor.
    task_dir = require_session()

    if name is None:
        skeleton = {
            "subject": "",
            "status": status,
            "activeForm": active_form or "",
            "blocks": list(blocks),
            "blockedBy": list(blocked_by),
            "description": description,
        }
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".md", prefix="ctask_new_", delete=False
        ) as tmp:
            tmp.write(task_to_md(skeleton))
            tmp_path = Path(tmp.name)

        try:
            mtime_before = tmp_path.stat().st_mtime
            subprocess.run(_editor_command(tmp_path))
            # An untouched mtime means the user quit without saving.
            if tmp_path.stat().st_mtime == mtime_before:
                console.print("[yellow]No changes made, task not created[/yellow]")
                return
            parsed = md_to_task(tmp_path.read_text(), skeleton)
        except ValueError as e:
            console.print(f"[red]Parse error: {e}[/red]")
            sys.exit(1)
        finally:
            tmp_path.unlink(missing_ok=True)

        name = parsed["subject"].strip()
        if not name:
            console.print("[yellow]No subject provided, task not created[/yellow]")
            return
        description = parsed["description"]
        status = parsed["status"]
        active_form = parsed["activeForm"]
        blocks = parsed["blocks"]
        blocked_by = parsed["blockedBy"]

    task_dir.mkdir(parents=True, exist_ok=True)
    task_id = get_next_task_id(task_dir)

    task = {
        "id": str(task_id),
        "subject": name,
        "description": description,
        "activeForm": active_form or f"Working on {name}",
        "status": status,
        "blocks": list(blocks),
        "blockedBy": list(blocked_by),
    }

    task_file = task_dir / f"{task_id}.json"
    save_task(task_file, task)

    console.print(f"[green]Created task {task_id}: {name}[/green]")
    console.print(f"[dim]{task_file}[/dim]")


@main.command("create")
@click.argument("name", required=False, default=None)
@click.argument("description", required=False, default="")
@click.option("--status", default="pending", help="Task status")
@click.option("--active-form", help="Active form description")
@click.option("--blocks", "--before", multiple=True, help="Task IDs this blocks (comes before)")
@click.option("--blocked-by", "--after", multiple=True, help="Task IDs blocking this (comes after)")
def create_task(name, description, status, active_form, blocks, blocked_by):
    """Create a task; without NAME, compose it in $EDITOR."""
    _create_task_impl(name, description, status, active_form, blocks, blocked_by)


@main.command("add")
@click.argument("name", required=False, default=None)
@click.argument("description", required=False, default="")
@click.option("--status", default="pending", help="Task status")
@click.option("--active-form", help="Active form description")
@click.option("--blocks", "--before", multiple=True, help="Task IDs this blocks (comes before)")
@click.option("--blocked-by", "--after", multiple=True, help="Task IDs blocking this (comes after)")
def add_task(name, description, status, active_form, blocks, blocked_by):
    """Alias for 'create'."""
    _create_task_impl(name, description, status, active_form, blocks, blocked_by)


@main.command("mod")
@click.argument("task_id")
@click.option("--name", help="New task name")
@click.option("--description", help="New description")
@click.option("--status", help="New status")
@click.option("--active-form", help="New active form")
@click.option("--blocks", "--before", multiple=True, help="Set task IDs this blocks (comes before)")
@click.option("--blocked-by", "--after", multiple=True, help="Set task IDs blocking this (comes after)")
@click.option("--unblock", is_flag=True, help="Clear all blockers before applying --blocked-by")
def modify_task(task_id, name, description, status, active_form, blocks, blocked_by, unblock):
    """Modify fields of an existing task."""
    task_dir = require_session()
    task_file = task_dir / f"{task_id}.json"

    if not task_file.exists():
        console.print(f"[red]Task {task_id} not found[/red]")
        sys.exit(1)

    task = load_task(task_file)

    if name:
        task["subject"] = name
    if description:
        task["description"] = description
    if status:
        task["status"] = status
    if active_form:
        task["activeForm"] = active_form
    if blocks:
        task["blocks"] = list(blocks)
    if unblock:
        task["blockedBy"] = []
    if blocked_by:
        # Appends to the existing blockers (already emptied above when
        # --unblock was given). setdefault covers task files written without
        # a blockedBy key.
        task.setdefault("blockedBy", []).extend(blocked_by)

    save_task(task_file, task)
    console.print(f"[green]Modified task {task_id}[/green]")


@main.command("edit")
@click.argument("task_id")
@click.option("--json", "use_json", is_flag=True, help="Edit raw JSON instead of markdown")
def edit_task(task_id, use_json):
    """Edit a task in $EDITOR (markdown by default)."""
    task_dir = require_session()
    task_file = task_dir / f"{task_id}.json"

    if not task_file.exists():
        console.print(f"[red]Task {task_id} not found[/red]")
        sys.exit(1)

    if use_json:
        # Raw mode edits the task file in place; nothing is validated afterwards.
        subprocess.run(_editor_command(task_file))
        return

    task = load_task(task_file)

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".md", prefix=f"ctask_{task_id}_", delete=False
    ) as tmp:
        tmp.write(task_to_md(task))
        tmp_path = Path(tmp.name)

    try:
        mtime_before = tmp_path.stat().st_mtime
        subprocess.run(_editor_command(tmp_path))
        # An untouched mtime means the user quit without saving.
        if tmp_path.stat().st_mtime == mtime_before:
            console.print("[yellow]No changes made[/yellow]")
            return
        updated_task = md_to_task(tmp_path.read_text(), task)
        save_task(task_file, updated_task)
        console.print(f"[green]Saved task {task_id}[/green]")
    except ValueError as e:
        console.print(f"[red]Parse error: {e}[/red]")
        sys.exit(1)
    finally:
        tmp_path.unlink(missing_ok=True)


def _delete_task_impl(task_id):
    """Delete the task file for ``task_id``; exit with 1 if it does not exist."""
    task_dir = require_session()
    task_file = task_dir / f"{task_id}.json"

    if not task_file.exists():
        console.print(f"[red]Task {task_id} not found[/red]")
        sys.exit(1)

    # Loaded before unlinking so the subject can be echoed in the confirmation.
    task = load_task(task_file)
    task_file.unlink()

    console.print(f"[green]Deleted task {task_id}: {task.get('subject', '')}[/green]")
    console.print(f"[dim]{task_file}[/dim]")


@main.command("delete")
@click.argument("task_id")
def delete_task(task_id):
    """Delete a task."""
    _delete_task_impl(task_id)


@main.command("rm")
@click.argument("task_id")
def rm_task(task_id):
    """Alias for 'delete'."""
    _delete_task_impl(task_id)


# ---------------------------------------------------------------------------
# Session discovery and selection
# ---------------------------------------------------------------------------

def _relative_time(mtime: float) -> str:
    """Describe how long ago ``mtime`` (epoch seconds) was, e.g. '3 hours ago'."""
    diff = time.time() - mtime
    if diff < 60:
        return "just now"
    if diff < 3600:
        n, unit = int(diff / 60), "minute"
    elif diff < 86400:
        n, unit = int(diff / 3600), "hour"
    elif diff < 86400 * 7:
        n, unit = int(diff / 86400), "day"
    else:
        n, unit = int(diff / (86400 * 7)), "week"
    return f"{n} {unit}{'s' if n != 1 else ''} ago"


def _transcript_cwd(transcript: Path) -> Optional[str]:
    """Return the ``cwd`` of the first JSONL entry in ``transcript`` that has one.

    Best-effort: returns None when the file cannot be read, a line is not
    valid JSON, or no entry carries a ``cwd`` key.
    """
    try:
        with open(transcript, encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                entry = json.loads(line)
                if "cwd" in entry:
                    return entry["cwd"]
    except (OSError, ValueError, TypeError):
        # TypeError covers entries that are not JSON objects.
        pass
    return None


def get_available_sessions() -> list[dict]:
    """Discover sessions from the transcripts under ``~/.claude/projects``.

    Returns:
        One dict per ``*.jsonl`` transcript, newest first, with keys ``name``
        (last two components of the transcript's cwd, or the project directory
        name, plus the first 8 characters of the session ID), ``path`` (the
        session's task directory) and ``description`` (modification time).
    """
    projects_dir = Path.home() / ".claude" / "projects"
    if not projects_dir.exists():
        return []

    sessions = []
    for project_dir in projects_dir.iterdir():
        if not project_dir.is_dir():
            continue
        for transcript in project_dir.glob("*.jsonl"):
            try:
                mtime = transcript.stat().st_mtime
            except OSError:
                # Transcript disappeared between glob and stat.
                continue
            cwd = _transcript_cwd(transcript)
            session_id = transcript.stem

            if cwd:
                parts = Path(cwd).parts
                name = str(Path(*parts[-2:])) if len(parts) >= 2 else cwd
            else:
                name = project_dir.name
            name = f"{name} {session_id[:8]}"

            dt = datetime.datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")
            sessions.append({
                "name": name,
                "path": str(get_tasks_base_dir() / session_id),
                "description": f"{dt} ({_relative_time(mtime)})",
                "mtime": mtime,
            })

    sessions.sort(key=lambda s: s["mtime"], reverse=True)
    # mtime was only needed for ordering; drop it from the returned records.
    return [
        {"name": s["name"], "path": s["path"], "description": s["description"]}
        for s in sessions
    ]


@main.command("select")
def select_session():
    """Choose the session whose tasks this terminal works on."""
    sessions_script = os.environ.get("CTASK_SESSIONS")

    if sessions_script:
        # CTASK_SESSIONS names an executable whose stdout is parsed as the
        # JSON session list.
        try:
            result = subprocess.run([sessions_script], capture_output=True, text=True, check=True)
            sessions = json.loads(result.stdout)
        except Exception as e:
            console.print(f"[red]Error getting sessions: {e}[/red]")
            sys.exit(1)
    else:
        sessions = get_available_sessions()

    if not sessions:
        console.print("[yellow]No sessions available[/yellow]")
        return

    custom_style = Style([
        ('session_name', 'bold')
    ])

    choices = []
    for session in sessions:
        choice_text = [("class:session_name", session["name"])]
        description = session.get("description")
        if description:
            choice_text.append(("", f" - {description}"))
        choices.append(Choice(title=choice_text, value=session))

    try:
        selected = questionary.select(
            "Select a session:",
            choices=choices,
            style=custom_style
        ).ask()

        if selected:
            set_current_session_path(Path(selected["path"]))
            console.print(f"[green]Selected session: {selected['name']}[/green]")
        else:
            console.print("[yellow]No session selected[/yellow]")
    except KeyboardInterrupt:
        console.print("\n[yellow]Selection cancelled[/yellow]")
        sys.exit(1)


@main.command("run")
@click.option("--resume", "resume", is_flag=True, help="Resume the last Claude session")
def run_claude(resume):
    """Run claude with the ctask Stop hook installed."""
    _run_impl(resume=resume)


if __name__ == "__main__":
    main()