author    Dennis Kobert <dennis@kobert.dev>    2025-03-31 19:52:16 +0200
committer Dennis Kobert <dennis@kobert.dev>    2025-03-31 19:59:38 +0200
commit    e4ef629d2b6c0d068bf37a4f90e17dcb7f79a7b5 (patch)
tree      766b516c2eca4786fb5443759eec98427ab7c173
parent    3333510485b0bf12024872ae7a8127e7a0fc59aa (diff)
Schedule tasks in batches before yielding
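Instead of dispatching a single task per scheduler invocation, dispatch_tasks() now calls batch_dispatch_next_tasks(20), which pops up to 20 tasks from the regular queue before yielding. After every 10th dispatch, or once the regular queue is empty, one task from the no-budget queue is sent to an e-core; the batch ends early when that queue is empty at such a checkpoint. notify_complete() is additionally called once after the whole batch with the combined remaining queue length, and dispatch_next_task() is kept as a thin wrapper around a batch of size 1. The now-unused dashmap dependency is dropped.

A minimal standalone sketch of the batching control flow introduced here; Batcher and its dispatch/notify_complete methods are hypothetical stand-ins for the scheduler's BPF hooks, not the real API:

use std::collections::VecDeque;

// Hypothetical stand-in for the scheduler state; `dispatch` and
// `notify_complete` model the BPF calls, not the real interface.
struct Batcher {
    task_queue: VecDeque<i32>,
    no_budget_task_queue: VecDeque<i32>,
    tasks_scheduled: u64,
}

impl Batcher {
    fn dispatch(&self, pid: i32, low_budget: bool) {
        println!("dispatch pid={pid} low_budget={low_budget}");
    }

    fn notify_complete(&self, remaining: u64) {
        println!("notify_complete({remaining})");
    }

    // Dispatch up to `tasks` items per pass instead of one per wakeup.
    fn batch_dispatch_next_tasks(&mut self, tasks: i32) {
        for _ in 0..tasks {
            self.tasks_scheduled += 1;
            if let Some(pid) = self.task_queue.pop_front() {
                self.dispatch(pid, false);
            }
            // Skip the low-budget queue unless the regular queue is empty
            // or this is the 10th dispatch of the batch.
            if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) {
                continue;
            }
            if let Some(pid) = self.no_budget_task_queue.pop_front() {
                self.dispatch(pid, true); // low-budget work goes to an e-core
            } else {
                break; // no low-budget work at this checkpoint: end the batch
            }
        }
        // Report remaining work once per batch.
        let remaining =
            (self.task_queue.len() + self.no_budget_task_queue.len()) as u64;
        self.notify_complete(remaining);
    }
}

fn main() {
    let mut b = Batcher {
        task_queue: (1..=25).collect(),
        no_budget_task_queue: VecDeque::from([100, 101]),
        tasks_scheduled: 0,
    };
    b.batch_dispatch_next_tasks(20);
}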
-rw-r--r--    Cargo.lock          1
-rw-r--r--    Cargo.toml          1
-rw-r--r--    src/scheduler.rs    129
3 files changed, 67 insertions(+), 64 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 859a16f..60a3996 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3655,7 +3655,6 @@ dependencies = [
"clap",
"csv",
"ctrlc",
- "dashmap",
"iocuddle",
"libbpf-rs 0.24.8",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index 8de9e1f..21ea539 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,7 +16,6 @@ iocuddle = "0.1.1"
clap = { version = "4.5" , features = ["derive"] }
perf-event = { path = "./perf-event" }
procfs = { version = "0.17.0", default-features = false }
-dashmap = "6.1.0"
csv = "1.3.1"
burn = {version = "0.16.0", features = ["openblas-system","candle"] }
bincode = "=2.0.0-rc.3"
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 83702a0..9ea0d3c 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -189,81 +189,86 @@ impl<'a> Scheduler<'a> {
}
}
}
-
fn dispatch_next_task(&mut self) {
- self.tasks_scheduled += 1;
- if let Some(task) = self.task_queue.pop_front() {
- let mut dispatched_task = DispatchedTask::new(&task);
- //TODO: do we have to migrate tasks back from e cores?
- if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) {
- task_info.last_scheduled = Instant::now();
- task_info.task_info.set_cpu(task.cpu);
- } else {
- println!("Tried to dispatch a task which is not part of managed tasks");
- }
-
- let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- dispatched_task.flags |= RL_CPU_ANY as u64;
- }
+ self.batch_dispatch_next_tasks(1);
+ }
- if task.pid == self.own_pid {
- dispatched_task.slice_ns = SLICE_US * 1000;
- } else {
- dispatched_task.slice_ns = SLICE_US;
- }
+ fn batch_dispatch_next_tasks(&mut self, tasks: i32) {
+ for _ in 0..tasks {
+ self.tasks_scheduled += 1;
+ if let Some(task) = self.task_queue.pop_front() {
+ let mut dispatched_task = DispatchedTask::new(&task);
+ //TODO: do we have to migrate tasks back from e cores?
+ if let Some(task_info) = self.managed_tasks.get_mut(&task.pid) {
+ task_info.last_scheduled = Instant::now();
+ task_info.task_info.set_cpu(task.cpu);
+ } else {
+ println!("Tried to dispatch a task which is not part of managed tasks");
+ }
- if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
- eprintln!("Failed to dispatch task: {}", e);
- }
+ let cpu = self.bpf.select_cpu(task.pid, task.cpu, 0);
+ if cpu >= 0 {
+ dispatched_task.cpu = cpu;
+ } else {
+ dispatched_task.flags |= RL_CPU_ANY as u64;
+ }
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
+ if task.pid == self.own_pid {
+ dispatched_task.slice_ns = SLICE_US * 1000;
+ } else {
+ dispatched_task.slice_ns = SLICE_US;
}
- }
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
- }
- if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) {
- return;
- }
- if let Some(task) = self.no_budget_task_queue.pop_front() {
- let mut dispatched_task = DispatchedTask::new(&task);
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch task: {}", e);
+ }
- // Low budget tasks go to e-cores
- let cpu = self.e_core_selector.next_core(task.cpu);
+ // Migrating to new cpu
+ if dispatched_task.cpu != task.cpu {
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.task_info.set_cpu(dispatched_task.cpu);
+ entry.task_info.set_running_on_e_core(running_on_e_core);
+ }
+ }
- if cpu >= 0 {
- dispatched_task.cpu = cpu;
- } else {
- eprintln!("e core scheduler set cpu to -1");
+ self.bpf.notify_complete(
+ self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
+ );
}
-
- if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
- eprintln!("Failed to dispatch low-budget task: {}", e);
+ if !(self.task_queue.is_empty() || self.tasks_scheduled % 10 == 0) {
+ continue;
}
+ if let Some(task) = self.no_budget_task_queue.pop_front() {
+ let mut dispatched_task = DispatchedTask::new(&task);
- // Migrating to new cpu
- if dispatched_task.cpu != task.cpu {
- let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
- if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
- entry.task_info.set_cpu(dispatched_task.cpu);
- entry.task_info.set_running_on_e_core(running_on_e_core);
+ // Low budget tasks go to e-cores
+ let cpu = self.e_core_selector.next_core(task.cpu);
+
+ if cpu >= 0 {
+ dispatched_task.cpu = cpu;
+ } else {
+ eprintln!("e core scheduler set cpu to -1");
}
- }
- self.bpf.notify_complete(
- self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64,
- );
+ // Migrating to new cpu
+ if dispatched_task.cpu != task.cpu {
+ let running_on_e_core = self.e_cores.contains(&dispatched_task.cpu);
+ if let Some(entry) = self.managed_tasks.get_mut(&task.pid) {
+ entry.task_info.set_cpu(dispatched_task.cpu);
+ entry.task_info.set_running_on_e_core(running_on_e_core);
+ }
+ }
+
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch low-budget task: {}", e);
+ }
+ } else {
+ break;
+ }
}
+ self.bpf
+ .notify_complete(self.task_queue.len() as u64 + self.no_budget_task_queue.len() as u64);
}
fn cleanup_old_tasks(&mut self) {
@@ -284,7 +289,7 @@ impl<'a> Scheduler<'a> {
fn dispatch_tasks(&mut self) {
loop {
self.consume_all_tasks();
- self.dispatch_next_task();
+ self.batch_dispatch_next_tasks(20);
if self.task_queue.is_empty() && self.no_budget_task_queue.is_empty() {
self.bpf.notify_complete(0);