Initial commit of WhisperVoice

This commit is contained in:
2026-01-24 17:03:52 +02:00
commit a938c83a37
117 changed files with 6077 additions and 0 deletions

196
src/core/audio_engine.py Normal file
View File

@@ -0,0 +1,196 @@
"""
Audio Engine Module.
====================
This module handles the low-level audio recording capabilities using `sounddevice`.
It manages the input stream, buffers audio data in memory, and provides a callback
mechanism for real-time visualization of audio amplitude.
Classes:
AudioEngine: The main controller for recording streams.
"""
import logging
import queue
import threading
import time
from typing import Callable, Optional

import numpy as np
import sounddevice as sd

from src.core.config import ConfigManager
class AudioEngine:
    """
    Manages audio recording from the default input device.

    Uses ConfigManager for settings (device, silence detection). Captured
    audio is buffered in memory as float32 chunks and returned as one
    flattened 1-D numpy array when recording stops.
    """

    def __init__(self, sample_rate: int = 16000, channels: int = 1):
        """
        Initialize the AudioEngine.

        Args:
            sample_rate (int): Capture sample rate in Hz (Whisper expects 16 kHz).
            channels (int): Number of input channels (1 = mono).
        """
        self.config = ConfigManager()
        self.sample_rate = sample_rate
        self.channels = channels
        self.recording = False
        self.stream: Optional[sd.InputStream] = None
        self.visualizer_callback: Optional[Callable[[float], None]] = None
        self.silence_callback: Optional[Callable[[], None]] = None
        # Audio buffer to store the current session's frames
        self.frames = []
        self.last_noise_time = 0.0
        # Exponentially-smoothed amplitude for the visualizer.
        # Fix: previously created lazily via hasattr() inside the audio
        # callback; None means "no chunk seen yet".
        self._smoothed_amp: Optional[float] = None

    def list_devices(self):
        """
        Query available audio devices.

        Returns:
            DeviceList: All input/output devices seen by PortAudio.
        """
        return sd.query_devices()

    def set_visualizer_callback(self, callback: Callable[[float], None]):
        """
        Register a callback function for visualizer updates.

        Args:
            callback (function): Accepts a single float (amplitude, 0..1).
                Called roughly once per audio block.
        """
        self.visualizer_callback = callback

    def set_silence_callback(self, callback: Callable[[], None]):
        """
        Register a callback function for silence detection (Auto-Stop).
        """
        self.silence_callback = callback

    def _visual_amplitude(self, rms: float) -> float:
        """
        Convert a linear RMS value into a smoothed 0..1 amplitude for the UI.

        Applies dB scaling, a square-root compression curve and an
        exponential moving average so the waveform is responsive to quiet
        sounds without being jumpy.
        """
        if rms > 0:
            # dB scale; the epsilon avoids log(0)
            db = 20 * np.log10(rms + 1e-10)
            # Map the typical range (-60 to 0 dB) onto (0 to 1)
            amp = float(np.clip((db + 60) / 60, 0.0, 1.0))
            # Square-root curve gives good response to quiet sounds
            amp = float(np.power(amp, 0.5))
        else:
            amp = 0.0
        # Exponential moving average (factor 0.3) prevents jumpy waveform
        if self._smoothed_amp is None:
            self._smoothed_amp = amp
        else:
            self._smoothed_amp = 0.3 * amp + 0.7 * self._smoothed_amp
        return self._smoothed_amp

    def _audio_callback(self, indata: np.ndarray, frames: int, time_info, status: sd.CallbackFlags):
        """
        Internal callback used by sounddevice to process incoming audio chunks.

        Args:
            indata (np.ndarray): The recorded audio chunk, shape (frames, channels).
            frames (int): Number of frames in the chunk.
            time_info: PortAudio timestamp info. Fix: renamed from `time` so
                it no longer shadows the stdlib `time` module (the shadowing
                forced a re-import of `time` on every callback).
            status (sd.CallbackFlags): Status flags (e.g. overflow warnings).
        """
        if status:
            logging.warning(f"Audio callback status: {status}")
        if not self.recording:
            return
        # Copy data to avoid buffer race conditions (PortAudio reuses buffers)
        data = indata.copy()
        self.frames.append(data)

        # Root-Mean-Square loudness of this chunk, shared by the visualizer
        # and the silence detector (fix: was computed twice per chunk).
        rms = float(np.sqrt(np.mean(data ** 2)))

        if self.visualizer_callback:
            self.visualizer_callback(self._visual_amplitude(rms))

        # --- Silence Detection Logic (runs even if the visualizer is off) ---
        # Heuristic mapping: 0.1 RMS = 100% threshold
        vad_level = float(np.clip(rms * 10, 0.0, 1.0))
        current_time = time.time()
        # Fetch params dynamically so settings changes apply mid-recording
        threshold = float(self.config.get("silence_threshold"))
        duration = float(self.config.get("silence_duration"))
        if vad_level > threshold:
            self.last_noise_time = current_time
        elif (current_time - self.last_noise_time) > duration:
            # We have been silent for > silence_duration: trigger auto-stop
            if self.silence_callback:
                logging.info(f"Silence detected ({duration}s). Triggering auto-stop.")
                # Reset last_noise_time to prevent spamming
                self.last_noise_time = current_time
                self.silence_callback()

    def start_recording(self, device: Optional[int] = None):
        """
        Start the recording stream.

        Args:
            device (int, optional): The device ID to use. Defaults to the
                configured device, falling back to the system default.
        """
        if self.recording:
            return
        self.frames = []  # Reset buffer
        self._smoothed_amp = None  # Fresh visualizer smoothing per session
        self.recording = True
        self.last_noise_time = time.time()  # Reset silence timer
        # Device priority: explicit arg > config value > system default (None)
        if device is None:
            device = self.config.get("input_device")
        try:
            self.stream = sd.InputStream(
                samplerate=self.sample_rate,
                channels=self.channels,
                device=device,
                callback=self._audio_callback
            )
            self.stream.start()
            logging.info("Audio recording started.")
        except Exception as e:
            logging.error(f"Failed to start recording: {e}")
            self.recording = False

    def stop_recording(self) -> np.ndarray:
        """
        Stop the current recording session and return the captured audio.

        Returns:
            np.ndarray: The complete recording flattened into a 1-D array.
                Empty array if nothing was recorded.
        """
        if not self.recording:
            return np.array([], dtype=np.float32)
        self.recording = False
        if self.stream:
            self.stream.stop()
            self.stream.close()
            self.stream = None
        logging.info("Audio recording stopped.")
        if not self.frames:
            return np.array([], dtype=np.float32)
        # Concatenate all buffered chunks into one continuous array.
        # sounddevice returns (frames, channels), i.e. (N, 1);
        # Whisper expects a flattened 1-D array (N,).
        audio = np.concatenate(self.frames, axis=0)
        return audio.flatten()

