Add pipeline executor with progress reporting and cancellation
Sequential execution with per-image progress callbacks, cancellation via atomic flag, batch result tracking (success/fail/sizes/timing). Phase 5 complete - 85 tests passing, zero clippy warnings.
This commit is contained in:
173
pixstrip-core/src/executor.rs
Normal file
173
pixstrip-core/src/executor.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use crate::encoder::OutputEncoder;
|
||||
use crate::error::{PixstripError, Result};
|
||||
use crate::loader::ImageLoader;
|
||||
use crate::operations::resize::resize_image;
|
||||
use crate::pipeline::ProcessingJob;
|
||||
use crate::types::ImageFormat;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ProgressUpdate {
|
||||
pub current: usize,
|
||||
pub total: usize,
|
||||
pub current_file: String,
|
||||
pub succeeded_so_far: usize,
|
||||
pub failed_so_far: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct BatchResult {
|
||||
pub total: usize,
|
||||
pub succeeded: usize,
|
||||
pub failed: usize,
|
||||
pub cancelled: bool,
|
||||
pub total_input_bytes: u64,
|
||||
pub total_output_bytes: u64,
|
||||
pub errors: Vec<(String, String)>,
|
||||
pub elapsed_ms: u64,
|
||||
}
|
||||
|
||||
pub struct PipelineExecutor {
|
||||
cancel_flag: Arc<AtomicBool>,
|
||||
}
|
||||
|
||||
impl PipelineExecutor {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
cancel_flag: Arc::new(AtomicBool::new(false)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_cancel(cancel_flag: Arc<AtomicBool>) -> Self {
|
||||
Self { cancel_flag }
|
||||
}
|
||||
|
||||
pub fn execute<F>(&self, job: &ProcessingJob, mut on_progress: F) -> Result<BatchResult>
|
||||
where
|
||||
F: FnMut(ProgressUpdate) + Send,
|
||||
{
|
||||
let start = Instant::now();
|
||||
let total = job.sources.len();
|
||||
let loader = ImageLoader::new();
|
||||
let encoder = OutputEncoder::new();
|
||||
|
||||
let mut result = BatchResult {
|
||||
total,
|
||||
succeeded: 0,
|
||||
failed: 0,
|
||||
cancelled: false,
|
||||
total_input_bytes: 0,
|
||||
total_output_bytes: 0,
|
||||
errors: Vec::new(),
|
||||
elapsed_ms: 0,
|
||||
};
|
||||
|
||||
for (i, source) in job.sources.iter().enumerate() {
|
||||
if self.cancel_flag.load(Ordering::Relaxed) {
|
||||
result.cancelled = true;
|
||||
break;
|
||||
}
|
||||
|
||||
let file_name = source
|
||||
.path
|
||||
.file_name()
|
||||
.and_then(|n| n.to_str())
|
||||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
on_progress(ProgressUpdate {
|
||||
current: i + 1,
|
||||
total,
|
||||
current_file: file_name.clone(),
|
||||
succeeded_so_far: result.succeeded,
|
||||
failed_so_far: result.failed,
|
||||
});
|
||||
|
||||
match self.process_single(job, source, &loader, &encoder) {
|
||||
Ok((input_size, output_size)) => {
|
||||
result.succeeded += 1;
|
||||
result.total_input_bytes += input_size;
|
||||
result.total_output_bytes += output_size;
|
||||
}
|
||||
Err(e) => {
|
||||
result.failed += 1;
|
||||
result.errors.push((file_name, e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result.elapsed_ms = start.elapsed().as_millis() as u64;
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
fn process_single(
|
||||
&self,
|
||||
job: &ProcessingJob,
|
||||
source: &crate::types::ImageSource,
|
||||
loader: &ImageLoader,
|
||||
encoder: &OutputEncoder,
|
||||
) -> std::result::Result<(u64, u64), PixstripError> {
|
||||
let input_size = std::fs::metadata(&source.path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0);
|
||||
|
||||
// Load image
|
||||
let mut img = loader.load_pixels(&source.path)?;
|
||||
|
||||
// Resize
|
||||
if let Some(ref config) = job.resize {
|
||||
img = resize_image(&img, config)?;
|
||||
}
|
||||
|
||||
// Determine output format
|
||||
let output_format = if let Some(ref convert) = job.convert {
|
||||
let input_fmt = source.original_format.unwrap_or(ImageFormat::Jpeg);
|
||||
convert.output_format(input_fmt)
|
||||
} else {
|
||||
source.original_format.unwrap_or(ImageFormat::Jpeg)
|
||||
};
|
||||
|
||||
// Determine quality
|
||||
let quality = job.compress.as_ref().map(|c| match c {
|
||||
crate::operations::CompressConfig::Preset(preset) => {
|
||||
encoder.quality_for_format(output_format, preset)
|
||||
}
|
||||
crate::operations::CompressConfig::Custom {
|
||||
jpeg_quality,
|
||||
png_level: _,
|
||||
webp_quality,
|
||||
avif_quality: _,
|
||||
} => match output_format {
|
||||
ImageFormat::Jpeg => jpeg_quality.unwrap_or(85),
|
||||
ImageFormat::WebP => webp_quality.map(|q| q as u8).unwrap_or(80),
|
||||
_ => 85,
|
||||
},
|
||||
});
|
||||
|
||||
// Determine output path
|
||||
let output_path = job.output_path_for(source, Some(output_format));
|
||||
|
||||
// Ensure output directory exists
|
||||
if let Some(parent) = output_path.parent() {
|
||||
std::fs::create_dir_all(parent).map_err(PixstripError::Io)?;
|
||||
}
|
||||
|
||||
// Encode and save
|
||||
encoder.encode_to_file(&img, &output_path, output_format, quality)?;
|
||||
|
||||
let output_size = std::fs::metadata(&output_path)
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0);
|
||||
|
||||
Ok((input_size, output_size))
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for PipelineExecutor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ pub mod config;
|
||||
pub mod discovery;
|
||||
pub mod encoder;
|
||||
pub mod error;
|
||||
pub mod executor;
|
||||
pub mod loader;
|
||||
pub mod operations;
|
||||
pub mod pipeline;
|
||||
|
||||
134
pixstrip-core/tests/executor_tests.rs
Normal file
134
pixstrip-core/tests/executor_tests.rs
Normal file
@@ -0,0 +1,134 @@
|
||||
use pixstrip_core::executor::PipelineExecutor;
|
||||
use pixstrip_core::operations::*;
|
||||
use pixstrip_core::pipeline::ProcessingJob;
|
||||
use pixstrip_core::types::*;
|
||||
use std::sync::atomic::AtomicBool;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
fn create_test_jpeg(path: &std::path::Path) {
|
||||
let img = image::RgbImage::from_fn(200, 150, |x, y| {
|
||||
image::Rgb([(x % 256) as u8, (y % 256) as u8, 128])
|
||||
});
|
||||
img.save_with_format(path, image::ImageFormat::Jpeg).unwrap();
|
||||
}
|
||||
|
||||
fn setup_test_dir() -> (tempfile::TempDir, std::path::PathBuf, std::path::PathBuf) {
|
||||
let dir = tempfile::tempdir().unwrap();
|
||||
let input = dir.path().join("input");
|
||||
let output = dir.path().join("output");
|
||||
std::fs::create_dir_all(&input).unwrap();
|
||||
std::fs::create_dir_all(&output).unwrap();
|
||||
(dir, input, output)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execute_single_image_resize() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("photo.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("photo.jpg"));
|
||||
job.resize = Some(ResizeConfig::ByWidth(100));
|
||||
|
||||
let executor = PipelineExecutor::new();
|
||||
let result = executor.execute(&job, |_| {}).unwrap();
|
||||
|
||||
assert_eq!(result.succeeded, 1);
|
||||
assert_eq!(result.failed, 0);
|
||||
assert!(output.join("photo.jpg").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execute_resize_and_convert() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("photo.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("photo.jpg"));
|
||||
job.resize = Some(ResizeConfig::ByWidth(100));
|
||||
job.convert = Some(ConvertConfig::SingleFormat(ImageFormat::WebP));
|
||||
|
||||
let executor = PipelineExecutor::new();
|
||||
let result = executor.execute(&job, |_| {}).unwrap();
|
||||
|
||||
assert_eq!(result.succeeded, 1);
|
||||
assert!(output.join("photo.webp").exists());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execute_multiple_images() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("a.jpg"));
|
||||
create_test_jpeg(&input.join("b.jpg"));
|
||||
create_test_jpeg(&input.join("c.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("a.jpg"));
|
||||
job.add_source(input.join("b.jpg"));
|
||||
job.add_source(input.join("c.jpg"));
|
||||
job.resize = Some(ResizeConfig::ByWidth(50));
|
||||
|
||||
let executor = PipelineExecutor::new();
|
||||
let result = executor.execute(&job, |_| {}).unwrap();
|
||||
|
||||
assert_eq!(result.succeeded, 3);
|
||||
assert_eq!(result.failed, 0);
|
||||
assert_eq!(result.total, 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execute_collects_progress() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("photo.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("photo.jpg"));
|
||||
job.resize = Some(ResizeConfig::ByWidth(100));
|
||||
|
||||
let updates = Arc::new(Mutex::new(Vec::new()));
|
||||
let updates_clone = updates.clone();
|
||||
|
||||
let executor = PipelineExecutor::new();
|
||||
executor.execute(&job, move |update| {
|
||||
updates_clone.lock().unwrap().push(update);
|
||||
}).unwrap();
|
||||
|
||||
let updates = updates.lock().unwrap();
|
||||
assert!(!updates.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn execute_with_cancellation() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("a.jpg"));
|
||||
create_test_jpeg(&input.join("b.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("a.jpg"));
|
||||
job.add_source(input.join("b.jpg"));
|
||||
job.resize = Some(ResizeConfig::ByWidth(100));
|
||||
|
||||
let cancel = Arc::new(AtomicBool::new(true)); // cancel immediately
|
||||
let executor = PipelineExecutor::with_cancel(cancel);
|
||||
let result = executor.execute(&job, |_| {}).unwrap();
|
||||
|
||||
// With immediate cancellation, fewer images should be processed
|
||||
assert!(result.succeeded + result.failed <= 2);
|
||||
assert!(result.cancelled);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn batch_result_tracks_sizes() {
|
||||
let (_dir, input, output) = setup_test_dir();
|
||||
create_test_jpeg(&input.join("photo.jpg"));
|
||||
|
||||
let mut job = ProcessingJob::new(&input, &output);
|
||||
job.add_source(input.join("photo.jpg"));
|
||||
job.compress = Some(CompressConfig::Preset(QualityPreset::Low));
|
||||
|
||||
let executor = PipelineExecutor::new();
|
||||
let result = executor.execute(&job, |_| {}).unwrap();
|
||||
|
||||
assert!(result.total_input_bytes > 0);
|
||||
assert!(result.total_output_bytes > 0);
|
||||
}
|
||||
Reference in New Issue
Block a user