304 lines
9.7 KiB
Python
304 lines
9.7 KiB
Python
import argparse
|
|
import os
|
|
from pathlib import Path
|
|
import re
|
|
import signal
|
|
import sys
|
|
from datetime import datetime
|
|
|
|
import anthropic
|
|
from yt_dlp import YoutubeDL
|
|
|
|
from . import __version__
|
|
|
|
# Cost-estimation constants, in US dollars per 1,000 tokens.
# 0.80 / 1000 is $0.80 per million input tokens; 4 / 1000 is $4 per million
# output tokens.
# NOTE(review): presumably the published rates for the model requested in
# get_summary_from_claude - confirm whenever that model changes.
CLAUDE_COST_PER_1K_INPUT = 0.80 / 1000
CLAUDE_COST_PER_1K_OUTPUT = 4 / 1000
# Rough heuristic: one token for every four characters of input text.
ESTIMATED_TOKENS_PER_CHAR = 0.25
|
|
|
|
|
|
def setup_terminal_control():
    """Set up terminal control at program start.

    Moves the process into its own process group, ignores the job-control
    signals SIGTTOU, SIGTTIN and SIGTSTP, and, when stdin is a terminal,
    makes the new process group the terminal's foreground group.

    The whole routine is best-effort: where POSIX job control is unavailable
    or a call is not permitted, the function simply returns without raising.
    """
    try:
        # Put process in its own process group.
        os.setpgrp()

        # Ignore terminal control signals. SIGTTOU in particular must be
        # ignored before tcsetpgrp() is issued from a non-foreground group.
        for job_signal in (signal.SIGTTOU, signal.SIGTTIN, signal.SIGTSTP):
            signal.signal(job_signal, signal.SIG_IGN)

        # Take control of terminal if we're running in one. tcsetpgrp() is
        # provided by the os module; termios has no such function.
        if sys.stdin.isatty():
            os.tcsetpgrp(sys.stdin.fileno(), os.getpgrp())
    except (AttributeError, OSError, ValueError):
        # AttributeError: API missing on this platform, or sys.stdin is None.
        # OSError: the kernel refused the call (e.g. session leader, no tty).
        # ValueError: stdin is closed, or signal.signal() was called outside
        # the main thread.
        # If we can't get terminal control, just continue.
        pass
|
|
|
|
|
|
def estimate_api_cost(text: str, target_words: float) -> float:
    """Estimate the cost of sending text to Claude API.

    The input token count is approximated from the character length of
    ``text``; the output token count is taken to be ``target_words``. Each
    count is priced per 1,000 tokens and the two amounts are added.
    """
    prompt_tokens = len(text) * ESTIMATED_TOKENS_PER_CHAR
    completion_tokens = target_words

    prompt_cost = (prompt_tokens / 1000) * CLAUDE_COST_PER_1K_INPUT
    completion_cost = (completion_tokens / 1000) * CLAUDE_COST_PER_1K_OUTPUT
    return prompt_cost + completion_cost
|
|
|
|
|
|
def sanitize_filename(url: str) -> str:
    """Convert URL to safe filename, keeping video ID.

    Args:
        url: Video URL. For URLs containing ``youtube.com`` or ``youtu.be``
            the ID is read from the ``v`` query parameter, wherever it sits
            in the query string, and otherwise from the last path segment
            (``youtu.be/<id>``, ``/shorts/<id>``, ``/embed/<id>``).

    Returns:
        ``<YYYYmmdd_HHMMSS>_<video id>`` based on the current local time, or
        ``<YYYYmmdd_HHMMSS>_video`` when no ID can be extracted. The result
        carries no file extension.
    """
    from urllib.parse import parse_qs, urlparse

    video_id = ""
    if "youtube.com" in url or "youtu.be" in url:
        parsed = urlparse(url)
        # parse_qs locates "v" regardless of parameter order, so
        # "?list=...&v=ID" works as well as "?v=ID&t=10".
        v_values = parse_qs(parsed.query).get("v")
        if v_values:
            video_id = v_values[0]
        else:
            video_id = parsed.path.rstrip("/").rsplit("/", 1)[-1]
        # Drop anything that is not safe in a filename (fragments, dots,
        # path separators, ...).
        video_id = re.sub(r"[^A-Za-z0-9_-]", "", video_id)

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    return f"{timestamp}_{video_id or 'video'}"
|
|
|
|
|
|
def ensure_yts_dirs() -> tuple[Path, Path]:
    """Return ``(transcripts, summaries)`` under ``~/.yts``, creating both.

    Missing parent directories are created as well; directories that
    already exist are left as they are.
    """
    root = Path.home() / ".yts"
    locations = (root / "transcripts", root / "summaries")
    for location in locations:
        location.mkdir(parents=True, exist_ok=True)
    return locations
