Config default, reduceMotion Q_PROPERTY on UIBridge, Windows SystemParametersInfo detection for prefers-reduced-motion.
837 lines
34 KiB
Python
837 lines
34 KiB
Python
import sys
import threading
import logging
import os

# Add the application directory to sys.path to ensure 'src' is findable.
# This is critical for the embedded Python environment in the portable build,
# which may not put this script's directory on the module search path.
app_dir = os.path.dirname(os.path.abspath(__file__))
if app_dir not in sys.path:
    sys.path.insert(0, app_dir)
|
|
|
|
# -----------------------------------------------------------------------------
# WINDOWS DLL FIX (CRITICAL for Portable CUDA)
# Python 3.8+ on Windows requires explicit DLL directory addition.
# -----------------------------------------------------------------------------
if os.name == 'nt' and hasattr(os, 'add_dll_directory'):
    try:
        from pathlib import Path

        # Locate the first existing 'site-packages' entry on sys.path and
        # register the 'bin' folder of every nvidia stub package (cublas,
        # cudnn, etc.) as a DLL search directory.
        for entry in sys.path:
            candidate = Path(entry)
            if candidate.name != 'site-packages' or not candidate.exists():
                continue
            nvidia_root = candidate / "nvidia"
            if nvidia_root.exists():
                for stub in nvidia_root.iterdir():
                    dll_dir = stub / "bin"
                    if dll_dir.exists():
                        os.add_dll_directory(str(dll_dir))
            # Only the first matching site-packages is processed.
            break
    except Exception:
        # Best-effort: never block application startup over DLL path setup.
        pass
# -----------------------------------------------------------------------------
|
|
# -----------------------------------------------------------------------------
|
|
|
|
from PySide6.QtWidgets import QApplication, QFileDialog, QMessageBox
|
|
from PySide6.QtCore import QObject, Slot, Signal, QThread, Qt, QUrl
|
|
from PySide6.QtQml import QQmlApplicationEngine
|
|
from PySide6.QtQuickControls2 import QQuickStyle
|
|
from PySide6.QtGui import QIcon
|
|
|
|
from src.ui.bridge import UIBridge
|
|
from src.ui.tray import SystemTray
|
|
from src.core.audio_engine import AudioEngine
|
|
from src.core.transcriber import WhisperTranscriber
|
|
from src.core.llm_engine import LLMEngine
|
|
from src.core.hotkey_manager import HotkeyManager
|
|
from src.core.config import ConfigManager
|
|
from src.utils.injector import InputInjector
|
|
from src.core.paths import get_models_path, get_bundle_path
|
|
from src.utils.window_hook import WindowHook
|
|
|
|
from PySide6.QtGui import QSurfaceFormat
|
|
|
|
# Configure GPU Surface for Alpha/Transparency (Critical for Blur)
surface_fmt = QSurfaceFormat()
surface_fmt.setAlphaBufferSize(8)
QSurfaceFormat.setDefaultFormat(surface_fmt)

# Configure High DPI behavior for crisp UI.
os.environ["QT_ENABLE_HIGHDPI_SCALING"] = "1"
# Qt spells this variable QT_AUTO_SCREEN_SCALE_FACTOR; the previous key
# (QT_AUTOSCREENSCALEFACTOR) is not recognized by Qt. Keep both so any
# external tooling that read the old name still sees it.
os.environ["QT_AUTO_SCREEN_SCALE_FACTOR"] = "1"
os.environ["QT_AUTOSCREENSCALEFACTOR"] = "1"

# Detect resolution without creating QApplication (Fixes crash).
try:
    import ctypes

    user32 = ctypes.windll.user32
    # SetProcessDPIAware is needed so GetSystemMetrics reports the true
    # (unscaled) physical resolution.
    user32.SetProcessDPIAware()
    width = user32.GetSystemMetrics(0)

    # Base scale centers around 1920 width. At 3840 (4k) the raw scale is
    # 2.0, which is oversized, so shrink it ~40% (x0.6 -> 1.2).
    res_scale = width / 1920
    if width >= 3840:
        res_scale *= 0.6  # Make it significantly smaller at 4k as requested

    os.environ["QT_SCALE_FACTOR"] = str(max(1.0, res_scale))
except Exception:
    # ctypes.windll only exists on Windows; any failure here just means we
    # fall back to Qt's default scaling. (Was a bare `except:`, which would
    # also swallow SystemExit/KeyboardInterrupt.)
    pass
