From 042641ea2cfe3ad53eeae5c0cc311bf46f587d07 Mon Sep 17 00:00:00 2001 From: Dennis Kobert Date: Wed, 15 Jan 2020 01:20:04 +0100 Subject: Add better error handling instead of unwapping everything --- src/main.rs | 4 +- src/solvers/gpu/host.rs | 43 +++++++-------- src/solvers/gpu/manager.rs | 129 ++++++++++++++++++++++++--------------------- src/solvers/gpu/mod.rs | 39 ++++++++++++++ src/solvers/gpu/output.rs | 49 ++++++++--------- src/solvers/gpusolver.rs | 37 +++++++------ src/solvers/mod.rs | 2 +- src/solvers/single.rs | 10 ++-- src/structs.rs | 19 +++++-- 9 files changed, 191 insertions(+), 141 deletions(-) diff --git a/src/main.rs b/src/main.rs index c7997bb..ee7082d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,13 +3,13 @@ mod solvers; mod structs; use crate::solvers::{IteratorSolver, Solver}; -pub static N: u32 = 6; +pub static N: u32 = 8; fn main() { //let mut solver = solvers::single::NormalSolver::new(N); //solver.solve(); let solver = solvers::gpusolver::GpuSolver::new(N); //println!("solver: {:?}", solver); for (i, solution) in solver.solve().enumerate() { - //println!("{}: {:?}", i, solution); + println!("{}: {:?}", i, solution); } } diff --git a/src/solvers/gpu/host.rs b/src/solvers/gpu/host.rs index 4fe835a..6b91d74 100644 --- a/src/solvers/gpu/host.rs +++ b/src/solvers/gpu/host.rs @@ -1,4 +1,4 @@ -use super::{Message, ResultMessage}; +use super::{GpuError, Message, ResultMessage}; use ocl::{flags, Buffer, Context, Device, Kernel, Platform, Program, Queue}; use std::sync::mpsc::{channel, Receiver, Sender}; @@ -70,7 +70,9 @@ impl Host { let handle = std::thread::Builder::new() .name("GPU Host Deamon".into()) .spawn(move || { - solver.run(); + if let Err(err) = solver.run() { + println!("{}", err); + } }) .unwrap(); println!("started gpu thread"); @@ -84,11 +86,10 @@ impl Host { } fn get_off(&self, queue: usize) -> usize { let chunk = self.permutations.len() / self.n as usize; - let off = self.permutations.len() - chunk - self.get_dim(queue); - if off > isize::max_value() as usize { - panic!("workgroup size to big, offset underflow") + if self.permutations.len() < chunk + self.get_dim(queue) { + panic!("workgroup size too big; offset underflow") } - off + self.permutations.len() - chunk - self.get_dim(queue) } fn get_res(&self, queue: usize) -> usize { let dim = self.get_dim(queue); @@ -98,7 +99,7 @@ impl Host { (self.wg_size + 63) / 64 } - fn run(self) { + fn run(self) -> Result<(), GpuError> { let queues = (self.n - self.h + 1) as usize; let mut instruction_buffer = Vec::with_capacity((self.n - self.h) as usize); let mut result_buffer = Vec::with_capacity((self.n - self.h) as usize); @@ -108,27 +109,25 @@ impl Host { .queue(self.queue.clone()) .len(self.wg_size) .flags(flags::MEM_READ_WRITE) - .build() - .unwrap(); + .build()?; instruction_buffer.push(buffer); let results: Buffer = Buffer::builder() .queue(self.queue.clone()) .len(self.get_res(i)) .flags(flags::MEM_READ_WRITE) - .build() - .unwrap(); + .build()?; result_buffer.push(results); } println!("finished gpu setup"); loop { - match self.receiver.recv().expect("Channel to Host broke") { + match self.receiver.recv()? { Message::CpuDone => { - self.output_sender.send(Message::CpuDone); - return; + self.output_sender.send(Message::CpuDone)?; + return Ok(()); } Message::Terminate => { - return; + return Ok(()); } Message::HostMessage((id, i, buffer)) => { let i = i as usize; @@ -137,7 +136,7 @@ impl Host { let res = self.get_res(i); let res_size = self.get_res_save_dim(); - instruction_buffer[i].write(&buffer).enq().unwrap(); + instruction_buffer[i].write(&buffer).enq()?; //println!("dim: {}", dim); //println!("off: {}", self.get_off(i)); @@ -154,8 +153,7 @@ impl Host { .arg(self.n) .arg(self.w) .arg(off) - .build() - .unwrap(); + .build()?; unsafe { kernel @@ -164,8 +162,7 @@ impl Host { .global_work_offset(kernel.default_global_work_offset()) .global_work_size(dim) .local_work_size(self.wg_size) - .enq() - .unwrap(); + .enq()?; } // (5) Read results from the device into a vector (`::block` not shown): @@ -175,8 +172,7 @@ impl Host { .queue(&self.queue) .offset(0) .read(&mut data) - .enq() - .unwrap(); + .enq()?; self.output_sender .send(Message::ResultMessage(ResultMessage::new( data, @@ -184,8 +180,7 @@ impl Host { res_size, self.wg_size, id, - ))) - .unwrap(); + )))?; } m => println!("Invalid MessageType {:?} recived by host", m), } diff --git a/src/solvers/gpu/manager.rs b/src/solvers/gpu/manager.rs index 42af314..6eaaaaa 100644 --- a/src/solvers/gpu/manager.rs +++ b/src/solvers/gpu/manager.rs @@ -1,6 +1,7 @@ -use super::{CheckRequest, Message, RowResult}; +use super::{CheckRequest, GpuError, Message}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread::JoinHandle; +type Buffer<'a> = (&'a [u64], &'a [Vec]); #[derive(Debug)] struct RequestBuffer { @@ -17,18 +18,19 @@ impl RequestBuffer { pointer: 0, } } - pub fn read(&mut self, request: CheckRequest) -> Option<(&[u64], &[Vec])> { + pub fn read(&mut self, request: CheckRequest) -> Result, GpuError> { self.mask_buff[self.pointer] = request.bitmask; self.row_buff[self.pointer] = request.rows; self.pointer += 1; if self.pointer == self.mask_buff.len() { self.pointer = 0; - return Some((self.mask_buff.as_ref(), self.row_buff.as_ref())); + return Ok(Some((self.mask_buff.as_ref(), self.row_buff.as_ref()))); } - None + Ok(None) } - fn flush(&mut self) { - while self.read(CheckRequest::new(vec![], 0, 0)).is_none() {} + fn flush(&mut self) -> Result<(), GpuError> { + while self.read(CheckRequest::new(vec![], 0, 0))?.is_none() {} + Ok(()) } } @@ -50,13 +52,13 @@ impl OclManager { // Workgroup size, set to 0 for max mut wg_size: usize, result_output: Sender, - ) -> (Sender, JoinHandle<()>) { + ) -> Result<(Sender, JoinHandle<()>), GpuError> { let (h, w) = crate::solvers::wall_stats(n); let src = include_str!("check.cl"); let platform = ocl::Platform::default(); - let device = ocl::Device::first(platform).expect("failed to create opencl device"); - let max_wg_size = device.max_wg_size().expect("failed to query max_wg_size"); + let device = ocl::Device::first(platform)?; + let max_wg_size = device.max_wg_size()?; if wg_size == 0 { wg_size = max_wg_size; } else if wg_size > max_wg_size { @@ -65,7 +67,7 @@ impl OclManager { let (output_sender, output_handle) = super::output::Output::launch_sevice(permutations, permutations_mask, result_output); - let (host_sender, host_handle) = super::host::Host::launch_sevice( + if let Ok((host_sender, host_handle)) = super::host::Host::launch_sevice( permutations_mask, n, h, @@ -73,76 +75,85 @@ impl OclManager { wg_size, src, output_sender.clone(), - ) - .unwrap(); + ) { + let (sender, receiver) = channel(); - let (sender, receiver) = channel(); + println!("wg {}", wg_size); + let mut buffers = Vec::with_capacity((n - h + 1) as usize); + for _ in 0..=(n - h) { + buffers.push(RequestBuffer::new(wg_size as usize)); + } - println!("wg {}", wg_size); - let mut buffers = Vec::with_capacity((n - h + 1) as usize); - for _ in 0..=(n - h) { - buffers.push(RequestBuffer::new(wg_size as usize)); + let manager = Self { + job_id: 0, + host_sender, + output_sender, + receiver, + buffers, + output_handle, + host_handle: Some(host_handle), + }; + Ok(( + sender, + std::thread::Builder::new() + .name("GPU Manager Deamon".into()) + .spawn(move || { + if let Err(err) = manager.run() { + println!("{}", err); + } + }) + .unwrap(), + )) + } else { + Err(GpuError::from( + "Failed to launch the opnecl thread".to_string(), + )) } - - let manager = Self { - job_id: 0, - host_sender, - output_sender, - receiver, - buffers, - output_handle, - host_handle: Some(host_handle), - }; - ( - sender, - std::thread::Builder::new() - .name("GPU Manager Deamon".into()) - .spawn(move || { - manager.run(); - }) - .unwrap(), - ) } - fn run(mut self) { + fn run(mut self) -> Result<(), GpuError> { loop { - match self.receiver.recv().expect("Channel to GPU Manager broke") { + match self.receiver.recv()? { Message::CheckRequest(request) => { let queue = request.queue; //println!("num: {:?} bit {:b}", request.rows, request.bitmask); - if let Some(buffer) = self.buffers[queue as usize].read(request) { - self.host_sender - .send(Message::HostMessage((self.job_id, queue, buffer.0.into()))) - .unwrap(); + if let Some(buffer) = self.buffers[queue as usize].read(request)? { + self.host_sender.send(Message::HostMessage(( + self.job_id, + queue, + buffer.0.into(), + )))?; self.output_sender - .send(Message::OutputMessage((self.job_id, buffer.1.into()))) - .unwrap(); + .send(Message::OutputMessage((self.job_id, buffer.1.into())))?; self.job_id += 1; } } Message::CpuDone => { for (i, b) in self.buffers.iter_mut().enumerate() { - b.flush(); - self.host_sender - .send(Message::HostMessage(( - self.job_id, - i as u32, - b.mask_buff.clone(), - ))) - .unwrap(); + b.flush()?; + self.host_sender.send(Message::HostMessage(( + self.job_id, + i as u32, + b.mask_buff.clone(), + )))?; self.output_sender - .send(Message::OutputMessage((self.job_id, b.row_buff.clone()))) - .unwrap(); + .send(Message::OutputMessage((self.job_id, b.row_buff.clone())))?; self.job_id += 1; } println!("flushing buffers"); - self.host_sender.send(Message::CpuDone); - self.host_handle.take().unwrap().join(); + self.host_sender.send(Message::CpuDone)?; + self.host_handle + .take() + .unwrap() + .join() + .expect("failed to join host thread"); } Message::Terminate => { - self.output_sender.send(Message::Terminate); - self.output_handle.join(); - return; + self.output_sender.send(Message::Terminate)?; + self.output_handle + .join() + .expect("failed to join ouput thread"); + return Ok(()); } _ => println!("Invalid MessageType"), } diff --git a/src/solvers/gpu/mod.rs b/src/solvers/gpu/mod.rs index 17bc964..431c035 100644 --- a/src/solvers/gpu/mod.rs +++ b/src/solvers/gpu/mod.rs @@ -72,6 +72,45 @@ impl ResultMessage { result } } +use std::sync::mpsc::{RecvError, SendError}; + +pub struct GpuError { + message: String, +} + +impl From> for GpuError { + fn from(error: SendError) -> Self { + Self { + message: format!("Gpu error occured: {}", error), + } + } +} +impl From for GpuError { + fn from(error: RecvError) -> Self { + Self { + message: format!("Gpu error occured: {}", error), + } + } +} +impl From for GpuError { + fn from(error: String) -> Self { + Self { + message: format!("Gpu error occured: {}", error), + } + } +} +impl From for GpuError { + fn from(error: ocl::Error) -> Self { + Self { + message: format!("Gpu error occured: {}", error), + } + } +} +impl std::fmt::Display for GpuError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.message) + } +} #[derive(Debug)] pub struct CheckRequest { diff --git a/src/solvers/gpu/output.rs b/src/solvers/gpu/output.rs index 9cbef1a..2ff1d01 100644 --- a/src/solvers/gpu/output.rs +++ b/src/solvers/gpu/output.rs @@ -19,31 +19,22 @@ impl InBuffer { banned_requests: HashSet::new(), } } - fn read(&mut self) -> Option> { + fn read(&mut self) -> Result>, super::GpuError> { loop { - //println!("{:?}", self.receiver.recv().unwrap()); - //continue; - - match self - .receiver - .recv() - .expect("Channel to Output Daemon broke") - { + match self.receiver.recv()? { Message::ResultMessage(results) => { if results.data.iter().any(|x| *x != 0) { println!("Horay results!"); if let Some(result_walls) = self.row_requests.get(&results.id) { - return Some(Self::calc_results( + return Ok(Some(Self::calc_results( results.valid_walls().as_ref(), result_walls, - )); + ))); } else { self.results_requests.insert(results.id, results); } - } else { - if self.row_requests.remove(&results.id).is_none() { - self.banned_requests.insert(results.id); - } + } else if self.row_requests.remove(&results.id).is_none() { + self.banned_requests.insert(results.id); } } Message::OutputMessage((id, output)) => { @@ -51,19 +42,19 @@ impl InBuffer { continue; } if let Some(results) = self.results_requests.get(&id) { - return Some(Self::calc_results( + return Ok(Some(Self::calc_results( results.valid_walls().as_ref(), output.as_ref(), - )); + ))); } else { self.row_requests.insert(id, output); } } Message::CpuDone => { - return None; + return Ok(None); } Message::Terminate => { - return None; + return Ok(None); } _ => { println!("Invalid MessageType"); @@ -86,7 +77,9 @@ impl InBuffer { pub struct Output { input: InBuffer, + #[allow(unused)] permutations: Vec>, + #[allow(unused)] permutations_mask: Vec, results: HashSet, result_sender: Sender, @@ -113,21 +106,21 @@ impl Output { std::thread::Builder::new() .name("GPU Output Deamon".into()) .spawn(move || { - output.run(); + if let Err(err) = output.run() { + println!("{}", err); + } }) .unwrap(), ) } - fn run(mut self) { + fn run(mut self) -> Result<(), super::GpuError> { loop { - if let Some(walls) = self.input.read() { + if let Some(walls) = self.input.read()? { for wall in walls { if !self.results.contains(&wall) { wall.output(); - self.result_sender - .send(Message::RowResult(wall.clone())) - .or_else(|_| Err(println!("Failed to transmit result back"))); + self.result_sender.send(Message::RowResult(wall.clone()))?; } self.results.insert(wall); } @@ -135,10 +128,10 @@ impl Output { for wall in self.results { wall.output() } - self.result_sender.send(Message::GpuDone).unwrap(); + self.result_sender.send(Message::GpuDone)?; // wait for second exit signal - self.input.read(); - return; + self.input.read()?; + return Ok(()); } } } diff --git a/src/solvers/gpusolver.rs b/src/solvers/gpusolver.rs index 99274cb..62e6e0f 100644 --- a/src/solvers/gpusolver.rs +++ b/src/solvers/gpusolver.rs @@ -19,24 +19,27 @@ pub struct GpuSolver { impl GpuSolver { fn solve_to_vec(&mut self) -> Vec { let (sender, receiver) = std::sync::mpsc::channel(); - let (sender, handle) = - OclManager::launch_sevice(&self.permutations, &self.masks, self.n, 0, sender); - let chunk = permutohedron::factorial(self.n as usize - 1) as u32; - self.permute( - 0, - 0, - ((0..(self.h - 2)).map(|x| x * chunk).collect::>()).as_ref(), - sender.clone(), - ); - sender.send(Message::CpuDone).unwrap(); - let mut walls = Vec::new(); - while let Ok(Message::RowResult(wall)) = receiver.recv() { - walls.push(wall); + if let Ok((sender, handle)) = + OclManager::launch_sevice(&self.permutations, &self.masks, self.n, 0, sender) + { + let chunk = permutohedron::factorial(self.n as usize - 1) as u32; + self.permute( + 0, + 0, + ((0..(self.h - 2)).map(|x| x * chunk).collect::>()).as_ref(), + sender.clone(), + ); + sender.send(Message::CpuDone).unwrap(); + let mut walls = Vec::new(); + while let Ok(Message::RowResult(wall)) = receiver.recv() { + walls.push(wall); + } + sender.send(Message::Terminate).unwrap(); + //println!("{:?}", walls); + handle.join().unwrap(); + return walls; } - sender.send(Message::Terminate).unwrap(); - //println!("{:?}", walls); - handle.join().unwrap(); - walls + vec![] } fn permute(&self, index: usize, curr_mask: u64, numbers: &[u32], sender: Sender) { diff --git a/src/solvers/mod.rs b/src/solvers/mod.rs index 5b7a495..de61639 100644 --- a/src/solvers/mod.rs +++ b/src/solvers/mod.rs @@ -1,7 +1,7 @@ //pub mod incremental_block; pub mod gpu; pub mod gpusolver; -pub mod single; +//pub mod single; //use crate::structs::StoneWall; pub use gpu::*; diff --git a/src/solvers/single.rs b/src/solvers/single.rs index 471c283..90bbc00 100644 --- a/src/solvers/single.rs +++ b/src/solvers/single.rs @@ -54,11 +54,11 @@ impl NormalSolver { } pub fn solve(&mut self) { - //for (n, i) in self.permutations.iter().enumerate() { - //let tmp: Vec = i.clone(); - //println!("perm {}: {:?}", n, tmp); - //println!("perm {}: {:b}", n, self.masks[n]); - //} + for (n, i) in self.permutations.iter().enumerate() { + let tmp: Vec = i.clone(); + println!("perm {}: {:?}", n, tmp); + println!("perm {}: {:b}", n, self.masks[n]); + } println!("calculate results"); self.permute( permutohedron::factorial(self.n as usize), diff --git a/src/structs.rs b/src/structs.rs index c61bb14..67cce30 100644 --- a/src/structs.rs +++ b/src/structs.rs @@ -3,16 +3,19 @@ pub struct StoneWall { rows: Vec>, } +#[allow(clippy::option_map_unit_fn)] impl StoneWall { + #[allow(dead_code)] pub fn create_empty(n: u32) -> Self { let n = n as usize; let h = n / 2 + 1; let mut rows = vec![vec![0; n]; h]; - rows.get_mut(0).map(|r| r[0] = 1); - rows.get_mut(1).map(|r| r[1] = 2); + rows.get_mut(0).map(|r| r[0] = 1); // Should be rewitten to express + rows.get_mut(1).map(|r| r[1] = 2); // its purpose more clearly Self { rows } } + #[allow(dead_code)] pub fn set_stone(&mut self, row: u32, pos: u32, stone: u32) -> Option<()> { self.rows .get_mut(row as usize) @@ -20,6 +23,7 @@ impl StoneWall { .map(|v| *v = stone) } + #[allow(dead_code)] pub fn output(&mut self) { let colors = [[31, 32], [33, 35]]; //self.rows.sort_by_key(|x| x[0]); @@ -43,6 +47,7 @@ pub struct GapHeights { } impl GapHeights { + #[allow(dead_code)] pub fn from_heights(heights: Vec) -> Self { Self { heights } } @@ -62,10 +67,12 @@ impl GapHeights { Self { heights } } + #[allow(dead_code)] pub fn add_gap(&mut self, height: u32) { self.heights.push(height) } + #[allow(dead_code)] pub fn calculate_row(&self, r: u32, stones: &mut [u32]) { let mut len = 1; let mut i = 0; @@ -79,9 +86,9 @@ impl GapHeights { } } + #[allow(dead_code)] pub fn output(&self, n: u32, h: u32) { let mut stones = vec![0; n as usize]; - let mut toggle = 0; let mut colors = [ "\x1b[31m", "\x1b[32m", "\x1b[33m", "\x1b[34m", "\x1b[35m", "\x1b[36m", ] @@ -107,13 +114,15 @@ impl GapHeights { } } - pub fn iter<'a>(&'a self) -> GapIter<'a> { + #[allow(dead_code)] + pub fn iter(&self) -> GapIter { GapIter { gap_heights: self, i: 0, } } + #[allow(dead_code)] pub fn as_stone_wall(&self, n: u32) -> StoneWall { let h = n / 2 + 1; let mut rows = Vec::with_capacity(h as usize); @@ -136,6 +145,6 @@ impl<'a> Iterator for GapIter<'a> { fn next(&mut self) -> Option { let i = self.i; self.i += 1; - self.gap_heights.heights.get(i).map(|&x| x) + self.gap_heights.heights.get(i).cloned() } } -- cgit v1.2.3