author     Dennis Kobert <dennis@kobert.dev>    2025-03-31 14:50:19 +0200
committer  Dennis Kobert <dennis@kobert.dev>    2025-03-31 14:58:06 +0200
commit     fcd46995f4d02d19fc63568ff1fcdaf16e7129f3 (patch)
tree       6003cd45326c1f19a5ea63cea40055d1931b463c
parent     aa4e02437bf0448d12048f7bcffcaff0c8ba921c (diff)
Improve inference performance and re-enable socket communication
-rw-r--r--   src/benchmark.rs | 114
-rw-r--r--   src/main.rs      |  15
2 files changed, 99 insertions(+), 30 deletions(-)
diff --git a/src/benchmark.rs b/src/benchmark.rs
index 2640ba4..f9651b7 100644
--- a/src/benchmark.rs
+++ b/src/benchmark.rs
@@ -8,26 +8,46 @@ use perf_event::{
Builder, Counter, Group,
};
use rand::seq::IteratorRandom;
+use scx_utils::Topology;
use scx_utils::UserExitInfo;
-use std::collections::HashMap;
use std::fs::File;
use std::mem::MaybeUninit;
use std::process;
use std::thread;
use std::time::{Duration, Instant};
+use std::{collections::HashMap, i32, ops::Range};
const SLICE_US: u64 = 50000;
const LOG_INTERVAL_MS: u64 = 10; // Log every 10 ms
// const RESHUFFLE_ROUNDS: usize = 5; // Number of rounds before reshuffling counters
const RESHUFFLE_ROUNDS: usize = 1; // Number of rounds before reshuffling counters
-const MAX_COUNTERS_AT_ONCE: usize = 7;
-
+const MAX_COUNTERS_AT_ONCE_P_CORE: usize = 7;
+const MAX_COUNTERS_AT_ONCE_E_CORE: usize = 8;
type Pid = i32;
pub struct BenchmarkScheduler<'a> {
bpf: BpfScheduler<'a>,
own_pid: Pid,
log_path: String,
+ mode: Mode,
+ p_cores: Range<i32>,
+ e_cores: Range<i32>,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Mode {
+ ECores,
+ PCores,
+}
+
+impl From<char> for Mode {
+ fn from(value: char) -> Self {
+ match value {
+ 'p' => Mode::PCores,
+ 'e' => Mode::ECores,
+ _ => panic!("{} is not a valid benchmarking mode", value),
+ }
+ }
}
// Represents a single measurement point in time
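
(Aside on the Mode addition above: the char conversion panics on anything other than 'p' or 'e'. A tiny usage sketch, reusing the enum exactly as defined in the hunk, not additional patch code:)

    // 'p' selects the P-core benchmark, 'e' the E-core benchmark; any other
    // character panics, which main.rs relies on to reject bad --benchmark
    // values early.
    assert_eq!(Mode::from('p'), Mode::PCores);
    assert_eq!(Mode::from('e'), Mode::ECores);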
@@ -138,7 +158,11 @@ impl MeasurementDiff {
}
impl<'a> BenchmarkScheduler<'a> {
- pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, log_path: &str) -> Result<Self> {
+ pub fn init(
+ open_object: &'a mut MaybeUninit<OpenObject>,
+ log_path: &str,
+ mode: Mode,
+ ) -> Result<Self> {
let bpf = BpfScheduler::init(
open_object,
0, // exit_dump_len (default)
@@ -148,10 +172,28 @@ impl<'a> BenchmarkScheduler<'a> {
println!("Initializing benchmark scheduler (single-core profiling mode)");
+ let topology = Topology::new().unwrap();
+ let mut e_core_ids = Vec::new();
+ let mut p_core_ids = Vec::new();
+ for (id, cpu) in &topology.all_cpus {
+ match cpu.core_type {
+ scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32),
+ scx_utils::CoreType::Little => e_core_ids.push(*id as i32),
+ }
+ }
+ // We assume that the CPU IDs for each core type are assigned contiguously.
+ e_core_ids.sort();
+ p_core_ids.sort();
+ let e_cores = *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&0) + 1);
+ let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&0) + 1);
+
Ok(Self {
bpf,
own_pid: process::id() as i32,
log_path: log_path.to_string(),
+ mode,
+ p_cores,
+ e_cores,
})
}
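
The topology probe above reduces to splitting CPU IDs by core type and collapsing each side into a contiguous range. The same logic, pulled out into a free function for illustration (the helper name core_ranges is ours, not part of the patch):

    use std::ops::Range;
    use scx_utils::Topology;

    /// Sketch: derive contiguous CPU-ID ranges for P-cores (Big) and E-cores
    /// (Little), assuming, as the patch does, that IDs within a core type are
    /// contiguous. If a core type is absent, its range degenerates to 0..1.
    fn core_ranges(topology: &Topology) -> (Range<i32>, Range<i32>) {
        let mut p_core_ids = Vec::new();
        let mut e_core_ids = Vec::new();
        for (id, cpu) in &topology.all_cpus {
            match cpu.core_type {
                scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32),
                scx_utils::CoreType::Little => e_core_ids.push(*id as i32),
            }
        }
        p_core_ids.sort();
        e_core_ids.sort();
        let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&0) + 1);
        let e_cores = *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&0) + 1);
        (p_cores, e_cores)
    }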
@@ -159,14 +201,23 @@ impl<'a> BenchmarkScheduler<'a> {
while let Ok(Some(task)) = self.bpf.dequeue_task() {
let mut dispatched_task = DispatchedTask::new(&task);
- // If it's our own process, schedule it to core 1
- if task.pid == self.own_pid {
- dispatched_task.cpu = 1;
- // dispatched_task.flags |= RL_CPU_ANY as u64;
- } else {
- // Schedule all other tasks on core 0
- // dispatched_task.flags |= RL_CPU_ANY as u64;
- dispatched_task.cpu = 0;
+ match self.mode {
+ // If it's our own process, schedule it next to the monitored core
+ Mode::PCores => {
+ if task.pid == self.own_pid {
+ dispatched_task.cpu = self.p_cores.start + 1;
+ } else {
+ // Schedule all other tasks on the first (monitored) P-core
+ dispatched_task.cpu = self.p_cores.start;
+ }
+ }
+ Mode::ECores => {
+ if task.pid == self.own_pid {
+ dispatched_task.cpu = self.e_cores.start + 1;
+ } else {
+ dispatched_task.cpu = self.e_cores.start;
+ }
+ }
}
dispatched_task.slice_ns = SLICE_US;
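
Both match arms above differ only in which range they read, so the CPU choice can also be expressed as a single lookup. A small sketch of that shape (a hypothetical helper, not part of the patch), assuming the Mode type from the first hunk:

    use std::ops::Range;

    /// Sketch: the scheduler's own PID is pinned to the second core of the
    /// selected range; every other task goes to the first core, which is the
    /// one the measurement thread monitors.
    fn target_cpu(mode: Mode, p_cores: &Range<i32>, e_cores: &Range<i32>, is_own_pid: bool) -> i32 {
        let cores = match mode {
            Mode::PCores => p_cores,
            Mode::ECores => e_cores,
        };
        if is_own_pid {
            cores.start + 1
        } else {
            cores.start
        }
    }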
@@ -185,9 +236,19 @@ impl<'a> BenchmarkScheduler<'a> {
fn start_measurement_thread(&self) -> thread::JoinHandle<()> {
let log_path = self.log_path.clone();
-
+ let mode = self.mode.clone();
+ let e_cores = self.e_cores.clone();
+ let p_cores = self.p_cores.clone();
thread::spawn(move || {
- if let Err(e) = run_measurement_loop(log_path) {
+ if let Err(e) = run_measurement_loop(
+ log_path,
+ mode,
+ if mode == Mode::PCores {
+ p_cores.start
+ } else {
+ e_cores.start
+ },
+ ) {
eprintln!("Measurement thread error: {:?}", e);
}
})
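
The CPU handed to the measurement loop is simply the first core of whichever range the mode selects, matching where foreign tasks are dispatched. Spelled out as a free function (monitored_cpu is our name, mirroring the inline if above):

    use std::ops::Range;

    /// Sketch: the measurement thread watches the first core of the selected
    /// range, the same core that all non-scheduler tasks are pinned to.
    fn monitored_cpu(mode: Mode, p_cores: &Range<i32>, e_cores: &Range<i32>) -> i32 {
        if mode == Mode::PCores {
            p_cores.start
        } else {
            e_cores.start
        }
    }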
@@ -208,7 +269,7 @@ impl<'a> BenchmarkScheduler<'a> {
}
// Main measurement loop
-fn run_measurement_loop(log_path: String) -> Result<()> {
+fn run_measurement_loop(log_path: String, mode: Mode, cpu_to_monitor: i32) -> Result<()> {
// Define available hardware counters
let available_events = define_available_events();
@@ -218,14 +279,15 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
let mut rng = rand::rng();
let mut round_counter = 0;
+ println!("Monitoring: {cpu_to_monitor}");
+
// Main measurement loop
loop {
// println!("Starting new counter group (round {})", round_counter);
round_counter += 1;
- let cpu = 0;
// Create a new perf group
- let mut group = match Group::new_with_pid_and_cpu(-1, cpu) {
+ let mut group = match Group::new_with_pid_and_cpu(-1, cpu_to_monitor) {
Ok(g) => g,
Err(e) => {
eprintln!("Failed to create perf group: {}", e);
@@ -235,20 +297,27 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
};
// Select random subset of counters
- let selected_events = available_events
- .iter()
- .choose_multiple(&mut rng, MAX_COUNTERS_AT_ONCE);
+ // let selected_events = available_events.iter().choose_multiple(
+ // &mut rng,
+ // if mode == Mode::PCores {
+ // MAX_COUNTERS_AT_ONCE_P_CORE
+ // } else {
+ // MAX_COUNTERS_AT_ONCE_E_CORE
+ // },
+ // );
+
+ let selected_events = available_events[0..MAX_COUNTERS_AT_ONCE_E_CORE].iter();
// println!("Selected {} events for monitoring", selected_events.len());
// Build counters
let mut counters = Vec::new();
- for (name, event) in &selected_events {
+ for (name, event) in selected_events {
match Builder::new()
.group(&mut group)
.kind(event.clone())
.observe_pid(-1)
- .one_cpu(cpu.try_into().unwrap()) // Core 0 is where we're scheduling tasks
+ .one_cpu(cpu_to_monitor.try_into().unwrap())
.build()
{
Ok(counter) => {
@@ -324,7 +393,6 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
}
let _ = group.disable();
- // panic!();
}
}
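
Stepping back from the benchmark.rs hunks: each round, the loop opens one perf group on the monitored CPU and attaches every selected event to it through the builder. A trimmed sketch using the same perf_event calls that appear in the patch (single hardware event, errors reduced to ?, and the std::io::Result return type is our assumption):

    use perf_event::{events::Hardware, Builder, Counter, Group};

    /// Sketch: open a counter group on one CPU, observing all PIDs (-1),
    /// mirroring the Group::new_with_pid_and_cpu / Builder chain above.
    fn open_group(cpu_to_monitor: i32) -> std::io::Result<(Group, Counter)> {
        let mut group = Group::new_with_pid_and_cpu(-1, cpu_to_monitor)?;
        let instructions = Builder::new()
            .group(&mut group)
            .kind(Hardware::INSTRUCTIONS)
            .observe_pid(-1)
            .one_cpu(cpu_to_monitor.try_into().unwrap())
            .build()?;
        group.enable()?;
        Ok((group, instructions))
    }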
diff --git a/src/main.rs b/src/main.rs
index c56a41c..f0353e5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,5 @@
mod bpf_skel;
-use benchmark::BenchmarkScheduler;
+use benchmark::{BenchmarkScheduler, Mode};
pub use bpf_skel::*;
pub mod bpf_intf;
@@ -36,9 +36,9 @@ fn main() -> Result<()> {
Arg::new("benchmark")
.short('b')
.long("benchmark")
- .help("Use this flag to enable benckmarking mode")
- .action(ArgAction::SetTrue)
- .required(false),
+ .help("Enable benchmarking mode. Use \"e\" for e cores and \"p\" for p cores")
+ .required(false)
+ .value_name("mode"),
)
.arg(
Arg::new("power_cap")
@@ -51,13 +51,14 @@ fn main() -> Result<()> {
let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
let use_mocking = matches.get_flag("mock");
- let benchmark = matches.get_flag("benchmark");
+ let benchmark = matches.get_one::<String>("benchmark");
// Initialize and load the scheduler.
let mut open_object = MaybeUninit::uninit();
let log_path = "/tmp/logs.csv";
- if benchmark {
- let mut sched = BenchmarkScheduler::init(&mut open_object, log_path)?;
+ if let Some(mode) = benchmark {
+ let mode = mode.trim().chars().next().unwrap();
+ let mut sched = BenchmarkScheduler::init(&mut open_object, log_path, Mode::from(mode))?;
sched.run()?;
return Ok(());
}
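
Taken together, the CLI side of the change turns -b/--benchmark into a value-taking argument whose first character is fed to Mode::from. A standalone sketch of that flow (the command name "scheduler" and the get_matches_from input are illustrative only):

    use clap::{Arg, Command};

    fn main() {
        // Without .action(ArgAction::SetTrue) the argument accepts a value,
        // so -b/--benchmark now carries "p" or "e" instead of acting as a flag.
        let matches = Command::new("scheduler")
            .arg(
                Arg::new("benchmark")
                    .short('b')
                    .long("benchmark")
                    .help("Enable benchmarking mode. Use \"e\" for e cores and \"p\" for p cores")
                    .required(false)
                    .value_name("mode"),
            )
            .get_matches_from(["scheduler", "-b", "p"]);

        // Retrieved as in main.rs: an Option<&String>, reduced to its first
        // character before the Mode::from conversion.
        if let Some(raw) = matches.get_one::<String>("benchmark") {
            let mode_char = raw.trim().chars().next().unwrap();
            assert_eq!(mode_char, 'p');
        }
    }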