# Voltage-quality classification: SimpleRNN classifier plus gradient-based
# input-perturbation experiments over several gamma / step-size settings.
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from keras import regularizers
from keras.callbacks import LearningRateScheduler, TensorBoard
from keras.layers import Dropout
from sklearn.preprocessing import MinMaxScaler


def data_read(data_address):
    """Read a voltage-quality CSV and build sliding-window training series.

    Parameters
    ----------
    data_address : str
        Path to a CSV file with at least 'Voltage' and 'Problem' columns.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        ``(X_series, Y_series)`` where each X row is a window of 34
        consecutive voltage values and each Y entry is the integer label of
        the window's last row.
    """
    df = pd.read_csv(data_address)

    # BUG FIX: the original enumerated df['Problem'].unique() in encounter
    # order, so the label -> id mapping depended on row order and could
    # differ between the train and test CSVs. Sorting the unique labels makes
    # the mapping deterministic and consistent across files sharing the same
    # label set.
    label_mapping = {label: idx
                     for idx, label in enumerate(sorted(df['Problem'].unique()))}
    df['Problem'] = df['Problem'].map(label_mapping)

    # Drop rows containing infinities or missing values.
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)

    X = np.array(df['Voltage'])
    Y = np.array(df['Problem'])

    # Clamp negative voltages to zero. NOTE(review): applying ReLU to raw
    # voltage readings discards sign information — confirm this is intended.
    X = tf.nn.relu(X)

    # Convert to fixed-length time-series windows; the label of a window is
    # the label of its final time step.
    time_steps = 34
    X_series, Y_series = [], []
    for i in range(len(X) - time_steps):
        X_series.append(X[i:i + time_steps])
        Y_series.append(Y[i + time_steps - 1])

    return np.array(X_series), np.array(Y_series)

# Load the train / test splits.
# BUG FIX: the train path used backslashes ('Liu\data\...'), which contain
# invalid escape sequences (\d, \V) and break on non-Windows systems, while
# the test path already used forward slashes. Normalized both to '/'.
# NOTE(review): each data_read call builds its own label mapping from that
# file's labels — verify both CSVs contain the same set of 'Problem' values.
X_train, Y_train = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality.csv')
X_test, Y_test = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality Test.csv')

# Min-max scale to [0, 1]; the scaler is fitted on the training windows only
# and then reused unchanged for the test windows.
sc = MinMaxScaler(feature_range=(0, 1))
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)

# BUG FIX: ndarray.reshape returns a NEW array; the original code discarded
# the result, leaving the data 2-D so the 3-D-input RNN below would fail.
# Assign the reshaped (samples, 34, 1) arrays back.
X_train = X_train.reshape(-1, 34, 1)
X_test = X_test.reshape(-1, 34, 1)

# Shuffle the training windows. Re-seeding the global NumPy RNG with the same
# seed (7) before each shuffle makes both calls draw the identical
# permutation, so X_train and Y_train stay aligned row-for-row.
np.random.seed(7)
np.random.shuffle(X_train)
np.random.seed(7)
np.random.shuffle(Y_train)
# Fix TensorFlow's global RNG for reproducible weight init / training.
tf.random.set_seed(7)


# Number of distinct classes present in the training labels.
n_classes = len(np.unique(Y_train))

# Loss: sparse categorical cross-entropy over integer labels.
# from_logits=True means the model's final layer is expected to emit raw
# (unnormalized) scores rather than softmax probabilities.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)

# Build the RNN model: two stacked SimpleRNN layers with dropout, then a
# dense multi-class classification head.
model = tf.keras.models.Sequential([
    tf.keras.layers.SimpleRNN(100, return_sequences=True, input_shape=(34, 1)),
    Dropout(0.2),
    tf.keras.layers.SimpleRNN(100),
    Dropout(0.2),
    # BUG FIX: the loss is constructed with from_logits=True, so this head
    # must emit raw logits. The original used activation='relu', which zeroes
    # every negative logit and prevents the softmax-from-logits loss from
    # learning low-score classes. Use the default linear activation.
    tf.keras.layers.Dense(n_classes,
                          kernel_regularizer=regularizers.l2(0.3)),
])

# Compile with plain SGD; track accuracy for reporting.
model.compile(
    optimizer='SGD',
    loss=loss_fn,
    metrics=['accuracy'])

# Exponential learning-rate decay: lr(epoch) = 0.01 * 0.1 ** (epoch / 250).
# NOTE: the signature must stay single-argument — Keras' LearningRateScheduler
# passes the current lr as a second argument to two-argument schedules.
def lr_schedule(epoch):
    """Return the learning rate for *epoch* under exponential decay."""
    base_lr = 0.01
    decay_factor = 0.1
    decay_period = 250
    return base_lr * decay_factor ** (epoch / decay_period)

