diff options
author | Lennard Kittner <lennard@kittner.dev> | 2025-04-02 15:08:15 +0200 |
---|---|---|
committer | Lennard Kittner <lennard@kittner.dev> | 2025-04-02 15:08:15 +0200 |
commit | 6d627db07af9f40aa05622d240bad91fda783858 (patch) | |
tree | 9ab2fe2ae1e332a2a1658e2486153d534b3434fb | |
parent | c8c05d29419822aff3554af788e910ec69267406 (diff) |
Share last scheduled timestamp with energy service
-rw-r--r-- | Cargo.lock | 51 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/energy.rs | 47 | ||||
-rw-r--r-- | src/scheduler.rs | 63 |
4 files changed, 121 insertions(+), 41 deletions(-)
@@ -53,6 +53,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] name = "android_system_properties" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -888,6 +894,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" [[package]] +name = "chrono" +version = "0.4.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] name = "cipher" version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2478,6 +2498,30 @@ dependencies = [ ] [[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core 0.58.0", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] name = "icu_collections" version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3652,6 +3696,7 @@ dependencies = [ "burn", "burn-import", "burn-ndarray", + "chrono", "clap", "csv", "ctrlc", @@ -5802,6 +5847,12 
@@ dependencies = [ ] [[package]] +name = "windows-link" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38" + +[[package]] name = "windows-result" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -17,6 +17,7 @@ clap = { version = "4.5" , features = ["derive"] } perf-event = { path = "./perf-event" } procfs = { version = "0.17.0", default-features = false } csv = "1.3.1" +chrono = "0.4.40" burn = {version = "0.16.0", features = ["openblas-system","candle"] } bincode = "=2.0.0-rc.3" bincode_derive = "=2.0.0-rc.3" diff --git a/src/energy.rs b/src/energy.rs index 6692a63..9c90ab9 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -6,7 +6,7 @@ mod trackers; use crate::energy::estimator::Estimator; use std::collections::{BTreeSet, HashMap}; use std::ops::RangeInclusive; -use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64}; +use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::{Duration, Instant}; @@ -19,7 +19,7 @@ pub use budget::BudgetPolicy; pub use trackers::{KernelDriver, PerfEstimator}; const IDLE_CONSUMPTION_W: f64 = 7.; -const UPDATE_INTERVAL_MS: u64 = 3; +const UPDATE_INTERVAL: Duration = Duration::from_millis(3); pub enum Request { NewTask(Pid, Arc<TaskInfo>), @@ -29,6 +29,7 @@ pub struct TaskInfo { pub cpu: AtomicI32, pub budget: AtomicU64, pub running_on_e_core: AtomicBool, + pub last_scheduled: AtomicI64, } impl TaskInfo { @@ -42,9 +43,34 @@ impl TaskInfo { self.running_on_e_core .load(std::sync::atomic::Ordering::Relaxed) } + pub fn read_time_since_last_schedule(&self) -> Option<Duration> { + let old_time = self + .last_scheduled + .load(std::sync::atomic::Ordering::Relaxed); + if old_time == -1 { + None + } else { + let now = chrono::Utc::now().timestamp_micros(); + Some(Duration::from_micros((now - old_time) as u64)) + } 
+ } + pub fn read_time_since_last_schedule_raw(&self) -> i64 { + self.last_scheduled + .load(std::sync::atomic::Ordering::Relaxed) + } pub fn set_cpu(&self, cpu: i32) { self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed); } + pub fn set_last_scheduled_to_now(&self) { + self.last_scheduled.store( + chrono::Utc::now().timestamp_micros(), + std::sync::atomic::Ordering::Relaxed, + ); + } + pub fn set_last_scheduled_raw(&self, last_scheduled: i64) { + self.last_scheduled + .store(last_scheduled, std::sync::atomic::Ordering::Relaxed); + } pub fn set_budget(&self, budget: u64) { self.budget .store(budget, std::sync::atomic::Ordering::Relaxed); @@ -61,6 +87,7 @@ impl Default for TaskInfo { cpu: Default::default(), budget: AtomicU64::new(u64::MAX), running_on_e_core: Default::default(), + last_scheduled: AtomicI64::new(-1), } } } @@ -90,7 +117,6 @@ pub struct EnergyService { old_rapl: f64, system_energy: f64, bias: f64, - offset: f64, graveyard: Vec<i32>, last_measurement: Instant, } @@ -120,7 +146,6 @@ impl EnergyService { old_rapl: 0., system_energy: 0., bias: 1., - offset: 0., graveyard: Vec::with_capacity(100), last_measurement: Instant::now(), } @@ -161,8 +186,10 @@ impl EnergyService { Request::NewTask(pid, task_info) => { if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) { let old_budget = task_info.read_budget(); + let old_time = task_info.read_time_since_last_schedule_raw(); info.task_info = task_info.clone(); info.task_info.set_budget(old_budget); + info.task_info.set_last_scheduled_raw(old_time); return; } if self @@ -181,6 +208,8 @@ impl EnergyService { process.stat().map(|stat| stat.ppid) })() .unwrap_or_default(); + // We don't care whether the task has been scheduled before the counters are set up + task_info.set_last_scheduled_raw(-1); self.process_info.write().unwrap().insert( pid, ProcessInfo { @@ -216,6 +245,14 @@ impl EnergyService { for pid in &self.active_processes { let mut process_info = self.process_info.write().unwrap(); if 
let Some(info) = process_info.get_mut(&pid) { + if info + .task_info + .read_time_since_last_schedule() + .unwrap_or(UPDATE_INTERVAL) + >= UPDATE_INTERVAL + { + continue; + } if let Some(energy) = self.estimator.read_consumption(*pid as u64) { info.energy += energy * self.bias; self.estimator.update_information( @@ -338,7 +375,7 @@ pub fn start_energy_service( budget_policy, process_info.clone(), request_receiver, - Duration::from_millis(UPDATE_INTERVAL_MS), // 50ms update interval + UPDATE_INTERVAL, shared_cpu_frequency_ranges, shared_policy_frequency_ranges, shared_cpu_current_frequencies, diff --git a/src/scheduler.rs b/src/scheduler.rs index 412d00e..4d4e987 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -11,25 +11,19 @@ use std::collections::{HashMap, VecDeque}; use std::mem::MaybeUninit; use std::ops::{Range, RangeInclusive}; use std::process; -use std::sync::atomic::{AtomicBool, AtomicI32}; use std::sync::mpsc::TrySendError; use std::sync::{mpsc, Arc, RwLock}; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::Pid; const SLICE_US: u64 = 100000; -struct Task { - task_info: Arc<TaskInfo>, - last_scheduled: Instant, -} - pub struct Scheduler<'a> { bpf: BpfScheduler<'a>, task_queue: VecDeque<QueuedTask>, no_budget_task_queue: VecDeque<QueuedTask>, - managed_tasks: HashMap<Pid, Task>, + managed_tasks: HashMap<Pid, Arc<TaskInfo>>, maximum_budget: u64, tasks_scheduled: u64, //TODO: also consider Pids of children @@ -73,7 +67,9 @@ impl<'a> Scheduler<'a> { let (task_sender, empty_task_infos) = mpsc::sync_channel(200); std::thread::spawn(move || loop { - task_sender.send(Arc::new(TaskInfo::default())); + if task_sender.send(Arc::new(TaskInfo::default())).is_err() { + eprintln!("Failed to allocate TaskInfo"); + } }); let topology = Topology::new().unwrap(); @@ -176,10 +172,7 @@ impl<'a> Scheduler<'a> { let task_info = self.empty_task_infos.recv().unwrap(); task_info.set_cpu(task.cpu); task_info.set_running_on_e_core(is_e_core); - 
e.insert(Task { - task_info: task_info.clone(), - last_scheduled: Instant::now(), - }); + e.insert(task_info.clone()); self.energy_sender .try_send(EnergyRequest::NewTask(task.pid, task_info)) .unwrap(); @@ -187,7 +180,7 @@ impl<'a> Scheduler<'a> { } std::collections::hash_map::Entry::Occupied(e) => { // Get current budget for this task - match e.get().task_info.read_budget() { + match e.get().read_budget() { 0 => self.no_budget_task_queue.push_back(task), _ => self.task_queue.push_back(task), } @@ -205,12 +198,6 @@ impl<'a> Scheduler<'a> { if let Some(task) = self.task_queue.pop_front() { let mut dispatched_task = DispatchedTask::new(&task); //TODO: do we have to migrate tasks back from e cores? - if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) { - task_info.last_scheduled = Instant::now(); - task_info.task_info.set_cpu(task.cpu); - } else { - println!("Tried to dispatch a task which is not part of managed tasks"); - } let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); if cpu >= 0 { @@ -229,13 +216,13 @@ impl<'a> Scheduler<'a> { eprintln!("Failed to dispatch task: {}", e); } - // Migrating to new cpu - if dispatched_task.cpu != task.cpu { - let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); - if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { - entry.task_info.set_cpu(dispatched_task.cpu); - entry.task_info.set_running_on_e_core(running_on_e_core); - } + let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); + if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { + entry.set_cpu(dispatched_task.cpu); + entry.set_running_on_e_core(running_on_e_core); + entry.set_last_scheduled_to_now(); + } else { + println!("Tried to dispatch a task which is not part of managed tasks"); } self.bpf.notify_complete( @@ -257,13 +244,13 @@ impl<'a> Scheduler<'a> { eprintln!("e core scheduler set cpu to -1"); } - // Migrating to new cpu - if dispatched_task.cpu != task.cpu { - let running_on_e_core = 
self.e_cores.contains(&dispatched_task.cpu); - if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { - entry.task_info.set_cpu(dispatched_task.cpu); - entry.task_info.set_running_on_e_core(running_on_e_core); - } + let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); + if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { + entry.set_cpu(dispatched_task.cpu); + entry.set_running_on_e_core(running_on_e_core); + entry.set_last_scheduled_to_now(); + } else { + println!("Tried to dispatch a task which is not part of managed tasks"); } if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { @@ -278,9 +265,13 @@ impl<'a> Scheduler<'a> { } fn cleanup_old_tasks(&mut self) { - let current = Instant::now(); for (pid, task) in &self.managed_tasks { - if current - task.last_scheduled > Duration::from_secs(5) { + // None means that the task was never scheduled so we should probably keep it + if task + .read_time_since_last_schedule() + .unwrap_or(Duration::from_secs(0)) + > Duration::from_secs(5) + { self.to_remove.push(*pid); } } |