"""Train a small PyTorch regressor that predicts ``package_power_j``.

Pipeline: load the CSV -> KNN-impute missing values -> drop target outliers ->
rank features by mutual information -> train with early stopping ->
evaluate -> fold the feature scaler into the first layer and save the weights.
"""

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.feature_selection import mutual_info_regression
from sklearn.impute import KNNImputer
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset


# Step 1: Load and prepare the data
def load_data(csv_path):
    """Read the CSV and split it into a feature frame and a target array.

    Args:
        csv_path: Path of the CSV file to read.

    Returns:
        Tuple ``(X, y)``. ``X`` is a DataFrame with the columns from position
        3 onwards (minus the target column, should it fall in that range);
        ``y`` is a NumPy array with the values of ``package_power_j``.
    """
    target = 'package_power_j'

    df = pd.read_csv(csv_path)
    # Shuffle the rows; the shuffle is unseeded, so row order differs per run.
    df = df.sample(frac=1).reset_index(drop=True)

    # Extract target (power usage)
    y = df[target].values

    # Features start at column index 3 (cpu_frequency_mhz according to the
    # original author). Drop the target defensively: if it sits inside the
    # slice, the model would otherwise be trained on its own label.
    X = df.iloc[:, 3:].drop(columns=[target], errors='ignore')

    print(f"Loaded dataset with {X.shape[0]} samples and {X.shape[1]} features")
    print(f"Feature names: {X.columns.tolist()}")

    return X, y


# Step 2: Handle missing values using KNN imputation
def impute_missing_values(X):
    """Fill missing feature values with a 5-nearest-neighbour imputer.

    Empty strings are treated as missing and every column is cast to float.
    Columns without a single observed value are dropped, because the imputer
    discards them by default and the result could not be re-labelled with the
    original column names.

    Args:
        X: Feature DataFrame, possibly containing ``''`` or NaN entries.

    Returns:
        A new float DataFrame (fresh RangeIndex) without missing values.
    """
    X = X.replace('', np.nan).astype(float)

    # Report missing values per column
    missing_counts = X.isna().sum()
    print("Missing values per column:")
    for col, count in missing_counts.items():
        if count > 0:
            print(f" {col}: {count} ({count/len(X)*100:.1f}%)")

    n_rows = len(X)
    all_missing = [
        col for col, count in missing_counts.items()
        if n_rows and count == n_rows
    ]
    if all_missing:
        print(f"Dropping columns with no observed values: {all_missing}")
        X = X.drop(columns=all_missing)

    imputer = KNNImputer(n_neighbors=5)
    X_imputed = imputer.fit_transform(X)

    return pd.DataFrame(X_imputed, columns=X.columns)


# Step 3: Feature importance analysis
def analyze_feature_importance(X, y):
    """Rank features by mutual information with the target.

    Saves a bar chart of the ten highest-scoring features to
    ``feature_importance.png`` and prints up to fifteen of them.

    Args:
        X: Feature DataFrame without missing values.
        y: Target values aligned with the rows of ``X``.

    Returns:
        DataFrame with columns ``Feature`` and ``Importance``, sorted by
        descending importance.
    """
    mi_scores = mutual_info_regression(X, y)

    feature_importance = pd.DataFrame({
        'Feature': X.columns,
        'Importance': mi_scores,
    }).sort_values('Importance', ascending=False)

    # Plot feature importance
    top_plotted = feature_importance.head(10)
    fig = plt.figure(figsize=(10, 6))
    plt.barh(top_plotted['Feature'], top_plotted['Importance'])
    plt.xlabel('Mutual Information Score')
    plt.title('Top 10 Most Important Features')
    plt.tight_layout()
    plt.savefig('feature_importance.png')
    plt.close(fig)  # release the figure once it is on disk

    # The heading reports how many rows are actually listed.
    top_listed = feature_importance.head(15)
    print(f"\nTop {len(top_listed)} most important features:")
    for rank, (_, row) in enumerate(top_listed.iterrows(), 1):
        print(f"{rank}. {row['Feature']} - importance: {row['Importance']:.4f}")

    return feature_importance


