//! An interface for dealing with the kinds of parallel computations involved in //! `bellman`. It's currently just a thin wrapper around [`CpuPool`] and //! [`crossbeam`] but may be extended in the future to allow for various //! parallelism strategies. //! //! [`CpuPool`]: futures_cpupool::CpuPool #[cfg(feature = "multicore")] mod implementation { use crossbeam::{self, thread::Scope}; use futures::{Future, IntoFuture, Poll}; use futures_cpupool::{CpuFuture, CpuPool}; use num_cpus; #[derive(Clone)] pub struct Worker { cpus: usize, pool: CpuPool, } impl Worker { // We don't expose this outside the library so that // all `Worker` instances have the same number of // CPUs configured. pub(crate) fn new_with_cpus(cpus: usize) -> Worker { Worker { cpus, pool: CpuPool::new(cpus), } } pub fn new() -> Worker { Self::new_with_cpus(num_cpus::get()) } pub fn log_num_cpus(&self) -> u32 { log2_floor(self.cpus) } pub fn compute(&self, f: F) -> WorkerFuture where F: FnOnce() -> R + Send + 'static, R: IntoFuture + 'static, R::Future: Send + 'static, R::Item: Send + 'static, R::Error: Send + 'static, { WorkerFuture { future: self.pool.spawn_fn(f), } } pub fn scope<'a, F, R>(&self, elements: usize, f: F) -> R where F: FnOnce(&Scope<'a>, usize) -> R, { let chunk_size = if elements < self.cpus { 1 } else { elements / self.cpus }; // TODO: Handle case where threads fail crossbeam::scope(|scope| f(scope, chunk_size)) .expect("Threads aren't allowed to fail yet") } } pub struct WorkerFuture { future: CpuFuture, } impl Future for WorkerFuture { type Item = T; type Error = E; fn poll(&mut self) -> Poll { self.future.poll() } } fn log2_floor(num: usize) -> u32 { assert!(num > 0); let mut pow = 0; while (1 << (pow + 1)) <= num { pow += 1; } pow } #[test] fn test_log2_floor() { assert_eq!(log2_floor(1), 0); assert_eq!(log2_floor(2), 1); assert_eq!(log2_floor(3), 1); assert_eq!(log2_floor(4), 2); assert_eq!(log2_floor(5), 2); assert_eq!(log2_floor(6), 2); assert_eq!(log2_floor(7), 2); assert_eq!(log2_floor(8), 3); } } #[cfg(not(feature = "multicore"))] mod implementation { use futures::{future, Future, IntoFuture, Poll}; #[derive(Clone)] pub struct Worker; impl Worker { pub fn new() -> Worker { Worker } pub fn log_num_cpus(&self) -> u32 { 0 } pub fn compute(&self, f: F) -> R::Future where F: FnOnce() -> R + Send + 'static, R: IntoFuture + 'static, R::Future: Send + 'static, R::Item: Send + 'static, R::Error: Send + 'static, { f().into_future() } pub fn scope(&self, elements: usize, f: F) -> R where F: FnOnce(&DummyScope, usize) -> R, { f(&DummyScope, elements) } } pub struct WorkerFuture { future: future::FutureResult, } impl Future for WorkerFuture { type Item = T; type Error = E; fn poll(&mut self) -> Poll { self.future.poll() } } pub struct DummyScope; impl DummyScope { pub fn spawn(&self, f: F) { f(self); } } } pub use self::implementation::*;