author    Dennis Kobert <dennis@kobert.dev>    2025-03-09 13:15:13 +0100
committer Dennis Kobert <dennis@kobert.dev>    2025-03-09 13:15:13 +0100
commit    b720a1253fb8cc8a3ba3454014bad239233368c1 (patch)
tree      f1f7fa25edd5c47e3c30c4b984fa9d557409c7d7
parent    7b2ec191751489157a9e3193b8aeaabd72fae14a (diff)
Restructure project

Split the scheduler and the energy accounting into separate modules: the
scheduling loop moves from main.rs into src/scheduler.rs, and the energy
tracking thread becomes an EnergyService in src/energy.rs with pluggable
Estimator backends (kernel driver, perf counters, mock) under
src/energy/trackers/ and a BudgetPolicy in src/energy/budget.rs. The old
src/mock.rs and src/task_state.rs are removed, and samply is added to the
dev shell.
-rw-r--r--  flake.nix                                                       1
-rw-r--r--  src/energy.rs                                                 174
-rw-r--r--  src/energy/budget.rs                                           49
-rw-r--r--  src/energy/estimator.rs                                         5
-rw-r--r--  src/energy/trackers.rs                                         13
-rw-r--r--  src/energy/trackers/kernel.rs                                  49
-rw-r--r--  src/energy/trackers/mock.rs                                    14
-rw-r--r--  src/energy/trackers/perf.rs (renamed from src/mock/perf.rs)    51
-rw-r--r--  src/main.rs                                                   315
-rw-r--r--  src/mock.rs                                                    65
-rw-r--r--  src/scheduler.rs                                              223
-rw-r--r--  src/task_state.rs                                               7
12 files changed, 568 insertions, 398 deletions
diff --git a/flake.nix b/flake.nix
index 7cdb63d..863c0ea 100644
--- a/flake.nix
+++ b/flake.nix
@@ -74,6 +74,7 @@
packages = with pkgs; [
cargo-watch
cargo-expand
+ samply
just
];
diff --git a/src/energy.rs b/src/energy.rs
new file mode 100644
index 0000000..c8da2cf
--- /dev/null
+++ b/src/energy.rs
@@ -0,0 +1,174 @@
+mod budget;
+mod trackers;
+
+use std::collections::{BTreeSet, HashMap};
+use std::sync::{mpsc, Arc};
+use std::thread;
+use std::time::Duration;
+
+use crate::Pid;
+use anyhow::Result;
+use dashmap::DashMap;
+
+pub use budget::BudgetPolicy;
+pub use trackers::{Estimator, KernelDriver, MockEstimator, PerfEstimator};
+
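+/// Requests sent from the scheduler thread to the energy accounting service.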
+pub enum Request {
+ NewTask(Pid),
+ RemoveTask(Pid),
+}
+
+pub struct ProcessInfo {
+ energy: u64,
+ last_update: std::time::Instant,
+ parent: Pid,
+}
+
+pub struct EnergyService {
+ estimator: Box<dyn Estimator>,
+ budget_policy: Box<dyn BudgetPolicy>,
+ active_processes: BTreeSet<Pid>,
+ process_info: HashMap<Pid, ProcessInfo>,
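+ /// Budgets computed by the energy service and read concurrently by the scheduler.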
+ shared_budgets: Arc<DashMap<Pid, u64>>,
+ request_receiver: mpsc::Receiver<Request>,
+ update_interval: Duration,
+}
+
+impl EnergyService {
+ pub fn new(
+ estimator: Box<dyn Estimator>,
+ budget_policy: Box<dyn BudgetPolicy>,
+ shared_budgets: Arc<DashMap<Pid, u64>>,
+ request_receiver: mpsc::Receiver<Request>,
+ update_interval: Duration,
+ ) -> Self {
+ Self {
+ estimator,
+ budget_policy,
+ active_processes: BTreeSet::new(),
+ process_info: HashMap::new(),
+ shared_budgets,
+ request_receiver,
+ update_interval,
+ }
+ }
+
+ pub fn run(mut self) {
+ thread::spawn(move || {
+ loop {
+ // Process any incoming requests
+ self.handle_requests();
+
+ // Update energy measurements
+ self.update_measurements();
+
+ // Calculate and update budgets
+ self.update_budgets();
+
+ // Sleep for update interval
+ thread::sleep(self.update_interval);
+ }
+ });
+ }
+
+ fn handle_requests(&mut self) {
+ while let Ok(request) = self.request_receiver.try_recv() {
+ match request {
+ Request::NewTask(pid) => {
+ self.estimator.start_trace(pid as u64);
+ self.active_processes.insert(pid);
+ let parent = (|| {
+ let process = procfs::process::Process::new(pid)?;
+ process.stat().map(|stat| stat.ppid)
+ })()
+ .unwrap_or_default();
+ self.process_info.insert(
+ pid,
+ ProcessInfo {
+ energy: 0,
+ last_update: std::time::Instant::now(),
+ parent,
+ },
+ );
+ // Initialize with default budget
+ self.shared_budgets.insert(pid, u64::MAX);
+ }
+ Request::RemoveTask(pid) => {
+ self.estimator.stop_trace(pid as u64);
+ self.active_processes.remove(&pid);
+ self.process_info.remove(&pid);
+ self.shared_budgets.remove(&pid);
+ }
+ }
+ }
+ }
+
+ fn update_measurements(&mut self) {
+ for &pid in &self.active_processes {
+ let energy = self.estimator.read_consumption(pid as u64);
+ if let Some(info) = self.process_info.get_mut(&pid) {
+ info.energy = energy;
+ info.last_update = std::time::Instant::now();
+ }
+ }
+ }
+
+ fn update_budgets(&mut self) {
+ let budgets = self.budget_policy.calculate_budgets(self);
+
+ // Update the shared budgets map
+ for (pid, budget) in budgets {
+ if let Some(mut entry) = self.shared_budgets.try_get_mut(&pid).try_unwrap() {
+ *entry = budget;
+ }
+ }
+ }
+
+ // Accessor methods for BudgetPolicy
+ pub fn active_processes(&self) -> &BTreeSet<Pid> {
+ &self.active_processes
+ }
+
+ pub fn process_energy(&self, pid: Pid) -> Option<u64> {
+ self.process_info.get(&pid).map(|info| info.energy)
+ }
+
+ pub fn all_process_energies(&self) -> HashMap<Pid, u64> {
+ self.process_info
+ .iter()
+ .map(|(&pid, info)| (pid, info.energy))
+ .collect()
+ }
+}
+
+pub fn start_energy_service(
+ use_mocking: bool,
+ power_cap: u64,
+ shared_budgets: Arc<DashMap<Pid, u64>>,
+) -> mpsc::SyncSender<Request> {
+ // Bounded request channel; the capacity of 10000 may need to be revisited
+ let (request_sender, request_receiver) = mpsc::sync_channel(10000);
+
+ // Select the estimator: perf-counter based estimation when mocking is requested,
+ // otherwise the kernel driver backend (/dev/rust-pow)
+ let estimator: Box<dyn Estimator> = if use_mocking {
+ Box::new(PerfEstimator::default())
+ } else {
+ Box::new(KernelDriver::default())
+ };
+
+ // Create budget policy
+ let budget_policy = Box::new(budget::SimpleCappingPolicy::new(power_cap));
+
+ // Create and start the energy service
+ let service = EnergyService::new(
+ estimator,
+ budget_policy,
+ shared_budgets,
+ request_receiver,
+ Duration::from_millis(50), // 50ms update interval
+ );
+
+ service.run();
+
+ request_sender
+}
diff --git a/src/energy/budget.rs b/src/energy/budget.rs
new file mode 100644
index 0000000..aada2eb
--- /dev/null
+++ b/src/energy/budget.rs
@@ -0,0 +1,49 @@
+use std::collections::HashMap;
+
+use crate::energy::EnergyService;
+
+type Pid = i32;
+
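+/// Maps the current per-process energy readings to new per-process budgets.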
+pub trait BudgetPolicy: Send + 'static {
+ fn calculate_budgets(&self, energy_service: &EnergyService) -> HashMap<Pid, u64>;
+}
+
+pub struct SimpleCappingPolicy {
+ power_cap: u64,
+}
+
+impl SimpleCappingPolicy {
+ pub fn new(power_cap: u64) -> Self {
+ Self { power_cap }
+ }
+}
+
+impl BudgetPolicy for SimpleCappingPolicy {
+ fn calculate_budgets(&self, energy_service: &EnergyService) -> HashMap<Pid, u64> {
+ let mut budgets = HashMap::new();
+ let process_energies = energy_service.all_process_energies();
+
+ // Total energy consumption across all processes
+ let total_energy: u64 = process_energies.values().sum();
+
+ // Simple proportional distribution if over cap
+ if total_energy > self.power_cap {
+ let ratio = self.power_cap as f64 / total_energy as f64;
+
+ for (&pid, &energy) in &process_energies {
+ // Calculate a scaled budget based on the ratio
+ // Higher energy consumers get proportionally reduced budgets
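+ // Example: power_cap = 100 and total_energy = 200 give ratio 0.5; a process at
+ // energy = 100 gets scaling_factor = 1.0 - (0.5 * 0.5) = 0.75 of the maximum budget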
+ let scaling_factor = 1.0 - ((energy as f64 / total_energy as f64) * (1.0 - ratio));
+ let budget = (u64::MAX as f64 * scaling_factor) as u64;
+ budgets.insert(pid, budget);
+ }
+ } else {
+ // Under power cap, assign maximum budget to all
+ for &pid in energy_service.active_processes() {
+ budgets.insert(pid, u64::MAX);
+ }
+ }
+
+ budgets
+ }
+}
diff --git a/src/energy/estimator.rs b/src/energy/estimator.rs
new file mode 100644
index 0000000..f7f1e5d
--- /dev/null
+++ b/src/energy/estimator.rs
@@ -0,0 +1,5 @@
+pub trait Estimator: Send + 'static {
+ fn start_trace(&mut self, pid: u64);
+ fn stop_trace(&mut self, pid: u64);
+ fn read_consumption(&mut self, pid: u64) -> u64;
+}
diff --git a/src/energy/trackers.rs b/src/energy/trackers.rs
new file mode 100644
index 0000000..592c926
--- /dev/null
+++ b/src/energy/trackers.rs
@@ -0,0 +1,13 @@
+mod kernel;
+mod mock;
+mod perf;
+
+pub use kernel::*;
+pub use mock::*;
+pub use perf::*;
+
+pub trait Estimator: Send + 'static {
+ fn start_trace(&mut self, pid: u64);
+ fn stop_trace(&mut self, pid: u64);
+ fn read_consumption(&mut self, pid: u64) -> u64;
+}
diff --git a/src/energy/trackers/kernel.rs b/src/energy/trackers/kernel.rs
new file mode 100644
index 0000000..050805c
--- /dev/null
+++ b/src/energy/trackers/kernel.rs
@@ -0,0 +1,49 @@
+// energy/trackers/kernel.rs
+use iocuddle::*;
+use std::fs::File;
+use std::path::Path;
+
+use crate::energy::Estimator;
+
+#[derive(Debug)]
+pub struct KernelDriver {
+ file: File,
+}
+
+impl Default for KernelDriver {
+ fn default() -> Self {
+ KernelDriver::new("/dev/rust-pow")
+ .unwrap_or_else(|_| panic!("Failed to open kernel driver. Is the module loaded?"))
+ }
+}
+
+impl KernelDriver {
+ pub fn new(path: impl AsRef<Path>) -> Result<Self, std::io::Error> {
+ let file = File::open(path)?;
+ Ok(Self { file })
+ }
+}
+
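+// ioctl commands understood by the /dev/rust-pow module: magic byte b'|' with
+// command numbers 0x80 (start trace), 0x81 (stop trace) and 0x82 (read consumption)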
+const PERF_MON: Group = Group::new(b'|');
+const START_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x80) };
+const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) };
+const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) };
+
+impl Estimator for KernelDriver {
+ fn start_trace(&mut self, pid: u64) {
+ let _ = START_TRACE.ioctl(&mut self.file, &pid);
+ }
+
+ fn stop_trace(&mut self, pid: u64) {
+ let _ = STOP_TRACE.ioctl(&mut self.file, &pid);
+ }
+
+ fn read_consumption(&mut self, pid: u64) -> u64 {
+ let mut arg = pid;
+ if READ_POWER.ioctl(&mut self.file, &mut arg).is_ok() {
+ arg
+ } else {
+ 0
+ }
+}
diff --git a/src/energy/trackers/mock.rs b/src/energy/trackers/mock.rs
new file mode 100644
index 0000000..eb23421
--- /dev/null
+++ b/src/energy/trackers/mock.rs
@@ -0,0 +1,14 @@
+use super::Estimator;
+
+#[derive(Default)]
+pub struct MockEstimator;
+
+impl Estimator for MockEstimator {
+ fn start_trace(&mut self, _pid: u64) {}
+
+ fn stop_trace(&mut self, _pid: u64) {}
+
+ fn read_consumption(&mut self, _pid: u64) -> u64 {
+ 14
+ }
+}
diff --git a/src/mock/perf.rs b/src/energy/trackers/perf.rs
index 3dde836..19ec0be 100644
--- a/src/mock/perf.rs
+++ b/src/energy/trackers/perf.rs
@@ -1,8 +1,12 @@
+// energy/trackers/perf.rs
use std::collections::HashMap;
-use events::Event;
-use perf_event::events::Hardware;
-use perf_event::*;
+use perf_event::{
+ events::{Event, Hardware},
+ Builder, Counter, Group,
+};
+
+use crate::energy::Estimator;
#[derive(Default)]
pub struct PerfEstimator {
@@ -19,24 +23,34 @@ static EVENT_TYPES: &[(f32, Event)] = &[
(2.0, Event::Hardware(Hardware::INSTRUCTIONS)),
];
-impl super::KernelModule for PerfEstimator {
+impl Estimator for PerfEstimator {
fn start_trace(&mut self, pid: u64) {
let Ok(mut group) = Group::new() else {
- println!("failed to create Group");
+ eprintln!("Failed to create performance counter group for PID {}", pid);
return;
};
- let counters = EVENT_TYPES
+
+ let counters: Result<Vec<_>, _> = EVENT_TYPES
.iter()
- .map(|(_, kind)| {
- Builder::new()
- .group(&mut group)
- .kind(kind.clone())
- .build()
- .unwrap()
- })
+ .map(|(_, kind)| Builder::new().group(&mut group).kind(kind.clone()).build())
.collect();
- group.enable().unwrap();
+ let counters = match counters {
+ Ok(counters) => counters,
+ Err(e) => {
+ eprintln!(
+ "Failed to create performance counter group for PID {}: {}",
+ pid, e
+ );
+ return;
+ }
+ };
+
+ if let Err(e) = group.enable() {
+ eprintln!("Failed to enable performance counters: {}", e);
+ return;
+ }
+
let counters = Counters { counters, group };
self.registry.insert(pid, counters);
}
@@ -49,12 +63,17 @@ impl super::KernelModule for PerfEstimator {
let Some(counters) = self.registry.get_mut(&pid) else {
return 0;
};
+
+ let counts = match counters.group.read() {
+ Ok(counts) => counts,
+ Err(_) => return 0,
+ };
+
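+ // Energy estimate: weighted sum of the hardware counter values, using the
+ // per-event factors from EVENT_TYPES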
let mut sum = 0;
- let counts = counters.group.read().unwrap();
for ((factor, _ty), count) in EVENT_TYPES.iter().zip(counts.iter()) {
sum += *factor as u64 * count.1;
}
- // dbg!(sum);
+
sum
}
}
diff --git a/src/main.rs b/src/main.rs
index c477a0f..0bf1c12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,324 +3,19 @@ pub use bpf_skel::*;
pub mod bpf_intf;
mod e_core_selector;
-mod mock;
-mod task_state;
+mod energy;
+mod scheduler;
#[rustfmt::skip]
mod bpf;
-use bpf::*;
+use anyhow::Result;
use clap::{Arg, ArgAction, Command};
-use dashmap::DashMap;
-use e_core_selector::{ECoreSelector, RoundRobinSelector};
-use mock::perf::PerfEstimator;
-use scx_utils::{Topology, UserExitInfo};
-
-use libbpf_rs::OpenObject;
-use mock::{KernelDriver, KernelModule, MockModule};
-use task_state::TaskState;
-
-use std::any::Any;
-use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
-use std::fs::File;
-use std::io::{BufRead, BufReader};
+use scheduler::Scheduler;
use std::mem::MaybeUninit;
-use std::ops::Range;
-use std::sync::mpsc::{Receiver, Sender, SyncSender};
-use std::sync::{Arc, OnceLock};
-use std::time::Duration;
-use std::{process, thread};
-
-use anyhow::Result;
type Pid = i32;
-const SLICE_US: u64 = 50000;
-
-struct Scheduler<'a> {
- bpf: BpfScheduler<'a>,
- task_queue: VecDeque<QueuedTask>,
- no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<Pid, TaskState>,
- maximum_budget: u64,
- power_cap: u64,
- own_pid: Pid,
- p_cores: Range<i32>,
- e_cores: Range<i32>,
- topology: Topology,
- e_core_selector: Box<dyn ECoreSelector>,
- // reciever: Receiver<(Pid, Response)>,
- sender: SyncSender<(Pid, Request)>,
- hashmap: Arc<DashMap<Pid, (u64, i32)>>,
-}
-
-enum Request {
- NewTask,
- RemoveTask,
-}
-
-impl<'a> Scheduler<'a> {
- fn init(
- open_object: &'a mut MaybeUninit<OpenObject>,
- use_mocking: bool,
- power_cap: u64,
- ) -> Result<Self> {
- let bpf = BpfScheduler::init(
- open_object,
- 0, // exit_dump_len (buffer size of exit info, 0 = default)
- false, // partial (false = include all tasks)
- false, // debug (false = debug mode off)
- )?;
- dbg!("registering rust user space scheduler");
- let map: DashMap<i32, (u64, i32)> = DashMap::with_capacity(100000);
- let map = Arc::new(map);
- let thread_map = map.clone();
-
- let topology = Topology::new().unwrap();
-
- let read_cores = |core_type: &str| {
- let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
- let e_cores_reader = BufReader::new(e_cores_file);
- let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
- let cores: Vec<&str> = line.split('-').collect();
- cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
- } else {
- panic!(
- "Was not able to differentiate between core types. Does the system have e cores?"
- )
- };
- Ok::<_, std::io::Error>(e_cores)
- };
- let e_cores = read_cores("atom").unwrap_or(0..4);
- let p_cores = read_cores("core").unwrap_or(4..8);
-
- let selector = Box::new(RoundRobinSelector::new(&e_cores));
-
- let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
-
- std::thread::spawn(move || {
- let mut module: Box<dyn KernelModule> = if use_mocking {
- Box::new(PerfEstimator::default())
- // Box::new(MockModule::default())
- } else {
- Box::new(KernelDriver::default())
- };
- let mut tasks = BTreeSet::new();
-
- loop {
- for pid in tasks.iter() {
- let energy = module.read_consumption(*pid as u64);
- if let Some(mut entry) = thread_map.try_get_mut(pid).try_unwrap() {
- entry.0 = energy;
- }
- }
-
- if let Ok((pid, request)) = request_recieve.try_recv() {
- match request {
- Request::NewTask => {
- let parent = (|| {
- let process = procfs::process::Process::new(pid)?;
- process.stat().map(|stat| stat.ppid)
- })()
- .unwrap_or_default();
- module.start_trace(pid as u64);
- tasks.insert(pid);
- thread_map.insert(pid, (0, parent));
- }
- Request::RemoveTask => {
- tasks.remove(&pid);
- module.stop_trace(pid as u64)
- }
- }
- }
- std::thread::sleep(Duration::from_micros(5000));
- }
- });
-
- Ok(Self {
- bpf,
- task_queue: VecDeque::new(),
- no_budget_task_queue: VecDeque::new(),
- managed_tasks: HashMap::new(),
- maximum_budget: u64::MAX,
- power_cap,
- own_pid: process::id() as i32,
- p_cores,
- e_cores,
- topology,
- e_core_selector: selector,
- sender: request_send,
- // reciever: response_recieve,
- hashmap: map,
- })
- }
-
- fn consume_all_tasks(&mut self) {
- // Consume all tasks that are ready to run.
- //
- // Each task contains the following details:
- //
- // pub struct QueuedTask {
- // pub pid: i32, // pid that uniquely identifies a task
- // pub cpu: i32, // CPU where the task is running
- // pub sum_exec_runtime: u64, // Total cpu time
- // pub weight: u64, // Task static priority
- // pub nvcsw: u64, // Total amount of voluntary context switches
- // pub slice: u64, // Remaining time slice budget
- // pub vtime: u64, // Current task vruntime / deadline (set by the scheduler)
- // }
- //
- // Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
- // implementing more sophisticated scheduling policies.
- // let mut contended = Vec::with_capacity(100);
-
- while let Ok(Some(task)) = self.bpf.dequeue_task() {
- // The scheduler itself has to be scheduled regardless of its energy usage
- if task.pid == self.own_pid {
- self.task_queue.push_front(task);
- self.dispatch_next_task();
- continue;
- }
-
- match self.hashmap.try_get(&task.pid) {
- dashmap::try_result::TryResult::Present(value) => {
- let (energy, parent) = *value;
-
- // dbg!(energy);
- // TODO: fix
- if energy == 0 {
- // println!("budget zero");
- self.no_budget_task_queue.push_back(task);
- } else {
- self.task_queue.push_back(task);
- }
- }
- dashmap::try_result::TryResult::Absent => {
- self.hashmap.insert(task.pid, (0, 0));
- self.sender.try_send((task.pid, Request::NewTask)).unwrap();
- self.task_queue.push_back(task);
- }
- dashmap::try_result::TryResult::Locked => {
- println!("locked");
- self.task_queue.push_back(task);
- }
- }
- }
- }
-
- fn dispatch_next_task(&mut self) {
- if let Some(task) = self.task_queue.pop_front() {
- // Create a new task to be dispatched, derived from the received enqueued task.
- //
- // pub struct DispatchedTask {
- // pub pid: i32, // pid that uniquely identifies a task
- // pub cpu: i32, // target CPU selected by the scheduler
- // pub flags: u64, // special dispatch flags
- // pub slice_ns: u64, // time slice assigned to the task (0 = default)
- // }
- //
- // The dispatched task's information are pre-populated from the QueuedTask and they can
- // be modified before dispatching it via self.bpf.dispatch_task().
- let mut dispatched_task = DispatchedTask::new(&task);
-
- // Decide where the task needs to run (target CPU).
- //
- // A call to select_cpu() will return the most suitable idle CPU for the task,
- // considering its previously used CPU.
- let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
- }
-
- // Decide for how long the task needs to run (time slice); if not specified
- // SCX_SLICE_DFL will be used by default.
- dispatched_task.slice_ns = SLICE_US;
-
- // Dispatch the task on the target CPU.
- self.bpf.dispatch_task(&dispatched_task).unwrap();
-
- // Notify the BPF component of the number of pending tasks and immediately give a
- // chance to run to the dispatched task.
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
- } else if let Some(task) = self.no_budget_task_queue.pop_front() {
- //TODO: do something with tasks that have no budget left
- let mut dispatched_task = DispatchedTask::new(&task);
-
- let cpu = self.e_core_selector.next_core();
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
- }
-
- if task.pid == self.own_pid {
- dispatched_task.slice_ns = SLICE_US * 1000;
- }
- dispatched_task.slice_ns = SLICE_US;
- self.bpf.dispatch_task(&dispatched_task).unwrap();
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
- }
- }
-
- fn dispatch_tasks(&mut self) {
- //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
- self.reset_budgets_and_garbage_collect();
-
- loop {
- // Consume all tasks before dispatching any.
- self.consume_all_tasks();
-
- // Dispatch one task from the queue.
- self.dispatch_next_task();
-
- // If no task is ready to run (or in case of error), stop dispatching tasks and notify
- // the BPF component that all tasks have been scheduled / dispatched, with no remaining
- // pending tasks.
- //TODO: both queues?
- if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
- self.bpf.notify_complete(0);
-
- break;
- }
- }
- }
-
- fn run(&mut self) -> Result<UserExitInfo> {
- while !self.bpf.exited() {
- self.dispatch_tasks();
- }
- self.bpf.shutdown_and_report()
- }
-
- fn get_new_budget(&self) -> u64 {
- //TODO
- u64::MAX
- }
-
- fn reset_budgets_and_garbage_collect(&mut self) {
- let old_budget = self.maximum_budget;
- self.maximum_budget = self.get_new_budget();
-
- self.managed_tasks.retain(|key, value| {
- let was_scheduled = value.budget == old_budget;
- if was_scheduled {
- value.budget = self.maximum_budget;
- } else {
- self.sender
- .try_send(({ *key }, Request::RemoveTask))
- .unwrap();
- }
- was_scheduled
- });
- }
-}
-
fn main() -> Result<()> {
let matches = Command::new("Energy User Space Scheduler")
.arg(
@@ -343,7 +38,7 @@ fn main() -> Result<()> {
let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
let use_mocking = matches.get_flag("mock");
- // Initialize and load the FIFO scheduler.
+ // Initialize and load the scheduler.
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
diff --git a/src/mock.rs b/src/mock.rs
deleted file mode 100644
index fc6fcf1..0000000
--- a/src/mock.rs
+++ /dev/null
@@ -1,65 +0,0 @@
-use iocuddle::*;
-use rand::Rng;
-pub mod perf;
-
-pub trait KernelModule {
- fn start_trace(&mut self, _pid: u64) {}
- fn stop_trace(&mut self, _pid: u64) {}
- fn read_consumption(&mut self, pid: u64) -> u64;
-}
-
-#[derive(Default)]
-pub struct MockModule;
-
-impl KernelModule for MockModule {
- fn start_trace(&mut self, pid: u64) {
- // println!("starting trace of {pid}");
- }
-
- fn stop_trace(&mut self, pid: u64) {
- // println!("stopping trace of {pid}");
- }
-
- fn read_consumption(&mut self, _pid: u64) -> u64 {
- // rand::rng().random()
- 14
- }
-}
-
-#[derive(Debug)]
-pub struct KernelDriver {
- file: std::fs::File,
-}
-
-impl Default for KernelDriver {
- fn default() -> Self {
- KernelDriver::new("/dev/rust-pow").unwrap()
- }
-}
-impl KernelDriver {
- fn new(path: impl AsRef<std::path::Path>) -> Result<Self, std::io::Error> {
- let file = std::fs::File::open(path)?;
- Ok(Self { file })
- }
-}
-
-const PERF_MON: Group = Group::new(b'|');
-const START_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x80) };
-const STOP_TRACE: Ioctl<Write, &u64> = unsafe { PERF_MON.write(0x81) };
-const READ_POWER: Ioctl<WriteRead, &u64> = unsafe { PERF_MON.write_read(0x82) };
-
-impl KernelModule for KernelDriver {
- fn start_trace(&mut self, pid: u64) {
- START_TRACE.ioctl(&mut self.file, &pid).unwrap();
- }
-
- fn stop_trace(&mut self, pid: u64) {
- STOP_TRACE.ioctl(&mut self.file, &pid).unwrap();
- }
-
- fn read_consumption(&mut self, pid: u64) -> u64 {
- let mut arg = pid;
- READ_POWER.ioctl(&mut self.file, &mut arg).unwrap();
- arg
- }
-}
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..6188da1
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,223 @@
+use crate::bpf::*;
+use crate::energy::{self, Request as EnergyRequest};
+
+use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
+use anyhow::Result;
+use dashmap::DashMap;
+use libbpf_rs::OpenObject;
+use scx_utils::{Topology, UserExitInfo};
+
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::mem::MaybeUninit;
+use std::ops::Range;
+use std::process;
+use std::sync::{mpsc, Arc};
+use std::time::{Duration, Instant};
+
+use crate::Pid;
+
+const SLICE_US: u64 = 50000;
+
+pub struct Scheduler<'a> {
+ bpf: BpfScheduler<'a>,
+ task_queue: VecDeque<QueuedTask>,
+ no_budget_task_queue: VecDeque<QueuedTask>,
+ managed_tasks: HashMap<Pid, Instant>,
+ maximum_budget: u64,
+ own_pid: Pid,
+ p_cores: Range<i32>,
+ e_cores: Range<i32>,
+ topology: Topology,
+ to_remove: Vec<Pid>,
+ e_core_selector: Box<dyn ECoreSelector>,
+ energy_sender: mpsc::SyncSender<EnergyRequest>,
+ shared_budgets: Arc<DashMap<Pid, u64>>,
+}
+
+impl<'a> Scheduler<'a> {
+ pub fn init(
+ open_object: &'a mut MaybeUninit<OpenObject>,
+ use_mocking: bool,
+ power_cap: u64,
+ ) -> Result<Self> {
+ let bpf = BpfScheduler::init(
+ open_object,
+ 0, // exit_dump_len (buffer size of exit info, 0 = default)
+ false, // partial (false = include all tasks)
+ false, // debug (false = debug mode off)
+ )?;
+
+ println!("Initializing energy-aware scheduler");
+
+ // Shared budget map between energy service and scheduler
+ let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000));
+
+ // Start energy tracking service
+ let energy_sender =
+ energy::start_energy_service(use_mocking, power_cap, shared_budgets.clone());
+
+ let topology = Topology::new().unwrap();
+
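+ // Determine the E-core / P-core CPU ranges from sysfs (e.g. /sys/devices/cpu_atom/cpus
+ // contains a range such as "0-7"); fall back to fixed ranges if the files are missing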
+ let read_cores = |core_type: &str| {
+ let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
+ let e_cores_reader = BufReader::new(e_cores_file);
+ let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
+ let cores: Vec<&str> = line.split('-').collect();
+ cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
+ } else {
+ panic!(
+ "Was not able to differentiate between core types. Does the system have e cores?"
+ )
+ };
+ Ok::<_, std::io::Error>(e_cores)
+ };
+ let e_cores = read_cores("atom").unwrap_or(0..4);
+ let p_cores = read_cores("core").unwrap_or(4..8);
+
+ let selector = Box::new(RoundRobinSelector::new(&e_cores));
+ let to_remove = Vec::with_capacity(1000);
+
+ Ok(Self {
+ bpf,
+ task_queue: VecDeque::new(),
+ no_budget_task_queue: VecDeque::new(),
+ managed_tasks: HashMap::new(),
+ maximum_budget: u64::MAX,
+ own_pid: process::id() as i32,
+ p_cores,
+ e_cores,
+ topology,
+ e_core_selector: selector,
+ energy_sender,
+ shared_budgets,
+ to_remove,
+ })
+ }
+
+ fn consume_all_tasks(&mut self) {
+ while let Ok(Some(task)) = self.bpf.dequeue_task() {
+ // The scheduler itself has to be scheduled regardless of its energy usage
+ if task.pid == self.own_pid {
+ self.task_queue.push_front(task);
+ self.dispatch_next_task();
+ continue;
+ }
+
+ // Check if we've seen this task before
+ if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid)
+ {
+ e.insert(Instant::now());
+ // New task - register it with the energy service
+ self.energy_sender
+ .try_send(EnergyRequest::NewTask(task.pid))
+ .unwrap();
+ }
+
+ // Get current budget for this task
+ match self.shared_budgets.try_get(&task.pid) {
+ dashmap::try_result::TryResult::Present(budget) => match *budget {
+ // A budget of zero means the energy budget is exhausted; defer to the e-core queue
+ 0 => self.no_budget_task_queue.push_back(task),
+ _ => self.task_queue.push_back(task),
+ },
+ _ => self.task_queue.push_back(task),
+ }
+ }
+ }
+
+ fn dispatch_next_task(&mut self) {
+ if let Some(task) = self.task_queue.pop_front() {
+ let mut dispatched_task = DispatchedTask::new(&task);
+
+ self.managed_tasks.insert(task.pid, Instant::now());
+
+ let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
+ if cpu >= 0 {
+ dispatched_task.cpu = cpu;
+ } else {
+ dispatched_task.flags |= RL_CPU_ANY as u64;
+ }
+
+ dispatched_task.slice_ns = SLICE_US;
+
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch task: {}", e);
+ }
+
+ self.bpf.notify_complete(
+ self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+ );
+ } else if let Some(task) = self.no_budget_task_queue.pop_front() {
+ let mut dispatched_task = DispatchedTask::new(&task);
+
+ // Low budget tasks go to e-cores
+ let cpu = self.e_core_selector.next_core();
+ if cpu >= 0 {
+ dispatched_task.cpu = cpu;
+ } else {
+ dispatched_task.flags |= RL_CPU_ANY as u64;
+ }
+
+ // Scheduler tasks get longer slices
+ if task.pid == self.own_pid {
+ dispatched_task.slice_ns = SLICE_US * 1000;
+ } else {
+ dispatched_task.slice_ns = SLICE_US;
+ }
+
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch low-budget task: {}", e);
+ }
+
+ self.bpf.notify_complete(
+ self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+ );
+ }
+ }
+
+ fn cleanup_old_tasks(&mut self) {
+ let current = Instant::now();
+ for (pid, last_scheduled) in &self.managed_tasks {
+ if current - *last_scheduled > Duration::from_secs(5) {
+ self.to_remove.push(*pid);
+ }
+ }
+ for pid in self.to_remove.drain(..) {
+ self.managed_tasks.remove(&pid);
+ self.energy_sender
+ .try_send(EnergyRequest::RemoveTask(pid))
+ .unwrap();
+ }
+ }
+
+ fn dispatch_tasks(&mut self) {
+ loop {
+ self.consume_all_tasks();
+ self.dispatch_next_task();
+
+ if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
+ self.bpf.notify_complete(0);
+ break;
+ }
+ }
+ }
+
+ pub fn run(&mut self) -> Result<UserExitInfo> {
+ let mut i = 0;
+ while !self.bpf.exited() {
+ i += 1;
+ self.dispatch_tasks();
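+ // Every 100 dispatch rounds, prune tasks that have not been scheduled recently
+ // and deregister them from the energy service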
+ if i % 100 == 0 {
+ self.cleanup_old_tasks();
+ }
+ }
+
+ // Clean up - signal the energy service to stop tracking all managed tasks
+ for pid in self.managed_tasks.keys() {
+ let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
+ }
+
+ self.bpf.shutdown_and_report()
+ }
+}
diff --git a/src/task_state.rs b/src/task_state.rs
deleted file mode 100644
index f88bcc2..0000000
--- a/src/task_state.rs
+++ /dev/null
@@ -1,7 +0,0 @@
-use crate::Pid;
-
-pub struct TaskState {
- pub previous_energy_usage: u64,
- pub budget: u64,
- pub parent: Pid,
-}