summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs95
-rw-r--r--src/task_state.rs4
2 files changed, 46 insertions, 53 deletions
diff --git a/src/main.rs b/src/main.rs
index 5a5051d..7bce3c7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -25,7 +25,7 @@ use std::io::{BufRead, BufReader};
use std::mem::MaybeUninit;
use std::ops::Range;
use std::process;
-use sysinfo::System;
+use std::sync::mpsc::{Receiver, Sender};
use anyhow::Result;
@@ -43,7 +43,8 @@ struct Scheduler<'a> {
p_cores: Range<i32>,
e_cores: Range<i32>,
e_core_selector: Box<dyn ECoreSelector>,
- system: System,
+ reciever: Receiver<(i32, i32)>,
+ sender: Sender<i32>,
}
impl<'a> Scheduler<'a> {
@@ -60,39 +61,43 @@ impl<'a> Scheduler<'a> {
)?;
dbg!("registering rust user space scheduler");
let module: Box<dyn KernelModule> = if use_mocking {
- Box::new(PerfEstimator::default())
+ Box::new(MockModule::default())
} else {
Box::new(KernelDriver::default())
};
- // let e_cores_file = File::open("/sys/devices/cpu_atom/cpus")?;
- // let e_cores_reader = BufReader::new(e_cores_file);
- // let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
- // let cores: Vec<&str> = line.split('-').collect();
- // cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
- // } else {
- // panic!(
- // "Was not able to differentiate between core types. Does the system have e cores?"
- // )
- // };
-
- // let p_cores_file = File::open("/sys/devices/cpu_core/cpus")?;
- // let p_cores_reader = BufReader::new(p_cores_file);
- // let p_cores = if let Some(Ok(line)) = p_cores_reader.lines().next() {
- // let cores: Vec<&str> = line.split('-').collect();
- // cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
- // } else {
- // panic!(
- // "Was not able to differentiate between core types. Does the system have e cores?"
- // )
- // };
- let e_cores = 0..2;
- let p_cores = 2..4;
+ let read_cores = |core_type: &str| {
+ let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
+ let e_cores_reader = BufReader::new(e_cores_file);
+ let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
+ let cores: Vec<&str> = line.split('-').collect();
+ cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
+ } else {
+ panic!(
+ "Was not able to differentiate between core types. Does the system have e cores?"
+ )
+ };
+ Ok::<_, std::io::Error>(e_cores)
+ };
+ let e_cores = read_cores("atom").unwrap_or(0..0);
+ let p_cores = read_cores("core").unwrap_or(0..4);
let selector = Box::new(RoundRobinSelector::new(&e_cores));
- let mut system = System::new_all();
- system.refresh_processes(sysinfo::ProcessesToUpdate::All, true);
+ let (pid_send, pid_recieve) = std::sync::mpsc::channel();
+ let (parent_send, parent_recieve) = std::sync::mpsc::channel();
+
+ std::thread::spawn(move || loop {
+ if let Ok(pid) = pid_recieve.recv() {
+ let parent = (|| {
+ let process = procfs::process::Process::new(pid)?;
+ process.stat().map(|stat| stat.ppid)
+ })()
+ .unwrap_or_default();
+
+ parent_send.send((pid, parent)).unwrap();
+ }
+ });
Ok(Self {
bpf,
@@ -106,7 +111,8 @@ impl<'a> Scheduler<'a> {
p_cores,
e_cores,
e_core_selector: selector,
- system,
+ sender: pid_send,
+ reciever: parent_recieve,
})
}
@@ -147,12 +153,15 @@ impl<'a> Scheduler<'a> {
}
} else {
self.module.start_trace(task.pid as u64);
+
+ self.sender.send(task.pid).unwrap();
+
self.managed_tasks.insert(
task.pid as u32,
TaskState {
previous_energy_usage: 0,
budget: self.maximum_budget,
- children: vec![],
+ parent: 0,
},
);
self.task_queue.push_back(task);
@@ -219,12 +228,9 @@ impl<'a> Scheduler<'a> {
}
fn dispatch_tasks(&mut self) {
- panic!();
loop {
//TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
self.reset_budgets_and_garbage_collect();
- //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
- self.refresh_children();
// Consume all tasks before dispatching any.
self.consume_all_tasks();
@@ -232,6 +238,10 @@ impl<'a> Scheduler<'a> {
// Dispatch one task from the queue.
self.dispatch_next_task();
+ for (pid, parent) in self.reciever.try_iter() {
+ self.managed_tasks.get_mut(&(pid as u32)).unwrap().parent = parent as u32;
+ }
+
// If no task is ready to run (or in case of error), stop dispatching tasks and notify
// the BPF component that all tasks have been scheduled / dispatched, with no remaining
// pending tasks.
@@ -244,7 +254,7 @@ impl<'a> Scheduler<'a> {
}
fn run(&mut self) -> Result<UserExitInfo> {
- while dbg!(!self.bpf.exited()) {
+ while !self.bpf.exited() {
self.dispatch_tasks();
}
self.bpf.shutdown_and_report()
@@ -269,23 +279,6 @@ impl<'a> Scheduler<'a> {
was_scheduled
});
}
-
- //TODO: this is probably not very efficient
- fn refresh_children(&mut self) {
- self.system
- .refresh_processes(sysinfo::ProcessesToUpdate::All, true);
- for entry in self.managed_tasks.values_mut() {
- entry.children.clear();
- }
-
- for (pid, process) in self.system.processes() {
- if let Some(parent_pid) = process.parent() {
- if let Some(entry) = self.managed_tasks.get_mut(&parent_pid.as_u32()) {
- entry.children.push(pid.as_u32());
- }
- }
- }
- }
}
fn main() -> Result<()> {
diff --git a/src/task_state.rs b/src/task_state.rs
index e56b109..6043cf4 100644
--- a/src/task_state.rs
+++ b/src/task_state.rs
@@ -1,5 +1,5 @@
pub struct TaskState {
pub previous_energy_usage: u64,
pub budget: u64,
- pub children: Vec<u32>,
-} \ No newline at end of file
+ pub parent: u32,
+}