| author | Dennis Kobert <dennis@kobert.dev> | 2025-04-16 14:22:21 +0200 |
|---|---|---|
| committer | Dennis Kobert <dennis@kobert.dev> | 2025-04-16 14:22:21 +0200 |
| commit | 3e905300138dcd05acc8d877ffe687c4caddc597 (patch) | |
| tree | d00ab91ba1d6f42cf29fc1191741ca37ddbdd6f6 | |
| parent | 62e5b43451c31b9edca4fa5c5df8dd9e441c0949 (diff) | |
Implement better budgeting
| -rw-r--r-- | src/energy.rs | 12 |
| -rw-r--r-- | src/energy/budget.rs | 22 |
| -rw-r--r-- | src/freq.rs | 61 |
| -rw-r--r-- | src/scheduler.rs | 90 |

4 files changed, 125 insertions, 60 deletions
```diff
diff --git a/src/energy.rs b/src/energy.rs
index 66b87cf..7440fe6 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -201,8 +201,14 @@ impl EnergyService {
             info.task_info.set_last_scheduled_raw(old_time);
             return;
         }
-        let main_thread = process.task_main_thread().unwrap().pid;
-        if !self.process_info.read().unwrap().contains_key(&main_thread)
+        let Ok(main_thread) = process.task_main_thread() else {
+            return;
+        };
+        if !self
+            .process_info
+            .read()
+            .unwrap()
+            .contains_key(&main_thread.pid)
             && self
                 .estimator
                 .start_trace(
@@ -315,7 +321,7 @@ impl EnergyService {
 
     fn update_budgets(&mut self) {
         // We can't call self.budget_policy.calculate_budgets(self) directly because the first self borrows immutable and the self second borrows mutable
-        let policy = self.budget_policy.take().unwrap();
+        let mut policy = self.budget_policy.take().unwrap();
         let budgets = policy.calculate_budgets(self);
         self.budget_policy = Some(policy);
 
diff --git a/src/energy/budget.rs b/src/energy/budget.rs
index 3edce30..cdb2b6d 100644
--- a/src/energy/budget.rs
+++ b/src/energy/budget.rs
@@ -5,24 +5,29 @@ use std::sync::Arc;
 
 type Pid = i32;
 
-const MAX_BUDGET: u64 = 100;
+const MAX_BUDGET_FACTOR: f64 = 5.0;
+const MAX_BUDGET: u64 = 30000;
 
 pub trait BudgetPolicy: Send + 'static {
-    fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>;
+    fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>;
 }
 
 pub struct SimpleCappingPolicy {
     power_cap: Arc<AtomicU32>,
+    last_ratio: f64,
 }
 
 impl SimpleCappingPolicy {
     pub fn new(power_cap: Arc<AtomicU32>) -> Self {
-        Self { power_cap }
+        Self {
+            power_cap,
+            last_ratio: 1.,
+        }
     }
 }
 
 impl BudgetPolicy for SimpleCappingPolicy {
-    fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> {
+    fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> {
         let mut budgets = HashMap::new();
 
         let process_energies = energy_service.all_process_energy_deltas();
@@ -35,12 +40,17 @@ impl BudgetPolicy for SimpleCappingPolicy {
         println!("{actual_energy} {energy_cap}");
         let base_energy_per_process =
             energy_cap / process_energies.iter().filter(|(_, e)| **e > 0f64).count() as f64;
-        let ratio = energy_cap / actual_energy;
+        let ratio = energy_cap / actual_energy * self.last_ratio;
+        self.last_ratio = ratio.clamp(0.001, 100.0);
 
         for (pid, energy) in process_energies {
             let budget = budgets.entry(pid).or_insert(0);
-            *budget = (*budget + (ratio * base_energy_per_process - energy as f64) as u64)
+            *budget = (*budget + ((ratio * base_energy_per_process - energy) * 1000.) as u64)
+                //.min((ratio * base_energy_per_process * MAX_BUDGET_FACTOR * 1000.) as u64);
                 .min(MAX_BUDGET);
+            if energy != 0.0 {
+                println!("budget: {budget} energy: {energy} ratio: {ratio} base: {base_energy_per_process}");
+            }
         }
 
         // Simple proportional distribution if over cap
diff --git a/src/freq.rs b/src/freq.rs
index 2cfcf75..aa575f9 100644
--- a/src/freq.rs
+++ b/src/freq.rs
@@ -8,10 +8,33 @@ use std::time::Duration;
 
 pub type FrequencyKHZ = u32;
 
+pub enum Governor {
+    Conservative,
+    Ondemand,
+    Userspace,
+    Powersave,
+    Performance,
+    Schedutil,
+}
+
+impl Governor {
+    fn to_sysfs_string(&self) -> &str {
+        match self {
+            Governor::Conservative => "conservative",
+            Governor::Performance => "performance",
+            Governor::Schedutil => "schedutil",
+            Governor::Powersave => "powersave",
+            Governor::Userspace => "userspace",
+            Governor::Ondemand => "ondemand",
+        }
+    }
+}
+
 pub enum Request {
     GetPossibleCPUFrequencyRange,
     GetPolicyCPUFrequency,
     GetCurrentFrequencies,
+    SetGovernorForCore(u32, Governor),
     SetFrequencyRangeAllCores(RangeInclusive<FrequencyKHZ>),
     SetFrequencyRangeForCore(u32, RangeInclusive<FrequencyKHZ>),
     SetTargetFrequencyForCore(u32, FrequencyKHZ),
@@ -204,8 +227,10 @@ impl SysFSFrequencyService {
         self.cpu_descriptors = CPUDescriptors::new_range(&self.cpus).unwrap();
         let ranges = self.get_freq_limits().unwrap();
         self.frequency_ranges = ranges;
-        if self.switch_governor("conservative").is_err() {
-            println!("failed to set governor to conservative");
+        for cpu in self.cpus.clone() {
+            if self.switch_governor(cpu, Governor::Conservative).is_err() {
+                println!("failed to set governor to conservative");
+            }
         }
 
         loop {
@@ -232,6 +257,9 @@
                     *self.cpu_current_frequencies.write().unwrap() =
                         self.get_current_frequencies()?;
                 }
+                Request::SetGovernorForCore(cpu, governor) => {
+                    self.switch_governor(cpu, governor)?;
+                }
                 Request::SetFrequencyRangeAllCores(frequency) => {
                     self.set_frequency_range_all_cores(&frequency)?;
                 }
@@ -267,27 +295,26 @@
         Ok(ranges)
     }
 
-    fn switch_governor(&self, governor: &str) -> io::Result<()> {
+    fn switch_governor(&self, cpu: u32, governor: Governor) -> io::Result<()> {
+        let governor = governor.to_sysfs_string();
         let available_governor =
             fs::read_to_string("/sys/devices/system/cpu/cpu0/cpufreq/scaling_available_governors")?;
         if !available_governor.contains(governor) {
            io::Error::new(io::ErrorKind::NotFound, governor);
        }
 
-        for cpu in self.cpus.clone() {
-            let current_governor = fs::read_to_string(format!(
-                "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
-                cpu
-            ))?;
-            if current_governor != governor {
-                fs::write(
-                    format!(
-                        "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
-                        cpu
-                    ),
-                    governor,
-                )?;
-            }
+        let current_governor = fs::read_to_string(format!(
+            "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
+            cpu
+        ))?;
+        if current_governor != governor {
+            fs::write(
+                format!(
+                    "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_governor",
+                    cpu
+                ),
+                governor,
+            )?;
         }
         Ok(())
     }
diff --git a/src/scheduler.rs b/src/scheduler.rs
index bcc3134..f99d2ec 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -1,6 +1,6 @@
 use crate::bpf::*;
 use crate::energy::{self, Request as EnergyRequest, TaskInfo};
-use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest};
+use crate::freq::{self, FrequencyKHZ, Governor, Request as FrequencyRequest};
 use crate::core_selector::{CoreSelector, RoundRobinSelector};
 
 use anyhow::Result;
@@ -29,7 +29,7 @@ pub struct Scheduler<'a> {
     //TODO: also consider Pids of children
     own_pid: Pid,
     p_cores: Range<i32>,
-    e_cores: Range<i32>,
+    e_cores: Option<Range<i32>>,
     topology: Topology,
     to_remove: Vec<Pid>,
     e_core_selector: Box<dyn CoreSelector>,
@@ -84,8 +84,23 @@ impl<'a> Scheduler<'a> {
         let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&-1) + 1);
         let all_cores = 0..((e_cores.len() + p_cores.len()) as u32);
 
+        let e_cores = if !e_cores.is_empty() {
+            Some(e_cores)
+        } else {
+            None
+        };
+
         let p_core_selector = Box::new(RoundRobinSelector::new(&p_cores));
-        let e_core_selector = Box::new(RoundRobinSelector::new(&e_cores));
+        let e_core_selector = if let Some(e_cores) = &e_cores {
+            // reserve the last e core as garbage core
+            Box::new(RoundRobinSelector::new(
+                &(e_cores.start..e_cores.end.saturating_sub(2)),
+            ))
+        } else {
+            // fallback on systems without e cores
+            Box::new(RoundRobinSelector::new(&(0..1)))
+        };
+
         let to_remove = Vec::with_capacity(1000);
 
         let frequency_sender = freq::start_frequency_service(
@@ -135,23 +150,17 @@ impl<'a> Scheduler<'a> {
         })
     }
 
-    fn try_set_up_garbage_cpu(&self, cpu: u32) -> Result<bool, TrySendError<FrequencyRequest>> {
-        if self.shared_cpu_frequency_ranges.read().unwrap().len() <= cpu as usize {
-            // We wait until shared_cpu_frequency_ranges has been initialized
-            return Ok(false);
+    fn try_set_up_garbage_cpu(&self) -> Result<bool, TrySendError<FrequencyRequest>> {
+        if let Some(e_cores) = &self.e_cores {
+            self.frequency_sender
+                .try_send(FrequencyRequest::SetGovernorForCore(
+                    e_cores.end.saturating_sub(1) as u32,
+                    Governor::Powersave,
+                ))?;
+            Ok(true)
+        } else {
+            Ok(false)
         }
-        let target = self.shared_cpu_frequency_ranges.read().unwrap()[cpu as usize]
-            .clone()
-            .min()
-            .unwrap();
-        self.frequency_sender
-            .try_send(FrequencyRequest::SetTargetFrequencyForCore(cpu, target))?;
-        self.frequency_sender
-            .try_send(FrequencyRequest::SetFrequencyRangeForCore(
-                cpu,
-                target..=(target),
-            ))?;
-        Ok(true)
     }
 
     fn consume_all_tasks(&mut self) {
@@ -165,7 +174,11 @@ impl<'a> Scheduler<'a> {
             // Check if we've seen this task before
            match self.managed_tasks.entry(task.pid) {
                std::collections::hash_map::Entry::Vacant(e) => {
-                    let is_e_core = self.e_cores.contains(&task.cpu);
+                    let is_e_core = self
+                        .e_cores
+                        .as_ref()
+                        .map(|e_cores| e_cores.contains(&task.cpu))
+                        .unwrap_or(false);
                     // New task - register it with the energy service
                     let task_info = self.empty_task_infos.recv().unwrap();
                     task_info.set_cpu(task.cpu);
@@ -200,13 +213,18 @@
             if cpu >= 0 {
                 dispatched_task.cpu = cpu;
             } else {
-                dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
-                //dispatched_task.flags |= RL_CPU_ANY as u64;
+                //dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
+                dispatched_task.flags |= RL_CPU_ANY as u64;
             }
 
-            if self.e_cores.contains(&dispatched_task.cpu) {
-                dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
-            }
+            // if self
+            //     .e_cores
+            //     .as_ref()
+            //     .map(|e_cores| e_cores.contains(&dispatched_task.cpu))
+            //     .unwrap_or(false)
+            // {
+            //     dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
+            // }
 
             if task.pid == self.own_pid {
                 dispatched_task.slice_ns = SLICE_US * 1000;
@@ -219,7 +237,12 @@
                 panic!();
             }
 
-            let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+            let running_on_e_core = self
+                .e_cores
+                .as_ref()
+                .map(|e_cores| e_cores.contains(&dispatched_task.cpu))
+                .unwrap_or(false);
+
             if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                 entry.set_cpu(dispatched_task.cpu);
                 entry.set_running_on_e_core(running_on_e_core);
@@ -247,7 +270,12 @@ impl<'a> Scheduler<'a> {
                 eprintln!("e core scheduler set cpu to -1");
             }
 
-            let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+            let running_on_e_core = self
+                .e_cores
+                .as_ref()
+                .map(|e_cores| e_cores.contains(&dispatched_task.cpu))
+                .unwrap_or(false);
+
             if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                 entry.set_cpu(dispatched_task.cpu);
                 entry.set_running_on_e_core(running_on_e_core);
@@ -299,15 +327,9 @@ impl<'a> Scheduler<'a> {
     }
 
     pub fn run(&mut self) -> Result<UserExitInfo> {
+        self.try_set_up_garbage_cpu()?;
         let mut i = 0;
-        // let mut created_garbage_core = false;
         while !self.bpf.exited() {
-            // This is how a garbage core could be created
-            // The core should also be excluded from the e core scheduler
-            //if !created_garbage_core {
-            //    created_garbage_core =
-            //        self.try_set_up_garbage_cpu(self.e_cores.clone().max().unwrap_or(0) as u32)?;
-            //}
             i += 1;
             self.dispatch_tasks();
             if i % 100 == 0 {
```
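The core of the budgeting change in src/energy/budget.rs is the `last_ratio` carried across rounds: the cap-to-measured-energy ratio is multiplied by the previous ratio and clamped, so per-process budgets converge toward the power cap instead of oscillating. Below is a rough, self-contained sketch of that feedback loop; the free function, the `energy_deltas`/`last_ratio` parameters, and the `main` driver are simplified stand-ins for illustration, not the repository's actual `BudgetPolicy`/`EnergyService` API.

```rust
use std::collections::HashMap;

const MAX_BUDGET: u64 = 30_000;

// Sketch of the feedback-based capping: compute per-PID budgets from the
// energy each process used since the last round, scaled by a carried ratio.
fn calculate_budgets(
    energy_deltas: &HashMap<i32, f64>, // per-PID energy used since the last round
    energy_cap: f64,                   // allowed energy per round
    last_ratio: &mut f64,              // feedback state, starts at 1.0
) -> HashMap<i32, u64> {
    let actual_energy: f64 = energy_deltas.values().sum();
    let active = energy_deltas.values().filter(|e| **e > 0.0).count().max(1);
    let base_energy_per_process = energy_cap / active as f64;

    // Feedback step: scale this round's ratio by the previous one, then clamp
    // the stored state so a single bad sample cannot blow the loop up.
    let ratio = energy_cap / actual_energy.max(f64::EPSILON) * *last_ratio;
    *last_ratio = ratio.clamp(0.001, 100.0);

    energy_deltas
        .iter()
        .map(|(pid, energy)| {
            // Remaining headroom for this process (scaled by 1000 as in the
            // patch), floored at zero and capped at MAX_BUDGET.
            let budget = ((ratio * base_energy_per_process - *energy) * 1000.0).max(0.0) as u64;
            (*pid, budget.min(MAX_BUDGET))
        })
        .collect()
}

fn main() {
    let mut last_ratio = 1.0;
    let deltas = HashMap::from([(101, 2.0), (202, 6.0)]);
    println!("{:?}", calculate_budgets(&deltas, 4.0, &mut last_ratio));
    println!("carried ratio: {last_ratio}");
}
```

The clamp on the stored ratio (0.001..100.0 in the patch) is what keeps the multiplicative feedback bounded when a round measures very little or very much energy.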