117
src/core/config.py Normal file
View File

@@ -0,0 +1,117 @@
"""
Configuration Manager Module.
=============================
Singleton class to manage loading and saving application settings to a JSON file.
Ensures robustness by merging with defaults and handling file paths correctly.
"""
import json
import logging
from pathlib import Path
from typing import Any, Dict
from src.core.paths import get_base_path
# Default Configuration.
# NOTE: every supported setting MUST have an entry here — ConfigManager.load()
# discards any key from settings.json that is absent from this dict.
DEFAULT_SETTINGS = {
    "hotkey": "f8",
    "model_size": "small",
    "input_device": None,  # Device ID (int) or Name (str), None = Default
    "save_recordings": False,  # Save .wav files for debugging
    "silence_threshold": 0.02,  # Amplitude threshold (0.0 - 1.0)
    "silence_duration": 1.0,  # Seconds of silence to trigger auto-submit
    "visualizer_style": "line",  # 'bar' or 'line'
    "opacity": 1.0,  # Window opacity (0.1 - 1.0)
    "ui_scale": 1.0,  # Global UI Scale (0.75 - 1.5)
    "always_on_top": True,
    "run_on_startup": False,  # (Placeholder — not implemented yet)
    # Window Position
    "overlay_position": "Bottom Center",
    "overlay_offset_x": 0,
    "overlay_offset_y": 0,
    # Input
    "input_method": "Clipboard Paste",  # "Clipboard Paste" or "Simulate Typing"
    "typing_speed": 100,  # CPM (Chars Per Minute) if typing
    # AI - Advanced (passed through to faster-whisper)
    "language": "auto",  # "auto" or ISO code
    "compute_device": "auto",  # "auto", "cuda", "cpu"
    "compute_type": "int8",  # "int8", "float16", "float32"
    "beam_size": 5,
    "best_of": 5,
    "vad_filter": True,
    "no_repeat_ngram_size": 0,
    "condition_on_previous_text": True
}
class ConfigManager:
    """
    Singleton Configuration Manager.

    A single shared instance reads settings.json from the application base
    directory, layers it over DEFAULT_SETTINGS, and writes changes back.
    """

    _instance = None

    def __new__(cls):
        # Classic singleton: construct and initialize exactly once.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        """One-time setup: resolve paths, seed defaults, load from disk."""
        self.base_path = get_base_path()
        self.config_file = self.base_path / "settings.json"
        self.data = dict(DEFAULT_SETTINGS)
        self.load()

    def load(self):
        """Load settings from the JSON file, merging into the defaults."""
        if not self.config_file.exists():
            logging.info("No settings file found. Using defaults.")
            self.save()
            return
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                stored = json.load(f)
            # Accept only known keys, so stale entries are dropped and
            # newly-added default keys survive upgrades.
            self.data.update(
                {key: val for key, val in stored.items() if key in DEFAULT_SETTINGS}
            )
            logging.info(f"Settings loaded from {self.config_file}")
        except Exception as e:
            logging.error(f"Failed to load settings: {e}")

    def save(self):
        """Persist the current settings dict as pretty-printed JSON."""
        try:
            with open(self.config_file, 'w', encoding='utf-8') as f:
                json.dump(self.data, f, indent=4)
            logging.info("Settings saved.")
        except Exception as e:
            logging.error(f"Failed to save settings: {e}")

    def get(self, key: str) -> Any:
        """Get a setting value, falling back to the shipped default."""
        return self.data.get(key, DEFAULT_SETTINGS.get(key))

    def set(self, key: str, value: Any):
        """Set a single setting; writes to disk only on actual change."""
        if self.data.get(key) != value:
            self.data[key] = value
            self.save()

    def set_bulk(self, updates: Dict[str, Any]):
        """Apply several settings at once; saves at most one time."""
        dirty = False
        for key, value in updates.items():
            if self.data.get(key) == value:
                continue
            self.data[key] = value
            dirty = True
        if dirty:
            self.save()

