use crate::bpf::*;
use crate::core_selector::{CoreSelector, RoundRobinSelector};
use crate::energy::{self, Request as EnergyRequest, TaskInfo};
use crate::freq::{self, FrequencyKHZ, Governor, Request as FrequencyRequest};
use crate::Pid;
use anyhow::Result;
use libbpf_rs::OpenObject;
use scx_utils::{Topology, UserExitInfo};
use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
use std::process;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;

// Base time slice. Used directly as nanoseconds for regular tasks and scaled
// by 1000 (i.e. 100 ms) for the scheduler task itself; see
// batch_dispatch_next_tasks().
const SLICE_US: u64 = 100_000;

pub struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,
    task_queue: VecDeque<QueuedTask>,
    no_budget_task_queue: VecDeque<QueuedTask>,
    managed_tasks: HashMap<Pid, Arc<TaskInfo>>,
    tasks_scheduled: u64,
    // TODO: also consider Pids of children
    own_pid: Pid,
    p_cores: Range<i32>,
    e_cores: Option<Range<i32>>,
    garbage_core: i32,
    to_remove: Vec<Pid>,
    e_core_selector: Box<dyn CoreSelector>,
    energy_sender: mpsc::SyncSender<EnergyRequest>,
    empty_task_infos: mpsc::Receiver<Arc<TaskInfo>>,
    frequency_sender: mpsc::SyncSender<FrequencyRequest>,
}

impl<'a> Scheduler<'a> {
    pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, use_mocking: bool) -> Result<Self> {
        println!("Initializing energy-aware scheduler");

        let shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> =
            Arc::new(RwLock::new(Vec::new()));
        let shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> =
            Arc::new(RwLock::new(Vec::new()));
        let shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>> =
            Arc::new(RwLock::new(Vec::new()));

        // Start the energy tracking service.
        let energy_sender =
            energy::start_energy_service(use_mocking, shared_cpu_current_frequencies.clone())?;

        // Pre-allocate TaskInfos on a separate thread; send() blocks once the
        // channel buffer (200 entries) is full, so this acts as a refill pump.
        let (task_sender, empty_task_infos) = mpsc::sync_channel(200);
        std::thread::spawn(move || loop {
            if task_sender.send(Arc::new(TaskInfo::default())).is_err() {
                // The receiver was dropped, i.e. the scheduler is shutting down.
                break;
            }
        });

        let topology = Topology::new()?;
        let mut e_core_ids = Vec::new();
        let mut p_core_ids = Vec::new();
        for (id, cpu) in &topology.all_cpus {
            match cpu.core_type {
                scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32),
                scx_utils::CoreType::Little => e_core_ids.push(*id as i32),
            }
        }
        // We assume that the CPU IDs for each core type are assigned contiguously.
        e_core_ids.sort();
        p_core_ids.sort();
        let e_cores = *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&-1) + 1);
        let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&-1) + 1);
        let all_cores = 0..((e_cores.len() + p_cores.len()) as u32);
        let e_cores = if !e_cores.is_empty() { Some(e_cores) } else { None };

        let e_core_selector: Box<dyn CoreSelector> = if let Some(e_cores) = &e_cores {
            // Reserve the last e-core as the garbage core by excluding it from
            // the round-robin rotation (`end` is exclusive, so the last core is
            // `end - 1`).
            Box::new(RoundRobinSelector::new(
                &(e_cores.start..e_cores.end.saturating_sub(1)),
            ))
        } else {
            // Fall back to core 0 on systems without e-cores.
            Box::new(RoundRobinSelector::new(&(0..1)))
        };

        let to_remove = Vec::with_capacity(1000);

        let frequency_sender = freq::start_frequency_service(
            all_cores,
            shared_cpu_frequency_ranges.clone(),
            shared_policy_frequency_ranges.clone(),
            shared_cpu_current_frequencies.clone(),
            Duration::from_millis(30),
        )?;
        frequency_sender
            .try_send(FrequencyRequest::UpdatePolicyCPUFrequency)
            .unwrap();
        frequency_sender
            .try_send(FrequencyRequest::UpdatePossibleCPUFrequencyRange)
            .unwrap();
        // Give the frequency service a moment to populate the shared state.
        std::thread::sleep(Duration::from_secs(1));

        let bpf = BpfScheduler::init(
            open_object,
            0,     // exit_dump_len (buffer size of exit info, 0 = default)
            false, // partial (false = include all tasks)
            false, // debug (false = debug mode off)
        )?;

        Ok(Self {
            bpf,
            task_queue: VecDeque::new(),
            no_budget_task_queue: VecDeque::new(),
            managed_tasks: HashMap::new(),
            own_pid: process::id() as i32,
            p_cores,
            empty_task_infos,
            tasks_scheduled: 0,
            e_cores,
            garbage_core: 0,
            e_core_selector,
            energy_sender,
            to_remove,
            frequency_sender,
        })
    }

    fn try_set_up_garbage_cpu(&mut self) -> Result<bool, TrySendError<FrequencyRequest>> {
        if let Some(e_cores) = &self.e_cores {
            // Pin the garbage core to a low frequency range and the powersave
            // governor so that over-budget tasks burn as little energy as possible.
            self.garbage_core = e_cores.end.saturating_sub(1);
            self.frequency_sender
                .try_send(FrequencyRequest::SetFrequencyRangeForCore(
                    self.garbage_core as u32,
                    800_000..=1_200_000,
                ))?;
            self.frequency_sender
                .try_send(FrequencyRequest::SetGovernorForCore(
                    self.garbage_core as u32,
                    Governor::Powersave,
                ))?;
            Ok(true)
        } else {
            Ok(false)
        }
    }

    fn consume_all_tasks(&mut self) {
        while let Ok(Some(mut task)) = self.bpf.dequeue_task() {
            // The scheduler itself has to be scheduled regardless of its energy usage.
            if task.pid == self.own_pid {
                task.vtime = 10;
                self.task_queue.push_front(task);
                continue;
            }

            // TODO: consider adjusting vtime for all processes
            // Check if we've seen this task before.
            match self.managed_tasks.entry(task.pid) {
                std::collections::hash_map::Entry::Vacant(e) => {
                    let is_e_core = self
                        .e_cores
                        .as_ref()
                        .map(|e_cores| e_cores.contains(&task.cpu))
                        .unwrap_or(false);
                    // New task: register it with the energy service.
                    let task_info = self.empty_task_infos.recv().unwrap();
                    task_info.set_cpu(task.cpu);
                    task_info.set_running_on_e_core(is_e_core);
                    e.insert(task_info.clone());
                    self.energy_sender
                        .try_send(EnergyRequest::NewTask(task.pid, task_info))
                        .unwrap();
                    self.task_queue.push_back(task);
                }
                std::collections::hash_map::Entry::Occupied(e) => {
                    // Charge the runtime consumed since the last update against
                    // the task's energy budget and queue it accordingly.
                    let slice_ns = e.get().update_runtime(task.sum_exec_runtime);
                    let new_budget = e.get().update_budget(Duration::from_nanos(slice_ns));
                    match new_budget {
                        // Budget exhausted: confine the task to the garbage core.
                        x if x < 0 => {
                            task.weight = 0;
                            self.no_budget_task_queue.push_back(task)
                        }
                        // Budget low (below a tenth of the maximum): schedule
                        // normally, but steer towards e-cores via zero weight.
                        x if x < energy::budget::MAX_BUDGET_MJ / 10 => {
                            task.weight = 0;
                            self.task_queue.push_back(task)
                        }
                        _ => self.task_queue.push_back(task),
                    }
                }
            }
        }
    }
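    /// Dispatches up to `tasks` entries from the two queues, interleaving them
    /// so that no-budget tasks cannot starve: whenever the regular queue is
    /// empty, or on every tenth dispatched slot, one task is popped from
    /// `no_budget_task_queue` and sent to the garbage core. As a worked
    /// example, with both queues full, slots 1-9 each dispatch one regular
    /// task, and slot 10 dispatches a regular task plus one no-budget task; if
    /// the no-budget queue is empty at such a point, the batch ends early.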
    fn batch_dispatch_next_tasks(&mut self, tasks: i32) {
        for _ in 0..tasks {
            self.tasks_scheduled += 1;
            if let Some(task) = self.task_queue.pop_front() {
                let mut dispatched_task = DispatchedTask::new(&task);

                // Ask the BPF side for an idle CPU; fall back to "any CPU".
                let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
                if cpu >= 0 {
                    dispatched_task.cpu = cpu;
                } else {
                    dispatched_task.flags |= RL_CPU_ANY as u64;
                }

                // Low-budget tasks are steered away from p-cores.
                if task.weight == 0 && self.p_cores.contains(&dispatched_task.cpu) {
                    dispatched_task.cpu = self.e_core_selector.next_core(task.cpu);
                }

                // Give the scheduler itself a slice 1000x larger than regular tasks.
                if task.pid == self.own_pid {
                    dispatched_task.slice_ns = SLICE_US * 1000;
                } else {
                    dispatched_task.slice_ns = SLICE_US;
                }

                if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
                    eprintln!("Failed to dispatch task: {}", e);
                    panic!();
                }

                let running_on_e_core = self
                    .e_cores
                    .as_ref()
                    .map(|e_cores| e_cores.contains(&dispatched_task.cpu))
                    .unwrap_or(false);
                if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                    entry.set_cpu(dispatched_task.cpu);
                    entry.set_running_on_e_core(running_on_e_core);
                    entry.set_last_scheduled_to_now();
                } else {
                    eprintln!("Tried to dispatch a task which is not part of managed tasks");
                }

                self.bpf.notify_complete(
                    self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
                );
            }

            // Only dispatch a no-budget task when the regular queue is empty
            // or on every tenth slot.
            if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) {
                continue;
            }
            if let Some(task) = self.no_budget_task_queue.pop_front() {
                let mut dispatched_task = DispatchedTask::new(&task);

                // No-budget tasks go to the garbage core.
                let cpu = self.garbage_core;
                if cpu >= 0 {
                    dispatched_task.cpu = cpu;
                } else {
                    eprintln!("e core scheduler set cpu to -1");
                }

                let running_on_e_core = self
                    .e_cores
                    .as_ref()
                    .map(|e_cores| e_cores.contains(&dispatched_task.cpu))
                    .unwrap_or(false);
                if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                    entry.set_cpu(dispatched_task.cpu);
                    entry.set_running_on_e_core(running_on_e_core);
                    entry.set_last_scheduled_to_now();
                } else {
                    eprintln!("Tried to dispatch a task which is not part of managed tasks");
                }

                if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
                    eprintln!("Failed to dispatch low-budget task: {}", e);
                }
            } else {
                break;
            }
        }
        self.bpf
            .notify_complete(self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64);
    }

    fn cleanup_old_tasks(&mut self) {
        for (pid, task) in &self.managed_tasks {
            // `None` means the task was never scheduled, so we keep it.
            if task
                .read_time_since_last_schedule()
                .unwrap_or(Duration::from_secs(0))
                > Duration::from_secs(5)
            {
                self.to_remove.push(*pid);
            }
        }
        for pid in self.to_remove.drain(..) {
            self.managed_tasks.remove(&pid);
            self.energy_sender
                .try_send(EnergyRequest::RemoveTask(pid))
                .unwrap();
        }
    }

    fn dispatch_tasks(&mut self) {
        loop {
            self.consume_all_tasks();
            self.batch_dispatch_next_tasks(20);
            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
                self.bpf.notify_complete(0);
                break;
            }
        }
    }

    pub fn run(&mut self) -> Result<UserExitInfo> {
        self.try_set_up_garbage_cpu()?;
        let mut i = 0;
        while !self.bpf.exited() {
            i += 1;
            self.dispatch_tasks();
            if i % 100 == 0 {
                self.cleanup_old_tasks();
            }
        }
        // Clean up: signal the energy service to stop tracking all managed tasks.
        for pid in self.managed_tasks.keys() {
            let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
        }
        self.bpf.shutdown_and_report()
    }
}
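// A minimal usage sketch (not part of this module): the surrounding crate is
// expected to drive the scheduler roughly like the scx_rustland-style main
// loop below. The `main` wiring here is an illustrative assumption, not this
// crate's actual entry point; `should_restart()` comes from
// scx_utils::UserExitInfo.
//
// fn main() -> anyhow::Result<()> {
//     let mut open_object = std::mem::MaybeUninit::uninit();
//     loop {
//         let mut scheduler = Scheduler::init(&mut open_object, /* use_mocking */ false)?;
//         if !scheduler.run()?.should_restart() {
//             break Ok(());
//         }
//     }
// }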