Diffstat (limited to 'src/scheduler.rs')
-rw-r--r-- | src/scheduler.rs | 223
1 file changed, 223 insertions, 0 deletions
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..6188da1
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,223 @@
+use crate::bpf::*;
+use crate::energy::{self, Request as EnergyRequest};
+
+use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
+use anyhow::Result;
+use dashmap::DashMap;
+use libbpf_rs::OpenObject;
+use scx_utils::{Topology, UserExitInfo};
+
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::mem::MaybeUninit;
+use std::ops::Range;
+use std::process;
+use std::sync::{mpsc, Arc};
+use std::time::{Duration, Instant};
+
+use crate::Pid;
+
+const SLICE_US: u64 = 50000;
+
+pub struct Scheduler<'a> {
+    bpf: BpfScheduler<'a>,
+    task_queue: VecDeque<QueuedTask>,
+    no_budget_task_queue: VecDeque<QueuedTask>,
+    managed_tasks: HashMap<Pid, Instant>,
+    maximum_budget: u64,
+    own_pid: Pid,
+    p_cores: Range<i32>,
+    e_cores: Range<i32>,
+    topology: Topology,
+    to_remove: Vec<Pid>,
+    e_core_selector: Box<dyn ECoreSelector>,
+    energy_sender: mpsc::SyncSender<EnergyRequest>,
+    shared_budgets: Arc<DashMap<Pid, u64>>,
+}
+
+impl<'a> Scheduler<'a> {
+    pub fn init(
+        open_object: &'a mut MaybeUninit<OpenObject>,
+        use_mocking: bool,
+        power_cap: u64,
+    ) -> Result<Self> {
+        let bpf = BpfScheduler::init(
+            open_object,
+            0,     // exit_dump_len (buffer size of exit info, 0 = default)
+            false, // partial (false = include all tasks)
+            false, // debug (false = debug mode off)
+        )?;
+
+        println!("Initializing energy-aware scheduler");
+
+        // Shared budget map between energy service and scheduler
+        let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000));
+
+        // Start energy tracking service
+        let energy_sender =
+            energy::start_energy_service(use_mocking, power_cap, shared_budgets.clone());
+
+        let topology = Topology::new().unwrap();
+
+        let read_cores = |core_type: &str| {
+            let cpus_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
+            let cpus_reader = BufReader::new(cpus_file);
+            let cores = if let Some(Ok(line)) = cpus_reader.lines().next() {
+                let bounds: Vec<&str> = line.split('-').collect();
+                bounds[0].parse::<i32>().unwrap()..bounds[1].parse::<i32>().unwrap()
+            } else {
+                panic!(
+                    "Was not able to differentiate between core types. Does the system have e cores?"
+                )
+            };
+            Ok::<_, std::io::Error>(cores)
+        };
+        let e_cores = read_cores("atom").unwrap_or(0..4);
+        let p_cores = read_cores("core").unwrap_or(4..8);
+
+        let selector = Box::new(RoundRobinSelector::new(&e_cores));
+        let to_remove = Vec::with_capacity(1000);
+
+        Ok(Self {
+            bpf,
+            task_queue: VecDeque::new(),
+            no_budget_task_queue: VecDeque::new(),
+            managed_tasks: HashMap::new(),
+            maximum_budget: u64::MAX,
+            own_pid: process::id() as i32,
+            p_cores,
+            e_cores,
+            topology,
+            e_core_selector: selector,
+            energy_sender,
+            shared_budgets,
+            to_remove,
+        })
+    }
+
+    fn consume_all_tasks(&mut self) {
+        while let Ok(Some(task)) = self.bpf.dequeue_task() {
+            // The scheduler itself has to be scheduled regardless of its energy usage
+            if task.pid == self.own_pid {
+                self.task_queue.push_front(task);
+                self.dispatch_next_task();
+                continue;
+            }
+
+            // Check if we've seen this task before
+            if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid)
+            {
+                e.insert(Instant::now());
+                // New task - register it with the energy service
+                self.energy_sender
+                    .try_send(EnergyRequest::NewTask(task.pid))
+                    .unwrap();
+            }
+
+            // Route the task by its remaining budget: exhausted tasks are queued separately
+            match self.shared_budgets.try_get(&task.pid) {
+                dashmap::try_result::TryResult::Present(budget) => match *budget {
+                    0 => self.no_budget_task_queue.push_back(task),
+                    _ => self.task_queue.push_back(task),
+                },
+                _ => self.task_queue.push_back(task),
+            }
+        }
+    }
+
+    fn dispatch_next_task(&mut self) {
+        if let Some(task) = self.task_queue.pop_front() {
+            let mut dispatched_task = DispatchedTask::new(&task);
+
+            self.managed_tasks.insert(task.pid, Instant::now());
+
+            let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
+            if cpu >= 0 {
+                dispatched_task.cpu = cpu;
+            } else {
+                dispatched_task.flags |= RL_CPU_ANY as u64;
+            }
+
+            dispatched_task.slice_ns = SLICE_US;
+
+            if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+                eprintln!("Failed to dispatch task: {}", e);
+            }
+
+            self.bpf.notify_complete(
+                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+            );
+        } else if let Some(task) = self.no_budget_task_queue.pop_front() {
+            let mut dispatched_task = DispatchedTask::new(&task);
+
+            // Low budget tasks go to e-cores
+            let cpu = self.e_core_selector.next_core();
+            if cpu >= 0 {
+                dispatched_task.cpu = cpu;
+            } else {
+                dispatched_task.flags |= RL_CPU_ANY as u64;
+            }
+
+            // Scheduler tasks get longer slices
+            if task.pid == self.own_pid {
+                dispatched_task.slice_ns = SLICE_US * 1000;
+            } else {
+                dispatched_task.slice_ns = SLICE_US;
+            }
+
+            if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+                eprintln!("Failed to dispatch low-budget task: {}", e);
+            }
+
+            self.bpf.notify_complete(
+                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+            );
+        }
+    }
+
+    fn cleanup_old_tasks(&mut self) {
+        let current = Instant::now();
+        for (pid, last_scheduled) in &self.managed_tasks {
+            if current - *last_scheduled > Duration::from_secs(5) {
+                self.to_remove.push(*pid);
+            }
+        }
+        for pid in self.to_remove.drain(..) {
+            self.managed_tasks.remove(&pid);
+            self.energy_sender
+                .try_send(EnergyRequest::RemoveTask(pid))
+                .unwrap();
+        }
+    }
+
+    fn dispatch_tasks(&mut self) {
+        loop {
+            self.consume_all_tasks();
+            self.dispatch_next_task();
+
+            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
+                self.bpf.notify_complete(0);
+                break;
+            }
+        }
+    }
+
+    pub fn run(&mut self) -> Result<UserExitInfo> {
+        let mut i = 0;
+        while !self.bpf.exited() {
+            i += 1;
+            self.dispatch_tasks();
+            if i % 100 == 0 {
+                self.cleanup_old_tasks();
+            }
+        }
+
+        // Clean up - signal the energy service to stop tracking all managed tasks
+        for pid in self.managed_tasks.keys() {
+            let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
+        }
+
+        self.bpf.shutdown_and_report()
+    }
+}
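For context, schedulers built on scx_rustland_core are normally driven from main.rs in a re-init loop: run() returns a UserExitInfo, and the scheduler is re-created whenever the BPF side requests a restart (for example after CPU hot-plug). Below is a minimal sketch of such a driver, not part of this commit, assuming this file is exposed as crate module scheduler, that crate::Pid is an alias for i32, that scx_utils::UserExitInfo provides the usual should_restart() helper, and with purely illustrative values for use_mocking and power_cap:

    // Hypothetical main.rs driver sketch (assumptions noted above).
    mod bpf;
    mod e_core_selector;
    mod energy;
    mod scheduler;

    use std::mem::MaybeUninit;

    use anyhow::Result;
    use scheduler::Scheduler;

    // Assumed alias; scheduler.rs stores process::id() as i32 into a Pid field.
    pub type Pid = i32;

    fn main() -> Result<()> {
        // Illustrative values; a real binary would parse these from CLI flags.
        let use_mocking = false;
        let power_cap: u64 = 50;

        let mut open_object = MaybeUninit::uninit();
        loop {
            let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
            // run() blocks until the BPF scheduler exits; re-init when a restart is requested.
            if !sched.run()?.should_restart() {
                break;
            }
        }
        Ok(())
    }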