use super::{CheckRequest, GpuError, Message}; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread::JoinHandle; type Buffer<'a> = (&'a [u64], &'a [Vec]); #[derive(Debug)] struct RequestBuffer { mask_buff: Vec, row_buff: Vec>, pointer: usize, } impl RequestBuffer { pub fn new(size: usize) -> Self { RequestBuffer { mask_buff: vec![0; size], row_buff: vec![Vec::new(); size], pointer: 0, } } pub fn read(&mut self, request: CheckRequest) -> Result, GpuError> { self.mask_buff[self.pointer] = request.bitmask; self.row_buff[self.pointer] = request.rows; self.pointer += 1; if self.pointer == self.mask_buff.len() { self.pointer = 0; return Ok(Some((self.mask_buff.as_ref(), self.row_buff.as_ref()))); } Ok(None) } fn flush(&mut self) -> Result<(), GpuError> { while self.read(CheckRequest::new(vec![], 0, 0))?.is_none() {} Ok(()) } } pub struct OclManager { job_id: u64, host_sender: Sender, output_sender: Sender, receiver: Receiver, buffers: Vec, output_handle: JoinHandle<()>, host_handle: Option>, } impl OclManager { pub fn launch_sevice( permutations: &[Vec], permutations_mask: &[u64], n: u32, // Workgroup size, set to 0 for max mut wg_size: usize, result_output: Sender, ) -> Result<(Sender, JoinHandle<()>), GpuError> { let (h, w) = crate::solvers::wall_stats(n); let src = include_str!("check.cl"); let platform = ocl::Platform::default(); let device = ocl::Device::first(platform)?; let max_wg_size = device.max_wg_size()?; if wg_size == 0 { wg_size = max_wg_size; } else if wg_size > max_wg_size { println!("invalid workgroup size"); } let (output_sender, output_handle) = super::output::Output::launch_sevice(permutations, permutations_mask, result_output); if let Ok((host_sender, host_handle)) = super::host::Host::launch_sevice( permutations_mask, n, h, w, wg_size, src, output_sender.clone(), ) { let (sender, receiver) = channel(); println!("wg {}", wg_size); let mut buffers = Vec::with_capacity((n - h + 1) as usize); for _ in 0..=(n - h) { buffers.push(RequestBuffer::new(wg_size as usize)); } let manager = Self { job_id: 0, host_sender, output_sender, receiver, buffers, output_handle, host_handle: Some(host_handle), }; Ok(( sender, std::thread::Builder::new() .name("GPU Manager Deamon".into()) .spawn(move || { if let Err(err) = manager.run() { println!("{}", err); } }) .unwrap(), )) } else { Err(GpuError::from( "Failed to launch the opnecl thread".to_string(), )) } } fn run(mut self) -> Result<(), GpuError> { loop { match self.receiver.recv()? { Message::CheckRequest(request) => { let queue = request.queue; //println!("num: {:?} bit {:b}", request.rows, request.bitmask); if let Some(buffer) = self.buffers[queue as usize].read(request)? { self.host_sender.send(Message::HostMessage(( self.job_id, queue, buffer.0.into(), )))?; self.output_sender .send(Message::OutputMessage((self.job_id, buffer.1.into())))?; self.job_id += 1; } } Message::CpuDone => { for (i, b) in self.buffers.iter_mut().enumerate() { b.flush()?; self.host_sender.send(Message::HostMessage(( self.job_id, i as u32, b.mask_buff.clone(), )))?; self.output_sender .send(Message::OutputMessage((self.job_id, b.row_buff.clone())))?; self.job_id += 1; } println!("flushing buffers"); self.host_sender.send(Message::CpuDone)?; self.host_handle .take() .unwrap() .join() .expect("failed to join host thread"); } Message::Terminate => { self.output_sender.send(Message::Terminate)?; self.output_handle .join() .expect("failed to join ouput thread"); return Ok(()); } _ => println!("Invalid MessageType"), } } } }