|
|
|
|
|
|
def download_subtitles(url: str, quiet: bool = False) -> tuple[bool, Path | None, int, str]:
    """Download subtitles from YouTube using yt-dlp.

    Args:
        url: Video URL handed to yt-dlp.
        quiet: When true, silence yt-dlp output and the debug prints below.

    Returns:
        ``(success, vtt_path, duration_mins, title)``. ``vtt_path`` is the
        most recently modified ``*.en.vtt`` file in the current working
        directory. On any failure the result is
        ``(False, None, 0, "Unknown Title")``; exceptions are reported on
        stderr instead of being raised.
    """
    try:
        ydl_opts = {
            "skip_download": True,
            "writeautomaticsub": True,
            "subtitleslangs": ["en"],
            "quiet": quiet,
            "no_warnings": quiet,
        }

        # First get video info
        info_opts = dict(ydl_opts)
        info_opts["extract_flat"] = True
        with YoutubeDL(info_opts) as ydl:
            info = ydl.extract_info(url, download=False) or {}

        # The keys may be present with a None value, so a .get() default is
        # not enough; fall back explicitly.
        duration_mins = int((info.get("duration") or 0) / 60)
        title = info.get("title") or "Unknown Title"

        if not quiet:
            print(f"Debug: Downloading subtitles for {url}")

        with YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded VTT file. Newest first, so a stale file left
        # over from an earlier run is not picked by accident.
        vtt_files = sorted(
            Path(".").glob("*.en.vtt"),
            key=lambda candidate: candidate.stat().st_mtime,
            reverse=True,
        )

        if not quiet:
            print(f"Debug: Found VTT files: {vtt_files}")

        if not vtt_files:
            print("No VTT file found after download", file=sys.stderr)
            return False, None, 0, "Unknown Title"

        return True, vtt_files[0], duration_mins, title

    except Exception as e:
        # Top-level boundary for yt-dlp and filesystem errors: report the
        # problem and signal failure through the return value.
        print(f"Error during subtitle download: {str(e)}", file=sys.stderr)
        if not quiet:
            print(f"Debug: Full exception info: {type(e).__name__}: {str(e)}")
            import traceback

            traceback.print_exc()
        return False, None, 0, "Unknown Title"
|
|
|
|
|
|
def clean_vtt_text(text: str) -> str:
    """Clean WebVTT text by removing timestamps, formatting tags, and metadata.

    Processing steps:
      * leading ``WEBVTT`` / ``Kind:`` / ``Language:`` and blank lines are
        skipped;
      * cue timing lines (anything containing ``-->``) and blank lines are
        dropped;
      * every ``<...>`` tag is removed, including inline timestamps and
        class-annotated tags such as ``<c.colorE5E5E5>``;
      * character references such as ``&amp;`` are decoded;
      * a line identical to the line kept just before it is dropped, which
        collapses repeated caption lines while preserving phrases that
        genuinely occur again later in the transcript.

    Args:
        text: Raw contents of a ``.vtt`` file.

    Returns:
        The remaining caption lines joined with newlines; ``""`` if none.
    """
    import html

    lines = text.split("\n")

    # Skip header lines by index rather than popping from the list front.
    start = 0
    while start < len(lines) and (
        lines[start].startswith(("WEBVTT", "Kind:", "Language:"))
        or not lines[start].strip()
    ):
        start += 1

    cleaned_lines: list[str] = []
    for raw_line in lines[start:]:
        if "-->" in raw_line:
            continue
        # Strip tags before decoding entities so that an escaped "&lt;b&gt;"
        # stays as literal text instead of being treated as markup.
        line = html.unescape(re.sub(r"<[^>]*>", "", raw_line)).strip()
        if not line:
            continue
        if cleaned_lines and cleaned_lines[-1] == line:
            continue
        cleaned_lines.append(line)

    return "\n".join(cleaned_lines)
