summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/energy.rs 142
-rw-r--r-- src/energy/budget.rs 68
-rw-r--r-- src/scheduler.rs 11
-rw-r--r-- src/socket.rs 9
4 files changed, 110 insertions, 120 deletions
diff --git a/src/energy.rs b/src/energy.rs
index 0a49e07..9f218e7 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -5,6 +5,7 @@ mod trackers;
use crate::energy::estimator::Estimator;
use std::collections::{BTreeSet, HashMap};
+use std::sync::atomic::Ordering::Relaxed;
use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
@@ -14,11 +15,12 @@ use crate::freq::FrequencyKHZ;
use crate::socket;
use crate::Pid;
-pub use budget::BudgetPolicy;
+use budget::SimpleCappingPolicy;
pub use trackers::{KernelDriver, PerfEstimator};
const IDLE_CONSUMPTION_W: f64 = 7.;
const UPDATE_INTERVAL: Duration = Duration::from_millis(10);
+const DEFAULT_POWER_DRAW_MW: u64 = 15_000;
pub enum Request {
NewTask(Pid, Arc<TaskInfo>),
@@ -26,22 +28,22 @@ pub enum Request {
}
pub struct TaskInfo {
pub cpu: AtomicI32,
- pub budget: AtomicU64,
+ pub budget: AtomicI64,
+ pub last_total_runtime: AtomicU64,
+ pub power_draw: AtomicU64,
pub running_on_e_core: AtomicBool,
pub last_scheduled: AtomicI64,
}
impl TaskInfo {
pub fn read_cpu(&self) -> i32 {
- self.cpu.load(std::sync::atomic::Ordering::Relaxed)
+ self.cpu.load(Relaxed)
}
- pub fn read_budget(&self) -> u64 {
- self.budget.load(std::sync::atomic::Ordering::Relaxed)
+ pub fn read_budget(&self) -> i64 {
+ self.budget.load(Relaxed)
}
pub fn read_time_since_last_schedule(&self) -> Option<Duration> {
- let old_time = self
- .last_scheduled
- .load(std::sync::atomic::Ordering::Relaxed);
+ let old_time = self.last_scheduled.load(Relaxed);
if old_time == -1 {
None
} else {
@@ -50,29 +52,42 @@ impl TaskInfo {
}
}
pub fn read_time_since_last_schedule_raw(&self) -> i64 {
- self.last_scheduled
- .load(std::sync::atomic::Ordering::Relaxed)
+ self.last_scheduled.load(Relaxed)
}
pub fn set_cpu(&self, cpu: i32) {
- self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed);
+ self.cpu.store(cpu, Relaxed);
}
pub fn set_last_scheduled_to_now(&self) {
- self.last_scheduled.store(
- chrono::Utc::now().timestamp_micros(),
- std::sync::atomic::Ordering::Relaxed,
- );
+ self.last_scheduled
+ .store(chrono::Utc::now().timestamp_micros(), Relaxed);
}
pub fn set_last_scheduled_raw(&self, last_scheduled: i64) {
- self.last_scheduled
- .store(last_scheduled, std::sync::atomic::Ordering::Relaxed);
+ self.last_scheduled.store(last_scheduled, Relaxed);
+ }
+ pub fn set_budget(&self, budget: i64) {
+ self.budget.store(budget, Relaxed);
}
- pub fn set_budget(&self, budget: u64) {
- self.budget
- .store(budget, std::sync::atomic::Ordering::Relaxed);
+ pub fn add_budget(&self, budget: i64) {
+ self.budget.fetch_add(budget, Relaxed);
+ self.budget.fetch_min(budget::MAX_BUDGET_MJ, Relaxed);
}
pub fn set_running_on_e_core(&self, running_on_e_core: bool) {
- self.running_on_e_core
- .store(running_on_e_core, std::sync::atomic::Ordering::Relaxed);
+ self.running_on_e_core.store(running_on_e_core, Relaxed);
+ }
+ pub fn update_runtime(&self, new_running_time: u64) -> u64 {
+ new_running_time - self.last_total_runtime.swap(new_running_time, Relaxed)
+ }
+ pub fn update_budget(&self, running_time: Duration) -> i64 {
+ let power_draw = self.power_draw.load(Relaxed);
+ self.budget.fetch_sub(
+ (power_draw as f64 * running_time.as_secs_f64()) as i64,
+ Relaxed,
+ );
+ self.budget.fetch_max(-budget::MAX_BUDGET_MJ, Relaxed)
+ }
+ // Set the power draw in milliwatts
+ pub fn set_power_draw(&self, power_draw_mw: u64) {
+ self.power_draw.store(power_draw_mw, Relaxed);
}
}
@@ -80,18 +95,20 @@ impl Default for TaskInfo {
fn default() -> Self {
Self {
cpu: Default::default(),
- budget: AtomicU64::new(u64::MAX),
+ budget: AtomicI64::new(budget::MAX_BUDGET_MJ),
+ last_total_runtime: Default::default(),
running_on_e_core: Default::default(),
last_scheduled: AtomicI64::new(-1),
+ power_draw: AtomicU64::new(DEFAULT_POWER_DRAW_MW),
}
}
}
#[derive(Clone)]
pub struct ProcessInfo {
- pub energy: f64,
- pub energy_delta: f64,
- pub tree_energy: f64,
+ pub energy_j: f64,
+ pub energy_delta_j: f64,
+ pub tree_energy_j: f64,
pub last_update: std::time::Instant,
pub parent: Pid,
pub task_info: Arc<TaskInfo>,
@@ -99,7 +116,7 @@ pub struct ProcessInfo {
pub struct EnergyService {
estimator: Box<dyn Estimator>,
- budget_policy: Option<Box<dyn BudgetPolicy>>,
+ budget_policy: Option<SimpleCappingPolicy>,
// contains the same data as the keys of process_info but having this reduces contention and
// avoids unnecessary clone
active_processes: BTreeSet<Pid>,
@@ -107,9 +124,9 @@ pub struct EnergyService {
request_receiver: mpsc::Receiver<Request>,
update_interval: Duration,
rapl_offset: f64,
- last_energy_diff: f64,
+ last_energy_diff_j: f64,
last_time_between_measurements: Duration,
- old_rapl: f64,
+ old_rapl_j: f64,
system_energy: f64,
bias: f64,
graveyard: Vec<i32>,
@@ -120,7 +137,7 @@ impl EnergyService {
#[allow(clippy::too_many_arguments)]
pub fn new(
estimator: Box<dyn Estimator>,
- budget_policy: Box<dyn BudgetPolicy>,
+ budget_policy: SimpleCappingPolicy,
process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>,
request_receiver: mpsc::Receiver<Request>,
update_interval: Duration,
@@ -133,9 +150,9 @@ impl EnergyService {
request_receiver,
update_interval,
rapl_offset: rapl::read_package_energy().unwrap(),
- last_energy_diff: 0f64,
+ last_energy_diff_j: 0f64,
last_time_between_measurements: Duration::new(0, 0),
- old_rapl: 0.,
+ old_rapl_j: 0.,
system_energy: 0.,
bias: 1.,
graveyard: Vec::with_capacity(100),
@@ -156,10 +173,9 @@ impl EnergyService {
self.update_measurements();
self.clear_graveyeard();
-
- // Calculate and update budgets
- self.update_budgets();
}
+ // Calculate and update budgets
+ self.update_budgets();
// Sleep for update interval
thread::sleep(self.update_interval);
@@ -213,9 +229,9 @@ impl EnergyService {
self.process_info.write().unwrap().insert(
pid,
ProcessInfo {
- energy: 0.,
- energy_delta: 0.,
- tree_energy: 0.,
+ energy_j: 0.,
+ energy_delta_j: 0.,
+ tree_energy_j: 0.,
last_update: std::time::Instant::now(),
parent,
task_info: task_info.clone(),
@@ -241,8 +257,9 @@ impl EnergyService {
.read()
.unwrap()
.get(&1)
- .map(|info| info.tree_energy)
+ .map(|info| info.tree_energy_j)
.unwrap_or(0.);
+ let elapsed = self.last_measurement.elapsed();
for pid in &self.active_processes {
let mut process_info = self.process_info.write().unwrap();
if let Some(info) = process_info.get_mut(pid) {
@@ -254,34 +271,36 @@ impl EnergyService {
{
continue;
}
- if let Some(energy) = self.estimator.read_consumption(*pid as u64) {
- info.energy_delta = energy * self.bias;
- info.energy += energy * self.bias;
- info.tree_energy += energy * self.bias;
+ if let Some(energy_j) = self.estimator.read_consumption(*pid as u64) {
+ info.energy_delta_j = energy_j * self.bias;
+ info.task_info.set_power_draw(
+ (info.energy_delta_j * 1000. / elapsed.as_secs_f64()) as u64,
+ );
+ info.energy_j += energy_j * self.bias;
+ info.tree_energy_j += energy_j * self.bias;
self.estimator
.update_information(*pid as u64, info.task_info.read_cpu());
let mut parent = info.parent;
while let Some(info) = process_info.get_mut(&parent) {
- info.tree_energy += energy * self.bias;
+ info.tree_energy_j += energy_j * self.bias;
info.last_update = std::time::Instant::now();
parent = info.parent;
}
}
}
}
- let elapsed = self.last_measurement.elapsed();
self.last_time_between_measurements = elapsed;
self.last_measurement = Instant::now();
- let rapl = rapl::read_package_energy().unwrap() - self.rapl_offset;
- let rapl_diff = rapl - self.old_rapl;
- self.last_energy_diff = rapl_diff;
- self.old_rapl = rapl;
+ let rapl_j = rapl::read_package_energy().unwrap() - self.rapl_offset;
+ let rapl_diff_j = rapl_j - self.old_rapl_j;
+ self.last_energy_diff_j = rapl_diff_j;
+ self.old_rapl_j = rapl_j;
let idle_consumption = elapsed.as_secs_f64() * IDLE_CONSUMPTION_W;
if let Some(init) = self.process_info.write().unwrap().get_mut(&1) {
- let est_diff = init.tree_energy - old_energy + idle_consumption;
- let current_bias = if init.tree_energy - old_energy > idle_consumption * 0.5 {
- (rapl_diff / est_diff).clamp(0.1, 2.)
+ let est_diff = init.tree_energy_j - old_energy + idle_consumption;
+ let current_bias = if init.tree_energy_j - old_energy > idle_consumption * 0.5 {
+ (rapl_diff_j / est_diff).clamp(0.1, 2.)
} else {
1.
};
@@ -295,14 +314,12 @@ impl EnergyService {
fn update_budgets(&mut self) {
// We can't call self.budget_policy.calculate_budgets(self) directly because the first self borrows immutable and the self second borrows mutable
let mut policy = self.budget_policy.take().unwrap();
- let budgets = policy.calculate_budgets(self);
+ let budget_refill = policy.calculate_budgets(self);
self.budget_policy = Some(policy);
// Update the shared budgets map
- for (pid, budget) in budgets {
- if let Some(entry) = self.process_info.write().unwrap().get(&pid) {
- entry.task_info.set_budget(budget);
- }
+ for entry in self.process_info.write().unwrap().values_mut() {
+ entry.task_info.add_budget(budget_refill);
}
}
@@ -313,15 +330,6 @@ impl EnergyService {
self.process_info.write().unwrap().remove(&pid);
}
}
-
- pub fn all_process_energy_deltas(&self) -> HashMap<Pid, f64> {
- self.process_info
- .read()
- .unwrap()
- .iter()
- .map(|(&key, info)| (key, info.energy_delta))
- .collect()
- }
}
pub fn start_energy_service(
@@ -343,7 +351,7 @@ pub fn start_energy_service(
let power_cap = socket::start_logging_socket_service("/tmp/pm-sched", process_info.clone())?;
// Create budget policy
- let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap));
+ let budget_policy = budget::SimpleCappingPolicy::new(power_cap);
// shouldn't be a problem because we are privileged
// if PackageEnergy::check_paranoid().unwrap_or(3) > 0 {}
diff --git a/src/energy/budget.rs b/src/energy/budget.rs
index b049814..6356132 100644
--- a/src/energy/budget.rs
+++ b/src/energy/budget.rs
@@ -1,19 +1,17 @@
-use std::{collections::HashMap, sync::atomic::AtomicU32};
+use std::{sync::atomic::AtomicU32, time::Instant};
use crate::energy::EnergyService;
use std::sync::Arc;
-type Pid = i32;
+use super::rapl;
-const MAX_BUDGET: u64 = 30000;
-
-pub trait BudgetPolicy: Send + 'static {
- fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64>;
-}
+pub const MAX_BUDGET_MJ: i64 = 30_000;
pub struct SimpleCappingPolicy {
power_cap: Arc<AtomicU32>,
last_ratio: f64,
+ last_energy: f64,
+ last_measurement: Instant,
}
impl SimpleCappingPolicy {
@@ -21,55 +19,27 @@ impl SimpleCappingPolicy {
Self {
power_cap,
last_ratio: 1.,
+ last_energy: rapl::read_package_energy().unwrap(),
+ last_measurement: Instant::now(),
}
}
}
-impl BudgetPolicy for SimpleCappingPolicy {
- fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> HashMap<Pid, u64> {
- let mut budgets = HashMap::new();
- let process_energies = energy_service.all_process_energy_deltas();
-
- // Total energy consumption across all processes
- //let total_energy: u64 = process_energies.values().sum();
+impl SimpleCappingPolicy {
+ pub fn calculate_budgets(&mut self, energy_service: &mut EnergyService) -> i64 {
+ let active_processes = energy_service.active_processes.len();
+ let rapl = rapl::read_package_energy().unwrap();
+ let actual_energy = rapl - self.last_energy;
+ self.last_energy = rapl;
- let actual_energy = energy_service.last_energy_diff;
let energy_cap = self.power_cap.load(std::sync::atomic::Ordering::Relaxed) as f64
- * energy_service.last_time_between_measurements.as_secs_f64();
- //println!("{actual_energy} {energy_cap}");
- let base_energy_per_process =
- energy_cap / process_energies.iter().filter(|(_, e)| **e > 0f64).count() as f64;
- let ratio = energy_cap / actual_energy * self.last_ratio;
- self.last_ratio = ratio.clamp(0.001, 100.0);
-
- for (pid, energy) in process_energies {
- let budget = budgets.entry(pid).or_insert(0);
- *budget = (*budget + ((ratio * base_energy_per_process - energy) * 1000.) as u64)
- //.min((ratio * base_energy_per_process * MAX_BUDGET_FACTOR * 1000.) as u64);
- .min(MAX_BUDGET);
- if energy != 0.0 {
- //println!("budget: {budget} energy: {energy} ratio: {ratio} base: {base_energy_per_process}");
- }
- }
+ * self.last_measurement.elapsed().as_secs_f64();
+ self.last_measurement = Instant::now();
- // Simple proportional distribution if over cap
- //if actual_energy > energy_cap {
- // let ratio = energy_cap / actual_energy;
- //
- // for (&pid, &energy) in &process_energies {
- // // Calculate a scaled budget based on the ratio
- // // Higher energy consumers get proportionally reduced budgets
- // let scaling_factor = 1.0 - ((energy as f64 / actual_energy) * (1.0 - ratio));
- // let budget = (u64::MAX as f64 * scaling_factor) as u64;
- // budgets.insert(pid, budget);
- // }
- // } else {
- // // Under power cap, assign maximum budget to all
- // for pid in energy_service.active_processes() {
- // budgets.insert(*pid, u64::MAX);
- // }
- // }
+ let base_energy_per_process = energy_cap / active_processes as f64;
+ let ratio = energy_cap / actual_energy * self.last_ratio;
+ self.last_ratio = ratio.clamp(0.001, 1000.0);
- budgets
+ (((ratio * base_energy_per_process) * 1000.) as i64).min(MAX_BUDGET_MJ)
}
}
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 0832049..dfd879b 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -158,9 +158,11 @@ impl<'a> Scheduler<'a> {
while let Ok(Some(mut task)) = self.bpf.dequeue_task() {
// The scheduler itself has to be scheduled regardless of its energy usage
if task.pid == self.own_pid {
+ task.vtime = 10;
self.task_queue.push_front(task);
continue;
}
+ // TODO: consider adjusting vtime for all processes
// Check if we've seen this task before
match self.managed_tasks.entry(task.pid) {
@@ -182,8 +184,13 @@ impl<'a> Scheduler<'a> {
}
std::collections::hash_map::Entry::Occupied(e) => {
// Get current budget for this task
- match e.get().read_budget() {
- 0 => self.no_budget_task_queue.push_back(task),
+ let slice_ns = e.get().update_runtime(task.sum_exec_runtime);
+ let new_budget = e.get().update_budget(Duration::from_nanos(slice_ns));
+ match new_budget {
+ x if x < 0 => {
+ task.weight = 0;
+ self.no_budget_task_queue.push_back(task)
+ }
x if x < 1000 => {
task.weight = 0;
self.task_queue.push_back(task)
diff --git a/src/socket.rs b/src/socket.rs
index d851eed..ce3fbdc 100644
--- a/src/socket.rs
+++ b/src/socket.rs
@@ -57,7 +57,12 @@ impl LoggingSocketService {
let mut output = String::new();
use std::fmt::Write;
for (pid, info) in self.process_info.read().unwrap().iter() {
- writeln!(&mut output, "{pid},{},{}", info.energy, info.tree_energy).unwrap();
+ writeln!(
+ &mut output,
+ "{pid},{},{}",
+ info.energy_j, info.tree_energy_j
+ )
+ .unwrap();
}
writeln!(&mut output, "#",).unwrap();
output
@@ -77,7 +82,7 @@ impl LoggingSocketService {
if let Some(info) = self.process_info.read().unwrap().get(&pid) {
format!(
"pid: {pid} process: {}J process tree: {}J\n",
- info.energy, info.tree_energy
+ info.energy_j, info.tree_energy_j
)
} else {
format!("Unknown pid: {pid}\n")