pub mod budget; pub mod estimator; pub mod rapl; mod trackers; use crate::energy::estimator::Estimator; use std::collections::{BTreeSet, HashMap}; use std::sync::atomic::Ordering::Relaxed; use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::{Duration, Instant}; use crate::freq::FrequencyKHZ; use crate::socket; use crate::Pid; use budget::SimpleCappingPolicy; pub use trackers::{KernelDriver, PerfEstimator}; const IDLE_CONSUMPTION_W: f64 = 7.; const UPDATE_INTERVAL: Duration = Duration::from_millis(10); const DEFAULT_POWER_DRAW_MW: u64 = 15_000; pub enum Request { NewTask(Pid, Arc), RemoveTask(Pid), } pub struct TaskInfo { pub cpu: AtomicI32, pub budget_mj: AtomicI64, pub last_total_runtime: AtomicU64, pub power_draw_mw: AtomicU64, pub running_on_e_core: AtomicBool, pub last_scheduled: AtomicI64, } impl TaskInfo { pub fn read_cpu(&self) -> i32 { self.cpu.load(Relaxed) } pub fn read_budget(&self) -> i64 { self.budget_mj.load(Relaxed) } pub fn read_time_since_last_schedule(&self) -> Option { let old_time = self.last_scheduled.load(Relaxed); if old_time == -1 { None } else { let now = chrono::Utc::now().timestamp_micros(); Some(Duration::from_micros((now - old_time) as u64)) } } pub fn read_time_since_last_schedule_raw(&self) -> i64 { self.last_scheduled.load(Relaxed) } pub fn set_cpu(&self, cpu: i32) { self.cpu.store(cpu, Relaxed); } pub fn set_last_scheduled_to_now(&self) { self.last_scheduled .store(chrono::Utc::now().timestamp_micros(), Relaxed); } pub fn set_last_scheduled_raw(&self, last_scheduled: i64) { self.last_scheduled.store(last_scheduled, Relaxed); } pub fn set_budget(&self, budget: i64) { self.budget_mj.store(budget, Relaxed); } pub fn add_budget(&self, budget: i64) { self.budget_mj.fetch_add(budget, Relaxed); self.budget_mj.fetch_min(budget::MAX_BUDGET_MJ, Relaxed); } pub fn set_running_on_e_core(&self, running_on_e_core: bool) { self.running_on_e_core.store(running_on_e_core, Relaxed); } pub fn update_runtime(&self, new_running_time: u64) -> u64 { new_running_time - self.last_total_runtime.swap(new_running_time, Relaxed) } pub fn update_budget(&self, running_time: Duration) -> i64 { let power_draw = self.power_draw_mw.load(Relaxed); self.budget_mj.fetch_sub( (power_draw as f64 * running_time.as_secs_f64()) as i64, Relaxed, ); self.budget_mj.fetch_max(-budget::MAX_BUDGET_MJ, Relaxed) } // Set powerdraw in milli watt pub fn set_power_draw(&self, power_draw_mw: u64) { self.power_draw_mw.store(power_draw_mw, Relaxed); } } impl Default for TaskInfo { fn default() -> Self { Self { cpu: Default::default(), budget_mj: AtomicI64::new(budget::MAX_BUDGET_MJ), last_total_runtime: Default::default(), running_on_e_core: Default::default(), last_scheduled: AtomicI64::new(-1), power_draw_mw: AtomicU64::new(DEFAULT_POWER_DRAW_MW), } } } #[derive(Clone)] pub struct ProcessInfo { pub energy_j: f64, pub energy_delta_j: f64, pub tree_energy_j: f64, pub last_update: std::time::Instant, pub parent: Pid, pub task_info: Arc, } pub struct EnergyService { estimator: Box, budget_policy: Option, // contains the same data as the keys of process_info but having this reduces contention and // avoids unnecessary clone active_processes: BTreeSet, process_info: Arc>>, request_receiver: mpsc::Receiver, update_interval: Duration, rapl_offset: f64, last_energy_diff_j: f64, last_time_between_measurements: Duration, old_rapl_j: f64, system_energy: f64, bias: f64, graveyard: Vec, last_measurement: Instant, } impl EnergyService { #[allow(clippy::too_many_arguments)] pub fn new( estimator: Box, budget_policy: SimpleCappingPolicy, process_info: Arc>>, request_receiver: mpsc::Receiver, update_interval: Duration, ) -> Self { Self { estimator, budget_policy: Some(budget_policy), active_processes: BTreeSet::new(), process_info, request_receiver, update_interval, rapl_offset: rapl::read_package_energy().unwrap(), last_energy_diff_j: 0f64, last_time_between_measurements: Duration::new(0, 0), old_rapl_j: 0., system_energy: 0., bias: 1., graveyard: Vec::with_capacity(100), last_measurement: Instant::now(), } } pub fn run(mut self) { thread::spawn(move || { let mut i = 0; loop { i += 1; // Process any incoming requests self.handle_requests(); if i % 10 == 0 { // Update energy measurements self.update_measurements(); self.clear_graveyeard(); } // Calculate and update budgets self.update_budgets(); // Sleep for update interval thread::sleep(self.update_interval); } }); } fn handle_requests(&mut self) { while let Ok(request) = self.request_receiver.try_recv() { self.handle_request(request); } } fn handle_request(&mut self, request: Request) { match request { Request::NewTask(pid, task_info) => { let Ok(process) = procfs::process::Process::new(pid) else { return; }; if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) { let old_budget = task_info.read_budget(); let old_time = task_info.read_time_since_last_schedule_raw(); info.task_info = task_info.clone(); info.task_info.set_budget(old_budget); info.task_info.set_last_scheduled_raw(old_time); return; } let Ok(main_thread) = process.task_main_thread() else { return; }; if !self .process_info .read() .unwrap() .contains_key(&main_thread.pid) && self .estimator .start_trace(pid as u64, task_info.read_cpu()) .is_err() { return; } let parent = (|| { let process = procfs::process::Process::new(pid)?; process.stat().map(|stat| stat.ppid) })() .unwrap_or_default(); // We don't care whether the task has been scheduled before the counters are set up task_info.set_last_scheduled_raw(-1); self.process_info.write().unwrap().insert( pid, ProcessInfo { energy_j: 0., energy_delta_j: 0., tree_energy_j: 0., last_update: std::time::Instant::now(), parent, task_info: task_info.clone(), }, ); self.active_processes.insert(pid); if !self.process_info.read().unwrap().contains_key(&parent) && parent != 0 { self.handle_request(Request::NewTask(parent, task_info)); } } Request::RemoveTask(pid) => { if procfs::process::Process::new(pid).is_ok() { return; } self.graveyard.push(pid); } } } fn update_measurements(&mut self) { let old_energy = self .process_info .read() .unwrap() .get(&1) .map(|info| info.tree_energy_j) .unwrap_or(0.); let elapsed = self.last_measurement.elapsed(); for pid in &self.active_processes { let mut process_info = self.process_info.write().unwrap(); if let Some(info) = process_info.get_mut(pid) { if info .task_info .read_time_since_last_schedule() .unwrap_or(UPDATE_INTERVAL) >= UPDATE_INTERVAL { continue; } if let Some(energy_j) = self.estimator.read_consumption(*pid as u64) { info.energy_delta_j = energy_j * self.bias; info.task_info.set_power_draw( (info.energy_delta_j * 1000. / elapsed.as_secs_f64()) as u64, ); info.energy_j += energy_j * self.bias; info.tree_energy_j += energy_j * self.bias; self.estimator .update_information(*pid as u64, info.task_info.read_cpu()); let mut parent = info.parent; while let Some(info) = process_info.get_mut(&parent) { info.tree_energy_j += energy_j * self.bias; info.last_update = std::time::Instant::now(); parent = info.parent; } } } } self.last_time_between_measurements = elapsed; self.last_measurement = Instant::now(); let rapl_j = rapl::read_package_energy().unwrap() - self.rapl_offset; let rapl_diff_j = rapl_j - self.old_rapl_j; self.last_energy_diff_j = rapl_diff_j; self.old_rapl_j = rapl_j; let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W; if let Some(init) = self.process_info.write().unwrap().get_mut(&1) { let est_diff = init.tree_energy_j - old_energy + idle_consumption; let current_bias = if init.tree_energy_j - old_energy > idle_consumption * 0.5 { (rapl_diff_j / est_diff).clamp(0.1, 2.) } else { 1. }; let alpha: f64 = 10. * elapsed.as_secs_f64().recip(); self.bias = (self.bias * (alpha.recip() * current_bias + ((alpha - 1.) / alpha))) .clamp(0.1, 5.); self.system_energy += est_diff; } } fn update_budgets(&mut self) { // We can't call self.budget_policy.calculate_budgets(self) directly because the first self borrows immutable and the self second borrows mutable let mut policy = self.budget_policy.take().unwrap(); let budget_refill = policy.calculate_budgets(self); self.budget_policy = Some(policy); // Update the shared budgets map for entry in self.process_info.write().unwrap().values_mut() { entry.task_info.add_budget(budget_refill); } } fn clear_graveyeard(&mut self) { for pid in self.graveyard.drain(..) { self.estimator.stop_trace(pid as u64); self.active_processes.remove(&pid); self.process_info.write().unwrap().remove(&pid); } } } pub fn start_energy_service( use_mocking: bool, shared_cpu_current_frequencies: Arc>>, ) -> std::io::Result> { // Potentially convert back to bounded channel let (request_sender, request_receiver) = mpsc::sync_channel(10000); // Create the appropriate estimator based on configuration let estimator: Box = if use_mocking { Box::new(PerfEstimator::new(shared_cpu_current_frequencies.clone())) } else { Box::new(KernelDriver::default()) }; let process_info = Arc::new(RwLock::new(HashMap::new())); let power_cap = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?; // Create budget policy let budget_policy = budget::SimpleCappingPolicy::new(power_cap); // shouldn't be a problem because we are privileged // if PackageEnergy::check_paranoid().unwrap_or(3) > 0 {} // Create and start the energy service let service = EnergyService::new( estimator, budget_policy, process_info.clone(), request_receiver, UPDATE_INTERVAL, ); service.run(); Ok(request_sender) }