From dd9a8082d900ec95cdde0b0c17c5411207850a7a Mon Sep 17 00:00:00 2001 From: lashman Date: Thu, 19 Feb 2026 01:16:17 +0200 Subject: [PATCH] existing TutorialDock Python app --- scripts/rename_folder.py | 33 + tutorial.py | 5282 ++++++++++++++++++++++++++++++++++++++ tutorials.bat | 6 + 3 files changed, 5321 insertions(+) create mode 100644 scripts/rename_folder.py create mode 100644 tutorial.py create mode 100644 tutorials.bat diff --git a/scripts/rename_folder.py b/scripts/rename_folder.py new file mode 100644 index 0000000..7bb0d69 --- /dev/null +++ b/scripts/rename_folder.py @@ -0,0 +1,33 @@ +import os + + +def rename_folder(): + # Get the current working directory + current_dir = os.getcwd() + + print(f"Current directory: {current_dir}") + + # Prompt user for the current folder name and new folder name + current_name = input("Enter the current folder name to rename: ") + new_name = input("Enter the new folder name: ") + + # Construct full paths + current_path = os.path.join(current_dir, current_name) + new_path = os.path.join(current_dir, new_name) + + try: + # Rename the folder + os.rename(current_path, new_path) + print(f"Folder '{current_name}' renamed to '{new_name}' successfully.") + except FileNotFoundError: + print(f"Error: Folder '{current_name}' does not exist in {current_dir}.") + except PermissionError: + print( + "Error: Permission denied. Try running the script with administrator privileges." + ) + except Exception as e: + print(f"An unexpected error occurred: {e}") + + +if __name__ == "__main__": + rename_folder() diff --git a/tutorial.py b/tutorial.py new file mode 100644 index 0000000..6c56e89 --- /dev/null +++ b/tutorial.py @@ -0,0 +1,5282 @@ +#!/usr/bin/env python3 +"""TutorialDock - A video tutorial library manager with progress tracking.""" + +import hashlib +import json +import mimetypes +import os +import re +import shutil +import socket +import subprocess +import threading +import time +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple +from urllib.parse import urlparse +from urllib.request import Request, urlopen + +from flask import Flask, Response, abort, request, send_file + +# ============================================================================= +# Configuration and Constants +# ============================================================================= + +APP_DIR = Path(__file__).resolve().parent +STATE_DIR = APP_DIR / "state" +STATE_DIR.mkdir(parents=True, exist_ok=True) + +WEBVIEW_PROFILE_DIR = STATE_DIR / "webview_profile" +WEBVIEW_PROFILE_DIR.mkdir(parents=True, exist_ok=True) + +FONTS_DIR = STATE_DIR / "fonts" +FONTS_DIR.mkdir(parents=True, exist_ok=True) +FONTS_CSS_PATH = FONTS_DIR / "fonts.css" +FONTS_META_PATH = FONTS_DIR / "fonts_meta.json" + +FA_DIR = STATE_DIR / "fontawesome" +FA_DIR.mkdir(parents=True, exist_ok=True) +FA_WEBFONTS_DIR = FA_DIR / "webfonts" +FA_WEBFONTS_DIR.mkdir(parents=True, exist_ok=True) +FA_CSS_PATH = FA_DIR / "fa.css" +FA_META_PATH = FA_DIR / "fa_meta.json" + +SUBS_DIR = STATE_DIR / "subtitles" +SUBS_DIR.mkdir(parents=True, exist_ok=True) + +RECENTS_PATH = STATE_DIR / "recent_folders.json" +RECENTS_MAX = 50 + +# Register additional MIME types +mimetypes.add_type("font/woff2", ".woff2") +mimetypes.add_type("font/woff", ".woff") +mimetypes.add_type("font/ttf", ".ttf") +mimetypes.add_type("application/vnd.ms-fontobject", ".eot") +mimetypes.add_type("image/svg+xml", ".svg") +mimetypes.add_type("text/vtt", ".vtt") + +if os.name == "nt": + os.environ["WEBVIEW2_USER_DATA_FOLDER"] = str(WEBVIEW_PROFILE_DIR) + +import webview # noqa: E402 # Must import after setting env vars + +# Supported file extensions +VIDEO_EXTS: set[str] = { + ".mp4", ".m4v", ".mov", ".webm", ".mkv", + ".avi", ".mpg", ".mpeg", ".m2ts", ".mts", + ".ogv" +} +SUB_EXTS: set[str] = {".vtt", ".srt"} + +BACKUP_COUNT: int = 8 +HOST: str = "127.0.0.1" +PREFS_PATH: Path = STATE_DIR / "prefs.json" + +app = Flask(__name__) + +# ============================================================================= +# Utility Functions +# ============================================================================= + + +def _subprocess_no_window_kwargs() -> dict[str, Any]: + """Return subprocess kwargs to hide console windows on Windows.""" + if os.name != "nt": + return {} + si = subprocess.STARTUPINFO() + si.dwFlags |= subprocess.STARTF_USESHOWWINDOW + return {"startupinfo": si, "creationflags": subprocess.CREATE_NO_WINDOW} + +def clamp(v: float, a: float, b: float) -> float: + """Clamp value v to the range [a, b].""" + return max(a, min(b, v)) + + +def atomic_write_json(path: Path, data: Dict[str, Any], backup_count: int = BACKUP_COUNT) -> None: + """ + Write JSON data to a file atomically with backup rotation. + + Creates rolling backups (.bak1 through .bakN) and a .lastgood copy + for crash recovery. + """ + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + payload = json.dumps(data, ensure_ascii=False, indent=2) + + if path.exists(): + # Rotate existing backups + for i in range(backup_count, 0, -1): + src = path.with_suffix(path.suffix + f".bak{i}") + dst = path.with_suffix(path.suffix + f".bak{i+1}") + if src.exists(): + try: + if dst.exists(): + dst.unlink() + src.rename(dst) + except OSError: + pass + + # Move current to .bak1 + bak1 = path.with_suffix(path.suffix + ".bak1") + try: + if bak1.exists(): + bak1.unlink() + path.rename(bak1) + except OSError: + pass + + # Write atomically via tmp file + tmp.write_text(payload, encoding="utf-8") + tmp.replace(path) + + # Keep a lastgood copy for recovery + try: + lastgood = path.with_suffix(path.suffix + ".lastgood") + lastgood.write_text(payload, encoding="utf-8") + except OSError: + pass + + +def load_json_with_fallbacks(path: Path) -> Optional[Dict[str, Any]]: + """ + Load JSON from path, falling back to backups if primary is corrupted. + + Tries: path -> .lastgood -> .bak1 -> .bak2 -> ... + """ + candidates = [path, path.with_suffix(path.suffix + ".lastgood")] + for i in range(1, BACKUP_COUNT + 3): + candidates.append(path.with_suffix(path.suffix + f".bak{i}")) + + for p in candidates: + if p.exists(): + try: + return json.loads(p.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + continue + return None + + +def is_within_root(root: Path, target: Path) -> bool: + """Check if target path is within the root directory (prevents path traversal).""" + try: + root = root.resolve() + target = target.resolve() + return str(target).startswith(str(root) + os.sep) or target == root + except OSError: + return False + +def truthy(v: Any) -> bool: + """Convert various types to boolean (handles string 'true', 'yes', '1', etc.).""" + if isinstance(v, bool): + return v + if isinstance(v, (int, float)): + return v != 0 + if isinstance(v, str): + return v.strip().lower() in {"1", "true", "yes", "y", "on"} + return False + + +def pick_free_port(host: str = HOST) -> int: + """Find and return an available TCP port on the specified host.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind((host, 0)) + return s.getsockname()[1] + + +def folder_display_name(path_str: str) -> str: + """Get a display-friendly name for a folder path.""" + try: + p = Path(path_str) + return p.name or str(p) + except (ValueError, OSError): + return path_str + + +def _deduplicate_list(items: List[str]) -> List[str]: + """Remove duplicates from a list while preserving order.""" + seen: set[str] = set() + result: List[str] = [] + for item in items: + if isinstance(item, str) and item.strip(): + s = item.strip() + if s not in seen: + result.append(s) + seen.add(s) + return result + + +def load_recents() -> List[str]: + """Load the list of recently opened folders.""" + data = load_json_with_fallbacks(RECENTS_PATH) + if not isinstance(data, dict): + return [] + items = data.get("items", []) + if not isinstance(items, list): + return [] + return _deduplicate_list(items)[:RECENTS_MAX] + + +def save_recents(paths: List[str]) -> None: + """Save the list of recently opened folders.""" + cleaned = _deduplicate_list(paths)[:RECENTS_MAX] + atomic_write_json(RECENTS_PATH, { + "version": 2, + "updated_at": int(time.time()), + "items": cleaned + }) + + +def push_recent(path_str: str) -> None: + """Add a folder to the top of the recent folders list.""" + paths = load_recents() + path_str = path_str.strip() + paths = [p for p in paths if p != path_str] + paths.insert(0, path_str) + save_recents(paths) + +# Compiled regex patterns (compiled once for efficiency) +CSS_URL_RE = re.compile(r"url\((https://[^)]+)\)\s*format\('([^']+)'\)") + + +def _http_get_text(url: str, timeout: int = 20) -> str: + """Fetch text content from a URL.""" + req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) + with urlopen(req, timeout=timeout) as r: + return r.read().decode("utf-8", errors="replace") + + +def _http_get_bytes(url: str, timeout: int = 30) -> bytes: + """Fetch binary content from a URL.""" + req = Request(url, headers={"User-Agent": "Mozilla/5.0"}) + with urlopen(req, timeout=timeout) as r: + return r.read() + + +def _safe_filename_from_url(u: str) -> str: + """Generate a safe local filename from a URL with a hash for uniqueness.""" + p = urlparse(u) + base = Path(p.path).name or "font.woff2" + url_hash = hashlib.sha256(u.encode("utf-8")).hexdigest()[:10] + if "." not in base: + base += ".woff2" + stem, suffix = Path(base).stem, Path(base).suffix + return f"{stem}-{url_hash}{suffix}" + + +def ensure_google_fonts_local() -> None: + """Download and cache Google Fonts locally for offline use.""" + wanted = { + "Sora": "https://fonts.googleapis.com/css2?family=Sora:wght@500;600;700;800&display=swap", + "Manrope": "https://fonts.googleapis.com/css2?family=Manrope:wght@400;500;600;700;800&display=swap", + "IBM Plex Mono": "https://fonts.googleapis.com/css2?family=IBM+Plex+Mono:wght@400;500;600&display=swap", + } + + meta = load_json_with_fallbacks(FONTS_META_PATH) or {} + if (FONTS_CSS_PATH.exists() + and isinstance(meta, dict) + and meta.get("ok") is True + and meta.get("version") == 7): + return + + combined_css_parts: List[str] = [] + downloaded: Dict[str, int] = {} + ok = True + errors: List[str] = [] + + for family, css_url in wanted.items(): + try: + css = _http_get_text(css_url) + + def replace_match(m: re.Match) -> str: + url, fmt = m.group(1), m.group(2) + if fmt.lower() != "woff2": + return m.group(0) + fn = _safe_filename_from_url(url) + local_path = FONTS_DIR / fn + if not local_path.exists(): + data = _http_get_bytes(url) + local_path.write_bytes(data) + downloaded[fn] = len(data) + return f"url(/fonts/{fn}) format('woff2')" + + css2 = CSS_URL_RE.sub(replace_match, css) + combined_css_parts.append(css2.strip()) + except Exception as e: + ok = False + errors.append(f"{family}: {e!s}") + + combined_css = "\n\n".join(combined_css_parts).strip() + try: + FONTS_CSS_PATH.write_text(combined_css, encoding="utf-8") + except OSError as e: + ok = False + errors.append(f"write fonts.css: {e!s}") + + atomic_write_json(FONTS_META_PATH, { + "version": 7, + "ok": ok, + "updated_at": int(time.time()), + "downloaded_files": downloaded, + "errors": errors + }) + + +FA_URL_RE = re.compile(r"url\(([^)]+)\)") + +def ensure_fontawesome_local() -> None: + """Download and cache Font Awesome locally for offline use.""" + meta = load_json_with_fallbacks(FA_META_PATH) or {} + if (FA_CSS_PATH.exists() + and isinstance(meta, dict) + and meta.get("ok") is True + and meta.get("version") == 3): + return + + ok = True + errors: List[str] = [] + downloaded: Dict[str, int] = {} + css_url = "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.2/css/all.min.css" + + try: + css = _http_get_text(css_url, timeout=25) + except Exception as e: + ok = False + errors.append(f"download fa css: {e!s}") + css = "" + + def _clean_url(u: str) -> str: + """Normalize a URL from CSS to an absolute URL.""" + u = u.strip().strip("'\"") + if u.startswith("data:"): + return u + if u.startswith("//"): + return "https:" + u + if u.startswith(("http://", "https://")): + return u + return "https://cdnjs.cloudflare.com/ajax/libs/font-awesome/6.5.2/webfonts/" + u.lstrip("./").replace("../", "") + + def repl(m: re.Match) -> str: + nonlocal ok + raw = m.group(1) + u = _clean_url(raw) + if u.startswith("data:"): + return m.group(0) + fn = Path(urlparse(u).path).name + if not fn: + return m.group(0) + local_path = FA_WEBFONTS_DIR / fn + if not local_path.exists(): + try: + data = _http_get_bytes(u, timeout=30) + local_path.write_bytes(data) + downloaded[fn] = len(data) + except Exception as e: + ok = False + errors.append(f"download {fn}: {e!s}") + return m.group(0) + return f"url(/fa/webfonts/{fn})" + + css2 = FA_URL_RE.sub(repl, css) + try: + FA_CSS_PATH.write_text(css2, encoding="utf-8") + except OSError as e: + ok = False + errors.append(f"write fa.css: {e!s}") + + atomic_write_json(FA_META_PATH, { + "version": 3, + "ok": ok, + "updated_at": int(time.time()), + "downloaded_files": downloaded, + "errors": errors, + "source_css": css_url + }) + + +# ============================================================================= +# File and String Processing +# ============================================================================= + +_NUM_SPLIT_RE = re.compile(r"(\d+)") + + +def natural_key(s: str) -> List[Any]: + """ + Generate a sort key for natural sorting (e.g., '2' before '10'). + + Splits string into numeric and non-numeric parts, converting numeric + parts to integers for proper ordering. + """ + parts = _NUM_SPLIT_RE.split(s) + return [int(p) if p.isdigit() else p.casefold() for p in parts] + + +_LEADING_INDEX_RE = re.compile(r"^\s*(?:\(?\s*)?(?P\d+)(?:\s*[.\-_]\s*\d+)*(?:\s*[.)\]-]\s*|\s+)") + +# Words that should remain lowercase in title case (except at start) +_SMALL_WORDS: set[str] = { + "a", "an", "the", "and", "or", "but", "for", "nor", "as", "at", + "by", "in", "of", "on", "per", "to", "vs", "via", "with", "into", "from" +} + + +def _smart_title_case(text: str) -> str: + """Convert text to title case, keeping small words lowercase (except at start).""" + words = re.split(r"(\s+)", text.strip()) + out: List[str] = [] + for i, w in enumerate(words): + if w.isspace(): + out.append(w) + continue + # Preserve words with digits or all-caps acronyms + if any(ch.isdigit() for ch in w) or w.isupper(): + out.append(w) + continue + lw = w.lower() + # First word always capitalized, small words stay lowercase elsewhere + if i != 0 and lw in _SMALL_WORDS: + out.append(lw) + else: + out.append(lw[:1].upper() + lw[1:]) + return "".join(out).strip() + +def pretty_title_from_filename(filename: str) -> str: + """ + Convert a filename to a human-readable title. + + Removes leading indices, underscores, and applies smart title case. + Example: '01_introduction_to_python.mp4' -> 'Introduction to Python' + """ + base = Path(filename).stem + # Replace underscores with spaces + base = re.sub(r"[_]+", " ", base) + base = re.sub(r"\s+", " ", base).strip() + # Remove leading index numbers (e.g., "01.", "1)", "(2)") + m = _LEADING_INDEX_RE.match(base) + if m: + base = base[m.end():].strip() + # Remove leading punctuation + base = re.sub(r"^\s*[---:.\)]\s*", "", base).strip() + base = re.sub(r"\s+", " ", base).strip() + # Fall back to original stem if nothing left + if not base: + base = Path(filename).stem + return _smart_title_case(base) + + +def _windows_where(exe_name: str) -> Optional[str]: + """Find an executable using Windows 'where' command.""" + try: + out = subprocess.check_output( + ["where", exe_name], + stderr=subprocess.STDOUT, + text=True, + **_subprocess_no_window_kwargs() + ) + for line in out.splitlines(): + line = line.strip() + if line and Path(line).exists(): + return line + except (subprocess.SubprocessError, OSError): + pass + return None + + +def find_ffprobe_ffmpeg() -> Tuple[Optional[str], Optional[str]]: + """ + Locate ffprobe and ffmpeg executables. + + Searches in PATH, then Windows 'where', then local app directory. + """ + ffprobe = shutil.which("ffprobe") + ffmpeg = shutil.which("ffmpeg") + + # Try Windows 'where' command + if os.name == "nt": + if not ffprobe: + ffprobe = _windows_where("ffprobe.exe") or _windows_where("ffprobe") + if not ffmpeg: + ffmpeg = _windows_where("ffmpeg.exe") or _windows_where("ffmpeg") + + # Check for local copies in app directory + if not ffprobe: + exe_name = "ffprobe.exe" if os.name == "nt" else "ffprobe" + cand = APP_DIR / exe_name + if cand.exists(): + ffprobe = str(cand) + if not ffmpeg: + exe_name = "ffmpeg.exe" if os.name == "nt" else "ffmpeg" + cand = APP_DIR / exe_name + if cand.exists(): + ffmpeg = str(cand) + + return ffprobe, ffmpeg + + +def duration_seconds( + path: Path, + ffprobe_path: Optional[str], + ffmpeg_path: Optional[str] +) -> Optional[float]: + """Get video duration in seconds using ffprobe or ffmpeg.""" + # Try ffprobe first (more reliable) + if ffprobe_path: + try: + cmd = [ + ffprobe_path, "-v", "error", + "-show_entries", "format=duration", + "-of", "default=nw=1:nk=1", + str(path) + ] + out = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + text=True, + timeout=20, + **_subprocess_no_window_kwargs() + ).strip() + if out: + d = float(out) + if d > 0: + return d + except (subprocess.SubprocessError, ValueError, OSError): + pass + + # Fall back to parsing ffmpeg stderr + if ffmpeg_path: + try: + p = subprocess.run( + [ffmpeg_path, "-hide_banner", "-i", str(path)], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + timeout=20, + **_subprocess_no_window_kwargs() + ) + err = p.stderr or "" + marker = "Duration:" + idx = err.find(marker) + if idx != -1: + s = err[idx + len(marker):].strip() + timepart = s.split(",")[0].strip() + hh, mm, ss = timepart.split(":") + total = int(hh) * 3600 + int(mm) * 60 + float(ss) + if total > 0: + return float(total) + except (subprocess.SubprocessError, ValueError, OSError): + pass + + return None + + +def ffprobe_video_metadata(path: Path, ffprobe_path: Optional[str]) -> Optional[Dict[str, Any]]: + """Extract detailed video metadata using ffprobe, including subtitle tracks.""" + if not ffprobe_path: + return None + + try: + cmd = [ + ffprobe_path, "-v", "error", + "-print_format", "json", + "-show_streams", "-show_format", + str(path) + ] + out = subprocess.check_output( + cmd, + stderr=subprocess.STDOUT, + text=True, + timeout=25, + **_subprocess_no_window_kwargs() + ) + data = json.loads(out) + streams = data.get("streams", []) + fmt = data.get("format", {}) + + if not isinstance(streams, list): + streams = [] + + meta: Dict[str, Any] = {} + video_stream: Optional[Dict] = None + audio_stream: Optional[Dict] = None + subtitle_tracks: List[Dict[str, Any]] = [] + + # Find first video/audio streams and all subtitle streams + for idx, s in enumerate(streams): + if not isinstance(s, dict): + continue + codec_type = s.get("codec_type") + if codec_type == "video" and video_stream is None: + video_stream = s + elif codec_type == "audio" and audio_stream is None: + audio_stream = s + elif codec_type == "subtitle": + # Extract subtitle track info + tags = s.get("tags", {}) + if not isinstance(tags, dict): + tags = {} + sub_info = { + "index": s.get("index", idx), + "codec": s.get("codec_name", "unknown"), + "language": tags.get("language", tags.get("LANGUAGE", "")), + "title": tags.get("title", tags.get("TITLE", "")), + } + subtitle_tracks.append(sub_info) + + # Extract video metadata + if video_stream: + meta["v_codec"] = video_stream.get("codec_name") + meta["width"] = video_stream.get("width") + meta["height"] = video_stream.get("height") + + # Parse frame rate + frame_rate = video_stream.get("r_frame_rate") or video_stream.get("avg_frame_rate") + if isinstance(frame_rate, str) and "/" in frame_rate: + try: + num, den = frame_rate.split("/", 1) + fps = float(num) / float(den) + if fps > 0: + meta["fps"] = fps + except (ValueError, ZeroDivisionError): + pass + + # Video bitrate + vb = video_stream.get("bit_rate") + if vb is not None: + try: + meta["v_bitrate"] = int(vb) + except ValueError: + pass + + # Pixel format and color info + meta["pix_fmt"] = video_stream.get("pix_fmt") + meta["color_space"] = video_stream.get("color_space") + + # Extract audio metadata + if audio_stream: + meta["a_codec"] = audio_stream.get("codec_name") + meta["channels"] = audio_stream.get("channels") + meta["sample_rate"] = audio_stream.get("sample_rate") + ab = audio_stream.get("bit_rate") + if ab is not None: + try: + meta["a_bitrate"] = int(ab) + except ValueError: + pass + + # Add subtitle tracks + if subtitle_tracks: + meta["subtitle_tracks"] = subtitle_tracks + + # Extract format metadata + if isinstance(fmt, dict): + if fmt.get("bit_rate") is not None: + try: + meta["container_bitrate"] = int(fmt.get("bit_rate")) + except ValueError: + pass + if fmt.get("duration") is not None: + try: + meta["duration"] = float(fmt.get("duration")) + except ValueError: + pass + meta["format_name"] = fmt.get("format_name") + + # Extract container tags (title, encoder, etc) + ftags = fmt.get("tags", {}) + if isinstance(ftags, dict): + if ftags.get("title"): + meta["container_title"] = ftags.get("title") + if ftags.get("encoder"): + meta["encoder"] = ftags.get("encoder") + + return meta if meta else None + except (subprocess.SubprocessError, json.JSONDecodeError, OSError): + return None + +def file_fingerprint(path: Path) -> str: + """ + Generate a content-based fingerprint that survives file renames/moves. + + Uses sha256 of: file size + first 256KB + last 256KB + This allows identifying the same video even if renamed or moved. + """ + CHUNK_SIZE = 256 * 1024 # 256KB + + try: + size = path.stat().st_size + except OSError: + size = 0 + + h = hashlib.sha256() + h.update(b"VIDFIDv1\0") + h.update(str(size).encode("ascii", errors="ignore")) + h.update(b"\0") + + try: + with path.open("rb") as f: + # Read head + head = f.read(CHUNK_SIZE) + h.update(head) + # Read tail if file is large enough + if size > CHUNK_SIZE: + try: + f.seek(max(0, size - CHUNK_SIZE)) + tail = f.read(CHUNK_SIZE) + h.update(tail) + except OSError: + pass + except OSError: + pass + + return h.hexdigest()[:20] + + +def compute_library_id_from_fids(fids: List[str]) -> str: + """ + Compute a stable library ID from a list of file fingerprints. + + The same set of files will always produce the same library ID, + regardless of file order. + """ + # Filter and sort for deterministic output + valid_fids = sorted(fid for fid in fids if isinstance(fid, str) and fid) + + h = hashlib.sha256() + h.update(b"LIBFIDv2\0") + for fid in valid_fids: + h.update(fid.encode("ascii", errors="ignore")) + h.update(b"\n") + return h.hexdigest()[:16] + +def srt_to_vtt_bytes(srt_text: str) -> bytes: + """ + Convert SRT subtitle format to WebVTT format. + + Handles BOM removal and timestamp format conversion (comma -> dot). + """ + # Remove BOM if present + text = srt_text.replace("\ufeff", "") + lines = text.splitlines() + out: List[str] = ["WEBVTT", ""] + + i = 0 + while i < len(lines): + line = lines[i].strip("\r\n") + + # Skip empty lines + if not line.strip(): + out.append("") + i += 1 + continue + + # Skip cue index numbers (e.g., "1", "2", "3") + if re.fullmatch(r"\d+", line.strip()): + i += 1 + if i >= len(lines): + break + line = lines[i].strip("\r\n") + + # Process timestamp lines + if "-->" in line: + # Convert SRT timestamp format (comma) to VTT format (dot) + line = line.replace(",", ".") + out.append(line) + i += 1 + + # Collect subtitle text until empty line + while i < len(lines): + t = lines[i].rstrip("\r\n") + if not t.strip(): + out.append("") + i += 1 + break + out.append(t) + i += 1 + else: + i += 1 + + return ("\n".join(out).strip() + "\n").encode("utf-8") + + +# ============================================================================= +# State Management Classes +# ============================================================================= + + +class Prefs: + """Thread-safe preferences manager with persistent storage.""" + + DEFAULT_PREFS: Dict[str, Any] = { + "version": 19, + "ui_zoom": 1.0, + "split_ratio": 0.62, + "dock_ratio": 0.62, + "always_on_top": False, + "window": {"width": 1320, "height": 860, "x": None, "y": None}, + "last_folder_path": None, + "last_library_id": None, + } + + def __init__(self) -> None: + self.lock = threading.Lock() + self.data: Dict[str, Any] = dict(self.DEFAULT_PREFS) + self.data["window"] = dict(self.DEFAULT_PREFS["window"]) + + # Load saved preferences + loaded = load_json_with_fallbacks(PREFS_PATH) + if isinstance(loaded, dict): + # Handle window dict specially to merge rather than replace + win = loaded.get("window") + if isinstance(win, dict): + self.data["window"].update(win) + # Update other preferences + for k, v in loaded.items(): + if k != "window": + self.data[k] = v + + def get(self) -> Dict[str, Any]: + """Get a deep copy of current preferences.""" + with self.lock: + # Deep copy via JSON round-trip + return json.loads(json.dumps(self.data)) + + def update(self, patch: Dict[str, Any]) -> None: + """Update preferences with a partial dict and save to disk.""" + with self.lock: + # Handle window dict specially + if "window" in patch and isinstance(patch["window"], dict): + self.data.setdefault("window", {}) + if not isinstance(self.data["window"], dict): + self.data["window"] = {} + self.data["window"].update(patch["window"]) + patch = {k: v for k, v in patch.items() if k != "window"} + + self.data.update(patch) + self.data["updated_at"] = int(time.time()) + atomic_write_json(PREFS_PATH, self.data) + +PREFS = Prefs() + + +class Library: + """ + Manages a video library with progress tracking, ordering, and metadata. + + Thread-safe for concurrent access from Flask routes and background scanners. + State is persisted to disk and survives application restarts. + """ + + def __init__(self) -> None: + # Library contents + self.root: Optional[Path] = None + self.files: List[Path] = [] + self.fids: List[str] = [] # File fingerprints + self.relpaths: List[str] = [] + self.rel_to_fid: Dict[str, str] = {} + self.fid_to_rel: Dict[str, str] = {} + + # Persistent state + self.state_path: Optional[Path] = None + self.state: Dict[str, Any] = {} + self.lock = threading.Lock() + + # Background duration scanner + self._scan_lock = threading.Lock() + self._scan_thread: Optional[threading.Thread] = None + self._scan_stop = False + + # Metadata cache + self._meta_cache: Dict[str, Dict[str, Any]] = {} # fid -> probe meta + self._meta_lock = threading.Lock() + self._ffprobe, self._ffmpeg = find_ffprobe_ffmpeg() + + @property + def ffprobe_found(self) -> bool: + """Check if ffprobe is available for metadata extraction.""" + return bool(self._ffprobe) + + def _sorted_default(self, relpaths: List[str]) -> List[str]: + """Sort relative paths using natural sort order.""" + return sorted(relpaths, key=natural_key) + + def _apply_saved_order( + self, + all_fids: List[str], + fid_to_rel: Dict[str, str], + order_fids: Optional[List[str]] + ) -> List[str]: + """Apply saved ordering to file IDs, handling additions and removals.""" + if not order_fids or not isinstance(order_fids, list): + # Default: sort by relative path using the provided fid_to_rel mapping + rel_to_fid_local = {v: k for k, v in fid_to_rel.items()} + return [ + rel_to_fid_local[rp] + for rp in self._sorted_default(list(fid_to_rel.values())) + if rp in rel_to_fid_local + ] + + existing = set(all_fids) + ordered: List[str] = [] + used: set[str] = set() + + # Add files in saved order + for fid in order_fids: + if isinstance(fid, str) and fid in existing and fid not in used: + ordered.append(fid) + used.add(fid) + + # Append any new files not in saved order + remaining = [fid for fid in all_fids if fid not in used] + remaining.sort(key=lambda fid: natural_key(fid_to_rel.get(fid, fid))) + return ordered + remaining + + def set_root(self, folder: str) -> Dict[str, Any]: + """ + Set a new root folder for the library. + + Scans for video files, loads/creates state, and starts duration scanning. + Returns library info dict on success, raises ValueError for invalid folder. + """ + root = Path(folder).expanduser().resolve() + if not root.exists() or not root.is_dir(): + raise ValueError("Invalid folder") + + # Discover video files + all_files: List[Path] = [ + p for p in root.rglob("*") + if p.is_file() and p.suffix.lower() in VIDEO_EXTS + ] + + # Build relative paths and fingerprints + relpaths: List[str] = [] + fids: List[str] = [] + rel_to_fid: Dict[str, str] = {} + fid_to_rel: Dict[str, str] = {} + + for p in all_files: + try: + rel = str(p.relative_to(root)).replace("\\", "/") + except ValueError: + rel = p.name + fid = file_fingerprint(p) + relpaths.append(rel) + fids.append(fid) + rel_to_fid[rel] = fid + # On fingerprint collision, keep first (rare) + fid_to_rel.setdefault(fid, rel) + + library_id = compute_library_id_from_fids(fids) + state_path = STATE_DIR / f"library_{library_id}.json" + loaded = load_json_with_fallbacks(state_path) or {} + + # Create baseline state for new libraries + now = int(time.time()) + baseline: Dict[str, Any] = { + "version": 19, + "library_id": library_id, + "last_path": str(root), + "updated_at": now, + "current_fid": fids[0] if fids else None, + "current_time": 0.0, + "volume": 1.0, + "autoplay": True, # Default ON for new folders + "playback_rate": 1.0, + "order_fids": [], + "videos": {}, # fid -> video metadata + } + + # Merge with loaded state if same library + state = dict(baseline) + if isinstance(loaded, dict) and loaded.get("library_id") == library_id: + for key in baseline.keys(): + if key in loaded: + state[key] = loaded[key] + for key, value in loaded.items(): + if key not in state: + state[key] = value + + state["last_path"] = str(root) + + # Normalize video metadata + vids = state.get("videos", {}) + if not isinstance(vids, dict): + vids = {} + + normalized: Dict[str, Any] = {} + for fid in fids: + m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} + watched = float(m.get("watched") or 0.0) + normalized[fid] = { + "pos": float(m.get("pos") if m.get("pos") is not None else watched), + "watched": watched, + "duration": m.get("duration"), + "finished": bool(m.get("finished")), # Once done, stays done + "note": str(m.get("note") or ""), + "last_open": int(m.get("last_open") or 0), + "subtitle": m.get("subtitle"), # {"vtt": "...", "label": "..."} + } + state["videos"] = normalized + + # Normalize settings + try: + state["volume"] = clamp(float(state.get("volume", 1.0)), 0.0, 1.0) + except (ValueError, TypeError): + state["volume"] = 1.0 + + state["autoplay"] = truthy(state.get("autoplay", True)) + + try: + state["playback_rate"] = clamp(float(state.get("playback_rate", 1.0)), 0.25, 3.0) + except (ValueError, TypeError): + state["playback_rate"] = 1.0 + + # Clean up order_fids to only include existing files + if not isinstance(state.get("order_fids"), list): + state["order_fids"] = [] + else: + fid_set = set(fids) + state["order_fids"] = [ + fid for fid in state["order_fids"] + if isinstance(fid, str) and fid in fid_set + ] + + ordered_fids = self._apply_saved_order(fids, fid_to_rel, state.get("order_fids")) + + # Build ordered file lists + ordered_relpaths = [ + fid_to_rel.get(fid) for fid in ordered_fids + ] + ordered_relpaths = [rp for rp in ordered_relpaths if isinstance(rp, str)] + ordered_files: List[Path] = [] + for rp in ordered_relpaths: + p = (root / rp) + if p.exists(): + ordered_files.append(p) + + # current fid + cur_fid = state.get("current_fid") + if not isinstance(cur_fid, str) or cur_fid not in set(ordered_fids): + cur_fid = ordered_fids[0] if ordered_fids else None + state["current_fid"] = cur_fid + + try: + state["current_time"] = max(0.0, float(state.get("current_time", 0.0))) + except Exception: + state["current_time"] = 0.0 + + with self.lock: + self.root = root + self.files = ordered_files + self.fids = ordered_fids + self.relpaths = ordered_relpaths + self.rel_to_fid = rel_to_fid + self.fid_to_rel = {fid: fid_to_rel.get(fid, fid) for fid in ordered_fids} + self.state_path = state_path + self.state = state + + self.save_state() + self.start_duration_scan() + + PREFS.update({"last_folder_path": str(root), "last_library_id": library_id}) + push_recent(str(root)) + return self.get_library_info() + + def save_state(self) -> None: + with self.lock: + if not self.state_path: return + self.state["updated_at"] = int(time.time()) + atomic_write_json(self.state_path, self.state) + + def _folder_stats(self, fids: List[str], state: Dict[str, Any]) -> Dict[str, Any]: + vids = state.get("videos", {}) + if not isinstance(vids, dict): vids = {} + + finished = 0 + remaining = 0 + total_dur = 0.0 + total_watch = 0.0 + known = 0 + + # simple "top folders" based on current relpaths (still useful) + top_counts: Dict[str, int] = {} + fid_to_rel = self.fid_to_rel or {} + + for fid in fids: + rp = fid_to_rel.get(fid, "") + parts = rp.split("/") + top = parts[0] if len(parts) > 1 else "(root)" + top_counts[top] = top_counts.get(top, 0) + 1 + + m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} + d = m.get("duration", None) + w = float(m.get("watched") or 0.0) + fin = bool(m.get("finished") or False) + finished += 1 if fin else 0 + remaining += 0 if fin else 1 + if isinstance(d, (int, float)) and d and d > 0: + known += 1 + total_dur += float(d) + total_watch += min(w, float(d)) + + remaining_sec = 0.0 + for fid in fids: + m = vids.get(fid) if isinstance(vids.get(fid), dict) else {} + d = m.get("duration", None) + if isinstance(d, (int, float)) and d and d > 0: + w = float(m.get("watched") or 0.0) + remaining_sec += max(0.0, float(d) - min(w, float(d))) + + top_list = sorted(top_counts.items(), key=lambda kv: (-kv[1], natural_key(kv[0]))) + + return { + "finished_count": finished, + "remaining_count": remaining, + "remaining_seconds_known": remaining_sec if known > 0 else None, + "top_folders": top_list[:12], + "durations_known": known, + "overall_progress": (total_watch / total_dur) if total_dur > 0 else None, + } + + def _tree_flags(self, rels: List[str]) -> Dict[str, Dict[str, Any]]: + """ + Calculate tree display flags for each file. + + Limits depth to 1 level (immediate parent folder only) to avoid + overly deep nesting in the UI. + """ + # First, normalize paths to only consider immediate parent folder + normalized: List[str] = [] + for rp in rels: + parts = rp.split("/") + if len(parts) >= 2: + # Use only immediate parent + filename + normalized.append(parts[-2] + "/" + parts[-1]) + else: + # File in root + normalized.append(parts[-1]) + + last_index: Dict[str, int] = {} + first_index: Dict[str, int] = {} + + for i, rp in enumerate(normalized): + parts = rp.split("/") + last_index[""] = i + first_index.setdefault("", i) + for d in range(1, len(parts)): + folder = "/".join(parts[:d]) + last_index[folder] = i + if folder not in first_index: + first_index[folder] = i + + out: Dict[str, Dict[str, Any]] = {} + for i, rp in enumerate(rels): + norm_rp = normalized[i] + parts = norm_rp.split("/") + depth = len(parts) - 1 # Max 1 now + parent = norm_rp.rsplit("/", 1)[0] if "/" in norm_rp else "" + is_last = (last_index.get(parent, i) == i) + has_prev_in_parent = i > first_index.get(parent, i) + + pipes: List[bool] = [] + for j in range(depth): + folder = "/".join(parts[:j+1]) + pipes.append(last_index.get(folder, i) > i) + + out[rp] = { + "depth": depth, + "pipes": pipes, + "is_last": is_last, + "has_prev_in_parent": has_prev_in_parent + } + return out + + def _basic_file_meta(self, fid: str) -> Dict[str, Any]: + with self.lock: + root = self.root + fid_to_rel = dict(self.fid_to_rel) + if not root: return {} + rp = fid_to_rel.get(fid) + if not rp: return {} + try: + p = (root / rp).resolve() + if not is_within_root(root, p) or not p.exists(): return {} + st = p.stat() + ext = p.suffix.lower() + folder = str(Path(rp).parent).replace("\\", "/") + if folder == ".": folder = "(root)" + return {"ext": ext[1:] if ext.startswith(".") else ext, "size": st.st_size, "mtime": int(st.st_mtime), "folder": folder} + except Exception: + return {} + + def _get_cached_meta(self, fid: str) -> Dict[str, Any]: + with self._meta_lock: + return dict(self._meta_cache.get(fid, {})) + + def _set_cached_meta(self, fid: str, meta: Dict[str, Any]) -> None: + with self._meta_lock: + self._meta_cache[fid] = dict(meta) + + def _auto_subtitle_sidecar(self, video_path: Path) -> Optional[Path]: + """Find subtitle file with same name as video (case-insensitive, flexible matching). + + Handles patterns like: + - video.srt / video.vtt (exact match) + - video.en.srt / video.en.vtt (with language code) + - video.eng.srt / video.english.srt (other language patterns) + - Flexible matching: ignores dashes, underscores, extra spaces + """ + import re + + def normalize(s: str) -> str: + """Normalize string for flexible comparison.""" + s = s.lower() + # Remove common separators and normalize spaces + s = re.sub(r'[-_]+', ' ', s) + # Collapse multiple spaces + s = re.sub(r'\s+', ' ', s) + return s.strip() + + base_stem = video_path.stem + base_norm = normalize(base_stem) + parent = video_path.parent + + # Check all files in the same directory + try: + candidates = [] + for f in parent.iterdir(): + if not f.is_file(): + continue + ext = f.suffix.lower() + if ext not in [".vtt", ".srt"]: + continue + + f_stem = f.stem + f_norm = normalize(f_stem) + + # Exact match (video.srt) - case insensitive + if f_stem.lower() == base_stem.lower(): + candidates.append((0, f)) # Priority 0 = exact match + continue + + # Normalized exact match + if f_norm == base_norm: + candidates.append((1, f)) # Priority 1 = normalized exact + continue + + # Check if it starts with the video name and has a language suffix + # e.g., "video.en" matches "video", "video.eng" matches "video" + if f_stem.lower().startswith(base_stem.lower() + "."): + lang_part = f_stem[len(base_stem) + 1:].lower() + if lang_part in ["en", "eng", "english"]: + candidates.append((2, f)) # Priority 2 = English with exact base + else: + candidates.append((4, f)) # Priority 4 = other lang with exact base + continue + + # Normalized match with language suffix + # Remove potential language code from subtitle stem for comparison + f_stem_no_lang = re.sub(r'\.(en|eng|english|fr|de|es|it|pt|ru|ja|ko|zh)$', '', f_stem, flags=re.IGNORECASE) + f_norm_no_lang = normalize(f_stem_no_lang) + + if f_norm_no_lang == base_norm: + # Check if it had an English language code + lang_match = re.search(r'\.(en|eng|english)$', f_stem, flags=re.IGNORECASE) + if lang_match: + candidates.append((3, f)) # Priority 3 = English with normalized base + else: + candidates.append((5, f)) # Priority 5 = other/no lang with normalized base + + # Sort by priority and return best match + if candidates: + candidates.sort(key=lambda x: x[0]) + return candidates[0][1] + except Exception: + pass + return None + + def _store_subtitle_for_fid(self, fid: str, src_path: Path) -> Optional[Dict[str, Any]]: + try: + ext = src_path.suffix.lower() + if ext not in SUB_EXTS: + return None + + # create stable file name based on fid + subtitle source basename + name_part = re.sub(r"[^a-zA-Z0-9._-]+", "_", src_path.stem)[:60] or "subtitle" + out_name = f"{fid}_{name_part}.vtt" + out_path = (SUBS_DIR / out_name).resolve() + + if ext == ".vtt": + out_path.write_bytes(src_path.read_bytes()) + else: + data = src_path.read_text(encoding="utf-8", errors="replace") + out_path.write_bytes(srt_to_vtt_bytes(data)) + + rel = f"subtitles/{out_name}" + return {"vtt": rel, "label": src_path.name} + except Exception: + return None + + def get_subtitle_for_current(self) -> Dict[str, Any]: + with self.lock: + root = self.root + state = dict(self.state) + cur = state.get("current_fid") + vids = state.get("videos", {}) + files = list(self.files) if self.files else [] + fids = list(self.fids) if self.fids else [] + + if not root or not cur or not isinstance(vids, dict): + return {"ok": True, "has": False} + + # Check for already-stored subtitle + m = vids.get(cur) + if isinstance(m, dict) and isinstance(m.get("subtitle"), dict): + sub = m["subtitle"] + vtt = sub.get("vtt") + if isinstance(vtt, str): + p = (STATE_DIR / vtt).resolve() + if p.exists(): + return {"ok": True, "has": True, "url": f"/sub/{state.get('library_id')}/{cur}", "label": sub.get("label", "Subtitles")} + + # Get video path from files/fids + vp = None + try: + if cur in fids: + idx = fids.index(cur) + if idx < len(files): + vp = files[idx] + except (ValueError, IndexError): + pass + + if not vp or not vp.exists(): + return {"ok": True, "has": False} + + # 1. First try: sidecar subtitle file (case-insensitive) + sc = self._auto_subtitle_sidecar(vp) + if sc and sc.exists(): + # store it so it becomes persistent and portable (stored next to script) + stored = self._store_subtitle_for_fid(cur, sc) + if stored: + with self.lock: + mm = self.state["videos"].get(cur, {}) + if not isinstance(mm, dict): + mm = {} + mm["subtitle"] = stored + self.state["videos"][cur] = mm + self.save_state() + return {"ok": True, "has": True, "url": f"/sub/{state.get('library_id')}/{cur}", "label": stored.get("label", "Subtitles")} + + # 2. Second try: embedded subtitles (prefer English) + if self._ffprobe and self._ffmpeg: + try: + meta = ffprobe_video_metadata(vp, self._ffprobe) + if meta and "subtitle_tracks" in meta: + tracks = meta["subtitle_tracks"] + if tracks: + # Try to find English track first + english_track = None + first_track = None + for t in tracks: + if first_track is None: + first_track = t + lang = (t.get("language") or "").lower() + title = (t.get("title") or "").lower() + if "eng" in lang or "english" in title or lang == "en": + english_track = t + break + + # Use English if found, otherwise first track + chosen = english_track or first_track + if chosen: + track_idx = chosen.get("index") + if track_idx is not None: + # Extract the subtitle + result = self.extract_embedded_subtitle(track_idx) + if result.get("ok"): + return {"ok": True, "has": True, "url": result.get("url"), "label": result.get("label", "Subtitles")} + except Exception: + pass + + return {"ok": True, "has": False} + + def set_subtitle_for_current(self, file_path: str) -> Dict[str, Any]: + with self.lock: + root = self.root + cur = self.state.get("current_fid") + if not root or not cur: + return {"ok": False, "error": "No video loaded"} + p = Path(file_path).expanduser() + if not p.exists() or not p.is_file(): + return {"ok": False, "error": "Subtitle file not found"} + stored = self._store_subtitle_for_fid(cur, p) + if not stored: + return {"ok": False, "error": "Unsupported subtitle type (use .srt or .vtt)"} + with self.lock: + mm = self.state["videos"].get(cur, {}) + if not isinstance(mm, dict): + mm = {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None} + mm["subtitle"] = stored + self.state["videos"][cur] = mm + self.save_state() + return {"ok": True, "url": f"/sub/{self.state.get('library_id')}/{cur}", "label": stored.get("label", "Subtitles")} + + def get_embedded_subtitles(self) -> Dict[str, Any]: + """Get list of embedded subtitle tracks for current video.""" + with self.lock: + root = self.root + files = list(self.files) if self.files else [] + fids = list(self.fids) if self.fids else [] + state = dict(self.state) + + if not root or not files or not fids: + return {"ok": False, "tracks": []} + + cur = state.get("current_fid") + if not isinstance(cur, str) or cur not in set(fids): + return {"ok": False, "tracks": []} + + try: + idx = fids.index(cur) + video_path = files[idx] + except (ValueError, IndexError): + return {"ok": False, "tracks": []} + + # Use ffprobe to get subtitle tracks + meta = ffprobe_video_metadata(video_path, self._ffprobe) + if not meta or "subtitle_tracks" not in meta: + return {"ok": True, "tracks": []} + + return {"ok": True, "tracks": meta["subtitle_tracks"]} + + def extract_embedded_subtitle(self, track_index: int) -> Dict[str, Any]: + """Extract an embedded subtitle track using ffmpeg.""" + with self.lock: + root = self.root + files = list(self.files) if self.files else [] + fids = list(self.fids) if self.fids else [] + cur = self.state.get("current_fid") + library_id = self.state.get("library_id") + + if not root or not files or not fids or not cur: + return {"ok": False, "error": "No video loaded"} + + if not self._ffmpeg: + return {"ok": False, "error": "ffmpeg not found"} + + try: + idx = fids.index(cur) + video_path = files[idx] + except (ValueError, IndexError): + return {"ok": False, "error": "Video not found"} + + # Extract subtitle to VTT format + SUBS_DIR.mkdir(parents=True, exist_ok=True) + out_name = f"{cur}_embedded_{track_index}.vtt" + out_path = SUBS_DIR / out_name + + try: + cmd = [ + self._ffmpeg, "-y", "-i", str(video_path), + "-map", f"0:{track_index}", + "-c:s", "webvtt", + str(out_path) + ] + subprocess.run( + cmd, + capture_output=True, + timeout=60, + **_subprocess_no_window_kwargs() + ) + + if not out_path.exists(): + return {"ok": False, "error": "Failed to extract subtitle"} + + # Get track label + meta = ffprobe_video_metadata(video_path, self._ffprobe) + label = "Embedded" + if meta and "subtitle_tracks" in meta: + for t in meta["subtitle_tracks"]: + if t.get("index") == track_index: + lang = t.get("language", "") + title = t.get("title", "") + if title: + label = title + elif lang: + label = lang.upper() + break + + # Store in state + stored = {"vtt": f"subtitles/{out_name}", "label": label} + with self.lock: + mm = self.state["videos"].get(cur, {}) + if not isinstance(mm, dict): + mm = {} + mm["subtitle"] = stored + self.state["videos"][cur] = mm + self.save_state() + + return {"ok": True, "url": f"/sub/{library_id}/{cur}", "label": label} + except Exception as e: + return {"ok": False, "error": str(e)} + + def get_available_subtitles(self) -> Dict[str, Any]: + """Get all available subtitle options (sidecar files + embedded tracks).""" + import re + + with self.lock: + root = self.root + files = list(self.files) if self.files else [] + fids = list(self.fids) if self.fids else [] + state = dict(self.state) + + cur = state.get("current_fid") + if not root or not files or not fids or not cur: + return {"ok": False, "sidecar": [], "embedded": []} + + try: + idx = fids.index(cur) + video_path = files[idx] + except (ValueError, IndexError): + return {"ok": False, "sidecar": [], "embedded": []} + + sidecar_subs = [] + embedded_subs = [] + + # Find all sidecar subtitle files + def normalize(s: str) -> str: + s = s.lower() + s = re.sub(r'[-_]+', ' ', s) + s = re.sub(r'\s+', ' ', s) + return s.strip() + + base_stem = video_path.stem + base_norm = normalize(base_stem) + parent = video_path.parent + + seen_labels = set() # To dedupe vtt/srt with same name + + try: + for f in parent.iterdir(): + if not f.is_file(): + continue + ext = f.suffix.lower() + if ext not in [".vtt", ".srt"]: + continue + + f_stem = f.stem + f_norm = normalize(f_stem) + + # Check if this subtitle matches the video + is_match = False + + # Exact match + if f_stem.lower() == base_stem.lower(): + is_match = True + # Normalized match + elif f_norm == base_norm: + is_match = True + # With language suffix (exact base) + elif f_stem.lower().startswith(base_stem.lower() + "."): + is_match = True + # With language suffix (normalized base) + else: + f_stem_no_lang = re.sub(r'\.(en|eng|english|fr|fra|french|de|deu|german|es|spa|spanish|it|ita|italian|pt|por|portuguese|ru|rus|russian|ja|jpn|japanese|ko|kor|korean|zh|chi|chinese|nl|nld|dutch|pl|pol|polish|sv|swe|swedish|ar|ara|arabic)$', '', f_stem, flags=re.IGNORECASE) + if normalize(f_stem_no_lang) == base_norm: + is_match = True + + if is_match: + # Extract language from filename + lang_match = re.search(r'\.([a-z]{2,3})$', f_stem, flags=re.IGNORECASE) + if lang_match: + lang = lang_match.group(1).upper() + # Map common codes to full names + lang_map = {"EN": "English", "ENG": "English", "FR": "French", "FRA": "French", + "DE": "German", "DEU": "German", "ES": "Spanish", "SPA": "Spanish", + "IT": "Italian", "ITA": "Italian", "PT": "Portuguese", "POR": "Portuguese", + "RU": "Russian", "RUS": "Russian", "JA": "Japanese", "JPN": "Japanese", + "KO": "Korean", "KOR": "Korean", "ZH": "Chinese", "CHI": "Chinese", + "NL": "Dutch", "NLD": "Dutch", "PL": "Polish", "POL": "Polish", + "SV": "Swedish", "SWE": "Swedish", "AR": "Arabic", "ARA": "Arabic"} + label = lang_map.get(lang, lang) + else: + label = "Subtitles" + + # Dedupe: if we already have this label, skip (prefer vtt over srt) + dedupe_key = label.lower() + if dedupe_key in seen_labels: + continue + seen_labels.add(dedupe_key) + + sidecar_subs.append({ + "path": str(f), + "label": label, + "format": ext[1:].upper() + }) + except Exception: + pass + + # Sort sidecar subs: English first, then alphabetically + def sub_sort_key(s): + label = s.get("label", "").lower() + if "english" in label or label == "en": + return (0, label) + return (1, label) + sidecar_subs.sort(key=sub_sort_key) + + # Get embedded subtitles + if self._ffprobe: + try: + meta = ffprobe_video_metadata(video_path, self._ffprobe) + if meta and "subtitle_tracks" in meta: + for track in meta["subtitle_tracks"]: + lang = track.get("language", "") + title = track.get("title", "") + if title: + label = title + elif lang: + label = lang.upper() + else: + label = f"Track {track.get('index', '?')}" + + embedded_subs.append({ + "index": track.get("index"), + "label": label, + "codec": track.get("codec", ""), + "language": lang + }) + except Exception: + pass + + return {"ok": True, "sidecar": sidecar_subs, "embedded": embedded_subs} + + def load_sidecar_subtitle(self, file_path: str) -> Dict[str, Any]: + """Load a specific sidecar subtitle file.""" + with self.lock: + cur = self.state.get("current_fid") + library_id = self.state.get("library_id") + + if not cur: + return {"ok": False, "error": "No video loaded"} + + p = Path(file_path) + if not p.exists() or not p.is_file(): + return {"ok": False, "error": "Subtitle file not found"} + + stored = self._store_subtitle_for_fid(cur, p) + if not stored: + return {"ok": False, "error": "Unsupported subtitle type"} + + with self.lock: + mm = self.state["videos"].get(cur, {}) + if not isinstance(mm, dict): + mm = {} + mm["subtitle"] = stored + self.state["videos"][cur] = mm + self.save_state() + + return {"ok": True, "url": f"/sub/{library_id}/{cur}", "label": stored.get("label", "Subtitles")} + + def reset_watch_progress(self) -> Dict[str, Any]: + # Reset only progress fields; keep notes, duration, subtitles, volume, rate, etc. + with self.lock: + vids = self.state.get("videos", {}) + if not isinstance(vids, dict): return {"ok": False, "error": "No library"} + for fid, m in list(vids.items()): + if not isinstance(m, dict): continue + m["pos"] = 0.0 + m["watched"] = 0.0 + m["finished"] = False + vids[fid] = m + self.state["videos"] = vids + self.state["current_time"] = 0.0 + self.save_state() + return {"ok": True} + + def get_current_video_metadata(self) -> Dict[str, Any]: + with self.lock: + root = self.root + files = list(self.files) if self.files else [] + fids = list(self.fids) if self.fids else [] + state = dict(self.state) + + if not root or not files or not fids: + return {"ok": False, "error": "No library", "ffprobe_found": self.ffprobe_found} + + cur = state.get("current_fid") + if not isinstance(cur, str) or cur not in set(fids): + cur = fids[0] + + idx = fids.index(cur) + p = files[idx] + basic = self._basic_file_meta(cur) + cached = self._get_cached_meta(cur) + + out = {"ok": True, "fid": cur, "basic": basic, "probe": cached or None, "ffprobe_found": self.ffprobe_found} + if cached: + return out + + probe = ffprobe_video_metadata(p, self._ffprobe) + if probe: + self._set_cached_meta(cur, probe) + out["probe"] = probe + else: + out["probe"] = None + return out + + def get_library_info(self) -> Dict[str, Any]: + with self.lock: + root = self.root + files = list(self.files) + fids = list(self.fids) + rels = list(self.relpaths) + state = dict(self.state) + + if not root: + return {"ok": False, "error": "No folder selected"} + + has_subdirs = any("/" in rp for rp in rels) + tree = self._tree_flags(rels) if has_subdirs else {} + vids = state.get("videos", {}) + if not isinstance(vids, dict): vids = {} + + items = [] + for i, (fid, f) in enumerate(zip(fids, files)): + rel = rels[i] if i < len(rels) else str(f.relative_to(root)).replace("\\", "/") + meta = vids.get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) + nice = pretty_title_from_filename(f.name) + tf = tree.get(rel, {"depth": 0, "pipes": [], "is_last": False, "has_prev_in_parent": False}) + items.append({ + "index": i, + "fid": fid, + "name": f.name, + "title": nice, + "relpath": rel, + + "depth": int(tf.get("depth", 0)), + "pipes": tf.get("pipes", []), + "is_last": bool(tf.get("is_last", False)), + "has_prev_in_parent": bool(tf.get("has_prev_in_parent", False)), + + "pos": float(meta.get("pos") or 0.0), + "watched": float(meta.get("watched") or 0.0), + "duration": meta.get("duration"), + "finished": bool(meta.get("finished") or False), + "note_len": len(str(meta.get("note") or "")), + "last_open": int(meta.get("last_open") or 0), + "has_sub": isinstance(meta.get("subtitle"), dict), + }) + + stats = self._folder_stats(fids, state) + + cur_fid = state.get("current_fid") + cur_idx = fids.index(cur_fid) if isinstance(cur_fid, str) and cur_fid in fids else 0 + + next_up = None + for j in range(cur_idx + 1, len(items)): + if not items[j]["finished"]: + next_up = items[j]; break + + return { + "ok": True, + "folder": str(root), + "library_id": state.get("library_id"), + "count": len(files), + "current_index": cur_idx, + "current_fid": fids[cur_idx] if fids else None, + "current_time": float(state.get("current_time", 0.0)), + "folder_volume": float(state.get("volume", 1.0)), + "folder_autoplay": bool(state.get("autoplay", True)), + "folder_rate": float(state.get("playback_rate", 1.0)), + "items": items, + "has_subdirs": has_subdirs, + "overall_progress": stats["overall_progress"], + "durations_known": stats["durations_known"], + "finished_count": stats["finished_count"], + "remaining_count": stats["remaining_count"], + "remaining_seconds_known": stats["remaining_seconds_known"], + "top_folders": stats["top_folders"], + "next_up": {"index": next_up["index"], "title": next_up["title"]} if next_up else None, + } + + def update_progress(self, index: int, current_time: float, duration: Optional[float], playing: bool) -> Dict[str, Any]: + now = int(time.time()) + with self.lock: + if not self.root or not self.files or not self.fids: + return {"ok": False, "error": "No library"} + + index = max(0, min(int(index), len(self.files) - 1)) + fid = self.fids[index] + current_time = max(0.0, float(current_time or 0.0)) + if duration is not None: + try: duration = float(duration) + except Exception: duration = None + + self.state["current_fid"] = fid + self.state["current_time"] = current_time + + meta = self.state["videos"].get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) + + meta["pos"] = current_time + meta["watched"] = max(float(meta.get("watched") or 0.0), current_time) + if isinstance(duration, (int, float)) and duration and duration > 0: + meta["duration"] = duration + + # once finished, always finished (do NOT flip it back) + d2 = meta.get("duration") + if isinstance(d2, (int, float)) and d2 and d2 > 0: + meta["finished"] = bool(meta.get("finished") or (current_time >= max(0.0, float(d2) - 2.0))) + + if playing: + meta["last_open"] = now + + self.state["videos"][fid] = meta + + self.save_state() + return {"ok": True} + + def set_current(self, index: int, timecode: float = 0.0) -> Dict[str, Any]: + with self.lock: + if not self.root or not self.files or not self.fids: + return {"ok": False, "error": "No library"} + index = max(0, min(int(index), len(self.files) - 1)) + self.state["current_fid"] = self.fids[index] + self.state["current_time"] = max(0.0, float(timecode or 0.0)) + self.save_state() + return {"ok": True} + + def set_folder_volume(self, volume: float) -> Dict[str, Any]: + with self.lock: + if not self.root: return {"ok": False, "error": "No library"} + try: v = float(volume) + except Exception: v = 1.0 + self.state["volume"] = clamp(v, 0.0, 1.0) + self.save_state() + return {"ok": True} + + def set_folder_autoplay(self, enabled: bool) -> Dict[str, Any]: + with self.lock: + if not self.root: return {"ok": False, "error": "No library"} + self.state["autoplay"] = bool(enabled) + self.save_state() + return {"ok": True} + + def set_folder_rate(self, rate: float) -> Dict[str, Any]: + with self.lock: + if not self.root: return {"ok": False, "error": "No library"} + try: r = float(rate) + except Exception: r = 1.0 + self.state["playback_rate"] = clamp(r, 0.25, 3.0) + self.save_state() + return {"ok": True} + + def set_order(self, fids: List[str]) -> Dict[str, Any]: + with self.lock: + if not self.root or not self.files or not self.fids: + return {"ok": False, "error": "No library"} + if not isinstance(fids, list) or not all(isinstance(x, str) for x in fids): + return {"ok": False, "error": "order must be a list of fids"} + + existing = set(self.fids) + seen, cleaned = set(), [] + for fid in fids: + if fid in existing and fid not in seen: + cleaned.append(fid); seen.add(fid) + + # append remaining + remaining = [fid for fid in self.fids if fid not in seen] + remaining.sort(key=lambda fid: natural_key(self.fid_to_rel.get(fid, fid))) + new_fids = cleaned + remaining + + # rebuild lists + fid_to_rel = dict(self.fid_to_rel) + rel_to_file = {str(p.relative_to(self.root)).replace("\\", "/"): p for p in self.root.rglob("*") if p.is_file() and p.suffix.lower() in VIDEO_EXTS} + new_rels = [fid_to_rel.get(fid) for fid in new_fids] + new_rels = [rp for rp in new_rels if isinstance(rp, str)] + new_files = [rel_to_file[rp] for rp in new_rels if rp in rel_to_file] + + cur = self.state.get("current_fid") + if not isinstance(cur, str) or cur not in set(new_fids): + cur = new_fids[0] if new_fids else None + + self.fids = new_fids + self.relpaths = new_rels + self.files = new_files + self.state["order_fids"] = cleaned + self.state["current_fid"] = cur + + self.save_state() + return {"ok": True} + + def get_video_path(self, index: int) -> Path: + with self.lock: + if not self.root or not self.files: + raise FileNotFoundError("No library") + index = max(0, min(int(index), len(self.files) - 1)) + p, root = self.files[index], self.root + if not is_within_root(root, p): + raise PermissionError("Invalid path") + return p + + def get_note(self, fid: str) -> str: + with self.lock: + vids = self.state.get("videos", {}) + if not isinstance(vids, dict): return "" + meta = vids.get(fid) + if not isinstance(meta, dict): return "" + return str(meta.get("note") or "") + + def set_note(self, fid: str, note: str) -> Dict[str, Any]: + note = str(note or "") + with self.lock: + if not self.root: return {"ok": False, "error": "No library"} + vids = self.state.get("videos", {}) + if not isinstance(vids, dict): + vids = {}; self.state["videos"] = vids + meta = vids.get(fid) + if not isinstance(meta, dict): + meta = {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None} + meta["note"] = note + vids[fid] = meta + self.save_state() + return {"ok": True, "len": len(note)} + + def start_duration_scan(self) -> None: + with self._scan_lock: + if self._scan_thread and self._scan_thread.is_alive(): + return + self._scan_stop = False + t = threading.Thread(target=self._duration_scan_worker, daemon=True) + self._scan_thread = t + t.start() + + def _duration_scan_worker(self) -> None: + ffprobe, ffmpeg = self._ffprobe, self._ffmpeg + if not ffprobe and not ffmpeg: + return + + with self.lock: + if not self.root or not self.files or not self.fids: + return + root = self.root + files = list(self.files) + fids = list(self.fids) + + for fid, f in zip(fids, files): + if self._scan_stop: + return + + with self.lock: + if not self.root or str(self.root) != str(root): + return + meta = self.state["videos"].get(fid) + if not isinstance(meta, dict): + continue + d = meta.get("duration") + if isinstance(d, (int, float)) and d and d > 0: + continue + + dur = duration_seconds(f, ffprobe, ffmpeg) + if dur and dur > 0: + with self.lock: + if not self.root or str(self.root) != str(root): + return + m = self.state["videos"].get(fid, {"pos": 0.0, "watched": 0.0, "duration": None, "finished": False, "note": "", "last_open": 0, "subtitle": None}) + m["duration"] = float(dur) + # do NOT unset finished if it was already true + try: + m["finished"] = bool(m.get("finished") or (float(m.get("watched") or 0.0) >= max(0.0, float(dur) - 2.0))) + except Exception: + pass + self.state["videos"][fid] = m + self.save_state() + +LIB = Library() + +def _send_file_range(path: Path) -> Response: + file_size = path.stat().st_size + range_header = request.headers.get("Range", None) + + if not range_header: + def gen(): + with path.open("rb") as f: + while True: + chunk = f.read(1024 * 512) + if not chunk: break + yield chunk + resp = Response(gen(), status=200, mimetype="application/octet-stream") + resp.headers["Content-Length"] = str(file_size) + resp.headers["Accept-Ranges"] = "bytes" + resp.headers["Cache-Control"] = "no-store" + return resp + + try: + units, rng = range_header.split("=", 1) + if units.strip() != "bytes": raise ValueError + start_s, end_s = rng.split("-", 1) + start = int(start_s) if start_s else 0 + end = int(end_s) if end_s else (file_size - 1) + if start < 0 or end < start: raise ValueError + end = min(end, file_size - 1) + except Exception: + return abort(416) + + length = end - start + 1 + def gen(): + with path.open("rb") as f: + f.seek(start) + remaining = length + while remaining > 0: + chunk = f.read(min(1024 * 512, remaining)) + if not chunk: break + remaining -= len(chunk) + yield chunk + + resp = Response(gen(), status=206, mimetype="application/octet-stream") + resp.headers["Content-Range"] = f"bytes {start}-{end}/{file_size}" + resp.headers["Content-Length"] = str(length) + resp.headers["Accept-Ranges"] = "bytes" + resp.headers["Cache-Control"] = "no-store" + return resp + +@app.get("/") +def index(): + return Response(HTML, mimetype="text/html") + +@app.get("/fonts.css") +def fonts_css(): + if not FONTS_CSS_PATH.exists(): + return Response("", mimetype="text/css") + return Response(FONTS_CSS_PATH.read_text(encoding="utf-8", errors="replace"), mimetype="text/css") + +@app.get("/fonts/") +def fonts_file(filename: str): + path = (FONTS_DIR / filename).resolve() + if not path.exists() or not str(path).startswith(str(FONTS_DIR.resolve())): + abort(404) + mime, _ = mimetypes.guess_type(str(path)) + return send_file(str(path), mimetype=mime or "application/octet-stream", conditional=True, etag=True, max_age=0) + +@app.get("/fa.css") +def fa_css(): + if not FA_CSS_PATH.exists(): + return Response("", mimetype="text/css") + return Response(FA_CSS_PATH.read_text(encoding="utf-8", errors="replace"), mimetype="text/css") + +@app.get("/fa/webfonts/") +def fa_webfont(filename: str): + path = (FA_WEBFONTS_DIR / filename).resolve() + if not path.exists() or not str(path).startswith(str(FA_WEBFONTS_DIR.resolve())): + abort(404) + mime, _ = mimetypes.guess_type(str(path)) + return send_file(str(path), mimetype=mime or "application/octet-stream", conditional=True, etag=True, max_age=0) + +@app.get("/video/") +def video(index: int): + try: + p = LIB.get_video_path(index) + except Exception: + return abort(404) + + ext = p.suffix.lower() + mime_map = { + ".mp4": "video/mp4", + ".m4v": "video/mp4", + ".webm": "video/webm", + ".ogv": "video/ogg", + ".mov": "video/quicktime", + ".mkv": "video/x-matroska", + ".avi": "video/x-msvideo", + ".mpeg": "video/mpeg", + ".mpg": "video/mpeg", + ".m2ts": "video/mp2t", + ".mts": "video/mp2t", + } + resp = _send_file_range(p) + resp.mimetype = mime_map.get(ext, "application/octet-stream") + return resp + +@app.get("/sub//") +def subtitle(libid: str, fid: str): + # libid is informational; fid is the actual key we store subtitles under + with LIB.lock: + state = dict(LIB.state) + vids = state.get("videos", {}) + if not isinstance(vids, dict): + abort(404) + m = vids.get(fid) + if not isinstance(m, dict): + abort(404) + sub = m.get("subtitle") + if not isinstance(sub, dict): + abort(404) + rel = sub.get("vtt") + if not isinstance(rel, str): + abort(404) + p = (STATE_DIR / rel).resolve() + if not p.exists() or not str(p).startswith(str(SUBS_DIR.resolve().parent)): + abort(404) + return send_file(str(p), mimetype="text/vtt", conditional=True, etag=True, max_age=0) + +def run_flask(port: int): + import logging + logging.getLogger("werkzeug").setLevel(logging.ERROR) + app.run(host=HOST, port=port, debug=False, use_reloader=False, threaded=True) + +class API: + def select_folder(self) -> Dict[str, Any]: + win = webview.windows[0] + res = win.create_file_dialog(webview.FOLDER_DIALOG if not hasattr(webview, 'FileDialog') else webview.FileDialog.FOLDER, allow_multiple=False) + if not res: return {"ok": False, "cancelled": True} + folder = res[0] + try: return LIB.set_root(folder) + except Exception as e: return {"ok": False, "error": str(e)} + + def open_folder_path(self, folder: str) -> Dict[str, Any]: + try: return LIB.set_root(folder) + except Exception as e: return {"ok": False, "error": str(e)} + + def get_recents(self) -> Dict[str, Any]: + items = load_recents() + valid: List[str] = [] + for p in items: + try: + if Path(p).expanduser().exists() and Path(p).expanduser().is_dir(): + valid.append(p) + except Exception: + pass + # if anything invalid was removed, persist the cleaned list + if valid != items: + save_recents(valid) + out = [{"name": folder_display_name(p), "path": p} for p in valid] + return {"ok": True, "items": out} + + def remove_recent(self, path: str) -> Dict[str, Any]: + """Remove a specific folder from the recent folders list.""" + items = load_recents() + if path in items: + items.remove(path) + save_recents(items) + return {"ok": True} + + def get_library(self) -> Dict[str, Any]: + return LIB.get_library_info() + + def set_current(self, index: int, timecode: float = 0.0) -> Dict[str, Any]: + return LIB.set_current(index, timecode) + + def tick_progress(self, index: int, current_time: float, duration: Optional[float], playing: bool) -> Dict[str, Any]: + return LIB.update_progress(index, current_time, duration, playing) + + def set_folder_volume(self, volume: float) -> Dict[str, Any]: + return LIB.set_folder_volume(volume) + + def set_folder_autoplay(self, enabled: bool) -> Dict[str, Any]: + return LIB.set_folder_autoplay(bool(enabled)) + + def set_folder_rate(self, rate: float) -> Dict[str, Any]: + return LIB.set_folder_rate(float(rate)) + + def set_order(self, fids: List[str]) -> Dict[str, Any]: + return LIB.set_order(fids) + + def start_duration_scan(self) -> Dict[str, Any]: + LIB.start_duration_scan() + return {"ok": True} + + def get_prefs(self) -> Dict[str, Any]: + return {"ok": True, "prefs": PREFS.get()} + + def set_prefs(self, patch: Dict[str, Any]) -> Dict[str, Any]: + if not isinstance(patch, dict): return {"ok": False, "error": "patch must be an object"} + if "ui_zoom" in patch: + try: patch["ui_zoom"] = float(patch["ui_zoom"]) + except Exception: patch.pop("ui_zoom", None) + if "split_ratio" in patch: + try: patch["split_ratio"] = float(patch["split_ratio"]) + except Exception: patch.pop("split_ratio", None) + if "dock_ratio" in patch: + try: patch["dock_ratio"] = float(patch["dock_ratio"]) + except Exception: patch.pop("dock_ratio", None) + if "always_on_top" in patch: + patch["always_on_top"] = bool(patch["always_on_top"]) + PREFS.update(patch) + return {"ok": True} + + def set_always_on_top(self, enabled: bool) -> Dict[str, Any]: + enabled = bool(enabled) + PREFS.update({"always_on_top": enabled}) + try: + win = webview.windows[0] + if hasattr(win, "on_top"): + setattr(win, "on_top", enabled) + except Exception: + pass + return {"ok": True} + + def save_window_state(self) -> Dict[str, Any]: + try: + win = webview.windows[0] + w = getattr(win, "width", None) + h = getattr(win, "height", None) + x = getattr(win, "x", None) + y = getattr(win, "y", None) + patch: Dict[str, Any] = {"window": {}} + if isinstance(w, int) and w > 100: patch["window"]["width"] = w + if isinstance(h, int) and h > 100: patch["window"]["height"] = h + if isinstance(x, int): patch["window"]["x"] = x + if isinstance(y, int): patch["window"]["y"] = y + PREFS.update(patch) + return {"ok": True} + except Exception as e: + return {"ok": False, "error": str(e)} + + def get_note(self, fid: str) -> Dict[str, Any]: + if not isinstance(fid, str): return {"ok": False, "error": "bad fid"} + return {"ok": True, "note": LIB.get_note(fid)} + + def set_note(self, fid: str, note: str) -> Dict[str, Any]: + if not isinstance(fid, str): return {"ok": False, "error": "bad fid"} + return LIB.set_note(fid, note) + + def get_current_video_meta(self) -> Dict[str, Any]: + return LIB.get_current_video_metadata() + + def get_current_subtitle(self) -> Dict[str, Any]: + return LIB.get_subtitle_for_current() + + def get_embedded_subtitles(self) -> Dict[str, Any]: + """Get list of embedded subtitle tracks for current video.""" + return LIB.get_embedded_subtitles() + + def extract_embedded_subtitle(self, track_index: int) -> Dict[str, Any]: + """Extract an embedded subtitle track and return its URL.""" + return LIB.extract_embedded_subtitle(track_index) + + def get_available_subtitles(self) -> Dict[str, Any]: + """Get all available subtitle options (sidecar + embedded).""" + return LIB.get_available_subtitles() + + def load_sidecar_subtitle(self, file_path: str) -> Dict[str, Any]: + """Load a specific sidecar subtitle file.""" + return LIB.load_sidecar_subtitle(file_path) + + def choose_subtitle_file(self) -> Dict[str, Any]: + win = webview.windows[0] + dialog_type = webview.OPEN_DIALOG if not hasattr(webview, 'FileDialog') else webview.FileDialog.OPEN + res = win.create_file_dialog( + dialog_type, + allow_multiple=False, + file_types=("Subtitle files (*.srt;*.vtt)",) + ) + if not res: + return {"ok": False, "cancelled": True} + return LIB.set_subtitle_for_current(res[0]) + + def reset_watch_progress(self) -> Dict[str, Any]: + return LIB.reset_watch_progress() + +HTML = r""" + + + + +TutorialDock + + + + + +
+
+
+
+ +
+
TutorialDock
+
Watch local tutorials, resume instantly, and actually finish them.
+
+
+ +
+
+ + + +
+ +
+ +
+
+ + 100% + +
+
+ +
+ +
+
+ + +
+
+ +
+ +
+ + +
+
+
+ + + +
+
+
+
+
No video loaded
+
-
+
+ +
+
Overall
+
+
-
+
+
+ +
+ +
+
+ +
+
+
+ +
+
+
+ + + + + + +
+
+
00:00 / 00:00
+
+
+ +
+
+ + +
+ +
+ +
+
+
+
+ +
+
100%
+
+ +
+ + + + + + + + +
+ + +
+
+ + +
+
+ +
+
+
+
+ +
+
+ +
+
+
+
+
Notes
+
+
+ +
Saved
+
+
+
+ +
+
+
+ +
+
+
+
Info
+
+
+
+
Folder
-
+
Next up
-
+
Structure
-
+
+ +
+
Title
-
+
Relpath
-
+
Position
-
+
+ +
+
File
-
+
Video
-
+
Audio
-
+
Subtitles
-
+
+ +
+
Finished
-
+
Remaining
-
+
ETA
-
+
+ +
+
Volume
-
+
Speed
-
+
Durations
-
+
+ +
+
Top folders
-
+
+
+
+
+
+ +
+ +
+
+
+ +
+
+
Playlist
+
+
+
+
+
+
+ +
+
+
+
+ +
+
+
+
-
+
+
+ +
+ + + + +""" + +# ============================================================================= +# Application Entry Point +# ============================================================================= + + +def main() -> None: + """Initialize and run the TutorialDock application.""" + # Download and cache fonts for offline use (fail silently) + try: + ensure_google_fonts_local() + except Exception: + pass + try: + ensure_fontawesome_local() + except Exception: + pass + + # Restore last opened folder if it exists + prefs = PREFS.get() + last_path = prefs.get("last_folder_path") + if isinstance(last_path, str) and last_path.strip(): + try: + if Path(last_path).expanduser().exists(): + LIB.set_root(last_path) + except Exception: + pass + + # Start Flask server on a free port + port = pick_free_port(HOST) + flask_thread = threading.Thread(target=run_flask, args=(port,), daemon=True) + flask_thread.start() + + # Get window dimensions from saved preferences + window_prefs = prefs.get("window", {}) if isinstance(prefs.get("window"), dict) else {} + width = int(window_prefs.get("width") or 1320) + height = int(window_prefs.get("height") or 860) + x = window_prefs.get("x") + y = window_prefs.get("y") + on_top = bool(prefs.get("always_on_top", False)) + + # Create and start the webview window + api = API() + webview.create_window( + "TutorialDock", + url=f"http://{HOST}:{port}/", + width=width, + height=height, + x=x if isinstance(x, int) else None, + y=y if isinstance(y, int) else None, + on_top=on_top, + js_api=api, + ) + webview.start(debug=False) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tutorials.bat b/tutorials.bat new file mode 100644 index 0000000..e4b096c --- /dev/null +++ b/tutorials.bat @@ -0,0 +1,6 @@ +@echo off +set SCRIPT_DIR=%~dp0 +pushd "%SCRIPT_DIR%" +start "" /B pythonw "tutorial.py" +popd +exit \ No newline at end of file