use crate::bpf::*; use crate::energy::rapl; use anyhow::Result; use csv::Writer; use libbpf_rs::OpenObject; use perf_event::{ events::{Event, Hardware}, Builder, Counter, Group, }; use scx_utils::Topology; use scx_utils::UserExitInfo; use std::mem::MaybeUninit; use std::process; use std::thread; use std::time::{Duration, Instant}; use std::{collections::HashMap, ops::Range}; use std::{fs::File, sync::atomic::AtomicI32}; const SLICE_US: u64 = 5000; const LOG_INTERVAL_MS: u64 = 10; const ROUNDS_PER_CPU: usize = 1000; // Number of rounds before changing cpu type Pid = i32; static CPU: AtomicI32 = AtomicI32::new(0); pub struct BenchmarkScheduler<'a> { bpf: BpfScheduler<'a>, own_pid: Pid, log_path: String, p_cores: Range, e_cores: Range, } // Represents a single measurement point in time struct Measurement { timestamp: Instant, energy: Option, frequency: Option, counter_values: Vec>, } impl Measurement { fn new() -> Self { Self { timestamp: Instant::now(), energy: None, frequency: None, counter_values: Vec::new(), } } // Take a measurement with the given counter group fn take(counters: &[(String, Counter)], group: &mut Group, cpu_id: u32) -> Result { let mut measurement = Self::new(); // Read energy // Basline is 4.5W measurement.energy = rapl::read_package_energy().ok(); // Read CPU frequency measurement.frequency = read_cpu_frequency(cpu_id); // Read performance counters let counts = group.read()?; let counters: HashMap<_, _> = counters.iter().map(|(a, b)| (a.clone(), b)).collect(); // Extract counter values for (name, _) in define_available_events() { let Some(counter) = counters.get(&name) else { measurement.counter_values.push(None); continue; }; measurement .counter_values .push(counts.get(counter).cloned()); } Ok(measurement) } // Calculate the difference between two measurements fn diff(&self, previous: &Measurement) -> MeasurementDiff { let duration_ms = self .timestamp .duration_since(previous.timestamp) .as_millis() as u64; // Calculate energy delta let energy_delta = match (previous.energy, self.energy) { (Some(prev), Some(curr)) => curr - prev, _ => 0.0, }; MeasurementDiff { timestamp: self.timestamp, duration_ms, energy_delta, frequency: self.frequency, counter_deltas: self.counter_values.clone(), e_core: 0, } } } // Represents the difference between two measurements struct MeasurementDiff { timestamp: Instant, e_core: u8, duration_ms: u64, energy_delta: f64, frequency: Option, counter_deltas: Vec>, } impl MeasurementDiff { // Write this diff as a CSV record fn write_csv_record(&self, writer: &mut Writer) -> Result<()> { // Prepare CSV record let mut record = vec![ self.timestamp.elapsed().as_secs_f64().to_string(), self.duration_ms.to_string(), self.e_core.to_string(), self.energy_delta.to_string(), self.frequency .map(|f| f.to_string()) .unwrap_or_else(|| "N/A".to_string()), ]; record.extend( self.counter_deltas .iter() .map(|x| x.map(|x| x.to_string()).unwrap_or_default()), ); // Write record writer.write_record(&record)?; // writer.flush()?; Ok(()) } } impl<'a> BenchmarkScheduler<'a> { pub fn init(open_object: &'a mut MaybeUninit, log_path: &str) -> Result { let bpf = BpfScheduler::init( open_object, 0, // exit_dump_len (default) false, // partial (include all tasks) false, // debug mode off )?; println!("Initializing benchmark scheduler (single-core profiling mode)"); let topology = Topology::new().unwrap(); let mut e_core_ids = Vec::new(); let mut p_core_ids = Vec::new(); for (id, cpu) in &topology.all_cpus { match cpu.core_type { scx_utils::CoreType::Big { turbo: _ } => p_core_ids.push(*id as i32), scx_utils::CoreType::Little => e_core_ids.push(*id as i32), } } // We assume that the CPU IDs for each core type are assigned contiguously. e_core_ids.sort(); p_core_ids.sort(); let e_cores = *e_core_ids.first().unwrap_or(&0)..(*e_core_ids.last().unwrap_or(&0) + 1); let p_cores = *p_core_ids.first().unwrap_or(&0)..(*p_core_ids.last().unwrap_or(&0) + 1); Ok(Self { bpf, own_pid: process::id() as i32, log_path: log_path.to_string(), p_cores, e_cores, }) } fn consume_all_tasks(&mut self) -> Result<()> { while let Ok(Some(task)) = self.bpf.dequeue_task() { let mut dispatched_task = DispatchedTask::new(&task); let cpu = CPU.load(std::sync::atomic::Ordering::Relaxed); if task.pid == self.own_pid { dispatched_task.cpu = cpu + 1; } else { // Schedule all other tasks on core 0 dispatched_task.cpu = cpu; } dispatched_task.slice_ns = SLICE_US; // Dispatch the task if let Err(e) = self.bpf.dispatch_task(&dispatched_task) { eprintln!("Failed to dispatch task: {}", e); } } // Notify BPF we're done processing tasks self.bpf.notify_complete(0); Ok(()) } fn start_measurement_thread(&self) -> thread::JoinHandle<()> { let log_path = self.log_path.clone(); let e_cores = self.e_cores.clone(); let p_cores = self.p_cores.clone(); thread::spawn(move || { if let Err(e) = run_measurement_loop(log_path, p_cores.start, e_cores.start) { eprintln!("Measurement thread error: {:?}", e); } }) } pub fn run(&mut self) -> Result { // Start the measurement thread self.start_measurement_thread(); // Main scheduling loop while !self.bpf.exited() { // Process all tasks self.consume_all_tasks()?; } self.bpf.shutdown_and_report() } } // Main measurement loop fn run_measurement_loop(log_path: String, p_core: i32, e_core: i32) -> Result<()> { // Define available hardware counters let available_events = define_available_events(); // Initialize CSV writer with header let mut csv_writer = initialize_csv_writer(&log_path, &available_events)?; let mut round_counter = 0; let mut cpu_to_monitor = p_core; println!("Monitoring: {cpu_to_monitor}"); // Main measurement loop loop { // println!("Starting new counter group (round {})", round_counter); round_counter += 1; let is_e_core = round_counter % 2 == 0; cpu_to_monitor = if is_e_core { p_core } else { e_core }; CPU.store(cpu_to_monitor, std::sync::atomic::Ordering::Relaxed); // Create a new perf group let mut group = match Group::new_with_pid_and_cpu(-1, cpu_to_monitor) { Ok(g) => g, Err(e) => { eprintln!("Failed to create perf group: {}", e); thread::sleep(Duration::from_millis(100)); continue; } }; let selected_events = available_events.clone(); // Build counters let mut counters = Vec::new(); for (name, event) in selected_events { match Builder::new() .group(&mut group) .kind(event.clone()) .observe_pid(-1) .one_cpu(cpu_to_monitor.try_into().unwrap()) .build() { Ok(counter) => { // println!("Successfully created counter for {}", name); counters.push((name.clone(), counter)); } Err(e) => { eprintln!("Failed to create counter for {}: {}", name, e); } } } group.enable().unwrap(); if counters.is_empty() { eprintln!("Failed to create any counters, retrying..."); thread::sleep(Duration::from_millis(100)); continue; } // Enable the counter group if let Err(e) = group.enable() { eprintln!("Failed to enable perf group: {}", e); thread::sleep(Duration::from_millis(100)); continue; } // println!( // "Successfully enabled counter group with {} counters", // counters.len() // ); // Take initial measurement let mut prev_measurement = match Measurement::take(&counters, &mut group, cpu_to_monitor as u32) { Ok(m) => m, Err(e) => { eprintln!("Failed to take initial measurement: {}", e); thread::sleep(Duration::from_millis(100)); continue; } }; // println!("Took initial measurement"); // Monitor for several rounds before changing CPU for round in 0..ROUNDS_PER_CPU { group.enable().unwrap(); group.reset().unwrap(); // Wait for the sampling interval thread::sleep(Duration::from_millis(LOG_INTERVAL_MS)); // Take current measurement let curr_measurement = match Measurement::take(&counters, &mut group, cpu_to_monitor as u32) { Ok(m) => m, Err(e) => { eprintln!("Failed to take measurement in round {}: {}", round, e); continue; } }; // Calculate difference and write to CSV let mut diff = curr_measurement.diff(&prev_measurement); // println!( // "Measurement diff: duration={}ms, energy={}J", // diff.duration_ms, diff.energy_delta // ); diff.e_core = if is_e_core { 1 } else { 0 }; // We have to throw away the first few measurements after changing from one core to the other to avoid noise from tasks executing on both cores at the same time if round >= 250 { if let Err(e) = diff.write_csv_record(&mut csv_writer) { eprintln!("Failed to write CSV record: {}", e); } } // Current becomes previous for next iteration prev_measurement = curr_measurement; } let _ = group.disable(); } } fn initialize_csv_writer( log_path: &str, available_events: &[(String, Event)], ) -> Result> { let file = File::create(log_path)?; let mut csv_writer = Writer::from_writer(file); // Write header with all possible counter names let mut header = vec![ "timestamp".to_string(), "duration_ms".to_string(), "is_e_core".to_string(), "package_power_j".to_string(), "cpu_frequency_mhz".to_string(), ]; // Add counter deltas for (name, _) in available_events { header.push(name.into()); } csv_writer.write_record(&header)?; csv_writer.flush()?; Ok(csv_writer) } pub fn read_cpu_frequency(cpu_id: u32) -> Option { // Try to read frequency from sysfs let freq_path = format!( "/sys/devices/system/cpu/cpu{}/cpufreq/scaling_cur_freq", cpu_id ); match std::fs::read_to_string(freq_path) { Ok(content) => { // Convert from kHz to MHz content.trim().parse::().ok().map(|freq| freq / 1000.0) } Err(_) => None, } } fn define_available_events() -> Vec<(String, Event)> { let mut events = Vec::new(); // Hardware events events.extend([ ( "cpu_cycles".to_string(), Event::Hardware(Hardware::CPU_CYCLES), ), ( "instructions".to_string(), Event::Hardware(Hardware::INSTRUCTIONS), ), ( "cache_references".to_string(), Event::Hardware(Hardware::CACHE_REFERENCES), ), ( "cache_misses".to_string(), Event::Hardware(Hardware::CACHE_MISSES), ), // ( // "branch_instructions".to_string(), // Event::Hardware(Hardware::BRANCH_INSTRUCTIONS), // ), ( "branch_misses".to_string(), Event::Hardware(Hardware::BRANCH_MISSES), ), ( "ref_cpu_cycles".to_string(), Event::Hardware(Hardware::REF_CPU_CYCLES), ), // ( // "task_clock".to_string(), // Event::Software(Software::TASK_CLOCK), // ), // ( // "stalled-cycles-frontend".to_string(), // Event::Hardware(Hardware::STALLED_CYCLES_FRONTEND), // ), ]); // L1 Data Cache events events.extend([ // ( // "l1d_read_access".to_string(), // Event::Cache(Cache { // which: WhichCache::L1D, // operation: CacheOp::READ, // result: CacheResult::ACCESS, // }), // ), // ( // "l1d_read_miss".to_string(), // Event::Cache(Cache { // which: WhichCache::L1D, // operation: CacheOp::READ, // result: CacheResult::MISS, // }), // ), ]); // events.push(( // "fp_ops_retired_by_type.all".to_string(), // Event::Raw(4, 0xFF0A), // EventCode 0x0A with UMask 0xFF // )); // events.push(( // "sse_avx_ops_retired.all".to_string(), // Event::Raw(4, 0xFF0B), // EventCode 0x0B with UMask 0xFF // )); // L1 Instruction Cache events events.extend([ // ( // "l1i_read_access".to_string(), // Event::Cache(Cache { // which: WhichCache::L1I, // operation: CacheOp::READ, // result: CacheResult::ACCESS, // }), // ), // ( // "l1i_read_miss".to_string(), // Event::Cache(Cache { // which: WhichCache::L1I, // operation: CacheOp::READ, // result: CacheResult::MISS, // }), // ), ]); // Data TLB events events.extend([ // ( // "dtlb_read_access".to_string(), // Event::Cache(Cache { // which: WhichCache::DTLB, // operation: CacheOp::READ, // result: CacheResult::ACCESS, // }), // ), // ( // "dtlb_read_miss".to_string(), // Event::Cache(Cache { // which: WhichCache::DTLB, // operation: CacheOp::READ, // result: CacheResult::MISS, // }), // ), ]); // Instruction TLB events events.extend([ // ( // "itlb_read_access".to_string(), // Event::Cache(Cache { // which: WhichCache::ITLB, // operation: CacheOp::READ, // result: CacheResult::ACCESS, // }), // ), // ( // "itlb_read_miss".to_string(), // Event::Cache(Cache { // which: WhichCache::ITLB, // operation: CacheOp::READ, // result: CacheResult::MISS, // }), // ), ]); // Branch Prediction Unit events events.extend([ // ( // "bpu_read_access".to_string(), // Event::Cache(Cache { // which: WhichCache::BPU, // operation: CacheOp::READ, // result: CacheResult::ACCESS, // }), // ), // ( // "bpu_read_miss".to_string(), // Event::Cache(Cache { // which: WhichCache::BPU, // operation: CacheOp::READ, // result: CacheResult::MISS, // }), // ), ]); // Sort events by name for consistent ordering events.sort_unstable_by_key(|(name, _)| name.clone()); events }