Diffstat (limited to 'src/main.rs')
-rw-r--r--  src/main.rs  315
1 file changed, 5 insertions, 310 deletions
diff --git a/src/main.rs b/src/main.rs
index c477a0f..0bf1c12 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,324 +3,19 @@ pub use bpf_skel::*;
pub mod bpf_intf;
mod e_core_selector;
-mod mock;
-mod task_state;
+mod energy;
+mod scheduler;
#[rustfmt::skip]
mod bpf;
-use bpf::*;
+use anyhow::Result;
use clap::{Arg, ArgAction, Command};
-use dashmap::DashMap;
-use e_core_selector::{ECoreSelector, RoundRobinSelector};
-use mock::perf::PerfEstimator;
-use scx_utils::{Topology, UserExitInfo};
-
-use libbpf_rs::OpenObject;
-use mock::{KernelDriver, KernelModule, MockModule};
-use task_state::TaskState;
-
-use std::any::Any;
-use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
-use std::fs::File;
-use std::io::{BufRead, BufReader};
+use scheduler::Scheduler;
use std::mem::MaybeUninit;
-use std::ops::Range;
-use std::sync::mpsc::{Receiver, Sender, SyncSender};
-use std::sync::{Arc, OnceLock};
-use std::time::Duration;
-use std::{process, thread};
-
-use anyhow::Result;
type Pid = i32;
-const SLICE_US: u64 = 50000;
-
-struct Scheduler<'a> {
- bpf: BpfScheduler<'a>,
- task_queue: VecDeque<QueuedTask>,
- no_budget_task_queue: VecDeque<QueuedTask>,
- managed_tasks: HashMap<Pid, TaskState>,
- maximum_budget: u64,
- power_cap: u64,
- own_pid: Pid,
- p_cores: Range<i32>,
- e_cores: Range<i32>,
- topology: Topology,
- e_core_selector: Box<dyn ECoreSelector>,
- // reciever: Receiver<(Pid, Response)>,
- sender: SyncSender<(Pid, Request)>,
- hashmap: Arc<DashMap<Pid, (u64, i32)>>,
-}
-
-enum Request {
- NewTask,
- RemoveTask,
-}
-
-impl<'a> Scheduler<'a> {
- fn init(
- open_object: &'a mut MaybeUninit<OpenObject>,
- use_mocking: bool,
- power_cap: u64,
- ) -> Result<Self> {
- let bpf = BpfScheduler::init(
- open_object,
- 0, // exit_dump_len (buffer size of exit info, 0 = default)
- false, // partial (false = include all tasks)
- false, // debug (false = debug mode off)
- )?;
- dbg!("registering rust user space scheduler");
- let map: DashMap<i32, (u64, i32)> = DashMap::with_capacity(100000);
- let map = Arc::new(map);
- let thread_map = map.clone();
-
- let topology = Topology::new().unwrap();
-
- let read_cores = |core_type: &str| {
- let e_cores_file = File::open(format!("/sys/devices/cpu_{core_type}/cpus"))?;
- let e_cores_reader = BufReader::new(e_cores_file);
- let e_cores = if let Some(Ok(line)) = e_cores_reader.lines().next() {
- let cores: Vec<&str> = line.split('-').collect();
- cores[0].parse::<i32>().unwrap()..cores[1].parse::<i32>().unwrap()
- } else {
- panic!(
- "Was not able to differentiate between core types. Does the system have e cores?"
- )
- };
- Ok::<_, std::io::Error>(e_cores)
- };
- let e_cores = read_cores("atom").unwrap_or(0..4);
- let p_cores = read_cores("core").unwrap_or(4..8);
-
- let selector = Box::new(RoundRobinSelector::new(&e_cores));
-
- let (request_send, request_recieve) = std::sync::mpsc::sync_channel(1000);
-
- std::thread::spawn(move || {
- let mut module: Box<dyn KernelModule> = if use_mocking {
- Box::new(PerfEstimator::default())
- // Box::new(MockModule::default())
- } else {
- Box::new(KernelDriver::default())
- };
- let mut tasks = BTreeSet::new();
-
- loop {
- for pid in tasks.iter() {
- let energy = module.read_consumption(*pid as u64);
- if let Some(mut entry) = thread_map.try_get_mut(pid).try_unwrap() {
- entry.0 = energy;
- }
- }
-
- if let Ok((pid, request)) = request_recieve.try_recv() {
- match request {
- Request::NewTask => {
- let parent = (|| {
- let process = procfs::process::Process::new(pid)?;
- process.stat().map(|stat| stat.ppid)
- })()
- .unwrap_or_default();
- module.start_trace(pid as u64);
- tasks.insert(pid);
- thread_map.insert(pid, (0, parent));
- }
- Request::RemoveTask => {
- tasks.remove(&pid);
- module.stop_trace(pid as u64)
- }
- }
- }
- std::thread::sleep(Duration::from_micros(5000));
- }
- });
-
- Ok(Self {
- bpf,
- task_queue: VecDeque::new(),
- no_budget_task_queue: VecDeque::new(),
- managed_tasks: HashMap::new(),
- maximum_budget: u64::MAX,
- power_cap,
- own_pid: process::id() as i32,
- p_cores,
- e_cores,
- topology,
- e_core_selector: selector,
- sender: request_send,
- // reciever: response_recieve,
- hashmap: map,
- })
- }
-
- fn consume_all_tasks(&mut self) {
- // Consume all tasks that are ready to run.
- //
- // Each task contains the following details:
- //
- // pub struct QueuedTask {
- // pub pid: i32, // pid that uniquely identifies a task
- // pub cpu: i32, // CPU where the task is running
- // pub sum_exec_runtime: u64, // Total cpu time
- // pub weight: u64, // Task static priority
- // pub nvcsw: u64, // Total amount of voluntary context switches
- // pub slice: u64, // Remaining time slice budget
- // pub vtime: u64, // Current task vruntime / deadline (set by the scheduler)
- // }
- //
- // Although the FIFO scheduler doesn't use these fields, they can provide valuable data for
- // implementing more sophisticated scheduling policies.
- // let mut contended = Vec::with_capacity(100);
-
- while let Ok(Some(task)) = self.bpf.dequeue_task() {
- // The scheduler itself has to be scheduled regardless of its energy usage
- if task.pid == self.own_pid {
- self.task_queue.push_front(task);
- self.dispatch_next_task();
- continue;
- }
-
- match self.hashmap.try_get(&task.pid) {
- dashmap::try_result::TryResult::Present(value) => {
- let (energy, parent) = *value;
-
- // dbg!(energy);
- // TODO: fix
- if energy == 0 {
- // println!("budget zero");
- self.no_budget_task_queue.push_back(task);
- } else {
- self.task_queue.push_back(task);
- }
- }
- dashmap::try_result::TryResult::Absent => {
- self.hashmap.insert(task.pid, (0, 0));
- self.sender.try_send((task.pid, Request::NewTask)).unwrap();
- self.task_queue.push_back(task);
- }
- dashmap::try_result::TryResult::Locked => {
- println!("locked");
- self.task_queue.push_back(task);
- }
- }
- }
- }
-
- fn dispatch_next_task(&mut self) {
- if let Some(task) = self.task_queue.pop_front() {
- // Create a new task to be dispatched, derived from the received enqueued task.
- //
- // pub struct DispatchedTask {
- // pub pid: i32, // pid that uniquely identifies a task
- // pub cpu: i32, // target CPU selected by the scheduler
- // pub flags: u64, // special dispatch flags
- // pub slice_ns: u64, // time slice assigned to the task (0 = default)
- // }
- //
- // The dispatched task's information are pre-populated from the QueuedTask and they can
- // be modified before dispatching it via self.bpf.dispatch_task().
- let mut dispatched_task = DispatchedTask::new(&task);
-
- // Decide where the task needs to run (target CPU).
- //
- // A call to select_cpu() will return the most suitable idle CPU for the task,
- // considering its previously used CPU.
- let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
- }
-
- // Decide for how long the task needs to run (time slice); if not specified
- // SCX_SLICE_DFL will be used by default.
- dispatched_task.slice_ns = SLICE_US;
-
- // Dispatch the task on the target CPU.
- self.bpf.dispatch_task(&dispatched_task).unwrap();
-
- // Notify the BPF component of the number of pending tasks and immediately give a
- // chance to run to the dispatched task.
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
- } else if let Some(task) = self.no_budget_task_queue.pop_front() {
- //TODO: do something with tasks that have no budget left
- let mut dispatched_task = DispatchedTask::new(&task);
-
- let cpu = self.e_core_selector.next_core();
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
- }
-
- if task.pid == self.own_pid {
- dispatched_task.slice_ns = SLICE_US * 1000;
- }
- dispatched_task.slice_ns = SLICE_US;
- self.bpf.dispatch_task(&dispatched_task).unwrap();
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
- }
- }
-
- fn dispatch_tasks(&mut self) {
- //TODO: we should probably not do this every time, but instead wait a predefined amount of time between invocations
- self.reset_budgets_and_garbage_collect();
-
- loop {
- // Consume all tasks before dispatching any.
- self.consume_all_tasks();
-
- // Dispatch one task from the queue.
- self.dispatch_next_task();
-
- // If no task is ready to run (or in case of error), stop dispatching tasks and notify
- // the BPF component that all tasks have been scheduled / dispatched, with no remaining
- // pending tasks.
- //TODO: both queues?
- if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
- self.bpf.notify_complete(0);
-
- break;
- }
- }
- }
-
- fn run(&mut self) -> Result<UserExitInfo> {
- while !self.bpf.exited() {
- self.dispatch_tasks();
- }
- self.bpf.shutdown_and_report()
- }
-
- fn get_new_budget(&self) -> u64 {
- //TODO
- u64::MAX
- }
-
- fn reset_budgets_and_garbage_collect(&mut self) {
- let old_budget = self.maximum_budget;
- self.maximum_budget = self.get_new_budget();
-
- self.managed_tasks.retain(|key, value| {
- let was_scheduled = value.budget == old_budget;
- if was_scheduled {
- value.budget = self.maximum_budget;
- } else {
- self.sender
- .try_send(({ *key }, Request::RemoveTask))
- .unwrap();
- }
- was_scheduled
- });
- }
-}
-
fn main() -> Result<()> {
let matches = Command::new("Energy User Space Scheduler")
.arg(
@@ -343,7 +38,7 @@ fn main() -> Result<()> {
let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
let use_mocking = matches.get_flag("mock");
- // Initialize and load the FIFO scheduler.
+ // Initialize and load the scheduler.
let mut open_object = MaybeUninit::uninit();
loop {
let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;