# Learning-rate scheduler callback wrapping lr_schedule.
lr_scheduler = LearningRateScheduler(lr_schedule)


# Save the freshly-initialized weights so every (gamma, learning-rate)
# experiment below can restart training from the identical starting point.
initial_weights = model.get_weights()

# TensorBoard callback for training-curve inspection.
log_dir = "logs/fit"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

# Build perturbed training data (gradient-based, FGSM-style).

# Convert X_train / Y_train to TensorFlow tensors.
X_train_tensor = tf.convert_to_tensor(X_train, dtype=tf.float64)
Y_train_tensor = tf.convert_to_tensor(Y_train, dtype=tf.int32)

# Use tf.GradientTape to compute the loss gradient w.r.t. the *inputs*.
with tf.GradientTape() as tape:
    # The input is a constant tensor, not a Variable, so the tape must be
    # told to watch it explicitly.
    tape.watch(X_train_tensor)
    # Forward pass with the freshly-initialized (untrained) model.
    # NOTE(review): this assumes X_train is already (samples, 34, 1) as the
    # RNN expects — confirm the reshape upstream took effect.
    predictions = model(X_train_tensor)
    # Loss over the whole training set.
    loss = loss_fn(Y_train_tensor, predictions)


# d(loss)/d(input): one gradient value per input element.
gradients = tape.gradient(loss, X_train_tensor)


# accuracy_per_gamma[gamma] -> list of test accuracies, one per learning rate.
accuracy_per_gamma = {}


# Flatten so the top-gamma fraction can be selected over all elements at once.
flattened_gradients = tf.reshape(gradients, [-1])

# Keep only the largest gamma * |X| gradients; zero out all others.
for gamma in [0.05, 0.1, 0.2, 0.4]:
    # Number of gradient elements to keep for this gamma.
    num_gradients_to_select = int(
        gamma * tf.size(flattened_gradients, out_type=tf.dtypes.float32))
    # NOTE(review): DESCENDING argsort selects the largest *signed* gradients,
    # not the largest magnitudes — confirm that is the intended selection.
    top_gradients_indices = tf.argsort(flattened_gradients, direction='DESCENDING')[
        :num_gradients_to_select]

    # Work on a copy of the flattened gradients.
    updated_gradients = tf.identity(flattened_gradients)

    # Boolean mask: False at the selected top indices, True everywhere else.
    mask = tf.ones_like(updated_gradients, dtype=bool)
    mask = tf.tensor_scatter_nd_update(mask, tf.expand_dims(
        top_gradients_indices, 1), tf.zeros_like(top_gradients_indices, dtype=bool))

    # Zero every non-selected gradient (positions where the mask is True),
    # leaving only the top-gamma fraction of gradients non-zero.
    updated_gradients = tf.where(mask, tf.zeros_like(
        updated_gradients), updated_gradients)

    # Restore the original (input-shaped) gradient tensor.
    updated_gradients = tf.reshape(updated_gradients, tf.shape(gradients))

    # Test accuracies for this gamma, one entry per perturbation step size.
    accuracy_list = []

    for learning_rate in [0.1, 0.2, 0.3, 0.4, 0.5]:

        # Scale the sparse gradient by the perturbation step size.
        scaled_gradients = learning_rate * updated_gradients

        # Perturb the training inputs along the (loss-increasing) gradient
        # direction, then hand NumPy data to model.fit.
        X_train_updated = tf.add(X_train_tensor, scaled_gradients)
        X_train_updated = X_train_updated.numpy()

        # Reset model weights so every run starts from the same init.
        model.set_weights(initial_weights)

        # Train on the perturbed data with TensorBoard + LR-schedule callbacks.
        history = model.fit(X_train_updated, Y_train, epochs=1500,
                            batch_size=32, callbacks=[tensorboard_callback, lr_scheduler])

        # Evaluate on the (unperturbed) test set.
        loss, accuracy = model.evaluate(X_test, Y_test)

        # Record this run's test accuracy.
        accuracy_list.append(accuracy)


    # Store this gamma's accuracy-vs-learning-rate curve.
    accuracy_per_gamma[gamma] = accuracy_list

# Learning-rate samples for the x-axis (must match the inner loop above).
learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5]

# Gamma values whose accuracy curves are plotted.
gammas = [0.05, 0.1, 0.2, 0.4]

# One figure holding every gamma curve.
last_plt = plt.figure(figsize=(10, 6))

# Draw one accuracy-vs-learning-rate curve per gamma value.
for g in gammas:
    plt.plot(
        learning_rates,
        accuracy_per_gamma[g],
        marker='o',
        label=f'Gamma={g}',
    )

# Title, axis labels and legend.
plt.title('Accuracy vs Learning Rate for Different Gammas')
plt.xlabel('Learning Rate')
plt.ylabel('Accuracy')
plt.legend()

# Render the figure.
plt.show()

# To inspect training curves: tensorboard --logdir logs/fit