Files
yts/summarize_yt/cli.py

312 lines
10 KiB
Python

# Standard library imports
import argparse
import os
import re
import signal
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional, Tuple
# Third-party packages
import anthropic
# Local modules
from . import __version__
# Cost-estimation constants, in USD.
# The rates correspond to Claude 3 Sonnet (the model requested by
# get_summary_from_claude): $3 per million input tokens and $15 per million
# output tokens. The previous values (0.015 / 0.075) were the Claude 3 Opus
# rates and overstated the estimate roughly five-fold.
# NOTE(review): prices change over time -- confirm against the current
# Anthropic pricing page whenever the model is changed.
CLAUDE_COST_PER_1K_INPUT = 0.003  # USD per 1K input tokens
CLAUDE_COST_PER_1K_OUTPUT = 0.015  # USD per 1K output tokens
ESTIMATED_TOKENS_PER_CHAR = 0.25  # Rough heuristic: about 4 characters per token


def estimate_api_cost(text: str) -> float:
    """Estimate the USD cost of summarizing *text* with the Claude API.

    The input size is approximated from the character count using
    ``ESTIMATED_TOKENS_PER_CHAR``; the output size is assumed to be the
    worst case of 1024 tokens, which mirrors the ``max_tokens`` value used
    for the API request elsewhere in this file.

    Args:
        text: The transcript that would be sent to the API.

    Returns:
        The estimated cost in US dollars (input cost plus worst-case
        output cost). This is a heuristic, not a billing-accurate figure.
    """
    estimated_input_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    estimated_output_tokens = 1024  # Worst case: the request's max_tokens
    input_cost = (estimated_input_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    output_cost = (estimated_output_tokens / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return input_cost + output_cost
def sanitize_filename(url: str) -> str:
    """Build a filesystem-safe base filename (no extension) for a video URL.

    The result has the form ``<YYYYmmdd_HHMMSS>_<video id>``. For YouTube
    URLs the video ID is taken from the ``v`` query parameter when present,
    otherwise from the last non-empty path segment (``youtu.be/<id>``,
    ``/shorts/<id>`` ...). For any other URL, or when no ID can be found,
    the literal ``video`` is used instead.

    Only ASCII letters, digits, ``_`` and ``-`` are kept in the ID, so the
    returned name can never contain path separators or ``..`` components
    even if the URL is crafted (e.g. ``?v=../../etc/passwd``).

    Args:
        url: The video URL given by the user.

    Returns:
        A filename stem that is safe to join onto a directory path.
    """
    # Function-scope import: keeps this block self-contained without
    # touching the module's import section.
    from urllib.parse import parse_qs, urlparse

    video_id = ""
    if "youtube.com" in url or "youtu.be" in url:
        try:
            parsed = urlparse(url)
        except ValueError:
            # Malformed URL (e.g. unbalanced IPv6 brackets): use the fallback name.
            parsed = None
        if parsed is not None:
            query_ids = parse_qs(parsed.query).get("v")
            if query_ids:
                candidate = query_ids[0]
            else:
                # Ignore a trailing slash so "youtu.be/<id>/" still yields the ID.
                candidate = parsed.path.rstrip("/").rsplit("/", 1)[-1]
            # Drop anything that is not a plain identifier character.
            video_id = re.sub(r"[^A-Za-z0-9_-]", "", candidate)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id or 'video'}"
def ensure_transcript_dir() -> Path:
    """Return ``~/.yts/transcripts``, creating it (and parents) if needed.

    Calling this when the directory already exists is not an error.
    """
    target = Path.home().joinpath(".yts", "transcripts")
    target.mkdir(parents=True, exist_ok=True)
    return target
def download_subtitles(
    url: str, quiet: bool = False, timeout: float = 30.0
) -> Tuple[bool, Optional[Path]]:
    """Download English auto-generated subtitles for *url* using yt-dlp.

    yt-dlp is run in the current working directory with
    ``--skip-download --write-auto-sub --sub-lang en``; the subtitle file it
    writes is then located by globbing for ``*.en.vtt``.

    Args:
        url: The video URL to fetch subtitles for.
        quiet: When true, pass ``--quiet`` to yt-dlp and suppress this
            function's own debug output.
        timeout: Maximum number of seconds to wait for yt-dlp.

    Returns:
        ``(True, path)`` with the path of the most recently modified
        ``*.en.vtt`` file on success, or ``(False, None)`` on any failure
        (yt-dlp missing, non-zero exit status, timeout, no file produced).
        Failures are reported on stderr; this function does not raise for
        them.
    """

    def debug(message: str) -> None:
        # Diagnostics go to stderr so they never mix with the summary that
        # the program may print on stdout, and they honour ``quiet``.
        if not quiet:
            print(f"Debug: {message}", file=sys.stderr)

    cmd = ["yt-dlp", "--skip-download", "--write-auto-sub", "--sub-lang", "en"]
    if quiet:
        cmd.append("--quiet")
    cmd.append(url)
    debug(f"Running command: {cmd}")

    try:
        # subprocess.run kills and reaps the child itself when the timeout
        # expires, so no zombie process or open pipe is left behind.
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            errors="replace",  # never fail on undecodable yt-dlp output
            timeout=timeout,
        )
    except FileNotFoundError:
        print(
            "Error during subtitle download: yt-dlp executable not found on PATH",
            file=sys.stderr,
        )
        return False, None
    except subprocess.TimeoutExpired:
        print(f"Download timed out after {timeout:g} seconds", file=sys.stderr)
        return False, None
    except OSError as exc:
        print(f"Error during subtitle download: {exc}", file=sys.stderr)
        return False, None

    debug(f"stdout: {result.stdout}")
    debug(f"stderr: {result.stderr}")
    if result.returncode != 0:
        print(f"Error downloading subtitles: {result.stderr}", file=sys.stderr)
        return False, None

    try:
        # Oldest first; the newest file is the one yt-dlp has just written,
        # even if stale subtitle files from earlier runs are still around.
        vtt_files = sorted(
            Path(".").glob("*.en.vtt"), key=lambda path: path.stat().st_mtime
        )
    except OSError as exc:
        print(f"Error during subtitle download: {exc}", file=sys.stderr)
        return False, None

    debug(f"Found VTT files: {vtt_files}")
    if not vtt_files:
        print("No VTT file found after download", file=sys.stderr)
        return False, None
    return True, vtt_files[-1]
