summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDennis Kobert <dennis@kobert.dev>2025-03-05 18:18:50 +0100
committerDennis Kobert <dennis@kobert.dev>2025-03-05 18:18:50 +0100
commit1c5b023fc1a063f6c3817c3e18204d5ddff41941 (patch)
tree4fa71b89a6480ec7ca55c3411ee4c566a2da4650 /src
parent16633bdc25973c4e89c39e41158711479bdbb147 (diff)
Move enery measurement to seperate thread
Diffstat (limited to 'src')
-rw-r--r--src/main.rs154
-rw-r--r--src/mock.rs7
-rw-r--r--src/task_state.rs4
3 files changed, 114 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs
index b11ee27..95c31a8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,32 +19,45 @@ use libbpf_rs::OpenObject;
use mock::{KernelDriver, KernelModule, MockModule};
use task_state::TaskState;
-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
use std::fs::File;
use std::io::{BufRead, BufReader};
use std::mem::MaybeUninit;
use std::ops::Range;
-use std::process;
-use std::sync::mpsc::{Receiver, Sender};
+use std::sync::mpsc::{Receiver, Sender, SyncSender};
+use std::time::Duration;
+use std::{process, thread};
use anyhow::Result;
-const SLICE_US: u64 = 5000;
+type Pid = i32;
+
+const SLICE_US: u64 = 50000;
struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
- module: Box<dyn KernelModule>,
task_queue: VecDeque<QueuedTask>,
no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<u32, TaskState>,
+ managed_tasks: HashMap<Pid, TaskState>,
maximum_budget: u64,
power_cap: u64,
- own_pid: u32,
+ own_pid: Pid,
p_cores: Range<i32>,
e_cores: Range<i32>,
e_core_selector: Box<dyn ECoreSelector>,
- reciever: Receiver<(i32, i32)>,
- sender: Sender<i32>,
+ reciever: Receiver<(Pid, Response)>,
+ sender: SyncSender<(Pid, Request)>,
+}
+
+enum Request {
+ NewTask,
+ Heartbeat,
+ ReadPerf,
+ RemoveTask,
+}
+enum Response {
+ Parent(i32),
+ Energy(u64),
}
impl<'a> Scheduler<'a> {
@@ -60,11 +73,6 @@ impl<'a> Scheduler<'a> {
false, // debug (false = debug mode off)
)?;
dbg!("registering rust user space scheduler");
- let module: Box<dyn KernelModule> = if use_mocking {
- Box::new(MockModule::default())
- } else {
- Box::new(KernelDriver::default())
- };
let read_cores = |core_type: &str| {
let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
@@ -84,35 +92,76 @@ impl<'a> Scheduler<'a> {
let selector = Box::new(RoundRobinSelector::new(&e_cores));
- let (pid_send, pid_recieve) = std::sync::mpsc::channel();
- let (parent_send, parent_recieve) = std::sync::mpsc::channel();
+ let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
+ let (response_send, response_recieve) = std::sync::mpsc::sync_channel(100);
- std::thread::spawn(move || loop {
- if let Ok(pid) = pid_recieve.recv() {
- let parent = (|| {
- let process = procfs::process::Process::new(pid)?;
- process.stat().map(|stat| stat.ppid)
- })()
- .unwrap_or_default();
-
- parent_send.send((pid, parent)).unwrap();
+ std::thread::spawn(move || {
+ let mut module: Box<dyn KernelModule> = if use_mocking {
+ Box::new(PerfEstimator::default())
+ // Box::new(MockModule::default())
+ } else {
+ Box::new(KernelDriver::default())
+ };
+ let mut tasks = BTreeSet::new();
+
+ let mut i = 0;
+ loop {
+ i += 1;
+ // if i % 1000 == 0 {
+ // println!("reading energy");
+ for &pid in tasks.iter().nth((i / 100) % 1000) {
+ let _ = response_send
+ .try_send((pid, Response::Energy(module.read_consumption(pid as u64))));
+ }
+ // }
+
+ if let Ok((pid, request)) = request_recieve.try_recv() {
+ match request {
+ Request::NewTask => {
+ let parent = (|| {
+ let process = procfs::process::Process::new(pid)?;
+ process.stat().map(|stat| stat.ppid)
+ })()
+ .unwrap_or_default();
+ module.start_trace(pid as u64);
+ tasks.insert(pid);
+
+ response_send
+ .try_send((pid, Response::Parent(parent)))
+ .unwrap();
+ }
+ Request::ReadPerf => {
+ response_send
+ .try_send((
+ pid,
+ Response::Energy(module.read_consumption(pid as u64)),
+ ))
+ .unwrap();
+ }
+ Request::RemoveTask => {
+ tasks.remove(&pid);
+ module.stop_trace(pid as u64)
+ }
+ Request::Heartbeat => {}
+ }
+ }
+ std::thread::sleep(Duration::from_micros(50000));
}
});
Ok(Self {
bpf,
- module,
task_queue: VecDeque::new(),
no_budget_task_queue: VecDeque::new(),
managed_tasks: HashMap::new(),
maximum_budget: u64::MAX,
power_cap,
- own_pid: process::id(),
+ own_pid: process::id() as i32,
p_cores,
e_cores,
e_core_selector: selector,
- sender: pid_send,
- reciever: parent_recieve,
+ sender: request_send,
+ reciever: response_recieve,
})
}
@@ -136,28 +185,24 @@ impl<'a> Scheduler<'a> {
while let Ok(Some(task)) = self.bpf.dequeue_task() {
// The scheduler itself has to be scheduled regardless of its energy usage
- if task.pid as u32 == self.own_pid {
+ if task.pid == self.own_pid {
self.task_queue.push_back(task);
continue;
}
- if let Some(task_state) = self.managed_tasks.get_mut(&(task.pid as u32)) {
- let energy = self.module.read_consumption(task.pid as u64);
- let used_energy = energy - task_state.previous_energy_usage;
- if task_state.budget < used_energy {
- task_state.budget = 0;
+ if let Some(task_state) = self.managed_tasks.get_mut(&(task.pid)) {
+ // self.sender.try_send((task.pid, Request::ReadPerf)).unwrap();
+ if task_state.budget == 0 {
+ println!("budget zero");
self.no_budget_task_queue.push_back(task);
} else {
- task_state.budget -= used_energy;
self.task_queue.push_back(task);
}
} else {
- self.module.start_trace(task.pid as u64);
-
- self.sender.send(task.pid).unwrap();
+ self.sender.try_send((task.pid, Request::NewTask)).unwrap();
self.managed_tasks.insert(
- task.pid as u32,
+ task.pid,
TaskState {
previous_energy_usage: 0,
budget: self.maximum_budget,
@@ -238,18 +283,31 @@ impl<'a> Scheduler<'a> {
// Dispatch one task from the queue.
self.dispatch_next_task();
- for (pid, parent) in self.reciever.try_iter() {
- if let Some(task) = self.managed_tasks.get_mut(&(pid as u32)) {
- task.parent = parent as u32;
- }
- }
-
// If no task is ready to run (or in case of error), stop dispatching tasks and notify
// the BPF component that all tasks have been scheduled / dispatched, with no remaining
// pending tasks.
//TODO: both queues?
if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
self.bpf.notify_complete(0);
+
+ // self.sender.try_send((0, Request::Heartbeat)).unwrap();
+ for (pid, response) in self.reciever.try_iter() {
+ let Some(task_state) = self.managed_tasks.get_mut(&(pid)) else {
+ continue;
+ };
+ match response {
+ Response::Parent(parent) => {
+ task_state.parent = parent;
+ }
+ Response::Energy(energy) => {
+ let used_energy = energy - task_state.previous_energy_usage.min(energy);
+ task_state.previous_energy_usage = energy;
+
+ task_state.budget -= used_energy.min(task_state.budget);
+ }
+ }
+ }
+
break;
}
}
@@ -276,7 +334,9 @@ impl<'a> Scheduler<'a> {
if was_scheduled {
value.budget = self.maximum_budget;
} else {
- self.module.stop_trace(*key as u64);
+ self.sender
+ .try_send((*key as i32, Request::RemoveTask))
+ .unwrap();
}
was_scheduled
});
diff --git a/src/mock.rs b/src/mock.rs
index d09f83a..fc6fcf1 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -13,15 +13,16 @@ pub struct MockModule;
impl KernelModule for MockModule {
fn start_trace(&mut self, pid: u64) {
- println!("starting trace of {pid}");
+ // println!("starting trace of {pid}");
}
fn stop_trace(&mut self, pid: u64) {
- println!("stopping trace of {pid}");
+ // println!("stopping trace of {pid}");
}
fn read_consumption(&mut self, _pid: u64) -> u64 {
- rand::rng().random()
+ // rand::rng().random()
+ 14
}
}
diff --git a/src/task_state.rs b/src/task_state.rs
index 6043cf4..f88bcc2 100644
--- a/src/task_state.rs
+++ b/src/task_state.rs
@@ -1,5 +1,7 @@
+use crate::Pid;
+
pub struct TaskState {
pub previous_energy_usage: u64,
pub budget: u64,
- pub parent: u32,
+ pub parent: Pid,
}