author    Lennard Kittner <lennard@kittner.dev>  2025-04-02 15:08:15 +0200
committer Lennard Kittner <lennard@kittner.dev>  2025-04-02 15:08:15 +0200
commit    6d627db07af9f40aa05622d240bad91fda783858 (patch)
tree      9ab2fe2ae1e332a2a1658e2486153d534b3434fb
parent    c8c05d29419822aff3554af788e910ec69267406 (diff)
Share last scheduled timestamp with energy service
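
Replace the scheduler-local last_scheduled Instant with an epoch-microsecond
timestamp stored in the shared TaskInfo (an AtomicI64, where -1 means the task
was never scheduled). This lets the energy service skip consumption updates
for tasks that have not run within the last update interval, and lets the
scheduler prune tasks that have been idle for more than five seconds.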
-rw-r--r--  Cargo.lock        51
-rw-r--r--  Cargo.toml         1
-rw-r--r--  src/energy.rs     47
-rw-r--r--  src/scheduler.rs  63
4 files changed, 121 insertions(+), 41 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 60a3996..778f188 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -53,6 +53,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
[[package]]
+name = "android-tzdata"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
+
+[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -888,6 +894,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
+name = "chrono"
+version = "0.4.40"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1a7964611d71df112cb1730f2ee67324fcf4d0fc6606acbbe9bfe06df124637c"
+dependencies = [
+ "android-tzdata",
+ "iana-time-zone",
+ "js-sys",
+ "num-traits",
+ "wasm-bindgen",
+ "windows-link",
+]
+
+[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -2478,6 +2498,30 @@ dependencies = [
]
[[package]]
+name = "iana-time-zone"
+version = "0.1.63"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8"
+dependencies = [
+ "android_system_properties",
+ "core-foundation-sys",
+ "iana-time-zone-haiku",
+ "js-sys",
+ "log",
+ "wasm-bindgen",
+ "windows-core 0.58.0",
+]
+
+[[package]]
+name = "iana-time-zone-haiku"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
+dependencies = [
+ "cc",
+]
+
+[[package]]
name = "icu_collections"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -3652,6 +3696,7 @@ dependencies = [
"burn",
"burn-import",
"burn-ndarray",
+ "chrono",
"clap",
"csv",
"ctrlc",
@@ -5802,6 +5847,12 @@ dependencies = [
]
[[package]]
+name = "windows-link"
+version = "0.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "76840935b766e1b0a05c0066835fb9ec80071d4c09a16f6bd5f7e655e3c14c38"
+
+[[package]]
name = "windows-result"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index 21ea539..902d3ed 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ clap = { version = "4.5" , features = ["derive"] }
perf-event = { path = "./perf-event" }
procfs = { version = "0.17.0", default-features = false }
csv = "1.3.1"
+chrono = "0.4.40"
burn = {version = "0.16.0", features = ["openblas-system","candle"] }
bincode = "=2.0.0-rc.3"
bincode_derive = "=2.0.0-rc.3"
diff --git a/src/energy.rs b/src/energy.rs
index 6692a63..9c90ab9 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -6,7 +6,7 @@ mod trackers;
use crate::energy::estimator::Estimator;
use std::collections::{BTreeSet, HashMap};
use std::ops::RangeInclusive;
-use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64};
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
@@ -19,7 +19,7 @@ pub use budget::BudgetPolicy;
pub use trackers::{KernelDriver, PerfEstimator};
const IDLE_CONSUMPTION_W: f64 = 7.;
-const UPDATE_INTERVAL_MS: u64 = 3;
+const UPDATE_INTERVAL: Duration = Duration::from_millis(3);
pub enum Request {
NewTask(Pid, Arc<TaskInfo>),
@@ -29,6 +29,7 @@ pub struct TaskInfo {
pub cpu: AtomicI32,
pub budget: AtomicU64,
pub running_on_e_core: AtomicBool,
+ pub last_scheduled: AtomicI64,
}
impl TaskInfo {
@@ -42,9 +43,34 @@ impl TaskInfo {
self.running_on_e_core
.load(std::sync::atomic::Ordering::Relaxed)
}
+ pub fn read_time_since_last_schedule(&self) -> Option<Duration> {
+ let old_time = self
+ .last_scheduled
+ .load(std::sync::atomic::Ordering::Relaxed);
+ if old_time == -1 {
+ None
+ } else {
+ let now = chrono::Utc::now().timestamp_micros();
+ Some(Duration::from_micros((now - old_time) as u64))
+ }
+ }
+ pub fn read_time_since_last_schedule_raw(&self) -> i64 {
+ self.last_scheduled
+ .load(std::sync::atomic::Ordering::Relaxed)
+ }
pub fn set_cpu(&self, cpu: i32) {
self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed);
}
+ pub fn set_last_scheduled_to_now(&self) {
+ self.last_scheduled.store(
+ chrono::Utc::now().timestamp_micros(),
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ }
+ pub fn set_last_scheduled_raw(&self, last_scheduled: i64) {
+ self.last_scheduled
+ .store(last_scheduled, std::sync::atomic::Ordering::Relaxed);
+ }
pub fn set_budget(&self, budget: u64) {
self.budget
.store(budget, std::sync::atomic::Ordering::Relaxed);
@@ -61,6 +87,7 @@ impl Default for TaskInfo {
cpu: Default::default(),
budget: AtomicU64::new(u64::MAX),
running_on_e_core: Default::default(),
+ last_scheduled: AtomicI64::new(-1),
}
}
}
@@ -90,7 +117,6 @@ pub struct EnergyService {
old_rapl: f64,
system_energy: f64,
bias: f64,
- offset: f64,
graveyard: Vec<i32>,
last_measurement: Instant,
}
@@ -120,7 +146,6 @@ impl EnergyService {
old_rapl: 0.,
system_energy: 0.,
bias: 1.,
- offset: 0.,
graveyard: Vec::with_capacity(100),
last_measurement: Instant::now(),
}
@@ -161,8 +186,10 @@ impl EnergyService {
Request::NewTask(pid, task_info) => {
if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) {
let old_budget = task_info.read_budget();
+ let old_time = task_info.read_time_since_last_schedule_raw();
info.task_info = task_info.clone();
info.task_info.set_budget(old_budget);
+ info.task_info.set_last_scheduled_raw(old_time);
return;
}
if self
@@ -181,6 +208,8 @@ impl EnergyService {
process.stat().map(|stat| stat.ppid)
})()
.unwrap_or_default();
+ // We don't care whether the task has been scheduled before the counters are set up
+ task_info.set_last_scheduled_raw(-1);
self.process_info.write().unwrap().insert(
pid,
ProcessInfo {
@@ -216,6 +245,14 @@ impl EnergyService {
for pid in &self.active_processes {
let mut process_info = self.process_info.write().unwrap();
if let Some(info) = process_info.get_mut(&pid) {
+ if info
+ .task_info
+ .read_time_since_last_schedule()
+ .unwrap_or(UPDATE_INTERVAL)
+ >= UPDATE_INTERVAL
+ {
+ continue;
+ }
if let Some(energy) = self.estimator.read_consumption(*pid as u64) {
info.energy += energy * self.bias;
self.estimator.update_information(
@@ -338,7 +375,7 @@ pub fn start_energy_service(
budget_policy,
process_info.clone(),
request_receiver,
- Duration::from_millis(UPDATE_INTERVAL_MS), // 50ms update interval
+ UPDATE_INTERVAL,
shared_cpu_frequency_ranges,
shared_policy_frequency_ranges,
shared_cpu_current_frequencies,
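
For reference, a minimal standalone sketch of the pattern this file now uses
(type and method names here are illustrative, not from the diff; only the
chrono dependency added in Cargo.toml is assumed). An Instant cannot be stored
in an atomic, so the timestamp crosses threads as Unix-epoch microseconds in
an AtomicI64, with -1 as the "never scheduled" sentinel:

use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;

// Illustrative stand-in for the last_scheduled field on TaskInfo.
struct LastScheduled(AtomicI64);

impl LastScheduled {
    fn new() -> Self {
        // -1 marks a task that has never been scheduled.
        LastScheduled(AtomicI64::new(-1))
    }

    // Writer side (the scheduler): record "now" in epoch microseconds.
    fn mark_now(&self) {
        self.0
            .store(chrono::Utc::now().timestamp_micros(), Ordering::Relaxed);
    }

    // Reader side (the energy service): time since the last schedule,
    // or None if the task never ran.
    fn elapsed(&self) -> Option<Duration> {
        match self.0.load(Ordering::Relaxed) {
            -1 => None,
            t => {
                let now = chrono::Utc::now().timestamp_micros();
                // Clamp to zero in case the reader races the writer.
                Some(Duration::from_micros((now - t).max(0) as u64))
            }
        }
    }
}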
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 412d00e..4d4e987 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -11,25 +11,19 @@ use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
use std::process;
-use std::sync::atomic::{AtomicBool, AtomicI32};
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
-use std::time::{Duration, Instant};
+use std::time::Duration;
use crate::Pid;
const SLICE_US: u64 = 100000;
-struct Task {
- task_info: Arc<TaskInfo>,
- last_scheduled: Instant,
-}
-
pub struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<Pid, Task>,
+ managed_tasks: HashMap<Pid, Arc<TaskInfo>>,
maximum_budget: u64,
tasks_scheduled: u64,
//TODO: also consider Pids of children
@@ -73,7 +67,9 @@ impl<'a> Scheduler<'a> {
let (task_sender, empty_task_infos) = mpsc::sync_channel(200);
std::thread::spawn(move || loop {
- task_sender.send(Arc::new(TaskInfo::default()));
+ if task_sender.send(Arc::new(TaskInfo::default())).is_err() {
+ eprintln!("Failed to allocate TaskInfo");
+ }
});
let topology = Topology::new().unwrap();
@@ -176,10 +172,7 @@ impl<'a> Scheduler<'a> {
let task_info = self.empty_task_infos.recv().unwrap();
task_info.set_cpu(task.cpu);
task_info.set_running_on_e_core(is_e_core);
- e.insert(Task {
- task_info: task_info.clone(),
- last_scheduled: Instant::now(),
- });
+ e.insert(task_info.clone());
self.energy_sender
.try_send(EnergyRequest::NewTask(task.pid, task_info))
.unwrap();
@@ -187,7 +180,7 @@ impl<'a> Scheduler<'a> {
}
std::collections::hash_map::Entry::Occupied(e) => {
// Get current budget for this task
- match e.get().task_info.read_budget() {
+ match e.get().read_budget() {
0 => self.no_budget_task_queue.push_back(task),
_ => self.task_queue.push_back(task),
}
@@ -205,12 +198,6 @@ impl<'a> Scheduler<'a> {
if let Some(task) = self.task_queue.pop_front() {
let mut dispatched_task = DispatchedTask::new(&task);
//TODO: do we have to migrate tasks back from e cores?
- if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) {
- task_info.last_scheduled = Instant::now();
- task_info.task_info.set_cpu(task.cpu);
- } else {
- println!("Tried to dispatch a task which is not part of managed tasks");
- }
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
@@ -229,13 +216,13 @@ impl<'a> Scheduler<'a> {
eprintln!("Failed to dispatch task: {}", e);
}
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
- }
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.set_cpu(dispatched_task.cpu);
+ entry.set_running_on_e_core(running_on_e_core);
+ entry.set_last_scheduled_to_now();
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
}
self.bpf.notify_complete(
@@ -257,13 +244,13 @@ impl<'a> Scheduler<'a> {
eprintln!("e core scheduler set cpu to -1");
}
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
- }
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.set_cpu(dispatched_task.cpu);
+ entry.set_running_on_e_core(running_on_e_core);
+ entry.set_last_scheduled_to_now();
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
}
if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
@@ -278,9 +265,13 @@ impl<'a> Scheduler<'a> {
}
fn cleanup_old_tasks(&mut self) {
- let current = Instant::now();
for (pid, task) in &self.managed_tasks {
- if current - task.last_scheduled > Duration::from_secs(5) {
+ // None means that the task was never scheduled so we should probably keep it
+ if task
+ .read_time_since_last_schedule()
+ .unwrap_or(Duration::from_secs(0))
+ > Duration::from_secs(5)
+ {
self.to_remove.push(*pid);
}
}
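
Both consumers of the shared timestamp reduce to a staleness check. A hedged
sketch of the two predicates this commit encodes, reusing the illustrative
LastScheduled type from above (helper names are hypothetical):

// The energy service samples only tasks that ran within the last update
// interval; never-scheduled tasks (None) default to "stale" and are skipped.
fn should_sample(last: &LastScheduled, update_interval: Duration) -> bool {
    last.elapsed().unwrap_or(update_interval) < update_interval
}

// cleanup_old_tasks removes tasks idle for more than five seconds;
// never-scheduled tasks default to zero elapsed time and are kept.
fn should_remove(last: &LastScheduled) -> bool {
    last.elapsed().unwrap_or(Duration::ZERO) > Duration::from_secs(5)
}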