# Inline cue timestamps such as <00:00:01.234>; WebVTT also allows the hours
# field to be omitted (<00:01.234>).
_VTT_INLINE_TIMESTAMP_RE = re.compile(r"<(?:\d{2,}:)?\d{2}:\d{2}\.\d{3}>")
# Cue span tags, with or without classes/annotations:
# <c>, </c>, <c.colorE5E5E5>, <v Speaker Name>, <i>, <b>, <u>, <lang en>, ...
_VTT_SPAN_TAG_RE = re.compile(r"</?(?:c|i|b|u|v|lang|ruby|rt)(?:[.\s][^>]*)?>")
# Prefixes of the metadata lines found at the top of the file.
_VTT_HEADER_PREFIXES = ("WEBVTT", "Kind:", "Language:")


def clean_vtt_text(text: str) -> str:
    """Reduce WebVTT subtitle content to plain transcript lines.

    Processing steps:

    1. Strip a leading byte-order mark and the header block (``WEBVTT``,
       ``Kind:``, ``Language:`` and blank lines at the top).
    2. Drop cue timing lines (anything containing ``-->``) and blank lines.
    3. Remove inline timestamps and cue span tags from the remaining text.
    4. Remove duplicate lines, keeping the first occurrence of each. This
       collapses the repetition in rolling captions, where consecutive
       cues re-display the previous line; note that it also drops a line
       that is legitimately repeated later in the video.

    Args:
        text: Raw contents of a ``.vtt`` file.

    Returns:
        The cleaned lines joined with ``"\\n"``; an empty string when no
        caption text remains.
    """
    lines = text.lstrip("\ufeff").split("\n")

    # Skip the header by index rather than repeatedly popping from the front.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(_VTT_HEADER_PREFIXES) or not lines[start].strip()
    ):
        start += 1

    cleaned_lines = []
    for raw_line in lines[start:]:
        if "-->" in raw_line:
            continue  # cue timing line, possibly followed by cue settings
        line = _VTT_INLINE_TIMESTAMP_RE.sub("", raw_line)
        line = _VTT_SPAN_TAG_RE.sub("", line).strip()
        if line:
            cleaned_lines.append(line)

    # dict preserves insertion order, giving an order-preserving de-duplication.
    return "\n".join(dict.fromkeys(cleaned_lines))
