import pandas as pd
import numpy as np
import os
import warnings

warnings.filterwarnings('ignore')
import tensorflow as tf

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
import matplotlib.pyplot as plt
from keras import regularizers
from keras.callbacks import TensorBoard, LearningRateScheduler
from keras.layers import Dropout
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm


def data_read(data_address):
    df = pd.read_csv(data_address)
    label_mapping = {label: idx for idx, label in enumerate(df['Problem'].unique())}
    df['Problem'] = df['Problem'].map(label_mapping)
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)
    X = np.array(df['Voltage'])
    Y = np.array(df['Problem'])
    # Convert to time-series (sliding-window) format
    time_steps = 34
    X_series, Y_series = [], []
    for i in range(0, len(X) - time_steps):
        X_series.append(X[i:(i + time_steps)])
        Y_series.append(Y[i + time_steps - 1])
    return np.array(X_series), np.array(Y_series)


# Plotting helper: draw the training-set voltage windows
def plant_for_voltage(x_train_new, x_train_original, gamma, learning_rate, y_train, different_location):
    # Plot the perturbed X_train windows
    for i in different_location:
        if i % 100 == 0:
            plt.figure()
            time_step = list(range(0, 34))
            plt.plot(time_step, x_train_new[i])
            # Add title and axis labels
            plt.title(f'gamma:{gamma},learning_rate:{learning_rate},Y:{y_train[i]}')
            plt.xlabel('time_step')
            plt.ylabel('voltage')
            os.makedirs(f'Liu/picture/gamma{gamma} learningrate{learning_rate}', exist_ok=True)
            plt.savefig(f'Liu/picture/gamma{gamma} learningrate{learning_rate}/X_train_new_{i}.png')
            plt.close()

            # Plot the original (unperturbed) window
            plt.figure()
            plt.plot(time_step, x_train_original[i])
            plt.title(f'gamma:{gamma},learning_rate:{learning_rate},Y:{y_train[i]}')
            plt.xlabel('time_step')
            plt.ylabel('voltage')
            os.makedirs(f'Liu/picture/gamma{gamma} learningrate{learning_rate}', exist_ok=True)
            plt.savefig(f'Liu/picture/gamma{gamma} learningrate{learning_rate}/X_train_{i}.png')
            plt.close()


def if_diff(a, b):
    # Compare whether the elements at the same positions of the two arrays differ
    diff = np.where(a != b)
    list_diff = []
    # Collect the row index of every differing element
    for i in range(len(diff[0])):
        idx = (diff[0][i], diff[1][i])
        list_diff.append(idx[0])
    return list_diff


X_train, Y_train = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality.csv')
X_test, Y_test = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality Test.csv')

# Initialize the min-max scaler (normalize to [-1, 1])
sc = MinMaxScaler(feature_range=(-1, 1))
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
X_train = X_train.reshape(-1, 34, 1)
X_test = X_test.reshape(-1, 34, 1)

np.random.seed(7)
np.random.shuffle(X_train)
np.random.seed(7)
np.random.shuffle(Y_train)
tf.random.set_seed(7)

# Number of classes
n_classes = len(np.unique(Y_train))

# Loss function
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Build the RNN model
model = tf.keras.models.Sequential([
    tf.keras.layers.SimpleRNN(100, return_sequences=True, input_shape=(34, 1)),
    Dropout(0.2),
    tf.keras.layers.SimpleRNN(100),
    Dropout(0.2),
    tf.keras.layers.Dense(n_classes, activation='relu')  # kernel_regularizer=regularizers.l2(0.3)
])

# Compile the model
model.compile(optimizer='SGD', loss=loss_fn, metrics=['accuracy'])


# Exponentially decaying learning-rate schedule
def lr_schedule(epoch):
    initial_learning_rate = 0.01
    decay_rate = 0.1
    decay_steps = 250
    new_learning_rate = initial_learning_rate * decay_rate ** (epoch / decay_steps)
    return new_learning_rate


# Learning-rate scheduler callback
lr_scheduler = LearningRateScheduler(lr_schedule)

# Save the initial weights so the model can be reset before every run
initial_weights = model.get_weights()
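
# Illustrative aside (not used by the pipeline below): the hand-written lr_schedule
# above reproduces Keras' built-in ExponentialDecay schedule, except that
# LearningRateScheduler applies it once per epoch rather than once per optimizer step.
# The name `builtin_decay` is introduced here for illustration only.
builtin_decay = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.01,  # same constants as lr_schedule
    decay_steps=250,
    decay_rate=0.1)
# float(builtin_decay(epoch)) equals lr_schedule(epoch) for any integer epoch.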

# TensorBoard callback
log_dir = "logs/fit"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# Build the perturbed data
# Convert X_train and Y_train to TensorFlow tensors
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float64)
Y_train_tensor = tf.convert_to_tensor(Y_train, dtype=tf.int32)

# Use tf.GradientTape to compute gradients with respect to the inputs
with tf.GradientTape() as tape:
    # Make sure the tape watches the input tensor
    tape.watch(X_train_tensor)
    # Forward pass: compute the predictions
    predictions = model(X_train_tensor)
    # Compute the loss
    loss = loss_fn(Y_train_tensor, predictions)

# Gradient of the loss with respect to the input X
gradients = tape.gradient(loss, X_train_tensor)

# Dictionary mapping each gamma to its list of accuracies
accuracy_per_gamma = {}

# Flatten the gradients
flattened_gradients = tf.reshape(gradients, [-1])

# Select the largest gamma * |X| gradients
for gamma in tqdm([0.6, 0.7, 0.8, 0.9, 0.99]):
    num_gradients_to_select = int(
        gamma * tf.size(flattened_gradients, out_type=tf.dtypes.float32))
    top_gradients_indices = tf.argsort(flattened_gradients, direction='DESCENDING')[
        :num_gradients_to_select]

    # Start from a copy of the original gradients
    updated_gradients = tf.identity(flattened_gradients)

    # Boolean mask: False at the selected top-gradient positions, True elsewhere
    mask = tf.ones_like(updated_gradients, dtype=bool)
    mask = tf.tensor_scatter_nd_update(mask, tf.expand_dims(top_gradients_indices, 1),
                                       tf.zeros_like(top_gradients_indices, dtype=bool))

    # Zero out every gradient that was not selected
    updated_gradients = tf.where(mask, tf.zeros_like(updated_gradients), updated_gradients)

    # Reshape back to the original gradient shape
    updated_gradients = tf.reshape(updated_gradients, tf.shape(gradients))

    # Accuracies for each learning rate under this gamma
    accuracy_list = []
    for learning_rate in tqdm([0.1, 0.2, 0.3, 0.4, 0.5]):
        # Scale the gradients by the learning rate
        scaled_gradients = (learning_rate * 100) * updated_gradients

        # Perturb X_train_tensor with the scaled gradients
        X_train_updated = tf.add(X_train_tensor, scaled_gradients)
        X_train_updated = X_train_updated.numpy()

        list_diff = if_diff(X_train, X_train_updated)
        # Visualize the perturbed and original data
        # plant_for_voltage(X_train_updated, X_train, gamma, learning_rate, Y_train, list_diff)

        # Reset the model weights to the initial weights
        model.set_weights(initial_weights)

        # Train the model with the TensorBoard and learning-rate callbacks
        history = model.fit(X_train_updated, Y_train, epochs=500, batch_size=32,
                            callbacks=[tensorboard_callback, lr_scheduler], verbose=0)

        # Evaluate the model
        loss, accuracy = model.evaluate(X_test, Y_test)
        print(f"Accuracy gamma: {gamma},learning:{learning_rate}", accuracy)

        # Record the accuracy
        accuracy_list.append(accuracy)

    # Record the accuracies for this gamma
    accuracy_per_gamma[gamma] = accuracy_list

# Learning rates that were evaluated
learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5]
# Gamma values that were evaluated (must match the loop above)
gammas = [0.6, 0.7, 0.8, 0.9, 0.99]

# Create the figure
plt.figure(figsize=(10, 6))

# Plot one curve per gamma value
for gamma in gammas:
    plt.plot(learning_rates, accuracy_per_gamma[gamma], marker='o', label=f'Gamma={gamma}')

# Add title and axis labels
plt.title('Accuracy vs Learning Rate for Different Gammas')
plt.xlabel('Learning Rate')
plt.ylabel('Accuracy')
plt.legend()

# Show the figure
plt.show()

# tensorboard --logdir logs/fit
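
# Illustrative aside (not part of the original pipeline): the top-gamma gradient
# selection done above with argsort + tensor_scatter_nd_update can also be written
# with tf.math.top_k. The helper name `select_top_gamma_gradients` is introduced
# here for illustration only.
def select_top_gamma_gradients(grads, gamma):
    """Keep only the largest gamma-fraction of gradient entries (by value),
    zero out the rest, and return a tensor with the original shape."""
    flat = tf.reshape(grads, [-1])
    k = int(gamma * int(tf.size(flat)))
    top_values, top_indices = tf.math.top_k(flat, k=k)
    kept = tf.scatter_nd(tf.expand_dims(top_indices, 1), top_values, tf.shape(flat))
    return tf.reshape(kept, tf.shape(grads))

# Example (uncomment to check on a toy tensor):
# print(select_top_gamma_gradients(tf.constant([[0.5, -1.0, 2.0, 0.1, 3.0]]), 0.6))
# -> [[0.5, 0., 2., 0., 3.]]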