diff options
author | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 19:52:16 +0200 |
---|---|---|
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 19:59:38 +0200 |
commit | e4ef629d2b6c0d068bf37a4f90e17dcb7f79a7b5 (patch) | |
tree | 766b516c2eca4786fb5443759eec98427ab7c173 | |
parent | 3333510485b0bf12024872ae7a8127e7a0fc59aa (diff) |
Schedule tasks in batches before yielding
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | src/scheduler.rs | 129 |
3 files changed, 67 insertions, 64 deletions
@@ -3655,7 +3655,6 @@ dependencies = [ "clap", "csv", "ctrlc", - "dashmap", "iocuddle", "libbpf-rs 0.24.8", "libc", @@ -16,7 +16,6 @@ iocuddle = "0.1.1" clap = { version = "4.5" , features = ["derive"] } perf-event = { path = "./perf-event" } procfs = { version = "0.17.0", default-features = false } -dashmap = "6.1.0" csv = "1.3.1" burn = {version = "0.16.0", features = ["openblas-system","candle"] } bincode = "=2.0.0-rc.3" diff --git a/src/scheduler.rs b/src/scheduler.rs index 83702a0..9ea0d3c 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -189,81 +189,86 @@ impl<'a> Scheduler<'a> { } } } - fn dispatch_next_task(&mut self) { - self.tasks_scheduled += 1; - if let Some(task) = self.task_queue.pop_front() { - let mut dispatched_task = DispatchedTask::new(&task); - //TODO: do we have to migrate tasks back from e cores? - if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) { - task_info.last_scheduled = Instant::now(); - task_info.task_info.set_cpu(task.cpu); - } else { - println!("Tried to dispatch a task which is not part of managed tasks"); - } - - let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); - if cpu >= 0 { - dispatched_task.cpu = cpu; - } else { - dispatched_task.flags |= RL_CPU_ANY as u64; - } + self.batch_dispatch_next_tasks(1); + } - if task.pid == self.own_pid { - dispatched_task.slice_ns = SLICE_US * 1000; - } else { - dispatched_task.slice_ns = SLICE_US; - } + fn batch_dispatch_next_tasks(&mut self, tasks: i32) { + for _ in 0..tasks { + self.tasks_scheduled += 1; + if let Some(task) = self.task_queue.pop_front() { + let mut dispatched_task = DispatchedTask::new(&task); + //TODO: do we have to migrate tasks back from e cores? + if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) { + task_info.last_scheduled = Instant::now(); + task_info.task_info.set_cpu(task.cpu); + } else { + println!("Tried to dispatch a task which is not part of managed tasks"); + } - if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { - eprintln!("Failed to dispatch task: {}", e); - } + let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0); + if cpu >= 0 { + dispatched_task.cpu = cpu; + } else { + dispatched_task.flags |= RL_CPU_ANY as u64; + } - // Migrating to new cpu - if dispatched_task.cpu != task.cpu { - let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); - if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { - entry.task_info.set_cpu(dispatched_task.cpu); - entry.task_info.set_running_on_e_core(running_on_e_core); + if task.pid == self.own_pid { + dispatched_task.slice_ns = SLICE_US * 1000; + } else { + dispatched_task.slice_ns = SLICE_US; } - } - self.bpf.notify_complete( - self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64, - ); - } - if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) { - return; - } - if let Some(task) = self.no_budget_task_queue.pop_front() { - let mut dispatched_task = DispatchedTask::new(&task); + if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { + eprintln!("Failed to dispatch task: {}", e); + } - // Low budget tasks go to e-cores - let cpu = self.e_core_selector.next_core(task.cpu); + // Migrating to new cpu + if dispatched_task.cpu != task.cpu { + let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); + if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { + entry.task_info.set_cpu(dispatched_task.cpu); + entry.task_info.set_running_on_e_core(running_on_e_core); + } + } - if cpu >= 0 { - dispatched_task.cpu = cpu; - } else { - eprintln!("e core scheduler set cpu to -1"); + self.bpf.notify_complete( + self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64, + ); } - - if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { - eprintln!("Failed to dispatch low-budget task: {}", e); + if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) { + continue; } + if let Some(task) = self.no_budget_task_queue.pop_front() { + let mut dispatched_task = DispatchedTask::new(&task); - // Migrating to new cpu - if dispatched_task.cpu != task.cpu { - let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); - if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { - entry.task_info.set_cpu(dispatched_task.cpu); - entry.task_info.set_running_on_e_core(running_on_e_core); + // Low budget tasks go to e-cores + let cpu = self.e_core_selector.next_core(task.cpu); + + if cpu >= 0 { + dispatched_task.cpu = cpu; + } else { + eprintln!("e core scheduler set cpu to -1"); } - } - self.bpf.notify_complete( - self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64, - ); + // Migrating to new cpu + if dispatched_task.cpu != task.cpu { + let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu); + if let Some(entry) = self.managed_tasks.get_mut(&task.pid) { + entry.task_info.set_cpu(dispatched_task.cpu); + entry.task_info.set_running_on_e_core(running_on_e_core); + } + } + + if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { + eprintln!("Failed to dispatch low-budget task: {}", e); + } + } else { + break; + } } + self.bpf + .notify_complete(self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64); } fn cleanup_old_tasks(&mut self) { @@ -284,7 +289,7 @@ impl<'a> Scheduler<'a> { fn dispatch_tasks(&mut self) { loop { self.consume_all_tasks(); - self.dispatch_next_task(); + self.batch_dispatch_next_tasks(20); if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() { self.bpf.notify_complete(0); |