diff options
author | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 16:06:00 +0200 |
---|---|---|
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 16:06:00 +0200 |
commit | 5572834e86dc24284e8bd9e063936004adf02894 (patch) | |
tree | 9b763d2acf8dfd80687961a00dd0a95cb6462508 /src | |
parent | fcd46995f4d02d19fc63568ff1fcdaf16e7129f3 (diff) |
Use atomic for updating process state instead of sharing a hashmap
Diffstat (limited to 'src')
-rw-r--r-- | src/energy.rs | 58 | ||||
-rw-r--r-- | src/freq.rs | 4 | ||||
-rw-r--r-- | src/scheduler.rs | 67 | ||||
-rw-r--r-- | src/socket.rs | 2 |
4 files changed, 83 insertions, 48 deletions
diff --git a/src/energy.rs b/src/energy.rs index 02aaf62..91c77f7 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -6,6 +6,7 @@ mod trackers; use crate::energy::estimator::Estimator; use std::collections::{BTreeSet, HashMap}; use std::ops::RangeInclusive; +use std::sync::atomic::{AtomicI32, AtomicU64}; use std::sync::{mpsc, Arc, RwLock}; use std::thread; use std::time::Duration; @@ -13,16 +14,44 @@ use std::time::Duration; use crate::freq::FrequencyKHZ; use crate::socket; use crate::Pid; -use dashmap::DashMap; pub use budget::BudgetPolicy; pub use trackers::{KernelDriver, PerfEstimator}; //TODO: add new command to update process_info or make process_info shared pub enum Request { - NewTask(Pid, i32), + NewTask(Pid, Arc<TaskInfo>), RemoveTask(Pid), } +pub struct TaskInfo { + pub cpu: AtomicI32, + pub budget: AtomicU64, +} + +impl TaskInfo { + pub fn read_cpu(&self) -> i32 { + self.cpu.load(std::sync::atomic::Ordering::Relaxed) + } + pub fn read_budget(&self) -> u64 { + self.budget.load(std::sync::atomic::Ordering::Relaxed) + } + pub fn set_cpu(&self, cpu: i32) { + self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed); + } + pub fn set_budget(&self, budget: u64) { + self.budget + .store(budget, std::sync::atomic::Ordering::Relaxed); + } +} + +impl Default for TaskInfo { + fn default() -> Self { + Self { + cpu: Default::default(), + budget: AtomicU64::new(u64::MAX), + } + } +} #[derive(Clone)] pub struct ProcessInfo { @@ -30,7 +59,7 @@ pub struct ProcessInfo { pub tree_energy: f64, pub last_update: std::time::Instant, pub parent: Pid, - pub cpu: i32, + pub task_info: Arc<TaskInfo>, } pub struct EnergyService { @@ -40,7 +69,6 @@ pub struct EnergyService { // avoids unnecessary clone active_processes: BTreeSet<Pid>, process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>, - shared_budgets: Arc<DashMap<Pid, u64>>, request_receiver: mpsc::Receiver<Request>, update_interval: Duration, shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, @@ -57,7 +85,6 @@ impl EnergyService { estimator: Box<dyn Estimator>, budget_policy: Box<dyn BudgetPolicy>, process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>, - shared_budgets: Arc<DashMap<Pid, u64>>, request_receiver: mpsc::Receiver<Request>, update_interval: Duration, shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, @@ -69,7 +96,6 @@ impl EnergyService { budget_policy: Some(budget_policy), active_processes: BTreeSet::new(), process_info, - shared_budgets, request_receiver, update_interval, shared_cpu_frequency_ranges, @@ -108,7 +134,7 @@ impl EnergyService { fn handle_request(&mut self, request: Request) { match request { - Request::NewTask(pid, cpu) => { + Request::NewTask(pid, task_info) => { if self.process_info.read().unwrap().contains_key(&pid) { return; } @@ -125,14 +151,12 @@ impl EnergyService { tree_energy: 0., last_update: std::time::Instant::now(), parent, - cpu, + task_info: task_info.clone(), }, ); self.active_processes.insert(pid); - // Initialize with default budget - self.shared_budgets.insert(pid, u64::MAX); if !self.process_info.read().unwrap().contains_key(&parent) && parent != 0 { - self.handle_request(Request::NewTask(parent, cpu)); + self.handle_request(Request::NewTask(parent, task_info)); } } Request::RemoveTask(pid) => { @@ -143,7 +167,6 @@ impl EnergyService { self.estimator.stop_trace(pid as u64); self.process_info.write().unwrap().remove(&pid); self.process_info.write().unwrap().remove(&pid); - self.shared_budgets.remove(&pid); } } } @@ -159,7 +182,10 @@ impl EnergyService { for pid in &self.active_processes { let mut process_info = self.process_info.write().unwrap(); if let Some(info) = process_info.get_mut(&pid) { - if let Some(energy) = self.estimator.read_consumption(*pid as u64, info.cpu) { + if let Some(energy) = self + .estimator + .read_consumption(*pid as u64, info.task_info.read_cpu()) + { info.energy += energy * self.bias; let mut parent = info.parent; while let Some(info) = process_info.get_mut(&parent) { @@ -198,8 +224,8 @@ impl EnergyService { // Update the shared budgets map for (pid, budget) in budgets { - if let Some(mut entry) = self.shared_budgets.try_get_mut(&pid).try_unwrap() { - *entry = budget; + if let Some(entry) = self.process_info.write().unwrap().get(&pid) { + entry.task_info.set_budget(budget); } } } @@ -234,7 +260,6 @@ impl EnergyService { pub fn start_energy_service( use_mocking: bool, power_cap: u64, - shared_budgets: Arc<DashMap<Pid, u64>>, shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>, @@ -262,7 +287,6 @@ pub fn start_energy_service( estimator, budget_policy, process_info.clone(), - shared_budgets, request_receiver, Duration::from_millis(50), // 50ms update interval shared_cpu_frequency_ranges, diff --git a/src/freq.rs b/src/freq.rs index 20a8619..2cfcf75 100644 --- a/src/freq.rs +++ b/src/freq.rs @@ -204,7 +204,9 @@ impl SysFSFrequencyService { self.cpu_descriptors = CPUDescriptors::new_range(&self.cpus).unwrap(); let ranges = self.get_freq_limits().unwrap(); self.frequency_ranges = ranges; - self.switch_governor("conservative").unwrap(); + if self.switch_governor("conservative").is_err() { + println!("failed to set governor to conservative"); + } loop { self.handle_requests().unwrap(); diff --git a/src/scheduler.rs b/src/scheduler.rs index 359b9da..e0530b5 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,10 +1,9 @@ use crate::bpf::*; -use crate::energy::{self, Request as EnergyRequest}; +use crate::energy::{self, Request as EnergyRequest, TaskInfo}; use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest}; use crate::e_core_selector::{ECoreSelector, RoundRobinSelector}; use anyhow::Result; -use dashmap::DashMap; use libbpf_rs::OpenObject; use scx_utils::{Topology, UserExitInfo}; @@ -12,6 +11,7 @@ use std::collections::{HashMap, VecDeque}; use std::mem::MaybeUninit; use std::ops::{Range, RangeInclusive}; use std::process; +use std::sync::atomic::{AtomicI32, AtomicU64}; use std::sync::mpsc::TrySendError; use std::sync::{mpsc, Arc, RwLock}; use std::time::{Duration, Instant}; @@ -20,11 +20,16 @@ use crate::Pid; const SLICE_US: u64 = 100000; +struct Task { + task_info: Arc<TaskInfo>, + last_scheduled: Instant, +} + pub struct Scheduler<'a> { bpf: BpfScheduler<'a>, task_queue: VecDeque<QueuedTask>, no_budget_task_queue: VecDeque<QueuedTask>, - managed_tasks: HashMap<Pid, Instant>, + managed_tasks: HashMap<Pid, Task>, maximum_budget: u64, own_pid: Pid, p_cores: Range<i32>, @@ -33,7 +38,6 @@ pub struct Scheduler<'a> { to_remove: Vec<Pid>, e_core_selector: Box<dyn ECoreSelector>, energy_sender: mpsc::SyncSender<EnergyRequest>, - shared_budgets: Arc<DashMap<Pid, u64>>, frequency_sender: mpsc::SyncSender<FrequencyRequest>, shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>, @@ -62,14 +66,10 @@ impl<'a> Scheduler<'a> { let shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>> = Arc::new(RwLock::new(Vec::new())); - // Shared budget map between energy service and scheduler - let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000)); - // Start energy tracking service let energy_sender = energy::start_energy_service( use_mocking, power_cap, - shared_budgets.clone(), shared_cpu_frequency_ranges.clone(), shared_policy_frequency_ranges.clone(), shared_cpu_current_frequencies.clone(), @@ -121,7 +121,6 @@ impl<'a> Scheduler<'a> { topology, e_core_selector: selector, energy_sender, - shared_budgets, to_remove, frequency_sender, shared_cpu_frequency_ranges, @@ -159,22 +158,29 @@ impl<'a> Scheduler<'a> { } // Check if we've seen this task before - if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid) - { - e.insert(Instant::now()); - // New task - register it with the energy service - self.energy_sender - .try_send(EnergyRequest::NewTask(task.pid, task.cpu)) - .unwrap(); - } - - // Get current budget for this task - match self.shared_budgets.try_get(&task.pid) { - dashmap::try_result::TryResult::Present(budget) => match *budget { - 0 => self.no_budget_task_queue.push_back(task), - _ => self.task_queue.push_back(task), - }, - _ => self.task_queue.push_back(task), + match self.managed_tasks.entry(task.pid) { + std::collections::hash_map::Entry::Vacant(e) => { + // New task - register it with the energy service + let task_info = Arc::new(TaskInfo { + cpu: AtomicI32::new(task.cpu), + ..Default::default() + }); + e.insert(Task { + task_info: task_info.clone(), + last_scheduled: Instant::now(), + }); + self.energy_sender + .try_send(EnergyRequest::NewTask(task.pid, task_info)) + .unwrap(); + self.task_queue.push_back(task); + } + std::collections::hash_map::Entry::Occupied(e) => { + // Get current budget for this task + match e.get().task_info.read_budget() { + 0 => self.no_budget_task_queue.push_back(task), + _ => self.task_queue.push_back(task), + } + } } } } @@ -183,7 +189,12 @@ impl<'a> Scheduler<'a> { if let Some(task) = self.task_queue.pop_front() { let mut dispatched_task = DispatchedTask::new(&task); //TODO: do we have to migrate tasks back from e cores? - self.managed_tasks.insert(task.pid, Instant::now()); + if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) { + task_info.last_scheduled = Instant::now(); + task_info.task_info.set_cpu(task.cpu); + } else { + println!("Tried to dispatch a task which is not part of managed tasks"); + } let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); if cpu >= 0 { @@ -243,8 +254,8 @@ impl<'a> Scheduler<'a> { fn cleanup_old_tasks(&mut self) { let current = Instant::now(); - for (pid, last_scheduled) in &self.managed_tasks { - if current - *last_scheduled > Duration::from_secs(5) { + for (pid, task) in &self.managed_tasks { + if current - task.last_scheduled > Duration::from_secs(5) { self.to_remove.push(*pid); } } diff --git a/src/socket.rs b/src/socket.rs index 0b42abe..73118ce 100644 --- a/src/socket.rs +++ b/src/socket.rs @@ -9,8 +9,6 @@ use std::{ time::Duration, }; -use dashmap::DashMap; - use crate::{energy::ProcessInfo, Pid}; pub struct LoggingSocketService { |