"""Download YouTube subtitles, clean them up, and summarize them with Claude."""

import argparse
import os
import re
import signal
import sys
import traceback
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs, urlparse

import anthropic
from yt_dlp import YoutubeDL

from . import __version__

# Cost estimation constants, in USD per 1K tokens (estimate_api_cost divides
# the token counts by 1000 before multiplying by these).
CLAUDE_COST_PER_1K_INPUT = 0.80 / 1000
CLAUDE_COST_PER_1K_OUTPUT = 4 / 1000
ESTIMATED_TOKENS_PER_CHAR = 0.25

# Output-token budget for the summary request. The floor is the historical
# fixed value; the ceiling is assumed to be the model's output limit
# (NOTE(review): confirm against the current model documentation).
_MIN_SUMMARY_TOKENS = 2048
_MAX_SUMMARY_TOKENS = 8192
_SUMMARY_TOKENS_PER_WORD = 1.5

# Lines that make up the WebVTT preamble.
_VTT_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")
# Inline word-level cue timestamps such as <00:00:01.234>.
_VTT_INLINE_TIMESTAMP = re.compile(r"<\d{2}:\d{2}:\d{2}\.\d{3}>")
# Cue markup such as <c>, </c>, <c.colorE5E5E5>, <i>, <v Speaker>.
_VTT_TAG = re.compile(r"</?[A-Za-z][^>]*>")
# Characters that are not kept in the video-id part of a filename.
_UNSAFE_FILENAME_CHARS = re.compile(r"[^A-Za-z0-9_-]")


def setup_terminal_control() -> None:
    """Set up terminal control at program start.

    Moves the process into its own process group, ignores the job-control
    signals SIGTTOU/SIGTTIN/SIGTSTP and, when stdin is a TTY, makes this
    process group the terminal's foreground group. This is best-effort:
    any failure (for example on platforms without these APIs) is ignored.
    """
    try:
        # Put process in its own process group.
        os.setpgrp()

        # Ignore terminal control signals.
        for job_signal in (signal.SIGTTOU, signal.SIGTTIN, signal.SIGTSTP):
            signal.signal(job_signal, signal.SIG_IGN)

        # Take control of the terminal if we are running in one.
        if sys.stdin.isatty():
            import termios  # Unix-only, so imported lazily.

            termios.tcsetpgrp(sys.stdin.fileno(), os.getpgrp())  # type: ignore
    except Exception:
        # Deliberate best-effort: if we can't get terminal control, continue.
        pass


