"""Download YouTube subtitles with yt-dlp, clean them, and summarize them with Claude."""

# Standard library imports
import argparse
import os
import re
import signal
import subprocess
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

# Third-party packages
import anthropic

# Local modules
from . import __version__

# Cost estimation constants.
# NOTE(review): the request in get_summary_from_claude() uses a Sonnet model;
# confirm these per-1K rates match that model's current pricing.
CLAUDE_COST_PER_1K_INPUT = 0.015  # Cost per 1K tokens for input
CLAUDE_COST_PER_1K_OUTPUT = 0.075  # Cost per 1K tokens for output
ESTIMATED_TOKENS_PER_CHAR = 0.25  # Rough estimate of tokens per character

# Shared by the API request and the cost estimate so the two cannot drift apart.
CLAUDE_MAX_OUTPUT_TOKENS = 1024

# Maximum time to wait for yt-dlp before giving up.
DOWNLOAD_TIMEOUT_SECONDS = 30

# Prompt used when the caller does not supply one.
DEFAULT_PROMPT = "Please summarize this transcript in 500 words or less"

# Inline cue timestamps such as <00:00:01.234>.
_VTT_TIMESTAMP_TAG = re.compile(r"<\d{2}:\d{2}:\d{2}\.\d{3}>")
# Remaining cue markup such as <c>, </c>, <c.colorE5E5E5>, <i>, <v Speaker>.
_VTT_MARKUP_TAG = re.compile(r"</?[A-Za-z][^>]*>")
# Prefixes of the metadata lines at the top of a VTT file.
_VTT_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")
# Anything outside this set is dropped from the video ID used in filenames.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def estimate_api_cost(text: str) -> float:
    """Estimate the cost of sending text to Claude API.

    The input size is approximated from the character count, and the output
    is assumed to use the full ``CLAUDE_MAX_OUTPUT_TOKENS`` budget.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    estimated_output_tokens = CLAUDE_MAX_OUTPUT_TOKENS

    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (estimated_output_tokens / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return input_cost + output_cost


def sanitize_filename(url: str) -> str:
    """Convert URL to safe filename, keeping video ID.

    Returns ``<YYYYmmdd_HHMMSS>_<video id>``, or ``<timestamp>_video`` when no
    usable ID can be extracted. The result has no extension.
    """
    video_id = None
    if "youtube.com" in url or "youtu.be" in url:
        if "v=" in url:
            candidate = url.split("v=")[1].split("&")[0]
        else:
            candidate = url.split("/")[-1].split("?")[0]
        # The ID becomes part of a path on disk, so strip separators, dots,
        # fragments and anything else that is not a plain ID character.
        video_id = _UNSAFE_FILENAME_CHARS.sub("", candidate.split("#")[0])

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id if video_id else 'video'}"


def ensure_transcript_dir() -> Path:
    """Create (if needed) and return the transcript directory ~/.yts/transcripts."""
    transcript_dir = Path.home() / ".yts" / "transcripts"
    transcript_dir.mkdir(parents=True, exist_ok=True)
    return transcript_dir


def download_subtitles(url: str, quiet: bool = False) -> Tuple[bool, Optional[Path]]:
    """Download subtitles from YouTube using yt-dlp.

    Runs yt-dlp in the current working directory and looks for the resulting
    ``*.en.vtt`` file there.

    Returns:
        ``(True, path)`` on success, ``(False, None)`` on any failure
        (non-zero exit, timeout, no VTT file produced, or an unexpected
        error). Failures are reported on stderr, never raised.
    """
    try:
        cmd = ["yt-dlp", "--skip-download", "--write-auto-sub", "--sub-lang", "en"]
        if quiet:
            cmd.append("--quiet")
        cmd.append(url)

        print(f"Debug: Running command: {cmd}")  # Debug line

        try:
            # subprocess.run kills and reaps the child itself on timeout.
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=DOWNLOAD_TIMEOUT_SECONDS,
            )
        except subprocess.TimeoutExpired:
            print(
                f"Download timed out after {DOWNLOAD_TIMEOUT_SECONDS} seconds",
                file=sys.stderr,
            )
            return False, None

        print(f"Debug: stdout: {result.stdout}")  # Debug line
        print(f"Debug: stderr: {result.stderr}")  # Debug line

        if result.returncode != 0:
            print(f"Error downloading subtitles: {result.stderr}", file=sys.stderr)
            return False, None

        # Find the downloaded VTT file
        vtt_files = list(Path(".").glob("*.en.vtt"))
        print(f"Debug: Found VTT files: {vtt_files}")  # Debug line

        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None

        # Older subtitle files may already be in this directory; the one
        # yt-dlp just wrote is the most recently modified.
        newest = max(vtt_files, key=lambda candidate: candidate.stat().st_mtime)
        return True, newest

    except Exception as e:
        # Deliberate catch-all at the CLI boundary: report and fail softly.
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")  # Debug line
        traceback.print_exc()
        return False, None


def clean_vtt_text(text: str) -> str:
    """Clean WebVTT text by removing timestamps, formatting tags, and metadata.

    Header lines, cue timing lines (those containing ``-->``), inline
    timestamp tags and markup tags are removed. Each distinct remaining line
    is kept once, in order of first appearance, so repeated caption lines
    appear only once.
    """
    lines = text.split("\n")

    # Skip header lines (and blank lines) at the top of the file.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(_VTT_HEADER_PREFIXES) or not lines[start].strip()
    ):
        start += 1

    cleaned_lines = []
    for line in lines[start:]:
        if "-->" in line or not line.strip():
            continue

        line = _VTT_TIMESTAMP_TAG.sub("", line)
        line = _VTT_MARKUP_TAG.sub("", line)

        stripped = line.strip()
        if stripped:
            cleaned_lines.append(stripped)

    # dict.fromkeys de-duplicates while preserving first-seen order.
    return "\n".join(dict.fromkeys(cleaned_lines))


def process_file(input_path: Path, output_path: Optional[Path] = None, quiet: bool = False) -> str:
    """Process a single VTT file and output cleaned text.

    Args:
        input_path: VTT file to read (UTF-8).
        output_path: If given, the cleaned text is also written here.
        quiet: Suppress the "Processed ..." status message.

    Returns:
        The cleaned text, or ``""`` if the file could not be read or written
        (the error is reported on stderr).
    """
    try:
        with open(input_path, "r", encoding="utf-8") as f:
            content = f.read()

        cleaned_text = clean_vtt_text(content)

        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(cleaned_text)
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")

        return cleaned_text

    except (OSError, UnicodeDecodeError) as e:
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""


def save_transcript(text: str, url: str, prompt: str) -> Path:
    """Save transcript with metadata to ~/.yts/transcripts/.

    The file starts with a metadata header (URL, script version, timestamp,
    prompt) followed by a ``---`` separator and the transcript text.

    Returns:
        Path of the file that was written.
    """
    transcript_dir = ensure_transcript_dir()
    filename = sanitize_filename(url) + ".txt"
    filepath = transcript_dir / filename

    metadata = f"""URL: {url}
