Files
whisper_voice/main.py

898 lines
37 KiB
Python

import sys
import threading
import logging
import os
# Add the application directory to sys.path to ensure 'src' is findable
# This is critical for the embedded Python environment in the portable build
app_dir = os.path.dirname(os.path.abspath(__file__))
if app_dir not in sys.path:
sys.path.insert(0, app_dir)
# -----------------------------------------------------------------------------
# LINUX: Force XWayland (X11) for reliable window positioning & overlay behavior.
# Our input stack (evdev, UInput, wl-copy) is compositor-agnostic so this is safe.
# Native Wayland lacks app-controlled window positioning which the overlay needs.
# -----------------------------------------------------------------------------
if sys.platform == 'linux' and os.environ.get('WAYLAND_DISPLAY'):
os.environ.setdefault('QT_QPA_PLATFORM', 'xcb')
# -----------------------------------------------------------------------------
# WINDOWS DLL FIX (CRITICAL for Portable CUDA)
# Python 3.8+ on Windows requires explicit DLL directory addition.
# -----------------------------------------------------------------------------
if os.name == 'nt' and hasattr(os, 'add_dll_directory'):
try:
from pathlib import Path
_candidate_dirs = set()
# 1. From sys.path (scan ALL site-packages, not just the first)
for p in sys.path:
path_obj = Path(p)
if path_obj.name == 'site-packages' and path_obj.exists():
_candidate_dirs.add(str(path_obj.resolve()))
# 2. Relative to the Python executable (critical for embedded Python)
_exe_dir = Path(sys.executable).parent
for _sp in [_exe_dir / "Lib" / "site-packages", _exe_dir / "lib" / "site-packages"]:
if _sp.exists():
_candidate_dirs.add(str(_sp.resolve()))
# 3. Scan all candidates for nvidia DLL directories
for _sp_str in _candidate_dirs:
nvidia_path = Path(_sp_str) / "nvidia"
if nvidia_path.exists():
for subdir in nvidia_path.iterdir():
bin_path = subdir / "bin"
if bin_path.exists():
os.add_dll_directory(str(bin_path))
# Also add to PATH as fallback - some libraries
# (e.g. CTranslate2) load DLLs lazily via LoadLibrary
# which may not respect os.add_dll_directory()
os.environ['PATH'] = str(bin_path) + os.pathsep + os.environ.get('PATH', '')
except Exception:
pass
# -----------------------------------------------------------------------------
from PySide6.QtWidgets import QApplication, QFileDialog, QMessageBox
from PySide6.QtCore import QObject, Slot, Signal, QThread, Qt, QUrl
from PySide6.QtQml import QQmlApplicationEngine
from PySide6.QtQuickControls2 import QQuickStyle
from PySide6.QtGui import QIcon
from src.ui.bridge import UIBridge
from src.ui.tray import SystemTray
from src.core.audio_engine import AudioEngine
from src.core.transcriber import WhisperTranscriber
from src.core.llm_engine import LLMEngine
from src.core.hotkey_manager import HotkeyManager
from src.core.config import ConfigManager
from src.utils.injector import InputInjector
from src.core.paths import get_models_path, get_bundle_path
if os.name == 'nt':
from src.utils.window_hook import WindowHook
from PySide6.QtGui import QSurfaceFormat
# Configure GPU Surface for Alpha/Transparency (Critical for Blur)
surface_fmt = QSurfaceFormat()
surface_fmt.setAlphaBufferSize(8)
QSurfaceFormat.setDefaultFormat(surface_fmt)
# Configure High DPI behavior for crisp UI
os.environ["QT_ENABLE_HIGHDPI_SCALING"] = "1"
os.environ["QT_AUTOSCREENSCALEFACTOR"] = "1"
# Detect resolution without creating QApplication (Fixes crash)
if os.name == 'nt':
try:
import ctypes
user32 = ctypes.windll.user32
# Get physical screen width (unscaled)
# SetProcessDPIAware is needed to get the true resolution
user32.SetProcessDPIAware()
width = user32.GetSystemMetrics(0)
# Base scale centers around 1920 width.
# At 3840 (4k), res_scale is 2.0. If we want it ~40% smaller, we multiply by 0.6 = 1.2
res_scale = (width / 1920)
if width >= 3840:
res_scale *= 0.6 # Make it significantly smaller at 4k as requested
os.environ["QT_SCALE_FACTOR"] = str(max(1.0, res_scale))
except:
pass
# On Linux, Qt handles DPI automatically via QT_ENABLE_HIGHDPI_SCALING
# Detect "Reduce Motion" preference
if os.name == 'nt':
try:
import ctypes
SPI_GETCLIENTAREAANIMATION = 0x1042
animation_enabled = ctypes.c_bool(True)
ctypes.windll.user32.SystemParametersInfoW(
SPI_GETCLIENTAREAANIMATION, 0,
ctypes.byref(animation_enabled), 0
)
if not animation_enabled.value:
ConfigManager().data["reduce_motion"] = True
ConfigManager().save()
except Exception:
pass
elif sys.platform == 'linux':
try:
import subprocess as _sp
result = _sp.run(
['gsettings', 'get', 'org.gnome.desktop.interface', 'enable-animations'],
capture_output=True, text=True, timeout=2
)
if result.stdout.strip() == 'false':
ConfigManager().data["reduce_motion"] = True
ConfigManager().save()
except Exception:
pass
# Configure Logging
class QmlLoggingHandler(logging.Handler, QObject):
sig_log = Signal(str)
def __init__(self, bridge):
logging.Handler.__init__(self)
QObject.__init__(self)
self.bridge = bridge
self.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
self.sig_log.connect(self.bridge.append_log)
def emit(self, record):
msg = self.format(record)
self.sig_log.emit(msg)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Silence shutdown-related tracebacks from Qt/PySide6 signals
def _silent_shutdown_hook(exc_type, exc_value, exc_tb):
# During Python shutdown, some QObject signals may try to call dead slots.
# Ignore these specific tracebacks when they occur in bridge.py.
import traceback
if exc_type in (RuntimeError, SystemError, KeyboardInterrupt):
return # Suppress completely
tb_str = ''.join(traceback.format_exception(exc_type, exc_value, exc_tb))
if 'bridge.py' in tb_str and '@Slot' in tb_str:
return # Suppress bridge signal tracebacks
# For all other exceptions, print normally
sys.__excepthook__(exc_type, exc_value, exc_tb)
sys.excepthook = _silent_shutdown_hook
class DownloadWorker(QThread):
"""Background worker for model downloads with REAL progress."""
progress = Signal(int)
finished = Signal()
error = Signal(str)
def __init__(self, model_name="small", parent=None):
super().__init__(parent)
self.model_name = model_name
def run(self):
try:
import requests
import shutil
model_path = get_models_path()
dest_dir = model_path / f"faster-whisper-{self.model_name}"
repo_id = f"Systran/faster-whisper-{self.model_name}"
required_files = ["config.json", "model.bin", "tokenizer.json", "vocabulary.json"]
base_url = f"https://huggingface.co/{repo_id}/resolve/main"
# Skip if already complete
if dest_dir.exists() and all((dest_dir / f).exists() for f in required_files):
logging.info(f"Model {self.model_name} already downloaded.")
self.finished.emit()
return
# Download to a temp dir first, move on success
tmp_dir = model_path / f".tmp-faster-whisper-{self.model_name}"
if tmp_dir.exists():
shutil.rmtree(tmp_dir)
tmp_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"Downloading {self.model_name} to {dest_dir}...")
# 1. Calculate Total Size
total_size = 0
file_sizes = {}
with requests.Session() as s:
for fname in required_files:
url = f"{base_url}/{fname}"
head = s.head(url, allow_redirects=True)
if head.status_code == 200:
size = int(head.headers.get('content-length', 0))
file_sizes[fname] = size
total_size += size
else:
logging.warning(f"HEAD failed for {fname}: HTTP {head.status_code}")
# Abort if any required file is unavailable
missing = [f for f in required_files if f not in file_sizes]
if missing:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError(f"Required model files unavailable: {missing}")
# 2. Download loop
downloaded_bytes = 0
with requests.Session() as s:
for fname in required_files:
url = f"{base_url}/{fname}"
dest_file = tmp_dir / fname
resp = s.get(url, stream=True)
resp.raise_for_status()
with open(dest_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_bytes += len(chunk)
if total_size > 0:
pct = int((downloaded_bytes / total_size) * 100)
self.progress.emit(pct)
# 3. Validate all files present and non-empty
for fname in required_files:
fpath = tmp_dir / fname
if not fpath.exists() or fpath.stat().st_size == 0:
shutil.rmtree(tmp_dir, ignore_errors=True)
raise RuntimeError(f"Download incomplete: {fname} missing or empty")
# 4. Atomic move: replace dest with completed download
if dest_dir.exists():
shutil.rmtree(dest_dir)
tmp_dir.rename(dest_dir)
self.finished.emit()
except Exception as e:
logging.error(f"Download failed: {e}")
self.error.emit(str(e))
class LLMDownloadWorker(QThread):
progress = Signal(int)
finished = Signal()
error = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
def run(self):
try:
import requests
# Support one model for now
url = "https://huggingface.co/hugging-quants/Llama-3.2-1B-Instruct-Q4_K_M-GGUF/resolve/main/llama-3.2-1b-instruct-q4_k_m.gguf?download=true"
fname = "llama-3.2-1b-instruct-q4_k_m.gguf"
model_path = get_models_path() / "llm" / "llama-3.2-1b-instruct"
model_path.mkdir(parents=True, exist_ok=True)
dest_file = model_path / fname
# Simple check if exists and > 0 size?
# We assume if the user clicked download, they want to download it.
with requests.Session() as s:
head = s.head(url, allow_redirects=True)
total_size = int(head.headers.get('content-length', 0))
resp = s.get(url, stream=True)
resp.raise_for_status()
downloaded = 0
with open(dest_file, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
pct = int((downloaded / total_size) * 100)
self.progress.emit(pct)
self.finished.emit()
except Exception as e:
logging.error(f"LLM Download failed: {e}")
self.error.emit(str(e))
class LLMWorker(QThread):
finished = Signal(str)
def __init__(self, llm_engine, text, mode, parent=None):
super().__init__(parent)
self.llm_engine = llm_engine
self.text = text
self.mode = mode
def run(self):
try:
corrected = self.llm_engine.correct_text(self.text, self.mode)
self.finished.emit(corrected)
except Exception as e:
logging.error(f"LLMWorker crashed: {e}")
self.finished.emit(self.text) # Fail safe: return original text
class TranscriptionWorker(QThread):
finished = Signal(str)
def __init__(self, transcriber, audio_data, is_file=False, parent=None, task_override=None):
super().__init__(parent)
self.transcriber = transcriber
self.audio_data = audio_data
self.is_file = is_file
self.task_override = task_override
def run(self):
text = self.transcriber.transcribe(self.audio_data, is_file=self.is_file, task=self.task_override)
self.finished.emit(text)
class WhisperApp(QObject):
def __init__(self):
super().__init__()
# Force a style that supports full customization
QQuickStyle.setStyle("Basic")
self.qt_app = QApplication(sys.argv)
self.qt_app.setQuitOnLastWindowClosed(False)
# Set application-wide window icon (shows in taskbar for all windows)
icon_path = get_bundle_path() / "assets" / "icon.ico"
if icon_path.exists():
self.qt_app.setWindowIcon(QIcon(str(icon_path)))
self.config = ConfigManager()
# 1. Initialize QML Engine & Bridge
self.engine = QQmlApplicationEngine()
self.bridge = UIBridge()
# 0. Attach Logging Handler
logging.getLogger().addHandler(QmlLoggingHandler(self.bridge))
# Connect toggle recording signal
self.bridge.toggleRecordingRequested.connect(self.toggle_recording)
self.bridge.isRecordingChanged.connect(self.on_ui_toggle_request)
self.bridge.settingChanged.connect(self.on_settings_changed)
self.bridge.hotkeysEnabledChanged.connect(self.on_hotkeys_enabled_toggle)
self.bridge.downloadRequested.connect(self.on_download_requested)
self.bridge.llmDownloadRequested.connect(self.on_llm_download_requested)
self.engine.rootContext().setContextProperty("ui", self.bridge)
self.engine.rootContext().setContextProperty("isLinux", sys.platform == 'linux')
# 2. Tray setup
self.tray = SystemTray()
self.tray.quit_requested.connect(self.quit_app)
self.tray.settings_requested.connect(self.open_settings)
self.tray.transcribe_file_requested.connect(self.transcribe_file)
# Init Tooltip
from src.utils.formatters import format_hotkey
self.format_hotkey = format_hotkey # Store ref
hk1 = self.format_hotkey(self.config.get("hotkey"))
hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nTranslate: {hk2}")
# 3. Logic Components Placeholders
self.audio_engine = None
self.transcriber = None
self.llm_engine = None
self.hk_transcribe = None
self.hk_correct = None
self.hk_translate = None
self.overlay_root = None
# 4. Start Loader
loader_qml = get_bundle_path() / "src/ui/qml/Loader.qml"
self.engine.load(QUrl.fromLocalFile(str(loader_qml)))
self.loader_root = self.engine.rootObjects()[0]
self.loader_root.setProperty("color", "transparent")
self.loader_worker = DownloadWorker()
self.loader_worker.progress.connect(self.on_loader_progress)
self.loader_worker.finished.connect(self.on_loader_done)
self.loader_worker.start()
# Preload audio devices in background to avoid settings lag
import threading
threading.Thread(target=self.bridge.preload_audio_devices, daemon=True).start()
def on_loader_progress(self, percent):
self.bridge.downloadProgress = percent
def on_loader_done(self):
if getattr(self, "_loader_handled", False):
return
self._loader_handled = True
logging.info("Model verification complete.")
# Close Loader Window
if hasattr(self, "loader_root"):
self.loader_root.close()
# Init Backend
self.init_logic()
# Show Overlay (Ensure we don't load multiple times)
overlay_qml = get_bundle_path() / "src/ui/qml/Overlay.qml"
self.engine.load(QUrl.fromLocalFile(str(overlay_qml)))
self.overlay_root = self.engine.rootObjects()[-1]
self.overlay_root.setProperty("color", "transparent")
self.center_overlay()
# Preload Settings (Invisible)
logging.info("Preloading Settings window...")
self.open_settings()
if self.settings_root:
self.settings_root.setVisible(False)
# Install Low-Level Window Hook for Transparent Hit Test
if os.name == 'nt':
try:
from src.utils.window_hook import WindowHook
hwnd = self.overlay_root.winId()
# Initial scale from config
scale = float(self.config.get("ui_scale"))
# Current Overlay Dimensions
win_w = int(460 * scale)
win_h = int(180 * scale)
self.window_hook = WindowHook(hwnd, win_w, win_h, initial_scale=scale)
self.window_hook.install()
# Initial state: Disabled because we start inactive
self.window_hook.set_enabled(False)
except Exception as e:
logging.error(f"Failed to install WindowHook: {e}")
else:
# On Linux, use Qt flag for click-through overlay
self.overlay_root.setFlag(Qt.WindowTransparentForInput, True)
def center_overlay(self):
"""Calculates and sets the Overlay position above the taskbar."""
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if not screen or not self.overlay_root: return
geom = screen.availableGeometry()
w = self.overlay_root.width()
h = self.overlay_root.height()
x = geom.x() + (geom.width() - w) // 2
y = geom.bottom() - h - 15
self.overlay_root.setX(x)
self.overlay_root.setY(y)
def init_logic(self):
if getattr(self, "_logic_initialized", False):
return
self._logic_initialized = True
logging.info("Initializing Core Logic...")
self.audio_engine = AudioEngine()
self.audio_engine.set_visualizer_callback(self.bridge.update_amplitude)
self.audio_engine.set_silence_callback(self.on_silence_detected)
self.transcriber = WhisperTranscriber()
self.llm_engine = LLMEngine()
# Dual Hotkey Managers
self.hk_transcribe = HotkeyManager(config_key="hotkey")
self.hk_transcribe.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="standard"))
self.hk_transcribe.start()
self.hk_correct = HotkeyManager(config_key="hotkey_correct")
self.hk_correct.triggered.connect(lambda: self.toggle_recording(task_override="transcribe", task_mode="correct"))
self.hk_correct.start()
self.hk_translate = HotkeyManager(config_key="hotkey_translate")
self.hk_translate.triggered.connect(lambda: self.toggle_recording(task_override="translate", task_mode="standard"))
self.hk_translate.start()
self.bridge.update_status("Ready")
def run(self):
sys.exit(self.qt_app.exec())
@Slot(str, str)
@Slot(str)
def toggle_recording(self, task_override=None, task_mode="standard"):
"""
task_override: 'transcribe' or 'translate' (passed to whisper)
task_mode: 'standard' or 'correct' (determines post-processing)
"""
if task_mode == "correct":
self.current_task_requires_llm = True
elif task_mode == "standard":
self.current_task_requires_llm = False # Explicit reset
# Actual Logic
if self.bridge.isRecording:
logging.info("Stopping recording...")
# stop_recording returns the numpy array directly
audio_data = self.audio_engine.stop_recording()
self.bridge.isRecording = False
self.bridge.update_status("Processing...")
self.bridge.isProcessing = True
# Save task override for processing
self.last_task_override = task_override
if audio_data is not None and len(audio_data) > 0:
# Use the task that started this session, or the override if provided
final_task = getattr(self, "current_recording_task", self.config.get("task"))
if task_override: final_task = task_override
self.worker = TranscriptionWorker(self.transcriber, audio_data, parent=self, task_override=final_task)
self.worker.finished.connect(self.on_transcription_done)
self.worker.start()
else:
self.bridge.update_status("Ready")
self.bridge.isProcessing = False
else:
# START RECORDING
if self.bridge.isProcessing:
logging.warning("Ignored toggle request: Transcription in progress.")
return
intended_task = task_override if task_override else self.config.get("task")
self.current_recording_task = intended_task
logging.info(f"Starting recording... (Task: {intended_task}, Mode: {task_mode})")
self.audio_engine.start_recording()
self.bridge.isRecording = True
self.bridge.update_status(f"Recording ({intended_task})...")
@Slot()
def quit_app(self):
logging.info("Shutting down...")
# [CRITICAL] Stop the StatsWorker FIRST before any UI objects are touched.
# This prevents signal emissions to a dying UIBridge object.
if hasattr(self, 'bridge') and hasattr(self.bridge, 'stats_worker'):
try:
self.bridge.stats_worker.stats_ready.disconnect(self.bridge.update_stats_callback)
except: pass
self.bridge.stats_worker.stop()
if self.hk_transcribe: self.hk_transcribe.stop()
if self.hk_translate: self.hk_translate.stop()
# Close all QML windows to ensure bindings stop before Python objects die
if self.overlay_root:
self.overlay_root.close()
self.overlay_root.deleteLater()
if hasattr(self, 'loader_root') and self.loader_root:
self.loader_root.close()
self.loader_root.deleteLater()
if hasattr(self, 'settings_root') and self.settings_root:
self.settings_root.close()
self.settings_root.deleteLater()
if hasattr(self, 'loader_worker') and self.loader_worker and self.loader_worker.isRunning():
logging.info("Waiting for loader to finish...")
self.loader_worker.quit()
self.loader_worker.wait(1000)
if hasattr(self, 'worker') and self.worker and self.worker.isRunning():
logging.info("Waiting for transcription to finish...")
self.worker.quit()
self.worker.wait(2000)
self.qt_app.quit()
@Slot()
def open_settings(self):
if not hasattr(self, 'settings_root') or self.settings_root is None:
logging.info("Loading Settings window for the first time...")
settings_qml = get_bundle_path() / "src/ui/qml/Settings.qml"
self.engine.load(QUrl.fromLocalFile(str(settings_qml)))
self.settings_root = self.engine.rootObjects()[-1]
self.settings_root.setProperty("color", "transparent")
# Connect the closing signal to just hide/delete reference if needed,
# but better to keep it alive. Actually, QML Window close() hides it by default usually
# unless we set closePolicy. Let's ensure we can re-show it.
# We might need to listen to closing signal to prevent destruction if we want to reuse.
# But simpler: check if it exists, if so, show/raise it.
# Center on screen
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if screen:
geom = screen.availableGeometry()
self.settings_root.setX(geom.x() + (geom.width() - self.settings_root.width()) // 2)
self.settings_root.setY(geom.y() + (geom.height() - self.settings_root.height()) // 2)
self.settings_root.setVisible(True)
self.settings_root.requestActivate()
@Slot()
def init_settings_preload(self):
"""Preloads settings window to avoid lag on first open."""
# Check if already loaded
if hasattr(self, 'settings_root') and self.settings_root:
return
logging.info("Preloading Settings QML...")
# Load but keep hidden? QML Window visible defaults to true usually,
# so we might see a flicker if we don't be careful.
# Ideally we load it with visible: false property from python or QML.
# For now, let's just let the first open be the load, but since user complained about lag...
# effectively doing nothing different here unless we actually trigger load.
pass
@Slot(str, 'QVariant')
def on_settings_changed(self, key, value):
"""
React to settings changes in real-time.
Some settings require immediate action (reloading model, moving window).
"""
print(f"Setting Changed: {key} = {value}")
# 1. Hotkey Reload
if key in ["hotkey", "hotkey_translate", "hotkey_correct"]:
if self.hk_transcribe: self.hk_transcribe.reload_hotkey()
if self.hk_correct: self.hk_correct.reload_hotkey()
if self.hk_translate: self.hk_translate.reload_hotkey()
if self.tray:
hk1 = self.format_hotkey(self.config.get("hotkey"))
hk3 = self.format_hotkey(self.config.get("hotkey_correct"))
hk2 = self.format_hotkey(self.config.get("hotkey_translate"))
self.tray.setToolTip(f"Whisper Voice\nTranscribe: {hk1}\nCorrect: {hk3}\nTranslate: {hk2}")
# 2. AI Model Reload (Heavy)
if key in ["model_size", "compute_device", "compute_type"]:
size = self.config.get("model_size")
# Notify UI to check if the new selected model is downloaded
self.bridge.notifyModelStatesChanged()
if self.transcriber.model_exists(size):
logging.info(f"Model '{size}' exists. Reloading engine...")
threading.Thread(target=self.transcriber.load_model, daemon=True).start()
else:
logging.info(f"Model '{size}' not found. Waiting for manual download.")
# 3. Window Positioning
if key in ["overlay_position", "overlay_offset_x", "overlay_offset_y", "ui_scale"]:
self.reposition_overlay()
# 4. Run on Startup
if key == "run_on_startup":
self.handle_startup_shortcut(value)
# 4. Input Device (Audio Engine handles this on next record start typically,
# but we can force a stream restart if we want instant feedback?
# For now, next record is fine as per plan).
def reposition_overlay(self):
"""Calculates and sets the Overlay position based on user settings."""
from PySide6.QtGui import QGuiApplication
screen = QGuiApplication.primaryScreen()
if not screen or not self.overlay_root: return
# Apply UI Scale (Handled in QML now, but we need it for position calc)
scale = float(self.config.get("ui_scale"))
# self.overlay_root.setProperty("scale", scale) # Removed, handled in QML
# Get Geometry
geom = screen.availableGeometry()
# Current Scaled Dimensions (Approximation)
# Note: We must assume the base size is 460x180 (window size)
# But visually it's 380x100 (container) scaled up.
# The Window itself stays fixed size (transparent frame), but content scales.
# Actually, simpler interpretation: The window size is fixed large area, content moves.
# BUT if we want "Edge alignment", we must account for visual bounds.
visual_w = 460 * scale
visual_h = 180 * scale
# We set the WINDOW position anchor.
# Since the window content is centered, the window is effectively the bounding box we care about?
# No, the window is 460x180. The content is smaller 380x100.
# Let's align based on the WINDOW size for now to be safe.
# Wait, if we scale in QML, does the window size change? No.
# So if we scale up 1.5x, content might clip if window doesn't grow.
# To support UI Scale properly without clipping, we should probably resize the window here too.
# Let's resize the window to fit the scaled content.
win_w = int(460 * scale)
win_h = int(180 * scale)
self.overlay_root.setWidth(win_w)
self.overlay_root.setHeight(win_h)
pos_mode = self.config.get("overlay_position")
offset_x = int(self.config.get("overlay_offset_x"))
offset_y = int(self.config.get("overlay_offset_y"))
x = 0
y = 0
if pos_mode == "Bottom Center":
x = geom.x() + (geom.width() - win_w) // 2
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Center":
x = geom.x() + (geom.width() - win_w) // 2
y = geom.top() + 15
elif pos_mode == "Bottom Right":
x = geom.right() - win_w - 15
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Right":
x = geom.right() - win_w - 15
y = geom.top() + 15
elif pos_mode == "Bottom Left":
x = geom.left() + 15
y = geom.bottom() - win_h - 15
elif pos_mode == "Top Left":
x = geom.left() + 15
y = geom.top() + 15
# Apply Offsets
x += offset_x
y += offset_y
self.overlay_root.setX(x)
self.overlay_root.setY(y)
@Slot()
def transcribe_file(self):
file_path, _ = QFileDialog.getOpenFileName(None, "Select Audio", "", "Audio (*.mp3 *.wav *.flac *.m4a *.ogg)")
if file_path:
self.bridge.update_status("Thinking...")
# Files use the default configured task usually, or we could ask?
# Default to config setting for files.
self.worker = TranscriptionWorker(self.transcriber, file_path, is_file=True, parent=self)
self.worker.finished.connect(self.on_transcription_done)
self.worker.start()
@Slot()
def on_silence_detected(self):
from PySide6.QtCore import QMetaObject, Qt
# Silence detection always triggers the task that was active?
# Since silence stops recording, it just calls toggle_recording with no arg, using the stored current_task?
# Let's ensure toggle_recording handles no arg calls by stopping the CURRENT task.
QMetaObject.invokeMethod(self, "toggle_recording", Qt.QueuedConnection)
@Slot(bool)
def on_ui_toggle_request(self, state):
if state != self.audio_engine.recording:
self.toggle_recording() # Default behavior for UI clicks
@Slot(str)
def on_transcription_done(self, text: str):
self.bridge.update_status("Ready")
self.bridge.isProcessing = False # Temporarily false? No, keep it true if we chain.
# Check LLM Settings -> AND check if the current task requested it
llm_enabled = self.config.get("llm_enabled")
requires_llm = getattr(self, "current_task_requires_llm", False)
# We only correct if:
# 1. LLM is globally enabled (safety switch)
# 2. current_task_requires_llm is True (triggered by Correct hotkey)
# OR 3. Maybe user WANTS global correction? Ideally user uses separate hotkey.
# Let's say: If "Correction" is enabled in settings, does it apply to ALL?
# The user's feedback suggests they DON'T want it on regular hotkey.
# So we enforce: Correct Hotkey -> Corrects. Regular Hotkey -> Raw.
# BUT we must handle the case where user expects the old behavior?
# Let's make it strict: Only correct if triggered by correct hotkey OR if we add a "Correct All" toggle later.
# For now, let's respect the flag. But wait, if llm_enabled is OFF, we shouldn't run it even if hotkey pressed?
# Yes, safety switch.
if text and llm_enabled and requires_llm:
# Chain to LLM
self.bridge.isProcessing = True
self.bridge.update_status("Correcting...")
mode = self.config.get("llm_mode")
self.llm_worker = LLMWorker(self.llm_engine, text, mode, parent=self)
self.llm_worker.finished.connect(self.on_llm_done)
self.llm_worker.start()
return
self.bridge.isProcessing = False
if text:
method = self.config.get("input_method")
speed = int(self.config.get("typing_speed"))
InputInjector.inject_text(text, method, speed)
@Slot(str)
def on_llm_done(self, text: str):
self.bridge.update_status("Ready")
self.bridge.isProcessing = False
if text:
method = self.config.get("input_method")
speed = int(self.config.get("typing_speed"))
InputInjector.inject_text(text, method, speed)
# Cleanup
if hasattr(self, 'llm_worker') and self.llm_worker:
self.llm_worker.deleteLater()
self.llm_worker = None
@Slot(bool)
def on_hotkeys_enabled_toggle(self, state):
if self.hk_transcribe: self.hk_transcribe.set_enabled(state)
if self.hk_translate: self.hk_translate.set_enabled(state)
@Slot(str)
def on_download_requested(self, size):
if self.bridge.isDownloading:
return
self.bridge.update_status("Downloading...")
self.bridge.isDownloading = True
self.download_worker = DownloadWorker(size, parent=self)
self.download_worker.finished.connect(self.on_download_finished)
self.download_worker.error.connect(self.on_download_error)
self.download_worker.start()
@Slot()
def on_llm_download_requested(self):
if self.bridge.isDownloading: return
self.bridge.update_status("Downloading LLM...")
self.bridge.isDownloading = True
self.llm_dl_worker = LLMDownloadWorker(parent=self)
self.llm_dl_worker.progress.connect(self.on_loader_progress) # Reuse existing progress slot? Yes.
self.llm_dl_worker.finished.connect(self.on_download_finished) # Reuses same cleanup
self.llm_dl_worker.error.connect(self.on_download_error)
self.llm_dl_worker.start()
def on_download_finished(self):
self.bridge.isDownloading = False
self.bridge.update_status("Ready")
self.bridge.notifyModelStatesChanged() # Refresh UI markers
# Automatically load it now that it's here
threading.Thread(target=self.transcriber.load_model, daemon=True).start()
def on_download_error(self, err):
self.bridge.isDownloading = False
self.bridge.update_status("Error")
logging.error(f"Download Error: {err}")
def _update_overlay_state(self, is_active):
"""Update overlay visibility and input handling based on active state."""
if hasattr(self, 'window_hook'):
self.window_hook.set_enabled(is_active)
elif sys.platform == 'linux' and self.overlay_root:
self.overlay_root.setFlag(Qt.WindowTransparentForInput, not is_active)
@Slot(bool)
def on_ui_toggle_request(self, is_recording):
"""Called when recording state changes."""
is_active = is_recording or self.bridge.isProcessing
self._update_overlay_state(is_active)
@Slot(bool)
def on_processing_changed(self, is_processing):
is_active = self.bridge.isRecording or is_processing
self._update_overlay_state(is_active)
if __name__ == "__main__":
import sys
app = WhisperApp()
# Connect extra signal for processing state
app.bridge.isProcessingChanged.connect(app.on_processing_changed)
sys.exit(app.run())