import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import mutual_info_regression
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt


# Step 1: Load and prepare the data
def load_data(csv_path):
    # Read the CSV file
    df = pd.read_csv(csv_path)

    # Extract target (power usage)
    y = df['package_power_j'].values

    # Extract features (CPU frequency and performance counters)
    # Skip the timestamp, duration_ms, and target columns
    X = df.iloc[:, 3:].copy()  # Starting from cpu_frequency_mhz

    # Print information about the dataset
    print(f"Loaded dataset with {X.shape[0]} samples and {X.shape[1]} features")
    print(f"Feature names: {X.columns.tolist()}")

    return X, y


# Step 2: Handle missing values using KNN imputation
def impute_missing_values(X):
    # Replace empty strings with NaN
    X = X.replace('', np.nan)

    # Convert all values to float
    X = X.astype(float)

    # Count missing values per column
    missing_counts = X.isna().sum()
    print("Missing values per column:")
    for col, count in missing_counts.items():
        if count > 0:
            print(f"  {col}: {count} ({count/len(X)*100:.1f}%)")

    # Impute missing values using KNN
    imputer = KNNImputer(n_neighbors=5)
    X_imputed = imputer.fit_transform(X)

    return pd.DataFrame(X_imputed, columns=X.columns)


# Step 3: Feature importance analysis
def analyze_feature_importance(X, y):
    # Calculate mutual information scores
    mi_scores = mutual_info_regression(X, y)

    # Create a DataFrame of features and their importance scores
    feature_importance = pd.DataFrame({
        'Feature': X.columns,
        'Importance': mi_scores
    })

    # Sort by importance
    feature_importance = feature_importance.sort_values('Importance', ascending=False)

    # Plot feature importance
    plt.figure(figsize=(10, 6))
    plt.barh(feature_importance['Feature'][:10], feature_importance['Importance'][:10])
    plt.xlabel('Mutual Information Score')
    plt.title('Top 10 Most Important Features')
    plt.tight_layout()
    plt.savefig('feature_importance.png')

    print("\nTop 5 most important features:")
    for i, (_, row) in enumerate(feature_importance.head(5).iterrows(), 1):
        print(f"{i}. {row['Feature']} - importance: {row['Importance']:.4f}")
    return feature_importance


# Step 4: Define the neural network model in PyTorch
class PowerEstimator(nn.Module):
    def __init__(self, input_size):
        super(PowerEstimator, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(input_size, 64),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)
        )

    def forward(self, x):
        return self.model(x)


# Step 5: Train the model
def train_model(X, y, batch_size=32, epochs=100, lr=0.001):
    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    # Scale the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)

    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

    # Create data loaders
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_dataset = TensorDataset(X_val_tensor, y_val_tensor)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model, loss function, and optimizer
    model = PowerEstimator(X_train.shape[1])
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Training loop
    train_losses = []
    val_losses = []
    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            loss.backward()
            optimizer.step()
            train_loss += loss.item() * inputs.size(0)
        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                val_loss += loss.item() * inputs.size(0)
        val_loss /= len(val_loader.dataset)
        val_losses.append(val_loss)

        # Print progress
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

    # Plot training history
    plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.savefig('training_history.png')

    return model, scaler


# Step 6: Evaluate the model
def evaluate_model(model, X, y, scaler):
    # Scale the features
    X_scaled = scaler.transform(X)

    # Convert to a PyTorch tensor
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32)

    # Make predictions
    model.eval()
    with torch.no_grad():
        y_pred = model(X_tensor).squeeze().numpy()

    # Calculate metrics
    mse = np.mean((y_pred - y) ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(y_pred - y))
    r2 = 1 - (np.sum((y - y_pred) ** 2) / np.sum((y - np.mean(y)) ** 2))

    print("\nModel Evaluation:")
    print(f"MSE: {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE: {mae:.6f}")
    print(f"R²: {r2:.6f}")

    # Plot actual vs predicted values
    plt.figure(figsize=(8, 8))
    plt.scatter(y, y_pred, alpha=0.5)
    plt.plot([min(y), max(y)], [min(y), max(y)], 'r--')
    plt.xlabel('Actual Power Usage (J)')
    plt.ylabel('Predicted Power Usage (J)')
    plt.title('Actual vs Predicted Power Usage')
    plt.tight_layout()
    plt.savefig('prediction_scatter.png')

    return mse, rmse, mae, r2


# Remove samples whose target power is a z-score outlier
def remove_outliers(X, y, threshold=3):
    # Calculate z-scores for the target values
    z_scores = np.abs((y - np.mean(y)) / np.std(y))

    # Identify inliers
    inliers = z_scores < threshold

    # Remove outliers
    X_clean = X.iloc[inliers]
    y_clean = y[inliers]

    removed = len(y) - len(y_clean)
    print(f"Removed {removed} outliers ({removed/len(y)*100:.2f}% of data)")

    return X_clean, y_clean


# Step 7: Main function
def main():
    csv_path = 'logs.csv'

    # Load and prepare data
    print("Loading data...")
    X, y = load_data(csv_path)
    X, y = remove_outliers(X, y)

    # Impute missing values
    print("\nImputing missing values...")
    X_imputed = impute_missing_values(X)

    # Analyze feature importance
    print("\nAnalyzing feature importance...")
    feature_importance = analyze_feature_importance(X_imputed, y)

    # Train the model
    print("\nTraining model...")
    model, scaler = train_model(X_imputed, y, batch_size=8, epochs=100)

    # Evaluate the model
    print("\nEvaluating model...")
    evaluate_model(model, X_imputed, y, scaler)

    print("\nDone!")


if __name__ == "__main__":
    main()
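

# ---------------------------------------------------------------------------
# Expected input layout (a sketch inferred from the column handling in
# load_data; only package_power_j is read by name, everything else by
# position): the first three CSV columns are a timestamp, duration_ms, and
# the package_power_j target, followed by cpu_frequency_mhz and the
# performance counters. The counter names below are illustrative
# placeholders; missing counter readings may be left as empty fields and are
# KNN-imputed in Step 2.
#
#   timestamp,duration_ms,package_power_j,cpu_frequency_mhz,<counter_1>,...,<counter_N>
#
# Minimal usage sketch (an illustrative addition, not part of the pipeline
# above): reuse the model and the fitted scaler returned by train_model() to
# predict power for new counter samples, e.g. X_new = X_imputed.head(5).
# ---------------------------------------------------------------------------
def predict_power(model, scaler, X_new):
    """Return predicted package power (J) for a DataFrame of new samples."""
    model.eval()
    with torch.no_grad():
        inputs = torch.tensor(scaler.transform(X_new), dtype=torch.float32)
        return model(inputs).squeeze(1).numpy()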