"""Download YouTube subtitles with yt-dlp and summarize them with Claude.

Workflow: fetch the English auto-generated subtitles as WebVTT, strip the
cue timing/markup down to plain text, archive the transcript under
``~/.yts/transcripts``, show an estimated API cost, and (after confirmation)
request a summary from the Anthropic API and store it under
``~/.yts/summaries``.
"""

# Standard library imports
import argparse
import os
import re
import sys
import traceback
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple

# Third-party packages
import anthropic
from yt_dlp import YoutubeDL

# Local modules
from . import __version__

# Cost estimation constants.
# NOTE(review): confirm these rates against the current price list for
# CLAUDE_MODEL; they are only used to show an estimate before the API call.
CLAUDE_COST_PER_1K_INPUT = 0.015  # Cost per 1K tokens for input
CLAUDE_COST_PER_1K_OUTPUT = 0.075  # Cost per 1K tokens for output
ESTIMATED_TOKENS_PER_CHAR = 0.25  # Rough estimate of tokens per character

# Request settings shared by the cost estimate and the actual API call so the
# two cannot drift apart.
# NOTE(review): verify that this model ID is still served by the API.
CLAUDE_MODEL = "claude-3-sonnet-20240229"
CLAUDE_MAX_OUTPUT_TOKENS = 2048

# Inline WebVTT markup found in cue text.
_VTT_INLINE_TIMESTAMP = re.compile(r"<\d{2}:\d{2}:\d{2}\.\d{3}>")
_VTT_TAG = re.compile(r"</?[A-Za-z][^>]*>")  # <c>, </c>, <c.colorXXXX>, <b>, <v Name>, ...
_VTT_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")