|
|
|
|
|
|
def process_file(input_path: Path, output_path: Path | None = None, quiet: bool = False) -> str:
    """Process a single VTT file and output cleaned text.

    Reads ``input_path`` as UTF-8, cleans it with ``clean_vtt_text`` and,
    when ``output_path`` is given, writes the cleaned text there and (unless
    ``quiet``) prints a confirmation. Any exception is reported on stderr and
    turned into an empty-string result.
    """
    try:
        with open(input_path, "r", encoding="utf-8") as source:
            cleaned_text = clean_vtt_text(source.read())

        # Nothing to write: hand the text straight back.
        if not output_path:
            return cleaned_text

        with open(output_path, "w", encoding="utf-8") as target:
            target.write(cleaned_text)
        if not quiet:
            print(f"Processed {input_path} -> {output_path}")
        return cleaned_text

    except Exception as e:
        print(f"Error processing {input_path}: {str(e)}", file=sys.stderr)
        return ""
|
|
|
|
|
|
def save_transcript(text: str, url: str, title: str) -> Path:
    """Save transcript with metadata to ~/.yts/transcripts/.

    The file starts with a header (title, URL, script version, timestamp,
    type) terminated by a ``---`` line, followed by ``text``. Returns the
    path of the file that was written.
    """
    transcript_dir = ensure_yts_dirs()[0]
    filepath = transcript_dir / (sanitize_filename(url) + ".txt")

    header_fields = (
        f"Title: {title}",
        f"URL: {url}",
        f"Script Version: {__version__}",
        f"Timestamp: {datetime.now().isoformat()}",
        "Type: Transcript",
        "---",
    )
    metadata = "\n".join(header_fields) + "\n"

    with open(filepath, "w", encoding="utf-8") as handle:
        handle.write(metadata + text)

    return filepath
|
|
|
|
|
|
def cleanup_files(vtt_path: Path | None):
|
|
"""Remove downloaded files after processing."""
|
|
try:
|
|
if vtt_path and vtt_path.exists():
|
|
vtt_path.unlink()
|
|
|
|
# Also cleanup any other VTT files in current directory
|
|
current_dir = Path(".")
|
|
for vtt_file in current_dir.glob("*.vtt"):
|
|
vtt_file.unlink()
|
|
|
|
except Exception as e:
|
|
print(f"Warning: Could not clean up temporary files: {str(e)}", file=sys.stderr)
|
|
|
|
|
|
def get_summary_from_claude(text: str, target_words: float) -> str:
    """Send text to Claude API for summarization.

    Args:
        text: Transcript to summarize.
        target_words: Upper bound on the summary length, in words, stated in
            the prompt (truncated to a whole number).

    Returns:
        The summary text. An empty string is returned, with a message on
        stderr, when ``text`` is blank, ``ANTHROPIC_API_KEY`` is not set, or
        the API call fails.
    """
    # Nothing to summarize: do not spend an API call on an empty transcript.
    if not text or not text.strip():
        print("Error getting summary from Claude: transcript is empty", file=sys.stderr)
        return ""

    try:
        api_key = os.environ.get("ANTHROPIC_API_KEY")
        if not api_key:
            raise ValueError("ANTHROPIC_API_KEY environment variable not set")

        # Pass the key that was just validated instead of relying on the
        # client reading the environment a second time.
        client = anthropic.Anthropic(api_key=api_key)

        # Name the unit explicitly; "in 500 or less" is ambiguous.
        prompt = f"Please summarize this transcript in {int(target_words)} words or less."

        message = client.messages.create(
            model="claude-3-5-haiku-latest",
            max_tokens=2048,  # Increased for longer summaries
            temperature=0,
            system="You are a helpful assistant that summarizes transcripts accurately.",
            messages=[{"role": "user", "content": f"{prompt}:\n\n{text}"}],
        )
        # Concatenate every block that carries text rather than assuming the
        # first block exists and is a text block.
        return "".join(getattr(block, "text", "") or "" for block in message.content)

    except Exception as e:
        # Boundary with the network/API: report and signal failure with "".
        print(f"Error getting summary from Claude: {str(e)}", file=sys.stderr)
        return ""