def estimate_api_cost(text: str, target_words: float) -> float:
    """Estimate the cost of sending text to the Claude API.

    Args:
        text: The input that will be sent; its length in characters is
            converted to tokens with ESTIMATED_TOKENS_PER_CHAR.
        target_words: Requested summary length; counted as one output token
            per word.

    Returns:
        The estimated input cost plus output cost.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    estimated_output_tokens = target_words

    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (estimated_output_tokens / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return input_cost + output_cost


def sanitize_filename(url: str) -> str:
    """Convert a URL to a safe filename stem, keeping the video ID.

    For URLs containing "youtube.com" or "youtu.be" the ID is taken from the
    ``v`` query parameter (wherever it appears in the query string) or,
    failing that, from the last path segment. Only ASCII letters, digits,
    ``_`` and ``-`` are kept.

    Args:
        url: The video URL.

    Returns:
        ``"<YYYYmmdd_HHMMSS>_<video id>"``, with ``"video"`` standing in for
        the ID when none could be extracted.
    """
    video_id = None
    if "youtube.com" in url or "youtu.be" in url:
        parsed = urlparse(url)
        query_ids = parse_qs(parsed.query).get("v")
        if query_ids:
            raw_id = query_ids[0]
        else:
            raw_id = parsed.path.rstrip("/").split("/")[-1]
        video_id = _UNSAFE_FILENAME_CHARS.sub("", raw_id)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id if video_id else 'video'}"


def ensure_yts_dirs() -> tuple[Path, Path]:
    """Create and return the transcript and summary directories.

    Returns:
        ``(~/.yts/transcripts, ~/.yts/summaries)``; both exist on return.
    """
    base_dir = Path.home() / ".yts"
    transcript_dir = base_dir / "transcripts"
    summary_dir = base_dir / "summaries"

    transcript_dir.mkdir(parents=True, exist_ok=True)
    summary_dir.mkdir(parents=True, exist_ok=True)

    return transcript_dir, summary_dir


def _pick_subtitle_file(vtt_files: list[Path], video_id: str | None) -> Path:
    """Choose the subtitle file that most likely belongs to this download."""
    candidates = vtt_files
    if video_id:
        # Prefer files whose name carries the video id, so a stale file from
        # an earlier run in the same directory is not picked by accident.
        matching = [path for path in vtt_files if video_id in path.name]
        if matching:
            candidates = matching
    return max(candidates, key=lambda path: path.stat().st_mtime)


def download_subtitles(url: str, quiet: bool = False) -> tuple[bool, Path | None, int]:
    """Download English automatic subtitles from YouTube using yt-dlp.

    The subtitle file is written to the current working directory.

    Args:
        url: The video URL.
        quiet: Suppress yt-dlp output and the debug messages printed here.

    Returns:
        ``(success, path, duration_minutes)``. On failure the tuple is
        ``(False, None, 0)`` and the error is reported on stderr.
    """
    try:
        ydl_opts = {
            "skip_download": True,
            "writeautomaticsub": True,
            "subtitleslangs": ["en"],
            "quiet": quiet,
            "no_warnings": quiet,
        }

        # First get video duration (and id) without downloading anything.
        info_opts = {**ydl_opts, "extract_flat": True}
        with YoutubeDL(info_opts) as ydl:
            info = ydl.extract_info(url, download=False) or {}
        # "duration" may be absent or None (e.g. live streams).
        duration_mins = int((info.get("duration") or 0) / 60)  # type: ignore
        video_id = info.get("id")  # type: ignore

        if not quiet:
            print(f"Debug: Downloading subtitles for {url}")

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded VTT file.
        vtt_files = list(Path(".").glob("*.en.vtt"))
        if not quiet:
            print(f"Debug: Found VTT files: {vtt_files}")

        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None, 0

        return True, _pick_subtitle_file(vtt_files, video_id), duration_mins

    except Exception as e:
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        if not quiet:
            print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")
            traceback.print_exc()
        return False, None, 0


def clean_vtt_text(text: str) -> str:
    """Clean WebVTT text by removing timestamps, formatting tags, and metadata.

    Header lines, cue timing lines (those containing ``-->``) and blank lines
    are dropped; inline timestamps and markup tags are stripped from the
    remaining lines; repeated lines are collapsed to their first occurrence.

    Args:
        text: Raw contents of a ``.vtt`` file.

    Returns:
        The unique cleaned lines, in original order, joined by newlines.
    """
    # A byte-order mark would otherwise hide the "WEBVTT" header.
    lines = text.lstrip("\ufeff").split("\n")

    # Skip header lines.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(_VTT_HEADER_PREFIXES) or not lines[start].strip()
    ):
        start += 1

    # Process remaining lines.
    cleaned_lines = []
    for line in lines[start:]:
        if "-->" in line or not line.strip():
            continue

        line = _VTT_INLINE_TIMESTAMP.sub("", line)
        line = _VTT_TAG.sub("", line)

        stripped = line.strip()
        if stripped:
            cleaned_lines.append(stripped)

    # dict preserves insertion order, so this is an order-keeping de-dup.
    return "\n".join(dict.fromkeys(cleaned_lines))


def process_file(input_path: Path, output_path: Path | None = None, quiet: bool = False) -> str:
    """Process a single VTT file and return the cleaned text.

    Args:
        input_path: The ``.vtt`` file to read (UTF-8).
        output_path: If given, the cleaned text is also written here.
        quiet: Suppress the status message printed after writing.

    Returns:
        The cleaned text, or ``""`` if reading or writing failed (the error
        is reported on stderr).
    """
    try:
        content = input_path.read_text(encoding="utf-8")
        cleaned_text = clean_vtt_text(content)

        if output_path:
            output_path.write_text(cleaned_text, encoding="utf-8")
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")

        return cleaned_text

    except Exception as e:
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""


def save_transcript(text: str, url: str) -> Path:
    """Save transcript with metadata to ~/.yts/transcripts/.

    Args:
        text: The cleaned transcript.
        url: The source URL; recorded in the header and used for the filename.

    Returns:
        The path of the file that was written.
    """
    transcript_dir, _ = ensure_yts_dirs()
    filepath = transcript_dir / (sanitize_filename(url) + ".txt")

    metadata = (
        f"URL: {url}\n"
        f"Script Version: {__version__}\n"
        f"Timestamp: {datetime.now().isoformat()}\n"
        "---\n\n"
    )

    filepath.write_text(metadata + text, encoding="utf-8")
    return filepath


def cleanup_files(vtt_path: Path | None) -> None:
    """Remove downloaded files after processing.

    Deletes ``vtt_path`` (if given) and every ``*.vtt`` file in the current
    working directory. Failures are reported on stderr per file and do not
    stop the remaining deletions.

    NOTE(review): this removes all ``*.vtt`` files in the working directory,
    not only the ones this run created.

    Args:
        vtt_path: The subtitle file returned by download_subtitles, or None.
    """
    targets: list[Path] = []
    if vtt_path:
        targets.append(vtt_path)

    try:
        # Also clean up any other VTT files in the current directory.
        targets.extend(Path(".").glob("*.vtt"))
    except OSError as e:
        print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)

    for target in targets:
        try:
            # missing_ok: vtt_path is usually also matched by the glob.
            target.unlink(missing_ok=True)
        except OSError as e:
            print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)


def get_summary_from_claude(text: str, target_words: float) -> str:
    """Send text to the Claude API for summarization.

    Args:
        text: The transcript to summarize.
        target_words: The summary length, in words, requested in the prompt.

    Returns:
        The summary text, or ``""`` on any failure (including a missing
        ANTHROPIC_API_KEY); the error is reported on stderr.
    """
    try:
        api_key = os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        # The client reads ANTHROPIC_API_KEY from the environment itself.
        client = anthropic.Anthropic()

        prompt = f"Please summarize this transcript in {target_words} words or less."

        # Leave room for the requested length so long summaries are not cut
        # off, while staying within the assumed model output limit.
        max_tokens = min(
            _MAX_SUMMARY_TOKENS,
            max(_MIN_SUMMARY_TOKENS, int(target_words * _SUMMARY_TOKENS_PER_WORD)),
        )

        message = client.messages.create(
            model="claude-3-5-haiku-latest",
            max_tokens=max_tokens,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )

        return message.content[0].text  # type: ignore

    except Exception as e:
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the script."""
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    return parser


