Diffstat (limited to 'src/main.rs')
-rw-r--r-- | src/main.rs | 315
1 file changed, 5 insertions, 310 deletions
diff --git a/src/main.rs b/src/main.rs
index c477a0f..0bf1c12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,324 +3,19 @@
 pub use bpf_skel::*;
 pub mod bpf_intf;
 mod e_core_selector;
-mod mock;
-mod task_state;
+mod energy;
+mod scheduler;
 #[rustfmt::skip]
 mod bpf;
-use bpf::*;
+use anyhow::Result;
 use clap::{Arg, ArgAction, Command};
-use dashmap::DashMap;
-use e_core_selector::{ECoreSelector, RoundRobinSelector};
-use mock::perf::PerfEstimator;
-use scx_utils::{Topology, UserExitInfo};
-
-use libbpf_rs::OpenObject;
-use mock::{KernelDriver, KernelModule, MockModule};
-use task_state::TaskState;
-
-use std::any::Any;
-use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
-use std::fs::File;
-use std::io::{BufRead, BufReader};
+use scheduler::Scheduler;
 use std::mem::MaybeUninit;
-use std::ops::Range;
-use std::sync::mpsc::{Receiver, Sender, SyncSender};
-use std::sync::{Arc, OnceLock};
-use std::time::Duration;
-use std::{process, thread};
-
-use anyhow::Result;
 
 type Pid = i32;
 
-const SLICE_US: u64 = 50000;
-
-struct Scheduler<'a> {
-    bpf: BpfScheduler<'a>,
-    task_queue: VecDeque<QueuedTask>,
-    no_budget_task_queue: VecDeque<QueuedTask>,
-    managed_tasks: HashMap<Pid, TaskState>,
-    maximum_budget: u64,
-    power_cap: u64,
-    own_pid: Pid,
-    p_cores: Range<i32>,
-    e_cores: Range<i32>,
-    topology: Topology,
-    e_core_selector: Box<dyn ECoreSelector>,
-    // reciever: Receiver<(Pid, Response)>,
-    sender: SyncSender<(Pid, Request)>,
-    hashmap: Arc<DashMap<Pid, (u64, i32)>>,
-}
-
-enum Request {
-    NewTask,
-    RemoveTask,
-}
-
-impl<'a> Scheduler<'a> {
-    fn init(
-        open_object: &'a mut MaybeUninit<OpenObject>,
-        use_mocking: bool,
-        power_cap: u64,
-    ) -> Result<Self> {
-        let bpf = BpfScheduler::init(
-            open_object,
-            0,     // exit_dump_len (buffer size of exit info, 0 = default)
-            false, // partial (false = include all tasks)
-            false, // debug (false = debug mode off)
-        )?;
-        dbg!("registering rust user space scheduler");
-        let map: DashMap<i32, (u64, i32)> = DashMap::with_capacity(100000);
-        let map = Arc::new(map);
-        let thread_map = map.clone();
-
-        let topology = Topology::new().unwrap();
-
-        let read_cores = |core_type: &str| {
-            let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
-            let e_cores_reader = BufReader::new(e_cores_file);
-            let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
-                let cores: Vec<&str> = line.split('-').collect();
-                cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
-            } else {
-                panic!(
-                    "Was not able to differentiate between core types. Does the system have e cores?"
-                )
-            };
-            Ok::<_, std::io::Error>(e_cores)
-        };
-        let e_cores = read_cores("atom").unwrap_or(0..4);
-        let p_cores = read_cores("core").unwrap_or(4..8);
-
-        let selector = Box::new(RoundRobinSelector::new(&e_cores));
-
-        let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
-
-        std::thread::spawn(move || {
-            let mut module: Box<dyn KernelModule> = if use_mocking {
-                Box::new(PerfEstimator::default())
-                // Box::new(MockModule::default())
-            } else {
-                Box::new(KernelDriver::default())
-            };
-            let mut tasks = BTreeSet::new();
-
-            loop {
-                for pid in tasks.iter() {
-                    let energy = module.read_consumption(*pid as u64);
-                    if let Some(mut entry) = thread_map.try_get_mut(pid).try_unwrap() {
-                        entry.0 = energy;
-                    }
-                }
-
-                if let Ok((pid, request)) = request_recieve.try_recv() {
-                    match request {
-                        Request::NewTask => {
-                            let parent = (|| {
-                                let process = procfs::process::Process::new(pid)?;
-                                process.stat().map(|stat| stat.ppid)
-                            })()
-                            .unwrap_or_default();
-                            module.start_trace(pid as u64);
-                            tasks.insert(pid);
-                            thread_map.insert(pid, (0, parent));
-                        }
-                        Request::RemoveTask => {
-                            tasks.remove(&pid);
-                            module.stop_trace(pid as u64)
-                        }
-                    }
-                }
-                std::thread::sleep(Duration::from_micros(5000));
-            }
-        });
-
-        Ok(Self {
-            bpf,
-            task_queue: VecDeque::new(),
-            no_budget_task_queue: VecDeque::new(),
-            managed_tasks: HashMap::new(),
-            maximum_budget: u64::MAX,
-            power_cap,
-            own_pid: process::id() as i32,
-            p_cores,
-            e_cores,
-            topology,
-            e_core_selector: selector,
-            sender: request_send,
-            // reciever: response_recieve,
-            hashmap: map,
-        })
-    }
-
-    fn consume_all_tasks(&mut self) {
-        // Consume all tasks that are ready to run.
-        //
-        // Each task contains the following details:
-        //
-        // pub struct QueuedTask {
-        //     pub pid: i32,              // pid that uniquely identifies a task
-        //     pub cpu: i32,              // CPU where the task is running
-        //     pub sum_exec_runtime: u64, // Total cpu time
-        //     pub weight: u64,           // Task static priority
-        //     pub nvcsw: u64,            // Total amount of voluntary context switches
-        //     pub slice: u64,            // Remaining time slice budget
-        //     pub vtime: u64,            // Current task vruntime / deadline (set by the scheduler)
-        // }
-        //
-        // Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
-        // implementing more sophisticated scheduling policies.
-        // let mut contended = Vec::with_capacity(100);
-
-        while let Ok(Some(task)) = self.bpf.dequeue_task() {
-            // The scheduler itself has to be scheduled regardless of its energy usage
-            if task.pid == self.own_pid {
-                self.task_queue.push_front(task);
-                self.dispatch_next_task();
-                continue;
-            }
-
-            match self.hashmap.try_get(&task.pid) {
-                dashmap::try_result::TryResult::Present(value) => {
-                    let (energy, parent) = *value;
-
-                    // dbg!(energy);
-                    // TODO: fix
-                    if energy == 0 {
-                        // println!("budget zero");
-                        self.no_budget_task_queue.push_back(task);
-                    } else {
-                        self.task_queue.push_back(task);
-                    }
-                }
-                dashmap::try_result::TryResult::Absent => {
-                    self.hashmap.insert(task.pid, (0, 0));
-                    self.sender.try_send((task.pid, Request::NewTask)).unwrap();
-                    self.task_queue.push_back(task);
-                }
-                dashmap::try_result::TryResult::Locked => {
-                    println!("locked");
-                    self.task_queue.push_back(task);
-                }
-            }
-        }
-    }
-
-    fn dispatch_next_task(&mut self) {
-        if let Some(task) = self.task_queue.pop_front() {
-            // Create a new task to be dispatched, derived from the received enqueued task.
-            //
-            // pub struct DispatchedTask {
-            //     pub pid: i32,      // pid that uniquely identifies a task
-            //     pub cpu: i32,      // target CPU selected by the scheduler
-            //     pub flags: u64,    // special dispatch flags
-            //     pub slice_ns: u64, // time slice assigned to the task (0 = default)
-            // }
-            //
-            // The dispatched task's information are pre-populated from the QueuedTask and they can
-            // be modified before dispatching it via self.bpf.dispatch_task().
-            let mut dispatched_task = DispatchedTask::new(&task);
-
-            // Decide where the task needs to run (target CPU).
-            //
-            // A call to select_cpu() will return the most suitable idle CPU for the task,
-            // considering its previously used CPU.
-            let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
-            if cpu >= 0 {
-                dispatched_task.cpu = cpu;
-            } else {
-                dispatched_task.flags |= RL_CPU_ANY as u64;
-            }
-
-            // Decide for how long the task needs to run (time slice); if not specified
-            // SCX_SLICE_DFL will be used by default.
-            dispatched_task.slice_ns = SLICE_US;
-
-            // Dispatch the task on the target CPU.
-            self.bpf.dispatch_task(&dispatched_task).unwrap();
-
-            // Notify the BPF component of the number of pending tasks and immediately give a
-            // chance to run to the dispatched task.
-            self.bpf.notify_complete(
-                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
-            );
-        } else if let Some(task) = self.no_budget_task_queue.pop_front() {
-            //TODO: do something with tasks that have no budget left
-            let mut dispatched_task = DispatchedTask::new(&task);
-
-            let cpu = self.e_core_selector.next_core();
-            if cpu >= 0 {
-                dispatched_task.cpu = cpu;
-            } else {
-                dispatched_task.flags |= RL_CPU_ANY as u64;
-            }
-
-            if task.pid == self.own_pid {
-                dispatched_task.slice_ns = SLICE_US * 1000;
-            }
-            dispatched_task.slice_ns = SLICE_US;
-            self.bpf.dispatch_task(&dispatched_task).unwrap();
-            self.bpf.notify_complete(
-                self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
-            );
-        }
-    }
-
-    fn dispatch_tasks(&mut self) {
-        //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
-        self.reset_budgets_and_garbage_collect();
-
-        loop {
-            // Consume all tasks before dispatching any.
-            self.consume_all_tasks();
-
-            // Dispatch one task from the queue.
-            self.dispatch_next_task();
-
-            // If no task is ready to run (or in case of error), stop dispatching tasks and notify
-            // the BPF component that all tasks have been scheduled / dispatched, with no remaining
-            // pending tasks.
-            //TODO: both queues?
-            if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
-                self.bpf.notify_complete(0);
-
-                break;
-            }
-        }
-    }
-
-    fn run(&mut self) -> Result<UserExitInfo> {
-        while !self.bpf.exited() {
-            self.dispatch_tasks();
-        }
-        self.bpf.shutdown_and_report()
-    }
-
-    fn get_new_budget(&self) -> u64 {
-        //TODO
-        u64::MAX
-    }
-
-    fn reset_budgets_and_garbage_collect(&mut self) {
-        let old_budget = self.maximum_budget;
-        self.maximum_budget = self.get_new_budget();
-
-        self.managed_tasks.retain(|key, value| {
-            let was_scheduled = value.budget == old_budget;
-            if was_scheduled {
-                value.budget = self.maximum_budget;
-            } else {
-                self.sender
-                    .try_send(({ *key }, Request::RemoveTask))
-                    .unwrap();
-            }
-            was_scheduled
-        });
-    }
-}
-
 fn main() -> Result<()> {
     let matches = Command::new("Energy User Space Scheduler")
         .arg(
@@ -343,7 +38,7 @@ fn main() -> Result<()> {
     let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
     let use_mocking = matches.get_flag("mock");
 
-    // Initialize and load the FIFO scheduler.
+    // Initialize and load the scheduler.
     let mut open_object = MaybeUninit::uninit();
    loop {
         let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
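Note for readers tracking the refactor: the deleted `Scheduler` moves into the new `scheduler` module (with the consumption tracking split into `energy`). The heart of the deleted code is the user-space dispatch loop against `BpfScheduler`. Below is a minimal sketch of that loop in isolation, assuming the scx_rustland_core-style API already visible above (`dequeue_task`, `select_cpu`, `dispatch_task`, `notify_complete`, `RL_CPU_ANY`); the `dispatch_all` helper name is hypothetical, and the energy-budget and E-core bookkeeping are deliberately elided.

    use std::collections::VecDeque;

    use crate::bpf::*; // crate-local BPF glue, as in the removed `use bpf::*;`

    const SLICE_US: u64 = 50_000; // same default slice as the removed code

    // Hypothetical helper: drain all queued tasks, then dispatch them one by one.
    fn dispatch_all(bpf: &mut BpfScheduler<'_>, queue: &mut VecDeque<QueuedTask>) {
        // Consume every task the BPF component has enqueued for user space.
        while let Ok(Some(task)) = bpf.dequeue_task() {
            queue.push_back(task);
        }

        while let Some(task) = queue.pop_front() {
            // DispatchedTask is pre-populated from the QueuedTask.
            let mut dispatched = DispatchedTask::new(&task);

            // Prefer an idle CPU near the task's previous one; otherwise
            // let the BPF side place the task on any available CPU.
            let cpu = bpf.select_cpu(task.pid, task.cpu, 0);
            if cpu >= 0 {
                dispatched.cpu = cpu;
            } else {
                dispatched.flags |= RL_CPU_ANY as u64;
            }
            dispatched.slice_ns = SLICE_US;

            bpf.dispatch_task(&dispatched).unwrap();

            // Report how many tasks are still pending so the dispatched
            // task gets an immediate chance to run.
            bpf.notify_complete(queue.len() as u64);
        }

        // Covers the case where nothing was queued at all.
        bpf.notify_complete(0);
    }

What the removed version layered on top of this skeleton is the second `no_budget_task_queue`: tasks whose energy budget is exhausted are steered to an E-core via `ECoreSelector` instead of competing for P-cores, a policy presumably carried over into the new `scheduler` module.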