# Step 4: Define the neural network model in PyTorch
class PowerEstimator(nn.Module):
    """Three-layer MLP: ``input_size -> 4 -> 2 -> 1`` with ReLU activations.

    NOTE: wider variants (e.g. ``input_size -> 16 -> 4 -> 1``, optionally with
    dropout) were tried by the original author. Whatever the architecture,
    the first layer must stay a ``nn.Linear`` named ``input_lin`` because
    ``main`` folds the feature scaler into it before saving.
    """

    def __init__(self, input_size):
        super().__init__()
        self.input_lin = nn.Linear(input_size, 4)
        self.relu1 = nn.ReLU()
        self.lin2 = nn.Linear(4, 2)
        self.relu2 = nn.ReLU()
        self.lin3 = nn.Linear(2, 1)

    def forward(self, x):
        """Return one prediction per input row, shape ``(batch, 1)``."""
        x = self.relu1(self.input_lin(x))
        x = self.relu2(self.lin2(x))
        return self.lin3(x)


# Step 5: Train the model
def train_model(X, y, batch_size=32, epochs=100, lr=0.001, early_stopping_patience=10):
    """Train a ``PowerEstimator`` with Adam, MSE loss and early stopping.

    The data is split 80/20 into training and validation sets
    (``random_state=42``); a ``StandardScaler`` is fitted on the training part
    only. The per-epoch loss curves are saved to ``training_history.png``.

    Args:
        X: Feature DataFrame.
        y: Target values aligned with the rows of ``X``.
        batch_size: Mini-batch size for both loaders.
        epochs: Maximum number of epochs.
        lr: Adam learning rate.
        early_stopping_patience: Stop after this many consecutive epochs
            without a new best validation loss.

    Returns:
        Tuple ``(model, scaler)``: the model restored to the weights with the
        lowest validation loss, and the fitted scaler.
    """
    # Split data into training and validation sets
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Scale the features (statistics come from the training split only)
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_val_scaled = scaler.transform(X_val)

    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32).view(-1, 1)
    X_val_tensor = torch.tensor(X_val_scaled, dtype=torch.float32)
    y_val_tensor = torch.tensor(y_val, dtype=torch.float32).view(-1, 1)

    # Create data loaders
    train_loader = DataLoader(
        TensorDataset(X_train_tensor, y_train_tensor),
        batch_size=batch_size,
        shuffle=True,
    )
    val_loader = DataLoader(
        TensorDataset(X_val_tensor, y_val_tensor),
        batch_size=batch_size,
    )

    # Initialize model, loss function, and optimizer
    model = PowerEstimator(X_train.shape[1])
    criterion = nn.MSELoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)
    print(model.state_dict().keys())

    # Training loop
    train_losses = []
    val_losses = []
    best_val_loss = float('inf')
    epochs_without_improvement = 0
    best_model_state = None

    for epoch in range(epochs):
        # Training
        model.train()
        train_loss = 0.0
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()
            # Weight by batch size so the epoch value is a per-sample mean.
            train_loss += loss.item() * inputs.size(0)
        train_loss /= len(train_loader.dataset)
        train_losses.append(train_loss)

        # Validation
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                loss = criterion(model(inputs), targets)
                val_loss += loss.item() * inputs.size(0)
        val_loss /= len(val_loader.dataset)
        val_losses.append(val_loss)

        # Early stopping check
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            # state_dict() hands out references to the live parameters, and
            # dict.copy() is shallow. Clone every tensor so later optimizer
            # steps cannot overwrite the snapshot.
            best_model_state = {
                name: tensor.detach().clone()
                for name, tensor in model.state_dict().items()
            }
            epochs_without_improvement = 0
        else:
            epochs_without_improvement += 1

        # Print progress
        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}")

        # Check early stopping
        if epochs_without_improvement >= early_stopping_patience:
            print(f"\nEarly stopping triggered after {epoch+1} epochs")
            break

    # Load best model
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    # Plot training history
    fig = plt.figure(figsize=(10, 5))
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('MSE Loss')
    plt.title('Training and Validation Loss')
    plt.legend()
    plt.savefig('training_history.png')
    plt.close(fig)

    return model, scaler


