From fa7936abbd62ffc6b1ce72944908ca473cb77227 Mon Sep 17 00:00:00 2001 From: lashman Date: Fri, 6 Mar 2026 13:57:14 +0200 Subject: [PATCH] Implement parallel image processing with rayon thread pool The executor now uses rayon's thread pool for parallel processing when thread_count > 1. Progress updates are sent via mpsc channel from worker threads. Falls back to sequential processing for thread_count = 1. --- pixstrip-core/src/executor.rs | 161 ++++++++++++++++++++++++++++++++-- 1 file changed, 155 insertions(+), 6 deletions(-) diff --git a/pixstrip-core/src/executor.rs b/pixstrip-core/src/executor.rs index 05f66fa..39a1aa3 100644 --- a/pixstrip-core/src/executor.rs +++ b/pixstrip-core/src/executor.rs @@ -1,7 +1,9 @@ -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; use std::time::Instant; +use rayon::prelude::*; + use crate::encoder::OutputEncoder; use crate::error::{PixstripError, Result}; use crate::loader::ImageLoader; @@ -77,6 +79,155 @@ impl PipelineExecutor { } pub fn execute(&self, job: &ProcessingJob, mut on_progress: F) -> Result + where + F: FnMut(ProgressUpdate) + Send, + { + let start = Instant::now(); + let total = job.sources.len(); + + if total == 0 { + return Ok(BatchResult { + total: 0, + succeeded: 0, + failed: 0, + cancelled: false, + total_input_bytes: 0, + total_output_bytes: 0, + errors: Vec::new(), + elapsed_ms: 0, + }); + } + + // Use single-threaded path when thread_count is 1 + if self.thread_count <= 1 { + return self.execute_sequential(job, on_progress); + } + + // Build a scoped rayon thread pool with the configured count + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(self.thread_count) + .build() + .map_err(|e| PixstripError::Processing { + operation: "thread_pool".into(), + reason: e.to_string(), + })?; + + let completed = AtomicUsize::new(0); + let succeeded_count = AtomicUsize::new(0); + let failed_count = AtomicUsize::new(0); + let input_bytes = AtomicU64::new(0); + let output_bytes = AtomicU64::new(0); + let cancelled = AtomicBool::new(false); + let errors: Mutex> = Mutex::new(Vec::new()); + + // Channel for progress updates from worker threads to the callback + let (tx, rx) = std::sync::mpsc::channel::(); + + let cancel_flag = &self.cancel_flag; + let pause_flag = &self.pause_flag; + let pause_on_error = self.pause_on_error; + + // Run the parallel work in a scoped thread so we can drain progress concurrently + std::thread::scope(|scope| { + // Spawn the rayon pool work + let tx_clone = tx.clone(); + let completed_ref = &completed; + let succeeded_ref = &succeeded_count; + let failed_ref = &failed_count; + let input_bytes_ref = &input_bytes; + let output_bytes_ref = &output_bytes; + let cancelled_ref = &cancelled; + let errors_ref = &errors; + + let worker = scope.spawn(move || { + pool.install(|| { + job.sources.par_iter().for_each(|source| { + // Check cancel + if cancel_flag.load(Ordering::Relaxed) { + cancelled_ref.store(true, Ordering::Relaxed); + return; + } + + // Wait while paused + while pause_flag.load(Ordering::Relaxed) { + std::thread::sleep(std::time::Duration::from_millis(100)); + if cancel_flag.load(Ordering::Relaxed) { + cancelled_ref.store(true, Ordering::Relaxed); + return; + } + } + + if cancelled_ref.load(Ordering::Relaxed) { + return; + } + + let file_name = source + .path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + let current = completed_ref.fetch_add(1, Ordering::Relaxed) + 1; + + let _ = tx_clone.send(ProgressUpdate { + current, + total, + current_file: file_name.clone(), + succeeded_so_far: succeeded_ref.load(Ordering::Relaxed), + failed_so_far: failed_ref.load(Ordering::Relaxed), + }); + + let loader = ImageLoader::new(); + let encoder = OutputEncoder::new(); + + match Self::process_single_static(job, source, &loader, &encoder) { + Ok((in_size, out_size)) => { + succeeded_ref.fetch_add(1, Ordering::Relaxed); + input_bytes_ref.fetch_add(in_size, Ordering::Relaxed); + output_bytes_ref.fetch_add(out_size, Ordering::Relaxed); + } + Err(e) => { + failed_ref.fetch_add(1, Ordering::Relaxed); + if let Ok(mut errs) = errors_ref.lock() { + errs.push((file_name, e.to_string())); + } + if pause_on_error { + pause_flag.store(true, Ordering::Relaxed); + } + } + } + }); + }); + // Drop sender so the receiver loop ends + drop(tx_clone); + }); + + // Drop the original sender so only the worker's clone keeps the channel open + drop(tx); + + // Drain progress updates on this thread (the calling thread) + for update in rx { + on_progress(update); + } + + // Wait for worker to finish (it already has by the time rx is drained) + let _ = worker.join(); + }); + + Ok(BatchResult { + total, + succeeded: succeeded_count.load(Ordering::Relaxed), + failed: failed_count.load(Ordering::Relaxed), + cancelled: cancelled.load(Ordering::Relaxed), + total_input_bytes: input_bytes.load(Ordering::Relaxed), + total_output_bytes: output_bytes.load(Ordering::Relaxed), + errors: errors.into_inner().unwrap_or_default(), + elapsed_ms: start.elapsed().as_millis() as u64, + }) + } + + fn execute_sequential(&self, job: &ProcessingJob, mut on_progress: F) -> Result where F: FnMut(ProgressUpdate) + Send, { @@ -102,7 +253,6 @@ impl PipelineExecutor { break; } - // Wait while paused (check every 100ms) while self.pause_flag.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_millis(100)); if self.cancel_flag.load(Ordering::Relaxed) { @@ -129,7 +279,7 @@ impl PipelineExecutor { failed_so_far: result.failed, }); - match self.process_single(job, source, &loader, &encoder) { + match Self::process_single_static(job, source, &loader, &encoder) { Ok((input_size, output_size)) => { result.succeeded += 1; result.total_input_bytes += input_size; @@ -149,8 +299,7 @@ impl PipelineExecutor { Ok(result) } - fn process_single( - &self, + fn process_single_static( job: &ProcessingJob, source: &crate::types::ImageSource, loader: &ImageLoader,