diff options
author | Lennard Kittner <lennardkittner@icloud.com> | 2025-03-31 17:28:43 +0200 |
---|---|---|
committer | Lennard Kittner <lennardkittner@icloud.com> | 2025-03-31 17:28:43 +0200 |
commit | 5557ac073d86690986b4a73bdd97d8d5388fa4ce (patch) | |
tree | a3fae91e6da09ed7302f5ddbfe6700c581d68dbe /src | |
parent | 477d9f1190096f4215d1559aa62145460eff0238 (diff) |
Add migration message and E-core handling in the estimator
Diffstat (limited to 'src')
-rw-r--r-- | src/energy.rs | 45 | ||||
-rw-r--r-- | src/energy/estimator.rs | 2 | ||||
-rw-r--r-- | src/energy/trackers/kernel.rs | 2 | ||||
-rw-r--r-- | src/energy/trackers/mock.rs | 2 | ||||
-rw-r--r-- | src/energy/trackers/perf.rs | 89 | ||||
-rw-r--r-- | src/model.rs | 21 | ||||
-rw-r--r-- | src/scheduler.rs | 22 |
7 files changed, 131 insertions, 52 deletions
diff --git a/src/energy.rs b/src/energy.rs index 91c77f7..94e82f4 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -6,7 +6,7 @@ mod trackers; use crate::energy::estimator::Estimator; use std::collections::{BTreeSet, HashMap}; use std::ops::RangeInclusive; -use std::sync::atomic::{AtomicI32, AtomicU64}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; @@ -18,14 +18,19 @@ use crate::Pid; pub use budget::BudgetPolicy; pub use trackers::{KernelDriver, PerfEstimator}; -//TODO: add new command to update process_info or make process_info shared pub enum Request { - NewTask(Pid, Arc<TaskInfo>), + NewTask(Pid, TaskInfo), RemoveTask(Pid), + MigrateTask { + pid: Pid, + new_cpu: i32, + is_e_core: bool, + }, } pub struct TaskInfo { pub cpu: AtomicI32, pub budget: AtomicU64, + pub running_on_e_core: AtomicBool, } impl TaskInfo { @@ -35,6 +40,9 @@ impl TaskInfo { pub fn read_budget(&self) -> u64 { self.budget.load(std::sync::atomic::Ordering::Relaxed) } + pub fn is_running_on_e_core(&self) -> bool { + self.running_on_e_core.load(std::sync::atomic::Ordering::Relaxed) + } pub fn set_cpu(&self, cpu: i32) { self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed); } @@ -42,6 +50,10 @@ impl TaskInfo { self.budget .store(budget, std::sync::atomic::Ordering::Relaxed); } + pub fn set_running_on_e_core(&self, running_on_e_core: bool) { + self.running_on_e_core + .store(running_on_e_core, std::sync::atomic::Ordering::Relaxed); + } } impl Default for TaskInfo { @@ -49,6 +61,7 @@ impl Default for TaskInfo { Self { cpu: Default::default(), budget: AtomicU64::new(u64::MAX), + running_on_e_core: Default::default() } } } @@ -136,9 +149,14 @@ impl EnergyService { match request { Request::NewTask(pid, task_info) => { if self.process_info.read().unwrap().contains_key(&pid) { + self.handle_request(Request::MigrateTask { + pid, + new_cpu: cpu, + is_e_core, + }); return; } - self.estimator.start_trace(pid as 
u64); + self.estimator.start_trace(pid as u64, is_e_core); let parent = (|| { let process = procfs::process::Process::new(pid)?; process.stat().map(|stat| stat.ppid) @@ -168,6 +186,21 @@ impl EnergyService { self.process_info.write().unwrap().remove(&pid); self.process_info.write().unwrap().remove(&pid); } + Request::MigrateTask { + pid, + new_cpu, + is_e_core, + } => { + if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) { + info.cpu = new_cpu; + if info.running_on_e_core != is_e_core { + //TODO: maybe read consumption before removing? + self.estimator.stop_trace(pid as u64); + self.estimator.start_trace(pid as u64, is_e_core); + } + info.running_on_e_core = is_e_core; + } + } } } @@ -177,8 +210,8 @@ impl EnergyService { .read() .unwrap() .get(&1) - .unwrap() - .tree_energy; + .map(|info| info.tree_energy) + .unwrap_or(0.); for pid in &self.active_processes { let mut process_info = self.process_info.write().unwrap(); if let Some(info) = process_info.get_mut(&pid) { diff --git a/src/energy/estimator.rs b/src/energy/estimator.rs index 85cb0eb..7964b00 100644 --- a/src/energy/estimator.rs +++ b/src/energy/estimator.rs @@ -1,5 +1,5 @@ pub trait Estimator: Send + 'static { - fn start_trace(&mut self, pid: u64); + fn start_trace(&mut self, pid: u64, running_on_e_core: bool); fn stop_trace(&mut self, pid: u64); fn read_consumption(&mut self, pid: u64, cpu: i32) -> Option<f64>; } diff --git a/src/energy/trackers/kernel.rs b/src/energy/trackers/kernel.rs index 89a0d89..f7e0811 100644 --- a/src/energy/trackers/kernel.rs +++ b/src/energy/trackers/kernel.rs @@ -30,7 +30,7 @@ const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) }; const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) }; impl Estimator for KernelDriver { - fn start_trace(&mut self, pid: u64) { + fn start_trace(&mut self, pid: u64, running_on_e_core: bool) { let _ = START_TRACE.ioctl(&mut self.file, &pid); } diff --git a/src/energy/trackers/mock.rs 
b/src/energy/trackers/mock.rs index 597cab0..b2a504e 100644 --- a/src/energy/trackers/mock.rs +++ b/src/energy/trackers/mock.rs @@ -3,7 +3,7 @@ use crate::energy::estimator::Estimator; pub struct MockEstimator; impl Estimator for MockEstimator { - fn start_trace(&mut self, _pid: u64) {} + fn start_trace(&mut self, _pid: u64, _running_on_e_core: bool) {} fn stop_trace(&mut self, _pid: u64) {} diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs index 71e87d8..8da2bf8 100644 --- a/src/energy/trackers/perf.rs +++ b/src/energy/trackers/perf.rs @@ -9,22 +9,25 @@ use perf_event::{ use crate::freq::FrequencyKHZ; use std::sync::{Arc, RwLock}; -use crate::energy::{rapl, Estimator}; +use crate::energy::Estimator; use crate::model::ArrayBackend; pub struct PerfEstimator { registry: HashMap<u64, Counters>, - model: crate::model::Net<ArrayBackend>, + model_p: crate::model::Net<ArrayBackend>, + model_e: crate::model::Net<ArrayBackend>, device: <ArrayBackend as burn::prelude::Backend>::Device, shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>, } impl PerfEstimator { pub fn new(shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>) -> Self { - let model = crate::model::load_model(); + let model_p = crate::model::load_model_p(); + let model_e = crate::model::load_model_e(); Self { registry: Default::default(), - model, + model_p, + model_e, device: Default::default(), shared_cpu_current_frequencies, } @@ -36,9 +39,21 @@ struct Counters { counters: Vec<Counter>, old_time: u64, old_total_energy: f64, + running_on_e_core: bool, } -static EVENT_TYPES: &[Event] = &[ +static EVENT_TYPES_P: &[Event] = &[ + Event::Hardware(Hardware::BRANCH_INSTRUCTIONS), + Event::Hardware(Hardware::BRANCH_MISSES), + Event::Hardware(Hardware::CACHE_MISSES), + Event::Hardware(Hardware::CACHE_REFERENCES), + Event::Hardware(Hardware::CPU_CYCLES), + Event::Hardware(Hardware::INSTRUCTIONS), + Event::Hardware(Hardware::REF_CPU_CYCLES), + 
Event::Software(Software::TASK_CLOCK), +]; +//TODO: use correct counter +static EVENT_TYPES_E: &[Event] = &[ Event::Hardware(Hardware::BRANCH_INSTRUCTIONS), Event::Hardware(Hardware::BRANCH_MISSES), Event::Hardware(Hardware::CACHE_MISSES), @@ -50,8 +65,7 @@ static EVENT_TYPES: &[Event] = &[ ]; impl Estimator for PerfEstimator { - fn start_trace(&mut self, pid: u64) { - //let Ok(mut group) = Group::new_with_pid_and_cpu(-1, 0) else { + fn start_trace(&mut self, pid: u64, running_on_e_core: bool) { let mut group = match Group::new_with_pid_and_cpu(pid as i32, -1) { Ok(counters) => counters, Err(e) => { @@ -62,23 +76,21 @@ impl Estimator for PerfEstimator { return; } }; - //let Ok(mut group) = Group::new_with_pid_and_cpu(pid as i32, -1) else { - // eprintln!("Failed to create performance counter group for PID {}", pid); - // return; - //}; - - let counters: Result<Vec<_>, _> = EVENT_TYPES - .iter() - .map(|kind| { - Builder::new() - .group(&mut group) - .kind(kind.clone()) - .observe_pid(pid as i32) - //.observe_pid(-1) - //.one_cpu(0) - .build() - }) - .collect(); + + let counters: Result<Vec<_>, _> = if running_on_e_core { + EVENT_TYPES_E + } else { + EVENT_TYPES_P + } + .iter() + .map(|kind| { + Builder::new() + .group(&mut group) + .kind(kind.clone()) + .observe_pid(pid as i32) + .build() + }) + .collect(); let counters = match counters { Ok(counters) => counters, @@ -95,13 +107,17 @@ impl Estimator for PerfEstimator { eprintln!("Failed to enable performance counters: {}", e); return; } - group.reset(); + if let Err(e) = group.reset() { + eprintln!("Failed to reset performance counters: {}", e); + return; + } let counters = Counters { counters, group, old_time: 0, old_total_energy: 0., + running_on_e_core, }; self.registry.insert(pid, counters); } @@ -122,36 +138,33 @@ impl Estimator for PerfEstimator { return None; } }; - //let task_clock = counts[&counters.counters[7]]; - let task_clock = counts[&counters.counters[7]]; + let num_counter = counters.counters.len(); 
+ let task_clock = counts[&counters.counters[num_counter - 1]]; if task_clock == 0 { return None; } - let current_time = std::time::Instant::now(); let time_running_ns = counts.time_running(); let correction_factor = 10_000_000. / (time_running_ns - counters.old_time) as f64; counters.old_time = time_running_ns; - //dbg!(time_running_ns); - //dbg!(counters.old_time.elapsed().as_millis()); let mut values = vec![(self.shared_cpu_current_frequencies.read().unwrap()[cpu as usize] / 1000) as f64]; - //let mut values = vec![crate::benchmark::read_cpu_frequency(0).unwrap()]; - for ty in counters.counters.iter().take(7) { + for ty in counters.counters.iter().take(num_counter - 1) { let count: u64 = counts[&ty]; values.push((count as f64) * correction_factor); } - //dbg!(&values); - let result = self - .model - .forward(Tensor::from_floats(&values.as_slice()[0..8], &self.device)); + + let result = if counters.running_on_e_core { + &self.model_e + } else { + &self.model_p + } + .forward(Tensor::from_floats(&values.as_slice()[0..], &self.device)); let energy = result.into_scalar() as f64; - //dbg!(energy, correction_factor); counters.old_total_energy += energy / correction_factor; counters.group.reset().unwrap(); - //dbg!(counters.old_total_energy); Some(energy / correction_factor) } } diff --git a/src/model.rs b/src/model.rs index ea01d4d..fbb6e0b 100644 --- a/src/model.rs +++ b/src/model.rs @@ -1,11 +1,10 @@ use burn::{ - nn::conv::{Conv2d, Conv2dConfig}, nn::{Linear, Relu}, prelude::*, }; -use nn::{LeakyReluConfig, LinearConfig}; +use nn::LinearConfig; -use burn::record::{FullPrecisionSettings, NamedMpkFileRecorder, Recorder}; +use burn::record::{FullPrecisionSettings, Recorder}; use burn_import::pytorch::PyTorchFileRecorder; //type ArrayBackend = burn_ndarray::NdArray<f32>; @@ -50,8 +49,20 @@ impl<B: Backend> Net<B> { } } -/// Load the model from the file in your source code (not in build.rs or script). 
-pub fn load_model() -> Net<ArrayBackend> { +/// Load the p core model from the file in your source code (not in build.rs or script). +pub fn load_model_p() -> Net<ArrayBackend> { + let device = Default::default(); + let record: NetRecord<ArrayBackend> = PyTorchFileRecorder::<FullPrecisionSettings>::default() + .load("./perf.pt".into(), &device) + .expect("Failed to decode state"); + + Net::<ArrayBackend>::init(&device).load_record(record) +} + +/// Load the e core model from the file in your source code (not in build.rs or script). +pub fn load_model_e() -> Net<ArrayBackend> { + //TODO: load e model + println!("Falling back to p model"); let device = Default::default(); let record: NetRecord<ArrayBackend> = PyTorchFileRecorder::<FullPrecisionSettings>::default() .load("./perf.pt".into(), &device) diff --git a/src/scheduler.rs b/src/scheduler.rs index 05a4986..7d2060b 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -162,9 +162,11 @@ impl<'a> Scheduler<'a> { // Check if we've seen this task before match self.managed_tasks.entry(task.pid) { std::collections::hash_map::Entry::Vacant(e) => { + let is_e_core = self.e_cores.contains(&task.cpu); // New task - register it with the energy service let task_info = Arc::new(TaskInfo { cpu: AtomicI32::new(task.cpu), + running_on_e_core: is_e_core, ..Default::default() }); e.insert(Task { @@ -212,6 +214,16 @@ impl<'a> Scheduler<'a> { eprintln!("Failed to dispatch task: {}", e); } + if self.e_cores.contains(&dispatched_task.cpu) != self.e_cores.contains(&task.cpu) { + self.energy_sender + .try_send(EnergyRequest::MigrateTask { + pid: task.pid, + new_cpu: dispatched_task.cpu, + is_e_core: self.e_cores.contains(&dispatched_task.cpu), + }) + .unwrap(); + } + self.bpf.notify_complete( self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64, ); @@ -253,6 +265,16 @@ impl<'a> Scheduler<'a> { eprintln!("Failed to dispatch low-budget task: {}", e); } + if self.e_cores.contains(&dispatched_task.cpu) != 
self.e_cores.contains(&task.cpu) { + self.energy_sender + .try_send(EnergyRequest::MigrateTask { + pid: task.pid, + new_cpu: dispatched_task.cpu, + is_e_core: self.e_cores.contains(&dispatched_task.cpu), + }) + .unwrap(); + } + self.bpf.notify_complete( self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64, ); |