diff options
author | Dennis Kobert <dennis@kobert.dev> | 2025-03-05 16:02:02 +0100 |
---|---|---|
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-05 16:02:02 +0100 |
commit | fd7cbf016eae2080878e97a75c38a909cde7a192 (patch) | |
tree | d01c8d962f9fb417af4a50c252c60981cd01b193 /src | |
parent | 7440947e81a18dca0fa9136720aa91675f2d0797 (diff) |
Fix locking
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs | 95 | ||||
-rw-r--r-- | src/task_state.rs | 4 |
2 files changed, 46 insertions, 53 deletions
diff --git a/src/main.rs b/src/main.rs index 5a5051d..7bce3c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -25,7 +25,7 @@ use std::io::{BufRead, BufReader}; use std::mem::MaybeUninit; use std::ops::Range; use std::process; -use sysinfo::System; +use std::sync::mpsc::{Receiver, Sender}; use anyhow::Result; @@ -43,7 +43,8 @@ struct Scheduler<'a> { p_cores: Range<i32>, e_cores: Range<i32>, e_core_selector: Box<dyn ECoreSelector>, - system: System, + reciever: Receiver<(i32, i32)>, + sender: Sender<i32>, } impl<'a> Scheduler<'a> { @@ -60,39 +61,43 @@ impl<'a> Scheduler<'a> { )?; dbg!("registering rust user space scheduler"); let module: Box<dyn KernelModule> = if use_mocking { - Box::new(PerfEstimator::default()) + Box::new(MockModule::default()) } else { Box::new(KernelDriver::default()) }; - // let e_cores_file = File::open("/sys/devices/cpu_atom/cpus")?; - // let e_cores_reader = BufReader::new(e_cores_file); - // let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() { - // let cores: Vec<&str> = line.split('-').collect(); - // cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap() - // } else { - // panic!( - // "Was not able to differentiate between core types. Does the system have e cores?" - // ) - // }; - - // let p_cores_file = File::open("/sys/devices/cpu_core/cpus")?; - // let p_cores_reader = BufReader::new(p_cores_file); - // let p_cores = if let Some(Ok(line)) = p_cores_reader.lines().next() { - // let cores: Vec<&str> = line.split('-').collect(); - // cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap() - // } else { - // panic!( - // "Was not able to differentiate between core types. Does the system have e cores?" - // ) - // }; - let e_cores = 0..2; - let p_cores = 2..4; + let read_cores = |core_type: &str| { + let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?; + let e_cores_reader = BufReader::new(e_cores_file); + let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() { + let cores: Vec<&str> = line.split('-').collect(); + cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap() + } else { + panic!( + "Was not able to differentiate between core types. Does the system have e cores?" + ) + }; + Ok::<_, std::io::Error>(e_cores) + }; + let e_cores = read_cores("atom").unwrap_or(0..0); + let p_cores = read_cores("core").unwrap_or(0..4); let selector = Box::new(RoundRobinSelector::new(&e_cores)); - let mut system = System::new_all(); - system.refresh_processes(sysinfo::ProcessesToUpdate::All, true); + let (pid_send, pid_recieve) = std::sync::mpsc::channel(); + let (parent_send, parent_recieve) = std::sync::mpsc::channel(); + + std::thread::spawn(move || loop { + if let Ok(pid) = pid_recieve.recv() { + let parent = (|| { + let process = procfs::process::Process::new(pid)?; + process.stat().map(|stat| stat.ppid) + })() + .unwrap_or_default(); + + parent_send.send((pid, parent)).unwrap(); + } + }); Ok(Self { bpf, @@ -106,7 +111,8 @@ impl<'a> Scheduler<'a> { p_cores, e_cores, e_core_selector: selector, - system, + sender: pid_send, + reciever: parent_recieve, }) } @@ -147,12 +153,15 @@ impl<'a> Scheduler<'a> { } } else { self.module.start_trace(task.pid as u64); + + self.sender.send(task.pid).unwrap(); + self.managed_tasks.insert( task.pid as u32, TaskState { previous_energy_usage: 0, budget: self.maximum_budget, - children: vec![], + parent: 0, }, ); self.task_queue.push_back(task); @@ -219,12 +228,9 @@ impl<'a> Scheduler<'a> { } fn dispatch_tasks(&mut self) { - panic!(); loop { //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations self.reset_budgets_and_garbage_collect(); - //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations - self.refresh_children(); // Consume all tasks before dispatching any. self.consume_all_tasks(); @@ -232,6 +238,10 @@ impl<'a> Scheduler<'a> { // Dispatch one task from the queue. self.dispatch_next_task(); + for (pid, parent) in self.reciever.try_iter() { + self.managed_tasks.get_mut(&(pid as u32)).unwrap().parent = parent as u32; + } + // If no task is ready to run (or in case of error), stop dispatching tasks and notify // the BPF component that all tasks have been scheduled / dispatched, with no remaining // pending tasks. @@ -244,7 +254,7 @@ impl<'a> Scheduler<'a> { } fn run(&mut self) -> Result<UserExitInfo> { - while dbg!(!self.bpf.exited()) { + while !self.bpf.exited() { self.dispatch_tasks(); } self.bpf.shutdown_and_report() @@ -269,23 +279,6 @@ impl<'a> Scheduler<'a> { was_scheduled }); } - - //TODO: this is probably not very efficient - fn refresh_children(&mut self) { - self.system - .refresh_processes(sysinfo::ProcessesToUpdate::All, true); - for entry in self.managed_tasks.values_mut() { - entry.children.clear(); - } - - for (pid, process) in self.system.processes() { - if let Some(parent_pid) = process.parent() { - if let Some(entry) = self.managed_tasks.get_mut(&parent_pid.as_u32()) { - entry.children.push(pid.as_u32()); - } - } - } - } } fn main() -> Result<()> { diff --git a/src/task_state.rs b/src/task_state.rs index e56b109..6043cf4 100644 --- a/src/task_state.rs +++ b/src/task_state.rs @@ -1,5 +1,5 @@ pub struct TaskState { pub previous_energy_usage: u64, pub budget: u64, - pub children: Vec<u32>, -}
\ No newline at end of file + pub parent: u32, +} |