|
|
|
|
# Detect Windows "Reduce Motion" preference (prefers-reduced-motion) and
# persist it into the config so the QML layer can disable animations.
if os.name == 'nt':
    try:
        import ctypes

        SPI_GETCLIENTAREAANIMATION = 0x1042
        animation_enabled = ctypes.c_bool(True)
        ok = ctypes.windll.user32.SystemParametersInfoW(
            SPI_GETCLIENTAREAANIMATION, 0,
            ctypes.byref(animation_enabled), 0
        )
        if ok and not animation_enabled.value:
            # Mutate and save the SAME instance. The previous code mutated
            # one ConfigManager() and called save() on a second fresh one,
            # which would persist stale data unless ConfigManager is a
            # singleton (TODO confirm).
            cfg = ConfigManager()
            cfg.data["reduce_motion"] = True
            cfg.save()
    except Exception:
        # Best-effort detection; never block startup.
        pass
|
|
|
|
# Configure Logging
class QmlLoggingHandler(logging.Handler, QObject):
    """Logging handler that forwards formatted records to the QML log view.

    Records are relayed through a Qt signal so that log calls made on
    worker threads are delivered to the bridge on the Qt event loop.
    """

    sig_log = Signal(str)

    def __init__(self, bridge):
        logging.Handler.__init__(self)
        QObject.__init__(self)
        self.bridge = bridge
        fmt = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        self.setFormatter(fmt)
        self.sig_log.connect(self.bridge.append_log)

    def emit(self, record):
        """Format the record and relay it via the Qt signal."""
        self.sig_log.emit(self.format(record))
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
|
|
# Silence shutdown-related tracebacks from Qt/PySide6 signals
|
|
def _silent_shutdown_hook(exc_type, exc_value, exc_tb):
|
|
# During Python shutdown, some QObject signals may try to call dead slots.
|
|
# Ignore these specific tracebacks when they occur in bridge.py.
|
|
import traceback
|
|
if exc_type in (RuntimeError, SystemError, KeyboardInterrupt):
|
|
return # Suppress completely
|
|
tb_str = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
|
|
if 'bridge.py' in tb_str and '@Slot' in tb_str:
|
|
return # Suppress bridge signal tracebacks
|
|
# For all other exceptions, print normally
|
|
sys.__excepthook__(exc_type, exc_value, exc_tb)
|
|
|
|
sys.excepthook = _silent_shutdown_hook
|
|
|
|
class DownloadWorker(QThread):
    """Background worker that downloads a faster-whisper model from
    Hugging Face with real byte-level progress.

    Signals:
        progress(int): overall percentage (0-100) across all files.
        finished():    all files downloaded (or verified present upstream).
        error(str):    download failed; payload is the error message.
    """
    progress = Signal(int)
    finished = Signal()
    error = Signal(str)

    def __init__(self, model_name="small", parent=None):
        super().__init__(parent)
        self.model_name = model_name

    def run(self):
        try:
            import requests
            # NOTE: the previous `from tqdm import tqdm` was unused and
            # could raise ImportError in stripped environments — removed.

            model_path = get_models_path()
            dest_dir = model_path / f"faster-whisper-{self.model_name}"
            repo_id = f"Systran/faster-whisper-{self.model_name}"
            files = ["config.json", "model.bin", "tokenizer.json", "vocabulary.json"]
            base_url = f"https://huggingface.co/{repo_id}/resolve/main"

            dest_dir.mkdir(parents=True, exist_ok=True)
            logging.info(f"Downloading {self.model_name} to {dest_dir}...")

            # 1. Probe each file's size so overall progress can be reported.
            total_size = 0
            file_sizes = {}
            with requests.Session() as s:
                for fname in files:
                    url = f"{base_url}/{fname}"
                    # timeout prevents an unresponsive server hanging the
                    # worker thread forever.
                    head = s.head(url, allow_redirects=True, timeout=15)
                    if head.status_code == 200:
                        size = int(head.headers.get('content-length', 0))
                        file_sizes[fname] = size
                        total_size += size
                    # Files missing upstream (e.g. an optional vocabulary
                    # variant) are simply skipped below.

                # 2. Download loop. Files are overwritten for reliability
                # rather than attempting partial-resume logic.
                downloaded_bytes = 0
                for fname in files:
                    if fname not in file_sizes:
                        continue

                    url = f"{base_url}/{fname}"
                    dest_file = dest_dir / fname

                    # Context manager ensures the streamed connection is
                    # released even on write errors.
                    with s.get(url, stream=True, timeout=30) as resp:
                        resp.raise_for_status()
                        with open(dest_file, 'wb') as f:
                            for chunk in resp.iter_content(chunk_size=8192):
                                if chunk:
                                    f.write(chunk)
                                    downloaded_bytes += len(chunk)
                                    if total_size > 0:
                                        pct = int((downloaded_bytes / total_size) * 100)
                                        self.progress.emit(pct)

            self.finished.emit()

        except Exception as e:
            logging.error(f"Download failed: {e}")
            self.error.emit(str(e))