View File

@@ -0,0 +1,31 @@
@echo off
REM ------------------------------------------------------------------
REM Debug launcher for transcribe_worker.py using the bundled Python.
REM Assumed location: runtime/app/src/core/ (Python lives three dirs up).
REM Pauses before exiting so any error output stays on screen.
REM ------------------------------------------------------------------
echo [DEBUG] LAUNCHER STARTED
echo [DEBUG] CWD: %CD%
echo [DEBUG] Python Path (expected relative): ..\python\python.exe
REM Read stdin to a file to verify data input (optional debugging)
REM python.exe might be in different relative path depending on where this bat is run
REM We assume this bat is in runtime/app/src/core/
REM So python is in ../../../python/python.exe
set PYTHON_EXE=..\..\..\python\python.exe
if exist "%PYTHON_EXE%" (
    echo [DEBUG] Found Python at %PYTHON_EXE%
) else (
    echo [ERROR] Python NOT found at %PYTHON_EXE%
    echo [ERROR] Listing relative directories:
    dir ..\..\..\
    REM Keep the window open so the user can read the error
    pause
    exit /b 1
)
echo [DEBUG] Launching script: transcribe_worker.py
REM The worker reads a pickled job from stdin and replies on stdout
"%PYTHON_EXE%" transcribe_worker.py
if %ERRORLEVEL% NEQ 0 (
    echo [ERROR] Python script failed with code %ERRORLEVEL%
    pause
) else (
    echo [SUCCESS] Script finished.
    pause
)

View File

