use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Instant; use crate::encoder::OutputEncoder; use crate::error::{PixstripError, Result}; use crate::loader::ImageLoader; use crate::operations::resize::resize_image; use crate::pipeline::ProcessingJob; use crate::types::ImageFormat; #[derive(Debug, Clone)] pub struct ProgressUpdate { pub current: usize, pub total: usize, pub current_file: String, pub succeeded_so_far: usize, pub failed_so_far: usize, } #[derive(Debug, Clone)] pub struct BatchResult { pub total: usize, pub succeeded: usize, pub failed: usize, pub cancelled: bool, pub total_input_bytes: u64, pub total_output_bytes: u64, pub errors: Vec<(String, String)>, pub elapsed_ms: u64, } pub struct PipelineExecutor { cancel_flag: Arc, pause_flag: Arc, } impl PipelineExecutor { pub fn new() -> Self { Self { cancel_flag: Arc::new(AtomicBool::new(false)), pause_flag: Arc::new(AtomicBool::new(false)), } } pub fn with_cancel(cancel_flag: Arc) -> Self { Self { cancel_flag, pause_flag: Arc::new(AtomicBool::new(false)), } } pub fn with_cancel_and_pause( cancel_flag: Arc, pause_flag: Arc, ) -> Self { Self { cancel_flag, pause_flag, } } pub fn execute(&self, job: &ProcessingJob, mut on_progress: F) -> Result where F: FnMut(ProgressUpdate) + Send, { let start = Instant::now(); let total = job.sources.len(); let loader = ImageLoader::new(); let encoder = OutputEncoder::new(); let mut result = BatchResult { total, succeeded: 0, failed: 0, cancelled: false, total_input_bytes: 0, total_output_bytes: 0, errors: Vec::new(), elapsed_ms: 0, }; for (i, source) in job.sources.iter().enumerate() { if self.cancel_flag.load(Ordering::Relaxed) { result.cancelled = true; break; } // Wait while paused (check every 100ms) while self.pause_flag.load(Ordering::Relaxed) { std::thread::sleep(std::time::Duration::from_millis(100)); if self.cancel_flag.load(Ordering::Relaxed) { result.cancelled = true; break; } } if result.cancelled { break; } let file_name = source .path .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown") .to_string(); on_progress(ProgressUpdate { current: i + 1, total, current_file: file_name.clone(), succeeded_so_far: result.succeeded, failed_so_far: result.failed, }); match self.process_single(job, source, &loader, &encoder) { Ok((input_size, output_size)) => { result.succeeded += 1; result.total_input_bytes += input_size; result.total_output_bytes += output_size; } Err(e) => { result.failed += 1; result.errors.push((file_name, e.to_string())); } } } result.elapsed_ms = start.elapsed().as_millis() as u64; Ok(result) } fn process_single( &self, job: &ProcessingJob, source: &crate::types::ImageSource, loader: &ImageLoader, encoder: &OutputEncoder, ) -> std::result::Result<(u64, u64), PixstripError> { let input_size = std::fs::metadata(&source.path) .map(|m| m.len()) .unwrap_or(0); // Load image let mut img = loader.load_pixels(&source.path)?; // Resize if let Some(ref config) = job.resize { img = resize_image(&img, config)?; } // Determine output format let output_format = if let Some(ref convert) = job.convert { let input_fmt = source.original_format.unwrap_or(ImageFormat::Jpeg); convert.output_format(input_fmt) } else { source.original_format.unwrap_or(ImageFormat::Jpeg) }; // Determine quality let quality = job.compress.as_ref().map(|c| match c { crate::operations::CompressConfig::Preset(preset) => { encoder.quality_for_format(output_format, preset) } crate::operations::CompressConfig::Custom { jpeg_quality, png_level: _, webp_quality, avif_quality: _, } => match output_format { ImageFormat::Jpeg => jpeg_quality.unwrap_or(85), ImageFormat::WebP => webp_quality.map(|q| q as u8).unwrap_or(80), _ => 85, }, }); // Determine output path let output_path = job.output_path_for(source, Some(output_format)); // Ensure output directory exists if let Some(parent) = output_path.parent() { std::fs::create_dir_all(parent).map_err(PixstripError::Io)?; } // Encode and save encoder.encode_to_file(&img, &output_path, output_format, quality)?; let output_size = std::fs::metadata(&output_path) .map(|m| m.len()) .unwrap_or(0); Ok((input_size, output_size)) } } impl Default for PipelineExecutor { fn default() -> Self { Self::new() } }