summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorLennard Kittner <lennard@kittner.dev>2025-04-02 15:08:15 +0200
committerLennard Kittner <lennard@kittner.dev>2025-04-02 15:08:15 +0200
commit6d627db07af9f40aa05622d240bad91fda783858 (patch)
tree9ab2fe2ae1e332a2a1658e2486153d534b3434fb /src
parentc8c05d29419822aff3554af788e910ec69267406 (diff)
Share last scheduled timestamp with energy service
Diffstat (limited to 'src')
-rw-r--r--src/energy.rs47
-rw-r--r--src/scheduler.rs63
2 files changed, 69 insertions, 41 deletions
diff --git a/src/energy.rs b/src/energy.rs
index 6692a63..9c90ab9 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -6,7 +6,7 @@ mod trackers;
use crate::energy::estimator::Estimator;
use std::collections::{BTreeSet, HashMap};
use std::ops::RangeInclusive;
-use std::sync::atomic::{AtomicBool, AtomicI32, AtomicU64};
+use std::sync::atomic::{AtomicBool, AtomicI32, AtomicI64, AtomicU64};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::{Duration, Instant};
@@ -19,7 +19,7 @@ pub use budget::BudgetPolicy;
pub use trackers::{KernelDriver, PerfEstimator};
const IDLE_CONSUMPTION_W: f64 = 7.;
-const UPDATE_INTERVAL_MS: u64 = 3;
+const UPDATE_INTERVAL: Duration = Duration::from_millis(3);
pub enum Request {
NewTask(Pid, Arc<TaskInfo>),
@@ -29,6 +29,7 @@ pub struct TaskInfo {
pub cpu: AtomicI32,
pub budget: AtomicU64,
pub running_on_e_core: AtomicBool,
+ pub last_scheduled: AtomicI64,
}
impl TaskInfo {
@@ -42,9 +43,34 @@ impl TaskInfo {
self.running_on_e_core
.load(std::sync::atomic::Ordering::Relaxed)
}
+ pub fn read_time_since_last_schedule(&self) -> Option<Duration> {
+ let old_time = self
+ .last_scheduled
+ .load(std::sync::atomic::Ordering::Relaxed);
+ if old_time == -1 {
+ None
+ } else {
+ let now = chrono::Utc::now().timestamp_micros();
+ Some(Duration::from_micros((now - old_time) as u64))
+ }
+ }
+ pub fn read_time_since_last_schedule_raw(&self) -> i64 {
+ self.last_scheduled
+ .load(std::sync::atomic::Ordering::Relaxed)
+ }
pub fn set_cpu(&self, cpu: i32) {
self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed);
}
+ pub fn set_last_scheduled_to_now(&self) {
+ self.last_scheduled.store(
+ chrono::Utc::now().timestamp_micros(),
+ std::sync::atomic::Ordering::Relaxed,
+ );
+ }
+ pub fn set_last_scheduled_raw(&self, last_scheduled: i64) {
+ self.last_scheduled
+ .store(last_scheduled, std::sync::atomic::Ordering::Relaxed);
+ }
pub fn set_budget(&self, budget: u64) {
self.budget
.store(budget, std::sync::atomic::Ordering::Relaxed);
@@ -61,6 +87,7 @@ impl Default for TaskInfo {
cpu: Default::default(),
budget: AtomicU64::new(u64::MAX),
running_on_e_core: Default::default(),
+ last_scheduled: AtomicI64::new(-1),
}
}
}
@@ -90,7 +117,6 @@ pub struct EnergyService {
old_rapl: f64,
system_energy: f64,
bias: f64,
- offset: f64,
graveyard: Vec<i32>,
last_measurement: Instant,
}
@@ -120,7 +146,6 @@ impl EnergyService {
old_rapl: 0.,
system_energy: 0.,
bias: 1.,
- offset: 0.,
graveyard: Vec::with_capacity(100),
last_measurement: Instant::now(),
}
@@ -161,8 +186,10 @@ impl EnergyService {
Request::NewTask(pid, task_info) => {
if let Some(info) = self.process_info.write().unwrap().get_mut(&pid) {
let old_budget = task_info.read_budget();
+ let old_time = task_info.read_time_since_last_schedule_raw();
info.task_info = task_info.clone();
info.task_info.set_budget(old_budget);
+ info.task_info.set_last_scheduled_raw(old_time);
return;
}
if self
@@ -181,6 +208,8 @@ impl EnergyService {
process.stat().map(|stat| stat.ppid)
})()
.unwrap_or_default();
+ // We don't care whether the task has been scheduled before the counters are set up
+ task_info.set_last_scheduled_raw(-1);
self.process_info.write().unwrap().insert(
pid,
ProcessInfo {
@@ -216,6 +245,14 @@ impl EnergyService {
for pid in &self.active_processes {
let mut process_info = self.process_info.write().unwrap();
if let Some(info) = process_info.get_mut(&pid) {
+ if info
+ .task_info
+ .read_time_since_last_schedule()
+ .unwrap_or(UPDATE_INTERVAL)
+ >= UPDATE_INTERVAL
+ {
+ continue;
+ }
if let Some(energy) = self.estimator.read_consumption(*pid as u64) {
info.energy += energy * self.bias;
self.estimator.update_information(
@@ -338,7 +375,7 @@ pub fn start_energy_service(
budget_policy,
process_info.clone(),
request_receiver,
- Duration::from_millis(UPDATE_INTERVAL_MS), // 50ms update interval
+ UPDATE_INTERVAL,
shared_cpu_frequency_ranges,
shared_policy_frequency_ranges,
shared_cpu_current_frequencies,
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 412d00e..4d4e987 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -11,25 +11,19 @@ use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
use std::process;
-use std::sync::atomic::{AtomicBool, AtomicI32};
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
-use std::time::{Duration, Instant};
+use std::time::Duration;
use crate::Pid;
const SLICE_US: u64 = 100000;
-struct Task {
- task_info: Arc<TaskInfo>,
- last_scheduled: Instant,
-}
-
pub struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<Pid, Task>,
+ managed_tasks: HashMap<Pid, Arc<TaskInfo>>,
maximum_budget: u64,
tasks_scheduled: u64,
//TODO: also consider Pids of children
@@ -73,7 +67,9 @@ impl<'a> Scheduler<'a> {
let (task_sender, empty_task_infos) = mpsc::sync_channel(200);
std::thread::spawn(move || loop {
- task_sender.send(Arc::new(TaskInfo::default()));
+ if task_sender.send(Arc::new(TaskInfo::default())).is_err() {
+ eprintln!("Failed to allocate TaskInfo");
+ }
});
let topology = Topology::new().unwrap();
@@ -176,10 +172,7 @@ impl<'a> Scheduler<'a> {
let task_info = self.empty_task_infos.recv().unwrap();
task_info.set_cpu(task.cpu);
task_info.set_running_on_e_core(is_e_core);
- e.insert(Task {
- task_info: task_info.clone(),
- last_scheduled: Instant::now(),
- });
+ e.insert(task_info.clone());
self.energy_sender
.try_send(EnergyRequest::NewTask(task.pid, task_info))
.unwrap();
@@ -187,7 +180,7 @@ impl<'a> Scheduler<'a> {
}
std::collections::hash_map::Entry::Occupied(e) => {
// Get current budget for this task
- match e.get().task_info.read_budget() {
+ match e.get().read_budget() {
0 => self.no_budget_task_queue.push_back(task),
_ => self.task_queue.push_back(task),
}
@@ -205,12 +198,6 @@ impl<'a> Scheduler<'a> {
if let Some(task) = self.task_queue.pop_front() {
let mut dispatched_task = DispatchedTask::new(&task);
//TODO: do we have to migrate tasks back from e cores?
- if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) {
- task_info.last_scheduled = Instant::now();
- task_info.task_info.set_cpu(task.cpu);
- } else {
- println!("Tried to dispatch a task which is not part of managed tasks");
- }
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
@@ -229,13 +216,13 @@ impl<'a> Scheduler<'a> {
eprintln!("Failed to dispatch task: {}", e);
}
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
- }
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.set_cpu(dispatched_task.cpu);
+ entry.set_running_on_e_core(running_on_e_core);
+ entry.set_last_scheduled_to_now();
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
}
self.bpf.notify_complete(
@@ -257,13 +244,13 @@ impl<'a> Scheduler<'a> {
eprintln!("e core scheduler set cpu to -1");
}
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
- }
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.set_cpu(dispatched_task.cpu);
+ entry.set_running_on_e_core(running_on_e_core);
+ entry.set_last_scheduled_to_now();
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
}
if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
@@ -278,9 +265,13 @@ impl<'a> Scheduler<'a> {
}
fn cleanup_old_tasks(&mut self) {
- let current = Instant::now();
for (pid, task) in &self.managed_tasks {
- if current - task.last_scheduled > Duration::from_secs(5) {
+ // None means that the task was never scheduled so we should probably keep it
+ if task
+ .read_time_since_last_schedule()
+ .unwrap_or(Duration::from_secs(0))
+ > Duration::from_secs(5)
+ {
self.to_remove.push(*pid);
}
}