Diffstat (limited to 'src/scheduler.rs')
-rw-r--r--  src/scheduler.rs  223
1 file changed, 223 insertions, 0 deletions
diff --git a/src/scheduler.rs b/src/scheduler.rs
new file mode 100644
index 0000000..6188da1
--- /dev/null
+++ b/src/scheduler.rs
@@ -0,0 +1,223 @@
+use crate::bpf::*;
+use crate::energy::{self, Request as EnergyRequest};
+
+use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
+use anyhow::Result;
+use dashmap::DashMap;
+use libbpf_rs::OpenObject;
+use scx_utils::{Topology, UserExitInfo};
+
+use std::collections::{HashMap, HashSet, VecDeque};
+use std::fs::File;
+use std::io::{BufRead, BufReader};
+use std::mem::MaybeUninit;
+use std::ops::Range;
+use std::process;
+use std::sync::{mpsc, Arc};
+use std::time::{Duration, Instant};
+
+use crate::Pid;
+
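+// Time slice assigned to dispatched tasks; the scheduler's own task gets
+// SLICE_US * 1000 (see dispatch_next_task).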
+const SLICE_US: u64 = 50000;
+
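+/// User-space side of the energy-aware scheduler: it drains tasks queued by
+/// the BPF component, splits them into a budgeted and a no-budget queue based
+/// on the shared budget map, and steers no-budget tasks onto e-cores.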
+pub struct Scheduler<'a> {
+ bpf: BpfScheduler<'a>,
+ task_queue: VecDeque<QueuedTask>,
+ no_budget_task_queue: VecDeque<QueuedTask>,
+ managed_tasks: HashMap<Pid, Instant>,
+ maximum_budget: u64,
+ own_pid: Pid,
+ p_cores: Range<i32>,
+ e_cores: Range<i32>,
+ topology: Topology,
+ to_remove: Vec<Pid>,
+ e_core_selector: Box<dyn ECoreSelector>,
+ energy_sender: mpsc::SyncSender<EnergyRequest>,
+ shared_budgets: Arc<DashMap<Pid, u64>>,
+}
+
+impl<'a> Scheduler<'a> {
+ pub fn init(
+ open_object: &'a mut MaybeUninit<OpenObject>,
+ use_mocking: bool,
+ power_cap: u64,
+ ) -> Result<Self> {
+ let bpf = BpfScheduler::init(
+ open_object,
+ 0, // exit_dump_len (buffer size of exit info, 0 = default)
+ false, // partial (false = include all tasks)
+ false, // debug (false = debug mode off)
+ )?;
+
+ println!("Initializing energy-aware scheduler");
+
+ // Shared budget map between energy service and scheduler
+ let shared_budgets: Arc<DashMap<Pid, u64>> = Arc::new(DashMap::with_capacity(100000));
+
+ // Start energy tracking service
+ let energy_sender =
+ energy::start_energy_service(use_mocking, power_cap, shared_budgets.clone());
+
+ let topology = Topology::new()?;
+
+ // The sysfs cpu lists (e.g. "0-7") are inclusive, so add 1 to turn them
+ // into an exclusive Range.
+ let read_cores = |core_type: &str| {
+ let cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
+ let cores_reader = BufReader::new(cores_file);
+ let cores = if let Some(Ok(line)) = cores_reader.lines().next() {
+ let bounds: Vec<&str> = line.split('-').collect();
+ bounds[0].parse::<i32>().unwrap()..(bounds[1].parse::<i32>().unwrap() + 1)
+ } else {
+ panic!(
+ "Was not able to differentiate between core types. Does the system have e-cores?"
+ )
+ };
+ Ok::<_, std::io::Error>(cores)
+ };
+ // Fall back to a fixed split if the hybrid-topology sysfs files do not exist.
+ let e_cores = read_cores("atom").unwrap_or(0..4);
+ let p_cores = read_cores("core").unwrap_or(4..8);
+
+ let selector = Box::new(RoundRobinSelector::new(&e_cores));
+ let to_remove = Vec::with_capacity(1000);
+
+ Ok(Self {
+ bpf,
+ task_queue: VecDeque::new(),
+ no_budget_task_queue: VecDeque::new(),
+ managed_tasks: HashMap::new(),
+ maximum_budget: u64::MAX,
+ own_pid: process::id() as i32,
+ p_cores,
+ e_cores,
+ topology,
+ e_core_selector: selector,
+ energy_sender,
+ shared_budgets,
+ to_remove,
+ })
+ }
+
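+ /// Drain all tasks queued by the BPF component, register tasks we have not
+ /// seen before with the energy service, and sort them into the local queues
+ /// according to their remaining budget.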
+ fn consume_all_tasks(&mut self) {
+ while let Ok(Some(task)) = self.bpf.dequeue_task() {
+ // The scheduler itself has to be scheduled regardless of its energy usage
+ if task.pid == self.own_pid {
+ self.task_queue.push_front(task);
+ self.dispatch_next_task();
+ continue;
+ }
+
+ // Check if we've seen this task before
+ if let std::collections::hash_map::Entry::Vacant(e) = self.managed_tasks.entry(task.pid)
+ {
+ e.insert(Instant::now());
+ // New task - register it with the energy service
+ self.energy_sender
+ .try_send(EnergyRequest::NewTask(task.pid))
+ .unwrap();
+ }
+
+ // Check the task's remaining budget: tasks with budget left stay on the
+ // normal queue, exhausted tasks move to the no-budget queue (e-cores).
+ match self.shared_budgets.try_get(&task.pid) {
+ dashmap::try_result::TryResult::Present(budget) => match *budget {
+ 0 => self.no_budget_task_queue.push_back(task),
+ _ => self.task_queue.push_back(task),
+ },
+ // Unknown or currently locked entry: give the task the benefit of the doubt.
+ _ => self.task_queue.push_back(task),
+ }
+ }
+ }
+
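+ /// Dispatch a single task: budgeted tasks go to the CPU suggested by the BPF
+ /// side, no-budget tasks to the next e-core, then notify the BPF component of
+ /// the remaining queue length.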
+ fn dispatch_next_task(&mut self) {
+ // Prefer tasks that still have an energy budget; fall back to the
+ // no-budget queue once the main queue is empty.
+ let (task, use_e_core) = if let Some(task) = self.task_queue.pop_front() {
+ (task, false)
+ } else if let Some(task) = self.no_budget_task_queue.pop_front() {
+ (task, true)
+ } else {
+ return;
+ };
+
+ let mut dispatched_task = DispatchedTask::new(&task);
+
+ self.managed_tasks.insert(task.pid, Instant::now());
+
+ // Budgeted tasks run wherever the BPF side suggests; tasks without budget
+ // are placed on the next e-core in round-robin order.
+ let cpu = if use_e_core {
+ self.e_core_selector.next_core()
+ } else {
+ self.bpf.select_cpu(task.pid, task.cpu, 0)
+ };
+ if cpu >= 0 {
+ dispatched_task.cpu = cpu;
+ } else {
+ dispatched_task.flags |= RL_CPU_ANY as u64;
+ }
+
+ // The scheduler itself gets a much longer slice so it is never starved.
+ dispatched_task.slice_ns = if task.pid == self.own_pid {
+ SLICE_US * 1000
+ } else {
+ SLICE_US
+ };
+
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch task: {}", e);
+ }
+
+ self.bpf.notify_complete(
+ self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+ );
+ }
+
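+ /// Drop tasks that have not been scheduled for five seconds and tell the
+ /// energy service to stop tracking them.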
+ fn cleanup_old_tasks(&mut self) {
+ let current = Instant::now();
+ for (pid, last_scheduled) in &self.managed_tasks {
+ if current - *last_scheduled > Duration::from_secs(5) {
+ self.to_remove.push(*pid);
+ }
+ }
+ for pid in self.to_remove.drain(..) {
+ self.managed_tasks.remove(&pid);
+ self.energy_sender
+ .try_send(EnergyRequest::RemoveTask(pid))
+ .unwrap();
+ }
+ }
+
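+ /// Keep consuming and dispatching tasks until both local queues are empty.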
+ fn dispatch_tasks(&mut self) {
+ loop {
+ self.consume_all_tasks();
+ self.dispatch_next_task();
+
+ if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
+ self.bpf.notify_complete(0);
+ break;
+ }
+ }
+ }
+
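+ /// Main loop: dispatch tasks until the BPF component signals exit, running a
+ /// periodic cleanup of stale tasks, then deregister everything from the
+ /// energy service.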
+ pub fn run(&mut self) -> Result<UserExitInfo> {
+ // Counts scheduling rounds so that stale tasks are cleaned up periodically.
+ let mut i: u64 = 0;
+ while !self.bpf.exited() {
+ i += 1;
+ self.dispatch_tasks();
+ if i % 100 == 0 {
+ self.cleanup_old_tasks();
+ }
+ }
+
+ // Clean up - signal the energy service to stop tracking all managed tasks
+ for pid in self.managed_tasks.keys() {
+ let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
+ }
+
+ self.bpf.shutdown_and_report()
+ }
+}