author    | Dennis Kobert <dennis@kobert.dev> | 2025-03-09 13:15:13 +0100
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-09 13:15:13 +0100
commit    | b720a1253fb8cc8a3ba3454014bad239233368c1 (patch)
tree      | f1f7fa25edd5c47e3c30c4b984fa9d557409c7d7
parent    | 7b2ec191751489157a9e3193b8aeaabd72fae14a (diff)
Restructure project
-rw-r--r-- | flake.nix                                                    |   1
-rw-r--r-- | src/energy.rs                                                | 174
-rw-r--r-- | src/energy/budget.rs                                         |  49
-rw-r--r-- | src/energy/estimator.rs                                      |   5
-rw-r--r-- | src/energy/trackers.rs                                       |  13
-rw-r--r-- | src/energy/trackers/kernel.rs                                |  49
-rw-r--r-- | src/energy/trackers/mock.rs                                  |  14
-rw-r--r-- | src/energy/trackers/perf.rs (renamed from src/mock/perf.rs)  |  51
-rw-r--r-- | src/main.rs                                                  | 315
-rw-r--r-- | src/mock.rs                                                  |  65
-rw-r--r-- | src/scheduler.rs                                             | 223
-rw-r--r-- | src/task_state.rs                                            |   7
12 files changed, 568 insertions, 398 deletions
diff --git a/flake.nix b/flake.nix
@@ -74,6 +74,7 @@
         packages = with pkgs; [
           cargo-watch
           cargo-expand
+          samply
           just
         ];