def main() -> None:
    """Run the download -> clean -> save -> summarize pipeline.

    Exits with status 0 on success and 1 if the subtitles could not be
    downloaded, no transcript text could be extracted, or no summary was
    produced.
    """
    setup_terminal_control()
    args = _build_arg_parser().parse_args()

    # Download subtitles.
    if not args.quiet:
        print("Downloading subtitles...")
    success, vtt_path, duration_mins = download_subtitles(args.url, args.quiet)
    if not success or vtt_path is None:
        cleanup_files(None)
        sys.exit(1)

    # 500 words per full 10 minutes of video, with a floor of 500.
    target_words = max(500, (duration_mins // 10) * 500)

    # Process the VTT file.
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text:
        # Nothing to save or summarize; avoid a pointless API call.
        print("No transcript text could be extracted from the subtitles", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    # Save transcript.
    transcript_path = save_transcript(cleaned_text, args.url)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost.
    estimated_cost = estimate_api_cost(cleaned_text, target_words)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    # Get summary from Claude.
    if not args.quiet:
        print("\nGetting summary from Claude...")
    summary = get_summary_from_claude(cleaned_text, target_words)
    if not summary:
        # get_summary_from_claude has already reported the underlying error.
        print("No summary was produced", file=sys.stderr)
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(1)

    output = ""
    if args.transcript:
        output += "=== Full Transcript ===\n\n"
        output += cleaned_text
        output += "\n\n=== Summary ===\n\n"
    output += summary

    # Save summary.
    _, summary_dir = ensure_yts_dirs()
    summary_path = summary_dir / (sanitize_filename(args.url) + "_summary.txt")
    summary_path.write_text(output, encoding="utf-8")

    print("\n=== Summary ===\n")
    print(summary)
    print()
    print(f"\nSummary saved to: {summary_path}")

    if not args.keep_files:
        cleanup_files(vtt_path)

    print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)


if __name__ == "__main__":
    main()