"""Download YouTube subtitles with yt-dlp and summarize them with Claude."""

# Standard library
import argparse
import os
import re
import signal
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

# Third-party packages
import anthropic
from yt_dlp import YoutubeDL

# Local modules
from . import __version__

# Cost estimation constants.
# NOTE(review): these rates look like a different pricing tier than the model
# requested in get_summary_from_claude() -- confirm against current pricing.
CLAUDE_COST_PER_1K_INPUT = 0.015  # Cost per 1K tokens for input
CLAUDE_COST_PER_1K_OUTPUT = 0.075  # Cost per 1K tokens for output
ESTIMATED_TOKENS_PER_CHAR = 0.25  # Rough estimate of tokens per character

# Output budget requested from the API; also the worst case for the estimate.
CLAUDE_MAX_OUTPUT_TOKENS = 1024

DEFAULT_PROMPT = "Please summarize this transcript in 500 words or less"
CONFIRM_PROMPT = "\nDo you want to proceed with getting the summary? (y/N): "

# Header lines that may precede the first cue of a WebVTT file.
_VTT_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")
# Any inline cue markup: <00:00:01.000> timestamps, <c>, </c>, <v Name>, ...
_VTT_TAG_RE = re.compile(r"<[^>]+>")
# Characters that are not safe to keep in the video-ID part of a filename.
_UNSAFE_ID_CHARS_RE = re.compile(r"[^A-Za-z0-9_-]")


