diff options
author | Dennis Kobert <dennis@kobert.dev> | 2025-04-17 13:26:17 +0200 |
---|---|---|
committer | Dennis Kobert <dennis@kobert.dev> | 2025-04-17 13:26:17 +0200 |
commit | 654c6779adc495235621e41be6cc0000d0a88dce (patch) | |
tree | c4ec25054cba1935bcb3bd4ac7b4cb96294fd804 | |
parent | 0394c5144a6cf237ec53cbd694afa29d838519d5 (diff) |
Calculate budget deduction in scheduler
-rw-r--r-- | src/energy.rs | 142 | ||||
-rw-r--r-- | src/energy/budget.rs | 68 | ||||
-rw-r--r-- | src/scheduler.rs | 11 | ||||
-rw-r--r-- | src/socket.rs | 9 |
4 files changed, 110 insertions, 120 deletions
diff --git a/src/energy.rs b/src/energy.rs index 0a49e07..9f218e7 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -5,6 +5,7 @@ mod trackers; use crate::energy::estimator::Estimator; use std::collections::{BTreeSet, HashMap}; +use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; @@ -14,11 +15,12 @@ use crate::freq::FrequencyKHZ; use crate::socket; use crate::Pid; -pub use budget::BudgetPolicy; +use budget::SimpleCappingPolicy; pub use trackers::{KernelDriver, PerfEstimator}; const IDLE_CONSUMPTION_W: f64 = 7.; const UPDATE_INTERVAL: Duration = Duration::from_millis(10); +const DEFAULT_POWER_DRAW_MW: u64 = 15_000; pub enum Request { NewTask(Pid, Arc<TaskInfo>), @@ -26,22 +28,22 @@ pub enum Request { } pub struct TaskInfo { pub cpu: AtomicI32, - pub budget: AtomicU64, + pub budget: AtomicI64, + pub last_total_runtime: AtomicU64, + pub power_draw: AtomicU64, pub running_on_e_core: AtomicBool, pub last_scheduled: AtomicI64, } impl TaskInfo { pub fn read_cpu(&self) -> i32 { - self.cpu.load(std::sync::atomic::Ordering::Relaxed) + self.cpu.load(Relaxed) } - pub fn read_budget(&self) -> u64 { - self.budget.load(std::sync::atomic::Ordering::Relaxed) + pub fn read_budget(&self) -> i64 { + self.budget.load(Relaxed) } pub fn read_time_since_last_schedule(&self) -> Option<Duration> { - let old_time = self - .last_scheduled - .load(std::sync::atomic::Ordering::Relaxed); + let old_time = self.last_scheduled.load(Relaxed); if old_time == -1 { None } else { @@ -50,29 +52,42 @@ impl TaskInfo { } } pub fn read_time_since_last_schedule_raw(&self) -> i64 { - self.last_scheduled - .load(std::sync::atomic::Ordering::Relaxed) + self.last_scheduled.load(Relaxed) } pub fn set_cpu(&self, cpu: i32) { - self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed); + self.cpu.store(cpu, Relaxed); } pub fn set_last_scheduled_to_now(&self) { - self.last_scheduled.store( - chrono::Utc::now().timestamp_micros(), - std::sync::atomic::Ordering::Relaxed, - ); + self.last_scheduled + .store(chrono::Utc::now().timestamp_micros(), Relaxed); } pub fn set_last_scheduled_raw(&self, last_scheduled: i64) { - self.last_scheduled - .store(last_scheduled, std::sync::atomic::Ordering::Relaxed); + self.last_scheduled.store(last_scheduled, Relaxed); + } + pub fn set_budget(&self, budget: i64) { + self.budget.store(budget, Relaxed); } - pub fn set_budget(&self, budget: u64) { - self.budget - .store(budget, std::sync::atomic::Ordering::Relaxed); + pub fn add_budget(&self, budget: i64) { + self.budget.fetch_add(budget, Relaxed); + self.budget.fetch_min(budget::MAX_BUDGET_MJ, Relaxed); } pub fn set_running_on_e_core(&self, running_on_e_core: bool) { - self.running_on_e_core - .store(running_on_e_core, std::sync::atomic::Ordering::Relaxed); + self.running_on_e_core.store(running_on_e_core, Relaxed); + } + pub fn update_runtime(&self, new_running_time: u64) -> u64 { + new_running_time - self.last_total_runtime.swap(new_running_time, Relaxed) + } + pub fn update_budget(&self, running_time: Duration) -> i64 { + let power_draw = self.power_draw.load(Relaxed); + self.budget.fetch_sub( + (power_draw as f64 * running_time.as_secs_f64()) as i64, + Relaxed, + ); + self.budget.fetch_max(-budget::MAX_BUDGET_MJ, Relaxed) + } + // Set powerdraw in milli watt + pub fn set_power_draw(&self, power_draw_mw: u64) { + self.power_draw.store(power_draw_mw, Relaxed); } } @@ -80,18 +95,20 @@ impl Default for TaskInfo { fn default() -> Self { Self { cpu: Default::default(), - budget: AtomicU64::new(u64::MAX), + budget: AtomicI64::new(budget::MAX_BUDGET_MJ), + last_total_runtime: Default::default(), running_on_e_core: Default::default(), last_scheduled: AtomicI64::new(-1), + power_draw: AtomicU64::new(DEFAULT_POWER_DRAW_MW), } } } #[derive(Clone)] pub struct ProcessInfo { - pub energy: f64, - pub energy_delta: f64, - pub tree_energy: f64, + pub energy_j: f64, + pub energy_delta_j: f64, + pub tree_energy_j: f64, pub last_update: std::time::Instant, pub parent: Pid, pub task_info: Arc<TaskInfo>, @@ -99,7 +116,7 @@ pub struct ProcessInfo { pub struct EnergyService { estimator: Box<dyn Estimator>, - budget_policy: Option<Box<dyn BudgetPolicy>>, + budget_policy: Option<SimpleCappingPolicy>, // contains the same data as the keys of process_info but having this reduces contention and // avoids unnecessary clone active_processes: BTreeSet<Pid>, @@ -107,9 +124,9 @@ pub struct EnergyService { request_receiver: mpsc::Receiver<Request>, update_interval: Duration, rapl_offset: f64, - last_energy_diff: f64, + last_energy_diff_j: f64, last_time_between_measurements: Duration, - old_rapl: f64, + old_rapl_j: f64, system_energy: f64, bias: f64, graveyard: Vec<i32>, @@ -120,7 +137,7 @@ impl EnergyService { #[allow(clippy::too_many_arguments)] pub fn new( estimator: Box<dyn Estimator>, - budget_policy: Box<dyn BudgetPolicy>, + budget_policy: SimpleCappingPolicy, process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>, request_receiver: mpsc::Receiver<Request>, update_interval: Duration, @@ -133,9 +150,9 @@ impl EnergyService { request_receiver, update_interval, rapl_offset: rapl::read_package_energy().unwrap(), - last_energy_diff: 0f64, + last_energy_diff_j: 0f64, last_time_between_measurements: Duration::new(0, 0), - old_rapl: 0., + old_rapl_j: 0., system_energy: 0., bias: 1., graveyard: Vec::with_capacity(100), @@ -156,10 +173,9 @@ impl EnergyService { self.update_measurements(); self.clear_graveyeard(); - - // Calculate and update budgets - self.update_budgets(); } + // Calculate and update budgets + self.update_budgets(); // Sleep for update interval thread::sleep(self.update_interval); @@ -213,9 +229,9 @@ impl EnergyService { self.process_info.write().unwrap().insert( pid, ProcessInfo { - energy: 0., - energy_delta: 0., - tree_energy: 0., + energy_j: 0., + energy_delta_j: 0., + tree_energy_j: 0., last_update: std::time::Instant::now(), parent, task_info: task_info.clone(), @@ -241,8 +257,9 @@ impl EnergyService { .read() .unwrap() .get(&1) - .map(|info| info.tree_energy) + .map(|info| info.tree_energy_j) .unwrap_or(0.); + let elapsed = self.last_measurement.elapsed(); for pid in &self.active_processes { let mut process_info = self.process_info.write().unwrap(); if let Some(info) = process_info.get_mut(pid) { @@ -254,34 +271,36 @@ impl EnergyService { { continue; } - if let Some(energy) = self.estimator.read_consumption(*pid as u64) { - info.energy_delta = energy * self.bias; - info.energy += energy * self.bias; - info.tree_energy += energy * self.bias; + if let Some(energy_j) = self.estimator.read_consumption(*pid as u64) { + info.energy_delta_j = energy_j * self.bias; + info.task_info.set_power_draw( + (info.energy_delta_j * 1000. / elapsed.as_secs_f64()) as u64, + ); + info.energy_j += energy_j * self.bias; + info.tree_energy_j += energy_j * self.bias; self.estimator .update_information(*pid as u64, info.task_info.read_cpu()); let mut parent = info.parent; while let Some(info) = process_info.get_mut(&parent) { - info.tree_energy += energy * self.bias; + info.tree_energy_j += energy_j * self.bias; info.last_update = std::time::Instant::now(); parent = info.parent; } } } } - let elapsed = self.last_measurement.elapsed(); self.last_time_between_measurements = elapsed; self.last_measurement = Instant::now(); - let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset; - let rapl_diff = rapl - self.old_rapl; - self.last_energy_diff = rapl_diff; - self.old_rapl = rapl; + let rapl_j = rapl::read_package_energy().unwrap() - self.rapl_offset; + let rapl_diff_j = rapl_j - self.old_rapl_j; + self.last_energy_diff_j = rapl_diff_j; + self.old_rapl_j = rapl_j; let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W; if let Some(init) = self.process_info.write().unwrap().get_mut(&1) { - let est_diff = init.tree_energy - old_energy + idle_consumption; - let current_bias = if init.tree_energy - old_energy > idle_consumption * 0.5 { - (rapl_diff / est_diff).clamp(0.1, 2.) + let est_diff = init.tree_energy_j - old_energy + idle_consumption; + let current_bias = if init.tree_energy_j - old_energy > idle_consumption * 0.5 { + (rapl_diff_j / est_diff).clamp(0.1, 2.) } else { 1. }; @@ -295,14 +314,12 @@ impl EnergyService { fn update_budgets(&mut self) { // We can't call self.budget_policy.calculate_budgets(self) directly because the first self borrows immutable and the self second borrows mutable let mut policy = self.budget_policy.take().unwrap(); - let budgets = policy.calculate_budgets(self); + let budget_refill = policy.calculate_budgets(self); self.budget_policy = Some(policy); // Update the shared budgets map - for (pid, budget) in budgets { - if let Some(entry) = self.process_info.write().unwrap().get(&pid) { - entry.task_info.set_budget(budget); - } + for entry in self.process_info.write().unwrap().values_mut() { + entry.task_info.add_budget(budget_refill); } } @@ -313,15 +330,6 @@ impl EnergyService { self.process_info.write().unwrap().remove(&pid); } } - - pub fn all_process_energy_deltas(&self) -> HashMap<Pid, f64> { - self.process_info - .read() - .unwrap() - .iter() - .map(|(&key, info)| (key, info.energy_delta)) - .collect() - } } pub fn start_energy_service( @@ -343,7 +351,7 @@ pub fn start_energy_service( let power_cap = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?; // Create budget policy - let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap)); + let budget_policy = budget::SimpleCappingPolicy::new(power_cap); // shouldn't be a problem because we are privileged // if PackageEnergy::check_paranoid().unwrap_or(3) > 0 {} diff --git a/src/energy/budget.rs b/src/energy/budget.rs index b049814..6356132 100644 --- a/src/energy/budget.rs +++ b/src/energy/budget.rs @@ -1,19 +1,17 @@ -use std::{collections::HashMap, sync::atomic::AtomicU32}; +use std::{sync::atomic::AtomicU32, time::Instant}; use crate::energy::EnergyService; use std::sync::Arc; -type Pid = i32; +use super::rapl; -const MAX_BUDGET: u64 = 30000; - -pub trait BudgetPolicy: Send + 'static { - fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>; -} +pub const MAX_BUDGET_MJ: i64 = 30_000; pub struct SimpleCappingPolicy { power_cap: Arc<AtomicU32>, last_ratio: f64, + last_energy: f64, + last_measurement: Instant, } impl SimpleCappingPolicy { @@ -21,55 +19,27 @@ impl SimpleCappingPolicy { Self { power_cap, last_ratio: 1., + last_energy: rapl::read_package_energy().unwrap(), + last_measurement: Instant::now(), } } } -impl BudgetPolicy for SimpleCappingPolicy { - fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> { - let mut budgets = HashMap::new(); - let process_energies = energy_service.all_process_energy_deltas(); - - // Total energy consumption across all processes - //let total_energy: u64 = process_energies.values().sum(); +impl SimpleCappingPolicy { + pub fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> i64 { + let active_processes = energy_service.active_processes.len(); + let rapl = rapl::read_package_energy().unwrap(); + let actual_energy = rapl - self.last_energy; + self.last_energy = rapl; - let actual_energy = energy_service.last_energy_diff; let energy_cap = self.power_cap.load(std::sync::atomic::Ordering::Relaxed) as f64 - * energy_service.last_time_between_measurements.as_secs_f64(); - //println!("{actual_energy} {energy_cap}"); - let base_energy_per_process = - energy_cap / process_energies.iter().filter(|(_, e)| **e > 0f64).count() as f64; - let ratio = energy_cap / actual_energy * self.last_ratio; - self.last_ratio = ratio.clamp(0.001, 100.0); - - for (pid, energy) in process_energies { - let budget = budgets.entry(pid).or_insert(0); - *budget = (*budget + ((ratio * base_energy_per_process - energy) * 1000.) as u64) - //.min((ratio * base_energy_per_process * MAX_BUDGET_FACTOR * 1000.) as u64); - .min(MAX_BUDGET); - if energy != 0.0 { - //println!("budget: {budget} energy: {energy} ratio: {ratio} base: {base_energy_per_process}"); - } - } + * self.last_measurement.elapsed().as_secs_f64(); + self.last_measurement = Instant::now(); - // Simple proportional distribution if over cap - //if actual_energy > energy_cap { - // let ratio = energy_cap / actual_energy; - // - // for (&pid, &energy) in &process_energies { - // // Calculate a scaled budget based on the ratio - // // Higher energy consumers get proportionally reduced budgets - // let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio)); - // let budget = (u64::MAX as f64 * scaling_factor) as u64; - // budgets.insert(pid, budget); - // } - // } else { - // // Under power cap, assign maximum budget to all - // for pid in energy_service.active_processes() { - // budgets.insert(*pid, u64::MAX); - // } - // } + let base_energy_per_process = energy_cap / active_processes as f64; + let ratio = energy_cap / actual_energy * self.last_ratio; + self.last_ratio = ratio.clamp(0.001, 1000.0); - budgets + (((ratio * base_energy_per_process) * 1000.) as i64).min(MAX_BUDGET_MJ) } } diff --git a/src/scheduler.rs b/src/scheduler.rs index 0832049..dfd879b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -158,9 +158,11 @@ impl<'a> Scheduler<'a> { while let Ok(Some(mut task)) = self.bpf.dequeue_task() { // The scheduler itself has to be scheduled regardless of its energy usage if task.pid == self.own_pid { + task.vtime = 10; self.task_queue.push_front(task); continue; } + // TODO: consider adjusting vtime for all processes // Check if we've seen this task before match self.managed_tasks.entry(task.pid) { @@ -182,8 +184,13 @@ impl<'a> Scheduler<'a> { } std::collections::hash_map::Entry::Occupied(e) => { // Get current budget for this task - match e.get().read_budget() { - 0 => self.no_budget_task_queue.push_back(task), + let slice_ns = e.get().update_runtime(task.sum_exec_runtime); + let new_budget = e.get().update_budget(Duration::from_nanos(slice_ns)); + match new_budget { + x if x < 0 => { + task.weight = 0; + self.no_budget_task_queue.push_back(task) + } x if x < 1000 => { task.weight = 0; self.task_queue.push_back(task) diff --git a/src/socket.rs b/src/socket.rs index d851eed..ce3fbdc 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -57,7 +57,12 @@ impl LoggingSocketService { let mut output = String::new(); use std::fmt::Write; for (pid, info) in self.process_info.read().unwrap().iter() { - writeln!(&mut output, "{pid},{},{}", info.energy, info.tree_energy).unwrap(); + writeln!( + &mut output, + "{pid},{},{}", + info.energy_j, info.tree_energy_j + ) + .unwrap(); } writeln!(&mut output, "#",).unwrap(); output @@ -77,7 +82,7 @@ impl LoggingSocketService { if let Some(info) = self.process_info.read().unwrap().get(&pid) { format!( "pid: {pid} process: {}J process tree: {}J\n", - info.energy, info.tree_energy + info.energy_j, info.tree_energy_j ) } else { format!("Unknown pid: {pid}\n") |