Files
whisper_voice/main.py
Your Name a70e76b4ab WCAG: Add reduce_motion config, bridge property, OS detection
Config default, reduceMotion Q_PROPERTY on UIBridge, Windows
SystemParametersInfo detection for prefers-reduced-motion.
2026-02-18 21:02:27 +02:00

837 lines
34 KiB
Python

import sys
import threading
import logging
import os
# Add the application directory to sys.path to ensure 'src' is findable
# This is critical for the embedded Python environment in the portable build
app_dir = os.path.dirname(os.path.abspath(__file__))
if app_dir not in sys.path:
sys.path.insert(0, app_dir)
# -----------------------------------------------------------------------------
# WINDOWS DLL FIX (CRITICAL for Portable CUDA)
# Python 3.8+ on Windows requires explicit DLL directory addition.
# -----------------------------------------------------------------------------
if os.name == 'nt' and hasattr(os, 'add_dll_directory'):
try:
from pathlib import Path
# Scan sys.path for site-packages
for p in sys.path:
path_obj = Path(p)
if path_obj.name == 'site-packages' and path_obj.exists():
nvidia_path = path_obj / "nvidia"
if nvidia_path.exists():
for subdir in nvidia_path.iterdir():
# Add 'bin' folder from each nvidia stub (cublas, cudnn, etc.)
bin_path = subdir / "bin"
if bin_path.exists():
os.add_dll_directory(str(bin_path))
# Also try adding site-packages itself just in case
# os.add_dll_directory(str(path_obj))
break
except Exception:
pass
# -----------------------------------------------------------------------------
from PySide6.QtWidgets import QApplication, QFileDialog, QMessageBox
from PySide6.QtCore import QObject, Slot, Signal, QThread, Qt, QUrl
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtQuickControls2 import QQuickStyle
from PySide6.QtGui import QIcon
from src.ui.bridge import UIBridge
from src.ui.tray import SystemTray
from src.core.audio_engine import AudioEngine
from src.core.transcriber import WhisperTranscriber
from src.core.llm_engine import LLMEngine
from src.core.hotkey_manager import HotkeyManager
from src.core.config import ConfigManager
from src.utils.injector import InputInjector
from src.core.paths import get_models_path, get_bundle_path
from src.utils.window_hook import WindowHook
from PySide6.QtGui import QSurfaceFormat
# Configure GPU Surface for Alpha/Transparency (Critical for Blur)
surface_fmt = QSurfaceFormat()
surface_fmt.setAlphaBufferSize(8)
QSurfaceFormat.setDefaultFormat(surface_fmt)
# Configure High DPI behavior for crisp UI
os.environ["QT_ENABLE_HIGHDPI_SCALING"] = "1"
os.environ["QT_AUTOSCREENSCALEFACTOR"] = "1"
# Detect resolution without creating QApplication (Fixes crash)
try:
import ctypes
user32 = ctypes.windll.user32
# Get physical screen width (unscaled)
# SetProcessDPIAware is needed to get the true resolution
user32.SetProcessDPIAware()
width = user32.GetSystemMetrics(0)
# Base scale centers around 1920 width.
# At 3840 (4k), res_scale is 2.0. If we want it ~40% smaller, we multiply by 0.6 = 1.2
res_scale = (width / 1920)
if width >= 3840:
res_scale *= 0.6 # Make it significantly smaller at 4k as requested
os.environ["QT_SCALE_FACTOR"] = str(max(1.0, res_scale))
except:
pass
# Detect Windows "Reduce Motion" preference
try:
import ctypes
SPI_GETCLIENTAREAANIMATION = 0x1042
animation_enabled = ctypes.c_bool(True)
ctypes.windll.user32.SystemParametersInfoW(
SPI_GETCLIENTAREAANIMATION, 0,
ctypes.byref(animation_enabled), 0
)
if not animation_enabled.value:
ConfigManager().data["reduce_motion"] = True
ConfigManager().save()
except Exception:
pass
# Configure Logging
class QmlLoggingHandler(logging.Handler, QObject):
sig_log = Signal(str)
def __init__(self, bridge):
logging.Handler.__init__(self)
QObject.__init__(self)
self.bridge = bridge
self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.sig_log.connect(self.bridge.append_log)
def emit(self, record):
msg = self.format(record)
self.sig_log.emit(msg)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Silence shutdown-related tracebacks from Qt/PySide6 signals
def _silent_shutdown_hook(exc_type, exc_value, exc_tb):
# During Python shutdown, some QObject signals may try to call dead slots.
# Ignore these specific tracebacks when they occur in bridge.py.
import traceback
if exc_type in (RuntimeError, SystemError, KeyboardInterrupt):
return # Suppress completely
tb_str = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
if 'bridge.py' in tb_str and '@Slot' in tb_str:
return # Suppress bridge signal tracebacks
# For all other exceptions, print normally
sys.__excepthook__(exc_type, exc_value, exc_tb)
sys.excepthook = _silent_shutdown_hook
class DownloadWorker(QThread):
"""Background worker for model downloads with REAL progress."""
progress = Signal(int)
finished = Signal()
error = Signal(str)
def __init__(self, model_name="small", parent=None):
super().__init__(parent)
self.model_name = model_name
def run(self):
try:
import requests
from tqdm import tqdm
model_path = get_models_path()
# Determine what to download
dest_dir = model_path / f"faster-whisper-{self.model_name}"
repo_id = f"Systran/faster-whisper-{self.model_name}"
files = ["config.json", "model.bin", "tokenizer.json", "vocabulary.json"]
base_url = f"https://huggingface.co/{repo_id}/resolve/main"
dest_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Downloading {self.model_name} to {dest_dir}...")
# 1. Calculate Total Size
total_size = 0
file_sizes = {}
with requests.Session() as s:
for fname in files:
url = f"{base_url}/{fname}"
head = s.head(url, allow_redirects=True)
if head.status_code == 200:
size = int(head.headers.get('content-length', 0))
file_sizes[fname] = size
total_size += size
else:
# Fallback for vocabulary.json vs vocabulary.txt
if fname == "vocabulary.json":
# Try .txt? Or just skip if not found?
# Faster-whisper usually has vocabulary.json
pass
# 2. Download loop
downloaded_bytes = 0
with requests.Session() as s:
for fname in files:
if fname not in file_sizes: continue
url = f"{base_url}/{fname}"
dest_file = dest_dir / fname
# Resume check?
# Simpler to just overwrite for reliability unless we want complex resume logic.
# We'll overwrite.
resp = s.get(url, stream=True)
resp.raise_for_status()
with open(dest_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_bytes += len(chunk)
# Emit Progress
if total_size > 0:
pct = int((downloaded_bytes / total_size) * 100)
self.progress.emit(pct)
self.finished.emit()
except Exception as e:
logging.error(f"Download failed: {e}")
self.error.emit(str(e))
class LLMDownloadWorker(QThread):
progress = Signal(int)
finished = Signal()
error = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
def run(self):
try:
import requests
# Support one model for now
url = "https://huggingface.co/hugging-quants/Llama-3.2-1B-Instruct-Q4_K_M-GGUF/resolve/main/llama-3.2-1b-instruct-q4_k_m.gguf?download=true"
fname = "llama-3.2-1b-instruct-q4_k_m.gguf"
model_path = get_models_path() / "llm" / "llama-3.2-1b-instruct"
model_path.mkdir(parents=True, exist_ok=True)
dest_file = model_path / fname
# Simple check if exists and > 0 size?
# We assume if the user clicked download, they want to download it.
with requests.Session() as s:
head = s.head(url, allow_redirects=True)
total_size = int(head.headers.get('content-length', 0))
resp = s.get(url, stream=True)
resp.raise_for_status()
downloaded = 0
with open(dest_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
pct = int((downloaded / total_size) * 100)
self.progress.emit(pct)
self.finished.emit()
except Exception as e:
logging.error(f"LLM Download failed: {e}")
self.error.emit(str(e))
class LLMWorker(QThread):
finished = Signal(str)
def __init__(self, llm_engine, text, mode, parent=None):
super().__init__(parent)
self.llm_engine = llm_engine
self.text = text
self.mode = mode
def run(self):
try:
corrected = self.llm_engine.correct_text(self.text, self.mode)
self.finished.emit(corrected)
except Exception as e:
logging.error(f"LLMWorker crashed: {e}")
self.finished.emit(self.text) # Fail safe: return original text
class TranscriptionWorker(QThread):
finished = Signal(str)
def __init__(self, transcriber, audio_data, is_file=False, parent=None, task_override=None):
super().__init__(parent)
self.transcriber = transcriber
self.audio_data = audio_data
self.is_file = is_file
self.task_override = task_override
def run(self):
text = self.transcriber.transcribe(self.audio_data, is_file=self.is_file, task=self.task_override)
self.finished.emit(text)
class WhisperApp(QObject):
def __init__(self):
super().__init__()
# Force a style that supports full customization
QQuickStyle.setStyle("Basic")
self.qt_app = QApplication(sys.argv)
self.qt_app.setQuitOnLastWindowClosed(False)
# Set application-wide window icon (shows in taskbar for all windows)
icon_path = get_bundle_path() / "assets" / "icon.ico"
if icon_path.exists():
self.qt_app.setWindowIcon(QIcon(str(icon_path)))
self.config = ConfigManager()
# 1. Initialize QML Engine & Bridge
self.engine = QQmlApplicationEngine()
self.bridge = UIBridge()
# 0. Attach Logging Handler
logging.getLogger().addHandler(QmlLoggingHandler(self.bridge))
# Connect toggle recording signal
self.bridge.toggleRecordingRequested.connect(self.toggle_recording)
self.bridge.isRecordingChanged.connect(self.on_ui_toggle_request)
self.bridge.settingChanged.connect(self.on_settings_changed)
self.bridge.hotkeysEnabledChanged.connect(self.on_hotkeys_enabled_toggle)
self.bridge.downloadRequested.connect(self.on_download_requested)
self.bridge.llmDownloadRequested.connect(self.on_llm_download_requested)
self.engine.rootContext().setContextProperty("ui", self.bridge)
# 2. Tray setup
self.tray = SystemTray()
self.tray.quit_requested.connect(self.quit_app)
self.tray.settings_requested.connect(self.open_settings)
self.tray.transcribe_file_requested.connect(self.transcribe_file)
# Init Tooltip
from src.utils.formatters import format_hotkey
self.format_hotkey = format_hotkey # Store ref
hk1 = self.format_hotkey(self.config.get("hotkey"))
hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nTranslate: {hk2}")
# 3. Logic Components Placeholders
self.audio_engine = None
self.transcriber = None
self.llm_engine = None
self.hk_transcribe = None
self.hk_correct = None
self.hk_translate = None
self.overlay_root = None
# 4. Start Loader
loader_qml = get_bundle_path() / "src/ui/qml/Loader.qml"
self.engine.load(QUrl.fromLocalFile(str(loader_qml)))
self.loader_root = self.engine.rootObjects()[0]
self.loader_root.setProperty("color", "transparent")
self.loader_worker = DownloadWorker()
self.loader_worker.progress.connect(self.on_loader_progress)
self.loader_worker.finished.connect(self.on_loader_done)
self.loader_worker.start()
# Preload audio devices in background to avoid settings lag
import threading
threading.Thread(target=self.bridge.preload_audio_devices, daemon=True).start()
def on_loader_progress(self, percent):
self.bridge.downloadProgress = percent
def on_loader_done(self):
if getattr(self, "_loader_handled", False):
return
self._loader_handled = True
logging.info("Model verification complete.")
# Close Loader Window
if hasattr(self, "loader_root"):
self.loader_root.close()
# Init Backend
self.init_logic()
# Show Overlay (Ensure we don't load multiple times)
overlay_qml = get_bundle_path() / "src/ui/qml/Overlay.qml"
self.engine.load(QUrl.fromLocalFile(str(overlay_qml)))
self.overlay_root = self.engine.rootObjects()[-1]
self.overlay_root.setProperty("color", "transparent")
self.center_overlay()
# Preload Settings (Invisible)
logging.info("Preloading Settings window...")
self.open_settings()
if self.settings_root:
self.settings_root.setVisible(False)
# Install Low-Level Window Hook for Transparent Hit Test
try:
from src.utils.window_hook import WindowHook
hwnd = self.overlay_root.winId()
# Initial scale from config
scale = float(self.config.get("ui_scale"))
# Current Overlay Dimensions
win_w = int(460 * scale)
win_h = int(180 * scale)
self.window_hook = WindowHook(hwnd, win_w, win_h, initial_scale=scale)
self.window_hook.install()
# Initial state: Disabled because we start inactive
self.window_hook.set_enabled(False)
except Exception as e:
logging.error(f"Failed to install WindowHook: {e}")
def center_overlay(self):
"""Calculates and sets the Overlay position above the taskbar."""
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if not screen or not self.overlay_root: return
geom = screen.availableGeometry()
w = self.overlay_root.width()
h = self.overlay_root.height()
x = geom.x() + (geom.width() - w) // 2
y = geom.bottom() - h - 15
self.overlay_root.setX(x)
self.overlay_root.setY(y)
def init_logic(self):
if getattr(self, "_logic_initialized", False):
return
self._logic_initialized = True
logging.info("Initializing Core Logic...")
self.audio_engine = AudioEngine()
self.audio_engine.set_visualizer_callback(self.bridge.update_amplitude)
self.audio_engine.set_silence_callback(self.on_silence_detected)
self.transcriber = WhisperTranscriber()
self.llm_engine = LLMEngine()
# Dual Hotkey Managers
self.hk_transcribe = HotkeyManager(config_key="hotkey")
self.hk_transcribe.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="standard"))
self.hk_transcribe.start()
self.hk_correct = HotkeyManager(config_key="hotkey_correct")
self.hk_correct.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="correct"))
self.hk_correct.start()
self.hk_translate = HotkeyManager(config_key="hotkey_translate")
self.hk_translate.triggered.connect(lambda: self.toggle_recording(task_override="translate", task_mode="standard"))
self.hk_translate.start()
self.bridge.update_status("Ready")
def run(self):
sys.exit(self.qt_app.exec())
@Slot(str, str)
@Slot(str)
def toggle_recording(self, task_override=None, task_mode="standard"):
"""
task_override: 'transcribe' or 'translate' (passed to whisper)
task_mode: 'standard' or 'correct' (determines post-processing)
"""
if task_mode == "correct":
self.current_task_requires_llm = True
elif task_mode == "standard":
self.current_task_requires_llm = False # Explicit reset
# Actual Logic
if self.bridge.isRecording:
logging.info("Stopping recording...")
# stop_recording returns the numpy array directly
audio_data = self.audio_engine.stop_recording()
self.bridge.isRecording = False
self.bridge.update_status("Processing...")
self.bridge.isProcessing = True
# Save task override for processing
self.last_task_override = task_override
if audio_data is not None and len(audio_data) > 0:
# Use the task that started this session, or the override if provided
final_task = getattr(self, "current_recording_task", self.config.get("task"))
if task_override: final_task = task_override
self.worker = TranscriptionWorker(self.transcriber, audio_data, parent=self, task_override=final_task)
self.worker.finished.connect(self.on_transcription_done)
self.worker.start()
else:
self.bridge.update_status("Ready")
self.bridge.isProcessing = False
else:
# START RECORDING
if self.bridge.isProcessing:
logging.warning("Ignored toggle request: Transcription in progress.")
return
intended_task = task_override if task_override else self.config.get("task")
self.current_recording_task = intended_task
logging.info(f"Starting recording... (Task: {intended_task}, Mode: {task_mode})")
self.audio_engine.start_recording()
self.bridge.isRecording = True
self.bridge.update_status(f"Recording ({intended_task})...")
@Slot()
def quit_app(self):
logging.info("Shutting down...")
# [CRITICAL] Stop the StatsWorker FIRST before any UI objects are touched.
# This prevents signal emissions to a dying UIBridge object.
if hasattr(self, 'bridge') and hasattr(self.bridge, 'stats_worker'):
try:
self.bridge.stats_worker.stats_ready.disconnect(self.bridge.update_stats_callback)
except: pass
self.bridge.stats_worker.stop()
if self.hk_transcribe: self.hk_transcribe.stop()
if self.hk_translate: self.hk_translate.stop()
# Close all QML windows to ensure bindings stop before Python objects die
if self.overlay_root:
self.overlay_root.close()
self.overlay_root.deleteLater()
if hasattr(self, 'loader_root') and self.loader_root:
self.loader_root.close()
self.loader_root.deleteLater()
if hasattr(self, 'settings_root') and self.settings_root:
self.settings_root.close()
self.settings_root.deleteLater()
if hasattr(self, 'loader_worker') and self.loader_worker and self.loader_worker.isRunning():
logging.info("Waiting for loader to finish...")
self.loader_worker.quit()
self.loader_worker.wait(1000)
if hasattr(self, 'worker') and self.worker and self.worker.isRunning():
logging.info("Waiting for transcription to finish...")
self.worker.quit()
self.worker.wait(2000)
self.qt_app.quit()
@Slot()
def open_settings(self):
if not hasattr(self, 'settings_root') or self.settings_root is None:
logging.info("Loading Settings window for the first time...")
settings_qml = get_bundle_path() / "src/ui/qml/Settings.qml"
self.engine.load(QUrl.fromLocalFile(str(settings_qml)))
self.settings_root = self.engine.rootObjects()[-1]
self.settings_root.setProperty("color", "transparent")
# Connect the closing signal to just hide/delete reference if needed,
# but better to keep it alive. Actually, QML Window close() hides it by default usually
# unless we set closePolicy. Let's ensure we can re-show it.
# We might need to listen to closing signal to prevent destruction if we want to reuse.
# But simpler: check if it exists, if so, show/raise it.
# Center on screen
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if screen:
geom = screen.availableGeometry()
self.settings_root.setX(geom.x() + (geom.width() - self.settings_root.width()) // 2)
self.settings_root.setY(geom.y() + (geom.height() - self.settings_root.height()) // 2)
self.settings_root.setVisible(True)
self.settings_root.requestActivate()
@Slot()
def init_settings_preload(self):
"""Preloads settings window to avoid lag on first open."""
# Check if already loaded
if hasattr(self, 'settings_root') and self.settings_root:
return
logging.info("Preloading Settings QML...")
# Load but keep hidden? QML Window visible defaults to true usually,
# so we might see a flicker if we don't be careful.
# Ideally we load it with visible: false property from python or QML.
# For now, let's just let the first open be the load, but since user complained about lag...
# effectively doing nothing different here unless we actually trigger load.
pass
@Slot(str, 'QVariant')
def on_settings_changed(self, key, value):
"""
React to settings changes in real-time.
Some settings require immediate action (reloading model, moving window).
"""
print(f"Setting Changed: {key} = {value}")
# 1. Hotkey Reload
if key in ["hotkey", "hotkey_translate", "hotkey_correct"]:
if self.hk_transcribe: self.hk_transcribe.reload_hotkey()
if self.hk_correct: self.hk_correct.reload_hotkey()
if self.hk_translate: self.hk_translate.reload_hotkey()
if self.tray:
hk1 = self.format_hotkey(self.config.get("hotkey"))
hk3 = self.format_hotkey(self.config.get("hotkey_correct"))
hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nCorrect: {hk3}\nTranslate: {hk2}")
# 2. AI Model Reload (Heavy)
if key in ["model_size", "compute_device", "compute_type"]:
size = self.config.get("model_size")
# Notify UI to check if the new selected model is downloaded
self.bridge.notifyModelStatesChanged()
if self.transcriber.model_exists(size):
logging.info(f"Model '{size}' exists. Reloading engine...")
threading.Thread(target=self.transcriber.load_model, daemon=True).start()
else:
logging.info(f"Model '{size}' not found. Waiting for manual download.")
# 3. Window Positioning
if key in ["overlay_position", "overlay_offset_x", "overlay_offset_y", "ui_scale"]:
self.reposition_overlay()
# 4. Run on Startup
if key == "run_on_startup":
self.handle_startup_shortcut(value)
# 4. Input Device (Audio Engine handles this on next record start typically,
# but we can force a stream restart if we want instant feedback?
# For now, next record is fine as per plan).
def reposition_overlay(self):
"""Calculates and sets the Overlay position based on user settings."""
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if not screen or not self.overlay_root: return
# Apply UI Scale (Handled in QML now, but we need it for position calc)
scale = float(self.config.get("ui_scale"))
# self.overlay_root.setProperty("scale", scale) # Removed, handled in QML
# Get Geometry
geom = screen.availableGeometry()
# Current Scaled Dimensions (Approximation)
# Note: We must assume the base size is 460x180 (window size)
# But visually it's 380x100 (container) scaled up.
# The Window itself stays fixed size (transparent frame), but content scales.
# Actually, simpler interpretation: The window size is fixed large area, content moves.
# BUT if we want "Edge alignment", we must account for visual bounds.
visual_w = 460 * scale
visual_h = 180 * scale
# We set the WINDOW position anchor.
# Since the window content is centered, the window is effectively the bounding box we care about?
# No, the window is 460x180. The content is smaller 380x100.
# Let's align based on the WINDOW size for now to be safe.
# Wait, if we scale in QML, does the window size change? No.
# So if we scale up 1.5x, content might clip if window doesn't grow.
# To support UI Scale properly without clipping, we should probably resize the window here too.
# Let's resize the window to fit the scaled content.
win_w = int(460 * scale)
win_h = int(180 * scale)
self.overlay_root.setWidth(win_w)
self.overlay_root.setHeight(win_h)
pos_mode = self.config.get("overlay_position")
offset_x = int(self.config.get("overlay_offset_x"))
offset_y = int(self.config.get("overlay_offset_y"))
x = 0
y = 0
if pos_mode == "Bottom Center":
x = geom.x() + (geom.width() - win_w) // 2
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Center":
x = geom.x() + (geom.width() - win_w) // 2
y = geom.top() + 15
elif pos_mode == "Bottom Right":
x = geom.right() - win_w - 15
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Right":
x = geom.right() - win_w - 15
y = geom.top() + 15
elif pos_mode == "Bottom Left":
x = geom.left() + 15
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Left":
x = geom.left() + 15
y = geom.top() + 15
# Apply Offsets
x += offset_x
y += offset_y
self.overlay_root.setX(x)
self.overlay_root.setY(y)
@Slot()
def transcribe_file(self):
file_path, _ = QFileDialog.getOpenFileName(None, "Select Audio", "", "Audio (*.mp3 *.wav *.flac *.m4a *.ogg)")
if file_path:
self.bridge.update_status("Thinking...")
# Files use the default configured task usually, or we could ask?
# Default to config setting for files.
self.worker = TranscriptionWorker(self.transcriber, file_path, is_file=True, parent=self)
self.worker.finished.connect(self.on_transcription_done)
self.worker.start()
@Slot()
def on_silence_detected(self):
from PySide6.QtCore import QMetaObject, Qt
# Silence detection always triggers the task that was active?
# Since silence stops recording, it just calls toggle_recording with no arg, using the stored current_task?
# Let's ensure toggle_recording handles no arg calls by stopping the CURRENT task.
QMetaObject.invokeMethod(self, "toggle_recording", Qt.QueuedConnection)
@Slot(bool)
def on_ui_toggle_request(self, state):
if state != self.audio_engine.recording:
self.toggle_recording() # Default behavior for UI clicks
@Slot(str)
def on_transcription_done(self, text: str):
self.bridge.update_status("Ready")
self.bridge.isProcessing = False # Temporarily false? No, keep it true if we chain.
# Check LLM Settings -> AND check if the current task requested it
llm_enabled = self.config.get("llm_enabled")
requires_llm = getattr(self, "current_task_requires_llm", False)
# We only correct if:
# 1. LLM is globally enabled (safety switch)
# 2. current_task_requires_llm is True (triggered by Correct hotkey)
# OR 3. Maybe user WANTS global correction? Ideally user uses separate hotkey.
# Let's say: If "Correction" is enabled in settings, does it apply to ALL?
# The user's feedback suggests they DON'T want it on regular hotkey.
# So we enforce: Correct Hotkey -> Corrects. Regular Hotkey -> Raw.
# BUT we must handle the case where user expects the old behavior?
# Let's make it strict: Only correct if triggered by correct hotkey OR if we add a "Correct All" toggle later.
# For now, let's respect the flag. But wait, if llm_enabled is OFF, we shouldn't run it even if hotkey pressed?
# Yes, safety switch.
if text and llm_enabled and requires_llm:
# Chain to LLM
self.bridge.isProcessing = True
self.bridge.update_status("Correcting...")
mode = self.config.get("llm_mode")
self.llm_worker = LLMWorker(self.llm_engine, text, mode, parent=self)
self.llm_worker.finished.connect(self.on_llm_done)
self.llm_worker.start()
return
self.bridge.isProcessing = False
if text:
method = self.config.get("input_method")
speed = int(self.config.get("typing_speed"))
InputInjector.inject_text(text, method, speed)
@Slot(str)
def on_llm_done(self, text: str):
self.bridge.update_status("Ready")
self.bridge.isProcessing = False
if text:
method = self.config.get("input_method")
speed = int(self.config.get("typing_speed"))
InputInjector.inject_text(text, method, speed)
# Cleanup
if hasattr(self, 'llm_worker') and self.llm_worker:
self.llm_worker.deleteLater()
self.llm_worker = None
@Slot(bool)
def on_hotkeys_enabled_toggle(self, state):
if self.hk_transcribe: self.hk_transcribe.set_enabled(state)
if self.hk_translate: self.hk_translate.set_enabled(state)
@Slot(str)
def on_download_requested(self, size):
if self.bridge.isDownloading:
return
self.bridge.update_status("Downloading...")
self.bridge.isDownloading = True
self.download_worker = DownloadWorker(size, parent=self)
self.download_worker.finished.connect(self.on_download_finished)
self.download_worker.error.connect(self.on_download_error)
self.download_worker.start()
@Slot()
def on_llm_download_requested(self):
if self.bridge.isDownloading: return
self.bridge.update_status("Downloading LLM...")
self.bridge.isDownloading = True
self.llm_dl_worker = LLMDownloadWorker(parent=self)
self.llm_dl_worker.progress.connect(self.on_loader_progress) # Reuse existing progress slot? Yes.
self.llm_dl_worker.finished.connect(self.on_download_finished) # Reuses same cleanup
self.llm_dl_worker.error.connect(self.on_download_error)
self.llm_dl_worker.start()
def on_download_finished(self):
self.bridge.isDownloading = False
self.bridge.update_status("Ready")
self.bridge.notifyModelStatesChanged() # Refresh UI markers
# Automatically load it now that it's here
threading.Thread(target=self.transcriber.load_model, daemon=True).start()
def on_download_error(self, err):
self.bridge.isDownloading = False
self.bridge.update_status("Error")
logging.error(f"Download Error: {err}")
@Slot(bool)
def on_ui_toggle_request(self, is_recording):
"""Called when recording state changes."""
# Update Window Hook to allow clicking if active
is_active = is_recording or self.bridge.isProcessing
if hasattr(self, 'window_hook'):
self.window_hook.set_enabled(is_active)
@Slot(bool)
def on_processing_changed(self, is_processing):
is_active = self.bridge.isRecording or is_processing
if hasattr(self, 'window_hook'):
self.window_hook.set_enabled(is_active)
if __name__ == "__main__":
import sys
app = WhisperApp()
# Connect extra signal for processing state
app.bridge.isProcessingChanged.connect(app.on_processing_changed)
sys.exit(app.run())