def estimate_api_cost(text: str) -> float:
    """Estimate the cost (in dollars) of summarizing *text* with Claude.

    Input tokens are approximated from the character count; output is assumed
    to use the full CLAUDE_MAX_OUTPUT_TOKENS budget.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (CLAUDE_MAX_OUTPUT_TOKENS / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return input_cost + output_cost


def sanitize_filename(url: str) -> str:
    """Build a filesystem-safe base filename from *url*.

    The result is ``<YYYYmmdd_HHMMSS>_<video id>``. For YouTube URLs the ID is
    taken from the ``v=`` query parameter or, failing that, the last path
    segment. Any character outside ``[A-Za-z0-9_-]`` is replaced by ``_`` so
    the ID can never introduce path separators. When no ID can be extracted
    the literal ``video`` is used instead.
    """
    video_id = ""
    if "youtube.com" in url or "youtu.be" in url:
        # Drop any fragment first so "#t=10" never ends up in the ID.
        without_fragment = url.split("#", 1)[0]
        if "v=" in without_fragment:
            video_id = without_fragment.split("v=")[1].split("&")[0]
        else:
            video_id = without_fragment.split("/")[-1].split("?")[0]
        video_id = _UNSAFE_ID_CHARS_RE.sub("_", video_id)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id if video_id else 'video'}"


def ensure_transcript_dir() -> Path:
    """Create ``~/.yts/transcripts`` if needed and return its path."""
    transcript_dir = Path.home() / ".yts" / "transcripts"
    transcript_dir.mkdir(parents=True, exist_ok=True)
    return transcript_dir


def download_subtitles(url: str, quiet: bool = False) -> Tuple[bool, Optional[Path]]:
    """Download English automatic subtitles for *url* into the current directory.

    Args:
        url: Video URL handed to yt-dlp.
        quiet: Suppress yt-dlp output and the debug messages printed here.

    Returns:
        ``(True, path)`` with the downloaded ``*.en.vtt`` file on success,
        ``(False, None)`` if the download failed or produced no VTT file.
    """
    try:
        ydl_opts = {
            'skip_download': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'quiet': quiet,
            'no_warnings': quiet,
        }

        if not quiet:
            print(f"Debug: Downloading subtitles for {url}")

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded VTT file
        vtt_files = list(Path(".").glob("*.en.vtt"))
        if not quiet:
            print(f"Debug: Found VTT files: {vtt_files}")

        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None

        # Glob order is arbitrary; the newest file is the one just written,
        # even if stale subtitle files from earlier runs are lying around.
        newest = max(vtt_files, key=lambda p: p.stat().st_mtime)
        return True, newest
    except Exception as e:
        # Top-level boundary around yt-dlp: report and signal failure.
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        if not quiet:
            print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")
            traceback.print_exc()
        return False, None


def clean_vtt_text(text: str) -> str:
    """Clean WebVTT text by removing timestamps, formatting tags, and metadata.

    Header lines, cue timing lines (those containing ``-->``) and blank lines
    are dropped, inline ``<...>`` markup is stripped, and repeated lines are
    removed while keeping the first occurrence in order.
    """
    lines = text.split("\n")

    # Skip the header block (and any blank lines) before the first cue.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(_VTT_HEADER_PREFIXES) or not lines[start].strip()
    ):
        start += 1

    cleaned_lines = []
    for line in lines[start:]:
        if "-->" in line or not line.strip():
            continue
        stripped = _VTT_TAG_RE.sub("", line).strip()
        if stripped:
            cleaned_lines.append(stripped)

    # dict preserves insertion order, so this de-duplicates stably.
    return "\n".join(dict.fromkeys(cleaned_lines))


def process_file(input_path: Path, output_path: Optional[Path] = None, quiet: bool = False) -> str:
    """Process a single VTT file and return the cleaned text.

    Args:
        input_path: VTT file to read (UTF-8).
        output_path: If given, the cleaned text is also written here.
        quiet: Suppress the status message printed after writing.

    Returns:
        The cleaned text, or ``""`` if the file could not be processed (the
        error is reported on stderr).
    """
    try:
        with open(input_path, "r", encoding="utf-8") as f:
            content = f.read()

        cleaned_text = clean_vtt_text(content)

        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(cleaned_text)
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")

        return cleaned_text
    except Exception as e:
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""


def save_transcript(text: str, url: str, prompt: str) -> Path:
    """Save transcript with metadata to ~/.yts/transcripts/.

    The file starts with a small header (URL, script version, timestamp and
    the Claude prompt) followed by a ``---`` separator and the transcript.

    Returns:
        Path of the file that was written.
    """
    filepath = ensure_transcript_dir() / (sanitize_filename(url) + ".txt")

    metadata = (
        f"URL: {url}\n"
        f"Script Version: {__version__}\n"
        f"Timestamp: {datetime.now().isoformat()}\n"
        f"Claude Prompt: {prompt}\n"
        "---\n\n"
    )

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(metadata + text)

    return filepath


def cleanup_files(vtt_path: Optional[Path]):
    """Remove downloaded files after processing.

    WARNING: besides *vtt_path* this deletes every ``*.vtt`` file in the
    current working directory. Failures are reported as a warning on stderr
    and never raised.
    """
    try:
        if vtt_path and vtt_path.exists():
            vtt_path.unlink()

        # Also cleanup any other VTT files in current directory
        for vtt_file in Path(".").glob("*.vtt"):
            vtt_file.unlink()
    except Exception as e:
        print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)


def get_summary_from_claude(text: str, prompt: str = DEFAULT_PROMPT) -> str:
    """Send text to Claude API for summarization.

    Args:
        text: Transcript to summarize.
        prompt: Instruction placed before the transcript in the user message.

    Returns:
        The text of the first content block of the response, or ``""`` if the
        request failed (the error is reported on stderr). A missing
        ``ANTHROPIC_API_KEY`` is reported the same way.
    """
    try:
        api_key = os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        # Pass the key we just validated instead of relying on the client
        # re-reading the environment.
        client = anthropic.Anthropic(api_key=api_key)

        message = client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=CLAUDE_MAX_OUTPUT_TOKENS,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately and concisely.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )

        return message.content[0].text
    except Exception as e:
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""


def _read_single_key() -> Optional[str]:
    """Read one keypress from stdin in raw mode.

    Returns:
        The character read, ``""`` if the terminal could not be switched to
        raw mode, or ``None`` when raw-mode support (``termios``/``tty``) is
        not available on this platform.
    """
    try:
        import termios
        import tty
    except ImportError:
        return None

    try:
        fd = sys.stdin.fileno()
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            return sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    except termios.error:
        # Terminal control failed: treat as "no answer", i.e. decline.
        return ""


def _read_confirmation(prompt: str) -> str:
    """Show *prompt* and return the user's answer.

    On an interactive POSIX terminal a single keypress is read without
    waiting for Enter; otherwise a full line is read with ``input()``.

    Raises:
        EOFError: If stdin is exhausted while reading a line.
    """
    # Write the prompt before entering raw mode, where "\n" would not return
    # the cursor to the start of the line.
    sys.stdout.write(prompt)
    sys.stdout.flush()

    if sys.stdin.isatty():
        key = _read_single_key()
        if key is not None:
            # Raw mode does not echo, so finish the prompt line ourselves.
            sys.stdout.write("\n")
            sys.stdout.flush()
            return key

    return input().strip()


def main():
    """Command-line entry point: download, clean, save and summarize subtitles."""
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument("--prompt", default=DEFAULT_PROMPT, help="Custom prompt for Claude")
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")

    args = parser.parse_args()

    # Set up signal handler for clean exit
    def signal_handler(sig, frame):
        print("\nCleaning up and exiting...")
        cleanup_files(None)  # Clean any VTT files
        print(f"Debug: Script terminated by signal at {datetime.now().isoformat()}")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")

    success, vtt_path = download_subtitles(args.url, args.quiet)
    if not success:
        cleanup_files(None)
        sys.exit(1)

    # Process the VTT file
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text:
        # Nothing to save or summarize; do not offer a paid API call for it.
        print("No transcript text could be extracted from the subtitles", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Save transcript
    transcript_path = save_transcript(cleaned_text, args.url, args.prompt)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost
    estimated_cost = estimate_api_cost(cleaned_text)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    if not args.yes:
        try:
            answer = _read_confirmation(CONFIRM_PROMPT)
        except (EOFError, KeyboardInterrupt):
            print("\nOperation cancelled by user.")
            cleanup_files(vtt_path)
            sys.exit(0)

        if answer.lower() != "y":
            print("Operation cancelled by user.")
            cleanup_files(vtt_path)
            sys.exit(0)

    # Get summary from Claude
    if not args.quiet:
        print("\nGetting summary from Claude...")

    summary = get_summary_from_claude(cleaned_text, args.prompt)

    # Prepare output
    output = ""
    if args.transcript:
        output += "=== Full Transcript ===\n\n"
        output += cleaned_text
        output += "\n\n=== Summary ===\n\n"
    output += summary

    # Output results
    output_path = args.output
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"\nSummary saved to: {output_path}")
    else:
        print("\n=== Summary ===")
        print(output)

    # Cleanup downloaded files unless --keep-files is specified
    if not args.keep_files:
        cleanup_files(vtt_path)

    print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)


if __name__ == "__main__":
    main()