author    | Dennis Kobert <dennis@kobert.dev> | 2025-03-05 18:18:50 +0100
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-05 18:18:50 +0100
commit    | 1c5b023fc1a063f6c3817c3e18204d5ddff41941 (patch)
tree      | 4fa71b89a6480ec7ca55c3411ee4c566a2da4650 /src
parent    | 16633bdc25973c4e89c39e41158711479bdbb147 (diff)
Move energy measurement to separate thread
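The scheduler previously read energy consumption inline in its dispatch path; this commit moves the measurement backend into a dedicated thread that talks to the scheduler over bounded channels. The snippet below is a minimal, self-contained sketch of that pattern, not code from this repository: the `Pid`, `Request`, and `Response` names mirror the diff, but `FakeModule`, `spawn_energy_thread`, the reduced request set, and `main` are stand-ins added for illustration.

```rust
// Sketch: a background thread owns the energy-measurement backend and
// answers requests, while the scheduler keeps only the two channel ends.
use std::collections::BTreeSet;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;
use std::time::Duration;

type Pid = i32;

enum Request {
    NewTask,
    RemoveTask,
}

enum Response {
    Parent(Pid),
    Energy(u64),
}

// Stand-in for the real KernelModule / PerfEstimator backend.
struct FakeModule;
impl FakeModule {
    fn read_consumption(&mut self, _pid: u64) -> u64 {
        14
    }
}

fn spawn_energy_thread() -> (SyncSender<(Pid, Request)>, Receiver<(Pid, Response)>) {
    // Bounded channels so a stalled peer exerts back-pressure instead of
    // growing an unbounded queue.
    let (request_send, request_recv) = sync_channel::<(Pid, Request)>(1000);
    let (response_send, response_recv) = sync_channel::<(Pid, Response)>(100);

    thread::spawn(move || {
        let mut module = FakeModule;
        let mut tasks: BTreeSet<Pid> = BTreeSet::new();
        loop {
            // Periodically push energy readings for traced tasks; try_send
            // drops an update if the scheduler is not keeping up.
            for &pid in &tasks {
                let energy = module.read_consumption(pid as u64);
                let _ = response_send.try_send((pid, Response::Energy(energy)));
            }
            // Handle at most one request per iteration without blocking.
            if let Ok((pid, request)) = request_recv.try_recv() {
                match request {
                    Request::NewTask => {
                        tasks.insert(pid);
                        // The real thread also resolves the parent via procfs
                        // and starts a trace; here we just echo a parent of 0.
                        let _ = response_send.try_send((pid, Response::Parent(0)));
                    }
                    Request::RemoveTask => {
                        tasks.remove(&pid);
                    }
                }
            }
            thread::sleep(Duration::from_micros(50_000));
        }
    });

    (request_send, response_recv)
}

fn main() {
    let (sender, receiver) = spawn_energy_thread();
    sender.try_send((1234, Request::NewTask)).unwrap();
    thread::sleep(Duration::from_millis(200));
    // Drain whatever responses have accumulated, mirroring reciever.try_iter()
    // in the scheduler loop.
    for (pid, response) in receiver.try_iter() {
        match response {
            Response::Parent(parent) => println!("pid {pid}: parent {parent}"),
            Response::Energy(energy) => println!("pid {pid}: energy {energy}"),
        }
    }
}
```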
Diffstat (limited to 'src')
-rw-r--r-- | src/main.rs       | 154
-rw-r--r-- | src/mock.rs       |   7
-rw-r--r-- | src/task_state.rs |   4
3 files changed, 114 insertions, 51 deletions
diff --git a/src/main.rs b/src/main.rs
index b11ee27..95c31a8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -19,32 +19,45 @@ use libbpf_rs::OpenObject;
 use mock::{KernelDriver, KernelModule, MockModule};
 use task_state::TaskState;
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
 use std::fs::File;
 use std::io::{BufRead, BufReader};
 use std::mem::MaybeUninit;
 use std::ops::Range;
-use std::process;
-use std::sync::mpsc::{Receiver, Sender};
+use std::sync::mpsc::{Receiver, Sender, SyncSender};
+use std::time::Duration;
+use std::{process, thread};
 
 use anyhow::Result;
 
-const SLICE_US: u64 = 5000;
+type Pid = i32;
+
+const SLICE_US: u64 = 50000;
 
 struct Scheduler<'a> {
     bpf: BpfScheduler<'a>,
-    module: Box<dyn KernelModule>,
     task_queue: VecDeque<QueuedTask>,
     no_budget_task_queue: VecDeque<QueuedTask>,
-    managed_tasks: HashMap<u32, TaskState>,
+    managed_tasks: HashMap<Pid, TaskState>,
     maximum_budget: u64,
     power_cap: u64,
-    own_pid: u32,
+    own_pid: Pid,
     p_cores: Range<i32>,
     e_cores: Range<i32>,
     e_core_selector: Box<dyn ECoreSelector>,
-    reciever: Receiver<(i32, i32)>,
-    sender: Sender<i32>,
+    reciever: Receiver<(Pid, Response)>,
+    sender: SyncSender<(Pid, Request)>,
+}
+
+enum Request {
+    NewTask,
+    Heartbeat,
+    ReadPerf,
+    RemoveTask,
+}
+enum Response {
+    Parent(i32),
+    Energy(u64),
 }
 
 impl<'a> Scheduler<'a> {
@@ -60,11 +73,6 @@ impl<'a> Scheduler<'a> {
             false, // debug (false = debug mode off)
         )?;
         dbg!("registering rust user space scheduler");
-        let module: Box<dyn KernelModule> = if use_mocking {
-            Box::new(MockModule::default())
-        } else {
-            Box::new(KernelDriver::default())
-        };
 
         let read_cores = |core_type: &str| {
             let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
@@ -84,35 +92,76 @@ impl<'a> Scheduler<'a> {
         let selector = Box::new(RoundRobinSelector::new(&e_cores));
 
-        let (pid_send, pid_recieve) = std::sync::mpsc::channel();
-        let (parent_send, parent_recieve) = std::sync::mpsc::channel();
+        let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
+        let (response_send, response_recieve) = std::sync::mpsc::sync_channel(100);
 
-        std::thread::spawn(move || loop {
-            if let Ok(pid) = pid_recieve.recv() {
-                let parent = (|| {
-                    let process = procfs::process::Process::new(pid)?;
-                    process.stat().map(|stat| stat.ppid)
-                })()
-                .unwrap_or_default();
-
-                parent_send.send((pid, parent)).unwrap();
+        std::thread::spawn(move || {
+            let mut module: Box<dyn KernelModule> = if use_mocking {
+                Box::new(PerfEstimator::default())
+                // Box::new(MockModule::default())
+            } else {
+                Box::new(KernelDriver::default())
+            };
+            let mut tasks = BTreeSet::new();
+
+            let mut i = 0;
+            loop {
+                i += 1;
+                // if i % 1000 == 0 {
+                //     println!("reading energy");
+                for &pid in tasks.iter().nth((i / 100) % 1000) {
+                    let _ = response_send
+                        .try_send((pid, Response::Energy(module.read_consumption(pid as u64))));
+                }
+                // }
+
+                if let Ok((pid, request)) = request_recieve.try_recv() {
+                    match request {
+                        Request::NewTask => {
+                            let parent = (|| {
+                                let process = procfs::process::Process::new(pid)?;
+                                process.stat().map(|stat| stat.ppid)
+                            })()
+                            .unwrap_or_default();
+                            module.start_trace(pid as u64);
+                            tasks.insert(pid);
+
+                            response_send
+                                .try_send((pid, Response::Parent(parent)))
+                                .unwrap();
+                        }
+                        Request::ReadPerf => {
+                            response_send
+                                .try_send((
+                                    pid,
+                                    Response::Energy(module.read_consumption(pid as u64)),
+                                ))
+                                .unwrap();
+                        }
+                        Request::RemoveTask => {
+                            tasks.remove(&pid);
+                            module.stop_trace(pid as u64)
+                        }
+                        Request::Heartbeat => {}
+                    }
+                }
+                std::thread::sleep(Duration::from_micros(50000));
             }
         });
 
         Ok(Self {
             bpf,
-            module,
             task_queue: VecDeque::new(),
             no_budget_task_queue: VecDeque::new(),
             managed_tasks: HashMap::new(),
             maximum_budget: u64::MAX,
             power_cap,
-            own_pid: process::id(),
+            own_pid: process::id() as i32,
            p_cores,
            e_cores,
            e_core_selector: selector,
-            sender: pid_send,
-            reciever: parent_recieve,
+            sender: request_send,
+            reciever: response_recieve,
         })
     }
@@ -136,28 +185,24 @@ impl<'a> Scheduler<'a> {
         while let Ok(Some(task)) = self.bpf.dequeue_task() {
             // The scheduler itself has to be scheduled regardless of its energy usage
-            if task.pid as u32 == self.own_pid {
+            if task.pid == self.own_pid {
                 self.task_queue.push_back(task);
                 continue;
             }
 
-            if let Some(task_state) = self.managed_tasks.get_mut(&(task.pid as u32)) {
-                let energy = self.module.read_consumption(task.pid as u64);
-                let used_energy = energy - task_state.previous_energy_usage;
-                if task_state.budget < used_energy {
-                    task_state.budget = 0;
+            if let Some(task_state) = self.managed_tasks.get_mut(&(task.pid)) {
+                // self.sender.try_send((task.pid, Request::ReadPerf)).unwrap();
+                if task_state.budget == 0 {
+                    println!("budget zero");
                     self.no_budget_task_queue.push_back(task);
                 } else {
-                    task_state.budget -= used_energy;
                     self.task_queue.push_back(task);
                 }
             } else {
-                self.module.start_trace(task.pid as u64);
-
-                self.sender.send(task.pid).unwrap();
+                self.sender.try_send((task.pid, Request::NewTask)).unwrap();
 
                 self.managed_tasks.insert(
-                    task.pid as u32,
+                    task.pid,
                     TaskState {
                         previous_energy_usage: 0,
                         budget: self.maximum_budget,
@@ -238,18 +283,31 @@ impl<'a> Scheduler<'a> {
             // Dispatch one task from the queue.
             self.dispatch_next_task();
 
-            for (pid, parent) in self.reciever.try_iter() {
-                if let Some(task) = self.managed_tasks.get_mut(&(pid as u32)) {
-                    task.parent = parent as u32;
-                }
-            }
-
             // If no task is ready to run (or in case of error), stop dispatching tasks and notify
             // the BPF component that all tasks have been scheduled / dispatched, with no remaining
             // pending tasks.
             //TODO: both queues?
             if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
                 self.bpf.notify_complete(0);
+
+                // self.sender.try_send((0, Request::Heartbeat)).unwrap();
+                for (pid, response) in self.reciever.try_iter() {
+                    let Some(task_state) = self.managed_tasks.get_mut(&(pid)) else {
+                        continue;
+                    };
+                    match response {
+                        Response::Parent(parent) => {
+                            task_state.parent = parent;
+                        }
+                        Response::Energy(energy) => {
+                            let used_energy = energy - task_state.previous_energy_usage.min(energy);
+                            task_state.previous_energy_usage = energy;
+
+                            task_state.budget -= used_energy.min(task_state.budget);
+                        }
+                    }
+                }
+
                 break;
             }
         }
@@ -276,7 +334,9 @@ impl<'a> Scheduler<'a> {
             if was_scheduled {
                 value.budget = self.maximum_budget;
             } else {
-                self.module.stop_trace(*key as u64);
+                self.sender
+                    .try_send((*key as i32, Request::RemoveTask))
+                    .unwrap();
             }
             was_scheduled
         });
diff --git a/src/mock.rs b/src/mock.rs
index d09f83a..fc6fcf1 100644
--- a/src/mock.rs
+++ b/src/mock.rs
@@ -13,15 +13,16 @@ pub struct MockModule;
 
 impl KernelModule for MockModule {
     fn start_trace(&mut self, pid: u64) {
-        println!("starting trace of {pid}");
+        // println!("starting trace of {pid}");
     }
 
     fn stop_trace(&mut self, pid: u64) {
-        println!("stopping trace of {pid}");
+        // println!("stopping trace of {pid}");
     }
 
     fn read_consumption(&mut self, _pid: u64) -> u64 {
-        rand::rng().random()
+        // rand::rng().random()
+        14
     }
 }
diff --git a/src/task_state.rs b/src/task_state.rs
index 6043cf4..f88bcc2 100644
--- a/src/task_state.rs
+++ b/src/task_state.rs
@@ -1,5 +1,7 @@
+use crate::Pid;
+
 pub struct TaskState {
     pub previous_energy_usage: u64,
     pub budget: u64,
-    pub parent: u32,
+    pub parent: Pid,
 }
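A note on the new budget accounting in the response-draining loop: both subtractions are clamped with `.min()` so the `u64` values cannot underflow when an energy reading goes backwards or usage exceeds the remaining budget. The standalone sketch below walks through that arithmetic; the helper name `apply_energy_reading` is hypothetical, added only for illustration.

```rust
// Sketch of the budget update from the new reciever.try_iter() loop: the
// .min() clamps keep both subtractions from wrapping around on u64.
fn apply_energy_reading(previous: &mut u64, budget: &mut u64, energy: u64) {
    let used_energy = energy - (*previous).min(energy); // 0 if energy < previous
    *previous = energy;
    *budget -= used_energy.min(*budget); // clamps the budget at 0 instead of wrapping
}

fn main() {
    let (mut previous, mut budget) = (0u64, 100u64);
    apply_energy_reading(&mut previous, &mut budget, 40); // uses 40, budget: 60
    apply_energy_reading(&mut previous, &mut budget, 30); // counter went backwards: budget stays 60
    apply_energy_reading(&mut previous, &mut budget, 500); // usage exceeds budget: clamps to 0
    assert_eq!(budget, 0);
    println!("remaining budget: {budget}");
}
```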