|
|
|
|
class LLMDownloadWorker(QThread):
    """Downloads the correction LLM (GGUF) in the background with progress."""
    progress = Signal(int)
    finished = Signal()
    error = Signal(str)

    def __init__(self, parent=None):
        super().__init__(parent)

    def run(self):
        try:
            import requests

            # Only a single LLM is supported for now.
            url = "https://huggingface.co/hugging-quants/Llama-3.2-1B-Instruct-Q4_K_M-GGUF/resolve/main/llama-3.2-1b-instruct-q4_k_m.gguf?download=true"
            fname = "llama-3.2-1b-instruct-q4_k_m.gguf"

            target_dir = get_models_path() / "llm" / "llama-3.2-1b-instruct"
            target_dir.mkdir(parents=True, exist_ok=True)
            target_file = target_dir / fname

            # No existence check: a download request means re-download.
            with requests.Session() as session:
                probe = session.head(url, allow_redirects=True)
                total = int(probe.headers.get('content-length', 0))

                reply = session.get(url, stream=True)
                reply.raise_for_status()

                received = 0
                with open(target_file, 'wb') as out:
                    for piece in reply.iter_content(chunk_size=8192):
                        if not piece:
                            continue
                        out.write(piece)
                        received += len(piece)
                        if total > 0:
                            self.progress.emit(int((received / total) * 100))

            self.finished.emit()

        except Exception as e:
            logging.error(f"LLM Download failed: {e}")
            self.error.emit(str(e))
|
|
|
|
class LLMWorker(QThread):
    """Runs LLM text correction off the UI thread.

    Emits `finished` with the corrected text, or with the original text
    unchanged if the engine raises (fail-safe).
    """
    finished = Signal(str)

    def __init__(self, llm_engine, text, mode, parent=None):
        super().__init__(parent)
        self.llm_engine = llm_engine
        self.text = text
        self.mode = mode

    def run(self):
        try:
            result = self.llm_engine.correct_text(self.text, self.mode)
        except Exception as e:
            logging.error(f"LLMWorker crashed: {e}")
            result = self.text  # Fail safe: return original text
        self.finished.emit(result)
|
|
|
|
|
|
class TranscriptionWorker(QThread):
    """Runs Whisper transcription off the UI thread.

    Args:
        transcriber: WhisperTranscriber instance.
        audio_data: numpy audio buffer, or a file path when is_file=True.
        is_file: treat audio_data as a path to an audio file.
        task_override: optional whisper task ('transcribe'/'translate').
    """
    finished = Signal(str)

    def __init__(self, transcriber, audio_data, is_file=False, parent=None, task_override=None):
        super().__init__(parent)
        self.transcriber = transcriber
        self.audio_data = audio_data
        self.is_file = is_file
        self.task_override = task_override

    def run(self):
        # Mirror LLMWorker's fail-safe: always emit `finished`. Previously an
        # exception here never emitted the signal, leaving the UI stuck in
        # the "Processing" state with isProcessing=True.
        try:
            text = self.transcriber.transcribe(
                self.audio_data, is_file=self.is_file, task=self.task_override
            )
        except Exception as e:
            logging.error(f"TranscriptionWorker crashed: {e}")
            text = ""
        self.finished.emit(text)
