summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLennard Kittner <lennardkittner@icloud.com>2025-04-15 16:45:11 +0200
committerLennard Kittner <lennardkittner@icloud.com>2025-04-15 16:45:11 +0200
commitbe9d8527821e8999799930e922d79afd2ef998e9 (patch)
treea6de07b413202543eb28ad1dbb403be594307e2c
parent1e05cd3bf6896ad4da7420c6914c1381714cdf43 (diff)
Change CLI args
Add p core selector Fix socket argument parsing Better budget calculation
-rw-r--r--src/core_selector.rs31
-rw-r--r--src/energy.rs34
-rw-r--r--src/energy/budget.rs58
-rw-r--r--src/energy/trackers/perf.rs2
-rw-r--r--src/main.rs22
-rw-r--r--src/scheduler.rs29
-rw-r--r--src/socket.rs6
7 files changed, 116 insertions, 66 deletions
diff --git a/src/core_selector.rs b/src/core_selector.rs
new file mode 100644
index 0000000..f70b252
--- /dev/null
+++ b/src/core_selector.rs
@@ -0,0 +1,31 @@
+use std::ops::Range;
+
+pub trait CoreSelector {
+ fn next_core(&mut self, previous_cpu: i32) -> i32;
+}
+
+pub struct RoundRobinSelector {
+ offset: u32,
+ num_cores: u32,
+ last_used: u32,
+}
+
+impl RoundRobinSelector {
+ pub fn new(cores: &Range<i32>) -> RoundRobinSelector {
+ Self {
+ offset: cores.start as u32,
+ num_cores: cores.len() as u32,
+ last_used: 0,
+ }
+ }
+}
+
+impl CoreSelector for RoundRobinSelector {
+ fn next_core(&mut self, previous_cpu: i32) -> i32 {
+ if (self.offset..(self.offset + self.num_cores)).contains(&(previous_cpu as u32)) {
+ return previous_cpu;
+ }
+ self.last_used += 1;
+ (self.offset + (self.last_used % self.num_cores.max(1))) as i32
+ }
+}
diff --git a/src/energy.rs b/src/energy.rs
index 21283b8..b9cfced 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -95,6 +95,7 @@ impl Default for TaskInfo {
#[derive(Clone)]
pub struct ProcessInfo {
pub energy: f64,
+ pub energy_delta: f64,
pub tree_energy: f64,
pub last_update: std::time::Instant,
pub parent: Pid,
@@ -114,6 +115,8 @@ pub struct EnergyService {
shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>,
rapl_offset: f64,
+ last_energy_diff: f64,
+ last_time_between_measurements: Duration,
old_rapl: f64,
system_energy: f64,
bias: f64,
@@ -144,6 +147,8 @@ impl EnergyService {
shared_policy_frequency_ranges,
shared_cpu_current_frequencies,
rapl_offset: rapl::read_package_energy().unwrap(),
+ last_energy_diff: 0f64,
+ last_time_between_measurements: Duration::new(0, 0),
old_rapl: 0.,
system_energy: 0.,
bias: 1.,
@@ -215,6 +220,7 @@ impl EnergyService {
pid,
ProcessInfo {
energy: 0.,
+ energy_delta: 0.,
tree_energy: 0.,
last_update: std::time::Instant::now(),
parent,
@@ -255,6 +261,7 @@ impl EnergyService {
continue;
}
if let Some(energy) = self.estimator.read_consumption(*pid as u64) {
+ info.energy_delta = energy * self.bias;
info.energy += energy * self.bias;
info.tree_energy += energy * self.bias;
self.estimator.update_information(
@@ -272,13 +279,16 @@ impl EnergyService {
}
}
let elapsed = self.last_measurement.elapsed();
+ self.last_time_between_measurements = elapsed;
self.last_measurement = Instant::now();
+ let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset;
+ let rapl_diff = rapl - self.old_rapl;
+ self.last_energy_diff = rapl_diff;
+ self.old_rapl = rapl;
+ let power_comsumption_watt = rapl_diff / elapsed.as_secs_f64();
+ let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W;
if let Some(init) = self.process_info.write().unwrap().get_mut(&1) {
- let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset;
- let rapl_diff = rapl - self.old_rapl;
- let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W;
let est_diff = init.tree_energy - old_energy + idle_consumption;
- self.old_rapl = rapl;
// let offset_bias = (rapl / (init.tree_energy + idle_consumption)).clamp(0.1, 2.);
let current_bias = if init.tree_energy - old_energy > idle_consumption * 0.5 {
(rapl_diff / est_diff).clamp(0.1, 2.)
@@ -291,8 +301,8 @@ impl EnergyService {
.clamp(0.1, 5.);
self.system_energy += est_diff;
println!(
- "Energy estimation: {:.1} rapl: {:.1}, est diff: {:.1} rapl diff: {:.1}, bias: {:.1}",
- self.system_energy, rapl, est_diff, rapl_diff, self.bias,
+ "Energy estimation: {:.1} rapl: {:.1}, est diff: {:.1} rapl diff: {:.1}, bias: {:.1}, power consumption: {:.1}",
+ self.system_energy, rapl, est_diff, rapl_diff, self.bias, power_comsumption_watt,
);
}
}
@@ -336,19 +346,18 @@ impl EnergyService {
.map(|info| info.energy)
}
- pub fn all_process_energies(&self) -> HashMap<Pid, f64> {
+ pub fn all_process_energy_deltas(&self) -> HashMap<Pid, f64> {
self.process_info
.read()
.unwrap()
.iter()
- .map(|(&key, info)| (key, info.energy))
+ .map(|(&key, info)| (key, info.energy_delta))
.collect()
}
}
pub fn start_energy_service(
use_mocking: bool,
- power_cap: u64,
shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>,
@@ -363,14 +372,16 @@ pub fn start_energy_service(
Box::new(KernelDriver::default())
};
+ let process_info = Arc::new(RwLock::new(HashMap::new()));
+
+ let power_cap = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?;
+
// Create budget policy
let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap));
// shouldn't be a problem because we are privileged
// if PackageEnergy::check_paranoid().unwrap_or(3) > 0 {}
- let process_info = Arc::new(RwLock::new(HashMap::new()));
-
// Create and start the energy service
let service = EnergyService::new(
estimator,
@@ -384,7 +395,6 @@ pub fn start_energy_service(
);
service.run();
- let budget = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?;
Ok(request_sender)
}
diff --git a/src/energy/budget.rs b/src/energy/budget.rs
index 6154187..3edce30 100644
--- a/src/energy/budget.rs
+++ b/src/energy/budget.rs
@@ -1,19 +1,22 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, sync::atomic::AtomicU32};
use crate::energy::EnergyService;
+use std::sync::Arc;
type Pid = i32;
+const MAX_BUDGET: u64 = 100;
+
pub trait BudgetPolicy: Send + 'static {
fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>;
}
pub struct SimpleCappingPolicy {
- power_cap: u64,
+ power_cap: Arc<AtomicU32>,
}
impl SimpleCappingPolicy {
- pub fn new(power_cap: u64) -> Self {
+ pub fn new(power_cap: Arc<AtomicU32>) -> Self {
Self { power_cap }
}
}
@@ -21,32 +24,43 @@ impl SimpleCappingPolicy {
impl BudgetPolicy for SimpleCappingPolicy {
fn calculate_budgets(&self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> {
let mut budgets = HashMap::new();
- let process_energies = energy_service.all_process_energies();
+ let process_energies = energy_service.all_process_energy_deltas();
// Total energy consumption across all processes
//let total_energy: u64 = process_energies.values().sum();
- let actual_energy =
- energy_service.package_energy() / energy_service.update_interval.as_secs_f64();
+ let actual_energy = energy_service.last_energy_diff;
+ let energy_cap = self.power_cap.load(std::sync::atomic::Ordering::Relaxed) as f64
+ * energy_service.last_time_between_measurements.as_secs_f64();
+ println!("{actual_energy} {energy_cap}");
+ let base_energy_per_process =
+ energy_cap / process_energies.iter().filter(|(_, e)| **e > 0f64).count() as f64;
+ let ratio = energy_cap / actual_energy;
- // Simple proportional distribution if over cap
- if actual_energy > self.power_cap as f64 {
- let ratio = self.power_cap as f64 / actual_energy;
-
- for (&pid, &energy) in &process_energies {
- // Calculate a scaled budget based on the ratio
- // Higher energy consumers get proportionally reduced budgets
- let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio));
- let budget = (u64::MAX as f64 * scaling_factor) as u64;
- budgets.insert(pid, budget);
- }
- } else {
- // Under power cap, assign maximum budget to all
- for pid in energy_service.active_processes() {
- budgets.insert(*pid, u64::MAX);
- }
+ for (pid, energy) in process_energies {
+ let budget = budgets.entry(pid).or_insert(0);
+ *budget = (*budget + (ratio * base_energy_per_process - energy as f64) as u64)
+ .min(MAX_BUDGET);
}
+ // Simple proportional distribution if over cap
+ //if actual_energy > energy_cap {
+ // let ratio = energy_cap / actual_energy;
+ //
+ // for (&pid, &energy) in &process_energies {
+ // // Calculate a scaled budget based on the ratio
+ // // Higher energy consumers get proportionally reduced budgets
+ // let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio));
+ // let budget = (u64::MAX as f64 * scaling_factor) as u64;
+ // budgets.insert(pid, budget);
+ // }
+ // } else {
+ // // Under power cap, assign maximum budget to all
+ // for pid in energy_service.active_processes() {
+ // budgets.insert(*pid, u64::MAX);
+ // }
+ // }
+
budgets
}
}
diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs
index 9262677..7223b12 100644
--- a/src/energy/trackers/perf.rs
+++ b/src/energy/trackers/perf.rs
@@ -161,7 +161,7 @@ impl Estimator for PerfEstimator {
let time_running_ns = counts.time_running();
if time_running_ns - counters.old_time == 0 || counts.iter().next().unwrap().1 == &0 {
- println!("The counters are zero although the task has been scheduled!!");
+ //println!("The counters are zero although the task has been scheduled!!");
return None;
}
let correction_factor = 10_000_000. / (time_running_ns - counters.old_time) as f64;
diff --git a/src/main.rs b/src/main.rs
index 9f3528c..d5a3dcd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -4,7 +4,7 @@ pub use bpf_skel::*;
pub mod bpf_intf;
mod benchmark;
-mod e_core_selector;
+mod core_selector;
mod energy;
mod freq;
mod model;
@@ -26,10 +26,10 @@ type Pid = i32;
fn main() -> Result<()> {
let matches = Command::new("Energy User Space Scheduler")
.arg(
- Arg::new("mock")
- .short('m')
- .long("mock")
- .help("Use this flag if to activate kernel module mocking")
+ Arg::new("perf")
+ .short('p')
+ .long("perf")
+ .help("Use this flag to switch between the kernel module and perf")
.action(ArgAction::SetTrue)
.required(false),
)
@@ -41,13 +41,6 @@ fn main() -> Result<()> {
.required(false)
.value_name("mode"),
)
- .arg(
- Arg::new("power_cap")
- .long("energy_cap")
- .help("Set a power cap for the processor")
- .required(false)
- .value_name("power in watts"),
- )
.get_matches();
let device = Default::default();
@@ -65,8 +58,7 @@ fn main() -> Result<()> {
println!("energy: {energy}");
// panic!();
- let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
- let use_mocking = matches.get_flag("mock");
+ let use_perf = matches.get_flag("perf");
let benchmark = matches.get_one::<String>("benchmark");
// Initialize and load the scheduler.
@@ -79,7 +71,7 @@ fn main() -> Result<()> {
return Ok(());
}
loop {
- let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
+ let mut sched = Scheduler::init(&mut open_object, use_perf)?;
if !sched.run()?.should_restart() {
break;
}
diff --git a/src/scheduler.rs b/src/scheduler.rs
index c34e151..bcc3134 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -2,7 +2,7 @@ use crate::bpf::*;
use crate::energy::{self, Request as EnergyRequest, TaskInfo};
use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest};
-use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
+use crate::core_selector::{CoreSelector, RoundRobinSelector};
use anyhow::Result;
use libbpf_rs::OpenObject;
use scx_utils::{Topology, UserExitInfo};
@@ -10,10 +10,10 @@ use scx_utils::{Topology, UserExitInfo};
use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
-use std::process;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;
+use std::{process, usize};
use crate::Pid;
@@ -32,7 +32,8 @@ pub struct Scheduler<'a> {
e_cores: Range<i32>,
topology: Topology,
to_remove: Vec<Pid>,
- e_core_selector: Box<dyn ECoreSelector>,
+ e_core_selector: Box<dyn CoreSelector>,
+ p_core_selector: Box<dyn CoreSelector>,
energy_sender: mpsc::SyncSender<EnergyRequest>,
empty_task_infos: mpsc::Receiver<Arc<TaskInfo>>,
frequency_sender: mpsc::SyncSender<FrequencyRequest>,
@@ -42,11 +43,7 @@ pub struct Scheduler<'a> {
}
impl<'a> Scheduler<'a> {
- pub fn init(
- open_object: &'a mut MaybeUninit<OpenObject>,
- use_mocking: bool,
- power_cap: u64,
- ) -> Result<Self> {
+ pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, use_mocking: bool) -> Result<Self> {
println!("Initializing energy-aware scheduler");
let shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> =
@@ -59,7 +56,6 @@ impl<'a> Scheduler<'a> {
// Start energy tracking service
let energy_sender = energy::start_energy_service(
use_mocking,
- power_cap,
shared_cpu_frequency_ranges.clone(),
shared_policy_frequency_ranges.clone(),
shared_cpu_current_frequencies.clone(),
@@ -88,7 +84,8 @@ impl<'a> Scheduler<'a> {
let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&-1) + 1);
let all_cores = 0..((e_cores.len() + p_cores.len()) as u32);
- let selector = Box::new(RoundRobinSelector::new(&e_cores));
+ let p_core_selector = Box::new(RoundRobinSelector::new(&p_cores));
+ let e_core_selector = Box::new(RoundRobinSelector::new(&e_cores));
let to_remove = Vec::with_capacity(1000);
let frequency_sender = freq::start_frequency_service(
@@ -127,7 +124,8 @@ impl<'a> Scheduler<'a> {
tasks_scheduled: 0,
e_cores,
topology,
- e_core_selector: selector,
+ e_core_selector,
+ p_core_selector,
energy_sender,
to_remove,
frequency_sender,
@@ -197,13 +195,17 @@ impl<'a> Scheduler<'a> {
self.tasks_scheduled += 1;
if let Some(task) = self.task_queue.pop_front() {
let mut dispatched_task = DispatchedTask::new(&task);
- //TODO: do we have to migrate tasks back from e cores?
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
dispatched_task.cpu = cpu;
} else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
+ dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
+ //dispatched_task.flags |= RL_CPU_ANY as u64;
+ }
+
+ if self.e_cores.contains(&dispatched_task.cpu) {
+ dispatched_task.cpu = self.p_core_selector.next_core(task.cpu);
}
if task.pid == self.own_pid {
@@ -214,6 +216,7 @@ impl<'a> Scheduler<'a> {
if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
eprintln!("Failed to dispatch task: {}", e);
+ panic!();
}
let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
diff --git a/src/socket.rs b/src/socket.rs
index 2bb0dbb..d851eed 100644
--- a/src/socket.rs
+++ b/src/socket.rs
@@ -41,8 +41,8 @@ impl LoggingSocketService {
break;
}
let (command, args) = line.split_once(' ').unwrap_or((line.trim(), ""));
- let output = match dbg!(command) {
- "pid" => self.get_process(args.parse().unwrap_or_default()),
+ let output = match command {
+ "pid" => self.get_process(args.trim().parse().unwrap_or_default()),
"list" => self.list_processes(),
"limit" => self.set_power_limit(args),
_ => "Unrecognized command#".into(),
@@ -64,7 +64,7 @@ impl LoggingSocketService {
}
fn set_power_limit(&self, args: &str) -> String {
- if let Ok(power_limit) = args.parse() {
+ if let Ok(power_limit) = args.trim().parse() {
self.power_limit
.store(power_limit, std::sync::atomic::Ordering::Relaxed);
"#".into()