#!/usr/bin/env python3 """TutorialDock - A video tutorial library manager with progress tracking.""" import hashlib import json import mimetypes import os import re import shutil import socket import subprocess import threading import time from pathlib import Path from typing import Any, Callable, Dict, List, Optional, Tuple from urllib.parse import urlparse from urllib.request import Request, urlopen from flask import Flask, Response, abort, request, send_file # ============================================================================= # Configuration and Constants # ============================================================================= APP_DIR = Path(__file__).resolve().parent STATE_DIR = APP_DIR / "state" STATE_DIR.mkdir(parents=True, exist_ok=True) WEBVIEW_PROFILE_DIR = STATE_DIR / "webview_profile" WEBVIEW_PROFILE_DIR.mkdir(parents=True, exist_ok=True) FONTS_DIR = STATE_DIR / "fonts" FONTS_DIR.mkdir(parents=True, exist_ok=True) FONTS_CSS_PATH = FONTS_DIR / "fonts.css" FONTS_META_PATH = FONTS_DIR / "fonts_meta.json" FA_DIR = STATE_DIR / "fontawesome" FA_DIR.mkdir(parents=True, exist_ok=True) FA_WEBFONTS_DIR = FA_DIR / "webfonts" FA_WEBFONTS_DIR.mkdir(parents=True, exist_ok=True) FA_CSS_PATH = FA_DIR / "fa.css" FA_META_PATH = FA_DIR / "fa_meta.json" SUBS_DIR = STATE_DIR / "subtitles" SUBS_DIR.mkdir(parents=True, exist_ok=True) RECENTS_PATH = STATE_DIR / "recent_folders.json" RECENTS_MAX = 50 # Register additional MIME types mimetypes.add_type("font/woff2", ".woff2") mimetypes.add_type("font/woff", ".woff") mimetypes.add_type("font/ttf", ".ttf") mimetypes.add_type("application/vnd.ms-fontobject", ".eot") mimetypes.add_type("image/svg+xml", ".svg") mimetypes.add_type("text/vtt", ".vtt") if os.name == "nt": os.environ["WEBVIEW2_USER_DATA_FOLDER"] = str(WEBVIEW_PROFILE_DIR) import webview # noqa: E402 # Must import after setting env vars # Supported file extensions VIDEO_EXTS: set[str] = { ".mp4", ".m4v", ".mov", ".webm", ".mkv", ".avi", ".mpg", ".mpeg", ".m2ts", ".mts", ".ogv" } SUB_EXTS: set[str] = {".vtt", ".srt"} BACKUP_COUNT: int = 8 HOST: str = "127.0.0.1" PREFS_PATH: Path = STATE_DIR / "prefs.json" app = Flask(__name__) # ============================================================================= # Utility Functions # ============================================================================= def _subprocess_no_window_kwargs() -> dict[str, Any]: """Return subprocess kwargs to hide console windows on Windows.""" if os.name != "nt": return {} si = subprocess.STARTUPINFO() si.dwFlags |= subprocess.STARTF_USESHOWWINDOW return {"startupinfo": si, "creationflags": subprocess.CREATE_NO_WINDOW} def clamp(v: float, a: float, b: float) -> float: """Clamp value v to the range [a, b].""" return max(a, min(b, v)) def atomic_write_json(path: Path, data: Dict[str, Any], backup_count: int = BACKUP_COUNT) -> None: """ Write JSON data to a file atomically with backup rotation. Creates rolling backups (.bak1 through .bakN) and a .lastgood copy for crash recovery. """ path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") payload = json.dumps(data, ensure_ascii=False, indent=2) if path.exists(): # Rotate existing backups for i in range(backup_count, 0, -1): src = path.with_suffix(path.suffix + f".bak{i}") dst = path.with_suffix(path.suffix + f".bak{i+1}") if src.exists(): try: if dst.exists(): dst.unlink() src.rename(dst) except OSError: pass # Move current to .bak1 bak1 = path.with_suffix(path.suffix + ".bak1") try: if bak1.exists(): bak1.unlink() path.rename(bak1) except OSError: pass # Write atomically via tmp file tmp.write_text(payload, encoding="utf-8") tmp.replace(path) # Keep a lastgood copy for recovery try: lastgood = path.with_suffix(path.suffix + ".lastgood") lastgood.write_text(payload, encoding="utf-8") except OSError: pass def load_json_with_fallbacks(path: Path) -> Optional[Dict[str, Any]]: """ Load JSON from path, falling back to backups if primary is corrupted. Tries: path -> .lastgood -> .bak1 -> .bak2 -> ... """ candidates = [path, path.with_suffix(path.suffix + ".lastgood")] for i in range(1, BACKUP_COUNT + 3): candidates.append(path.with_suffix(path.suffix + f".bak{i}")) for p in candidates: if p.exists(): try: return json.loads(p.read_text(encoding="utf-8")) except (json.JSONDecodeError, OSError): continue return None def is_within_root(root: Path, target: Path) -> bool: """Check if target path is within the root directory (prevents path traversal).""" try: root = root.resolve() target = target.resolve() return str(target).startswith(str(root) + os.sep) or target == root except OSError: return False def truthy(v: Any) -> bool: """Convert various types to boolean (handles string 'true', 'yes', '1', etc.).""" if isinstance(v, bool): return v if isinstance(v, (int, float)): return v != 0 if isinstance(v, str): return v.strip().lower() in {"1", "true", "yes", "y", "on"} return False def pick_free_port(host: str = HOST) -> int: """Find and return an available TCP port on the specified host.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind((host, 0)) return s.getsockname()[1] def folder_display_name(path_str: str) -> str: """Get a display-friendly name for a folder path.""" try: p = Path(path_str) return p.name or str(p) except (ValueError, OSError): return path_str def _deduplicate_list(items: List[str]) -> List[str]: """Remove duplicates from a list while preserving order.""" seen: set[str] = set() result: List[str] = [] for item in items: if isinstance(item, str) and item.strip(): s = item.strip() if s not in seen: result.append(s) seen.add(s) return result def load_recents() -> List[str]: """Load the list of recently opened folders.""" data = load_json_with_fallbacks(RECENTS_PATH) if not isinstance(data, dict): return [] items = data.get("items", []) if not isinstance(items, list): return [] return _deduplicate_list(items)[:RECENTS_MAX] def save_recents(paths: List[str]) -> None: """Save the list of recently opened folders.""" cleaned = _deduplicate_list(paths)[:RECENTS_MAX] atomic_write_json(RECENTS_PATH, { "version": 2, "updated_at": int(time.time()), "items": cleaned }) def push_recent(path_str: str) -> None: """Add a folder to the top of the recent folders list.""" paths = load_recents() path_str = path_str.strip() paths = [p for p in paths if p != path_str] paths.insert(0, path_str) save_recents(paths) # Compiled regex patterns (compiled once for efficiency) CSS_URL_RE = re.compile(r"url\((https://[^)]+)\)\s*format\('([^']+)'\)") def _http_get_text(url: str, timeout: int = 20) -> str: """Fetch text content from a URL.""" req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urlopen(req, timeout=timeout) as r: return r.read().decode("utf-8", errors="replace") def _http_get_bytes(url: str, timeout: int = 30) -> bytes: """Fetch binary content from a URL.""" req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) with urlopen(req, timeout=timeout) as r: return r.read() def _safe_filename_from_url(u: str) -> str: """Generate a safe local filename from a URL with a hash for uniqueness.""" p = urlparse(u) base = Path(p.path).name or "font.woff2" url_hash = hashlib.sha256(u.encode("utf-8")).hexdigest()[:10] if "." not in base: base += ".woff2" stem, suffix = Path(base).stem, Path(base).suffix return f"{stem}-{url_hash}{suffix}" def ensure_google_fonts_local() -> None: """Download and cache Google Fonts locally for offline use.""" wanted = { "Sora": "https://fonts.googleapis.com/css2?family=Sora:wght@500;600;700;800&display=swap", "Manrope": "https://fonts.googleapis.com/css2?family=Manrope:wght@400;500;600;700;800&display=swap", "IBM Plex Mono": "https://fonts.googleapis.com/css2?family=IBM+Plex+Mono:wght@400;500;600&display=swap", } meta = load_json_with_fallbacks(FONTS_META_PATH) or {} if (FONTS_CSS_PATH.exists() and isinstance(meta, dict) and meta.get("ok") is True and meta.get("version") == 7): return combined_css_parts: List[str] = [] downloaded: Dict[str, int] = {} ok = True errors: List[str] = [] for family, css_url in wanted.items(): try: css = _http_get_text(css_url) def replace_match(m: re.Match) -> str: url, fmt = m.group(1), m.group(2) if fmt.lower() != "woff2": return m.group(0) fn = _safe_filename_from_url(url) local_path = FONTS_DIR / fn if not local_path.exists(): data = _http_get_bytes(url) local_path.write_bytes(data) downloaded[fn] = len(data) return f"url(/fonts/{fn}) format('woff2')" css2 = CSS_URL_RE.sub(replace_match, css) combined_css_parts.append(css2.strip()) except Exception as e: ok = False errors.append(f"{family}: {e!s}") combined_css = "\n\n".join(combined_css_parts).strip() try: FONTS_CSS_PATH.write_text(combined_css, encoding="utf-8") except OSError as e: ok = False errors.append(f"write fonts.css: {e!s}") atomic_write_json(FONTS_META_PATH, { "version": 7, "ok": ok, "updated_at": int(time.time()), "downloaded_files": downloaded, "errors": errors }) FA_URL_RE = re.compile(r"url\(([^)]+)\)") def ensure_fontawesome_local() -> None: """Download and cache Font Awesome locally for offline use.""" meta = load_json_with_fallbacks(FA_META_PATH) or {} if (FA_CSS_PATH.exists() and isinstance(meta, dict) and meta.get("ok") is True and meta.get("version") == 3): return ok = True errors: List[str] = [] downloaded: Dict[str, int] = {} css_url = "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.2/css/all.min.css" try: css = _http_get_text(css_url, timeout=25) except Exception as e: ok = False errors.append(f"download fa css: {e!s}") css = "" def _clean_url(u: str) -> str: """Normalize a URL from CSS to an absolute URL.""" u = u.strip().strip("'\"") if u.startswith("data:"): return u if u.startswith("//"): return "https:" + u if u.startswith(("http://", "https://")): return u return "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.2/webfonts/" + u.lstrip("./").replace("../", "") def repl(m: re.Match) -> str: nonlocal ok raw = m.group(1) u = _clean_url(raw) if u.startswith("data:"): return m.group(0) fn = Path(urlparse(u).path).name if not fn: return m.group(0) local_path = FA_WEBFONTS_DIR / fn if not local_path.exists(): try: data = _http_get_bytes(u, timeout=30) local_path.write_bytes(data) downloaded[fn] = len(data) except Exception as e: ok = False errors.append(f"download {fn}: {e!s}") return m.group(0) return f"url(/fa/webfonts/{fn})" css2 = FA_URL_RE.sub(repl, css) try: FA_CSS_PATH.write_text(css2, encoding="utf-8") except OSError as e: ok = False errors.append(f"write fa.css: {e!s}") atomic_write_json(FA_META_PATH, { "version": 3, "ok": ok, "updated_at": int(time.time()), "downloaded_files": downloaded, "errors": errors, "source_css": css_url }) # ============================================================================= # File and String Processing # ============================================================================= _NUM_SPLIT_RE = re.compile(r"(\d+)") def natural_key(s: str) -> List[Any]: """ Generate a sort key for natural sorting (e.g., '2' before '10'). Splits string into numeric and non-numeric parts, converting numeric parts to integers for proper ordering. """ parts = _NUM_SPLIT_RE.split(s) return [int(p) if p.isdigit() else p.casefold() for p in parts] _LEADING_INDEX_RE = re.compile(r"^\s*(?:\(?\s*)?(?P\d+)(?:\s*[.\-_]\s*\d+)*(?:\s*[.)\]-]\s*|\s+)") # Words that should remain lowercase in title case (except at start) _SMALL_WORDS: set[str] = { "a", "an", "the", "and", "or", "but", "for", "nor", "as", "at", "by", "in", "of", "on", "per", "to", "vs", "via", "with", "into", "from" } def _smart_title_case(text: str) -> str: """Convert text to title case, keeping small words lowercase (except at start).""" words = re.split(r"(\s+)", text.strip()) out: List[str] = [] for i, w in enumerate(words): if w.isspace(): out.append(w) continue # Preserve words with digits or all-caps acronyms if any(ch.isdigit() for ch in w) or w.isupper(): out.append(w) continue lw = w.lower() # First word always capitalized, small words stay lowercase elsewhere if i != 0 and lw in _SMALL_WORDS: out.append(lw) else: out.append(lw[:1].upper() + lw[1:]) return "".join(out).strip() def pretty_title_from_filename(filename: str) -> str: """ Convert a filename to a human-readable title. Removes leading indices, underscores, and applies smart title case. Example: '01_introduction_to_python.mp4' -> 'Introduction to Python' """ base = Path(filename).stem # Replace underscores with spaces base = re.sub(r"[_]+", " ", base) base = re.sub(r"\s+", " ", base).strip() # Remove leading index numbers (e.g., "01.", "1)", "(2)") m = _LEADING_INDEX_RE.match(base) if m: base = base[m.end():].strip() # Remove leading punctuation base = re.sub(r"^\s*[-–—:.\)]\s*", "", base).strip() base = re.sub(r"\s+", " ", base).strip() # Fall back to original stem if nothing left if not base: base = Path(filename).stem return _smart_title_case(base) def _windows_where(exe_name: str) -> Optional[str]: """Find an executable using Windows 'where' command.""" try: out = subprocess.check_output( ["where", exe_name], stderr=subprocess.STDOUT, text=True, **_subprocess_no_window_kwargs() ) for line in out.splitlines(): line = line.strip() if line and Path(line).exists(): return line except (subprocess.SubprocessError, OSError): pass return None def find_ffprobe_ffmpeg() -> Tuple[Optional[str], Optional[str]]: """ Locate ffprobe and ffmpeg executables. Searches in PATH, then Windows 'where', then local app directory. """ ffprobe = shutil.which("ffprobe") ffmpeg = shutil.which("ffmpeg") # Try Windows 'where' command if os.name == "nt": if not ffprobe: ffprobe = _windows_where("ffprobe.exe") or _windows_where("ffprobe") if not ffmpeg: ffmpeg = _windows_where("ffmpeg.exe") or _windows_where("ffmpeg") # Check for local copies in app directory if not ffprobe: exe_name = "ffprobe.exe" if os.name == "nt" else "ffprobe" cand = APP_DIR / exe_name if cand.exists(): ffprobe = str(cand) if not ffmpeg: exe_name = "ffmpeg.exe" if os.name == "nt" else "ffmpeg" cand = APP_DIR / exe_name if cand.exists(): ffmpeg = str(cand) return ffprobe, ffmpeg def duration_seconds( path: Path, ffprobe_path: Optional[str], ffmpeg_path: Optional[str] ) -> Optional[float]: """Get video duration in seconds using ffprobe or ffmpeg.""" # Try ffprobe first (more reliable) if ffprobe_path: try: cmd = [ ffprobe_path, "-v", "error", "-show_entries", "format=duration", "-of", "default=nw=1:nk=1", str(path) ] out = subprocess.check_output( cmd, stderr=subprocess.STDOUT, text=True, timeout=20, **_subprocess_no_window_kwargs() ).strip() if out: d = float(out) if d > 0: return d except (subprocess.SubprocessError, ValueError, OSError): pass # Fall back to parsing ffmpeg stderr if ffmpeg_path: try: p = subprocess.run( [ffmpeg_path, "-hide_banner", "-i", str(path)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, timeout=20, **_subprocess_no_window_kwargs() ) err = p.stderr or "" marker = "Duration:" idx = err.find(marker) if idx != -1: s = err[idx + len(marker):].strip() timepart = s.split(",")[0].strip() hh, mm, ss = timepart.split(":") total = int(hh) * 3600 + int(mm) * 60 + float(ss) if total > 0: return float(total) except (subprocess.SubprocessError, ValueError, OSError): pass return None def ffprobe_video_metadata(path: Path, ffprobe_path: Optional[str]) -> Optional[Dict[str, Any]]: """Extract detailed video metadata using ffprobe, including subtitle tracks.""" if not ffprobe_path: return None try: cmd = [ ffprobe_path, "-v", "error", "-print_format", "json", "-show_streams", "-show_format", str(path) ] out = subprocess.check_output( cmd, stderr=subprocess.STDOUT, text=True, timeout=25, **_subprocess_no_window_kwargs() ) data = json.loads(out) streams = data.get("streams", []) fmt = data.get("format", {}) if not isinstance(streams, list): streams = [] meta: Dict[str, Any] = {} video_stream: Optional[Dict] = None audio_stream: Optional[Dict] = None subtitle_tracks: List[Dict[str, Any]] = [] # Find first video/audio streams and all subtitle streams for idx, s in enumerate(streams): if not isinstance(s, dict): continue codec_type = s.get("codec_type") if codec_type == "video" and video_stream is None: video_stream = s elif codec_type == "audio" and audio_stream is None: audio_stream = s elif codec_type == "subtitle": # Extract subtitle track info tags = s.get("tags", {}) if not isinstance(tags, dict): tags = {} sub_info = { "index": s.get("index", idx), "codec": s.get("codec_name", "unknown"), "language": tags.get("language", tags.get("LANGUAGE", "")), "title": tags.get("title", tags.get("TITLE", "")), } subtitle_tracks.append(sub_info) # Extract video metadata if video_stream: meta["v_codec"] = video_stream.get("codec_name") meta["width"] = video_stream.get("width") meta["height"] = video_stream.get("height") # Parse frame rate frame_rate = video_stream.get("r_frame_rate") or video_stream.get("avg_frame_rate") if isinstance(frame_rate, str) and "/" in frame_rate: try: num, den = frame_rate.split("/", 1) fps = float(num) / float(den) if fps > 0: meta["fps"] = fps except (ValueError, ZeroDivisionError): pass # Video bitrate vb = video_stream.get("bit_rate") if vb is not None: try: meta["v_bitrate"] = int(vb) except ValueError: pass # Pixel format and color info meta["pix_fmt"] = video_stream.get("pix_fmt") meta["color_space"] = video_stream.get("color_space") # Extract audio metadata if audio_stream: meta["a_codec"] = audio_stream.get("codec_name") meta["channels"] = audio_stream.get("channels") meta["sample_rate"] = audio_stream.get("sample_rate") ab = audio_stream.get("bit_rate") if ab is not None: try: meta["a_bitrate"] = int(ab) except ValueError: pass # Add subtitle tracks if subtitle_tracks: meta["subtitle_tracks"] = subtitle_tracks # Extract format metadata if isinstance(fmt, dict): if fmt.get("bit_rate") is not None: try: meta["container_bitrate"] = int(fmt.get("bit_rate")) except ValueError: pass if fmt.get("duration") is not None: try: meta["duration"] = float(fmt.get("duration")) except ValueError: pass meta["format_name"] = fmt.get("format_name") # Extract container tags (title, encoder, etc) ftags = fmt.get("tags", {}) if isinstance(ftags, dict): if ftags.get("title"): meta["container_title"] = ftags.get("title") if ftags.get("encoder"): meta["encoder"] = ftags.get("encoder") return meta if meta else None except (subprocess.SubprocessError, json.JSONDecodeError, OSError): return None def file_fingerprint(path: Path) -> str: """ Generate a content-based fingerprint that survives file renames/moves. Uses sha256 of: file size + first 256KB + last 256KB This allows identifying the same video even if renamed or moved. """ CHUNK_SIZE = 256 * 1024 # 256KB try: size = path.stat().st_size except OSError: size = 0 h = hashlib.sha256() h.update(b"VIDFIDv1\0") h.update(str(size).encode("ascii", errors="ignore")) h.update(b"\0") try: with path.open("rb") as f: # Read head head = f.read(CHUNK_SIZE) h.update(head) # Read tail if file is large enough if size > CHUNK_SIZE: try: f.seek(max(0, size - CHUNK_SIZE)) tail = f.read(CHUNK_SIZE) h.update(tail) except OSError: pass except OSError: pass return h.hexdigest()[:20] def compute_library_id_from_fids(fids: List[str]) -> str: """ Compute a stable library ID from a list of file fingerprints. The same set of files will always produce the same library ID, regardless of file order. """ # Filter and sort for deterministic output valid_fids = sorted(fid for fid in fids if isinstance(fid, str) and fid) h = hashlib.sha256() h.update(b"LIBFIDv2\0") for fid in valid_fids: h.update(fid.encode("ascii", errors="ignore")) h.update(b"\n") return h.hexdigest()[:16] def srt_to_vtt_bytes(srt_text: str) -> bytes: """ Convert SRT subtitle format to WebVTT format. Handles BOM removal and timestamp format conversion (comma -> dot). """ # Remove BOM if present text = srt_text.replace("\ufeff", "") lines = text.splitlines() out: List[str] = ["WEBVTT", ""] i = 0 while i < len(lines): line = lines[i].strip("\r\n") # Skip empty lines if not line.strip(): out.append("") i += 1 continue # Skip cue index numbers (e.g., "1", "2", "3") if re.fullmatch(r"\d+", line.strip()): i += 1 if i >= len(lines): break line = lines[i].strip("\r\n") # Process timestamp lines if "-->" in line: # Convert SRT timestamp format (comma) to VTT format (dot) line = line.replace(",", ".") out.append(line) i += 1 # Collect subtitle text until empty line while i < len(lines): t = lines[i].rstrip("\r\n") if not t.strip(): out.append("") i += 1 break out.append(t) i += 1 else: i += 1 return ("\n".join(out).strip() + "\n").encode("utf-8") # ============================================================================= # State Management Classes # ============================================================================= class Prefs: """Thread-safe preferences manager with persistent storage.""" DEFAULT_PREFS: Dict[str, Any] = { "version": 19, "ui_zoom": 1.0, "split_ratio": 0.62, "dock_ratio": 0.62, "always_on_top": False, "window": {"width": 1320, "height": 860, "x": None, "y": None}, "last_folder_path": None, "last_library_id": None, } def __init__(self) -> None: self.lock = threading.Lock() self.data: Dict[str, Any] = dict(self.DEFAULT_PREFS) self.data["window"] = dict(self.DEFAULT_PREFS["window"]) # Load saved preferences loaded = load_json_with_fallbacks(PREFS_PATH) if isinstance(loaded, dict): # Handle window dict specially to merge rather than replace win = loaded.get("window") if isinstance(win, dict): self.data["window"].update(win) # Update other preferences for k, v in loaded.items(): if k != "window": self.data[k] = v def get(self) -> Dict[str, Any]: """Get a deep copy of current preferences.""" with self.lock: # Deep copy via JSON round-trip return json.loads(json.dumps(self.data)) def update(self, patch: Dict[str, Any]) -> None: """Update preferences with a partial dict and save to disk.""" with self.lock: # Handle window dict specially if "window" in patch and isinstance(patch["window"], dict): self.data.setdefault("window", {}) if not isinstance(self.data["window"], dict): self.data["window"] = {} self.data["window"].update(patch["window"]) patch = {k: v for k, v in patch.items() if k != "window"} self.data.update(patch) self.data["updated_at"] = int(time.time()) atomic_write_json(PREFS_PATH, self.data) PREFS = Prefs() class Library: """ Manages a video library with progress tracking, ordering, and metadata. Thread-safe for concurrent access from Flask routes and background scanners. State is persisted to disk and survives application restarts. """ def __init__(self) -> None: # Library contents self.root: Optional[Path] = None self.files: List[Path] = [] self.fids: List[str] = [] # File fingerprints self.relpaths: List[str] = [] self.rel_to_fid: Dict[str, str] = {} self.fid_to_rel: Dict[str, str] = {} # Persistent state self.state_path: Optional[Path] = None self.state: Dict[str, Any] = {} self.lock = threading.Lock() # Background duration scanner self._scan_lock = threading.Lock() self._scan_thread: Optional[threading.Thread] = None self._scan_stop = False # Metadata cache self._meta_cache: Dict[str, Dict[str, Any]] = {} # fid -> probe meta self._meta_lock = threading.Lock() self._ffprobe, self._ffmpeg = find_ffprobe_ffmpeg() @property def ffprobe_found(self) -> bool: """Check if ffprobe is available for metadata extraction.""" return bool(self._ffprobe) def _sorted_default(self, relpaths: List[str]) -> List[str]: """Sort relative paths using natural sort order.""" return sorted(relpaths, key=natural_key) def _apply_saved_order( self, all_fids: List[str], fid_to_rel: Dict[str, str], order_fids: Optional[List[str]] ) -> List[str]: """Apply saved ordering to file IDs, handling additions and removals.""" if not order_fids or not isinstance(order_fids, list): # Default: sort by relative path using the provided fid_to_rel mapping rel_to_fid_local = {v: k for k, v in fid_to_rel.items()} return [ rel_to_fid_local[rp] for rp in self._sorted_default(list(fid_to_rel.values())) if rp in rel_to_fid_local ] existing = set(all_fids) ordered: List[str] = [] used: set[str] = set() # Add files in saved order for fid in order_fids: if isinstance(fid, str) and fid in existing and fid not in used: ordered.append(fid) used.add(fid) # Append any new files not in saved order remaining = [fid for fid in all_fids if fid not in used] remaining.sort(key=lambda fid: natural_key(fid_to_rel.get(fid, fid))) return ordered + remaining def set_root(self, folder: str) -> Dict[str, Any]: """ Set a new root folder for the library. Scans for video files, loads/creates state, and starts duration scanning. Returns library info dict on success, raises ValueError for invalid folder. """ root = Path(folder).expanduser().resolve() if not root.exists() or not root.is_dir(): raise ValueError("Invalid folder") # Discover video files all_files: List[Path] = [ p for p in root.rglob("*") if p.is_file() and p.suffix.lower() in VIDEO_EXTS ] # Build relative paths and fingerprints relpaths: List[str] = [] fids: List[str] = [] rel_to_fid: Dict[str, str] = {} fid_to_rel: Dict[str, str] = {} for p in all_files: try: rel = str(p.relative_to(root)).replace("\\", "/") except ValueError: rel = p.name fid = file_fingerprint(p) relpaths.append(rel) fids.append(fid) rel_to_fid[rel] = fid # On fingerprint collision, keep first (rare) fid_to_rel.setdefault(fid, rel) library_id = compute_library_id_from_fids(fids) state_path = STATE_DIR / f"library_{library_id}.json" loaded = load_json_with_fallbacks(state_path) or {} # Create baseline state for new libraries now = int(time.time()) baseline: Dict[str, Any] = { "version": 19, "library_id": library_id, "last_path": str(root), "updated_at": now, "current_fid": fids[0] if fids else None, "current_time": 0.0, "volume": 1.0, "autoplay": True, # Default ON for new folders "playback_rate": 1.0, "order_fids": [], "videos": {}, # fid -> video metadata } # Merge with loaded state if same library state = dict(baseline) if isinstance(loaded, dict) and loaded.get("library_id") == library_id: for key in baseline.keys(): if key in loaded: state[key] = loaded[key] for key, value in loaded.items(): if key not in state: state[key] = value state["last_path"] = str(root) # Normalize video metadata vids = state.get("videos", {}) if not isinstance(vids, dict): vids = {} normalized: Dict[str, Any] = {} for fid in fids: m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} watched = float(m.get("watched") or 0.0) normalized[fid] = { "pos": float(m.get("pos") if m.get("pos") is not None else watched), "watched": watched, "duration": m.get("duration"), "finished": bool(m.get("finished")), # Once done, stays done "note": str(m.get("note") or ""), "last_open": int(m.get("last_open") or 0), "subtitle": m.get("subtitle"), # {"vtt": "...", "label": "..."} } state["videos"] = normalized # Normalize settings try: state["volume"] = clamp(float(state.get("volume", 1.0)), 0.0, 1.0) except (ValueError, TypeError): state["volume"] = 1.0 state["autoplay"] = truthy(state.get("autoplay", True)) try: state["playback_rate"] = clamp(float(state.get("playback_rate", 1.0)), 0.25, 3.0) except (ValueError, TypeError): state["playback_rate"] = 1.0 # Clean up order_fids to only include existing files if not isinstance(state.get("order_fids"), list): state["order_fids"] = [] else: fid_set = set(fids) state["order_fids"] = [ fid for fid in state["order_fids"] if isinstance(fid, str) and fid in fid_set ] ordered_fids = self._apply_saved_order(fids, fid_to_rel, state.get("order_fids")) # Build ordered file lists ordered_relpaths = [ fid_to_rel.get(fid) for fid in ordered_fids ] ordered_relpaths = [rp for rp in ordered_relpaths if isinstance(rp, str)] ordered_files: List[Path] = [] for rp in ordered_relpaths: p = (root / rp) if p.exists(): ordered_files.append(p) # current fid cur_fid = state.get("current_fid") if not isinstance(cur_fid, str) or cur_fid not in set(ordered_fids): cur_fid = ordered_fids[0] if ordered_fids else None state["current_fid"] = cur_fid try: state["current_time"] = max(0.0, float(state.get("current_time", 0.0))) except Exception: state["current_time"] = 0.0 with self.lock: self.root = root self.files = ordered_files self.fids = ordered_fids self.relpaths = ordered_relpaths self.rel_to_fid = rel_to_fid self.fid_to_rel = {fid: fid_to_rel.get(fid, fid) for fid in ordered_fids} self.state_path = state_path self.state = state self.save_state() self.start_duration_scan() PREFS.update({"last_folder_path": str(root), "last_library_id": library_id}) push_recent(str(root)) return self.get_library_info() def save_state(self) -> None: with self.lock: if not self.state_path: return self.state["updated_at"] = int(time.time()) atomic_write_json(self.state_path, self.state) def _folder_stats(self, fids: List[str], state: Dict[str, Any]) -> Dict[str, Any]: vids = state.get("videos", {}) if not isinstance(vids, dict): vids = {} finished = 0 remaining = 0 total_dur = 0.0 total_watch = 0.0 known = 0 # simple "top folders" based on current relpaths (still useful) top_counts: Dict[str, int] = {} fid_to_rel = self.fid_to_rel or {} for fid in fids: rp = fid_to_rel.get(fid, "") parts = rp.split("/") top = parts[0] if len(parts) > 1 else "(root)" top_counts[top] = top_counts.get(top, 0) + 1 m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} d = m.get("duration", None) w = float(m.get("watched") or 0.0) fin = bool(m.get("finished") or False) finished += 1 if fin else 0 remaining += 0 if fin else 1 if isinstance(d, (int, float)) and d and d > 0: known += 1 total_dur += float(d) total_watch += min(w, float(d)) remaining_sec = 0.0 for fid in fids: m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} d = m.get("duration", None) if isinstance(d, (int, float)) and d and d > 0: w = float(m.get("watched") or 0.0) remaining_sec += max(0.0, float(d) - min(w, float(d))) top_list = sorted(top_counts.items(), key=lambda kv: (-kv[1], natural_key(kv[0]))) return { "finished_count": finished, "remaining_count": remaining, "remaining_seconds_known": remaining_sec if known > 0 else None, "top_folders": top_list[:12], "durations_known": known, "overall_progress": (total_watch / total_dur) if total_dur > 0 else None, } def _tree_flags(self, rels: List[str]) -> Dict[str, Dict[str, Any]]: """ Calculate tree display flags for each file. Limits depth to 1 level (immediate parent folder only) to avoid overly deep nesting in the UI. """ # First, normalize paths to only consider immediate parent folder normalized: List[str] = [] for rp in rels: parts = rp.split("/") if len(parts) >= 2: # Use only immediate parent + filename normalized.append(parts[-2] + "/" + parts[-1]) else: # File in root normalized.append(parts[-1]) last_index: Dict[str, int] = {} first_index: Dict[str, int] = {} for i, rp in enumerate(normalized): parts = rp.split("/") last_index[""] = i first_index.setdefault("", i) for d in range(1, len(parts)): folder = "/".join(parts[:d]) last_index[folder] = i if folder not in first_index: first_index[folder] = i out: Dict[str, Dict[str, Any]] = {} for i, rp in enumerate(rels): norm_rp = normalized[i] parts = norm_rp.split("/") depth = len(parts) - 1 # Max 1 now parent = norm_rp.rsplit("/", 1)[0] if "/" in norm_rp else "" is_last = (last_index.get(parent, i) == i) has_prev_in_parent = i > first_index.get(parent, i) pipes: List[bool] = [] for j in range(depth): folder = "/".join(parts[:j+1]) pipes.append(last_index.get(folder, i) > i) out[rp] = { "depth": depth, "pipes": pipes, "is_last": is_last, "has_prev_in_parent": has_prev_in_parent } return out def _basic_file_meta(self, fid: str) -> Dict[str, Any]: with self.lock: root = self.root fid_to_rel = dict(self.fid_to_rel) if not root: return {} rp = fid_to_rel.get(fid) if not rp: return {} try: p = (root / rp).resolve() if not is_within_root(root, p) or not p.exists(): return {} st = p.stat() ext = p.suffix.lower() folder = str(Path(rp).parent).replace("\\", "/") if folder == ".": folder = "(root)" return {"ext": ext[1:] if ext.startswith(".") else ext, "size": st.st_size, "mtime": int(st.st_mtime), "folder": folder} except Exception: return {} def _get_cached_meta(self, fid: str) -> Dict[str, Any]: with self._meta_lock: return dict(self._meta_cache.get(fid, {})) def _set_cached_meta(self, fid: str, meta: Dict[str, Any]) -> None: with self._meta_lock: self._meta_cache[fid] = dict(meta) def _auto_subtitle_sidecar(self, video_path: Path) -> Optional[Path]: """Find subtitle file with same name as video (case-insensitive, flexible matching). Handles patterns like: - video.srt / video.vtt (exact match) - video.en.srt / video.en.vtt (with language code) - video.eng.srt / video.english.srt (other language patterns) - Flexible matching: ignores dashes, underscores, extra spaces """ import re def normalize(s: str) -> str: """Normalize string for flexible comparison.""" s = s.lower() # Remove common separators and normalize spaces s = re.sub(r'[-_]+', ' ', s) # Collapse multiple spaces s = re.sub(r'\s+', ' ', s) return s.strip() base_stem = video_path.stem base_norm = normalize(base_stem) parent = video_path.parent # Check all files in the same directory try: candidates = [] for f in parent.iterdir(): if not f.is_file(): continue ext = f.suffix.lower() if ext not in [".vtt", ".srt"]: continue f_stem = f.stem f_norm = normalize(f_stem) # Exact match (video.srt) - case insensitive if f_stem.lower() == base_stem.lower(): candidates.append((0, f)) # Priority 0 = exact match continue # Normalized exact match if f_norm == base_norm: candidates.append((1, f)) # Priority 1 = normalized exact continue # Check if it starts with the video name and has a language suffix # e.g., "video.en" matches "video", "video.eng" matches "video" if f_stem.lower().startswith(base_stem.lower() + "."): lang_part = f_stem[len(base_stem) + 1:].lower() if lang_part in ["en", "eng", "english"]: candidates.append((2, f)) # Priority 2 = English with exact base else: candidates.append((4, f)) # Priority 4 = other lang with exact base continue # Normalized match with language suffix # Remove potential language code from subtitle stem for comparison f_stem_no_lang = re.sub(r'\.(en|eng|english|fr|de|es|it|pt|ru|ja|ko|zh)$', '', f_stem, flags=re.IGNORECASE) f_norm_no_lang = normalize(f_stem_no_lang) if f_norm_no_lang == base_norm: # Check if it had an English language code lang_match = re.search(r'\.(en|eng|english)$', f_stem, flags=re.IGNORECASE) if lang_match: candidates.append((3, f)) # Priority 3 = English with normalized base else: candidates.append((5, f)) # Priority 5 = other/no lang with normalized base # Sort by priority and return best match if candidates: candidates.sort(key=lambda x: x[0]) return candidates[0][1] except Exception: pass return None def _store_subtitle_for_fid(self, fid: str, src_path: Path) -> Optional[Dict[str, Any]]: try: ext = src_path.suffix.lower() if ext not in SUB_EXTS: return None # create stable file name based on fid + subtitle source basename name_part = re.sub(r"[^a-zA-Z0-9._-]+", "_", src_path.stem)[:60] or "subtitle" out_name = f"{fid}_{name_part}.vtt" out_path = (SUBS_DIR / out_name).resolve() if ext == ".vtt": out_path.write_bytes(src_path.read_bytes()) else: data = src_path.read_text(encoding="utf-8", errors="replace") out_path.write_bytes(srt_to_vtt_bytes(data)) rel = f"subtitles/{out_name}" return {"vtt": rel, "label": src_path.name} except Exception: return None def get_subtitle_for_current(self) -> Dict[str, Any]: with self.lock: root = self.root state = dict(self.state) cur = state.get("current_fid") vids = state.get("videos", {}) files = list(self.files) if self.files else [] fids = list(self.fids) if self.fids else [] if not root or not cur or not isinstance(vids, dict): return {"ok": True, "has": False} # Check for already-stored subtitle m = vids.get(cur) if isinstance(m, dict) and isinstance(m.get("subtitle"), dict): sub = m["subtitle"] vtt = sub.get("vtt") if isinstance(vtt, str): p = (STATE_DIR / vtt).resolve() if p.exists(): return {"ok": True, "has": True, "url": f"/sub/{state.get('library_id')}/{cur}", "label": sub.get("label", "Subtitles")} # Get video path from files/fids vp = None try: if cur in fids: idx = fids.index(cur) if idx < len(files): vp = files[idx] except (ValueError, IndexError): pass if not vp or not vp.exists(): return {"ok": True, "has": False} # 1. First try: sidecar subtitle file (case-insensitive) sc = self._auto_subtitle_sidecar(vp) if sc and sc.exists(): # store it so it becomes persistent and portable (stored next to script) stored = self._store_subtitle_for_fid(cur, sc) if stored: with self.lock: mm = self.state["videos"].get(cur, {}) if not isinstance(mm, dict): mm = {} mm["subtitle"] = stored self.state["videos"][cur] = mm self.save_state() return {"ok": True, "has": True, "url": f"/sub/{state.get('library_id')}/{cur}", "label": stored.get("label", "Subtitles")} # 2. Second try: embedded subtitles (prefer English) if self._ffprobe and self._ffmpeg: try: meta = ffprobe_video_metadata(vp, self._ffprobe) if meta and "subtitle_tracks" in meta: tracks = meta["subtitle_tracks"] if tracks: # Try to find English track first english_track = None first_track = None for t in tracks: if first_track is None: first_track = t lang = (t.get("language") or "").lower() title = (t.get("title") or "").lower() if "eng" in lang or "english" in title or lang == "en": english_track = t break # Use English if found, otherwise first track chosen = english_track or first_track if chosen: track_idx = chosen.get("index") if track_idx is not None: # Extract the subtitle result = self.extract_embedded_subtitle(track_idx) if result.get("ok"): return {"ok": True, "has": True, "url": result.get("url"), "label": result.get("label", "Subtitles")} except Exception: pass return {"ok": True, "has": False} def set_subtitle_for_current(self, file_path: str) -> Dict[str, Any]: with self.lock: root = self.root cur = self.state.get("current_fid") if not root or not cur: return {"ok": False, "error": "No video loaded"} p = Path(file_path).expanduser() if not p.exists() or not p.is_file(): return {"ok": False, "error": "Subtitle file not found"} stored = self._store_subtitle_for_fid(cur, p) if not stored: return {"ok": False, "error": "Unsupported subtitle type (use .srt or .vtt)"} with self.lock: mm = self.state["videos"].get(cur, {}) if not isinstance(mm, dict): mm = {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None} mm["subtitle"] = stored self.state["videos"][cur] = mm self.save_state() return {"ok": True, "url": f"/sub/{self.state.get('library_id')}/{cur}", "label": stored.get("label", "Subtitles")} def get_embedded_subtitles(self) -> Dict[str, Any]: """Get list of embedded subtitle tracks for current video.""" with self.lock: root = self.root files = list(self.files) if self.files else [] fids = list(self.fids) if self.fids else [] state = dict(self.state) if not root or not files or not fids: return {"ok": False, "tracks": []} cur = state.get("current_fid") if not isinstance(cur, str) or cur not in set(fids): return {"ok": False, "tracks": []} try: idx = fids.index(cur) video_path = files[idx] except (ValueError, IndexError): return {"ok": False, "tracks": []} # Use ffprobe to get subtitle tracks meta = ffprobe_video_metadata(video_path, self._ffprobe) if not meta or "subtitle_tracks" not in meta: return {"ok": True, "tracks": []} return {"ok": True, "tracks": meta["subtitle_tracks"]} def extract_embedded_subtitle(self, track_index: int) -> Dict[str, Any]: """Extract an embedded subtitle track using ffmpeg.""" with self.lock: root = self.root files = list(self.files) if self.files else [] fids = list(self.fids) if self.fids else [] cur = self.state.get("current_fid") library_id = self.state.get("library_id") if not root or not files or not fids or not cur: return {"ok": False, "error": "No video loaded"} if not self._ffmpeg: return {"ok": False, "error": "ffmpeg not found"} try: idx = fids.index(cur) video_path = files[idx] except (ValueError, IndexError): return {"ok": False, "error": "Video not found"} # Extract subtitle to VTT format SUBS_DIR.mkdir(parents=True, exist_ok=True) out_name = f"{cur}_embedded_{track_index}.vtt" out_path = SUBS_DIR / out_name try: cmd = [ self._ffmpeg, "-y", "-i", str(video_path), "-map", f"0:{track_index}", "-c:s", "webvtt", str(out_path) ] subprocess.run( cmd, capture_output=True, timeout=60, **_subprocess_no_window_kwargs() ) if not out_path.exists(): return {"ok": False, "error": "Failed to extract subtitle"} # Get track label meta = ffprobe_video_metadata(video_path, self._ffprobe) label = "Embedded" if meta and "subtitle_tracks" in meta: for t in meta["subtitle_tracks"]: if t.get("index") == track_index: lang = t.get("language", "") title = t.get("title", "") if title: label = title elif lang: label = lang.upper() break # Store in state stored = {"vtt": f"subtitles/{out_name}", "label": label} with self.lock: mm = self.state["videos"].get(cur, {}) if not isinstance(mm, dict): mm = {} mm["subtitle"] = stored self.state["videos"][cur] = mm self.save_state() return {"ok": True, "url": f"/sub/{library_id}/{cur}", "label": label} except Exception as e: return {"ok": False, "error": str(e)} def get_available_subtitles(self) -> Dict[str, Any]: """Get all available subtitle options (sidecar files + embedded tracks).""" import re with self.lock: root = self.root files = list(self.files) if self.files else [] fids = list(self.fids) if self.fids else [] state = dict(self.state) cur = state.get("current_fid") if not root or not files or not fids or not cur: return {"ok": False, "sidecar": [], "embedded": []} try: idx = fids.index(cur) video_path = files[idx] except (ValueError, IndexError): return {"ok": False, "sidecar": [], "embedded": []} sidecar_subs = [] embedded_subs = [] # Find all sidecar subtitle files def normalize(s: str) -> str: s = s.lower() s = re.sub(r'[-_]+', ' ', s) s = re.sub(r'\s+', ' ', s) return s.strip() base_stem = video_path.stem base_norm = normalize(base_stem) parent = video_path.parent seen_labels = set() # To dedupe vtt/srt with same name try: for f in parent.iterdir(): if not f.is_file(): continue ext = f.suffix.lower() if ext not in [".vtt", ".srt"]: continue f_stem = f.stem f_norm = normalize(f_stem) # Check if this subtitle matches the video is_match = False # Exact match if f_stem.lower() == base_stem.lower(): is_match = True # Normalized match elif f_norm == base_norm: is_match = True # With language suffix (exact base) elif f_stem.lower().startswith(base_stem.lower() + "."): is_match = True # With language suffix (normalized base) else: f_stem_no_lang = re.sub(r'\.(en|eng|english|fr|fra|french|de|deu|german|es|spa|spanish|it|ita|italian|pt|por|portuguese|ru|rus|russian|ja|jpn|japanese|ko|kor|korean|zh|chi|chinese|nl|nld|dutch|pl|pol|polish|sv|swe|swedish|ar|ara|arabic)$', '', f_stem, flags=re.IGNORECASE) if normalize(f_stem_no_lang) == base_norm: is_match = True if is_match: # Extract language from filename lang_match = re.search(r'\.([a-z]{2,3})$', f_stem, flags=re.IGNORECASE) if lang_match: lang = lang_match.group(1).upper() # Map common codes to full names lang_map = {"EN": "English", "ENG": "English", "FR": "French", "FRA": "French", "DE": "German", "DEU": "German", "ES": "Spanish", "SPA": "Spanish", "IT": "Italian", "ITA": "Italian", "PT": "Portuguese", "POR": "Portuguese", "RU": "Russian", "RUS": "Russian", "JA": "Japanese", "JPN": "Japanese", "KO": "Korean", "KOR": "Korean", "ZH": "Chinese", "CHI": "Chinese", "NL": "Dutch", "NLD": "Dutch", "PL": "Polish", "POL": "Polish", "SV": "Swedish", "SWE": "Swedish", "AR": "Arabic", "ARA": "Arabic"} label = lang_map.get(lang, lang) else: label = "Subtitles" # Dedupe: if we already have this label, skip (prefer vtt over srt) dedupe_key = label.lower() if dedupe_key in seen_labels: continue seen_labels.add(dedupe_key) sidecar_subs.append({ "path": str(f), "label": label, "format": ext[1:].upper() }) except Exception: pass # Sort sidecar subs: English first, then alphabetically def sub_sort_key(s): label = s.get("label", "").lower() if "english" in label or label == "en": return (0, label) return (1, label) sidecar_subs.sort(key=sub_sort_key) # Get embedded subtitles if self._ffprobe: try: meta = ffprobe_video_metadata(video_path, self._ffprobe) if meta and "subtitle_tracks" in meta: for track in meta["subtitle_tracks"]: lang = track.get("language", "") title = track.get("title", "") if title: label = title elif lang: label = lang.upper() else: label = f"Track {track.get('index', '?')}" embedded_subs.append({ "index": track.get("index"), "label": label, "codec": track.get("codec", ""), "language": lang }) except Exception: pass return {"ok": True, "sidecar": sidecar_subs, "embedded": embedded_subs} def load_sidecar_subtitle(self, file_path: str) -> Dict[str, Any]: """Load a specific sidecar subtitle file.""" with self.lock: cur = self.state.get("current_fid") library_id = self.state.get("library_id") if not cur: return {"ok": False, "error": "No video loaded"} p = Path(file_path) if not p.exists() or not p.is_file(): return {"ok": False, "error": "Subtitle file not found"} stored = self._store_subtitle_for_fid(cur, p) if not stored: return {"ok": False, "error": "Unsupported subtitle type"} with self.lock: mm = self.state["videos"].get(cur, {}) if not isinstance(mm, dict): mm = {} mm["subtitle"] = stored self.state["videos"][cur] = mm self.save_state() return {"ok": True, "url": f"/sub/{library_id}/{cur}", "label": stored.get("label", "Subtitles")} def reset_watch_progress(self) -> Dict[str, Any]: # Reset only progress fields; keep notes, duration, subtitles, volume, rate, etc. with self.lock: vids = self.state.get("videos", {}) if not isinstance(vids, dict): return {"ok": False, "error": "No library"} for fid, m in list(vids.items()): if not isinstance(m, dict): continue m["pos"] = 0.0 m["watched"] = 0.0 m["finished"] = False vids[fid] = m self.state["videos"] = vids self.state["current_time"] = 0.0 self.save_state() return {"ok": True} def get_current_video_metadata(self) -> Dict[str, Any]: with self.lock: root = self.root files = list(self.files) if self.files else [] fids = list(self.fids) if self.fids else [] state = dict(self.state) if not root or not files or not fids: return {"ok": False, "error": "No library", "ffprobe_found": self.ffprobe_found} cur = state.get("current_fid") if not isinstance(cur, str) or cur not in set(fids): cur = fids[0] idx = fids.index(cur) p = files[idx] basic = self._basic_file_meta(cur) cached = self._get_cached_meta(cur) out = {"ok": True, "fid": cur, "basic": basic, "probe": cached or None, "ffprobe_found": self.ffprobe_found} if cached: return out probe = ffprobe_video_metadata(p, self._ffprobe) if probe: self._set_cached_meta(cur, probe) out["probe"] = probe else: out["probe"] = None return out def get_library_info(self) -> Dict[str, Any]: with self.lock: root = self.root files = list(self.files) fids = list(self.fids) rels = list(self.relpaths) state = dict(self.state) if not root: return {"ok": False, "error": "No folder selected"} has_subdirs = any("/" in rp for rp in rels) tree = self._tree_flags(rels) if has_subdirs else {} vids = state.get("videos", {}) if not isinstance(vids, dict): vids = {} items = [] for i, (fid, f) in enumerate(zip(fids, files)): rel = rels[i] if i < len(rels) else str(f.relative_to(root)).replace("\\", "/") meta = vids.get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) nice = pretty_title_from_filename(f.name) tf = tree.get(rel, {"depth": 0, "pipes": [], "is_last": False, "has_prev_in_parent": False}) items.append({ "index": i, "fid": fid, "name": f.name, "title": nice, "relpath": rel, "depth": int(tf.get("depth", 0)), "pipes": tf.get("pipes", []), "is_last": bool(tf.get("is_last", False)), "has_prev_in_parent": bool(tf.get("has_prev_in_parent", False)), "pos": float(meta.get("pos") or 0.0), "watched": float(meta.get("watched") or 0.0), "duration": meta.get("duration"), "finished": bool(meta.get("finished") or False), "note_len": len(str(meta.get("note") or "")), "last_open": int(meta.get("last_open") or 0), "has_sub": isinstance(meta.get("subtitle"), dict), }) stats = self._folder_stats(fids, state) cur_fid = state.get("current_fid") cur_idx = fids.index(cur_fid) if isinstance(cur_fid, str) and cur_fid in fids else 0 next_up = None for j in range(cur_idx + 1, len(items)): if not items[j]["finished"]: next_up = items[j]; break return { "ok": True, "folder": str(root), "library_id": state.get("library_id"), "count": len(files), "current_index": cur_idx, "current_fid": fids[cur_idx] if fids else None, "current_time": float(state.get("current_time", 0.0)), "folder_volume": float(state.get("volume", 1.0)), "folder_autoplay": bool(state.get("autoplay", True)), "folder_rate": float(state.get("playback_rate", 1.0)), "items": items, "has_subdirs": has_subdirs, "overall_progress": stats["overall_progress"], "durations_known": stats["durations_known"], "finished_count": stats["finished_count"], "remaining_count": stats["remaining_count"], "remaining_seconds_known": stats["remaining_seconds_known"], "top_folders": stats["top_folders"], "next_up": {"index": next_up["index"], "title": next_up["title"]} if next_up else None, } def update_progress(self, index: int, current_time: float, duration: Optional[float], playing: bool) -> Dict[str, Any]: now = int(time.time()) with self.lock: if not self.root or not self.files or not self.fids: return {"ok": False, "error": "No library"} index = max(0, min(int(index), len(self.files) - 1)) fid = self.fids[index] current_time = max(0.0, float(current_time or 0.0)) if duration is not None: try: duration = float(duration) except Exception: duration = None self.state["current_fid"] = fid self.state["current_time"] = current_time meta = self.state["videos"].get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) meta["pos"] = current_time meta["watched"] = max(float(meta.get("watched") or 0.0), current_time) if isinstance(duration, (int, float)) and duration and duration > 0: meta["duration"] = duration # once finished, always finished (do NOT flip it back) d2 = meta.get("duration") if isinstance(d2, (int, float)) and d2 and d2 > 0: meta["finished"] = bool(meta.get("finished") or (current_time >= max(0.0, float(d2) - 2.0))) if playing: meta["last_open"] = now self.state["videos"][fid] = meta self.save_state() return {"ok": True} def set_current(self, index: int, timecode: float = 0.0) -> Dict[str, Any]: with self.lock: if not self.root or not self.files or not self.fids: return {"ok": False, "error": "No library"} index = max(0, min(int(index), len(self.files) - 1)) self.state["current_fid"] = self.fids[index] self.state["current_time"] = max(0.0, float(timecode or 0.0)) self.save_state() return {"ok": True} def set_folder_volume(self, volume: float) -> Dict[str, Any]: with self.lock: if not self.root: return {"ok": False, "error": "No library"} try: v = float(volume) except Exception: v = 1.0 self.state["volume"] = clamp(v, 0.0, 1.0) self.save_state() return {"ok": True} def set_folder_autoplay(self, enabled: bool) -> Dict[str, Any]: with self.lock: if not self.root: return {"ok": False, "error": "No library"} self.state["autoplay"] = bool(enabled) self.save_state() return {"ok": True} def set_folder_rate(self, rate: float) -> Dict[str, Any]: with self.lock: if not self.root: return {"ok": False, "error": "No library"} try: r = float(rate) except Exception: r = 1.0 self.state["playback_rate"] = clamp(r, 0.25, 3.0) self.save_state() return {"ok": True} def set_order(self, fids: List[str]) -> Dict[str, Any]: with self.lock: if not self.root or not self.files or not self.fids: return {"ok": False, "error": "No library"} if not isinstance(fids, list) or not all(isinstance(x, str) for x in fids): return {"ok": False, "error": "order must be a list of fids"} existing = set(self.fids) seen, cleaned = set(), [] for fid in fids: if fid in existing and fid not in seen: cleaned.append(fid); seen.add(fid) # append remaining remaining = [fid for fid in self.fids if fid not in seen] remaining.sort(key=lambda fid: natural_key(self.fid_to_rel.get(fid, fid))) new_fids = cleaned + remaining # rebuild lists fid_to_rel = dict(self.fid_to_rel) rel_to_file = {str(p.relative_to(self.root)).replace("\\", "/"): p for p in self.root.rglob("*") if p.is_file() and p.suffix.lower() in VIDEO_EXTS} new_rels = [fid_to_rel.get(fid) for fid in new_fids] new_rels = [rp for rp in new_rels if isinstance(rp, str)] new_files = [rel_to_file[rp] for rp in new_rels if rp in rel_to_file] cur = self.state.get("current_fid") if not isinstance(cur, str) or cur not in set(new_fids): cur = new_fids[0] if new_fids else None self.fids = new_fids self.relpaths = new_rels self.files = new_files self.state["order_fids"] = cleaned self.state["current_fid"] = cur self.save_state() return {"ok": True} def get_video_path(self, index: int) -> Path: with self.lock: if not self.root or not self.files: raise FileNotFoundError("No library") index = max(0, min(int(index), len(self.files) - 1)) p, root = self.files[index], self.root if not is_within_root(root, p): raise PermissionError("Invalid path") return p def get_note(self, fid: str) -> str: with self.lock: vids = self.state.get("videos", {}) if not isinstance(vids, dict): return "" meta = vids.get(fid) if not isinstance(meta, dict): return "" return str(meta.get("note") or "") def set_note(self, fid: str, note: str) -> Dict[str, Any]: note = str(note or "") with self.lock: if not self.root: return {"ok": False, "error": "No library"} vids = self.state.get("videos", {}) if not isinstance(vids, dict): vids = {}; self.state["videos"] = vids meta = vids.get(fid) if not isinstance(meta, dict): meta = {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None} meta["note"] = note vids[fid] = meta self.save_state() return {"ok": True, "len": len(note)} def start_duration_scan(self) -> None: with self._scan_lock: if self._scan_thread and self._scan_thread.is_alive(): return self._scan_stop = False t = threading.Thread(target=self._duration_scan_worker, daemon=True) self._scan_thread = t t.start() def _duration_scan_worker(self) -> None: ffprobe, ffmpeg = self._ffprobe, self._ffmpeg if not ffprobe and not ffmpeg: return with self.lock: if not self.root or not self.files or not self.fids: return root = self.root files = list(self.files) fids = list(self.fids) for fid, f in zip(fids, files): if self._scan_stop: return with self.lock: if not self.root or str(self.root) != str(root): return meta = self.state["videos"].get(fid) if not isinstance(meta, dict): continue d = meta.get("duration") if isinstance(d, (int, float)) and d and d > 0: continue dur = duration_seconds(f, ffprobe, ffmpeg) if dur and dur > 0: with self.lock: if not self.root or str(self.root) != str(root): return m = self.state["videos"].get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) m["duration"] = float(dur) # do NOT unset finished if it was already true try: m["finished"] = bool(m.get("finished") or (float(m.get("watched") or 0.0) >= max(0.0, float(dur) - 2.0))) except Exception: pass self.state["videos"][fid] = m self.save_state() LIB = Library() def _send_file_range(path: Path) -> Response: file_size = path.stat().st_size range_header = request.headers.get("Range", None) if not range_header: def gen(): with path.open("rb") as f: while True: chunk = f.read(1024 * 512) if not chunk: break yield chunk resp = Response(gen(), status=200, mimetype="application/octet-stream") resp.headers["Content-Length"] = str(file_size) resp.headers["Accept-Ranges"] = "bytes" resp.headers["Cache-Control"] = "no-store" return resp try: units, rng = range_header.split("=", 1) if units.strip() != "bytes": raise ValueError start_s, end_s = rng.split("-", 1) start = int(start_s) if start_s else 0 end = int(end_s) if end_s else (file_size - 1) if start < 0 or end < start: raise ValueError end = min(end, file_size - 1) except Exception: return abort(416) length = end - start + 1 def gen(): with path.open("rb") as f: f.seek(start) remaining = length while remaining > 0: chunk = f.read(min(1024 * 512, remaining)) if not chunk: break remaining -= len(chunk) yield chunk resp = Response(gen(), status=206, mimetype="application/octet-stream") resp.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" resp.headers["Content-Length"] = str(length) resp.headers["Accept-Ranges"] = "bytes" resp.headers["Cache-Control"] = "no-store" return resp @app.get("/") def index(): return Response(HTML, mimetype="text/html") @app.get("/fonts.css") def fonts_css(): if not FONTS_CSS_PATH.exists(): return Response("", mimetype="text/css") return Response(FONTS_CSS_PATH.read_text(encoding="utf-8", errors="replace"), mimetype="text/css") @app.get("/fonts/") def fonts_file(filename: str): path = (FONTS_DIR / filename).resolve() if not path.exists() or not str(path).startswith(str(FONTS_DIR.resolve())): abort(404) mime, _ = mimetypes.guess_type(str(path)) return send_file(str(path), mimetype=mime or "application/octet-stream", conditional=True, etag=True, max_age=0) @app.get("/fa.css") def fa_css(): if not FA_CSS_PATH.exists(): return Response("", mimetype="text/css") return Response(FA_CSS_PATH.read_text(encoding="utf-8", errors="replace"), mimetype="text/css") @app.get("/fa/webfonts/") def fa_webfont(filename: str): path = (FA_WEBFONTS_DIR / filename).resolve() if not path.exists() or not str(path).startswith(str(FA_WEBFONTS_DIR.resolve())): abort(404) mime, _ = mimetypes.guess_type(str(path)) return send_file(str(path), mimetype=mime or "application/octet-stream", conditional=True, etag=True, max_age=0) @app.get("/video/") def video(index: int): try: p = LIB.get_video_path(index) except Exception: return abort(404) ext = p.suffix.lower() mime_map = { ".mp4": "video/mp4", ".m4v": "video/mp4", ".webm": "video/webm", ".ogv": "video/ogg", ".mov": "video/quicktime", ".mkv": "video/x-matroska", ".avi": "video/x-msvideo", ".mpeg": "video/mpeg", ".mpg": "video/mpeg", ".m2ts": "video/mp2t", ".mts": "video/mp2t", } resp = _send_file_range(p) resp.mimetype = mime_map.get(ext, "application/octet-stream") return resp @app.get("/sub//") def subtitle(libid: str, fid: str): # libid is informational; fid is the actual key we store subtitles under with LIB.lock: state = dict(LIB.state) vids = state.get("videos", {}) if not isinstance(vids, dict): abort(404) m = vids.get(fid) if not isinstance(m, dict): abort(404) sub = m.get("subtitle") if not isinstance(sub, dict): abort(404) rel = sub.get("vtt") if not isinstance(rel, str): abort(404) p = (STATE_DIR / rel).resolve() if not p.exists() or not str(p).startswith(str(SUBS_DIR.resolve().parent)): abort(404) return send_file(str(p), mimetype="text/vtt", conditional=True, etag=True, max_age=0) def run_flask(port: int): import logging logging.getLogger("werkzeug").setLevel(logging.ERROR) app.run(host=HOST, port=port, debug=False, use_reloader=False, threaded=True) class API: def select_folder(self) -> Dict[str, Any]: win = webview.windows[0] res = win.create_file_dialog(webview.FOLDER_DIALOG if not hasattr(webview, 'FileDialog') else webview.FileDialog.FOLDER, allow_multiple=False) if not res: return {"ok": False, "cancelled": True} folder = res[0] try: return LIB.set_root(folder) except Exception as e: return {"ok": False, "error": str(e)} def open_folder_path(self, folder: str) -> Dict[str, Any]: try: return LIB.set_root(folder) except Exception as e: return {"ok": False, "error": str(e)} def get_recents(self) -> Dict[str, Any]: items = load_recents() valid: List[str] = [] for p in items: try: if Path(p).expanduser().exists() and Path(p).expanduser().is_dir(): valid.append(p) except Exception: pass # if anything invalid was removed, persist the cleaned list if valid != items: save_recents(valid) out = [{"name": folder_display_name(p), "path": p} for p in valid] return {"ok": True, "items": out} def remove_recent(self, path: str) -> Dict[str, Any]: """Remove a specific folder from the recent folders list.""" items = load_recents() if path in items: items.remove(path) save_recents(items) return {"ok": True} def get_library(self) -> Dict[str, Any]: return LIB.get_library_info() def set_current(self, index: int, timecode: float = 0.0) -> Dict[str, Any]: return LIB.set_current(index, timecode) def tick_progress(self, index: int, current_time: float, duration: Optional[float], playing: bool) -> Dict[str, Any]: return LIB.update_progress(index, current_time, duration, playing) def set_folder_volume(self, volume: float) -> Dict[str, Any]: return LIB.set_folder_volume(volume) def set_folder_autoplay(self, enabled: bool) -> Dict[str, Any]: return LIB.set_folder_autoplay(bool(enabled)) def set_folder_rate(self, rate: float) -> Dict[str, Any]: return LIB.set_folder_rate(float(rate)) def set_order(self, fids: List[str]) -> Dict[str, Any]: return LIB.set_order(fids) def start_duration_scan(self) -> Dict[str, Any]: LIB.start_duration_scan() return {"ok": True} def get_prefs(self) -> Dict[str, Any]: return {"ok": True, "prefs": PREFS.get()} def set_prefs(self, patch: Dict[str, Any]) -> Dict[str, Any]: if not isinstance(patch, dict): return {"ok": False, "error": "patch must be an object"} if "ui_zoom" in patch: try: patch["ui_zoom"] = float(patch["ui_zoom"]) except Exception: patch.pop("ui_zoom", None) if "split_ratio" in patch: try: patch["split_ratio"] = float(patch["split_ratio"]) except Exception: patch.pop("split_ratio", None) if "dock_ratio" in patch: try: patch["dock_ratio"] = float(patch["dock_ratio"]) except Exception: patch.pop("dock_ratio", None) if "always_on_top" in patch: patch["always_on_top"] = bool(patch["always_on_top"]) PREFS.update(patch) return {"ok": True} def set_always_on_top(self, enabled: bool) -> Dict[str, Any]: enabled = bool(enabled) PREFS.update({"always_on_top": enabled}) try: win = webview.windows[0] if hasattr(win, "on_top"): setattr(win, "on_top", enabled) except Exception: pass return {"ok": True} def save_window_state(self) -> Dict[str, Any]: try: win = webview.windows[0] w = getattr(win, "width", None) h = getattr(win, "height", None) x = getattr(win, "x", None) y = getattr(win, "y", None) patch: Dict[str, Any] = {"window": {}} if isinstance(w, int) and w > 100: patch["window"]["width"] = w if isinstance(h, int) and h > 100: patch["window"]["height"] = h if isinstance(x, int): patch["window"]["x"] = x if isinstance(y, int): patch["window"]["y"] = y PREFS.update(patch) return {"ok": True} except Exception as e: return {"ok": False, "error": str(e)} def get_note(self, fid: str) -> Dict[str, Any]: if not isinstance(fid, str): return {"ok": False, "error": "bad fid"} return {"ok": True, "note": LIB.get_note(fid)} def set_note(self, fid: str, note: str) -> Dict[str, Any]: if not isinstance(fid, str): return {"ok": False, "error": "bad fid"} return LIB.set_note(fid, note) def get_current_video_meta(self) -> Dict[str, Any]: return LIB.get_current_video_metadata() def get_current_subtitle(self) -> Dict[str, Any]: return LIB.get_subtitle_for_current() def get_embedded_subtitles(self) -> Dict[str, Any]: """Get list of embedded subtitle tracks for current video.""" return LIB.get_embedded_subtitles() def extract_embedded_subtitle(self, track_index: int) -> Dict[str, Any]: """Extract an embedded subtitle track and return its URL.""" return LIB.extract_embedded_subtitle(track_index) def get_available_subtitles(self) -> Dict[str, Any]: """Get all available subtitle options (sidecar + embedded).""" return LIB.get_available_subtitles() def load_sidecar_subtitle(self, file_path: str) -> Dict[str, Any]: """Load a specific sidecar subtitle file.""" return LIB.load_sidecar_subtitle(file_path) def choose_subtitle_file(self) -> Dict[str, Any]: win = webview.windows[0] dialog_type = webview.OPEN_DIALOG if not hasattr(webview, 'FileDialog') else webview.FileDialog.OPEN res = win.create_file_dialog( dialog_type, allow_multiple=False, file_types=("Subtitle files (*.srt;*.vtt)",) ) if not res: return {"ok": False, "cancelled": True} return LIB.set_subtitle_for_current(res[0]) def reset_watch_progress(self) -> Dict[str, Any]: return LIB.reset_watch_progress() HTML = r""" TutorialDock
TutorialDock
Watch local tutorials, resume instantly, and actually finish them.
100%
No video loaded
-
Overall
-
00:00 / 00:00
100%
Notes
Saved
Info
Folder
-
Next up
-
Structure
-
Title
-
Relpath
-
Position
-
File
-
Video
-
Audio
-
Subtitles
-
Finished
-
Remaining
-
ETA
-
Volume
-
Speed
-
Durations
-
Top folders
-
Playlist
-
""" # ============================================================================= # Application Entry Point # ============================================================================= def main() -> None: """Initialize and run the TutorialDock application.""" # Download and cache fonts for offline use (fail silently) try: ensure_google_fonts_local() except Exception: pass try: ensure_fontawesome_local() except Exception: pass # Restore last opened folder if it exists prefs = PREFS.get() last_path = prefs.get("last_folder_path") if isinstance(last_path, str) and last_path.strip(): try: if Path(last_path).expanduser().exists(): LIB.set_root(last_path) except Exception: pass # Start Flask server on a free port port = pick_free_port(HOST) flask_thread = threading.Thread(target=run_flask, args=(port,), daemon=True) flask_thread.start() # Get window dimensions from saved preferences window_prefs = prefs.get("window", {}) if isinstance(prefs.get("window"), dict) else {} width = int(window_prefs.get("width") or 1320) height = int(window_prefs.get("height") or 860) x = window_prefs.get("x") y = window_prefs.get("y") on_top = bool(prefs.get("always_on_top", False)) # Create and start the webview window api = API() webview.create_window( "TutorialDock", url=f"http://{HOST}:{port}/", width=width, height=height, x=x if isinstance(x, int) else None, y=y if isinstance(y, int) else None, on_top=on_top, js_api=api, ) webview.start(debug=False) if __name__ == "__main__": main()