summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
author: Lennard Kittner <lennardkittner@icloud.com> 2025-03-31 17:28:43 +0200
committer: Lennard Kittner <lennardkittner@icloud.com> 2025-03-31 17:28:43 +0200
commit: 5557ac073d86690986b4a73bdd97d8d5388fa4ce (patch)
tree: a3fae91e6da09ed7302f5ddbfe6700c581d68dbe /src
parent: 477d9f1190096f4215d1559aa62145460eff0238 (diff)
Add migration message and e core handling in the estimator
Diffstat (limited to 'src')
-rw-r--r-- src/energy.rs 45
-rw-r--r-- src/energy/estimator.rs 2
-rw-r--r-- src/energy/trackers/kernel.rs 2
-rw-r--r-- src/energy/trackers/mock.rs 2
-rw-r--r-- src/energy/trackers/perf.rs 89
-rw-r--r-- src/model.rs 21
-rw-r--r-- src/scheduler.rs 22
7 files changed, 131 insertions, 52 deletions
diff --git a/src/energy.rs b/src/energy.rs
index 91c77f7..94e82f4 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -6,7 +6,7 @@ mod trackers;
use crate::energy::estimator::Estimator;
use std::collections::{BTreeSet, HashMap};
use std::ops::RangeInclusive;
-use std::sync::atomic::{AtomicI32, AtomicU64};
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;
@@ -18,14 +18,19 @@ use crate::Pid;
pub use budget::BudgetPolicy;
pub use trackers::{KernelDriver, PerfEstimator};
-//TODO: add new command to update process_info or make process_info shared
pub enum Request {
- NewTask(Pid, Arc<TaskInfo>),
+ NewTask(Pid, TaskInfo),
RemoveTask(Pid),
+ MigrateTask {
+ pid: Pid,
+ new_cpu: i32,
+ is_e_core: bool,
+ },
}
pub struct TaskInfo {
pub cpu: AtomicI32,
pub budget: AtomicU64,
+ pub running_on_e_core: AtomicBool,
}
impl TaskInfo {
@@ -35,6 +40,9 @@ impl TaskInfo {
pub fn read_budget(&self) -> u64 {
self.budget.load(std::sync::atomic::Ordering::Relaxed)
}
+ pub fn is_running_on_e_core(&self) -> bool {
+ self.running_on_e_core.load(std::sync::atomic::Ordering::Relaxed)
+ }
pub fn set_cpu(&self, cpu: i32) {
self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed);
}
@@ -42,6 +50,10 @@ impl TaskInfo {
self.budget
.store(budget, std::sync::atomic::Ordering::Relaxed);
}
+ pub fn set_running_on_e_core(&self, running_on_e_core: bool) {
+ self.running_on_e_core
+ .store(running_on_e_core, std::sync::atomic::Ordering::Relaxed);
+ }
}
impl Default for TaskInfo {
@@ -49,6 +61,7 @@ impl Default for TaskInfo {
Self {
cpu: Default::default(),
budget: AtomicU64::new(u64::MAX),
+ running_on_e_core: Default::default()
}
}
}
@@ -136,9 +149,14 @@ impl EnergyService {
match request {
Request::NewTask(pid, task_info) => {
if self.process_info.read().unwrap().contains_key(&pid) {
+ self.handle_request(Request::MigrateTask {
+ pid,
+ new_cpu: cpu,
+ is_e_core,
+ });
return;
}
- self.estimator.start_trace(pid as u64);
+ self.estimator.start_trace(pid as u64, is_e_core);
let parent = (|| {
let process = procfs::process::Process::new(pid)?;
process.stat().map(|stat| stat.ppid)
@@ -168,6 +186,21 @@ impl EnergyService {
self.process_info.write().unwrap().remove(&pid);
self.process_info.write().unwrap().remove(&pid);
}
+ Request::MigrateTask {
+ pid,
+ new_cpu,
+ is_e_core,
+ } => {
+ if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) {
+ info.cpu = new_cpu;
+ if info.running_on_e_core != is_e_core {
+ //TODO: maybe read consumption before removing?
+ self.estimator.stop_trace(pid as u64);
+ self.estimator.start_trace(pid as u64, is_e_core);
+ }
+ info.running_on_e_core = is_e_core;
+ }
+ }
}
}
@@ -177,8 +210,8 @@ impl EnergyService {
.read()
.unwrap()
.get(&1)
- .unwrap()
- .tree_energy;
+ .map(|info| info.tree_energy)
+ .unwrap_or(0.);
for pid in &self.active_processes {
let mut process_info = self.process_info.write().unwrap();
if let Some(info) = process_info.get_mut(&pid) {
diff --git a/src/energy/estimator.rs b/src/energy/estimator.rs
index 85cb0eb..7964b00 100644
--- a/src/energy/estimator.rs
+++ b/src/energy/estimator.rs
@@ -1,5 +1,5 @@
pub trait Estimator: Send + 'static {
- fn start_trace(&mut self, pid: u64);
+ fn start_trace(&mut self, pid: u64, running_on_e_core: bool);
fn stop_trace(&mut self, pid: u64);
fn read_consumption(&mut self, pid: u64, cpu: i32) -> Option<f64>;
}
diff --git a/src/energy/trackers/kernel.rs b/src/energy/trackers/kernel.rs
index 89a0d89..f7e0811 100644
--- a/src/energy/trackers/kernel.rs
+++ b/src/energy/trackers/kernel.rs
@@ -30,7 +30,7 @@ const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) };
const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) };
impl Estimator for KernelDriver {
- fn start_trace(&mut self, pid: u64) {
+ fn start_trace(&mut self, pid: u64, running_on_e_core: bool) {
let _ = START_TRACE.ioctl(&mut self.file, &pid);
}
diff --git a/src/energy/trackers/mock.rs b/src/energy/trackers/mock.rs
index 597cab0..b2a504e 100644
--- a/src/energy/trackers/mock.rs
+++ b/src/energy/trackers/mock.rs
@@ -3,7 +3,7 @@ use crate::energy::estimator::Estimator;
pub struct MockEstimator;
impl Estimator for MockEstimator {
- fn start_trace(&mut self, _pid: u64) {}
+ fn start_trace(&mut self, _pid: u64, _running_on_e_core: bool) {}
fn stop_trace(&mut self, _pid: u64) {}
diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs
index 71e87d8..8da2bf8 100644
--- a/src/energy/trackers/perf.rs
+++ b/src/energy/trackers/perf.rs
@@ -9,22 +9,25 @@ use perf_event::{
use crate::freq::FrequencyKHZ;
use std::sync::{Arc, RwLock};
-use crate::energy::{rapl, Estimator};
+use crate::energy::Estimator;
use crate::model::ArrayBackend;
pub struct PerfEstimator {
registry: HashMap<u64, Counters>,
- model: crate::model::Net<ArrayBackend>,
+ model_p: crate::model::Net<ArrayBackend>,
+ model_e: crate::model::Net<ArrayBackend>,
device: <ArrayBackend as burn::prelude::Backend>::Device,
shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>,
}
impl PerfEstimator {
pub fn new(shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>) -> Self {
- let model = crate::model::load_model();
+ let model_p = crate::model::load_model_p();
+ let model_e = crate::model::load_model_e();
Self {
registry: Default::default(),
- model,
+ model_p,
+ model_e,
device: Default::default(),
shared_cpu_current_frequencies,
}
@@ -36,9 +39,21 @@ struct Counters {
counters: Vec<Counter>,
old_time: u64,
old_total_energy: f64,
+ running_on_e_core: bool,
}
-static EVENT_TYPES: &[Event] = &[
+static EVENT_TYPES_P: &[Event] = &[
+ Event::Hardware(Hardware::BRANCH_INSTRUCTIONS),
+ Event::Hardware(Hardware::BRANCH_MISSES),
+ Event::Hardware(Hardware::CACHE_MISSES),
+ Event::Hardware(Hardware::CACHE_REFERENCES),
+ Event::Hardware(Hardware::CPU_CYCLES),
+ Event::Hardware(Hardware::INSTRUCTIONS),
+ Event::Hardware(Hardware::REF_CPU_CYCLES),
+ Event::Software(Software::TASK_CLOCK),
+];
+//TODO: use correct counter
+static EVENT_TYPES_E: &[Event] = &[
Event::Hardware(Hardware::BRANCH_INSTRUCTIONS),
Event::Hardware(Hardware::BRANCH_MISSES),
Event::Hardware(Hardware::CACHE_MISSES),
@@ -50,8 +65,7 @@ static EVENT_TYPES: &[Event] = &[
];
impl Estimator for PerfEstimator {
- fn start_trace(&mut self, pid: u64) {
- //let Ok(mut group) = Group::new_with_pid_and_cpu(-1, 0) else {
+ fn start_trace(&mut self, pid: u64, running_on_e_core: bool) {
let mut group = match Group::new_with_pid_and_cpu(pid as i32, -1) {
Ok(counters) => counters,
Err(e) => {
@@ -62,23 +76,21 @@ impl Estimator for PerfEstimator {
return;
}
};
- //let Ok(mut group) = Group::new_with_pid_and_cpu(pid as i32, -1) else {
- // eprintln!("Failed to create performance counter group for PID {}", pid);
- // return;
- //};
-
- let counters: Result<Vec<_>, _> = EVENT_TYPES
- .iter()
- .map(|kind| {
- Builder::new()
- .group(&mut group)
- .kind(kind.clone())
- .observe_pid(pid as i32)
- //.observe_pid(-1)
- //.one_cpu(0)
- .build()
- })
- .collect();
+
+ let counters: Result<Vec<_>, _> = if running_on_e_core {
+ EVENT_TYPES_E
+ } else {
+ EVENT_TYPES_P
+ }
+ .iter()
+ .map(|kind| {
+ Builder::new()
+ .group(&mut group)
+ .kind(kind.clone())
+ .observe_pid(pid as i32)
+ .build()
+ })
+ .collect();
let counters = match counters {
Ok(counters) => counters,
@@ -95,13 +107,17 @@ impl Estimator for PerfEstimator {
eprintln!("Failed to enable performance counters: {}", e);
return;
}
- group.reset();
+ if let Err(e) = group.reset() {
+ eprintln!("Failed to reset performance counters: {}", e);
+ return;
+ }
let counters = Counters {
counters,
group,
old_time: 0,
old_total_energy: 0.,
+ running_on_e_core,
};
self.registry.insert(pid, counters);
}
@@ -122,36 +138,33 @@ impl Estimator for PerfEstimator {
return None;
}
};
- //let task_clock = counts[&counters.counters[7]];
- let task_clock = counts[&counters.counters[7]];
+ let num_counter = counters.counters.len();
+ let task_clock = counts[&counters.counters[num_counter - 1]];
if task_clock == 0 {
return None;
}
- let current_time = std::time::Instant::now();
let time_running_ns = counts.time_running();
let correction_factor = 10_000_000. / (time_running_ns - counters.old_time) as f64;
counters.old_time = time_running_ns;
- //dbg!(time_running_ns);
- //dbg!(counters.old_time.elapsed().as_millis());
let mut values =
vec![(self.shared_cpu_current_frequencies.read().unwrap()[cpu as usize] / 1000) as f64];
- //let mut values = vec![crate::benchmark::read_cpu_frequency(0).unwrap()];
- for ty in counters.counters.iter().take(7) {
+ for ty in counters.counters.iter().take(num_counter - 1) {
let count: u64 = counts[&ty];
values.push((count as f64) * correction_factor);
}
- //dbg!(&values);
- let result = self
- .model
- .forward(Tensor::from_floats(&values.as_slice()[0..8], &self.device));
+
+ let result = if counters.running_on_e_core {
+ &self.model_e
+ } else {
+ &self.model_p
+ }
+ .forward(Tensor::from_floats(&values.as_slice()[0..], &self.device));
let energy = result.into_scalar() as f64;
- //dbg!(energy, correction_factor);
counters.old_total_energy += energy / correction_factor;
counters.group.reset().unwrap();
- //dbg!(counters.old_total_energy);
Some(energy / correction_factor)
}
}
diff --git a/src/model.rs b/src/model.rs
index ea01d4d..fbb6e0b 100644
--- a/src/model.rs
+++ b/src/model.rs
@@ -1,11 +1,10 @@
use burn::{
- nn::conv::{Conv2d, Conv2dConfig},
nn::{Linear, Relu},
prelude::*,
};
-use nn::{LeakyReluConfig, LinearConfig};
+use nn::LinearConfig;
-use burn::record::{FullPrecisionSettings, NamedMpkFileRecorder, Recorder};
+use burn::record::{FullPrecisionSettings, Recorder};
use burn_import::pytorch::PyTorchFileRecorder;
//type ArrayBackend = burn_ndarray::NdArray<f32>;
@@ -50,8 +49,20 @@ impl<B: Backend> Net<B> {
}
}
-/// Load the model from the file in your source code (not in build.rs or script).
-pub fn load_model() -> Net<ArrayBackend> {
+/// Load the p core model from the file in your source code (not in build.rs or script).
+pub fn load_model_p() -> Net<ArrayBackend> {
+ let device = Default::default();
+ let record: NetRecord<ArrayBackend> = PyTorchFileRecorder::<FullPrecisionSettings>::default()
+ .load("./perf.pt".into(), &device)
+ .expect("Failed to decode state");
+
+ Net::<ArrayBackend>::init(&device).load_record(record)
+}
+
+/// Load the e core model from the file in your source code (not in build.rs or script).
+pub fn load_model_e() -> Net<ArrayBackend> {
+ //TODO: load e model
+ println!("Falling back to p model");
let device = Default::default();
let record: NetRecord<ArrayBackend> = PyTorchFileRecorder::<FullPrecisionSettings>::default()
.load("./perf.pt".into(), &device)
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 05a4986..7d2060b 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -162,9 +162,11 @@ impl<'a> Scheduler<'a> {
// Check if we've seen this task before
match self.managed_tasks.entry(task.pid) {
std::collections::hash_map::Entry::Vacant(e) => {
+ let is_e_core = self.e_cores.contains(&task.cpu);
// New task - register it with the energy service
let task_info = Arc::new(TaskInfo {
cpu: AtomicI32::new(task.cpu),
+ running_on_e_core: is_e_core,
..Default::default()
});
e.insert(Task {
@@ -212,6 +214,16 @@ impl<'a> Scheduler<'a> {
eprintln!("Failed to dispatch task: {}", e);
}
+ if self.e_cores.contains(&dispatched_task.cpu) != self.e_cores.contains(&task.cpu) {
+ self.energy_sender
+ .try_send(EnergyRequest::MigrateTask {
+ pid: task.pid,
+ new_cpu: dispatched_task.cpu,
+ is_e_core: self.e_cores.contains(&dispatched_task.cpu),
+ })
+ .unwrap();
+ }
+
self.bpf.notify_complete(
self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
);
@@ -253,6 +265,16 @@ impl<'a> Scheduler<'a> {
eprintln!("Failed to dispatch low-budget task: {}", e);
}
+ if self.e_cores.contains(&dispatched_task.cpu) != self.e_cores.contains(&task.cpu) {
+ self.energy_sender
+ .try_send(EnergyRequest::MigrateTask {
+ pid: task.pid,
+ new_cpu: dispatched_task.cpu,
+ is_e_core: self.e_cores.contains(&dispatched_task.cpu),
+ })
+ .unwrap();
+ }
+
self.bpf.notify_complete(
self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
);