|
|
|
|
class WhisperApp(QObject):
    """Main application controller: owns the Qt application, QML engine,
    tray icon, and all backend components (audio, transcription, LLM,
    hotkeys)."""

    def __init__(self):
        super().__init__()
        # Force a style that supports full customization
        QQuickStyle.setStyle("Basic")

        self.qt_app = QApplication(sys.argv)
        # Tray application: keep running when overlay/settings windows close.
        self.qt_app.setQuitOnLastWindowClosed(False)

        # Set application-wide window icon (shows in taskbar for all windows)
        icon_path = get_bundle_path() / "assets" / "icon.ico"
        if icon_path.exists():
            self.qt_app.setWindowIcon(QIcon(str(icon_path)))

        self.config = ConfigManager()

        # 1. Initialize QML Engine & Bridge
        self.engine = QQmlApplicationEngine()
        self.bridge = UIBridge()

        # 0. Attach Logging Handler (forwards log records into the QML UI)
        logging.getLogger().addHandler(QmlLoggingHandler(self.bridge))

        # Connect toggle recording signal
        self.bridge.toggleRecordingRequested.connect(self.toggle_recording)
        self.bridge.isRecordingChanged.connect(self.on_ui_toggle_request)
        self.bridge.settingChanged.connect(self.on_settings_changed)
        self.bridge.hotkeysEnabledChanged.connect(self.on_hotkeys_enabled_toggle)
        self.bridge.downloadRequested.connect(self.on_download_requested)
        self.bridge.llmDownloadRequested.connect(self.on_llm_download_requested)

        # Expose the bridge to QML as the context property "ui".
        self.engine.rootContext().setContextProperty("ui", self.bridge)

        # 2. Tray setup
        self.tray = SystemTray()
        self.tray.quit_requested.connect(self.quit_app)
        self.tray.settings_requested.connect(self.open_settings)
        self.tray.transcribe_file_requested.connect(self.transcribe_file)

        # Init Tooltip
        from src.utils.formatters import format_hotkey
        self.format_hotkey = format_hotkey  # Store ref for later tooltip refreshes

        hk1 = self.format_hotkey(self.config.get("hotkey"))
        hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
        self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nTranslate: {hk2}")

        # 3. Logic Components Placeholders — created lazily in init_logic()
        # once the loader has verified/downloaded the model.
        self.audio_engine = None
        self.transcriber = None
        self.llm_engine = None
        self.hk_transcribe = None
        self.hk_correct = None
        self.hk_translate = None
        self.overlay_root = None

        # 4. Start Loader (model verification/download splash screen)
        loader_qml = get_bundle_path() / "src/ui/qml/Loader.qml"
        self.engine.load(QUrl.fromLocalFile(str(loader_qml)))
        self.loader_root = self.engine.rootObjects()[0]
        self.loader_root.setProperty("color", "transparent")

        self.loader_worker = DownloadWorker()
        self.loader_worker.progress.connect(self.on_loader_progress)
        self.loader_worker.finished.connect(self.on_loader_done)
        self.loader_worker.start()

        # Preload audio devices in background to avoid settings lag
        import threading
        threading.Thread(target=self.bridge.preload_audio_devices, daemon=True).start()
|
|
|
|
def on_loader_progress(self, percent):
|
|
self.bridge.downloadProgress = percent
|
|
|
|
    def on_loader_done(self):
        """Completion handler for the startup loader.

        Closes the splash, initializes the backend, shows the overlay,
        preloads the (hidden) settings window, and installs the low-level
        transparent-hit-test window hook. Guarded to run only once.
        """
        if getattr(self, "_loader_handled", False):
            return
        self._loader_handled = True

        logging.info("Model verification complete.")
        # Close Loader Window
        if hasattr(self, "loader_root"):
            self.loader_root.close()

        # Init Backend
        self.init_logic()

        # Show Overlay (Ensure we don't load multiple times)
        overlay_qml = get_bundle_path() / "src/ui/qml/Overlay.qml"
        self.engine.load(QUrl.fromLocalFile(str(overlay_qml)))
        # engine.load appends to rootObjects, so the overlay is the last one.
        self.overlay_root = self.engine.rootObjects()[-1]
        self.overlay_root.setProperty("color", "transparent")

        self.center_overlay()

        # Preload Settings (Invisible) to avoid lag on first open.
        logging.info("Preloading Settings window...")
        self.open_settings()
        if self.settings_root:
            self.settings_root.setVisible(False)

        # Install Low-Level Window Hook for Transparent Hit Test
        try:
            from src.utils.window_hook import WindowHook
            hwnd = self.overlay_root.winId()
            # Initial scale from config
            scale = float(self.config.get("ui_scale"))

            # Current Overlay Dimensions — base window size is 460x180
            # (same constants as reposition_overlay).
            win_w = int(460 * scale)
            win_h = int(180 * scale)

            self.window_hook = WindowHook(hwnd, win_w, win_h, initial_scale=scale)
            self.window_hook.install()

            # Initial state: Disabled because we start inactive
            self.window_hook.set_enabled(False)
        except Exception as e:
            logging.error(f"Failed to install WindowHook: {e}")
