use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Instant; use rayon::prelude::*; use crate::encoder::OutputEncoder; use crate::error::{PixstripError, Result}; use crate::loader::ImageLoader; use crate::operations::resize::resize_image; use crate::operations::watermark::apply_watermark; use crate::operations::{Flip, Rotation}; use crate::pipeline::ProcessingJob; use crate::types::ImageFormat; #[derive(Debug, Clone)] pub struct ProgressUpdate { pub current: usize, pub total: usize, pub current_file: String, pub succeeded_so_far: usize, pub failed_so_far: usize, } #[derive(Debug, Clone)] pub struct BatchResult { pub total: usize, pub succeeded: usize, pub failed: usize, pub cancelled: bool, pub total_input_bytes: u64, pub total_output_bytes: u64, pub errors: Vec<(String, String)>, pub elapsed_ms: u64, } pub struct PipelineExecutor { cancel_flag: Arc, pause_flag: Arc, thread_count: usize, pause_on_error: bool, } impl PipelineExecutor { pub fn new() -> Self { Self { cancel_flag: Arc::new(AtomicBool::new(false)), pause_flag: Arc::new(AtomicBool::new(false)), thread_count: num_cpus(), pause_on_error: false, } } pub fn with_cancel(cancel_flag: Arc) -> Self { Self { cancel_flag, pause_flag: Arc::new(AtomicBool::new(false)), thread_count: num_cpus(), pause_on_error: false, } } pub fn with_cancel_and_pause( cancel_flag: Arc, pause_flag: Arc, ) -> Self { Self { cancel_flag, pause_flag, thread_count: num_cpus(), pause_on_error: false, } } pub fn set_thread_count(&mut self, count: usize) { self.thread_count = if count == 0 { num_cpus() } else { count }; } pub fn set_pause_on_error(&mut self, pause: bool) { self.pause_on_error = pause; } pub fn execute(&self, job: &ProcessingJob, mut on_progress: F) -> Result where F: FnMut(ProgressUpdate) + Send, { let start = Instant::now(); let total = job.sources.len(); if total == 0 { return Ok(BatchResult { total: 0, succeeded: 0, failed: 0, cancelled: false, total_input_bytes: 0, total_output_bytes: 0, errors: Vec::new(), elapsed_ms: 0, }); } // Use single-threaded path when thread_count is 1 if self.thread_count <= 1 { return self.execute_sequential(job, on_progress); } // Build a scoped rayon thread pool with the configured count let pool = rayon::ThreadPoolBuilder::new() .num_threads(self.thread_count) .build() .map_err(|e| PixstripError::Processing { operation: "thread_pool".into(), reason: e.to_string(), })?; let completed = AtomicUsize::new(0); let succeeded_count = AtomicUsize::new(0); let failed_count = AtomicUsize::new(0); let input_bytes = AtomicU64::new(0); let output_bytes = AtomicU64::new(0); let cancelled = AtomicBool::new(false); let errors: Mutex> = Mutex::new(Vec::new()); // Channel for progress updates from worker threads to the callback let (tx, rx) = std::sync::mpsc::channel::(); let cancel_flag = &self.cancel_flag; let pause_flag = &self.pause_flag; let pause_on_error = self.pause_on_error; // Run the parallel work in a scoped thread so we can drain progress concurrently std::thread::scope(|scope| { // Spawn the rayon pool work let tx_clone = tx.clone(); let completed_ref = &completed; let succeeded_ref = &succeeded_count; let failed_ref = &failed_count; let input_bytes_ref = &input_bytes; let output_bytes_ref = &output_bytes; let cancelled_ref = &cancelled; let errors_ref = &errors; let worker = scope.spawn(move || { pool.install(|| { job.sources.par_iter().enumerate().for_each(|(idx, source)| { // Check cancel if cancel_flag.load(Ordering::Relaxed) { cancelled_ref.store(true, Ordering::Relaxed); return; } // Wait while paused while pause_flag.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_millis(100)); if cancel_flag.load(Ordering::Relaxed) { cancelled_ref.store(true, Ordering::Relaxed); return; } } if cancelled_ref.load(Ordering::Relaxed) { return; } let file_name = source .path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown") .to_string(); let current = completed_ref.fetch_add(1, Ordering::Relaxed) + 1; let _ = tx_clone.send(ProgressUpdate { current, total, current_file: file_name.clone(), succeeded_so_far: succeeded_ref.load(Ordering::Relaxed), failed_so_far: failed_ref.load(Ordering::Relaxed), }); let loader = ImageLoader::new(); let encoder = OutputEncoder::new(); match Self::process_single_static(job, source, &loader, &encoder, idx) { Ok((in_size, out_size)) => { succeeded_ref.fetch_add(1, Ordering::Relaxed); input_bytes_ref.fetch_add(in_size, Ordering::Relaxed); output_bytes_ref.fetch_add(out_size, Ordering::Relaxed); } Err(e) => { failed_ref.fetch_add(1, Ordering::Relaxed); if let Ok(mut errs) = errors_ref.lock() { errs.push((file_name, e.to_string())); } if pause_on_error { pause_flag.store(true, Ordering::Relaxed); } } } }); }); // Drop sender so the receiver loop ends drop(tx_clone); }); // Drop the original sender so only the worker's clone keeps the channel open drop(tx); // Drain progress updates on this thread (the calling thread) for update in rx { on_progress(update); } // Wait for worker to finish (it already has by the time rx is drained) let _ = worker.join(); }); Ok(BatchResult { total, succeeded: succeeded_count.load(Ordering::Relaxed), failed: failed_count.load(Ordering::Relaxed), cancelled: cancelled.load(Ordering::Relaxed), total_input_bytes: input_bytes.load(Ordering::Relaxed), total_output_bytes: output_bytes.load(Ordering::Relaxed), errors: errors.into_inner().unwrap_or_default(), elapsed_ms: start.elapsed().as_millis() as u64, }) } fn execute_sequential(&self, job: &ProcessingJob, mut on_progress: F) -> Result where F: FnMut(ProgressUpdate) + Send, { let start = Instant::now(); let total = job.sources.len(); let loader = ImageLoader::new(); let encoder = OutputEncoder::new(); let mut result = BatchResult { total, succeeded: 0, failed: 0, cancelled: false, total_input_bytes: 0, total_output_bytes: 0, errors: Vec::new(), elapsed_ms: 0, }; for (i, source) in job.sources.iter().enumerate() { if self.cancel_flag.load(Ordering::Relaxed) { result.cancelled = true; break; } while self.pause_flag.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_millis(100)); if self.cancel_flag.load(Ordering::Relaxed) { result.cancelled = true; break; } } if result.cancelled { break; } let file_name = source .path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown") .to_string(); on_progress(ProgressUpdate { current: i + 1, total, current_file: file_name.clone(), succeeded_so_far: result.succeeded, failed_so_far: result.failed, }); match Self::process_single_static(job, source, &loader, &encoder, i) { Ok((input_size, output_size)) => { result.succeeded += 1; result.total_input_bytes += input_size; result.total_output_bytes += output_size; } Err(e) => { result.failed += 1; result.errors.push((file_name, e.to_string())); if self.pause_on_error { self.pause_flag.store(true, Ordering::Relaxed); } } } } result.elapsed_ms = start.elapsed().as_millis() as u64; Ok(result) } fn process_single_static( job: &ProcessingJob, source: &crate::types::ImageSource, loader: &ImageLoader, encoder: &OutputEncoder, index: usize, ) -> std::result::Result<(u64, u64), PixstripError> { let input_size = std::fs::metadata(&source.path) .map(|m| m.len()) .unwrap_or(0); // Load image let mut img = loader.load_pixels(&source.path)?; // Rotation if let Some(ref rotation) = job.rotation { img = match rotation { Rotation::None => img, Rotation::Cw90 => img.rotate90(), Rotation::Cw180 => img.rotate180(), Rotation::Cw270 => img.rotate270(), Rotation::AutoOrient => img, }; } // Flip if let Some(ref flip) = job.flip { img = match flip { Flip::None => img, Flip::Horizontal => img.fliph(), Flip::Vertical => img.flipv(), }; } // Resize if let Some(ref config) = job.resize { img = resize_image(&img, config)?; } // Watermark (after resize so watermark is at correct scale) if let Some(ref config) = job.watermark { img = apply_watermark(img, config)?; } // Determine output format let output_format = if let Some(ref convert) = job.convert { let input_fmt = source.original_format.unwrap_or(ImageFormat::Jpeg); convert.output_format(input_fmt) } else { source.original_format.unwrap_or(ImageFormat::Jpeg) }; // Determine quality let quality = job.compress.as_ref().map(|c| match c { crate::operations::CompressConfig::Preset(preset) => { encoder.quality_for_format(output_format, preset) } crate::operations::CompressConfig::Custom { jpeg_quality, png_level: _, webp_quality, avif_quality: _, } => match output_format { ImageFormat::Jpeg => jpeg_quality.unwrap_or(85), ImageFormat::WebP => webp_quality.map(|q| q as u8).unwrap_or(80), _ => 85, }, }); // Determine output path (with rename if configured) let output_path = if let Some(ref rename) = job.rename { let stem = source .path .file_stem() .and_then(|s| s.to_str()) .unwrap_or("output"); let ext = output_format.extension(); if let Some(ref template) = rename.template { let dims = Some((img.width(), img.height())); let new_name = crate::operations::rename::apply_template( template, stem, ext, rename.counter_start + index as u32, dims, ); job.output_dir.join(new_name) } else { let new_name = rename.apply_simple(stem, ext, index as u32 + 1); job.output_dir.join(new_name) } } else { job.output_path_for(source, Some(output_format)) }; // Ensure output directory exists if let Some(parent) = output_path.parent() { std::fs::create_dir_all(parent).map_err(PixstripError::Io)?; } // Encode and save encoder.encode_to_file(&img, &output_path, output_format, quality)?; // Metadata stripping: re-encoding through the image crate naturally // strips all EXIF/metadata. No additional action is needed for // StripAll, Privacy, or Custom modes. KeepAll mode would require // copying EXIF tags back from the source file using little_exif. if let Some(ref meta_config) = job.metadata { if matches!(meta_config, crate::operations::MetadataConfig::KeepAll) { copy_metadata_from_source(&source.path, &output_path); } } let output_size = std::fs::metadata(&output_path) .map(|m| m.len()) .unwrap_or(0); Ok((input_size, output_size)) } } impl Default for PipelineExecutor { fn default() -> Self { Self::new() } } fn num_cpus() -> usize { std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1) } fn copy_metadata_from_source(source: &std::path::Path, output: &std::path::Path) { // Best-effort: try to copy EXIF from source to output using little_exif. // If it fails (e.g. non-JPEG, no EXIF), silently continue. let Ok(metadata) = little_exif::metadata::Metadata::new_from_path(source) else { return; }; let _: std::result::Result<(), std::io::Error> = metadata.write_to_file(output); }