author | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 14:50:19 +0200
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-31 14:58:06 +0200
commit | fcd46995f4d02d19fc63568ff1fcdaf16e7129f3 (patch)
tree | 6003cd45326c1f19a5ea63cea40055d1931b463c
parent | aa4e02437bf0448d12048f7bcffcaff0c8ba921c (diff)
Improve inference performance and re-enable socket communication
-rw-r--r-- | src/benchmark.rs | 114
-rw-r--r-- | src/main.rs | 15
2 files changed, 99 insertions, 30 deletions
diff --git a/src/benchmark.rs b/src/benchmark.rs
index 2640ba4..f9651b7 100644
--- a/src/benchmark.rs
+++ b/src/benchmark.rs
@@ -8,26 +8,46 @@ use perf_event::{
     Builder, Counter, Group,
 };
 use rand::seq::IteratorRandom;
+use scx_utils::Topology;
 use scx_utils::UserExitInfo;
-use std::collections::HashMap;
 use std::fs::File;
 use std::mem::MaybeUninit;
 use std::process;
 use std::thread;
 use std::time::{Duration, Instant};
+use std::{collections::HashMap, i32, ops::Range};
 
 const SLICE_US: u64 = 50000;
 const LOG_INTERVAL_MS: u64 = 10; // Log every 1 second
 // const RESHUFFLE_ROUNDS: usize = 5; // Number of rounds before reshuffling counters
 const RESHUFFLE_ROUNDS: usize = 1; // Number of rounds before reshuffling counters
-const MAX_COUNTERS_AT_ONCE: usize = 7;
-
+const MAX_COUNTERS_AT_ONCE_P_CORE: usize = 7;
+const MAX_COUNTERS_AT_ONCE_E_CORE: usize = 8;
 type Pid = i32;
 
 pub struct BenchmarkScheduler<'a> {
     bpf: BpfScheduler<'a>,
     own_pid: Pid,
     log_path: String,
+    mode: Mode,
+    p_cores: Range<i32>,
+    e_cores: Range<i32>,
+}
+
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum Mode {
+    ECores,
+    PCores,
+}
+
+impl From<char> for Mode {
+    fn from(value: char) -> Self {
+        match value {
+            'p' => Mode::PCores,
+            'e' => Mode::ECores,
+            _ => panic!("{} is not a valid benchmarking mode", value),
+        }
+    }
 }
 
 // Represents a single measurement point in time
@@ -138,7 +158,11 @@ impl MeasurementDiff {
 }
 
 impl<'a> BenchmarkScheduler<'a> {
-    pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, log_path: &str) -> Result<Self> {
+    pub fn init(
+        open_object: &'a mut MaybeUninit<OpenObject>,
+        log_path: &str,
+        mode: Mode,
+    ) -> Result<Self> {
         let bpf = BpfScheduler::init(
             open_object,
             0, // exit_dump_len (default)
@@ -148,10 +172,28 @@ impl<'a> BenchmarkScheduler<'a> {
 
         println!("Initializing benchmark scheduler (single-core profiling mode)");
 
+        let topology = Topology::new().unwrap();
+        let mut e_core_ids = Vec::new();
+        let mut p_core_ids = Vec::new();
+        for (id, cpu) in &topology.all_cpus {
+            match cpu.core_type {
+                scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32),
+                scx_utils::CoreType::Little => e_core_ids.push(*id as i32),
+            }
+        }
+        // We assume that the CPU IDs for each core type are assigned contiguously.
+        e_core_ids.sort();
+        p_core_ids.sort();
+        let e_cores = *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&0) + 1);
+        let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&0) + 1);
+
         Ok(Self {
             bpf,
             own_pid: process::id() as i32,
             log_path: log_path.to_string(),
+            mode,
+            p_cores,
+            e_cores,
         })
     }
 
@@ -159,14 +201,23 @@ impl<'a> BenchmarkScheduler<'a> {
         while let Ok(Some(task)) = self.bpf.dequeue_task() {
             let mut dispatched_task = DispatchedTask::new(&task);
 
-            // If it's our own process, schedule it to core 1
-            if task.pid == self.own_pid {
-                dispatched_task.cpu = 1;
-                // dispatched_task.flags |= RL_CPU_ANY as u64;
-            } else {
-                // Schedule all other tasks on core 0
-                // dispatched_task.flags |= RL_CPU_ANY as u64;
-                dispatched_task.cpu = 0;
+            match self.mode {
+                // If it's our own process, schedule it to core 1
+                Mode::PCores => {
+                    if task.pid == self.own_pid {
+                        dispatched_task.cpu = self.p_cores.start + 1;
+                    } else {
+                        // Schedule all other tasks on core 0
+                        dispatched_task.cpu = self.p_cores.start;
+                    }
+                }
+                Mode::ECores => {
+                    if task.pid == self.own_pid {
+                        dispatched_task.cpu = self.e_cores.start + 1;
+                    } else {
+                        dispatched_task.cpu = self.e_cores.start;
+                    }
+                }
             }
 
             dispatched_task.slice_ns = SLICE_US;
@@ -185,9 +236,19 @@ impl<'a> BenchmarkScheduler<'a> {
 
     fn start_measurement_thread(&self) -> thread::JoinHandle<()> {
         let log_path = self.log_path.clone();
-
+        let mode = self.mode.clone();
+        let e_cores = self.e_cores.clone();
+        let p_cores = self.p_cores.clone();
         thread::spawn(move || {
-            if let Err(e) = run_measurement_loop(log_path) {
+            if let Err(e) = run_measurement_loop(
+                log_path,
+                mode,
+                if mode == Mode::PCores {
+                    p_cores.start
+                } else {
+                    e_cores.start
+                },
+            ) {
                 eprintln!("Measurement thread error: {:?}", e);
             }
         })
@@ -208,7 +269,7 @@ impl<'a> BenchmarkScheduler<'a> {
 }
 
 // Main measurement loop
-fn run_measurement_loop(log_path: String) -> Result<()> {
+fn run_measurement_loop(log_path: String, mode: Mode, cpu_to_monitor: i32) -> Result<()> {
     // Define available hardware counters
     let available_events = define_available_events();
 
@@ -218,14 +279,15 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
     let mut rng = rand::rng();
     let mut round_counter = 0;
 
+    println!("Monitoring: {cpu_to_monitor}");
+
     // Main measurement loop
    loop {
         // println!("Starting new counter group (round {})", round_counter);
         round_counter += 1;
-        let cpu = 0;
 
         // Create a new perf group
-        let mut group = match Group::new_with_pid_and_cpu(-1, cpu) {
+        let mut group = match Group::new_with_pid_and_cpu(-1, cpu_to_monitor) {
             Ok(g) => g,
             Err(e) => {
                 eprintln!("Failed to create perf group: {}", e);
@@ -235,20 +297,27 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
         };
 
         // Select random subset of counters
-        let selected_events = available_events
-            .iter()
-            .choose_multiple(&mut rng, MAX_COUNTERS_AT_ONCE);
+        // let selected_events = available_events.iter().choose_multiple(
+        //     &mut rng,
+        //     if mode == Mode::PCores {
+        //         MAX_COUNTERS_AT_ONCE_P_CORE
+        //     } else {
+        //         MAX_COUNTERS_AT_ONCE_E_CORE
+        //     },
+        // );
+
+        let selected_events = available_events[0..MAX_COUNTERS_AT_ONCE_E_CORE].iter();
 
         // println!("Selected {} events for monitoring", selected_events.len());
 
         // Build counters
         let mut counters = Vec::new();
-        for (name, event) in &selected_events {
+        for (name, event) in selected_events {
             match Builder::new()
                 .group(&mut group)
                 .kind(event.clone())
                 .observe_pid(-1)
-                .one_cpu(cpu.try_into().unwrap()) // Core 0 is where we're scheduling tasks
+                .one_cpu(cpu_to_monitor.try_into().unwrap())
                 .build()
             {
                 Ok(counter) => {
@@ -324,7 +393,6 @@ fn run_measurement_loop(log_path: String) -> Result<()> {
         }
 
         let _ = group.disable();
-        // panic!();
     }
 }
diff --git a/src/main.rs b/src/main.rs
index c56a41c..f0353e5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,5 +1,5 @@
 mod bpf_skel;
-use benchmark::BenchmarkScheduler;
+use benchmark::{BenchmarkScheduler, Mode};
 pub use bpf_skel::*;
 
 pub mod bpf_intf;
@@ -36,9 +36,9 @@ fn main() -> Result<()> {
             Arg::new("benchmark")
                 .short('b')
                 .long("benchmark")
-                .help("Use this flag to enable benckmarking mode")
-                .action(ArgAction::SetTrue)
-                .required(false),
+                .help("Enable benchmarking mode. Use \"e\" for e cores and \"p\" for p cores")
+                .required(false)
+                .value_name("mode"),
         )
         .arg(
             Arg::new("power_cap")
@@ -51,13 +51,14 @@ fn main() -> Result<()> {
     let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
     let use_mocking = matches.get_flag("mock");
-    let benchmark = matches.get_flag("benchmark");
+    let benchmark = matches.get_one::<String>("benchmark");
 
     // Initialize and load the scheduler.
     let mut open_object = MaybeUninit::uninit();
     let log_path = "/tmp/logs.csv";
 
-    if benchmark {
-        let mut sched = BenchmarkScheduler::init(&mut open_object, log_path)?;
+    if let Some(mode) = benchmark {
+        let mode = mode.trim().chars().next().unwrap();
+        let mut sched = BenchmarkScheduler::init(&mut open_object, log_path, Mode::from(mode))?;
         sched.run()?;
         return Ok(());
     }
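
Usage note (editorial, not part of the commit): with this change the benchmark option takes a mode string instead of acting as a boolean flag, and main.rs only inspects the first character of that string after trimming, so invoking the scheduler with "-b p" or "--benchmark e" would select P-core or E-core profiling respectively. A minimal standalone sketch of that selection logic, with Mode copied from the diff above:

    // Editorial sketch: mirrors the first-character parsing in main.rs.
    // `Mode` is copied verbatim from the commit above.
    #[derive(Clone, Copy, Debug, PartialEq, Eq)]
    pub enum Mode {
        ECores,
        PCores,
    }

    impl From<char> for Mode {
        fn from(value: char) -> Self {
            match value {
                'p' => Mode::PCores,
                'e' => Mode::ECores,
                _ => panic!("{} is not a valid benchmarking mode", value),
            }
        }
    }

    fn main() {
        // main.rs trims the --benchmark value and converts its first character,
        // so "p", "pcores" and " e " are all accepted; anything else panics.
        let arg = "p";
        let mode = Mode::from(arg.trim().chars().next().unwrap());
        assert_eq!(mode, Mode::PCores);
    }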