# Files
# yts/summarize_yt/cli.py
#
# 276 lines
# 8.9 KiB
# Python

# Standard library imports (yt_dlp, imported in this group, is third-party)
import argparse
import os
import re
import sys
from datetime import datetime
from yt_dlp import YoutubeDL
from pathlib import Path
from typing import Optional, Tuple
# Third-party packages
import anthropic
# Local modules
from . import __version__
# Cost estimation constants, consumed by estimate_api_cost(). The result is
# displayed to the user with a "$" prefix before the API call is made.
# NOTE(review): get_summary_from_claude() requests "claude-3-sonnet-20240229";
# these per-1K rates may belong to a different pricing tier — confirm against
# the provider's current price list before relying on the estimate.
CLAUDE_COST_PER_1K_INPUT = 0.015 # Cost per 1K tokens for input
CLAUDE_COST_PER_1K_OUTPUT = 0.075 # Cost per 1K tokens for output
ESTIMATED_TOKENS_PER_CHAR = 0.25 # Rough estimate of tokens per character
def estimate_api_cost(text: str) -> float:
    """Return a rough cost estimate for summarizing ``text`` with Claude.

    The input token count is approximated from the character count using
    ``ESTIMATED_TOKENS_PER_CHAR``; the output side assumes the full
    1024-token response budget is consumed.
    """
    output_tokens = 1024  # mirrors the max_tokens value sent with the request
    input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR

    costs = (
        (input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT,
        (output_tokens / 1000) * CLAUDE_COST_PER_1K_OUTPUT,
    )
    return costs[0] + costs[1]
def sanitize_filename(url: str) -> str:
    """Build a filesystem-safe, timestamped base name for a video URL.

    For YouTube URLs the video ID is taken from the ``v`` query parameter
    when present, otherwise from the last non-empty path segment (covers
    ``youtu.be/<id>`` and ``/shorts/<id>`` style links). Fragments, extra
    query parameters and trailing slashes are ignored, and any character
    outside ``[A-Za-z0-9_-]`` is dropped so the result is always safe to
    use as a file name.

    Args:
        url: The video URL supplied by the user.

    Returns:
        ``"<YYYYmmdd_HHMMSS>_<video id>"``, or ``"<YYYYmmdd_HHMMSS>_video"``
        when no usable ID can be extracted.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs, urlparse

    video_id = ""
    if "youtube.com" in url or "youtu.be" in url:
        try:
            parsed = urlparse(url)
            query_ids = parse_qs(parsed.query).get("v")
            if query_ids:
                candidate = query_ids[0]
            else:
                # Last non-empty path segment, e.g. "/abc/" -> "abc".
                candidate = parsed.path.rstrip("/").rsplit("/", 1)[-1]
        except ValueError:
            # urlparse rejects a few malformed URLs; fall back to "video".
            candidate = ""
        # Whitelist the characters YouTube IDs use; strips "#", ":", etc.
        video_id = re.sub(r"[^A-Za-z0-9_-]", "", candidate)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id or 'video'}"
def ensure_transcript_dir() -> Path:
    """Return ``~/.yts/transcripts``, creating it and any parents if absent."""
    target = Path.home().joinpath(".yts", "transcripts")
    target.mkdir(parents=True, exist_ok=True)
    return target
def download_subtitles(url: str, quiet: bool = False) -> Tuple[bool, Optional[Path]]:
    """Download English subtitles for ``url`` into the current directory.

    Both human-authored and automatically generated English subtitles are
    requested from yt-dlp; the video itself is not downloaded.

    Args:
        url: Video URL to fetch subtitles for.
        quiet: When true, suppress yt-dlp output and the debug messages
            printed by this function.

    Returns:
        ``(True, path)`` with the path of the downloaded ``*.en.vtt`` file,
        or ``(False, None)`` when nothing was produced or an error occurred.
        Errors are reported on stderr rather than raised.
    """
    current_dir = Path(".")
    try:
        # Remember what was already on disk so a stale file from an earlier
        # run (e.g. one kept with --keep-files) is not mistaken for this
        # download.
        before = {p: p.stat().st_mtime_ns for p in current_dir.glob("*.en.vtt")}

        ydl_opts = {
            'skip_download': True,
            'writesubtitles': True,
            'writeautomaticsub': True,
            'subtitleslangs': ['en'],
            'quiet': quiet,
            'no_warnings': quiet,
        }
        if not quiet:
            print(f"Debug: Downloading subtitles for {url}")
        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded VTT file
        vtt_files = list(current_dir.glob("*.en.vtt"))
        if not quiet:
            print(f"Debug: Found VTT files: {vtt_files}")
        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None

        # Prefer files created or rewritten by this call; if yt-dlp left an
        # existing file untouched, fall back to the most recent one.
        fresh = [p for p in vtt_files if before.get(p) != p.stat().st_mtime_ns]
        newest = max(fresh or vtt_files, key=lambda p: p.stat().st_mtime_ns)
        return True, newest
    except Exception as e:
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        if not quiet:
            print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")
            import traceback
            traceback.print_exc()
        return False, None
def clean_vtt_text(text: str) -> str:
    """Reduce WebVTT subtitle content to plain, de-duplicated text lines.

    Processing steps:
      1. Skip the leading header (``WEBVTT``, ``Kind:``, ``Language:`` and
         blank lines).
      2. Drop cue timing lines (anything containing ``-->``) and blanks.
      3. Strip inline timestamps such as ``<00:00:01.500>`` and cue markup
         tags, including variants with classes or annotations
         (``<c>``, ``<c.colorE5E5E5>``, ``<i>``, ``<v Speaker>`` ...).
      4. Remove repeated lines, keeping the first occurrence. Auto-generated
         captions repeat each line across overlapping cues; note that this
         also collapses lines that are legitimately spoken more than once.

    Args:
        text: Raw contents of a ``.vtt`` file.

    Returns:
        The cleaned transcript, one caption line per line; ``""`` if nothing
        remains.
    """
    lines = text.split("\n")

    # Advance past the header instead of repeatedly popping from the front.
    header_prefixes = ("WEBVTT", "Kind:", "Language:")
    start = 0
    while start < len(lines) and (
        lines[start].startswith(header_prefixes) or not lines[start].strip()
    ):
        start += 1

    cleaned_lines = []
    for line in lines[start:]:
        if "-->" in line or not line.strip():
            continue
        line = re.sub(r"<\d{2}:\d{2}:\d{2}\.\d{3}>", "", line)
        # Any tag that starts with a letter: covers <c>, <c.class>, <i>,
        # <b>, <u>, <v Name>, <lang xx> and their closing forms.
        line = re.sub(r"</?[A-Za-z][^>]*>", "", line)
        line = line.strip()
        if line:
            cleaned_lines.append(line)

    # dict preserves insertion order, giving an order-stable de-duplication.
    return "\n".join(dict.fromkeys(cleaned_lines))
def process_file(input_path: Path, output_path: Path | None = None, quiet: bool = False) -> str:
    """Read a VTT file, clean it, and optionally write the cleaned text out.

    Args:
        input_path: VTT file to read (UTF-8).
        output_path: If given, the cleaned text is also written here.
        quiet: Suppress the "Processed ..." status line.

    Returns:
        The cleaned text, or ``""`` if anything goes wrong (the error is
        reported on stderr).
    """
    try:
        with open(input_path, encoding="utf-8") as src:
            raw = src.read()
        result = clean_vtt_text(raw)

        if not output_path:
            return result

        with open(output_path, mode="w", encoding="utf-8") as dst:
            dst.write(result)
        if not quiet:
            print(f"Processed {input_path} -> {output_path}")
        return result
    except Exception as err:
        print(f"Error processing {input_path}: {err}", file=sys.stderr)
        return ""
def save_transcript(text: str, url: str, prompt: str) -> Path:
    """Write the transcript, preceded by a metadata header, under ~/.yts/transcripts/.

    The header records the source URL, the script version, the current
    timestamp and the Claude prompt, followed by a ``---`` separator line.

    Returns:
        Path of the file that was written.
    """
    destination = ensure_transcript_dir() / (sanitize_filename(url) + ".txt")

    header_lines = [
        f"URL: {url}",
        f"Script Version: {__version__}",
        f"Timestamp: {datetime.now().isoformat()}",
        f"Claude Prompt: {prompt}",
        "---",
    ]
    header = "\n".join(header_lines) + "\n"

    with open(destination, mode="w", encoding="utf-8") as out:
        out.write(header + text)
    return destination
def cleanup_files(vtt_path: Optional[Path]):
    """Delete the downloaded subtitle file and any other ``*.vtt`` in the CWD.

    NOTE: every ``*.vtt`` file in the current working directory is removed,
    not only the one passed in. Any failure is reported as a warning on
    stderr and stops the remaining cleanup; nothing is raised.
    """
    try:
        primary_exists = vtt_path and vtt_path.exists()
        if primary_exists:
            vtt_path.unlink()

        # Sweep up whatever other subtitle files remain in the current directory.
        for leftover in Path(".").glob("*.vtt"):
            leftover.unlink()
    except Exception as err:
        print(f"Warning: Could not clean up temporary files: {err}", file=sys.stderr)
def get_summary_from_claude(text: str, prompt: str = "Please summarize this transcript in 500 words or less") -> str:
    """Ask the Claude API to summarize ``text`` according to ``prompt``.

    Requires the ``ANTHROPIC_API_KEY`` environment variable. The prompt and
    the text are sent together as a single user message.

    Returns:
        The text of the first content block in the reply, or ``""`` when the
        key is missing or the request fails (the error goes to stderr).
    """
    try:
        if not os.environ.get("ANTHROPIC_API_KEY"):
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        request = {
            "model": "claude-3-sonnet-20240229",
            "max_tokens": 1024,
            "temperature": 0,
            "system": "You are a helpful assistant that summarizes transcripts accurately and concisely.",
            "messages": [{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        }
        reply = anthropic.Anthropic().messages.create(**request)
        first_block = reply.content[0]
        return first_block.text
    except Exception as err:
        print(f"Error getting summary from Claude: {err}", file=sys.stderr)
        return ""
def main():
    """CLI entry point: download subtitles, save the transcript, summarize it.

    Flow:
      1. Parse arguments and download English subtitles for the URL.
      2. Clean the VTT file and save the transcript under ~/.yts/transcripts/.
      3. Show the estimated API cost and, unless ``--yes`` was given, ask for
         confirmation.
      4. Request the summary from Claude and write it to ``--output`` or
         stdout (optionally preceded by the full transcript).

    Exit status is 0 on success or user cancellation, and 1 when the
    download fails, the transcript is empty, or no summary is returned.
    Downloaded VTT files are removed on every exit path unless
    ``--keep-files`` is specified.
    """
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument(
        "--prompt", default="Please summarize this transcript in 500 words or less", help="Custom prompt for Claude"
    )
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")
    args = parser.parse_args()

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")
    success, vtt_path = download_subtitles(args.url, args.quiet)
    if not success:
        if not args.keep_files:
            cleanup_files(None)
        sys.exit(1)

    # try/finally guarantees cleanup on every exit path, including
    # sys.exit() and unexpected exceptions.
    try:
        # Process the VTT file
        cleaned_text = process_file(vtt_path, None, args.quiet)
        if not cleaned_text.strip():
            # process_file() returns "" on error; do not pay for an API call
            # on an empty transcript.
            print("Error: transcript is empty; nothing to summarize.", file=sys.stderr)
            sys.exit(1)

        # Save transcript
        transcript_path = save_transcript(cleaned_text, args.url, args.prompt)
        print(f"\nTranscript saved to: {transcript_path}")

        # Estimate and display cost
        estimated_cost = estimate_api_cost(cleaned_text)
        print(f"\nEstimated API cost: ${estimated_cost:.4f}")

        if not args.yes:
            try:
                response = input("\nDo you want to proceed with getting the summary? (y/N): ").strip().lower()
            except (EOFError, KeyboardInterrupt):
                print("\nOperation cancelled by user.")
                sys.exit(0)
            if response != 'y':
                print("Operation cancelled by user.")
                sys.exit(0)

        # Get summary from Claude
        if not args.quiet:
            print("\nGetting summary from Claude...")
        summary = get_summary_from_claude(cleaned_text, args.prompt)
        if not summary:
            # get_summary_from_claude() already reported the cause on stderr.
            print("Error: no summary was returned.", file=sys.stderr)
            sys.exit(1)

        # Prepare output
        parts = []
        if args.transcript:
            parts += ["=== Full Transcript ===\n\n", cleaned_text, "\n\n=== Summary ===\n\n"]
        parts.append(summary)
        output = "".join(parts)

        # Output results
        output_path = args.output
        if output_path:
            with open(output_path, "w", encoding="utf-8") as f:
                f.write(output)
            print(f"\nSummary saved to: {output_path}")
        elif args.transcript:
            # The combined output already carries its own section headings.
            print()
            print(output)
        else:
            print("\n=== Summary ===")
            print(output)
    finally:
        # Cleanup downloaded files unless --keep-files is specified
        if not args.keep_files:
            cleanup_files(vtt_path)

    if not args.quiet:
        print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()