Script Version: {__version__}
Timestamp: {datetime.now().isoformat()}
Claude Prompt: {prompt}
---
"""

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(metadata + text)

    return filepath


def cleanup_files(vtt_path: Optional[Path]) -> None:
    """Remove downloaded files after processing.

    NOTE: besides ``vtt_path`` this deletes every ``*.vtt`` file in the
    current working directory, including files this script did not create.

    Failures are reported as warnings on stderr and never raised; a failure
    on one file does not stop the remaining files from being removed.
    """
    targets = []
    if vtt_path:
        targets.append(vtt_path)
    try:
        targets.extend(Path(".").glob("*.vtt"))
    except OSError as e:
        print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)

    for target in targets:
        try:
            target.unlink()
        except FileNotFoundError:
            # Already gone (vtt_path usually also matches the glob above).
            continue
        except OSError as e:
            print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)


def get_summary_from_claude(text: str, prompt: str = DEFAULT_PROMPT) -> str:
    """Send text to Claude API for summarization.

    Returns:
        The text of the first content block of the response, or ``""`` if
        the API key is missing or the request fails (the error is reported
        on stderr).
    """
    if not os.environ.get("ANTHROPIC_API_KEY"):
        print(
            "Error getting summary from Claude: ANTHROPIC_API_KEY environment variable not set",
            file=sys.stderr,
        )
        return ""

    try:
        # No key is passed explicitly; the check above only guarantees that
        # ANTHROPIC_API_KEY is set in the environment.
        client = anthropic.Anthropic()
        message = client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=CLAUDE_MAX_OUTPUT_TOKENS,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately and concisely.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )
        return message.content[0].text
    except Exception as e:
        # Deliberate catch-all: any API or response-shape failure is reported
        # and turned into an empty summary for the caller to handle.
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""


def _read_single_key() -> Optional[str]:
    """Read one keypress from the terminal without waiting for Enter.

    Returns None when raw mode is unavailable (no termios/tty module, or the
    terminal rejects the request) so the caller can fall back to line input.
    """
    try:
        import termios
        import tty
    except ImportError:
        return None

    fd = sys.stdin.fileno()
    try:
        old_settings = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            return sys.stdin.read(1)
        finally:
            # Always restore the terminal, even if the read is interrupted.
            termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
    except termios.error:
        return None


def _confirm_proceed() -> bool:
    """Ask the user whether to call the API; only 'y' or 'Y' means yes."""
    sys.stdout.write("\nDo you want to proceed with getting the summary? (y/N): ")
    sys.stdout.flush()

    try:
        answer = _read_single_key() if sys.stdin.isatty() else None
        if answer is None:
            # Not a terminal, or single-key input is unavailable.
            answer = input().strip()
        else:
            # The keypress is not followed by Enter, so end the prompt line here.
            sys.stdout.write("\n")
            sys.stdout.flush()
    except (EOFError, KeyboardInterrupt):
        sys.stdout.write("\n")
        return False

    return answer.lower() == "y"


def main() -> None:
    """Command-line entry point: download, clean, save, confirm cost, summarize."""
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument("--prompt", default=DEFAULT_PROMPT, help="Custom prompt for Claude")
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")

    args = parser.parse_args()

    # Set up signal handler for clean exit
    def signal_handler(sig, frame):
        print("\nCleaning up and exiting...")
        cleanup_files(None)  # Clean any VTT files
        print(f"Debug: Script terminated by signal at {datetime.now().isoformat()}")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")

    success, vtt_path = download_subtitles(args.url, args.quiet)
    if not success:
        cleanup_files(None)
        sys.exit(1)

    # Process the VTT file
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text:
        # Nothing to summarize: do not save an empty transcript or pay for
        # an API call on empty input.
        print("No transcript text could be extracted from the subtitles.", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Save transcript
    transcript_path = save_transcript(cleaned_text, args.url, args.prompt)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost
    estimated_cost = estimate_api_cost(cleaned_text)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    if not args.yes and not _confirm_proceed():
        print("Operation cancelled by user.")
        cleanup_files(vtt_path)
        sys.exit(0)

    # Get summary from Claude
    if not args.quiet:
        print("\nGetting summary from Claude...")

    summary = get_summary_from_claude(cleaned_text, args.prompt)
    if not summary:
        # get_summary_from_claude already reported the error on stderr.
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Prepare output
    output_parts = []
    if args.transcript:
        output_parts.append("=== Full Transcript ===\n\n")
        output_parts.append(cleaned_text)
        output_parts.append("\n\n=== Summary ===\n\n")
    output_parts.append(summary)
    output = "".join(output_parts)

    # Output results
    output_path = args.output
    if output_path:
        with open(output_path, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"\nSummary saved to: {output_path}")
    else:
        print("\n=== Summary ===")
        print(output)

    # Cleanup downloaded files unless --keep-files is specified
    if not args.keep_files:
        cleanup_files(vtt_path)

    print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)


if __name__ == "__main__":
    main()