@@ -0,0 +1,95 @@
"""
Hotkey Manager Module.
======================
This module wraps the `keyboard` library to provide Global Hotkey functionality.
It allows the application to respond to key presses even when it is not in focus
(background operation).
Classes:
HotkeyManager: Qt-compatible wrapper for keyboard hooks.
"""
import keyboard
import logging
from PySide6.QtCore import QObject, Signal
from typing import Optional
class HotkeyManager(QObject):
    """
    Manages global keyboard shortcuts using the `keyboard` library.

    Inherits from QObject to allow Signal/Slot integration with PySide6,
    so the hotkey works even when the application window is not focused.

    Signals:
        triggered: Emitted when the hotkey is pressed.

    Attributes:
        hotkey (str): The key combination as a string (e.g. "f8", "ctrl+alt+r").
        is_listening (bool): State of the listener.
    """

    triggered = Signal()

    def __init__(self, hotkey: str = "f8"):
        """
        Initialize the HotkeyManager.

        Args:
            hotkey (str): The global hotkey string description. Default: "f8".
        """
        super().__init__()
        self.hotkey = hotkey
        self.is_listening = False
        self._enabled = True

    def set_enabled(self, enabled: bool):
        """Enable or disable the hotkey trigger without unhooking."""
        self._enabled = enabled
        logging.info(f"Hotkey listener {'enabled' if enabled else 'suspended'}")

    def start(self):
        """Start listening for the hotkey."""
        self.reload_hotkey()

    def reload_hotkey(self):
        """Unregister the old hotkey and register the current one from Config."""
        if self.is_listening:
            self.stop()
        # Local import avoids a circular import at module load time.
        from src.core.config import ConfigManager
        config = ConfigManager()
        self.hotkey = config.get("hotkey")
        logging.info(f"Registering global hotkey: {self.hotkey}")
        try:
            # suppress=False: the key event is still delivered to other
            # applications; we only observe it to fire the global trigger.
            keyboard.add_hotkey(self.hotkey, self.on_press, suppress=False)
            self.is_listening = True
        except Exception as e:
            logging.error(f"Failed to bind hotkey: {e}")

    def stop(self):
        """
        Stop listening and unregister the hook.
        Safe to call even if not listening.
        """
        if self.is_listening:
            try:
                keyboard.remove_hotkey(self.hotkey)
            except Exception as e:
                # Fix: was a bare `except:` that also swallowed
                # KeyboardInterrupt/SystemExit and hid the reason. Keep
                # best-effort behavior, but log for visibility.
                logging.debug(f"remove_hotkey failed (already removed?): {e}")
            self.is_listening = False
            logging.info(f"Unregistered global hotkey: {self.hotkey}")

    def on_press(self):
        """
        Callback triggered internally by the keyboard library when the key
        is pressed. Emits the Qt `triggered` signal unless suspended.
        """
        if not self._enabled:
            return
        logging.info(f"Hotkey {self.hotkey} detected.")
        self.triggered.emit()

86
src/core/paths.py Normal file
View File

@@ -0,0 +1,86 @@
"""
Paths Module.
=============
This module handles all file system path resolution for the application.
It is critical for ensuring portability, distinguishing between:
1. Running as a raw Python script (using `__file__`).
2. Running as a frozen PyInstaller EXE (using `sys.executable`).
It creates necessary directories (models, libs) if they do not exist.
"""
import sys
import os
from pathlib import Path
from typing import Optional
def get_bundle_path() -> Path:
    """
    Root directory of the application bundle.

    Frozen (PyInstaller): the temporary extraction dir (sys._MEIPASS).
    Running from source: the project root, three levels above this file
    (src/core/paths.py). Use this for bundled assets like QML and SVGs.
    """
    frozen = getattr(sys, 'frozen', False)
    if frozen:
        return Path(sys._MEIPASS)
    return Path(__file__).resolve().parent.parent.parent
def get_base_path() -> Path:
    """
    Directory where persistent data should be stored.

    Frozen builds keep data next to the .exe; source checkouts use the
    project root. Use this for models, settings, recordings.
    """
    running_frozen = getattr(sys, 'frozen', False)
    return Path(sys.executable).parent if running_frozen else get_bundle_path()
def get_models_path() -> Path:
    """
    Absolute path to the 'models' directory (created if missing).

    Whisper AI model files are stored here, next to the output binary.

    Returns:
        Path: Absolute path to ./models under the base path.
    """
    models_dir = get_base_path() / "models"
    models_dir.mkdir(parents=True, exist_ok=True)
    return models_dir
def get_libs_path() -> Path:
    """
    Absolute path to the 'libs' directory (created if missing).

    External binaries such as `ffmpeg.exe` are stored here.

    Returns:
        Path: Absolute path to ./libs under the base path.
    """
    libs_dir = get_base_path() / "libs"
    libs_dir.mkdir(parents=True, exist_ok=True)
    return libs_dir
def get_ffmpeg_path() -> str:
    """
    Resolve the path to the FFmpeg executable.

    Prefers a bundled `./libs/ffmpeg.exe`; otherwise falls back to whatever
    "ffmpeg" resolves to on the system PATH.

    Returns:
        str: Absolute path to the local binary, or just "ffmpeg" for a
            system PATH lookup.
    """
    bundled = get_libs_path() / "ffmpeg.exe"
    if bundled.exists():
        return str(bundled.absolute())
    # Fall back to the system-wide command
    return "ffmpeg"