def process_file(input_path: Path, output_path: Optional[Path] = None, quiet: bool = False) -> str:
    """Read a VTT file, clean it, and optionally write the cleaned text out.

    Args:
        input_path: Path of the ``.vtt`` file to read (UTF-8).
        output_path: When given, the cleaned text is also written to this
            path (UTF-8).
        quiet: Suppress the "Processed ..." status line that is printed
            after writing ``output_path``.

    Returns:
        The cleaned transcript text, or ``""`` if the file could not be
        read, decoded or written; in that case the error is reported on
        stderr and no exception is raised.
    """
    try:
        with open(input_path, "r", encoding="utf-8") as source:
            content = source.read()
        cleaned_text = clean_vtt_text(content)
        if output_path:
            with open(output_path, "w", encoding="utf-8") as target:
                target.write(cleaned_text)
            if not quiet:
                print(f"Processed {input_path} -> {output_path}")
        return cleaned_text
    except (OSError, UnicodeError) as e:
        # Only I/O and decoding problems are expected here; anything else is
        # a programming error and should surface instead of being swallowed.
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""
def save_transcript(text: str, url: str, prompt: str) -> Path:
    """Write the transcript, preceded by a metadata header, to the transcript directory.

    The file is placed in the directory returned by
    ``ensure_transcript_dir()`` and named ``sanitize_filename(url) + ".txt"``.
    The header records the URL, script version, current timestamp and the
    Claude prompt, followed by a ``---`` separator line.

    Returns:
        The path of the file that was written.
    """
    destination = ensure_transcript_dir() / (sanitize_filename(url) + ".txt")
    header = (
        f"URL: {url}\n"
        f"Script Version: {__version__}\n"
        f"Timestamp: {datetime.now().isoformat()}\n"
        f"Claude Prompt: {prompt}\n"
        "---\n"
    )
    with open(destination, "w", encoding="utf-8") as handle:
        handle.write(header + text)
    return destination
def cleanup_files(vtt_path: Optional[Path]) -> None:
    """Delete downloaded subtitle files on a best-effort basis.

    Removes *vtt_path* (when given) and then every ``*.vtt`` file in the
    current working directory.

    WARNING: the glob is not limited to files created by this program; any
    ``.vtt`` file in the working directory is deleted.

    Each file is handled independently, so one file that cannot be removed
    produces a warning on stderr but does not prevent the others from being
    cleaned up. Files that are already gone are ignored. Never raises for
    filesystem errors.

    Args:
        vtt_path: The subtitle file that was downloaded, or ``None`` to
            only sweep the working directory.
    """
    targets = []
    if vtt_path is not None:
        targets.append(vtt_path)
    try:
        # Materialize the listing before deleting anything from the directory.
        targets.extend(Path(".").glob("*.vtt"))
    except OSError as e:
        print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)

    for target in targets:
        try:
            target.unlink()
        except FileNotFoundError:
            # Already removed (e.g. vtt_path also matched the glob).
            continue
        except OSError as e:
            print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)
def get_summary_from_claude(text: str, prompt: str = "Please summarize this transcript in 500 words or less") -> str:
    """Ask the Claude API to summarize *text*.

    Args:
        text: The transcript to summarize.
        prompt: Instruction placed before the transcript in the user message.

    Returns:
        The text of the first content block of the response, or ``""`` when
        the transcript is empty, ``ANTHROPIC_API_KEY`` is not set, or the
        API call fails. Failures are reported on stderr; this function does
        not raise.
    """
    # Do not spend an API call on an empty transcript.
    if not text.strip():
        print("Error getting summary from Claude: transcript is empty", file=sys.stderr)
        return ""

    api_key = os.environ.get("ANTHROPIC_API_KEY")
    if not api_key:
        print(
            "Error getting summary from Claude: ANTHROPIC_API_KEY environment variable not set",
            file=sys.stderr,
        )
        return ""

    try:
        # Pass the key that was just validated instead of relying on the
        # client to look the environment variable up again.
        client = anthropic.Anthropic(api_key=api_key)
        message = client.messages.create(
            model="claude-3-sonnet-20240229",
            max_tokens=1024,
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately and concisely.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )
        return message.content[0].text
    except Exception as e:
        # Deliberately broad: this is the boundary to a third-party client,
        # and callers rely on getting "" back rather than an exception.
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""
def _confirm_proceed(question: str) -> bool:
    """Ask a yes/no question on the terminal; return True only for "yes".

    On a POSIX terminal a single key press is read in raw mode. When
    terminal control is unavailable (``termios`` cannot be imported, or
    stdin is not a terminal / has no file descriptor) the answer is read as
    a whole line instead, and end-of-input counts as "no".
    """
    try:
        import termios
        import tty
    except ImportError:
        termios = tty = None

    if termios is not None:
        try:
            fd = sys.stdin.fileno()
            saved_settings = termios.tcgetattr(fd)
        except (termios.error, OSError, ValueError):
            saved_settings = None  # stdin is redirected or has no descriptor
        if saved_settings is not None:
            # Write the prompt before entering raw mode and the newline after
            # leaving it, so output post-processing is active for both.
            sys.stdout.write(question)
            sys.stdout.flush()
            try:
                tty.setraw(fd)
                answer = sys.stdin.read(1)
            finally:
                termios.tcsetattr(fd, termios.TCSADRAIN, saved_settings)
                sys.stdout.write("\n")
                sys.stdout.flush()
            return answer.lower() == "y"

    try:
        return input(question).strip().lower() in ("y", "yes")
    except EOFError:
        return False