# Step 6: Evaluate the model
def evaluate_model(model, X, y, scaler):
    """Print regression metrics and save an actual-vs-predicted scatter plot.

    Args:
        model: Trained model that expects scaled features.
        X: Unscaled feature DataFrame.
        y: True target values aligned with the rows of ``X``.
        scaler: Fitted scaler used to transform ``X`` before prediction.

    Returns:
        Tuple ``(mse, rmse, mae, r2)``. ``r2`` is NaN when ``y`` is constant,
        because the coefficient of determination is undefined in that case.
    """
    X_tensor = torch.tensor(scaler.transform(X), dtype=torch.float32)
    y = np.asarray(y, dtype=float)

    # Make predictions
    model.eval()
    with torch.no_grad():
        # reshape(-1) keeps a 1-D array even for a single sample, where a
        # bare squeeze() would collapse to a 0-d scalar.
        y_pred = model(X_tensor).reshape(-1).numpy()

    # Calculate metrics
    residuals = y - y_pred
    mse = np.mean(residuals ** 2)
    rmse = np.sqrt(mse)
    mae = np.mean(np.abs(residuals))
    total_ss = np.sum((y - np.mean(y)) ** 2)
    r2 = 1 - np.sum(residuals ** 2) / total_ss if total_ss > 0 else float('nan')

    print("\nModel Evaluation:")
    print(f"MSE: {mse:.6f}")
    print(f"RMSE: {rmse:.6f}")
    print(f"MAE: {mae:.6f}")
    print(f"R²: {r2:.6f}")

    # Plot actual vs predicted values, with the identity line as reference
    lo, hi = y.min(), y.max()
    fig = plt.figure(figsize=(8, 8))
    plt.scatter(y, y_pred, alpha=0.5)
    plt.plot([lo, hi], [lo, hi], 'r--')
    plt.xlabel('Actual Power Usage (J)')
    plt.ylabel('Predicted Power Usage (J)')
    plt.title('Actual vs Predicted Power Usage')
    plt.tight_layout()
    plt.savefig('prediction_scatter.png')
    plt.close(fig)

    return mse, rmse, mae, r2


def remove_outliers(X, y, threshold=3):
    """Drop samples whose target z-score is at or above ``threshold``.

    Only the target is inspected; feature values play no role. When the
    target is empty or has zero variance no z-score can be computed and every
    row is kept.

    Args:
        X: Feature DataFrame.
        y: Target values aligned positionally with the rows of ``X``.
        threshold: Absolute z-score at which a sample counts as an outlier.

    Returns:
        Tuple ``(X_clean, y_clean)`` holding only the inlier rows.
    """
    y = np.asarray(y)

    if y.size == 0 or np.std(y) == 0:
        # Dividing by a zero std would yield NaN scores and discard all rows.
        inliers = np.ones(y.shape[0], dtype=bool)
    else:
        z_scores = np.abs((y - np.mean(y)) / np.std(y))
        inliers = z_scores < threshold

    X_clean = X.iloc[inliers]
    y_clean = y[inliers]

    removed = len(y) - len(y_clean)
    removed_pct = removed / len(y) * 100 if len(y) else 0.0
    print(f"Removed {removed} outliers ({removed_pct:.2f}% of data)")

    return X_clean, y_clean


def _fold_scaler_into_first_layer(model, scaler):
    """Absorb the standardisation into ``model.input_lin`` (in place).

    For ``z = (x - mean) / scale`` the first layer computes
    ``W z + b = (W / scale) x + (b - (W / scale) @ mean)``, so after this call
    the model accepts raw, unscaled features and returns the same outputs.
    """
    layer = model.input_lin
    scale = torch.as_tensor(scaler.scale_, dtype=layer.weight.dtype)
    mean = torch.as_tensor(scaler.mean_, dtype=layer.weight.dtype)

    with torch.no_grad():
        # weight is (hidden, features); dividing by a (features,) vector
        # rescales every input column.
        layer.weight.div_(scale)
        # One correction per hidden unit: row-wise dot product with the mean.
        layer.bias.sub_(layer.weight @ mean)


# Step 7: Main function
def main():
    """Run the full pipeline on ``logs.csv`` and save the weights to ``perf.pt``."""
    csv_path = 'logs.csv'

    # Load and prepare data
    print("Loading data...")
    X, y = load_data(csv_path)

    # Impute missing values
    print("\nImputing missing values...")
    X_imputed = impute_missing_values(X)

    X_imputed, y = remove_outliers(X_imputed, y)

    # Analyze feature importance (called for its printout and saved chart)
    print("\nAnalyzing feature importance...")
    analyze_feature_importance(X_imputed, y)

    # Train the model
    print("\nTraining model...")
    model, scaler = train_model(X_imputed, y, batch_size=8, epochs=100)

    # Evaluate the model. NOTE: this uses the full dataset, training rows
    # included, so the printed metrics are optimistic. It must also run
    # before the scaler is folded in, while the model still expects scaled
    # inputs.
    print("\nEvaluating model...")
    evaluate_model(model, X_imputed, y, scaler)

    # Save model: bake the scaler into the first layer so the saved weights
    # work directly on raw feature values.
    _fold_scaler_into_first_layer(model, scaler)
    torch.save(model.state_dict(), "perf.pt")

    print("\nDone!")


if __name__ == "__main__":
    main()