diff --git a/src/energy.rs b/src/energy.rs
new file mode 100644
index 0000000..c8da2cf
--- /dev/null
+++ b/src/energy.rs
@@ -0,0 +1,174 @@
+mod budget;
+mod trackers;
+
+use std::collections::{BTreeSet, HashMap};
+use std::sync::{mpsc, Arc};
+use std::thread;
+use std::time::Duration;
+
+use crate::Pid;
+use anyhow::Result;
+use dashmap::DashMap;
+
+pub use budget::BudgetPolicy;
+pub use trackers::{Estimator, KernelDriver, MockEstimator, PerfEstimator};
+
+pub enum Request {
+    NewTask(Pid),
+    RemoveTask(Pid),
+}
+
+pub struct ProcessInfo {
+    energy: u64,
+    last_update: std::time::Instant,
+    parent: Pid,
+}
+
+pub struct EnergyService {
+    estimator: Box<dyn Estimator>,
+    budget_policy: Box<dyn BudgetPolicy>,
+    active_processes: BTreeSet<Pid>,
+    process_info: HashMap<Pid, ProcessInfo>,
+    shared_budgets: Arc<DashMap<Pid, u64>>,
+    request_receiver: mpsc::Receiver<Request>,
+    update_interval: Duration,
+}
+
+impl EnergyService {
+    pub fn new(
+        estimator: Box<dyn Estimator>,
+        budget_policy: Box<dyn BudgetPolicy>,
+        shared_budgets: Arc<DashMap<Pid, u64>>,
+        request_receiver: mpsc::Receiver<Request>,
+        update_interval: Duration,
+    ) -> Self {
+        Self {
+            estimator,
+            budget_policy,
+            active_processes: BTreeSet::new(),
+            process_info: HashMap::new(),
+            shared_budgets,
+            request_receiver,
+            update_interval,
+        }
+    }
+
+    pub fn run(mut self) {
+        thread::spawn(move || {
+            loop {
+                // Process any incoming requests
+                self.handle_requests();
+
+                // Update energy measurements
+                self.update_measurements();
+
+                // Calculate and update budgets
+                self.update_budgets();
+
+                // Sleep for update interval
+                thread::sleep(self.update_interval);
+            }
+        });
+    }
+
+    fn handle_requests(&mut self) {
+        while let Ok(request) = self.request_receiver.try_recv() {
+            match request {
+                Request::NewTask(pid) => {
+                    self.estimator.start_trace(pid as u64);
+                    self.active_processes.insert(pid);
+                    let parent = (|| {
+                        let process = procfs::process::Process::new(pid)?;
+                        process.stat().map(|stat| stat.ppid)
+                    })()
+                    .unwrap_or_default();
+                    self.process_info.insert(
+                        pid,
+                        ProcessInfo {
+                            energy: 0,
+                            last_update: std::time::Instant::now(),
+                            parent,
+                        },
+                    );
+                    // Initialize with default budget
+                    self.shared_budgets.insert(pid, u64::MAX);
+                }
+                Request::RemoveTask(pid) => {
+                    self.estimator.stop_trace(pid as u64);
+                    self.active_processes.remove(&pid);
+                    self.process_info.remove(&pid);
+                    self.shared_budgets.remove(&pid);
+                }
+            }
+        }
+    }
+
+    fn update_measurements(&mut self) {
+        for &pid in &self.active_processes {
+            let energy = self.estimator.read_consumption(pid as u64);
+            if let Some(info) = self.process_info.get_mut(&pid) {
+                info.energy = energy;
+                info.last_update = std::time::Instant::now();
+            }
+        }
+    }
+
+    fn update_budgets(&mut self) {
+        let budgets = self.budget_policy.calculate_budgets(self);
+
+        // Update the shared budgets map
+        for (pid, budget) in budgets {
+            if let Some(mut entry) = self.shared_budgets.try_get_mut(&pid).try_unwrap() {
+                *entry = budget;
+            }
+        }
+    }
+
+    // Accessor methods for BudgetPolicy
+    pub fn active_processes(&self) -> &BTreeSet<Pid> {
+        &self.active_processes
+    }
+
+    pub fn process_energy(&self, pid: Pid) -> Option<u64> {
+        self.process_info.get(&pid).map(|info| info.energy)
+    }
+
+    pub fn all_process_energies(&self) -> HashMap<Pid, u64> {
+        self.process_info
+            .iter()
+            .map(|(&pid, info)| (pid, info.energy))
+            .collect()
+    }
+}
+
+pub fn start_energy_service(
+    use_mocking: bool,
+    power_cap: u64,
+    shared_budgets: Arc<DashMap<Pid, u64>>,
+) -> mpsc::SyncSender<Request> {
+    // Potentially convert back to bounded channel
+    let (request_sender, request_receiver) = mpsc::sync_channel(10000);
+
+    // Create the appropriate estimator based on configuration
+    let estimator: Box<dyn Estimator> = if use_mocking {
+        Box::new(PerfEstimator::default())
+    } else {
+        Box::new(KernelDriver::default())
+    };
+
+    // Create budget policy
+    let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap));
+
+    // Create and start the energy service
+    let service = EnergyService::new(
+        estimator,
+        budget_policy,
+        shared_budgets,
+        request_receiver,
+        Duration::from_millis(50), // 50ms update interval
+    );
+
+    service.run();
+
+    request_sender
+}
diff --git a/src/energy/budget.rs b/src/energy/budget.rs
new file mode 100644
index 0000000..aada2eb
--- /dev/null
+++ b/src/energy/budget.rs
@@ -0,0 +1,49 @@
+use std::collections::HashMap;
+
+use crate::energy::EnergyService;
+
+type Pid = i32;
+
+pub trait BudgetPolicy: Send + 'static {
+    fn calculate_budgets(&self, energy_service: &EnergyService) -> HashMap<Pid, u64>;
+}
+
+pub struct SimpleCappingPolicy {
+    power_cap: u64,
+}
+
+impl SimpleCappingPolicy {
+    pub fn new(power_cap: u64) -> Self {
+        Self { power_cap }
+    }
+}
+
+impl BudgetPolicy for SimpleCappingPolicy {
+    fn calculate_budgets(&self, energy_service: &EnergyService) -> HashMap<Pid, u64> {
+        let mut budgets = HashMap::new();
+        let process_energies = energy_service.all_process_energies();
+
+        // Total energy consumption across all processes
+        let total_energy: u64 = process_energies.values().sum();
+
+        // Simple proportional distribution if over cap
+        if total_energy > self.power_cap {
+            let ratio = self.power_cap as f64 / total_energy as f64;
+
+            for (&pid, &energy) in &process_energies {
+                // Calculate a scaled budget based on the ratio
+                // Higher energy consumers get proportionally reduced budgets
+                let scaling_factor = 1.0 - ((energy as f64 / total_energy as f64) * (1.0 - ratio));
+                let budget = (u64::MAX as f64 * scaling_factor) as u64;
+                budgets.insert(pid, budget);
+            }
+        } else {
+            // Under power cap, assign maximum budget to all
+            for &pid in energy_service.active_processes() {
+                budgets.insert(pid, u64::MAX);
+            }
+        }
+
+        budgets
+    }
+}
diff --git a/src/energy/estimator.rs b/src/energy/estimator.rs
new file mode 100644
index 0000000..f7f1e5d
--- /dev/null
+++ b/src/energy/estimator.rs
@@ -0,0 +1,5 @@
+pub trait Estimator: Send + 'static {
+    fn start_trace(&mut self, pid: u64);
+    fn stop_trace(&mut self, pid: u64);
+    fn read_consumption(&mut self, pid: u64) -> u64;
+}
diff --git a/src/energy/trackers.rs b/src/energy/trackers.rs
new file mode 100644
index 0000000..592c926
--- /dev/null
+++ b/src/energy/trackers.rs
@@ -0,0 +1,13 @@
+mod kernel;
+mod mock;
+mod perf;
+
+pub use kernel::*;
+pub use mock::*;
+pub use perf::*;
+
+pub trait Estimator: Send + 'static {
+    fn start_trace(&mut self, pid: u64);
+    fn stop_trace(&mut self, pid: u64);
+    fn read_consumption(&mut self, pid: u64) -> u64;
+}
diff --git a/src/energy/trackers/kernel.rs b/src/energy/trackers/kernel.rs
new file mode 100644
index 0000000..050805c
--- /dev/null
+++ b/src/energy/trackers/kernel.rs
@@ -0,0 +1,49 @@
+// energy/trackers/kernel.rs
+use iocuddle::*;
+use std::fs::File;
+use std::path::Path;
+
+use crate::energy::Estimator;
+
+#[derive(Debug)]
+pub struct KernelDriver {
+    file: File,
+}
+
+impl Default for KernelDriver {
+    fn default() -> Self {
+        KernelDriver::new("/dev/rust-pow")
+            .unwrap_or_else(|_| panic!("Failed to open kernel driver. Is the module loaded?"))
+    }
+}
+
+impl KernelDriver {
+    pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
+        let file = File::open(path)?;
+        Ok(Self { file })
+    }
+}
+
+const PERF_MON: Group = Group::new(b'|');
+const START_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x80) };
+const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) };
+const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) };
+
+impl Estimator for KernelDriver {
+    fn start_trace(&mut self, pid: u64) {
+        let _ = START_TRACE.ioctl(&mut self.file, &pid);
+    }
+
+    fn stop_trace(&mut self, pid: u64) {
+        let _ = STOP_TRACE.ioctl(&mut self.file, &pid);
+    }
+
+    fn read_consumption(&mut self, pid: u64) -> u64 {
+        let mut arg = pid;
+        if let Ok(_) = READ_POWER.ioctl(&mut self.file, &mut arg) {
+            arg
+        } else {
+            0
+        }
+    }
+}
diff --git a/src/energy/trackers/mock.rs b/src/energy/trackers/mock.rs
new file mode 100644
index 0000000..eb23421
--- /dev/null
+++ b/src/energy/trackers/mock.rs
@@ -0,0 +1,14 @@
+use super::Estimator;
+
+#[derive(Default)]
+pub struct MockEstimator;
+
+impl Estimator for MockEstimator {
+    fn start_trace(&mut self, _pid: u64) {}
+
+    fn stop_trace(&mut self, _pid: u64) {}
+
+    fn read_consumption(&mut self, _pid: u64) -> u64 {
+        14
+    }
+}
diff --git a/src/mock/perf.rs b/src/energy/trackers/perf.rs
index 3dde836..19ec0be 100644
--- a/src/mock/perf.rs
+++ b/src/energy/trackers/perf.rs
@@ -1,8 +1,12 @@
+// energy/trackers/perf.rs
 use std::collections::HashMap;
 
