summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Dennis Kobert <dennis@kobert.dev> 2025-03-10 21:36:30 +0100
committer Dennis Kobert <dennis@kobert.dev> 2025-03-25 15:52:04 +0100
commit 776dee32aca73ded5c0720afbe25ef2aae7b67f3 (patch)
tree 1e4c8b9bde5c944869134e0ed751315efe09198a /src
parent c2d63b96cbbd3be653fa0023fadae7df3babaf32 (diff)
Implement benchmarking
Diffstat (limited to 'src')
-rw-r--r-- src/benchmark.rs 664
-rw-r--r-- src/energy.rs 2
-rw-r--r-- src/energy/trackers/perf.rs 12
-rw-r--r-- src/main.rs 17
4 files changed, 692 insertions, 3 deletions
diff --git a/src/benchmark.rs b/src/benchmark.rs
new file mode 100644
index 0000000..606f29b
--- /dev/null
+++ b/src/benchmark.rs
@@ -0,0 +1,664 @@
+use crate::bpf::*;
+use crate::energy::rapl;
+use anyhow::Result;
+use csv::Writer;
+use libbpf_rs::OpenObject;
+use perf_event::{
+ events::{Cache, CacheOp, CacheResult, Event, Hardware, Software, WhichCache},
+ Builder, Counter, Group,
+};
+use rand::seq::IteratorRandom;
+use scx_utils::UserExitInfo;
+use std::collections::HashMap;
+use std::fs::File;
+use std::mem::MaybeUninit;
+use std::process;
+use std::thread;
+use std::time::{Duration, Instant};
+
+const SLICE_US: u64 = 50000;
+const LOG_INTERVAL_MS: u64 = 10; // Log every 10 milliseconds
+ // const RESHUFFLE_ROUNDS: usize = 5; // Number of rounds before reshuffling counters
+const RESHUFFLE_ROUNDS: usize = 1; // Number of rounds before reshuffling counters
+const MAX_COUNTERS_AT_ONCE: usize = 5;
+
+type Pid = i32;
+
+pub struct BenchmarkScheduler<'a> {
+ bpf: BpfScheduler<'a>,
+ own_pid: Pid,
+ log_path: String,
+}
+
+// Represents a single measurement point in time
+struct Measurement {
+ timestamp: Instant,
+ energy: Option<f64>,
+ frequency: Option<f64>,
+ counter_values: Vec<Option<u64>>,
+}
+
+impl Measurement {
+ fn new() -> Self {
+ Self {
+ timestamp: Instant::now(),
+ energy: None,
+ frequency: None,
+ counter_values: Vec::new(),
+ }
+ }
+
+ // Take a measurement with the given counter group
+ fn take(counters: &[(String, Counter)], group: &mut Group) -> Result<Self> {
+ let mut measurement = Self::new();
+
+ // Read energy
+ // Baseline is 4.5W
+ measurement.energy = rapl::read_package_energy().ok();
+
+ // Read CPU frequency
+ measurement.frequency = read_cpu_frequency(0);
+
+ // Read performance counters
+ let counts = group.read()?;
+ // dbg!(&counts);
+
+ let counters: HashMap<_, _> = counters.iter().map(|(a, b)| (a.clone(), b)).collect();
+
+ // Extract counter values
+ for (name, _) in define_available_events() {
+ let Some(counter) = counters.get(&name) else {
+ measurement.counter_values.push(None);
+ continue;
+ };
+ measurement
+ .counter_values
+ .push(counts.get(counter).cloned());
+ }
+
+ Ok(measurement)
+ }
+
+ // Calculate the difference between two measurements
+ fn diff(&self, previous: &Measurement) -> MeasurementDiff {
+ let duration_ms = self
+ .timestamp
+ .duration_since(previous.timestamp)
+ .as_millis() as u64;
+
+ // Calculate energy delta
+ let energy_delta = match (previous.energy, self.energy) {
+ (Some(prev), Some(curr)) => curr - prev,
+ _ => 0.0,
+ };
+
+ MeasurementDiff {
+ timestamp: self.timestamp,
+ duration_ms,
+ energy_delta,
+ frequency: self.frequency,
+ counter_deltas: self.counter_values.clone(),
+ }
+ }
+}
+
+// Represents the difference between two measurements
+struct MeasurementDiff {
+ timestamp: Instant,
+ duration_ms: u64,
+ energy_delta: f64,
+ frequency: Option<f64>,
+ counter_deltas: Vec<Option<u64>>,
+}
+
+impl MeasurementDiff {
+ // Write this diff as a CSV record
+ fn write_csv_record(&self, writer: &mut Writer<File>) -> Result<()> {
+ // Prepare CSV record
+ let mut record = vec![
+ self.timestamp.elapsed().as_secs_f64().to_string(),
+ self.duration_ms.to_string(),
+ self.energy_delta.to_string(),
+ self.frequency
+ .map(|f| f.to_string())
+ .unwrap_or_else(|| "N/A".to_string()),
+ ];
+ record.extend(
+ self.counter_deltas
+ .iter()
+ .map(|x| x.map(|x| x.to_string()).unwrap_or_default()),
+ );
+
+ // Write record
+ writer.write_record(&record)?;
+ writer.flush()?;
+
+ Ok(())
+ }
+}
+
+impl<'a> BenchmarkScheduler<'a> {
+ pub fn init(open_object: &'a mut MaybeUninit<OpenObject>, log_path: &str) -> Result<Self> {
+ let bpf = BpfScheduler::init(
+ open_object,
+ 0, // exit_dump_len (default)
+ false, // partial (include all tasks)
+ false, // debug mode off
+ )?;
+
+ println!("Initializing benchmark scheduler (single-core profiling mode)");
+
+ Ok(Self {
+ bpf,
+ own_pid: process::id() as i32,
+ log_path: log_path.to_string(),
+ })
+ }
+
+ fn consume_all_tasks(&mut self) -> Result<()> {
+ while let Ok(Some(task)) = self.bpf.dequeue_task() {
+ let mut dispatched_task = DispatchedTask::new(&task);
+
+ // If it's our own process, schedule it to core 1
+ if task.pid == self.own_pid {
+ dispatched_task.cpu = 1;
+ // dispatched_task.flags |= RL_CPU_ANY as u64;
+ } else {
+ // Schedule all other tasks on core 0
+ // dispatched_task.flags |= RL_CPU_ANY as u64;
+ dispatched_task.cpu = 0;
+ }
+
+ dispatched_task.slice_ns = SLICE_US;
+
+ // Dispatch the task
+ if let Err(e) = self.bpf.dispatch_task(&dispatched_task) {
+ eprintln!("Failed to dispatch task: {}", e);
+ }
+ }
+
+ // Notify BPF we're done processing tasks
+ self.bpf.notify_complete(0);
+
+ Ok(())
+ }
+
+ fn start_measurement_thread(&self) -> thread::JoinHandle<()> {
+ let log_path = self.log_path.clone();
+
+ thread::spawn(move || {
+ if let Err(e) = run_measurement_loop(log_path) {
+ eprintln!("Measurement thread error: {:?}", e);
+ }
+ })
+ }
+
+ pub fn run(&mut self) -> Result<UserExitInfo> {
+ // Start the measurement thread
+ self.start_measurement_thread();
+
+ // Main scheduling loop
+ while !self.bpf.exited() {
+ // Process all tasks
+ self.consume_all_tasks()?;
+ }
+
+ self.bpf.shutdown_and_report()
+ }
+}
+
+// Main measurement loop
+fn run_measurement_loop(log_path: String) -> Result<()> {
+ // Define available hardware counters
+ let available_events = define_available_events();
+
+ // Initialize CSV writer with header
+ let mut csv_writer = initialize_csv_writer(&log_path, &available_events)?;
+
+ let mut rng = rand::rng();
+ let mut round_counter = 0;
+
+ // Main measurement loop
+ loop {
+ // println!("Starting new counter group (round {})", round_counter);
+ round_counter += 1;
+
+ let cpu = 0;
+ // Create a new perf group
+ let mut group = match Group::new_with_pid_and_cpu(-1, cpu) {
+ Ok(g) => g,
+ Err(e) => {
+ eprintln!("Failed to create perf group: {}", e);
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
+ };
+
+ // Select random subset of counters
+ let selected_events = available_events
+ .iter()
+ .choose_multiple(&mut rng, MAX_COUNTERS_AT_ONCE);
+
+ // println!("Selected {} events for monitoring", selected_events.len());
+
+ // Build counters
+ let mut counters = Vec::new();
+ for (name, event) in &selected_events {
+ match Builder::new()
+ .group(&mut group)
+ .kind(event.clone())
+ .observe_pid(-1)
+ .one_cpu(cpu.try_into().unwrap()) // Core 0 is where we're scheduling tasks
+ .build()
+ {
+ Ok(counter) => {
+ // println!("Successfully created counter for {}", name);
+ counters.push((name.clone(), counter));
+ }
+ Err(e) => {
+ eprintln!("Failed to create counter for {}: {}", name, e);
+ }
+ }
+ }
+ group.enable().unwrap();
+
+ if counters.is_empty() {
+ eprintln!("Failed to create any counters, retrying...");
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
+
+ // Enable the counter group
+ if let Err(e) = group.enable() {
+ eprintln!("Failed to enable perf group: {}", e);
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
+
+ // println!(
+ // "Successfully enabled counter group with {} counters",
+ // counters.len()
+ // );
+
+ // Take initial measurement
+ let mut prev_measurement = match Measurement::take(&counters, &mut group) {
+ Ok(m) => m,
+ Err(e) => {
+ eprintln!("Failed to take initial measurement: {}", e);
+ thread::sleep(Duration::from_millis(100));
+ continue;
+ }
+ };
+
+ // println!("Took initial measurement");
+
+ // Monitor for several rounds before reshuffling
+ for round in 0..RESHUFFLE_ROUNDS {
+ group.enable().unwrap();
+ group.reset().unwrap();
+ // Wait for the sampling interval
+ thread::sleep(Duration::from_millis(LOG_INTERVAL_MS));
+
+ // Take current measurement
+ let curr_measurement = match Measurement::take(&counters, &mut group) {
+ Ok(m) => m,
+ Err(e) => {
+ eprintln!("Failed to take measurement in round {}: {}", round, e);
+ continue;
+ }
+ };
+
+ // Calculate difference and write to CSV
+ let diff = curr_measurement.diff(&prev_measurement);
+ // println!(
+ // "Measurement diff: duration={}ms, energy={}J",
+ // diff.duration_ms, diff.energy_delta
+ // );
+
+ if let Err(e) = diff.write_csv_record(&mut csv_writer) {
+ eprintln!("Failed to write CSV record: {}", e);
+ }
+
+ // Current becomes previous for next iteration
+ prev_measurement = curr_measurement;
+ }
+
+ let _ = group.disable();
+ // panic!();
+ }
+}
+
+fn initialize_csv_writer(
+ log_path: &str,
+ available_events: &[(String, Event)],
+) -> Result<Writer<File>> {
+ let file = File::create(log_path)?;
+ let mut csv_writer = Writer::from_writer(file);
+
+ // Write header with all possible counter names
+ let mut header = vec![
+ "timestamp".to_string(),
+ "duration_ms".to_string(),
+ "package_power_j".to_string(),
+ "cpu_frequency_mhz".to_string(),
+ ];
+
+ // Add one column per available counter, named after its event
+ for (name, _) in available_events {
+ header.push(name.into());
+ }
+
+ csv_writer.write_record(&header)?;
+ csv_writer.flush()?;
+
+ Ok(csv_writer)
+}
+
+fn read_cpu_frequency(cpu_id: u32) -> Option<f64> {
+ // Try to read frequency from sysfs
+ let freq_path = format!(
+ "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_cur_freq",
+ cpu_id
+ );
+
+ match std::fs::read_to_string(freq_path) {
+ Ok(content) => {
+ // Convert from kHz to MHz
+ content.trim().parse::<f64>().ok().map(|freq| freq / 1000.0)
+ }
+ Err(_) => None,
+ }
+}
+
+fn define_available_events() -> Vec<(String, Event)> {
+ let mut events = Vec::new();
+
+ // Hardware events
+ events.extend([
+ (
+ "cpu_cycles".to_string(),
+ Event::Hardware(Hardware::CPU_CYCLES),
+ ),
+ (
+ "instructions".to_string(),
+ Event::Hardware(Hardware::INSTRUCTIONS),
+ ),
+ (
+ "cache_references".to_string(),
+ Event::Hardware(Hardware::CACHE_REFERENCES),
+ ),
+ (
+ "cache_misses".to_string(),
+ Event::Hardware(Hardware::CACHE_MISSES),
+ ),
+ (
+ "branch_instructions".to_string(),
+ Event::Hardware(Hardware::BRANCH_INSTRUCTIONS),
+ ),
+ (
+ "branch_misses".to_string(),
+ Event::Hardware(Hardware::BRANCH_MISSES),
+ ),
+ // (
+ // "bus_cycles".to_string(),
+ // Event::Hardware(Hardware::BUS_CYCLES),
+ // ),
+ // (
+ // "stalled_cycles_frontend".to_string(),
+ // Event::Hardware(Hardware::STALLED_CYCLES_FRONTEND),
+ // ),
+ // (
+ // "stalled_cycles_backend".to_string(),
+ // Event::Hardware(Hardware::STALLED_CYCLES_BACKEND),
+ // ),
+ (
+ "ref_cpu_cycles".to_string(),
+ Event::Hardware(Hardware::REF_CPU_CYCLES),
+ ),
+ ]);
+
+ // Software events
+ events.extend([
+ (
+ "sw_cpu_clock".to_string(),
+ Event::Software(Software::CPU_CLOCK),
+ ),
+ (
+ "sw_task_clock".to_string(),
+ Event::Software(Software::TASK_CLOCK),
+ ),
+ (
+ "sw_page_faults".to_string(),
+ Event::Software(Software::PAGE_FAULTS),
+ ),
+ (
+ "sw_context_switches".to_string(),
+ Event::Software(Software::CONTEXT_SWITCHES),
+ ),
+ (
+ "sw_cpu_migrations".to_string(),
+ Event::Software(Software::CPU_MIGRATIONS),
+ ),
+ (
+ "sw_page_faults_min".to_string(),
+ Event::Software(Software::PAGE_FAULTS_MIN),
+ ),
+ (
+ "sw_page_faults_maj".to_string(),
+ Event::Software(Software::PAGE_FAULTS_MAJ),
+ ),
+ (
+ "sw_alignment_faults".to_string(),
+ Event::Software(Software::ALIGNMENT_FAULTS),
+ ),
+ (
+ "sw_emulation_faults".to_string(),
+ Event::Software(Software::EMULATION_FAULTS),
+ ),
+ ]);
+
+ // L1 Data Cache events
+ events.extend([
+ (
+ "l1d_read_access".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::L1D,
+ operation: CacheOp::READ,
+ result: CacheResult::ACCESS,
+ }),
+ ),
+ (
+ "l1d_read_miss".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::L1D,
+ operation: CacheOp::READ,
+ result: CacheResult::MISS,
+ }),
+ ),
+ // (
+ // "l1d_write_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::L1D,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "l1d_write_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::L1D,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ // (
+ // "l1d_prefetch_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::L1D,
+ // operation: CacheOp::PREFETCH,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "l1d_prefetch_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::L1D,
+ // operation: CacheOp::PREFETCH,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ ]);
+
+ // L1 Instruction Cache events
+ events.extend([
+ (
+ "l1i_read_access".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::L1I,
+ operation: CacheOp::READ,
+ result: CacheResult::ACCESS,
+ }),
+ ),
+ (
+ "l1i_read_miss".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::L1I,
+ operation: CacheOp::READ,
+ result: CacheResult::MISS,
+ }),
+ ),
+ ]);
+
+ // Last Level Cache events
+ events.extend([
+ // (
+ // "llc_read_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::READ,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "llc_read_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::READ,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ // (
+ // "llc_write_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "llc_write_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ // (
+ // "llc_prefetch_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::PREFETCH,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "llc_prefetch_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::LL,
+ // operation: CacheOp::PREFETCH,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ ]);
+
+ // Data TLB events
+ events.extend([
+ (
+ "dtlb_read_access".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::DTLB,
+ operation: CacheOp::READ,
+ result: CacheResult::ACCESS,
+ }),
+ ),
+ (
+ "dtlb_read_miss".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::DTLB,
+ operation: CacheOp::READ,
+ result: CacheResult::MISS,
+ }),
+ ),
+ // (
+ // "dtlb_write_access".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::DTLB,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::ACCESS,
+ // }),
+ // ),
+ // (
+ // "dtlb_write_miss".to_string(),
+ // Event::Cache(Cache {
+ // which: WhichCache::DTLB,
+ // operation: CacheOp::WRITE,
+ // result: CacheResult::MISS,
+ // }),
+ // ),
+ ]);
+
+ // Instruction TLB events
+ events.extend([
+ (
+ "itlb_read_access".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::ITLB,
+ operation: CacheOp::READ,
+ result: CacheResult::ACCESS,
+ }),
+ ),
+ (
+ "itlb_read_miss".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::ITLB,
+ operation: CacheOp::READ,
+ result: CacheResult::MISS,
+ }),
+ ),
+ ]);
+
+ // Branch Prediction Unit events
+ events.extend([
+ (
+ "bpu_read_access".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::BPU,
+ operation: CacheOp::READ,
+ result: CacheResult::ACCESS,
+ }),
+ ),
+ (
+ "bpu_read_miss".to_string(),
+ Event::Cache(Cache {
+ which: WhichCache::BPU,
+ operation: CacheOp::READ,
+ result: CacheResult::MISS,
+ }),
+ ),
+ ]);
+
+ // Sort events by name for consistent ordering
+ events.sort_unstable_by_key(|(name, _)| name.clone());
+
+ events
+}
diff --git a/src/energy.rs b/src/energy.rs
index ed8b65e..a26a2b3 100644
--- a/src/energy.rs
+++ b/src/energy.rs
@@ -1,5 +1,5 @@
mod budget;
-mod rapl;
+pub mod rapl;
mod trackers;
use std::collections::{BTreeSet, HashMap};
diff --git a/src/energy/trackers/perf.rs b/src/energy/trackers/perf.rs
index 7c26bdb..17bc693 100644
--- a/src/energy/trackers/perf.rs
+++ b/src/energy/trackers/perf.rs
@@ -24,14 +24,22 @@ static EVENT_TYPES: &[(f32, Event)] = &[
impl Estimator for PerfEstimator {
fn start_trace(&mut self, pid: u64) {
- let Ok(mut group) = Group::new() else {
+ let Ok(mut group) = Group::new_with_pid_and_cpu(-1, 0) else {
eprintln!("Failed to create performance counter group for PID {}", pid);
return;
};
let counters: Result<Vec<_>, _> = EVENT_TYPES
.iter()
- .map(|(_, kind)| Builder::new().group(&mut group).kind(kind.clone()).build())
+ .map(|(_, kind)| {
+ Builder::new()
+ .group(&mut group)
+ .kind(kind.clone())
+ // .observe_pid(pid as i32)
+ .observe_pid(-1)
+ .one_cpu(0)
+ .build()
+ })
.collect();
let counters = match counters {
diff --git a/src/main.rs b/src/main.rs
index d3c9628..f3d4992 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,7 +1,9 @@
mod bpf_skel;
+use benchmark::BenchmarkScheduler;
pub use bpf_skel::*;
pub mod bpf_intf;
+mod benchmark;
mod e_core_selector;
mod energy;
mod freq;
@@ -28,6 +30,14 @@ fn main() -> Result<()> {
.required(false),
)
.arg(
+ Arg::new("benchmark")
+ .short('b')
+ .long("benchmark")
+ .help("Use this flag to enable benckmarking mode")
+ .action(ArgAction::SetTrue)
+ .required(false),
+ )
+ .arg(
Arg::new("power_cap")
.long("energy_cap")
.help("Set a power cap for the processor")
@@ -38,9 +48,16 @@ fn main() -> Result<()> {
let power_cap = *matches.get_one::<u64>("power_cap").unwrap_or(&u64::MAX);
let use_mocking = matches.get_flag("mock");
+ let benchmark = matches.get_flag("benchmark");
// Initialize and load the scheduler.
let mut open_object = MaybeUninit::uninit();
+ let log_path = "logs.csv";
+ if benchmark {
+ let mut sched = BenchmarkScheduler::init(&mut open_object, log_path)?;
+ sched.run();
+ return Ok(());
+ }
loop {
let mut sched = Scheduler::init(&mut open_object, use_mocking, power_cap)?;
if !sched.run()?.should_restart() {