View File

@@ -0,0 +1,127 @@
"""
Transcription Worker Subprocess.
================================
This script is designed to be run as a subprocess. It:
1. Receives configuration and audio data via stdin (pickled)
2. Loads the Whisper model
3. Transcribes the audio
4. Prints the result to stdout
5. Exits (letting the OS reclaim all memory)
This ensures complete RAM/VRAM cleanup after each transcription.
"""
import sys
import pickle
import logging
import os
import traceback
# Enable debug logging to file for definitive troubleshooting.
# NOTE: filemode='w' truncates the log on every worker launch, so the file
# only ever holds the most recent run.
log_file = os.path.join(os.path.dirname(__file__), "worker_debug.log")
logging.basicConfig(
    level=logging.DEBUG,
    filename=log_file,
    filemode='w',
    format='[WORKER] %(message)s'
)
def main():
    """
    Worker entry point: read a pickled job from stdin, transcribe, reply on stdout.

    Protocol (all pickled):
        stdin  -> {'config': dict, 'audio': ndarray, 'model_path': str, 'libs_path': str}
        stdout -> {'success': True, 'text': str}
                  or {'success': False, 'error': str}

    NOTE(review): pickle.load on stdin is only acceptable because the parent
    process is the sole writer — never feed this worker untrusted data.
    """
    try:
        # Read pickled data from stdin
        data = pickle.load(sys.stdin.buffer)
        # Fix: `config` was read twice (data['config'] then data.get('config', {})),
        # making the first, stricter read pointless. Read it once.
        config = data['config']
        audio_data = data['audio']
        model_path = data['model_path']
        libs_path = data['libs_path']

        # Add libs to PATH so native dependencies (cuDNN etc.) resolve
        os.environ["PATH"] += os.pathsep + str(libs_path)

        # Imported lazily so import failures are reported via the error path
        from faster_whisper import WhisperModel

        model_path_arg = config.get('model_size')  # Now receives full path
        device = config.get("compute_device", "cuda")
        compute = config.get("compute_type", "float16")
        logging.info(f"Worker initializing model from: '{model_path_arg}'")
        # Verify path existence for debugging
        if os.path.exists(model_path_arg):
            logging.info(f"Path verification: EXISTS. Is dir: {os.path.isdir(model_path_arg)}")
        else:
            logging.error(f"Path verification: DOES NOT EXIST!")
        model = WhisperModel(
            model_path_arg,
            device=device,
            compute_type=compute,
            download_root=model_path,
            local_files_only=True  # FORCE offline mode
        )
        # Transcription parameters
        lang = config.get("language", "auto")
        if lang == "auto":
            lang = None
        beam_size = int(config.get("beam_size", 5))
        best_of = int(config.get("best_of", 5))
        vad = config.get("vad_filter", True)
        no_repeat_ngram = int(config.get("no_repeat_ngram_size", 0))
        condition_prev = config.get("condition_on_previous_text", True)
        # Transcribe with more lenient settings for challenging audio
        segments, info = model.transcribe(
            audio_data,
            beam_size=beam_size,
            best_of=best_of,
            language=lang,
            vad_filter=vad,
            vad_parameters=dict(min_silence_duration_ms=500),
            no_repeat_ngram_size=no_repeat_ngram,
            condition_on_previous_text=condition_prev,
            # Lenient thresholds for music/singing
            compression_ratio_threshold=10.0,  # Default 2.4, higher = more lenient
            log_prob_threshold=-2.0,           # Default -1.0, lower = more lenient
            no_speech_threshold=0.9,           # Default 0.6, higher = more lenient
            without_timestamps=True,           # Faster for file processing
        )
        # `segments` is a lazy generator; joining it drives the transcription
        text_result = "".join(segment.text for segment in segments).strip()
        # Output result as pickled data
        pickle.dump({'success': True, 'text': text_result}, sys.stdout.buffer)
        sys.stdout.buffer.flush()
    except Exception as e:
        # Output error with detailed traceback so the parent can surface it
        error_msg = f"{str(e)}\n{traceback.format_exc()}"
        logging.error(f"Worker failed: {error_msg}")
        pickle.dump({'success': False, 'error': error_msg}, sys.stdout.buffer)
        sys.stdout.buffer.flush()
