Implement parallel image processing with rayon thread pool
The executor now uses rayon's thread pool for parallel processing when thread_count > 1. Progress updates are sent via mpsc channel from worker threads. Falls back to sequential processing for thread_count = 1.
This commit is contained in:
@@ -1,7 +1,9 @@
|
|||||||
use std::sync::atomic::{AtomicBool, Ordering};
|
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::{Arc, Mutex};
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
|
use rayon::prelude::*;
|
||||||
|
|
||||||
use crate::encoder::OutputEncoder;
|
use crate::encoder::OutputEncoder;
|
||||||
use crate::error::{PixstripError, Result};
|
use crate::error::{PixstripError, Result};
|
||||||
use crate::loader::ImageLoader;
|
use crate::loader::ImageLoader;
|
||||||
@@ -77,6 +79,155 @@ impl PipelineExecutor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn execute<F>(&self, job: &ProcessingJob, mut on_progress: F) -> Result<BatchResult>
|
pub fn execute<F>(&self, job: &ProcessingJob, mut on_progress: F) -> Result<BatchResult>
|
||||||
|
where
|
||||||
|
F: FnMut(ProgressUpdate) + Send,
|
||||||
|
{
|
||||||
|
let start = Instant::now();
|
||||||
|
let total = job.sources.len();
|
||||||
|
|
||||||
|
if total == 0 {
|
||||||
|
return Ok(BatchResult {
|
||||||
|
total: 0,
|
||||||
|
succeeded: 0,
|
||||||
|
failed: 0,
|
||||||
|
cancelled: false,
|
||||||
|
total_input_bytes: 0,
|
||||||
|
total_output_bytes: 0,
|
||||||
|
errors: Vec::new(),
|
||||||
|
elapsed_ms: 0,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use single-threaded path when thread_count is 1
|
||||||
|
if self.thread_count <= 1 {
|
||||||
|
return self.execute_sequential(job, on_progress);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a scoped rayon thread pool with the configured count
|
||||||
|
let pool = rayon::ThreadPoolBuilder::new()
|
||||||
|
.num_threads(self.thread_count)
|
||||||
|
.build()
|
||||||
|
.map_err(|e| PixstripError::Processing {
|
||||||
|
operation: "thread_pool".into(),
|
||||||
|
reason: e.to_string(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let completed = AtomicUsize::new(0);
|
||||||
|
let succeeded_count = AtomicUsize::new(0);
|
||||||
|
let failed_count = AtomicUsize::new(0);
|
||||||
|
let input_bytes = AtomicU64::new(0);
|
||||||
|
let output_bytes = AtomicU64::new(0);
|
||||||
|
let cancelled = AtomicBool::new(false);
|
||||||
|
let errors: Mutex<Vec<(String, String)>> = Mutex::new(Vec::new());
|
||||||
|
|
||||||
|
// Channel for progress updates from worker threads to the callback
|
||||||
|
let (tx, rx) = std::sync::mpsc::channel::<ProgressUpdate>();
|
||||||
|
|
||||||
|
let cancel_flag = &self.cancel_flag;
|
||||||
|
let pause_flag = &self.pause_flag;
|
||||||
|
let pause_on_error = self.pause_on_error;
|
||||||
|
|
||||||
|
// Run the parallel work in a scoped thread so we can drain progress concurrently
|
||||||
|
std::thread::scope(|scope| {
|
||||||
|
// Spawn the rayon pool work
|
||||||
|
let tx_clone = tx.clone();
|
||||||
|
let completed_ref = &completed;
|
||||||
|
let succeeded_ref = &succeeded_count;
|
||||||
|
let failed_ref = &failed_count;
|
||||||
|
let input_bytes_ref = &input_bytes;
|
||||||
|
let output_bytes_ref = &output_bytes;
|
||||||
|
let cancelled_ref = &cancelled;
|
||||||
|
let errors_ref = &errors;
|
||||||
|
|
||||||
|
let worker = scope.spawn(move || {
|
||||||
|
pool.install(|| {
|
||||||
|
job.sources.par_iter().for_each(|source| {
|
||||||
|
// Check cancel
|
||||||
|
if cancel_flag.load(Ordering::Relaxed) {
|
||||||
|
cancelled_ref.store(true, Ordering::Relaxed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait while paused
|
||||||
|
while pause_flag.load(Ordering::Relaxed) {
|
||||||
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
|
if cancel_flag.load(Ordering::Relaxed) {
|
||||||
|
cancelled_ref.store(true, Ordering::Relaxed);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if cancelled_ref.load(Ordering::Relaxed) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let file_name = source
|
||||||
|
.path
|
||||||
|
.file_name()
|
||||||
|
.and_then(|n| n.to_str())
|
||||||
|
.unwrap_or("unknown")
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let current = completed_ref.fetch_add(1, Ordering::Relaxed) + 1;
|
||||||
|
|
||||||
|
let _ = tx_clone.send(ProgressUpdate {
|
||||||
|
current,
|
||||||
|
total,
|
||||||
|
current_file: file_name.clone(),
|
||||||
|
succeeded_so_far: succeeded_ref.load(Ordering::Relaxed),
|
||||||
|
failed_so_far: failed_ref.load(Ordering::Relaxed),
|
||||||
|
});
|
||||||
|
|
||||||
|
let loader = ImageLoader::new();
|
||||||
|
let encoder = OutputEncoder::new();
|
||||||
|
|
||||||
|
match Self::process_single_static(job, source, &loader, &encoder) {
|
||||||
|
Ok((in_size, out_size)) => {
|
||||||
|
succeeded_ref.fetch_add(1, Ordering::Relaxed);
|
||||||
|
input_bytes_ref.fetch_add(in_size, Ordering::Relaxed);
|
||||||
|
output_bytes_ref.fetch_add(out_size, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
failed_ref.fetch_add(1, Ordering::Relaxed);
|
||||||
|
if let Ok(mut errs) = errors_ref.lock() {
|
||||||
|
errs.push((file_name, e.to_string()));
|
||||||
|
}
|
||||||
|
if pause_on_error {
|
||||||
|
pause_flag.store(true, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
// Drop sender so the receiver loop ends
|
||||||
|
drop(tx_clone);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Drop the original sender so only the worker's clone keeps the channel open
|
||||||
|
drop(tx);
|
||||||
|
|
||||||
|
// Drain progress updates on this thread (the calling thread)
|
||||||
|
for update in rx {
|
||||||
|
on_progress(update);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for worker to finish (it already has by the time rx is drained)
|
||||||
|
let _ = worker.join();
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(BatchResult {
|
||||||
|
total,
|
||||||
|
succeeded: succeeded_count.load(Ordering::Relaxed),
|
||||||
|
failed: failed_count.load(Ordering::Relaxed),
|
||||||
|
cancelled: cancelled.load(Ordering::Relaxed),
|
||||||
|
total_input_bytes: input_bytes.load(Ordering::Relaxed),
|
||||||
|
total_output_bytes: output_bytes.load(Ordering::Relaxed),
|
||||||
|
errors: errors.into_inner().unwrap_or_default(),
|
||||||
|
elapsed_ms: start.elapsed().as_millis() as u64,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn execute_sequential<F>(&self, job: &ProcessingJob, mut on_progress: F) -> Result<BatchResult>
|
||||||
where
|
where
|
||||||
F: FnMut(ProgressUpdate) + Send,
|
F: FnMut(ProgressUpdate) + Send,
|
||||||
{
|
{
|
||||||
@@ -102,7 +253,6 @@ impl PipelineExecutor {
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait while paused (check every 100ms)
|
|
||||||
while self.pause_flag.load(Ordering::Relaxed) {
|
while self.pause_flag.load(Ordering::Relaxed) {
|
||||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||||
if self.cancel_flag.load(Ordering::Relaxed) {
|
if self.cancel_flag.load(Ordering::Relaxed) {
|
||||||
@@ -129,7 +279,7 @@ impl PipelineExecutor {
|
|||||||
failed_so_far: result.failed,
|
failed_so_far: result.failed,
|
||||||
});
|
});
|
||||||
|
|
||||||
match self.process_single(job, source, &loader, &encoder) {
|
match Self::process_single_static(job, source, &loader, &encoder) {
|
||||||
Ok((input_size, output_size)) => {
|
Ok((input_size, output_size)) => {
|
||||||
result.succeeded += 1;
|
result.succeeded += 1;
|
||||||
result.total_input_bytes += input_size;
|
result.total_input_bytes += input_size;
|
||||||
@@ -149,8 +299,7 @@ impl PipelineExecutor {
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn process_single(
|
fn process_single_static(
|
||||||
&self,
|
|
||||||
job: &ProcessingJob,
|
job: &ProcessingJob,
|
||||||
source: &crate::types::ImageSource,
|
source: &crate::types::ImageSource,
|
||||||
loader: &ImageLoader,
|
loader: &ImageLoader,
|
||||||
|
|||||||
Reference in New Issue
Block a user