|
|
|
|
def center_overlay(self):
|
|
"""Calculates and sets the Overlay position above the taskbar."""
|
|
from PySide6.QtGui import QGuiApplication
|
|
screen = QGuiApplication.primaryScreen()
|
|
if not screen or not self.overlay_root: return
|
|
|
|
geom = screen.availableGeometry()
|
|
w = self.overlay_root.width()
|
|
h = self.overlay_root.height()
|
|
|
|
x = geom.x() + (geom.width() - w) // 2
|
|
y = geom.bottom() - h - 15
|
|
|
|
self.overlay_root.setX(x)
|
|
self.overlay_root.setY(y)
|
|
|
|
    def init_logic(self):
        """Create backend components (audio, transcriber, LLM) and start the
        three global hotkey listeners. Idempotent via a guard flag."""
        if getattr(self, "_logic_initialized", False):
            return
        self._logic_initialized = True

        logging.info("Initializing Core Logic...")
        self.audio_engine = AudioEngine()
        # Amplitude samples drive the QML visualizer; silence auto-stops.
        self.audio_engine.set_visualizer_callback(self.bridge.update_amplitude)
        self.audio_engine.set_silence_callback(self.on_silence_detected)
        self.transcriber = WhisperTranscriber()
        self.llm_engine = LLMEngine()

        # Dual Hotkey Managers — each hotkey maps to a fixed
        # (whisper task, post-processing mode) pair handled by toggle_recording.
        self.hk_transcribe = HotkeyManager(config_key="hotkey")
        self.hk_transcribe.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="standard"))
        self.hk_transcribe.start()

        self.hk_correct = HotkeyManager(config_key="hotkey_correct")
        self.hk_correct.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="correct"))
        self.hk_correct.start()

        self.hk_translate = HotkeyManager(config_key="hotkey_translate")
        self.hk_translate.triggered.connect(lambda: self.toggle_recording(task_override="translate", task_mode="standard"))
        self.hk_translate.start()

        self.bridge.update_status("Ready")
|
|
|
|
def run(self):
|
|
sys.exit(self.qt_app.exec())
|
|
|
|
    @Slot(str, str)
    @Slot(str)
    def toggle_recording(self, task_override=None, task_mode="standard"):
        """
        Start or stop a recording session.

        task_override: 'transcribe' or 'translate' (passed to whisper)
        task_mode: 'standard' or 'correct' (determines post-processing)

        With no arguments (UI click / silence auto-stop) the task stored at
        recording start is reused.
        """
        if task_mode == "correct":
            self.current_task_requires_llm = True
        elif task_mode == "standard":
            self.current_task_requires_llm = False  # Explicit reset

        # Actual Logic
        if self.bridge.isRecording:
            # STOP RECORDING
            logging.info("Stopping recording...")
            # stop_recording returns the numpy array directly
            audio_data = self.audio_engine.stop_recording()

            self.bridge.isRecording = False
            self.bridge.update_status("Processing...")
            self.bridge.isProcessing = True

            # Save task override for processing
            self.last_task_override = task_override

            if audio_data is not None and len(audio_data) > 0:
                # Use the task that started this session, or the override if provided
                final_task = getattr(self, "current_recording_task", self.config.get("task"))
                if task_override: final_task = task_override

                self.worker = TranscriptionWorker(self.transcriber, audio_data, parent=self, task_override=final_task)
                self.worker.finished.connect(self.on_transcription_done)
                self.worker.start()
            else:
                # Nothing captured — go straight back to idle.
                self.bridge.update_status("Ready")
                self.bridge.isProcessing = False

        else:
            # START RECORDING — refuse while a transcription is still running.
            if self.bridge.isProcessing:
                logging.warning("Ignored toggle request: Transcription in progress.")
                return

            intended_task = task_override if task_override else self.config.get("task")
            # Remember the task so a no-arg stop (silence/UI) keeps it.
            self.current_recording_task = intended_task

            logging.info(f"Starting recording... (Task: {intended_task}, Mode: {task_mode})")
            self.audio_engine.start_recording()
            self.bridge.isRecording = True
            self.bridge.update_status(f"Recording ({intended_task})...")