# Characters allowed to pass through into generated filenames.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def estimate_api_cost(text: str) -> float:
    """Estimate the cost (in USD) of summarizing *text* with the Claude API.

    Input tokens are approximated from the character count; output tokens are
    assumed to hit the ``CLAUDE_MAX_OUTPUT_TOKENS`` ceiling, so the result is
    an upper-bound style estimate rather than an exact price.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (CLAUDE_MAX_OUTPUT_TOKENS / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return input_cost + output_cost


def sanitize_filename(url: str) -> str:
    """Build a filesystem-safe, timestamped file stem from a video URL.

    For YouTube URLs the video ID is taken from the ``v=`` query parameter
    or, failing that, from the last path segment. Any character outside
    ``[A-Za-z0-9_-]`` is replaced by ``_``. If no ID can be extracted the
    literal ``video`` is used.

    Returns:
        ``"<YYYYmmdd_HHMMSS>_<video id or 'video'>"`` (no extension).
    """
    video_id = None
    if "youtube.com" in url or "youtu.be" in url:
        if "v=" in url:
            video_id = url.split("v=")[1].split("&")[0]
        else:
            video_id = url.split("/")[-1].split("?")[0]
    if video_id:
        # The ID comes straight from user input; keep it from smuggling
        # path separators or other awkward characters into the filename.
        video_id = _UNSAFE_FILENAME_CHARS.sub("_", video_id)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id if video_id else 'video'}"


def ensure_yts_dirs() -> Tuple[Path, Path]:
    """Create (if needed) and return ``~/.yts/transcripts`` and ``~/.yts/summaries``."""
    base_dir = Path.home() / ".yts"
    transcript_dir = base_dir / "transcripts"
    summary_dir = base_dir / "summaries"

    transcript_dir.mkdir(parents=True, exist_ok=True)
    summary_dir.mkdir(parents=True, exist_ok=True)

    return transcript_dir, summary_dir


def download_subtitles(url: str, quiet: bool = False) -> Tuple[bool, Optional[Path], int]:
    """Download English auto-generated subtitles for *url* via yt-dlp.

    The subtitle file is written to the current working directory.

    Args:
        url: Video URL to fetch subtitles for.
        quiet: Suppress yt-dlp output and the debug prints of this function.

    Returns:
        ``(success, vtt_path, duration_mins)``. On any failure the tuple is
        ``(False, None, 0)`` so callers can always unpack three values.
    """
    try:
        ydl_opts = {
            "skip_download": True,
            "writeautomaticsub": True,
            "subtitleslangs": ["en"],
            "quiet": quiet,
            "no_warnings": quiet,
        }

        # First get the video duration without downloading anything.
        info_opts = dict(ydl_opts)
        info_opts["extract_flat"] = True
        with YoutubeDL(info_opts) as ydl:
            info = ydl.extract_info(url, download=False)
        # 'duration' may be absent or None (e.g. live streams).
        duration_secs = (info or {}).get("duration") or 0
        duration_mins = int(duration_secs / 60)

        if not quiet:
            print(f"Debug: Downloading subtitles for {url}")

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded VTT file.
        vtt_files = list(Path(".").glob("*.en.vtt"))
        if not quiet:
            print(f"Debug: Found VTT files: {vtt_files}")

        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None, 0

        # Prefer the most recently written file so a stale subtitle file
        # already sitting in the directory is not picked up by accident.
        newest = max(vtt_files, key=lambda p: p.stat().st_mtime)
        return True, newest, duration_mins

    except Exception as e:  # boundary: yt-dlp raises a wide range of errors
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        if not quiet:
            print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")
            traceback.print_exc()
        return False, None, 0


def clean_vtt_text(text: str) -> str:
    """Reduce WebVTT content to plain text.

    Removes the leading header lines, cue timing lines (anything containing
    ``-->``), inline timestamps and markup tags, blank lines, and every
    repeated line (only the first occurrence of a line is kept, in order).
    """
    lines = text.splitlines()

    # Skip the header block (and any blank lines) at the top of the file.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(_VTT_HEADER_PREFIXES) or not lines[start].strip()
    ):
        start += 1

    cleaned_lines = []
    for line in lines[start:]:
        if "-->" in line or not line.strip():
            continue
        line = _VTT_INLINE_TIMESTAMP.sub("", line)
        line = _VTT_TAG.sub("", line)
        line = line.strip()
        if line:
            cleaned_lines.append(line)

    # Auto-generated captions repeat each line across rolling cues;
    # dict.fromkeys drops duplicates while preserving first-seen order.
    return "\n".join(dict.fromkeys(cleaned_lines))


def process_file(
    input_path: Path, output_path: Optional[Path] = None, quiet: bool = False
) -> str:
    """Read a VTT file and return its cleaned plain-text transcript.

    Args:
        input_path: VTT file to read (UTF-8).
        output_path: If given, the cleaned text is also written here.
        quiet: Suppress the "Processed ..." status line.

    Returns:
        The cleaned text, or ``""`` if the file could not be read or written
        (the error is reported on stderr).
    """
    try:
        with open(input_path, "r", encoding="utf-8") as f:
            content = f.read()

        cleaned_text = clean_vtt_text(content)

        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(cleaned_text)
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")

        return cleaned_text

    except (OSError, UnicodeError) as e:
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""


def save_transcript(text: str, url: str, prompt: Optional[str]) -> Path:
    """Save *text* with a metadata header to ``~/.yts/transcripts/``.

    The header records the URL, script version, current timestamp and the
    prompt, followed by a ``---`` separator line.

    Returns:
        Path of the written transcript file.
    """
    transcript_dir, _ = ensure_yts_dirs()
    filepath = transcript_dir / (sanitize_filename(url) + ".txt")

    metadata = f"""URL: {url}