if __name__ == "__main__":
    try:
        main()
    except Exception:
        # Last-resort handler: surface the crash to the console AND the log,
        # then block so the window stays open for a screenshot.
        import traceback
        traceback.print_exc()
        logging.error("CRITICAL WORKER CRASH")
        logging.error(traceback.format_exc())
        banner = "=" * 60
        print("\n" + banner)
        print("CRITICAL ERROR IN WORKER PROCESS")
        print("Please take a screenshot of this window.")
        print(banner)
        input("Press Enter to close this window...")

129
src/core/transcriber.py Normal file
View File

@@ -0,0 +1,129 @@
"""
Whisper Transcriber Module.
===========================
Transcriber Module.
===================
Handles audio transcription using faster-whisper.
Runs IN-PROCESS (no subprocess) to ensure stability on all systems.
"""
import os
import logging
from typing import Optional
import numpy as np
from src.core.config import ConfigManager
from src.core.paths import get_models_path
# Import directly - valid since we are now running in the full environment
from faster_whisper import WhisperModel
class WhisperTranscriber:
    """
    Manages the faster-whisper model and transcription process.

    The model is loaded lazily and cached; it is only reloaded when the
    configured size / device / compute type changes.
    """

    def __init__(self):
        """Initialize settings; the model itself is loaded on demand."""
        self.config = ConfigManager()
        self.model = None
        # Parameters the current model was loaded with (reload detection)
        self.current_model_size = None
        self.current_compute_device = None
        self.current_compute_type = None

    def load_model(self):
        """
        Loads the model specified in config.
        Safe to call multiple times (checks if reload needed).
        """
        size = self.config.get("model_size")
        device = self.config.get("compute_device")
        compute = self.config.get("compute_type")
        # Already loaded with identical parameters — nothing to do
        if (self.model and
                self.current_model_size == size and
                self.current_compute_device == device and
                self.current_compute_type == compute):
            return
        logging.info(f"Loading Model: {size} on {device} ({compute})...")
        try:
            # Construct path to local model for offline support
            new_path = get_models_path() / f"faster-whisper-{size}"
            model_input = str(new_path) if new_path.exists() else size
            # Force offline if path exists to avoid HF errors
            local_only = new_path.exists()
            self.model = WhisperModel(
                model_input,
                device=device,
                compute_type=compute,
                download_root=str(get_models_path()),
                local_files_only=local_only
            )
            self.current_model_size = size
            self.current_compute_device = device
            self.current_compute_type = compute
            logging.info("Model loaded successfully.")
        except Exception as e:
            logging.error(f"Failed to load model: {e}")
            self.model = None

    def transcribe(self, audio_data, is_file: bool = False) -> str:
        """
        Transcribe audio data.

        Args:
            audio_data: Audio input accepted by faster-whisper (numpy array
                or file path).
            is_file (bool): True when transcribing a file; disables VAD.

        Returns:
            str: The transcribed text, or an "Error: ..." string on failure.
        """
        logging.info(f"Starting transcription... (is_file={is_file})")
        # Ensure model is loaded
        if not self.model:
            self.load_model()
        if not self.model:
            return "Error: Model failed to load."
        try:
            # Config
            beam_size = int(self.config.get("beam_size"))
            best_of = int(self.config.get("best_of"))
            vad = False if is_file else self.config.get("vad_filter")
            # Fix: honor the configured language (it was silently ignored
            # here, unlike the subprocess worker). "auto" -> None lets
            # Whisper auto-detect, which matches the previous behavior.
            lang = self.config.get("language")
            if lang == "auto":
                lang = None
            # Transcribe
            segments, info = self.model.transcribe(
                audio_data,
                beam_size=beam_size,
                best_of=best_of,
                language=lang,
                vad_filter=vad,
                vad_parameters=dict(min_silence_duration_ms=500),
                condition_on_previous_text=self.config.get("condition_on_previous_text"),
                without_timestamps=True
            )
            # Aggregate text (`segments` is a lazy generator)
            text_result = ""
            for segment in segments:
                text_result += segment.text + " "
            return text_result.strip()
        except Exception as e:
            logging.error(f"Transcription failed: {e}")
            return f"Error: {str(e)}"

    def model_exists(self, size: str) -> bool:
        """Checks if a model size is already downloaded locally."""
        new_path = get_models_path() / f"faster-whisper-{size}"
        if (new_path / "config.json").exists():
            return True
        # Legacy HF cache layout check
        folder_name = f"models--Systran--faster-whisper-{size}"
        path = get_models_path() / folder_name / "snapshots"
        if path.exists() and any(path.iterdir()):
            return True
        return False