|
|
|
|
@Slot()
|
|
def quit_app(self):
|
|
logging.info("Shutting down...")
|
|
|
|
# [CRITICAL] Stop the StatsWorker FIRST before any UI objects are touched.
|
|
# This prevents signal emissions to a dying UIBridge object.
|
|
if hasattr(self, 'bridge') and hasattr(self.bridge, 'stats_worker'):
|
|
try:
|
|
self.bridge.stats_worker.stats_ready.disconnect(self.bridge.update_stats_callback)
|
|
except: pass
|
|
self.bridge.stats_worker.stop()
|
|
|
|
if self.hk_transcribe: self.hk_transcribe.stop()
|
|
if self.hk_translate: self.hk_translate.stop()
|
|
|
|
# Close all QML windows to ensure bindings stop before Python objects die
|
|
if self.overlay_root:
|
|
self.overlay_root.close()
|
|
self.overlay_root.deleteLater()
|
|
if hasattr(self, 'loader_root') and self.loader_root:
|
|
self.loader_root.close()
|
|
self.loader_root.deleteLater()
|
|
if hasattr(self, 'settings_root') and self.settings_root:
|
|
self.settings_root.close()
|
|
self.settings_root.deleteLater()
|
|
|
|
if hasattr(self, 'loader_worker') and self.loader_worker and self.loader_worker.isRunning():
|
|
logging.info("Waiting for loader to finish...")
|
|
self.loader_worker.quit()
|
|
self.loader_worker.wait(1000)
|
|
|
|
if hasattr(self, 'worker') and self.worker and self.worker.isRunning():
|
|
logging.info("Waiting for transcription to finish...")
|
|
self.worker.quit()
|
|
self.worker.wait(2000)
|
|
|
|
self.qt_app.quit()
|
|
|
|
    @Slot()
    def open_settings(self):
        """Show the settings window, lazily loading the QML on first use.

        The window object is kept alive and re-shown on subsequent calls;
        closing a QML Window hides it rather than destroying it, so the
        cached reference stays valid.
        """
        if not hasattr(self, 'settings_root') or self.settings_root is None:
            logging.info("Loading Settings window for the first time...")
            settings_qml = get_bundle_path() / "src/ui/qml/Settings.qml"
            self.engine.load(QUrl.fromLocalFile(str(settings_qml)))
            # engine.load appends: the new window is the last root object.
            self.settings_root = self.engine.rootObjects()[-1]
            self.settings_root.setProperty("color", "transparent")

            # Center on screen
            from PySide6.QtGui import QGuiApplication
            screen = QGuiApplication.primaryScreen()
            if screen:
                geom = screen.availableGeometry()
                self.settings_root.setX(geom.x() + (geom.width() - self.settings_root.width()) // 2)
                self.settings_root.setY(geom.y() + (geom.height() - self.settings_root.height()) // 2)

        self.settings_root.setVisible(True)
        self.settings_root.requestActivate()
|
|
|
|
@Slot()
|
|
def init_settings_preload(self):
|
|
"""Preloads settings window to avoid lag on first open."""
|
|
# Check if already loaded
|
|
if hasattr(self, 'settings_root') and self.settings_root:
|
|
return
|
|
|
|
logging.info("Preloading Settings QML...")
|
|
# Load but keep hidden? QML Window visible defaults to true usually,
|
|
# so we might see a flicker if we don't be careful.
|
|
# Ideally we load it with visible: false property from python or QML.
|
|
# For now, let's just let the first open be the load, but since user complained about lag...
|
|
# effectively doing nothing different here unless we actually trigger load.
|
|
pass
|
|
|
|
    @Slot(str, 'QVariant')
    def on_settings_changed(self, key, value):
        """
        React to settings changes in real-time.
        Some settings require immediate action (reloading model, moving window).
        """
        print(f"Setting Changed: {key} = {value}")

        # 1. Hotkey Reload — reload all three managers (cheap) and refresh
        # the tray tooltip with the newly formatted bindings.
        if key in ["hotkey", "hotkey_translate", "hotkey_correct"]:
            if self.hk_transcribe: self.hk_transcribe.reload_hotkey()
            if self.hk_correct: self.hk_correct.reload_hotkey()
            if self.hk_translate: self.hk_translate.reload_hotkey()

            if self.tray:
                hk1 = self.format_hotkey(self.config.get("hotkey"))
                hk3 = self.format_hotkey(self.config.get("hotkey_correct"))
                hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
                self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nCorrect: {hk3}\nTranslate: {hk2}")

        # 2. AI Model Reload (Heavy) — only reload the engine if the model
        # files are already on disk; otherwise wait for a manual download.
        if key in ["model_size", "compute_device", "compute_type"]:
            size = self.config.get("model_size")
            # Notify UI to check if the new selected model is downloaded
            self.bridge.notifyModelStatesChanged()

            if self.transcriber.model_exists(size):
                logging.info(f"Model '{size}' exists. Reloading engine...")
                threading.Thread(target=self.transcriber.load_model, daemon=True).start()
            else:
                logging.info(f"Model '{size}' not found. Waiting for manual download.")

        # 3. Window Positioning
        if key in ["overlay_position", "overlay_offset_x", "overlay_offset_y", "ui_scale"]:
            self.reposition_overlay()

        # 4. Run on Startup
        if key == "run_on_startup":
            self.handle_startup_shortcut(value)

        # 5. Input Device: no immediate action needed — the audio engine
        # picks up the new device on the next recording start.
|
|
|
|
    def reposition_overlay(self):
        """Resize and position the overlay window from user settings.

        The overlay's base (unscaled) window size is 460x180. The window is
        resized to the scaled size so content scaled via ui_scale in QML
        does not clip, then anchored to the configured screen corner/edge
        with the user's pixel offsets applied.
        """
        from PySide6.QtGui import QGuiApplication
        screen = QGuiApplication.primaryScreen()
        if not screen or not self.overlay_root: return

        # Apply UI Scale (handled visually in QML; needed here for geometry)
        scale = float(self.config.get("ui_scale"))

        # Get Geometry
        geom = screen.availableGeometry()

        # NOTE(review): visual_w/visual_h are currently unused — alignment
        # below uses the (identical) win_w/win_h window bounds instead.
        visual_w = 460 * scale
        visual_h = 180 * scale

        # Resize the window to fit the scaled content so nothing clips.
        win_w = int(460 * scale)
        win_h = int(180 * scale)

        self.overlay_root.setWidth(win_w)
        self.overlay_root.setHeight(win_h)

        pos_mode = self.config.get("overlay_position")
        offset_x = int(self.config.get("overlay_offset_x"))
        offset_y = int(self.config.get("overlay_offset_y"))

        x = 0
        y = 0

        # 15px margin from the chosen work-area edges.
        if pos_mode == "Bottom Center":
            x = geom.x() + (geom.width() - win_w) // 2
            y = geom.bottom() - win_h - 15
        elif pos_mode == "Top Center":
            x = geom.x() + (geom.width() - win_w) // 2
            y = geom.top() + 15
        elif pos_mode == "Bottom Right":
            x = geom.right() - win_w - 15
            y = geom.bottom() - win_h - 15
        elif pos_mode == "Top Right":
            x = geom.right() - win_w - 15
            y = geom.top() + 15
        elif pos_mode == "Bottom Left":
            x = geom.left() + 15
            y = geom.bottom() - win_h - 15
        elif pos_mode == "Top Left":
            x = geom.left() + 15
            y = geom.top() + 15

        # Apply Offsets
        x += offset_x
        y += offset_y

        self.overlay_root.setX(x)
        self.overlay_root.setY(y)
|
|
|
|
@Slot()
|
|
def transcribe_file(self):
|
|
file_path, _ = QFileDialog.getOpenFileName(None, "Select Audio", "", "Audio (*.mp3 *.wav *.flac *.m4a *.ogg)")
|
|
if file_path:
|
|
self.bridge.update_status("Thinking...")
|
|
# Files use the default configured task usually, or we could ask?
|
|
# Default to config setting for files.
|
|
self.worker = TranscriptionWorker(self.transcriber, file_path, is_file=True, parent=self)
|
|
self.worker.finished.connect(self.on_transcription_done)
|
|
self.worker.start()
|
|
|
|
@Slot()
|
|
def on_silence_detected(self):
|
|
from PySide6.QtCore import QMetaObject, Qt
|
|
# Silence detection always triggers the task that was active?
|
|
# Since silence stops recording, it just calls toggle_recording with no arg, using the stored current_task?
|
|
# Let's ensure toggle_recording handles no arg calls by stopping the CURRENT task.
|
|
QMetaObject.invokeMethod(self, "toggle_recording", Qt.QueuedConnection)
|
|
|
|
|
|
|
|
    @Slot(bool)
    def on_ui_toggle_request(self, state):
        # NOTE(review): dead code — this method is shadowed by a second
        # definition of on_ui_toggle_request later in this class, so this
        # version is never bound or called. Kept byte-identical; remove in
        # a cleanup pass.
        if state != self.audio_engine.recording:
            self.toggle_recording()  # Default behavior for UI clicks
|
|
|
|
    @Slot(str)
    def on_transcription_done(self, text: str):
        """Handle a finished transcription.

        Either chain into LLM correction — only when the session was started
        by the Correct hotkey AND the global llm_enabled safety switch is on
        — or inject the raw text immediately.
        """
        self.bridge.update_status("Ready")
        self.bridge.isProcessing = False  # Re-set to True below if we chain into the LLM.

        # Check LLM Settings -> AND check if the current task requested it
        llm_enabled = self.config.get("llm_enabled")
        requires_llm = getattr(self, "current_task_requires_llm", False)

        # Correction policy: the regular hotkey always yields raw text; only
        # the Correct hotkey triggers LLM post-processing, and llm_enabled
        # acts as a global safety switch on top of that.
        if text and llm_enabled and requires_llm:
            # Chain to LLM
            self.bridge.isProcessing = True
            self.bridge.update_status("Correcting...")
            mode = self.config.get("llm_mode")
            self.llm_worker = LLMWorker(self.llm_engine, text, mode, parent=self)
            self.llm_worker.finished.connect(self.on_llm_done)
            self.llm_worker.start()
            return

        self.bridge.isProcessing = False
        if text:
            method = self.config.get("input_method")
            speed = int(self.config.get("typing_speed"))
            InputInjector.inject_text(text, method, speed)
|
|
|
|
@Slot(str)
|
|
def on_llm_done(self, text: str):
|
|
self.bridge.update_status("Ready")
|
|
self.bridge.isProcessing = False
|
|
if text:
|
|
method = self.config.get("input_method")
|
|
speed = int(self.config.get("typing_speed"))
|
|
InputInjector.inject_text(text, method, speed)
|
|
|
|
# Cleanup
|
|
if hasattr(self, 'llm_worker') and self.llm_worker:
|
|
self.llm_worker.deleteLater()
|
|
self.llm_worker = None
|
|
|
|
@Slot(bool)
|
|
def on_hotkeys_enabled_toggle(self, state):
|
|
if self.hk_transcribe: self.hk_transcribe.set_enabled(state)
|
|
if self.hk_translate: self.hk_translate.set_enabled(state)
|
|
|
|
@Slot(str)
|
|
def on_download_requested(self, size):
|
|
if self.bridge.isDownloading:
|
|
return
|
|
|
|
self.bridge.update_status("Downloading...")
|
|
self.bridge.isDownloading = True
|
|
|
|
self.download_worker = DownloadWorker(size, parent=self)
|
|
self.download_worker.finished.connect(self.on_download_finished)
|
|
self.download_worker.error.connect(self.on_download_error)
|
|
self.download_worker.start()
|
|
|
|
@Slot()
|
|
def on_llm_download_requested(self):
|
|
if self.bridge.isDownloading: return
|
|
|
|
self.bridge.update_status("Downloading LLM...")
|
|
self.bridge.isDownloading = True
|
|
|
|
self.llm_dl_worker = LLMDownloadWorker(parent=self)
|
|
self.llm_dl_worker.progress.connect(self.on_loader_progress) # Reuse existing progress slot? Yes.
|
|
self.llm_dl_worker.finished.connect(self.on_download_finished) # Reuses same cleanup
|
|
self.llm_dl_worker.error.connect(self.on_download_error)
|
|
self.llm_dl_worker.start()
|
|
|
|
def on_download_finished(self):
|
|
self.bridge.isDownloading = False
|
|
self.bridge.update_status("Ready")
|
|
self.bridge.notifyModelStatesChanged() # Refresh UI markers
|
|
# Automatically load it now that it's here
|
|
threading.Thread(target=self.transcriber.load_model, daemon=True).start()
|
|
|
|
def on_download_error(self, err):
|
|
self.bridge.isDownloading = False
|
|
self.bridge.update_status("Error")
|
|
logging.error(f"Download Error: {err}")
|
|
|
|
    @Slot(bool)
    def on_ui_toggle_request(self, is_recording):
        """Called when recording state changes.

        NOTE(review): this redefines (and therefore replaces) the earlier
        on_ui_toggle_request defined above in this class; only this version
        is ever invoked by the isRecordingChanged connection.
        """
        # Update Window Hook to allow clicking while recording/processing.
        is_active = is_recording or self.bridge.isProcessing
        if hasattr(self, 'window_hook'):
            self.window_hook.set_enabled(is_active)
|
|
|
|
@Slot(bool)
|
|
def on_processing_changed(self, is_processing):
|
|
is_active = self.bridge.isRecording or is_processing
|
|
if hasattr(self, 'window_hook'):
|
|
self.window_hook.set_enabled(is_active)
|
|
|
|
if __name__ == "__main__":
    # `sys` is already imported at module level; the previous redundant
    # local `import sys` has been removed.
    app = WhisperApp()

    # Connect extra signal for processing state
    app.bridge.isProcessingChanged.connect(app.on_processing_changed)

    # app.run() itself calls sys.exit(); the outer sys.exit is a harmless
    # fallback in case run() ever returns normally.
    sys.exit(app.run())
|