|
|
|
|
|
|
def main():
    """Command-line entry point.

    Downloads the subtitles for the given URL, saves the cleaned transcript,
    prints a cost estimate, asks Claude for a summary, saves and prints it.

    Exit status is 0 on success and 1 when the subtitles cannot be
    downloaded, the cleaned transcript is empty, or no summary comes back.
    After a successful download, the VTT files are removed on the way out
    (also on failure) unless ``--keep-files`` was given.
    """
    setup_terminal_control()
    parser = argparse.ArgumentParser(description="Download YouTube subtitles and get Claude summary")
    parser.add_argument("url", help="YouTube video URL", type=str)
    parser.add_argument("-q", "--quiet", action="store_true", help="Suppress status messages")
    parser.add_argument("--keep-files", action="store_true", help="Don't delete downloaded VTT files")
    parser.add_argument("--transcript", action="store_true", help="Include full transcript in output")

    args = parser.parse_args()

    # Download subtitles
    if not args.quiet:
        print("Downloading subtitles...")
    success, vtt_path, duration_mins, video_title = download_subtitles(args.url, args.quiet)
    if not success or vtt_path is None:
        cleanup_files(None)
        sys.exit(1)

    try:
        # 500 words per full ten minutes of video, never fewer than 500.
        target_words = max(500, (duration_mins // 10) * 500)

        # Process the VTT file
        cleaned_text = process_file(vtt_path, None, args.quiet)
        if not cleaned_text.strip():
            # process_file returns "" on error; there is nothing to save or
            # to summarize.
            print("Error: transcript is empty, nothing to summarize", file=sys.stderr)
            sys.exit(1)

        # Save transcript
        transcript_path = save_transcript(cleaned_text, args.url, video_title)
        print(f"\nTranscript saved to: {transcript_path}")

        # Estimate and display cost
        estimated_cost = estimate_api_cost(cleaned_text, target_words)
        print(f"\nEstimated API cost: ${estimated_cost:.4f}")

        # Get summary from Claude
        if not args.quiet:
            print("\nGetting summary from Claude...")
        summary = get_summary_from_claude(cleaned_text, target_words)
        if not summary:
            # The helper has already reported the cause on stderr; do not
            # write an empty summary file or claim success.
            print("Error: no summary was produced", file=sys.stderr)
            sys.exit(1)

        output = ""
        if args.transcript:
            output += "=== Full Transcript ===\n\n"
            output += cleaned_text
            output += "\n\n=== Summary ===\n\n"
        output += summary

        # Save summary
        _, summary_dir = ensure_yts_dirs()
        summary_filename = sanitize_filename(args.url) + "_summary.txt"
        summary_path = summary_dir / summary_filename

        summary_metadata = f"""Title: {video_title}
URL: {args.url}
Script Version: {__version__}
Timestamp: {datetime.now().isoformat()}
Type: Summary
---

"""
        with open(summary_path, "w", encoding="utf-8") as f:
            f.write(summary_metadata + output)
        print("\n=== Summary ===\n")
        print(summary)
        print()
        print(f"\nSummary saved to: {summary_path}")
    finally:
        # Runs on success, on sys.exit() and on unexpected errors alike, so
        # downloaded files never linger unless the user asked to keep them.
        if not args.keep_files:
            cleanup_files(vtt_path)

    print(f"\nDebug: Script completed successfully at {datetime.now().isoformat()}")
    sys.exit(0)
|
|
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()
|