def main():
    """Command-line entry point: download subtitles, then summarize them.

    Steps: parse arguments, download the English auto-subtitles with
    yt-dlp, clean them into a transcript, save the transcript, show the
    estimated API cost, ask for confirmation (unless ``--yes``), request
    the summary from Claude, and write it to ``--output`` or stdout.

    The process always ends through ``sys.exit``: status 0 on success or
    when the user cancels, status 1 when the download fails, no transcript
    text could be extracted, the summary could not be obtained, or the
    output file could not be written. Downloaded subtitle files are removed
    on exit unless ``--keep-files`` is given.
    """
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-o", "--output", help="Output file for summary")
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")
    parser.add_argument(
        "--prompt", default="Please summarize this transcript in 500 words or less", help="Custom prompt for Claude"
    )
    parser.add_argument("-y", "--yes", action="store_true", help="Skip cost confirmation")
    args = parser.parse_args()

    # Set up signal handler for clean exit on Ctrl-C.
    def signal_handler(sig, frame):
        print("\nCleaning up and exiting...")
        cleanup_files(None)  # Clean any VTT files
        print(f"Debug: Script terminated by signal at {datetime.now().isoformat()}")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")
    success, vtt_path = download_subtitles(args.url, args.quiet)
    if not success:
        cleanup_files(None)
        sys.exit(1)

    def finish(exit_code: int) -> None:
        """Remove downloaded files (unless --keep-files) and exit."""
        if not args.keep_files:
            cleanup_files(vtt_path)
        sys.exit(exit_code)

    # Process the VTT file. process_file reports its own errors and returns
    # "" on failure; there is nothing to save or summarize in that case.
    cleaned_text = process_file(vtt_path, None, args.quiet)
    if not cleaned_text.strip():
        print("No transcript text could be extracted; aborting.", file=sys.stderr)
        finish(1)

    # Save transcript
    transcript_path = save_transcript(cleaned_text, args.url, args.prompt)
    print(f"\nTranscript saved to: {transcript_path}")

    # Estimate and display cost
    estimated_cost = estimate_api_cost(cleaned_text)
    print(f"\nEstimated API cost: ${estimated_cost:.4f}")

    if not args.yes and not _confirm_proceed(
        "\nDo you want to proceed with getting the summary? (y/N): "
    ):
        print("Operation cancelled by user.")
        finish(0)

    # Get summary from Claude. An empty result means the request failed
    # (the error has already been printed), so do not report success.
    if not args.quiet:
        print("\nGetting summary from Claude...")
    summary = get_summary_from_claude(cleaned_text, args.prompt)
    if not summary:
        finish(1)

    # Prepare output
    if args.transcript:
        output = f"=== Full Transcript ===\n\n{cleaned_text}\n\n=== Summary ===\n\n{summary}"
    else:
        output = summary

    # Output results
    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                f.write(output)
        except OSError as e:
            print(f"Error writing {args.output}: {e}", file=sys.stderr)
            finish(1)
        print(f"\nSummary saved to: {args.output}")
    else:
        if args.transcript:
            # The combined output carries its own section headers.
            print()
        else:
            print("\n=== Summary ===")
        print(output)

    if not args.quiet:
        print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    finish(0)
# Run the CLI only when this module is executed as a script, not when imported.
if __name__ == "__main__":
    main()