diff options
author | Lennard Kittner <lennardkittner@icloud.com> | 2025-04-15 16:45:11 +0200 |
---|---|---|
committer | Lennard Kittner <lennardkittner@icloud.com> | 2025-04-15 16:45:11 +0200 |
commit | be9d8527821e8999799930e922d79afd2ef998e9 (patch) | |
tree | a6de07b413202543eb28ad1dbb403be594307e2c | |
parent | 1e05cd3bf6896ad4da7420c6914c1381714cdf43 (diff) |
Change CLI args
Add p core selector
Fix socket argument parsing
Better budget calculation
-rw-r--r-- | src/core_selector.rs | 31 | ||||
-rw-r--r-- | src/energy.rs | 34 | ||||
-rw-r--r-- | src/energy/budget.rs | 58 | ||||
-rw-r--r-- | src/energy/trackers/perf.rs | 2 | ||||
-rw-r--r-- | src/main.rs | 22 | ||||
-rw-r--r-- | src/scheduler.rs | 29 | ||||
-rw-r--r-- | src/socket.rs | 6 |
7 files changed, 116 insertions, 66 deletions
diff --git a/src/core_selector.rs b/src/core_selector.rs new file mode 100644 index 0000000..f70b252 --- /dev/null +++ b/src/core_selector.rs @@ -0,0 +1,31 @@ +use std::ops::Range; + +pub trait CoreSelector { + fn next_core(&mut self, previous_cpu: i32) -> i32; +} + +pub struct RoundRobinSelector { + offset: u32, + num_cores: u32, + last_used: u32, +} + +impl RoundRobinSelector { + pub fn new(cores: &Range<i32>) -> RoundRobinSelector { + Self { + offset: cores.start as u32, + num_cores: cores.len() as u32, + last_used: 0, + } + } +} + +impl CoreSelector for RoundRobinSelector { + fn next_core(&mut self, previous_cpu: i32) -> i32 { + if (self.offset..(self.offset + self.num_cores)).contains(&(previous_cpu as u32)) { + return previous_cpu; + } + self.last_used += 1; + (self.offset + (self.last_used % self.num_cores.max(1))) as i32 + } +} diff --git a/src/energy.rs b/src/energy.rs index 21283b8..b9cfced 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -95,6 +95,7 @@ impl Default for TaskInfo { #[derive(Clone)] pub struct ProcessInfo { pub energy: f64, + pub energy_delta: f64, pub tree_energy: f64, pub last_update: std::time::Instant, pub parent: Pid, @@ -114,6 +115,8 @@ pub struct EnergyService { shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>, rapl_offset: f64, + last_energy_diff: f64, + last_time_between_measurements: Duration, old_rapl: f64, system_energy: f64, bias: f64, @@ -144,6 +147,8 @@ impl EnergyService { shared_policy_frequency_ranges, shared_cpu_current_frequencies, rapl_offset: rapl::read_package_energy().unwrap(), + last_energy_diff: 0f64, + last_time_between_measurements: Duration::new(0, 0), old_rapl: 0., system_energy: 0., bias: 1., @@ -215,6 +220,7 @@ impl EnergyService { pid, ProcessInfo { energy: 0., + energy_delta: 0., tree_energy: 0., last_update: std::time::Instant::now(), parent, @@ -255,6 +261,7 @@ impl EnergyService { continue; } if let Some(energy) = self.estimator.read_consumption(*pid as u64) { + info.energy_delta = energy * self.bias; info.energy += energy * self.bias; info.tree_energy += energy * self.bias; self.estimator.update_information( @@ -272,13 +279,16 @@ impl EnergyService { } } let elapsed = self.last_measurement.elapsed(); + self.last_time_between_measurements = elapsed; self.last_measurement = Instant::now(); + let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset; + let rapl_diff = rapl - self.old_rapl; + self.last_energy_diff = rapl_diff; + self.old_rapl = rapl; + let power_comsumption_watt = rapl_diff / elapsed.as_secs_f64(); + let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W; if let Some(init) = self.process_info.write().unwrap().get_mut(&1) { - let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset; - let rapl_diff = rapl - self.old_rapl; - let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W; let est_diff = init.tree_energy - old_energy + idle_consumption; - self.old_rapl = rapl; // let offset_bias = (rapl / (init.tree_energy + idle_consumption)).clamp(0.1, 2.); let current_bias = if init.tree_energy - old_energy > idle_consumption * 0.5 { (rapl_diff / est_diff).clamp(0.1, 2.) @@ -291,8 +301,8 @@ impl EnergyService { .clamp(0.1, 5.); self.system_energy += est_diff; println!( - "Energy estimation: {:.1} rapl: {:.1}, est diff: {:.1} rapl diff: {:.1}, bias: {:.1}", - self.system_energy, rapl, est_diff, rapl_diff, self.bias, + "Energy estimation: {:.1} rapl: {:.1}, est diff: {:.1} rapl diff: {:.1}, bias: {:.1}, power consumption: {:.1}", + self.system_energy, rapl, est_diff, rapl_diff, self.bias, power_comsumption_watt, ); } } @@ -336,19 +346,18 @@ impl EnergyService { .map(|info| info.energy) } - pub fn all_process_energies(&self) -> HashMap<Pid, f64> { + pub fn all_process_energy_deltas(&self) -> HashMap<Pid, f64> { self.process_info .read() .unwrap() .iter() - .map(|(&key, info)| (key, info.energy)) + .map(|(&key, info)| (key, info.energy_delta)) .collect() } } pub fn start_energy_service( use_mocking: bool, - power_cap: u64, shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>, @@ -363,14 +372,16 @@ pub fn start_energy_service( Box::new(KernelDriver::default()) }; + let process_info = Arc::new(RwLock::new(HashMap::new())); + + let power_cap = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?; + // Create budget policy let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap)); // shouldn't be a problem because we are privileged // if PackageEnergy::check_paranoid().unwrap_or(3) > 0 {} - let process_info = Arc::new(RwLock::new(HashMap::new())); - // Create and start the energy service let service = EnergyService::new( estimator, @@ -384,7 +395,6 @@ pub fn start_energy_service( ); service.run(); - let budget = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?; Ok(request_sender) } diff --git a/src/energy/budget.rs b/src/energy/budget.rs index 6154187..3edce30 100644 --- a/src/energy/budget.rs +++ b/src/energy/budget.rs @@ -1,19 +1,22 @@ -use std::collections::HashMap; +use std::{collections::HashMap, sync::atomic::AtomicU32}; use crate::energy::EnergyService; +use std::sync::Arc; type Pid = i32; +const MAX_BUDGET: u64 = 100; + pub trait BudgetPolicy: Send + 'static { fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>; } pub struct SimpleCappingPolicy { - power_cap: u64, + power_cap: Arc<AtomicU32>, } impl SimpleCappingPolicy { - pub fn new(power_cap: u64) -> Self { + pub fn new(power_cap: Arc<AtomicU32>) -> Self { Self { power_cap } } } @@ -21,32 +24,43 @@ impl SimpleCappingPolicy { impl BudgetPolicy for SimpleCappingPolicy { fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> { let mut budgets = HashMap::new(); - let process_energies = energy_service.all_process_energies(); + let process_energies = energy_service.all_process_energy_deltas(); // Total energy consumption across all processes //let total_energy: u64 = process_energies.values().sum(); - let actual_energy = - energy_service.package_energy() / energy_service.update_interval.as_secs_f64(); + let actual_energy = energy_service.last_energy_diff; + let energy_cap = self.power_cap.load(std::sync::atomic::Ordering::Relaxed) as f64 + * energy_service.last_time_between_measurements.as_secs_f64(); + println!("{actual_energy} {energy_cap}"); + let base_energy_per_process = + energy_cap / process_energies.iter().filter(|(_, e)| **e > 0f64).count() as f64; + let ratio = energy_cap / actual_energy; - // Simple proportional distribution if over cap - if actual_energy > self.power_cap as f64 { - let ratio = self.power_cap as f64 / actual_energy; - - for (&pid, &energy) in &process_energies { - // Calculate a scaled budget based on the ratio - // Higher energy consumers get proportionally reduced budgets - let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio)); - let budget = (u64::MAX as f64 * scaling_factor) as u64; - budgets.insert(pid, budget); - } - } else { - // Under power cap, assign maximum budget to all - for pid in energy_service.active_processes() { - budgets.insert(*pid, u64::MAX); - } + for (pid, energy) in process_energies { + let budget = budgets.entry(pid).or_insert(0); + *budget = (*budget + (ratio * base_energy_per_process - energy as f64) as u64) + .min(MAX_BUDGET); } + // Simple proportional distribution if over cap + //if actual_energy > energy_cap { + // let ratio = energy_cap / actual_energy; + // + // for (&pid, &energy) in &process_energies { + // // Calculate a scaled budget based on the ratio + // // Higher energy consumers get proportionally reduced budgets + // let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio)); + // let budget = (u64::MAX as f64 * scaling_factor) as u64; + // budgets.insert(pid, budget); + // } + // } else { + // // Under power cap, assign maximum budget to all + // for pid in energy_service.active_processes() { + // budgets.insert(*pid, u64::MAX); + // } + // } + budgets } } diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs index 9262677..7223b12 100644 --- a/src/energy/trackers/perf.rs +++ b/src/energy/trackers/perf.rs @@ -161,7 +161,7 @@ impl Estimator for PerfEstimator { let time_running_ns = counts.time_running(); if time_running_ns - counters.old_time == 0 || counts.iter().next().unwrap().1 == &0 { - println!("The counters are zero although the task has been scheduled!!"); + //println!("The counters are zero although the task has been scheduled!!"); return None; } let correction_factor = 10_000_000. / (time_running_ns - counters.old_time) as f64; diff --git a/src/main.rs b/src/main.rs index 9f3528c..d5a3dcd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ pub use bpf_skel::*; pub mod bpf_intf; mod benchmark; -mod e_core_selector; +mod core_selector; mod energy; mod freq; mod model; @@ -26,10 +26,10 @@ type Pid = i32; fn main() -> Result<()> { let matches = Command::new("Energy User Space Scheduler") .arg( - Arg::new("mock") - .short('m') - .long("mock") - .help("Use this flag if to activate kernel module mocking") + Arg::new("perf") + .short('p') + .long("perf") + .help("Use this flag to switch between the kernel module and perf") .action(ArgAction::SetTrue) .required(false), ) @@ -41,13 +41,6 @@ fn main() -> Result<()> { .required(false) .value_name("mode"), ) - .arg( - Arg::new("power_cap") - .long("energy_cap") - .help("Set a power cap for the processor") - .required(false) - .value_name("power in watts"), - ) .get_matches(); let device = Default::default(); @@ -65,8 +58,7 @@ fn main() -> Result<()> { println!("energy: {energy}"); // panic!(); - let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX); - let use_mocking = matches.get_flag("mock"); + let use_perf = matches.get_flag("perf"); let benchmark = matches.get_one::<String>("benchmark"); // Initialize and load the scheduler. @@ -79,7 +71,7 @@ fn main() -> Result<()> { return Ok(()); } loop { - let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?; + let mut sched = Scheduler::init(&mut open_object, use_perf)?; if !sched.run()?.should_restart() { break; } diff --git a/src/scheduler.rs b/src/scheduler.rs index c34e151..bcc3134 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -2,7 +2,7 @@ use crate::bpf::*; use crate::energy::{self, Request as EnergyRequest, TaskInfo}; use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest}; -use crate::e_core_selector::{ECoreSelector, RoundRobinSelector}; +use crate::core_selector::{CoreSelector, RoundRobinSelector}; use anyhow::Result; use libbpf_rs::OpenObject; use scx_utils::{Topology, UserExitInfo}; @@ -10,10 +10,10 @@ use scx_utils::{Topology, UserExitInfo}; use std::collections::{HashMap, VecDeque}; use std::mem::MaybeUninit; use std::ops::{Range, RangeInclusive}; -use std::process; use std::sync::mpsc::TrySendError; use std::sync::{mpsc, Arc, RwLock}; use std::time::Duration; +use std::{process, usize}; use crate::Pid; @@ -32,7 +32,8 @@ pub struct Scheduler<'a> { e_cores: Range<i32>, topology: Topology, to_remove: Vec<Pid>, - e_core_selector: Box<dyn ECoreSelector>, + e_core_selector: Box<dyn CoreSelector>, + p_core_selector: Box<dyn CoreSelector>, energy_sender: mpsc::SyncSender<EnergyRequest>, empty_task_infos: mpsc::Receiver<Arc<TaskInfo>>, frequency_sender: mpsc::SyncSender<FrequencyRequest>, @@ -42,11 +43,7 @@ pub struct Scheduler<'a> { } impl<'a> Scheduler<'a> { - pub fn init( - open_object: &'a mut MaybeUninit<OpenObject>, - use_mocking: bool, - power_cap: u64, - ) -> Result<Self> { + pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, use_mocking: bool) -> Result<Self> { println!("Initializing energy-aware scheduler"); let shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> = @@ -59,7 +56,6 @@ impl<'a> Scheduler<'a> { // Start energy tracking service let energy_sender = energy::start_energy_service( use_mocking, - power_cap, shared_cpu_frequency_ranges.clone(), shared_policy_frequency_ranges.clone(), shared_cpu_current_frequencies.clone(), @@ -88,7 +84,8 @@ impl<'a> Scheduler<'a> { let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&-1) + 1); let all_cores = 0..((e_cores.len() + p_cores.len()) as u32); - let selector = Box::new(RoundRobinSelector::new(&e_cores)); + let p_core_selector = Box::new(RoundRobinSelector::new(&p_cores)); + let e_core_selector = Box::new(RoundRobinSelector::new(&e_cores)); let to_remove = Vec::with_capacity(1000); let frequency_sender = freq::start_frequency_service( @@ -127,7 +124,8 @@ impl<'a> Scheduler<'a> { tasks_scheduled: 0, e_cores, topology, - e_core_selector: selector, + e_core_selector, + p_core_selector, energy_sender, to_remove, frequency_sender, @@ -197,13 +195,17 @@ impl<'a> Scheduler<'a> { self.tasks_scheduled += 1; if let Some(task) = self.task_queue.pop_front() { let mut dispatched_task = DispatchedTask::new(&task); - //TODO: do we have to migrate tasks back from e cores? let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); if cpu >= 0 { dispatched_task.cpu = cpu; } else { - dispatched_task.flags |= RL_CPU_ANY as u64; + dispatched_task.cpu = self.p_core_selector.next_core(task.cpu); + //dispatched_task.flags |= RL_CPU_ANY as u64; + } + + if self.e_cores.contains(&dispatched_task.cpu) { + dispatched_task.cpu = self.p_core_selector.next_core(task.cpu); } if task.pid == self.own_pid { @@ -214,6 +216,7 @@ impl<'a> Scheduler<'a> { if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { eprintln!("Failed to dispatch task: {}", e); + panic!(); } let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); diff --git a/src/socket.rs b/src/socket.rs index 2bb0dbb..d851eed 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -41,8 +41,8 @@ impl LoggingSocketService { break; } let (command, args) = line.split_once(' ').unwrap_or((line.trim(), "")); - let output = match dbg!(command) { - "pid" => self.get_process(args.parse().unwrap_or_default()), + let output = match command { + "pid" => self.get_process(args.trim().parse().unwrap_or_default()), "list" => self.list_processes(), "limit" => self.set_power_limit(args), _ => "Unrecognized command#".into(), @@ -64,7 +64,7 @@ impl LoggingSocketService { } fn set_power_limit(&self, args: &str) -> String { - if let Ok(power_limit) = args.parse() { + if let Ok(power_limit) = args.trim().parse() { self.power_limit .store(power_limit, std::sync::atomic::Ordering::Relaxed); "#".into() |