Script Version: {__version__}
Timestamp: {datetime.now().isoformat()}
Claude Prompt: {prompt}
---
"""

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(metadata + text)

    return filepath


def cleanup_files(vtt_path: Optional[Path]) -> None:
    """Remove downloaded subtitle files (best effort).

    Deletes *vtt_path* if it exists, then every ``*.vtt`` file in the current
    working directory. Failures are reported as a warning on stderr.

    NOTE(review): the directory sweep also removes VTT files this script did
    not create; run the tool from a scratch directory if that matters.
    """
    try:
        if vtt_path and vtt_path.exists():
            vtt_path.unlink()

        # Also clean up any other VTT files in the current directory.
        for vtt_file in Path(".").glob("*.vtt"):
            vtt_file.unlink()
    except OSError as e:
        print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)


def _default_prompt(duration_mins: int) -> str:
    """Return the default summary prompt, scaled to the video length.

    The target is 500 words per full 10 minutes of video, with a floor of 500.
    """
    target_words = max(500, (duration_mins // 10) * 500)
    return f"Please summarize this transcript in approximately {target_words} words"


def get_summary_from_claude(text: str, duration_mins: int, prompt: Optional[str] = None) -> str:
    """Send *text* to the Claude API and return the summary.

    Args:
        text: Transcript to summarize.
        duration_mins: Video length in minutes; used to size the default prompt.
        prompt: Instruction to send instead of the default prompt.

    Returns:
        The summary text, or ``""`` on any failure (missing API key, API
        error, ...); the error is reported on stderr.
    """
    try:
        if not os.environ.get("ANTHROPIC_API_KEY"):
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        # The client reads ANTHROPIC_API_KEY from the environment itself.
        client = anthropic.Anthropic()

        if prompt is None:
            prompt = _default_prompt(duration_mins)

        message = client.messages.create(
            model=CLAUDE_MODEL,
            max_tokens=CLAUDE_MAX_OUTPUT_TOKENS,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately and concisely.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )
        return message.content[0].text

    except Exception as e:  # boundary: report and let the caller decide
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""


def main():
    """Command-line entry point: download, clean, archive, confirm, summarize."""
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument(
        "--prompt", help="Custom prompt for Claude (default: auto-calculated based on video length)"
    )
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")

    args = parser.parse_args()

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")

    success, vtt_path, duration_mins = download_subtitles(args.url, args.quiet)
    if not success:
        cleanup_files(None)
        sys.exit(1)

    # Process the VTT file
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text:
        # Nothing to summarize; don't archive an empty transcript or spend
        # money on an API call that cannot produce anything useful.
        print("Error: transcript is empty; nothing to summarize.", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Resolve the prompt once so the archived metadata matches what is sent.
    prompt = args.prompt if args.prompt is not None else _default_prompt(duration_mins)

    # Save transcript
    transcript_path = save_transcript(cleaned_text, args.url, prompt)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost
    estimated_cost = estimate_api_cost(cleaned_text)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    if not args.yes:
        try:
            response = input("\nDo you want to proceed with getting the summary? (y/N): ").strip().lower()
            if response != "y":
                print("Operation cancelled by user.")
                cleanup_files(vtt_path)
                sys.exit(0)
        except (EOFError, KeyboardInterrupt):
            print("\nOperation cancelled by user.")
            cleanup_files(vtt_path)
            sys.exit(0)

    # Get summary from Claude
    if not args.quiet:
        print("\nGetting summary from Claude...")

    summary = get_summary_from_claude(cleaned_text, duration_mins, prompt)
    if not summary:
        # get_summary_from_claude already reported the cause on stderr.
        print(f"No summary was produced; the transcript remains at: {transcript_path}", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Prepare output
    output = ""
    if args.transcript:
        output += "=== Full Transcript ===\n\n"
        output += cleaned_text
        output += "\n\n=== Summary ===\n\n"
    output += summary

    # Save summary
    _, summary_dir = ensure_yts_dirs()
    summary_path = summary_dir / (sanitize_filename(args.url) + "_summary.txt")
    with open(summary_path, "w", encoding="utf-8") as f:
        f.write(output)
    print(f"\nSummary saved to: {summary_path}")

    # If output path specified, also save there
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            f.write(output)
        print(f"Summary also saved to: {args.output}")

    # Cleanup downloaded files unless --keep-files is specified
    if not args.keep_files:
        cleanup_files(vtt_path)

    print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)


if __name__ == "__main__":
    main()