use crate::bpf::*;
use crate::energy::{self, Request as EnergyRequest, TaskInfo};
use crate::freq::{self, FrequencyKHZ, Request as FrequencyRequest};
use crate::e_core_selector::{ECoreSelector, RoundRobinSelector};
use anyhow::Result;
use libbpf_rs::OpenObject;
use scx_utils::{Topology, UserExitInfo};
use std::collections::{HashMap, VecDeque};
use std::mem::MaybeUninit;
use std::ops::{Range, RangeInclusive};
use std::process;
use std::sync::mpsc::TrySendError;
use std::sync::{mpsc, Arc, RwLock};
use std::time::Duration;

use crate::Pid;

const SLICE_US: u64 = 100000;

pub struct Scheduler<'a> {
    bpf: BpfScheduler<'a>,
    task_queue: VecDeque<QueuedTask>,
    no_budget_task_queue: VecDeque<QueuedTask>,
    managed_tasks: HashMap<Pid, Arc<TaskInfo>>,
    maximum_budget: u64,
    tasks_scheduled: u64,
    // TODO: also consider Pids of children
    own_pid: Pid,
    p_cores: Range<i32>,
    e_cores: Range<i32>,
    topology: Topology,
    to_remove: Vec<Pid>,
    e_core_selector: Box<dyn ECoreSelector>,
    energy_sender: mpsc::SyncSender<EnergyRequest>,
    empty_task_infos: mpsc::Receiver<Arc<TaskInfo>>,
    frequency_sender: mpsc::SyncSender<FrequencyRequest>,
    shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
    shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>>,
    shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>>,
}

impl<'a> Scheduler<'a> {
    pub fn init(
        open_object: &'a mut MaybeUninit<OpenObject>,
        use_mocking: bool,
        power_cap: u64,
    ) -> Result<Self> {
        println!("Initializing energy-aware scheduler");

        let shared_cpu_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> =
            Arc::new(RwLock::new(Vec::new()));
        let shared_policy_frequency_ranges: Arc<RwLock<Vec<RangeInclusive<FrequencyKHZ>>>> =
            Arc::new(RwLock::new(Vec::new()));
        let shared_cpu_current_frequencies: Arc<RwLock<Vec<FrequencyKHZ>>> =
            Arc::new(RwLock::new(Vec::new()));

        // Start the energy tracking service
        let energy_sender = energy::start_energy_service(
            use_mocking,
            power_cap,
            shared_cpu_frequency_ranges.clone(),
            shared_policy_frequency_ranges.clone(),
            shared_cpu_current_frequencies.clone(),
        )?;

        // Pre-allocate TaskInfo objects on a dedicated thread so the hot
        // scheduling path can pick them up without allocating.
        let (task_sender, empty_task_infos) = mpsc::sync_channel(200);
        std::thread::spawn(move || loop {
            if task_sender.send(Arc::new(TaskInfo::default())).is_err() {
                // The receiving side is gone; stop pre-allocating instead of
                // spinning on a closed channel.
                eprintln!("Failed to allocate TaskInfo");
                break;
            }
        });

        let topology = Topology::new().unwrap();
        let mut e_core_ids = Vec::new();
        let mut p_core_ids = Vec::new();
        for (id, cpu) in &topology.all_cpus {
            match cpu.core_type {
                scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32),
                scx_utils::CoreType::Little => e_core_ids.push(*id as i32),
            }
        }
        // We assume that the CPU IDs for each core type are assigned contiguously.
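        // E.g. on a hypothetical 6P+2E part this would yield p_cores = 0..6
        // and e_cores = 6..8; interleaved IDs would break the Range encoding.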
        e_core_ids.sort();
        p_core_ids.sort();
        let e_cores =
            *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&-1) + 1);
        let p_cores =
            *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&-1) + 1);
        let all_cores = 0..((e_cores.len() + p_cores.len()) as u32);

        let selector = Box::new(RoundRobinSelector::new(&e_cores));

        let to_remove = Vec::with_capacity(1000);

        let frequency_sender = freq::start_frequency_service(
            all_cores,
            shared_cpu_frequency_ranges.clone(),
            shared_policy_frequency_ranges.clone(),
            shared_cpu_current_frequencies.clone(),
            Duration::from_millis(30),
        )?;
        frequency_sender
            .try_send(FrequencyRequest::GetPolicyCPUFrequency)
            .unwrap();
        frequency_sender
            .try_send(FrequencyRequest::GetPossibleCPUFrequencyRange)
            .unwrap();
        // Give the frequency service a moment to populate the shared state.
        std::thread::sleep(Duration::from_secs(1));

        let bpf = BpfScheduler::init(
            open_object,
            0,     // exit_dump_len (buffer size of exit info, 0 = default)
            false, // partial (false = include all tasks)
            false, // debug (false = debug mode off)
        )?;

        Ok(Self {
            bpf,
            task_queue: VecDeque::new(),
            no_budget_task_queue: VecDeque::new(),
            managed_tasks: HashMap::new(),
            maximum_budget: u64::MAX,
            own_pid: process::id() as i32,
            p_cores,
            empty_task_infos,
            tasks_scheduled: 0,
            e_cores,
            topology,
            e_core_selector: selector,
            energy_sender,
            to_remove,
            frequency_sender,
            shared_cpu_frequency_ranges,
            shared_policy_frequency_ranges,
            shared_cpu_current_frequencies,
        })
    }

    fn try_set_up_garbage_cpu(&self, cpu: u32) -> Result<bool, TrySendError<FrequencyRequest>> {
        if self.shared_cpu_frequency_ranges.read().unwrap().len() <= cpu as usize {
            // We wait until shared_cpu_frequency_ranges has been initialized.
            return Ok(false);
        }
        // Pin the core to the lowest frequency it supports.
        let target = self.shared_cpu_frequency_ranges.read().unwrap()[cpu as usize]
            .clone()
            .min()
            .unwrap();
        self.frequency_sender
            .try_send(FrequencyRequest::SetTargetFrequencyForCore(cpu, target))?;
        self.frequency_sender
            .try_send(FrequencyRequest::SetFrequencyRangeForCore(
                cpu,
                target..=target,
            ))?;
        Ok(true)
    }

    fn consume_all_tasks(&mut self) {
        while let Ok(Some(task)) = self.bpf.dequeue_task() {
            // The scheduler itself has to be scheduled regardless of its energy usage.
            if task.pid == self.own_pid {
                self.task_queue.push_front(task);
                continue;
            }

            // Check if we've seen this task before.
            match self.managed_tasks.entry(task.pid) {
                std::collections::hash_map::Entry::Vacant(e) => {
                    let is_e_core = self.e_cores.contains(&task.cpu);
                    // New task - register it with the energy service.
                    let task_info = self.empty_task_infos.recv().unwrap();
                    task_info.set_cpu(task.cpu);
                    task_info.set_running_on_e_core(is_e_core);
                    e.insert(task_info.clone());
                    self.energy_sender
                        .try_send(EnergyRequest::NewTask(task.pid, task_info))
                        .unwrap();
                    self.task_queue.push_back(task);
                }
                std::collections::hash_map::Entry::Occupied(e) => {
                    // Route the task based on its remaining energy budget.
                    match e.get().read_budget() {
                        0 => self.no_budget_task_queue.push_back(task),
                        _ => self.task_queue.push_back(task),
                    }
                }
            }
        }
    }

    fn dispatch_next_task(&mut self) {
        self.batch_dispatch_next_tasks(1);
    }

    fn batch_dispatch_next_tasks(&mut self, tasks: i32) {
        for _ in 0..tasks {
            self.tasks_scheduled += 1;
            if let Some(task) = self.task_queue.pop_front() {
                let mut dispatched_task = DispatchedTask::new(&task);
                // TODO: do we have to migrate tasks back from e cores?
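                // select_cpu() proposes an idle CPU for the task; a negative
                // return value means none was found, so we fall back to
                // RL_CPU_ANY and let the BPF side pick a CPU at dispatch time.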
                let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
                if cpu >= 0 {
                    dispatched_task.cpu = cpu;
                } else {
                    dispatched_task.flags |= RL_CPU_ANY as u64;
                }

                // The scheduler itself gets a far larger slice (SLICE_US
                // microseconds) than regular tasks (SLICE_US nanoseconds).
                if task.pid == self.own_pid {
                    dispatched_task.slice_ns = SLICE_US * 1000;
                } else {
                    dispatched_task.slice_ns = SLICE_US;
                }

                if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
                    eprintln!("Failed to dispatch task: {}", e);
                }

                let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
                if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                    entry.set_cpu(dispatched_task.cpu);
                    entry.set_running_on_e_core(running_on_e_core);
                    entry.set_last_scheduled_to_now();
                } else {
                    println!("Tried to dispatch a task which is not part of managed tasks");
                }

                self.bpf.notify_complete(
                    self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
                );
            }

            // Only service the no-budget queue when the main queue is empty,
            // or on every tenth dispatch, so it cannot starve completely.
            if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) {
                continue;
            }

            if let Some(task) = self.no_budget_task_queue.pop_front() {
                let mut dispatched_task = DispatchedTask::new(&task);

                // Low-budget tasks go to e-cores.
                let cpu = self.e_core_selector.next_core(task.cpu);
                if cpu >= 0 {
                    dispatched_task.cpu = cpu;
                } else {
                    eprintln!("e core scheduler set cpu to -1");
                }

                let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
                if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
                    entry.set_cpu(dispatched_task.cpu);
                    entry.set_running_on_e_core(running_on_e_core);
                    entry.set_last_scheduled_to_now();
                } else {
                    println!("Tried to dispatch a task which is not part of managed tasks");
                }

                if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
                    eprintln!("Failed to dispatch low-budget task: {}", e);
                }
            } else {
                break;
            }
        }
        self.bpf
            .notify_complete(self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64);
    }

    fn cleanup_old_tasks(&mut self) {
        for (pid, task) in &self.managed_tasks {
            // None means that the task was never scheduled, so we should probably keep it.
            if task
                .read_time_since_last_schedule()
                .unwrap_or(Duration::from_secs(0))
                > Duration::from_secs(5)
            {
                self.to_remove.push(*pid);
            }
        }
        for pid in self.to_remove.drain(..) {
            self.managed_tasks.remove(&pid);
            self.energy_sender
                .try_send(EnergyRequest::RemoveTask(pid))
                .unwrap();
        }
    }

    fn dispatch_tasks(&mut self) {
        loop {
            self.consume_all_tasks();
            self.batch_dispatch_next_tasks(20);
            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
                self.bpf.notify_complete(0);
                break;
            }
        }
    }

    pub fn run(&mut self) -> Result<UserExitInfo> {
        let mut i = 0;
        // let mut created_garbage_core = false;
        while !self.bpf.exited() {
            // This is how a garbage core could be created.
            // The core should also be excluded from the e core scheduler.
            //if !created_garbage_core {
            //    created_garbage_core =
            //        self.try_set_up_garbage_cpu(self.e_cores.clone().max().unwrap_or(0) as u32)?;
            //}
            i += 1;
            self.dispatch_tasks();
            if i % 100 == 0 {
                self.cleanup_old_tasks();
            }
        }
        // Clean up - signal the energy service to stop tracking all managed tasks.
        for pid in self.managed_tasks.keys() {
            let _ = self.energy_sender.send(EnergyRequest::RemoveTask(*pid));
        }
        self.bpf.shutdown_and_report()
    }
}
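
// A minimal sketch of how this scheduler would typically be driven from
// main(), following the usual scx_rustland_core pattern; the `use_mocking`
// and `power_cap` arguments here are illustrative values, not defaults
// defined by this crate:
//
//     fn main() -> anyhow::Result<()> {
//         let mut open_object = std::mem::MaybeUninit::uninit();
//         loop {
//             let mut sched = Scheduler::init(&mut open_object, false, u64::MAX)?;
//             // run() blocks until the BPF side exits; restart if requested.
//             if !sched.run()?.should_restart() {
//                 break;
//             }
//         }
//         Ok(())
//     }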