summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDennis Kobert <dennis@kobert.dev>2025-03-31 16:06:00 +0200
committerDennis Kobert <dennis@kobert.dev>2025-03-31 16:06:00 +0200
commit5572834e86dc24284e8bd9e063936004adf02894 (patch)
tree9b763d2acf8dfd80687961a00dd0a95cb6462508 /src
parentfcd46995f4d02d19fc63568ff1fcdaf16e7129f3 (diff)
Use atomic for updating process state instead of sharing a hashmap
Diffstat (limited to 'src')
-rw-r--r--src/energy.rs58
-rw-r--r--src/freq.rs4
-rw-r--r--src/scheduler.rs67
-rw-r--r--src/socket.rs2
4 files changed, 83 insertions, 48 deletions
diff --git a/src/energy.rs b/src/energy.rs
index 02aaf62..91c77f7 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -6,6 +6,7 @@ mod trackers;
use crate::energy::estimator::Estimator;
use std::collections::{BTreeSet, HashMap};
use std::ops::RangeInclusive;
+use std::sync::atomic::{AtomicI32, AtomicU64};
use std::sync::{mpsc, Arc, RwLock};
use std::thread;
use std::time::Duration;
@@ -13,16 +14,44 @@ use std::time::Duration;
use crate::freq::FrequencyKHZ;
use crate::socket;
use crate::Pid;
-use dashmap::DashMap;
pub use budget::BudgetPolicy;
pub use trackers::{KernelDriver, PerfEstimator};
//TODO: add new command to update process_info or make process_info shared
pub enum Request {
- NewTask(Pid, i32),
+ NewTask(Pid, Arc<TaskInfo>),
RemoveTask(Pid),
}
+pub struct TaskInfo {
+ pub cpu: AtomicI32,
+ pub budget: AtomicU64,
+}
+
+impl TaskInfo {
+ pub fn read_cpu(&self) -> i32 {
+ self.cpu.load(std::sync::atomic::Ordering::Relaxed)
+ }
+ pub fn read_budget(&self) -> u64 {
+ self.budget.load(std::sync::atomic::Ordering::Relaxed)
+ }
+ pub fn set_cpu(&self, cpu: i32) {
+ self.cpu.store(cpu, std::sync::atomic::Ordering::Relaxed);
+ }
+ pub fn set_budget(&self, budget: u64) {
+ self.budget
+ .store(budget, std::sync::atomic::Ordering::Relaxed);
+ }
+}
+
+impl Default for TaskInfo {
+ fn default() -> Self {
+ Self {
+ cpu: Default::default(),
+ budget: AtomicU64::new(u64::MAX),
+ }
+ }
+}
#[derive(Clone)]
pub struct ProcessInfo {
@@ -30,7 +59,7 @@ pub struct ProcessInfo {
pub tree_energy: f64,
pub last_update: std::time::Instant,
pub parent: Pid,
- pub cpu: i32,
+ pub task_info: Arc<TaskInfo>,
}
pub struct EnergyService {
@@ -40,7 +69,6 @@ pub struct EnergyService {
// avoids unnecessary clone
active_processes: BTreeSet<Pid>,
process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>,
- shared_budgets: Arc<DashMap<Pid, u64>>,
request_receiver: mpsc::Receiver<Request>,
update_interval: Duration,
shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
@@ -57,7 +85,6 @@ impl EnergyService {
estimator: Box<dyn Estimator>,
budget_policy: Box<dyn BudgetPolicy>,
process_info: Arc<RwLock<HashMap<Pid, ProcessInfo>>>,
- shared_budgets: Arc<DashMap<Pid, u64>>,
request_receiver: mpsc::Receiver<Request>,
update_interval: Duration,
shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
@@ -69,7 +96,6 @@ impl EnergyService {
budget_policy: Some(budget_policy),
active_processes: BTreeSet::new(),
process_info,
- shared_budgets,
request_receiver,
update_interval,
shared_cpu_frequency_ranges,
@@ -108,7 +134,7 @@ impl EnergyService {
fn handle_request(&mut self, request: Request) {
match request {
- Request::NewTask(pid, cpu) => {
+ Request::NewTask(pid, task_info) => {
if self.process_info.read().unwrap().contains_key(&pid) {
return;
}
@@ -125,14 +151,12 @@ impl EnergyService {
tree_energy: 0.,
last_update: std::time::Instant::now(),
parent,
- cpu,
+ task_info: task_info.clone(),
},
);
self.active_processes.insert(pid);
- // Initialize with default budget
- self.shared_budgets.insert(pid, u64::MAX);
if !self.process_info.read().unwrap().contains_key(&parent) && parent != 0 {
- self.handle_request(Request::NewTask(parent, cpu));
+ self.handle_request(Request::NewTask(parent, task_info));
}
}
Request::RemoveTask(pid) => {
@@ -143,7 +167,6 @@ impl EnergyService {
self.estimator.stop_trace(pid as u64);
self.process_info.write().unwrap().remove(&pid);
self.process_info.write().unwrap().remove(&pid);
- self.shared_budgets.remove(&pid);
}
}
}
@@ -159,7 +182,10 @@ impl EnergyService {
for pid in &self.active_processes {
let mut process_info = self.process_info.write().unwrap();
if let Some(info) = process_info.get_mut(&pid) {
- if let Some(energy) = self.estimator.read_consumption(*pid as u64, info.cpu) {
+ if let Some(energy) = self
+ .estimator
+ .read_consumption(*pid as u64, info.task_info.read_cpu())
+ {
info.energy += energy * self.bias;
let mut parent = info.parent;
while let Some(info) = process_info.get_mut(&parent) {
@@ -198,8 +224,8 @@ impl EnergyService {
// Update the shared budgets map
for (pid, budget) in budgets {
- if let Some(mut entry) = self.shared_budgets.try_get_mut(&pid).try_unwrap() {
- *entry = budget;
+ if let Some(entry) = self.process_info.write().unwrap().get(&pid) {
+ entry.task_info.set_budget(budget);
}
}
}
@@ -234,7 +260,6 @@ impl EnergyService {
pub fn start_energy_service(
use_mocking: bool,
power_cap: u64,
- shared_budgets: Arc<DashMap<Pid, u64>>,
shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>,
@@ -262,7 +287,6 @@ pub fn start_energy_service(
estimator,
budget_policy,
process_info.clone(),
- shared_budgets,
request_receiver,
Duration::from_millis(50), // 50ms update interval
shared_cpu_frequency_ranges,
diff --git a/src/freq.rs b/src/freq.rs
index 20a8619..2cfcf75 100644
--- a/src/freq.rs
+++ b/src/freq.rs
@@ -204,7 +204,9 @@ impl SysFSFrequencyService {
self.cpu_descriptors = CPUDescriptors::new_range(&self.cpus).unwrap();
let ranges = self.get_freq_limits().unwrap();
self.frequency_ranges = ranges;
- self.switch_governor("conservative").unwrap();
+ if self.switch_governor("conservative").is_err() {
+ println!("failed to set governor to conservative");
+ }
loop {
self.handle_requests().unwrap();
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 359b9da..e0530b5 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -1,10 +1,9 @@
use crate::bpf::*;
-use crate::energy::{self, Request as EnergyRequest};
+use crate::energy::{self, Request as EnergyRequest, TaskInfo};
use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest};
use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
use anyhow::Result;
-use dashmap::DashMap;
use libbpf_rs::OpenObject;
use scx_utils::{Topology, UserExitInfo};
@@ -12,6 +11,7 @@ use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
use std::process;
+use std::sync::atomic::{AtomicI32, AtomicU64};
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
use std::time::{Duration, Instant};
@@ -20,11 +20,16 @@ use crate::Pid;
const SLICE_US: u64 = 100000;
+struct Task {
+ task_info: Arc<TaskInfo>,
+ last_scheduled: Instant,
+}
+
pub struct Scheduler<'a> {
bpf: BpfScheduler<'a>,
task_queue: VecDeque<QueuedTask>,
no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<Pid, Instant>,
+ managed_tasks: HashMap<Pid, Task>,
maximum_budget: u64,
own_pid: Pid,
p_cores: Range<i32>,
@@ -33,7 +38,6 @@ pub struct Scheduler<'a> {
to_remove: Vec<Pid>,
e_core_selector: Box<dyn ECoreSelector>,
energy_sender: mpsc::SyncSender<EnergyRequest>,
- shared_budgets: Arc<DashMap<Pid, u64>>,
frequency_sender: mpsc::SyncSender<FrequencyRequest>,
shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
@@ -62,14 +66,10 @@ impl<'a> Scheduler<'a> {
let shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>> =
Arc::new(RwLock::new(Vec::new()));
- // Shared budget map between energy service and scheduler
- let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000));
-
// Start energy tracking service
let energy_sender = energy::start_energy_service(
use_mocking,
power_cap,
- shared_budgets.clone(),
shared_cpu_frequency_ranges.clone(),
shared_policy_frequency_ranges.clone(),
shared_cpu_current_frequencies.clone(),
@@ -121,7 +121,6 @@ impl<'a> Scheduler<'a> {
topology,
e_core_selector: selector,
energy_sender,
- shared_budgets,
to_remove,
frequency_sender,
shared_cpu_frequency_ranges,
@@ -159,22 +158,29 @@ impl<'a> Scheduler<'a> {
}
// Check if we've seen this task before
- if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid)
- {
- e.insert(Instant::now());
- // New task - register it with the energy service
- self.energy_sender
- .try_send(EnergyRequest::NewTask(task.pid, task.cpu))
- .unwrap();
- }
-
- // Get current budget for this task
- match self.shared_budgets.try_get(&task.pid) {
- dashmap::try_result::TryResult::Present(budget) => match *budget {
- 0 => self.no_budget_task_queue.push_back(task),
- _ => self.task_queue.push_back(task),
- },
- _ => self.task_queue.push_back(task),
+ match self.managed_tasks.entry(task.pid) {
+ std::collections::hash_map::Entry::Vacant(e) => {
+ // New task - register it with the energy service
+ let task_info = Arc::new(TaskInfo {
+ cpu: AtomicI32::new(task.cpu),
+ ..Default::default()
+ });
+ e.insert(Task {
+ task_info: task_info.clone(),
+ last_scheduled: Instant::now(),
+ });
+ self.energy_sender
+ .try_send(EnergyRequest::NewTask(task.pid, task_info))
+ .unwrap();
+ self.task_queue.push_back(task);
+ }
+ std::collections::hash_map::Entry::Occupied(e) => {
+ // Get current budget for this task
+ match e.get().task_info.read_budget() {
+ 0 => self.no_budget_task_queue.push_back(task),
+ _ => self.task_queue.push_back(task),
+ }
+ }
}
}
}
@@ -183,7 +189,12 @@ impl<'a> Scheduler<'a> {
if let Some(task) = self.task_queue.pop_front() {
let mut dispatched_task = DispatchedTask::new(&task);
//TODO: do we have to migrate tasks back from e cores?
- self.managed_tasks.insert(task.pid, Instant::now());
+ if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) {
+ task_info.last_scheduled = Instant::now();
+ task_info.task_info.set_cpu(task.cpu);
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
+ }
let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
if cpu >= 0 {
@@ -243,8 +254,8 @@ impl<'a> Scheduler<'a> {
fn cleanup_old_tasks(&mut self) {
let current = Instant::now();
- for (pid, last_scheduled) in &self.managed_tasks {
- if current - *last_scheduled > Duration::from_secs(5) {
+ for (pid, task) in &self.managed_tasks {
+ if current - task.last_scheduled > Duration::from_secs(5) {
self.to_remove.push(*pid);
}
}
diff --git a/src/socket.rs b/src/socket.rs
index 0b42abe..73118ce 100644
--- a/src/socket.rs
+++ b/src/socket.rs
@@ -9,8 +9,6 @@ use std::{
time::Duration,
};
-use dashmap::DashMap;
-
use crate::{energy::ProcessInfo, Pid};
pub struct LoggingSocketService {