# Recovered from commit 3cbb749655bdbf2b9335773ec97b6a7d4af069c3
# (Zev Averbach, Fri Jan 3 13:47:52 2025 +0100)
# "feat: Add CLI module for YouTube summarization tool" -- summarize_yt/cli.py
"""Command-line interface for the YouTube summarization tool.

The flow implemented by :func:`main` is:

1. download English auto-generated subtitles with ``yt-dlp``,
2. strip the WebVTT markup down to plain text,
3. save the transcript under ``~/.yts/transcripts/``,
4. show an estimated API cost and ask for confirmation,
5. send the transcript to Claude and print or save the summary.
"""

# Standard library
import argparse
import os
import re
import signal
import subprocess
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
from urllib.parse import parse_qs, urlparse

# Third-party packages
try:
    import anthropic
except ImportError:  # reported with a readable message when a summary is requested
    anthropic = None

# Local modules
try:
    from . import __version__
except ImportError:  # file executed directly as a script instead of as a package module
    __version__ = "unknown"

# Claude request settings (shared by the cost estimate and the API call)
CLAUDE_MODEL = "claude-3-sonnet-20240229"
MAX_OUTPUT_TOKENS = 1024
DEFAULT_PROMPT = "Please summarize this transcript in 500 words or less"

# Cost estimation constants
# NOTE(review): these rates look like Opus-tier list prices while CLAUDE_MODEL is a
# Sonnet model -- confirm against the current pricing page before relying on them.
CLAUDE_COST_PER_1K_INPUT = 0.015  # Cost per 1K tokens for input
CLAUDE_COST_PER_1K_OUTPUT = 0.075  # Cost per 1K tokens for output
ESTIMATED_TOKENS_PER_CHAR = 0.25  # Rough estimate of tokens per character

# Seconds to wait for yt-dlp before giving up
DOWNLOAD_TIMEOUT_SECONDS = 30

# Inline cue timestamps such as <00:00:01.234>
_TIMESTAMP_TAG = re.compile(r"<\d{2}:\d{2}:\d{2}\.\d{3}>")
# WebVTT class spans: <c>, </c>, <c.someClass>
_CUE_CLASS_TAG = re.compile(r"</?c(?:\.[^>]*)?>")
# Lines that may appear in the WebVTT header before the first cue
_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")
# Anything outside this whitelist is dropped from a video ID used in a filename
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def _debug(message: str, quiet: bool = False) -> None:
    """Write a diagnostic line to stderr unless *quiet* is set."""
    if not quiet:
        print(f"Debug: {message}", file=sys.stderr)