-use events::Event;
-use perf_event::events::Hardware;
-use perf_event::*;
+use perf_event::{
+    events::{Event, Hardware},
+    Builder, Counter, Group,
+};
+
+use crate::energy::Estimator;
 
 #[derive(Default)]
 pub struct PerfEstimator {
@@ -19,24 +23,34 @@ static EVENT_TYPES: &[(f32, Event)] = &[
     (2.0, Event::Hardware(Hardware::INSTRUCTIONS)),
 ];
 
-impl super::KernelModule for PerfEstimator {
+impl Estimator for PerfEstimator {
     fn start_trace(&mut self, pid: u64) {
         let Ok(mut group) = Group::new() else {
-            println!("failed to create Group");
+            eprintln!("Failed to create performance counter group for PID {}", pid);
             return;
         };
-        let counters = EVENT_TYPES
+
+        let counters: Result<Vec<_>, _> = EVENT_TYPES
             .iter()
-            .map(|(_, kind)| {
-                Builder::new()
-                    .group(&mut group)
-                    .kind(kind.clone())
-                    .build()
-                    .unwrap()
-            })
+            .map(|(_, kind)| Builder::new().group(&mut group).kind(kind.clone()).build())
             .collect();
-        group.enable().unwrap();
+        let counters = match counters {
+            Ok(counters) => counters,
+            Err(e) => {
+                eprintln!(
+                    "Failed to create performance counter group for PID {}: {}",
+                    pid, e
+                );
+                return;
+            }
+        };
+
+        if let Err(e) = group.enable() {
+            eprintln!("Failed to enable performance counters: {}", e);
+            return;
+        }
+
         let counters = Counters { counters, group };
         self.registry.insert(pid, counters);
     }
@@ -49,12 +63,17 @@
         let Some(counters) = self.registry.get_mut(&pid) else {
             return 0;
         };
+
+        let counts = match counters.group.read() {
+            Ok(counts) => counts,
+            Err(_) => return 0,
+        };
+
         let mut sum = 0;
-        let counts = counters.group.read().unwrap();
         for ((factor, _ty), count) in EVENT_TYPES.iter().zip(counts.iter()) {
             sum += *factor as u64 * count.1;
         }
-        // dbg!(sum);
+
         sum
     }
 }
diff --git a/src/main.rs b/src/main.rs
index c477a0f..0bf1c12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,324 +3,19 @@ pub use bpf_skel::*;
 pub mod bpf_intf;
 
 mod e_core_selector;
-mod mock;
-mod task_state;
+mod energy;
+mod scheduler;
 
 #[rustfmt::skip]
 mod bpf;
-use bpf::*;
+use anyhow::Result;
 use clap::{Arg, ArgAction, Command};
-use dashmap::DashMap;
-use e_core_selector::{ECoreSelector, RoundRobinSelector};
-use mock::perf::PerfEstimator;
-use scx_utils::{Topology, UserExitInfo};
-
-use libbpf_rs::OpenObject;
-use mock::{KernelDriver, KernelModule, MockModule};
-use task_state::TaskState;
-
-use std::any::Any;
-use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
-use std::fs::File;
-use std::io::{BufRead, BufReader};
+use scheduler::Scheduler;
 use std::mem::MaybeUninit;
-use std::ops::Range;
-use std::sync::mpsc::{Receiver, Sender, SyncSender};
-use std::sync::{Arc, OnceLock};
-use std::time::Duration;
-use std::{process, thread};
-
-use anyhow::Result;
 
 type Pid = i32;
 
-const SLICE_US: u64 = 50000;
-
-struct Scheduler<'a> {
-    bpf: BpfScheduler<'a>,
-    task_queue: VecDeque<QueuedTask>,
-    no_budget_task_queue: VecDeque<QueuedTask>,
-    managed_tasks: HashMap<Pid, TaskState>,
-    maximum_budget: u64,
-    power_cap: u64,
-    own_pid: Pid,
-    p_cores: Range<i32>,
-    e_cores: Range<i32>,
-    topology: Topology,
-    e_core_selector: Box<dyn ECoreSelector>,
-    // reciever: Receiver<(Pid, Response)>,
-    sender: SyncSender<(Pid, Request)>,
-    hashmap: Arc<DashMap<Pid, (u64, i32)>>,
-}
-
-enum Request {
-    NewTask,
-    RemoveTask,
-}
-
-impl<'a> Scheduler<'a> {
-    fn init(
-        open_object: &'a mut MaybeUninit<OpenObject>,
-        use_mocking: bool,
-        power_cap: u64,
-    ) -> Result<Self> {
-        let bpf = BpfScheduler::init(
-            open_object,
-            0,     // exit_dump_len (buffer size of exit info, 0 = default)
-            false, // partial (false = include all tasks)
-            false, // debug (false = debug mode off)
-        )?;
-        dbg!("registering rust user space scheduler");
-        let map: DashMap<i32, (u64, i32)> = DashMap::with_capacity(100000);
-        let map = Arc::new(map);
-        let thread_map = map.clone();
-
-        let topology = Topology::new().unwrap();
-
-        let read_cores = |core_type: &str| {
-            let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
-            let e_cores_reader = BufReader::new(e_cores_file);
-            let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
-                let cores: Vec<&str> = line.split('-').collect();
-                cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
-            } else {
-                panic!(
-                    "Was not able to differentiate between core types. Does the system have e cores?"
-                )
-            };
-            Ok::<_, std::io::Error>(e_cores)
-        };
-        let e_cores = read_cores("atom").unwrap_or(0..4);
-        let p_cores = read_cores("core").unwrap_or(4..8);
-
-        let selector = Box::new(RoundRobinSelector::new(&e_cores));
-
-        let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
-
-        std::thread::spawn(move || {
-            let mut module: Box<dyn KernelModule> = if use_mocking {
-                Box::new(PerfEstimator::default())
-                // Box::new(MockModule::default())
-            } else {
-                Box::new(KernelDriver::default())
-            };
-            let mut tasks = BTreeSet::new();
-
-            loop {
-                for pid in tasks.iter() {
-                    let energy = module.read_consumption(*pid as u64);
-                    if let Some(mut entry) = thread_map.try_get_mut(pid).try_unwrap() {
-                        entry.0 = energy;
-                    }
-                }
-
-                if let Ok((pid, request)) = request_recieve.try_recv() {
-                    match request {
-                        Request::NewTask => {
-                            let parent = (|| {
-                                let process = procfs::process::Process::new(pid)?;
-                                process.stat().map(|stat| stat.ppid)
-                            })()
-                            .unwrap_or_default();
-                            module.start_trace(pid as u64);
-                            tasks.insert(pid);
-                            thread_map.insert(pid, (0, parent));
-                        }
-                        Request::RemoveTask => {
-                            tasks.remove(&pid);
-                            module.stop_trace(pid as u64)
-                        }
-                    }
-                }
-                std::thread::sleep(Duration::from_micros(5000));
-            }
-        });
-
-        Ok(Self {
-            bpf,
-            task_queue: VecDeque::new(),
-            no_budget_task_queue: VecDeque::new(),
-            managed_tasks: HashMap::new(),
-            maximum_budget: u64::MAX,
-            power_cap,
-            own_pid: process::id() as i32,
-            p_cores,
-            e_cores,
-            topology,
-            e_core_selector: selector,
-            sender: request_send,
-            // reciever: response_recieve,
-            hashmap: map,
-        })
-    }
-
-    fn consume_all_tasks(&mut self) {
-        // Consume all tasks that are ready to run.
-        //
-        // Each task contains the following details:
-        //
-        // pub struct QueuedTask {
-        //     pub pid: i32,              // pid that uniquely identifies a task
-        //     pub cpu: i32,              // CPU where the task is running
-        //     pub sum_exec_runtime: u64, // Total cpu time
-        //     pub weight: u64,           // Task static priority
-        //     pub nvcsw: u64,            // Total amount of voluntary context switches
-        //     pub slice: u64,            // Remaining time slice budget
-        //     pub vtime: u64,            // Current task vruntime / deadline (set by the scheduler)
-        // }
-        //
-        // Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
-        // implementing more sophisticated scheduling policies.
-        // let mut contended = Vec::with_capacity(100);
-
-        while let Ok(Some(task)) = self.bpf.dequeue_task() {
-            // The scheduler itself has to be scheduled regardless of its energy usage
-            if task.pid == self.own_pid {
-                self.task_queue.push_front(task);
-                self.dispatch_next_task();
-                continue;
-            }
-
-            match self.hashmap.try_get(&task.pid) {
-                dashmap::try_result::TryResult::Present(value) => {
-                    let (energy, parent) = *value;
-
-                    // dbg!(energy);
-                    // TODO: fix
-                    if energy == 0 {
-                        // println!("budget zero");
-                        self.no_budget_task_queue.push_back(task);
-                    } else {
-                        self.task_queue.push_back(task);
-                    }
-                }
-                dashmap::try_result::TryResult::Absent => {
-                    self.hashmap.insert(task.pid, (0, 0));
-                    self.sender.try_send((task.pid, Request::NewTask)).unwrap();
-                    self.task_queue.push_back(task);
-                }
-                dashmap::try_result::TryResult::Locked => {
-                    println!("locked");
-                    self.task_queue.push_back(task);
-                }
-            }
-        }
-    }
-
-    fn dispatch_next_task(&mut self) {
-        if let Some(task) = self.task_queue.pop_front() {
-            // Create a new task to be dispatched, derived from the received enqueued task.
-            //
-            // pub struct DispatchedTask {
-            //     pub pid: i32,      // pid that uniquely identifies a task
-            //     pub cpu: i32,      // target CPU selected by the scheduler
-            //     pub flags: u64,    // special dispatch flags
-            //     pub slice_ns: u64, // time slice assigned to the task (0 = default)
-            // }
-            //
-            // The dispatched task's information are pre-populated from the QueuedTask and they can
-            // be modified before dispatching it via self.bpf.dispatch_task().
-            let mut dispatched_task = DispatchedTask::new(&task);
-
-            // Decide where the task needs to run (target CPU).
-            //
-            // A call to select_cpu() will return the most suitable idle CPU for the task,
-            // considering its previously used CPU.
-            let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
-            if cpu >= 0 {
-                dispatched_task.cpu = cpu;
-            } else {
-                dispatched_task.flags |= RL_CPU_ANY as u64;
-            }
-
-            // Decide for how long the task needs to run (time slice); if not specified
-            // SCX_SLICE_DFL will be used by default.
-            dispatched_task.slice_ns = SLICE_US;
-
-            // Dispatch the task on the target CPU.
-            self.bpf.dispatch_task(&dispatched_task).unwrap();
-
-            // Notify the BPF component of the number of pending tasks and immediately give a
-            // chance to run to the dispatched task.
-            self.bpf.notify_complete(
-                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
-            );
-        } else if let Some(task) = self.no_budget_task_queue.pop_front() {
-            //TODO: do something with tasks that have no budget left
-            let mut dispatched_task = DispatchedTask::new(&task);
-
-            let cpu = self.e_core_selector.next_core();
-            if cpu >= 0 {
-                dispatched_task.cpu = cpu;
-            } else {
-                dispatched_task.flags |= RL_CPU_ANY as u64;
-            }
-
-            if task.pid == self.own_pid {
-                dispatched_task.slice_ns = SLICE_US * 1000;
-            }
-            dispatched_task.slice_ns = SLICE_US;
-            self.bpf.dispatch_task(&dispatched_task).unwrap();
-            self.bpf.notify_complete(
-                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
-            );
-        }
-    }
-
-    fn dispatch_tasks(&mut self) {
-        //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
-        self.reset_budgets_and_garbage_collect();
-
-        loop {
-            // Consume all tasks before dispatching any.
-            self.consume_all_tasks();
-
-            // Dispatch one task from the queue.
-            self.dispatch_next_task();
-
-            // If no task is ready to run (or in case of error), stop dispatching tasks and notify
-            // the BPF component that all tasks have been scheduled / dispatched, with no remaining
-            // pending tasks.
-            //TODO: both queues?
-            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
-                self.bpf.notify_complete(0);
-
-                break;
-            }
-        }
-    }
-
-    fn run(&mut self) -> Result<UserExitInfo> {
-        while !self.bpf.exited() {
-            self.dispatch_tasks();
-        }
-        self.bpf.shutdown_and_report()
-    }
-
-    fn get_new_budget(&self) -> u64 {
-        //TODO
-        u64::MAX
-    }
-
-    fn reset_budgets_and_garbage_collect(&mut self) {
-        let old_budget = self.maximum_budget;
-        self.maximum_budget = self.get_new_budget();
-
-        self.managed_tasks.retain(|key, value| {
-            let was_scheduled = value.budget == old_budget;
-            if was_scheduled {
-                value.budget = self.maximum_budget;
-            } else {
-                self.sender
-                    .try_send(({ *key }, Request::RemoveTask))
-                    .unwrap();
-            }
-            was_scheduled
-        });
-    }
-}
-
 fn main() -> Result<()> {
     let matches = Command::new("Energy User Space Scheduler")
         .arg(
@@ -343,7 +38,7 @@ fn main() -> Result<()> {
     let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
     let use_mocking = matches.get_flag("mock");
 
-    // Initialize and load the FIFO scheduler.
+    // Initialize and load the scheduler.
     let mut open_object = MaybeUninit::uninit();
     loop {
         let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
diff --git a/src/mock.rs b/src/mock.rs
deleted file mode 100644
index fc6fcf1..0000000
--- a/src/mock.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use iocuddle::*;
-use rand::Rng;
-pub mod perf;
-
-pub trait KernelModule {
-    fn start_trace(&mut self, _pid: u64) {}
-    fn stop_trace(&mut self, _pid: u64) {}
-    fn read_consumption(&mut self, pid: u64) -> u64;
-}
-
-#[derive(Default)]
-pub struct MockModule;
-
-impl KernelModule for MockModule {
-    fn start_trace(&mut self, pid: u64) {
-        // println!("starting trace of {pid}");
-    }
-
-    fn stop_trace(&mut self, pid: u64) {
-        // println!("stopping trace of {pid}");
-    }
-
-    fn read_consumption(&mut self, _pid: u64) -> u64 {
-        // rand::rng().random()
-        14
-    }
-}
-
-#[derive(Debug)]
-pub struct KernelDriver {
-    file: std::fs::File,
-}
-
-impl Default for KernelDriver {
-    fn default() -> Self {
-        KernelDriver::new("/dev/rust-pow").unwrap()
-    }
-}
-impl KernelDriver {
-    fn new(path: impl AsRef<std::path::Path>) -> Result<Self, std::io::Error> {
-        let file = std::fs::File::open(path)?;
-        Ok(Self { file })
-    }
-}
-
-const PERF_MON: Group = Group::new(b'|');
-const START_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x80) };
-const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) };
-const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) };
-
-impl KernelModule for KernelDriver {
-    fn start_trace(&mut self, pid: u64) {
-        START_TRACE.ioctl(&mut self.file, &pid).unwrap();
-    }
-
-    fn stop_trace(&mut self, pid: u64) {
-        STOP_TRACE.ioctl(&mut self.file, &pid).unwrap();
-    }
-
-    fn read_consumption(&mut self, pid: u64) -> u64 {
-        let mut arg = pid;
-        READ_POWER.ioctl(&mut self.file, &mut arg).unwrap();
-        arg
-    }
-}
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..6188da1
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,223 @@
+use crate::bpf::*;
+use crate::energy::{self, Request as EnergyRequest};
+
+use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
+use anyhow::Result;
+use dashmap::DashMap;
+use libbpf_rs::OpenObject;
+use scx_utils::{Topology, UserExitInfo};
+
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::mem::MaybeUninit;
+use std::ops::Range;
+use std::process;
+use std::sync::{mpsc, Arc};
+use std::time::{Duration, Instant};
+
+use crate::Pid;
+
+const SLICE_US: u64 = 50000;
+
+pub struct Scheduler<'a> {
+    bpf: BpfScheduler<'a>,
+    task_queue: VecDeque<QueuedTask>,
+    no_budget_task_queue: VecDeque<QueuedTask>,
+    managed_tasks: HashMap<Pid, Instant>,
+    maximum_budget: u64,
+    own_pid: Pid,
+    p_cores: Range<i32>,
+    e_cores: Range<i32>,
+    topology: Topology,
+    to_remove: Vec<Pid>,
+    e_core_selector: Box<dyn ECoreSelector>,
+    energy_sender: mpsc::SyncSender<EnergyRequest>,
+    shared_budgets: Arc<DashMap<Pid, u64>>,
+}
+
+impl<'a> Scheduler<'a> {
+    pub fn init(
+        open_object: &'a mut MaybeUninit<OpenObject>,
+        use_mocking: bool,
+        power_cap: u64,
+    ) -> Result<Self> {
+        let bpf = BpfScheduler::init(
+            open_object,
+            0,     // exit_dump_len (buffer size of exit info, 0 = default)
+            false, // partial (false = include all tasks)
+            false, // debug (false = debug mode off)
+        )?;
+
+        println!("Initializing energy-aware scheduler");
+
+        // Shared budget map between energy service and scheduler
+        let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000));
+
+        // Start energy tracking service
+        let energy_sender =
+            energy::start_energy_service(use_mocking, power_cap, shared_budgets.clone());
+
+        let topology = Topology::new().unwrap();
+
+        let read_cores = |core_type: &str| {
+            let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
+            let e_cores_reader = BufReader::new(e_cores_file);
+            let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
+                let cores: Vec<&str> = line.split('-').collect();
+                cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
+            } else {
+                panic!(
+                    "Was not able to differentiate between core types. Does the system have e cores?"
+                )
+            };
+            Ok::<_, std::io::Error>(e_cores)
+        };
+        let e_cores = read_cores("atom").unwrap_or(0..4);
+        let p_cores = read_cores("core").unwrap_or(4..8);
+
+        let selector = Box::new(RoundRobinSelector::new(&e_cores));
+        let to_remove = Vec::with_capacity(1000);
+
+        Ok(Self {
+            bpf,
+            task_queue: VecDeque::new(),
+            no_budget_task_queue: VecDeque::new(),
+            managed_tasks: HashMap::new(),
+            maximum_budget: u64::MAX,
+            own_pid: process::id() as i32,
+            p_cores,
+            e_cores,
+            topology,
+            e_core_selector: selector,
+            energy_sender,
+            shared_budgets,
+            to_remove,
+        })
+    }
+
+    fn consume_all_tasks(&mut self) {
+        while let Ok(Some(task)) = self.bpf.dequeue_task() {
+            // The scheduler itself has to be scheduled regardless of its energy usage
+            if task.pid == self.own_pid {
+                self.task_queue.push_front(task);
+                self.dispatch_next_task();
+                continue;
+            }
+
+            // Check if we've seen this task before
+            if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid)
+            {
+                e.insert(Instant::now());
+                // New task - register it with the energy service
+                self.energy_sender
+                    .try_send(EnergyRequest::NewTask(task.pid))
+                    .unwrap();
+            }
+
+            // Get current budget for this task
+            match self.shared_budgets.try_get(&task.pid) {
+                dashmap::try_result::TryResult::Present(budget) => match *budget {
+                    0 => self.task_queue.push_back(task),
+                    _ => self.no_budget_task_queue.push_back(task),
+                },
+                _ => self.task_queue.push_back(task),
+            }
+        }
+    }
+
+    fn dispatch_next_task(&mut self) {
+        if let Some(task) = self.task_queue.pop_front() {
+            let mut dispatched_task = DispatchedTask::new(&task);
+
+            self.managed_tasks.insert(task.pid, Instant::now());
+
+            let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
+            if cpu >= 0 {
+                dispatched_task.cpu = cpu;
+            } else {
+                dispatched_task.flags |= RL_CPU_ANY as u64;
+            }
+
+            dispatched_task.slice_ns = SLICE_US;
+
+            if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+                eprintln!("Failed to dispatch task: {}", e);
+            }
+
+            self.bpf.notify_complete(
+                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+            );
+        } else if let Some(task) = self.no_budget_task_queue.pop_front() {
+            let mut dispatched_task = DispatchedTask::new(&task);
+
+            // Low budget tasks go to e-cores
+            let cpu = self.e_core_selector.next_core();
+            if cpu >= 0 {
+                dispatched_task.cpu = cpu;
+            } else {
+                dispatched_task.flags |= RL_CPU_ANY as u64;
+            }
+
+            // Scheduler tasks get longer slices
+            if task.pid == self.own_pid {
+                dispatched_task.slice_ns = SLICE_US * 1000;
+            } else {
+                dispatched_task.slice_ns = SLICE_US;
+            }
+
+            if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+                eprintln!("Failed to dispatch low-budget task: {}", e);
+            }
+
+            self.bpf.notify_complete(
+                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+            );
+        }
+    }
+
+    fn cleanup_old_tasks(&mut self) {
+        let current = Instant::now();
+        for (pid, last_scheduled) in &self.managed_tasks {
+            if current - *last_scheduled > Duration::from_secs(5) {
+                self.to_remove.push(*pid);
+            }
+        }
+        for pid in self.to_remove.drain(..) {
+            self.managed_tasks.remove(&pid);
+            self.energy_sender
+                .try_send(EnergyRequest::RemoveTask(pid))
+                .unwrap();
+        }
+    }
+
+    fn dispatch_tasks(&mut self) {
+        loop {
+            self.consume_all_tasks();
+            self.dispatch_next_task();
+
+            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
+                self.bpf.notify_complete(0);
+                break;
+            }
+        }
+    }
+
+    pub fn run(&mut self) -> Result<UserExitInfo> {
+        let mut i = 0;
+        while !self.bpf.exited() {
+            i += 1;
+            self.dispatch_tasks();
+            if i % 100 == 0 {
+                self.cleanup_old_tasks();
+            }
+        }
+
+        // Clean up - signal the energy service to stop tracking all managed tasks
+        for pid in self.managed_tasks.keys() {
+            let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
+        }
+
+        self.bpf.shutdown_and_report()
+    }
+}
diff --git a/src/task_state.rs b/src/task_state.rs
deleted file mode 100644
index f88bcc2..0000000
--- a/src/task_state.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use crate::Pid;
-
-pub struct TaskState {
-    pub previous_energy_usage: u64,
-    pub budget: u64,
-    pub parent: Pid,
-}