diff --git a/pixstrip-core/src/executor.rs b/pixstrip-core/src/executor.rs new file mode 100644 index 0000000..3549247 --- /dev/null +++ b/pixstrip-core/src/executor.rs @@ -0,0 +1,173 @@ +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::Instant; + +use crate::encoder::OutputEncoder; +use crate::error::{PixstripError, Result}; +use crate::loader::ImageLoader; +use crate::operations::resize::resize_image; +use crate::pipeline::ProcessingJob; +use crate::types::ImageFormat; + +#[derive(Debug, Clone)] +pub struct ProgressUpdate { + pub current: usize, + pub total: usize, + pub current_file: String, + pub succeeded_so_far: usize, + pub failed_so_far: usize, +} + +#[derive(Debug, Clone)] +pub struct BatchResult { + pub total: usize, + pub succeeded: usize, + pub failed: usize, + pub cancelled: bool, + pub total_input_bytes: u64, + pub total_output_bytes: u64, + pub errors: Vec<(String, String)>, + pub elapsed_ms: u64, +} + +pub struct PipelineExecutor { + cancel_flag: Arc, +} + +impl PipelineExecutor { + pub fn new() -> Self { + Self { + cancel_flag: Arc::new(AtomicBool::new(false)), + } + } + + pub fn with_cancel(cancel_flag: Arc) -> Self { + Self { cancel_flag } + } + + pub fn execute(&self, job: &ProcessingJob, mut on_progress: F) -> Result + where + F: FnMut(ProgressUpdate) + Send, + { + let start = Instant::now(); + let total = job.sources.len(); + let loader = ImageLoader::new(); + let encoder = OutputEncoder::new(); + + let mut result = BatchResult { + total, + succeeded: 0, + failed: 0, + cancelled: false, + total_input_bytes: 0, + total_output_bytes: 0, + errors: Vec::new(), + elapsed_ms: 0, + }; + + for (i, source) in job.sources.iter().enumerate() { + if self.cancel_flag.load(Ordering::Relaxed) { + result.cancelled = true; + break; + } + + let file_name = source + .path + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or("unknown") + .to_string(); + + on_progress(ProgressUpdate { + current: i + 1, + total, + current_file: file_name.clone(), + succeeded_so_far: result.succeeded, + failed_so_far: result.failed, + }); + + match self.process_single(job, source, &loader, &encoder) { + Ok((input_size, output_size)) => { + result.succeeded += 1; + result.total_input_bytes += input_size; + result.total_output_bytes += output_size; + } + Err(e) => { + result.failed += 1; + result.errors.push((file_name, e.to_string())); + } + } + } + + result.elapsed_ms = start.elapsed().as_millis() as u64; + Ok(result) + } + + fn process_single( + &self, + job: &ProcessingJob, + source: &crate::types::ImageSource, + loader: &ImageLoader, + encoder: &OutputEncoder, + ) -> std::result::Result<(u64, u64), PixstripError> { + let input_size = std::fs::metadata(&source.path) + .map(|m| m.len()) + .unwrap_or(0); + + // Load image + let mut img = loader.load_pixels(&source.path)?; + + // Resize + if let Some(ref config) = job.resize { + img = resize_image(&img, config)?; + } + + // Determine output format + let output_format = if let Some(ref convert) = job.convert { + let input_fmt = source.original_format.unwrap_or(ImageFormat::Jpeg); + convert.output_format(input_fmt) + } else { + source.original_format.unwrap_or(ImageFormat::Jpeg) + }; + + // Determine quality + let quality = job.compress.as_ref().map(|c| match c { + crate::operations::CompressConfig::Preset(preset) => { + encoder.quality_for_format(output_format, preset) + } + crate::operations::CompressConfig::Custom { + jpeg_quality, + png_level: _, + webp_quality, + avif_quality: _, + } => match output_format { + ImageFormat::Jpeg => jpeg_quality.unwrap_or(85), + ImageFormat::WebP => webp_quality.map(|q| q as u8).unwrap_or(80), + _ => 85, + }, + }); + + // Determine output path + let output_path = job.output_path_for(source, Some(output_format)); + + // Ensure output directory exists + if let Some(parent) = output_path.parent() { + std::fs::create_dir_all(parent).map_err(PixstripError::Io)?; + } + + // Encode and save + encoder.encode_to_file(&img, &output_path, output_format, quality)?; + + let output_size = std::fs::metadata(&output_path) + .map(|m| m.len()) + .unwrap_or(0); + + Ok((input_size, output_size)) + } +} + +impl Default for PipelineExecutor { + fn default() -> Self { + Self::new() + } +} diff --git a/pixstrip-core/src/lib.rs b/pixstrip-core/src/lib.rs index dd4f864..01259d6 100644 --- a/pixstrip-core/src/lib.rs +++ b/pixstrip-core/src/lib.rs @@ -2,6 +2,7 @@ pub mod config; pub mod discovery; pub mod encoder; pub mod error; +pub mod executor; pub mod loader; pub mod operations; pub mod pipeline; diff --git a/pixstrip-core/tests/executor_tests.rs b/pixstrip-core/tests/executor_tests.rs new file mode 100644 index 0000000..2a66189 --- /dev/null +++ b/pixstrip-core/tests/executor_tests.rs @@ -0,0 +1,134 @@ +use pixstrip_core::executor::PipelineExecutor; +use pixstrip_core::operations::*; +use pixstrip_core::pipeline::ProcessingJob; +use pixstrip_core::types::*; +use std::sync::atomic::AtomicBool; +use std::sync::{Arc, Mutex}; + +fn create_test_jpeg(path: &std::path::Path) { + let img = image::RgbImage::from_fn(200, 150, |x, y| { + image::Rgb([(x % 256) as u8, (y % 256) as u8, 128]) + }); + img.save_with_format(path, image::ImageFormat::Jpeg).unwrap(); +} + +fn setup_test_dir() -> (tempfile::TempDir, std::path::PathBuf, std::path::PathBuf) { + let dir = tempfile::tempdir().unwrap(); + let input = dir.path().join("input"); + let output = dir.path().join("output"); + std::fs::create_dir_all(&input).unwrap(); + std::fs::create_dir_all(&output).unwrap(); + (dir, input, output) +} + +#[test] +fn execute_single_image_resize() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("photo.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("photo.jpg")); + job.resize = Some(ResizeConfig::ByWidth(100)); + + let executor = PipelineExecutor::new(); + let result = executor.execute(&job, |_| {}).unwrap(); + + assert_eq!(result.succeeded, 1); + assert_eq!(result.failed, 0); + assert!(output.join("photo.jpg").exists()); +} + +#[test] +fn execute_resize_and_convert() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("photo.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("photo.jpg")); + job.resize = Some(ResizeConfig::ByWidth(100)); + job.convert = Some(ConvertConfig::SingleFormat(ImageFormat::WebP)); + + let executor = PipelineExecutor::new(); + let result = executor.execute(&job, |_| {}).unwrap(); + + assert_eq!(result.succeeded, 1); + assert!(output.join("photo.webp").exists()); +} + +#[test] +fn execute_multiple_images() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("a.jpg")); + create_test_jpeg(&input.join("b.jpg")); + create_test_jpeg(&input.join("c.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("a.jpg")); + job.add_source(input.join("b.jpg")); + job.add_source(input.join("c.jpg")); + job.resize = Some(ResizeConfig::ByWidth(50)); + + let executor = PipelineExecutor::new(); + let result = executor.execute(&job, |_| {}).unwrap(); + + assert_eq!(result.succeeded, 3); + assert_eq!(result.failed, 0); + assert_eq!(result.total, 3); +} + +#[test] +fn execute_collects_progress() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("photo.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("photo.jpg")); + job.resize = Some(ResizeConfig::ByWidth(100)); + + let updates = Arc::new(Mutex::new(Vec::new())); + let updates_clone = updates.clone(); + + let executor = PipelineExecutor::new(); + executor.execute(&job, move |update| { + updates_clone.lock().unwrap().push(update); + }).unwrap(); + + let updates = updates.lock().unwrap(); + assert!(!updates.is_empty()); +} + +#[test] +fn execute_with_cancellation() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("a.jpg")); + create_test_jpeg(&input.join("b.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("a.jpg")); + job.add_source(input.join("b.jpg")); + job.resize = Some(ResizeConfig::ByWidth(100)); + + let cancel = Arc::new(AtomicBool::new(true)); // cancel immediately + let executor = PipelineExecutor::with_cancel(cancel); + let result = executor.execute(&job, |_| {}).unwrap(); + + // With immediate cancellation, fewer images should be processed + assert!(result.succeeded + result.failed <= 2); + assert!(result.cancelled); +} + +#[test] +fn batch_result_tracks_sizes() { + let (_dir, input, output) = setup_test_dir(); + create_test_jpeg(&input.join("photo.jpg")); + + let mut job = ProcessingJob::new(&input, &output); + job.add_source(input.join("photo.jpg")); + job.compress = Some(CompressConfig::Preset(QualityPreset::Low)); + + let executor = PipelineExecutor::new(); + let result = executor.execute(&job, |_| {}).unwrap(); + + assert!(result.total_input_bytes > 0); + assert!(result.total_output_bytes > 0); +}