def estimate_api_cost(text: str) -> float:
    """Estimate the cost of sending *text* to the Claude API.

    Input tokens are approximated from the character count; output tokens are
    assumed to reach the ``MAX_OUTPUT_TOKENS`` limit used for the request.

    Returns:
        The estimated cost in the currency of the ``CLAUDE_COST_PER_1K_*`` rates.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR

    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (MAX_OUTPUT_TOKENS / 1000) * CLAUDE_COST_PER_1K_OUTPUT

    return input_cost + output_cost


def sanitize_filename(url: str) -> str:
    """Build a filesystem-safe base name ``<timestamp>_<video id>`` from *url*.

    For YouTube URLs the ID is taken from the ``v`` query parameter, or from the
    last path segment when there is none (``youtu.be/<id>``, ``/shorts/<id>``).
    Characters other than letters, digits, ``_`` and ``-`` are removed so the
    result can never contain path separators. When no ID can be extracted the
    literal ``video`` is used instead.
    """
    video_id = ""
    if "youtube.com" in url or "youtu.be" in url:
        parsed = urlparse(url)
        query_ids = parse_qs(parsed.query).get("v")
        if query_ids:
            candidate = query_ids[0]
        else:
            candidate = parsed.path.rstrip("/").rsplit("/", 1)[-1]
        video_id = _UNSAFE_FILENAME_CHARS.sub("", candidate)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id or 'video'}"


def ensure_transcript_dir() -> Path:
    """Create ``~/.yts/transcripts`` if needed and return its path."""
    transcript_dir = Path.home() / ".yts" / "transcripts"
    transcript_dir.mkdir(parents=True, exist_ok=True)
    return transcript_dir


def download_subtitles(
    url: str, quiet: bool = False, timeout: int = DOWNLOAD_TIMEOUT_SECONDS
) -> Tuple[bool, Optional[Path]]:
    """Download English auto-generated subtitles for *url* using yt-dlp.

    The subtitle file is written to the current working directory by yt-dlp.

    Args:
        url: Video URL handed to yt-dlp.
        quiet: Pass ``--quiet`` to yt-dlp and suppress diagnostic output.
        timeout: Seconds to wait for yt-dlp before aborting the download.

    Returns:
        ``(True, path)`` with the downloaded ``*.en.vtt`` file on success,
        ``(False, None)`` on any failure. Failures are reported on stderr and
        never raised.
    """
    cmd = ["yt-dlp", "--skip-download", "--write-auto-sub", "--sub-lang", "en"]
    if quiet:
        cmd.append("--quiet")
    # "--" ends option parsing so a URL/ID starting with "-" is not read as a flag.
    cmd += ["--", url]

    _debug(f"Running command: {cmd}", quiet)

    try:
        # subprocess.run kills and reaps the child itself when the timeout expires.
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
        _debug(f"stdout: {result.stdout}", quiet)
        _debug(f"stderr: {result.stderr}", quiet)
        if result.returncode != 0:
            print(f"Error downloading subtitles: {result.stderr}", file=sys.stderr)
            return False, None

        # Find the downloaded VTT file
        vtt_files = list(Path(".").glob("*.en.vtt"))
        _debug(f"Found VTT files: {vtt_files}", quiet)
        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None

        # Older subtitle files may already be present; the one just written is the newest.
        return True, max(vtt_files, key=lambda candidate: candidate.stat().st_mtime)

    except subprocess.TimeoutExpired:
        print(f"Download timed out after {timeout} seconds", file=sys.stderr)
        return False, None
    except FileNotFoundError:
        print("Error during subtitle download: yt-dlp was not found on PATH", file=sys.stderr)
        return False, None
    except Exception as e:  # boundary: callers rely on a (False, None) result instead of a raise
        print(f"Error during subtitle download: {e}", file=sys.stderr)
        _debug(f"Full exception info: {type(e).__name__}: {e}", quiet)
        traceback.print_exc()
        return False, None


def clean_vtt_text(text: str) -> str:
    """Reduce WebVTT content to plain text lines.

    Header lines, cue timing lines, blank lines, inline ``<hh:mm:ss.mmm>``
    timestamps and ``<c>`` class tags are removed. Each distinct line is kept
    once, at its first position, because rolling auto-captions repeat the same
    text across consecutive cues.
    """
    lines = text.split("\n")

    # Skip the header: WEBVTT / Kind: / Language: lines and surrounding blanks.
    start = 0
    while start < len(lines) and (not lines[start].strip() or lines[start].startswith(_HEADER_PREFIXES)):
        start += 1

    cleaned_lines = []
    for raw_line in lines[start:]:
        if "-->" in raw_line or not raw_line.strip():
            continue  # cue timing line or blank separator
        stripped = _CUE_CLASS_TAG.sub("", _TIMESTAMP_TAG.sub("", raw_line)).strip()
        if stripped:
            cleaned_lines.append(stripped)

    # dict.fromkeys de-duplicates while preserving first-seen order.
    return "\n".join(dict.fromkeys(cleaned_lines))


def process_file(input_path: Path, output_path: Optional[Path] = None, quiet: bool = False) -> str:
    """Read a VTT file and return its cleaned text.

    Args:
        input_path: VTT file to read (UTF-8).
        output_path: When given, the cleaned text is also written to this file.
        quiet: Suppress the "Processed ..." status line.

    Returns:
        The cleaned text, or ``""`` when the file cannot be read or written
        (the error is reported on stderr).
    """
    try:
        cleaned_text = clean_vtt_text(Path(input_path).read_text(encoding="utf-8"))

        if output_path:
            Path(output_path).write_text(cleaned_text, encoding="utf-8")
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")
        return cleaned_text

    except (OSError, UnicodeError) as e:
        print(f"Error processing {input_path}: {e}", file=sys.stderr)
        return ""


def save_transcript(text: str, url: str, prompt: str) -> Path:
    """Save *text* with a metadata header to ``~/.yts/transcripts/``.

    The header records the URL, the script version, the current timestamp and
    the Claude prompt, followed by a ``---`` separator line.

    Returns:
        The path of the written transcript file.
    """
    filepath = ensure_transcript_dir() / (sanitize_filename(url) + ".txt")

    header_lines = [
        f"URL: {url}",
        f"Script Version: {__version__}",
        f"Timestamp: {datetime.now().isoformat()}",
        f"Claude Prompt: {prompt}",
        "---",
    ]
    metadata = "\n".join(header_lines) + "\n"

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(metadata + text)

    return filepath


def cleanup_files(vtt_path: Optional[Path]) -> None:
    """Remove downloaded subtitle files after processing.

    Deletes *vtt_path* (when given) and every ``*.vtt`` file in the current
    working directory. Failures are reported as warnings on stderr; one file
    that cannot be removed does not stop the others from being cleaned up.

    NOTE: the ``*.vtt`` sweep also removes subtitle files that were in the
    working directory before this run, so run the tool from a directory where
    that is acceptable.
    """
    targets = [vtt_path] if vtt_path else []
    try:
        targets.extend(Path(".").glob("*.vtt"))
    except OSError as e:
        print(f"Warning: Could not clean up temporary files: {e}", file=sys.stderr)

    for target in targets:
        try:
            target.unlink(missing_ok=True)
        except OSError as e:
            print(f"Warning: Could not clean up temporary files: {e}", file=sys.stderr)


def get_summary_from_claude(text: str, prompt: str = DEFAULT_PROMPT) -> str:
    """Send *text* to the Claude API for summarization.

    Args:
        text: Transcript to summarize.
        prompt: Instruction placed before the transcript in the user message.

    Returns:
        The text of the first content block of the response, or ``""`` when the
        request could not be made or failed (the error is reported on stderr).
    """
    try:
        if not text.strip():
            # Nothing to summarize: do not spend an API call on an empty transcript.
            raise ValueError("transcript is empty")
        if anthropic is None:
            raise RuntimeError("the 'anthropic' package is not installed")
        if not os.environ.get("ANTHROPIC_API_KEY"):
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        client = anthropic.Anthropic()
        message = client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=MAX_OUTPUT_TOKENS,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately and concisely.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )
        return message.content[0].text

    except Exception as e:  # boundary: any API/SDK failure becomes an empty summary
        print(f"Error getting summary from Claude: {e}", file=sys.stderr)
        return ""


def _read_answer() -> str:
    """Read the user's answer to a prompt.

    On a POSIX terminal a single key press is read in raw mode (no Enter
    needed). When raw mode is unavailable -- no ``termios`` module, or stdin is
    not a terminal -- one line is read from stdin instead. Returns ``""`` when
    nothing could be read.
    """
    try:
        import termios
        import tty
    except ImportError:
        return sys.stdin.readline()

    try:
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
    except (termios.error, OSError, ValueError):
        # stdin is piped/redirected or has no file descriptor.
        return sys.stdin.readline()

    try:
        try:
            tty.setraw(fd)
            char = sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    except termios.error:
        return ""

    # The key press is not echoed in raw mode; end the prompt line ourselves,
    # after the terminal is back in normal mode so "\n" also returns the carriage.
    sys.stdout.write("\n")
    sys.stdout.flush()
    return char


def _confirm(question: str) -> bool:
    """Show *question* and return True only when the answer starts with ``y``/``Y``."""
    sys.stdout.write(question)
    sys.stdout.flush()
    try:
        answer = _read_answer()
    except (EOFError, KeyboardInterrupt):
        return False
    return answer.strip().lower().startswith("y")


def main() -> None:
    """Run the command-line tool: download, clean, confirm, summarize, output.

    Exits with status 0 on success or when the user declines the cost
    confirmation, 1 when subtitles, transcript or summary could not be
    produced, and 130 when interrupted with Ctrl-C.
    """
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument("--prompt", default=DEFAULT_PROMPT, help="Custom prompt for Claude")
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")

    args = parser.parse_args()

    # Set up signal handler for clean exit
    def signal_handler(sig, frame):
        print("\nCleaning up and exiting...")
        cleanup_files(None)  # Clean any VTT files
        _debug(f"Script terminated by signal at {datetime.now().isoformat()}", args.quiet)
        sys.exit(130)  # 128 + SIGINT: an interrupted run is not a success

    signal.signal(signal.SIGINT, signal_handler)

    def discard_download() -> None:
        """Remove the downloaded subtitles unless --keep-files was given."""
        if not args.keep_files:
            cleanup_files(vtt_path)

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")
    success, vtt_path = download_subtitles(args.url, args.quiet)
    if not success:
        cleanup_files(None)
        sys.exit(1)

    # Process the VTT file
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text:
        # Without text there is nothing worth saving or paying to summarize.
        print(f"No transcript text could be extracted from {vtt_path}", file=sys.stderr)
        discard_download()
        sys.exit(1)

    # Save transcript
    transcript_path = save_transcript(cleaned_text, args.url, args.prompt)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost
    estimated_cost = estimate_api_cost(cleaned_text)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    if not args.yes and not _confirm("\nDo you want to proceed with getting the summary? (y/N): "):
        print("Operation cancelled by user.")
        discard_download()
        sys.exit(0)

    # Get summary from Claude
    if not args.quiet:
        print("\nGetting summary from Claude...")
    summary = get_summary_from_claude(cleaned_text, args.prompt)
    if not summary:
        # get_summary_from_claude already reported the cause on stderr.
        print("No summary was produced.", file=sys.stderr)
        discard_download()
        sys.exit(1)

    # Prepare output
    sections = []
    if args.transcript:
        sections.append("=== Full Transcript ===\n\n" + cleaned_text + "\n\n=== Summary ===\n\n")
    sections.append(summary)
    output = "".join(sections)

    # Output results
    output_path = args.output
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"\nSummary saved to: {output_path}")
    else:
        print("\n=== Summary ===")
        print(output)

    # Cleanup downloaded files unless --keep-files is specified
    discard_download()

    _debug(f"Script completed successfully at {datetime.now().isoformat()}", args.quiet)
    sys.exit(0)


if __name__ == "__main__":
    main()