diff options
author | Dennis Kobert <dennis@kobert.dev> | 2025-03-10 21:36:30 +0100 |
---|---|---|
committer | Dennis Kobert <dennis@kobert.dev> | 2025-03-25 15:52:04 +0100 |
commit | 776dee32aca73ded5c0720afbe25ef2aae7b67f3 (patch) | |
tree | 1e4c8b9bde5c944869134e0ed751315efe09198a /src | |
parent | c2d63b96cbbd3be653fa0023fadae7df3babaf32 (diff) |
Implement benchmarking
Diffstat (limited to 'src')
-rw-r--r-- | src/benchmark.rs | 664 | ||||
-rw-r--r-- | src/energy.rs | 2 | ||||
-rw-r--r-- | src/energy/trackers/perf.rs | 12 | ||||
-rw-r--r-- | src/main.rs | 17 |
4 files changed, 692 insertions, 3 deletions
diff --git a/src/benchmark.rs b/src/benchmark.rs new file mode 100644 index 0000000..606f29b --- /dev/null +++ b/src/benchmark.rs @@ -0,0 +1,664 @@ +use crate::bpf::*; +use crate::energy::rapl; +use anyhow::Result; +use csv::Writer; +use libbpf_rs::OpenObject; +use perf_event::{ + events::{Cache, CacheOp, CacheResult, Event, Hardware, Software, WhichCache}, + Builder, Counter, Group, +}; +use rand::seq::IteratorRandom; +use scx_utils::UserExitInfo; +use std::collections::HashMap; +use std::fs::File; +use std::mem::MaybeUninit; +use std::process; +use std::thread; +use std::time::{Duration, Instant}; + +const SLICE_US: u64 = 50000; +const LOG_INTERVAL_MS: u64 = 10; // Log every 10 ms + // const RESHUFFLE_ROUNDS: usize = 5; // Number of rounds before reshuffling counters +const RESHUFFLE_ROUNDS: usize = 1; // Number of rounds before reshuffling counters +const MAX_COUNTERS_AT_ONCE: usize = 5; + +type Pid = i32; + +pub struct BenchmarkScheduler<'a> { + bpf: BpfScheduler<'a>, + own_pid: Pid, + log_path: String, +} + +// Represents a single measurement point in time +struct Measurement { + timestamp: Instant, + energy: Option<f64>, + frequency: Option<f64>, + counter_values: Vec<Option<u64>>, +} + +impl Measurement { + fn new() -> Self { + Self { + timestamp: Instant::now(), + energy: None, + frequency: None, + counter_values: Vec::new(), + } + } + + // Take a measurement with the given counter group + fn take(counters: &[(String, Counter)], group: &mut Group) -> Result<Self> { + let mut measurement = Self::new(); + + // Read energy + // Baseline is 4.5W + measurement.energy = rapl::read_package_energy().ok(); + + // Read CPU frequency + measurement.frequency = read_cpu_frequency(0); + + // Read performance counters + let counts = group.read()?; + // dbg!(&counts); + + let counters: HashMap<_, _> = counters.iter().map(|(a, b)| (a.clone(), b)).collect(); + + // Extract counter values + for (name, _) in define_available_events() { + let Some(counter) = 
counters.get(&name) else { + measurement.counter_values.push(None); + continue; + }; + measurement + .counter_values + .push(counts.get(counter).cloned()); + } + + Ok(measurement) + } + + // Calculate the difference between two measurements + fn diff(&self, previous: &Measurement) -> MeasurementDiff { + let duration_ms = self + .timestamp + .duration_since(previous.timestamp) + .as_millis() as u64; + + // Calculate energy delta + let energy_delta = match (previous.energy, self.energy) { + (Some(prev), Some(curr)) => curr - prev, + _ => 0.0, + }; + + MeasurementDiff { + timestamp: self.timestamp, + duration_ms, + energy_delta, + frequency: self.frequency, + counter_deltas: self.counter_values.clone(), + } + } +} + +// Represents the difference between two measurements +struct MeasurementDiff { + timestamp: Instant, + duration_ms: u64, + energy_delta: f64, + frequency: Option<f64>, + counter_deltas: Vec<Option<u64>>, +} + +impl MeasurementDiff { + // Write this diff as a CSV record + fn write_csv_record(&self, writer: &mut Writer<File>) -> Result<()> { + // Prepare CSV record + let mut record = vec![ + self.timestamp.elapsed().as_secs_f64().to_string(), + self.duration_ms.to_string(), + self.energy_delta.to_string(), + self.frequency + .map(|f| f.to_string()) + .unwrap_or_else(|| "N/A".to_string()), + ]; + record.extend( + self.counter_deltas + .iter() + .map(|x| x.map(|x| x.to_string()).unwrap_or_default()), + ); + + // Write record + writer.write_record(&record)?; + writer.flush()?; + + Ok(()) + } +} + +impl<'a> BenchmarkScheduler<'a> { + pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, log_path: &str) -> Result<Self> { + let bpf = BpfScheduler::init( + open_object, + 0, // exit_dump_len (default) + false, // partial (include all tasks) + false, // debug mode off + )?; + + println!("Initializing benchmark scheduler (single-core profiling mode)"); + + Ok(Self { + bpf, + own_pid: process::id() as i32, + log_path: log_path.to_string(), + }) + } + + fn 
consume_all_tasks(&mut self) -> Result<()> { + while let Ok(Some(task)) = self.bpf.dequeue_task() { + let mut dispatched_task = DispatchedTask::new(&task); + + // If it's our own process, schedule it to core 1 + if task.pid == self.own_pid { + dispatched_task.cpu = 1; + // dispatched_task.flags |= RL_CPU_ANY as u64; + } else { + // Schedule all other tasks on core 0 + // dispatched_task.flags |= RL_CPU_ANY as u64; + dispatched_task.cpu = 0; + } + + dispatched_task.slice_ns = SLICE_US; + + // Dispatch the task + if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { + eprintln!("Failed to dispatch task: {}", e); + } + } + + // Notify BPF we're done processing tasks + self.bpf.notify_complete(0); + + Ok(()) + } + + fn start_measurement_thread(&self) -> thread::JoinHandle<()> { + let log_path = self.log_path.clone(); + + thread::spawn(move || { + if let Err(e) = run_measurement_loop(log_path) { + eprintln!("Measurement thread error: {:?}", e); + } + }) + } + + pub fn run(&mut self) -> Result<UserExitInfo> { + // Start the measurement thread + self.start_measurement_thread(); + + // Main scheduling loop + while !self.bpf.exited() { + // Process all tasks + self.consume_all_tasks()?; + } + + self.bpf.shutdown_and_report() + } +} + +// Main measurement loop +fn run_measurement_loop(log_path: String) -> Result<()> { + // Define available hardware counters + let available_events = define_available_events(); + + // Initialize CSV writer with header + let mut csv_writer = initialize_csv_writer(&log_path, &available_events)?; + + let mut rng = rand::rng(); + let mut round_counter = 0; + + // Main measurement loop + loop { + // println!("Starting new counter group (round {})", round_counter); + round_counter += 1; + + let cpu = 0; + // Create a new perf group + let mut group = match Group::new_with_pid_and_cpu(-1, cpu) { + Ok(g) => g, + Err(e) => { + eprintln!("Failed to create perf group: {}", e); + thread::sleep(Duration::from_millis(100)); + continue; + } + }; + + // 
Select random subset of counters + let selected_events = available_events + .iter() + .choose_multiple(&mut rng, MAX_COUNTERS_AT_ONCE); + + // println!("Selected {} events for monitoring", selected_events.len()); + + // Build counters + let mut counters = Vec::new(); + for (name, event) in &selected_events { + match Builder::new() + .group(&mut group) + .kind(event.clone()) + .observe_pid(-1) + .one_cpu(cpu.try_into().unwrap()) // Core 0 is where we're scheduling tasks + .build() + { + Ok(counter) => { + // println!("Successfully created counter for {}", name); + counters.push((name.clone(), counter)); + } + Err(e) => { + eprintln!("Failed to create counter for {}: {}", name, e); + } + } + } + group.enable().unwrap(); + + if counters.is_empty() { + eprintln!("Failed to create any counters, retrying..."); + thread::sleep(Duration::from_millis(100)); + continue; + } + + // Enable the counter group + if let Err(e) = group.enable() { + eprintln!("Failed to enable perf group: {}", e); + thread::sleep(Duration::from_millis(100)); + continue; + } + + // println!( + // "Successfully enabled counter group with {} counters", + // counters.len() + // ); + + // Take initial measurement + let mut prev_measurement = match Measurement::take(&counters, &mut group) { + Ok(m) => m, + Err(e) => { + eprintln!("Failed to take initial measurement: {}", e); + thread::sleep(Duration::from_millis(100)); + continue; + } + }; + + // println!("Took initial measurement"); + + // Monitor for several rounds before reshuffling + for round in 0..RESHUFFLE_ROUNDS { + group.enable().unwrap(); + group.reset().unwrap(); + // Wait for the sampling interval + thread::sleep(Duration::from_millis(LOG_INTERVAL_MS)); + + // Take current measurement + let curr_measurement = match Measurement::take(&counters, &mut group) { + Ok(m) => m, + Err(e) => { + eprintln!("Failed to take measurement in round {}: {}", round, e); + continue; + } + }; + + // Calculate difference and write to CSV + let diff = 
curr_measurement.diff(&prev_measurement); + // println!( + // "Measurement diff: duration={}ms, energy={}J", + // diff.duration_ms, diff.energy_delta + // ); + + if let Err(e) = diff.write_csv_record(&mut csv_writer) { + eprintln!("Failed to write CSV record: {}", e); + } + + // Current becomes previous for next iteration + prev_measurement = curr_measurement; + } + + let _ = group.disable(); + // panic!(); + } +} + +fn initialize_csv_writer( + log_path: &str, + available_events: &[(String, Event)], +) -> Result<Writer<File>> { + let file = File::create(log_path)?; + let mut csv_writer = Writer::from_writer(file); + + // Write header with all possible counter names + let mut header = vec![ + "timestamp".to_string(), + "duration_ms".to_string(), + "package_power_j".to_string(), + "cpu_frequency_mhz".to_string(), + ]; + + // Add counter deltas + for (name, _) in available_events { + header.push(name.into()); + } + + csv_writer.write_record(&header)?; + csv_writer.flush()?; + + Ok(csv_writer) +} + +fn read_cpu_frequency(cpu_id: u32) -> Option<f64> { + // Try to read frequency from sysfs + let freq_path = format!( + "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_cur_freq", + cpu_id + ); + + match std::fs::read_to_string(freq_path) { + Ok(content) => { + // Convert from kHz to MHz + content.trim().parse::<f64>().ok().map(|freq| freq / 1000.0) + } + Err(_) => None, + } +} + +fn define_available_events() -> Vec<(String, Event)> { + let mut events = Vec::new(); + + // Hardware events + events.extend([ + ( + "cpu_cycles".to_string(), + Event::Hardware(Hardware::CPU_CYCLES), + ), + ( + "instructions".to_string(), + Event::Hardware(Hardware::INSTRUCTIONS), + ), + ( + "cache_references".to_string(), + Event::Hardware(Hardware::CACHE_REFERENCES), + ), + ( + "cache_misses".to_string(), + Event::Hardware(Hardware::CACHE_MISSES), + ), + ( + "branch_instructions".to_string(), + Event::Hardware(Hardware::BRANCH_INSTRUCTIONS), + ), + ( + "branch_misses".to_string(), + 
Event::Hardware(Hardware::BRANCH_MISSES), + ), + // ( + // "bus_cycles".to_string(), + // Event::Hardware(Hardware::BUS_CYCLES), + // ), + // ( + // "stalled_cycles_frontend".to_string(), + // Event::Hardware(Hardware::STALLED_CYCLES_FRONTEND), + // ), + // ( + // "stalled_cycles_backend".to_string(), + // Event::Hardware(Hardware::STALLED_CYCLES_BACKEND), + // ), + ( + "ref_cpu_cycles".to_string(), + Event::Hardware(Hardware::REF_CPU_CYCLES), + ), + ]); + + // Software events + events.extend([ + ( + "sw_cpu_clock".to_string(), + Event::Software(Software::CPU_CLOCK), + ), + ( + "sw_task_clock".to_string(), + Event::Software(Software::TASK_CLOCK), + ), + ( + "sw_page_faults".to_string(), + Event::Software(Software::PAGE_FAULTS), + ), + ( + "sw_context_switches".to_string(), + Event::Software(Software::CONTEXT_SWITCHES), + ), + ( + "sw_cpu_migrations".to_string(), + Event::Software(Software::CPU_MIGRATIONS), + ), + ( + "sw_page_faults_min".to_string(), + Event::Software(Software::PAGE_FAULTS_MIN), + ), + ( + "sw_page_faults_maj".to_string(), + Event::Software(Software::PAGE_FAULTS_MAJ), + ), + ( + "sw_alignment_faults".to_string(), + Event::Software(Software::ALIGNMENT_FAULTS), + ), + ( + "sw_emulation_faults".to_string(), + Event::Software(Software::EMULATION_FAULTS), + ), + ]); + + // L1 Data Cache events + events.extend([ + ( + "l1d_read_access".to_string(), + Event::Cache(Cache { + which: WhichCache::L1D, + operation: CacheOp::READ, + result: CacheResult::ACCESS, + }), + ), + ( + "l1d_read_miss".to_string(), + Event::Cache(Cache { + which: WhichCache::L1D, + operation: CacheOp::READ, + result: CacheResult::MISS, + }), + ), + // ( + // "l1d_write_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::L1D, + // operation: CacheOp::WRITE, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "l1d_write_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::L1D, + // operation: CacheOp::WRITE, + // result: CacheResult::MISS, + 
// }), + // ), + // ( + // "l1d_prefetch_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::L1D, + // operation: CacheOp::PREFETCH, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "l1d_prefetch_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::L1D, + // operation: CacheOp::PREFETCH, + // result: CacheResult::MISS, + // }), + // ), + ]); + + // L1 Instruction Cache events + events.extend([ + ( + "l1i_read_access".to_string(), + Event::Cache(Cache { + which: WhichCache::L1I, + operation: CacheOp::READ, + result: CacheResult::ACCESS, + }), + ), + ( + "l1i_read_miss".to_string(), + Event::Cache(Cache { + which: WhichCache::L1I, + operation: CacheOp::READ, + result: CacheResult::MISS, + }), + ), + ]); + + // Last Level Cache events + events.extend([ + // ( + // "llc_read_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::READ, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "llc_read_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::READ, + // result: CacheResult::MISS, + // }), + // ), + // ( + // "llc_write_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::WRITE, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "llc_write_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::WRITE, + // result: CacheResult::MISS, + // }), + // ), + // ( + // "llc_prefetch_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::PREFETCH, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "llc_prefetch_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::LL, + // operation: CacheOp::PREFETCH, + // result: CacheResult::MISS, + // }), + // ), + ]); + + // Data TLB events + events.extend([ + ( + "dtlb_read_access".to_string(), + Event::Cache(Cache { 
+ which: WhichCache::DTLB, + operation: CacheOp::READ, + result: CacheResult::ACCESS, + }), + ), + ( + "dtlb_read_miss".to_string(), + Event::Cache(Cache { + which: WhichCache::DTLB, + operation: CacheOp::READ, + result: CacheResult::MISS, + }), + ), + // ( + // "dtlb_write_access".to_string(), + // Event::Cache(Cache { + // which: WhichCache::DTLB, + // operation: CacheOp::WRITE, + // result: CacheResult::ACCESS, + // }), + // ), + // ( + // "dtlb_write_miss".to_string(), + // Event::Cache(Cache { + // which: WhichCache::DTLB, + // operation: CacheOp::WRITE, + // result: CacheResult::MISS, + // }), + // ), + ]); + + // Instruction TLB events + events.extend([ + ( + "itlb_read_access".to_string(), + Event::Cache(Cache { + which: WhichCache::ITLB, + operation: CacheOp::READ, + result: CacheResult::ACCESS, + }), + ), + ( + "itlb_read_miss".to_string(), + Event::Cache(Cache { + which: WhichCache::ITLB, + operation: CacheOp::READ, + result: CacheResult::MISS, + }), + ), + ]); + + // Branch Prediction Unit events + events.extend([ + ( + "bpu_read_access".to_string(), + Event::Cache(Cache { + which: WhichCache::BPU, + operation: CacheOp::READ, + result: CacheResult::ACCESS, + }), + ), + ( + "bpu_read_miss".to_string(), + Event::Cache(Cache { + which: WhichCache::BPU, + operation: CacheOp::READ, + result: CacheResult::MISS, + }), + ), + ]); + + // Sort events by name for consistent ordering + events.sort_unstable_by_key(|(name, _)| name.clone()); + + events +} diff --git a/src/energy.rs b/src/energy.rs index ed8b65e..a26a2b3 100644 --- a/src/energy.rs +++ b/src/energy.rs @@ -1,5 +1,5 @@ mod budget; -mod rapl; +pub mod rapl; mod trackers; use std::collections::{BTreeSet, HashMap}; diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs index 7c26bdb..17bc693 100644 --- a/src/energy/trackers/perf.rs +++ b/src/energy/trackers/perf.rs @@ -24,14 +24,22 @@ static EVENT_TYPES: &[(f32, Event)] = &[ impl Estimator for PerfEstimator { fn start_trace(&mut self, pid: 
u64) { - let Ok(mut group) = Group::new() else { + let Ok(mut group) = Group::new_with_pid_and_cpu(-1, 0) else { eprintln!("Failed to create performance counter group for PID {}", pid); return; }; let counters: Result<Vec<_>, _> = EVENT_TYPES .iter() - .map(|(_, kind)| Builder::new().group(&mut group).kind(kind.clone()).build()) + .map(|(_, kind)| { + Builder::new() + .group(&mut group) + .kind(kind.clone()) + // .observe_pid(pid as i32) + .observe_pid(-1) + .one_cpu(0) + .build() + }) .collect(); let counters = match counters { diff --git a/src/main.rs b/src/main.rs index d3c9628..f3d4992 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,9 @@ mod bpf_skel; +use benchmark::BenchmarkScheduler; pub use bpf_skel::*; pub mod bpf_intf; +mod benchmark; mod e_core_selector; mod energy; mod freq; @@ -28,6 +30,14 @@ fn main() -> Result<()> { .required(false), ) .arg( + Arg::new("benchmark") + .short('b') + .long("benchmark") + .help("Use this flag to enable benckmarking mode") + .action(ArgAction::SetTrue) + .required(false), + ) + .arg( Arg::new("power_cap") .long("energy_cap") .help("Set a power cap for the processor") @@ -38,9 +48,16 @@ fn main() -> Result<()> { let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX); let use_mocking = matches.get_flag("mock"); + let benchmark = matches.get_flag("benchmark"); // Initialize and load the scheduler. let mut open_object = MaybeUninit::uninit(); + let log_path = "logs.csv"; + if benchmark { + let mut sched = BenchmarkScheduler::init(&mut open_object, log_path)?; + sched.run(); + return Ok(()); + } loop { let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?; if !sched.run()?.should_restart() { |