Liu/main.py

from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from keras.layers import Dropout
from keras.callbacks import TensorBoard, LearningRateScheduler
from keras import regularizers
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import warnings
warnings.filterwarnings('ignore')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
def data_read(data_address):
    df = pd.read_csv(data_address)
    # Map each problem label to an integer index
    label_mapping = {label: idx for idx, label in enumerate(df['Problem'].unique())}
    df['Problem'] = df['Problem'].map(label_mapping)
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)
    X = np.array(df['Voltage'])
    Y = np.array(df['Problem'])
    # Convert to time-series format: non-overlapping windows of 34 samples,
    # each labelled by its last time step
    time_steps = 34
    X_series, Y_series = [], []
    i = 0
    while i + time_steps <= len(X):  # skip any trailing window shorter than time_steps
        X_series.append(X[i:(i + time_steps)])
        Y_series.append(Y[i + time_steps - 1])
        i += time_steps
    return np.array(X_series), np.array(Y_series)
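# For reference (hypothetical file name), data_read yields non-overlapping 34-step windows:
#   X, Y = data_read('voltage_readings.csv')
#   # X.shape == (n_windows, 34), Y.shape == (n_windows,)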
# Plotting helper: draw the training-set voltage curves
def plant_for_voltage(x_train_new, x_train_orginal, gamma, learning_rate, y_train, different_location):
    # Plot the perturbed windows in x_train_new
    for i in different_location:
        if i % 100 == 0:
            plt.figure()
            time_Step = list(range(0, 34))
            plt.plot(time_Step, x_train_new[i])
            # Add title and axis labels
            plt.title(f'gamma:{gamma},learning_rate:{learning_rate},Y:{y_train[i]}')
            plt.xlabel('time_step')
            plt.ylabel('voltage')
            os.makedirs(f'Liu/picture/gamma{gamma} learningrate{learning_rate}', exist_ok=True)
            plt.savefig(f'Liu/picture/gamma{gamma} learningrate{learning_rate}/X_train_new_{i}.png')
            plt.close()
            plt.clf()
            # Plot the corresponding original window
            plt.figure()
            time_Step = list(range(0, 34))
            plt.plot(time_Step, x_train_orginal[i])
            # Add title and axis labels
            plt.title(f'gamma:{gamma},learning_rate:{learning_rate},Y:{y_train[i]}')
            plt.xlabel('time_step')
            plt.ylabel('voltage')
            os.makedirs(f'Liu/picture/gamma{gamma} learningrate{learning_rate}', exist_ok=True)
            plt.savefig(f'Liu/picture/gamma{gamma} learningrate{learning_rate}/X_train_{i}.png')
            plt.close()
            plt.clf()
def if_diff(a, b):
    # Compare the two arrays element-wise and find the positions where they differ
    diff = np.where(a != b)
    list_diff = []
    # Collect the row index of every differing element
    for i in range(len(diff[0])):
        idx = (diff[0][i], diff[1][i])
        list_diff.append(idx[0])
    return list_diff
X_train, Y_train = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality.csv')
X_test, Y_test = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality Test.csv')
# Initialise the scaler and normalise the voltage windows to [-1, 1]
sc = MinMaxScaler(feature_range=(-1, 1))
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
X_train = X_train.reshape(-1, 34, 1)
X_test = X_test.reshape(-1, 34, 1)
# Shuffle X and Y with the same seed so samples and labels stay aligned
np.random.seed(7)
np.random.shuffle(X_train)
np.random.seed(7)
np.random.shuffle(Y_train)
tf.random.set_seed(7)
# Number of classes
n_classes = len(np.unique(Y_train))
# Loss function
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Build the RNN model
model = tf.keras.models.Sequential([
    tf.keras.layers.SimpleRNN(100, return_sequences=True, input_shape=(34, 1)),
    Dropout(0.2),
    tf.keras.layers.SimpleRNN(100),
    Dropout(0.2),
    tf.keras.layers.Dense(n_classes, activation='relu')  # kernel_regularizer=regularizers.l2(0.3)
])
# Compile the model
model.compile(
    optimizer='SGD',
    loss=loss_fn,
    metrics=['accuracy'])
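# SparseCategoricalCrossentropy(from_logits=True) applies the softmax internally,
# so the final Dense layer is expected to output raw class scores (logits).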
# Exponentially decaying learning-rate schedule
def lr_schedule(epoch):
    initial_learning_rate = 0.01
    decay_rate = 0.1
    decay_steps = 250
    new_learning_rate = initial_learning_rate * decay_rate ** (epoch / decay_steps)
    return new_learning_rate
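# With these settings the learning rate decays by a factor of 10 every 250 epochs:
# epoch 0 -> 0.01, epoch 250 -> 0.001, epoch 500 -> 0.0001.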
# Learning-rate scheduler callback
lr_scheduler = LearningRateScheduler(lr_schedule)
# TensorBoard callback
log_dir = "logs/fit"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
# Train the model with the TensorBoard and learning-rate callbacks
model.fit(X_train, Y_train, epochs=500,
          batch_size=32, callbacks=[tensorboard_callback, lr_scheduler])
# Evaluate the model on the clean test set
loss, accuracy = model.evaluate(X_test, Y_test)
print("Test original accuracy:", accuracy)
# Construct the perturbed data
# Convert X_test and Y_test to TensorFlow tensors (the perturbation is applied to the test set)
X_train_tensor = tf.convert_to_tensor(X_test, dtype=tf.float64)
Y_train_tensor = tf.convert_to_tensor(Y_test, dtype=tf.int32)
# Use tf.GradientTape to compute gradients with respect to the inputs
with tf.GradientTape() as tape:
    # Make sure the tape watches the input tensor
    tape.watch(X_train_tensor)
    # Forward pass: compute the predictions
    predictions = model(X_train_tensor)
    # Compute the loss
    loss = loss_fn(Y_train_tensor, predictions)
# Gradient of the loss with respect to the inputs X
gradients = tape.gradient(loss, X_train_tensor)
# Dictionary mapping each gamma to its list of accuracies
accuracy_per_gamma = {}
# Flatten the gradients
flattened_gradients = tf.reshape(gradients, [-1])
# For each gamma, select the largest gamma * |X| gradient components
for gamma in [0.05, 0.1, 0.2, 0.4]:
    num_gradients_to_select = int(
        gamma * tf.size(flattened_gradients, out_type=tf.dtypes.float32))
    top_gradients_indices = tf.argsort(
        flattened_gradients, direction='DESCENDING')[:num_gradients_to_select]
    # Start from a copy of the original gradients
    updated_gradients = tf.identity(flattened_gradients)
    # Boolean mask: False at the selected (largest) gradients, True elsewhere
    mask = tf.ones_like(updated_gradients, dtype=bool)
    mask = tf.tensor_scatter_nd_update(
        mask, tf.expand_dims(top_gradients_indices, 1),
        tf.zeros_like(top_gradients_indices, dtype=bool))
    # Apply the mask: zero out every gradient except the selected ones
    updated_gradients = tf.where(
        mask, tf.zeros_like(updated_gradients), updated_gradients)
    # Reshape the gradients back to the original input shape
    updated_gradients = tf.reshape(updated_gradients, tf.shape(gradients))
    # Accuracy list for this gamma
    accuracy_list = []
    for learning_rate in [0.1, 0.2, 0.3, 0.4, 0.5]:
        # Scale the gradients by the learning rate
        scaled_gradients = (learning_rate * 17000) * updated_gradients
        # Update X_train_tensor with the scaled gradients
        X_train_updated = tf.add(X_train_tensor, scaled_gradients)
        X_train_updated = X_train_updated.numpy()
        # Visualise the perturbed data against the original data
        # plant_for_voltage(X_train_updated, X_train, gamma, learning_rate, Y_train, list_diff)
        # Evaluate the model on the perturbed test set
        loss, accuracy = model.evaluate(X_train_updated, Y_test)
        print(f"Accuracy gamma: {gamma},learning:{learning_rate}", accuracy)
        # Record the accuracy
        accuracy_list.append(accuracy)
    # Record the accuracies for this gamma
    accuracy_per_gamma[gamma] = accuracy_list
# Learning-rate values used above
learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5]
# Gamma values used above
gammas = [0.05, 0.1, 0.2, 0.4]
# Create the figure
plt.figure(figsize=(10, 6))
# Plot one curve per gamma value
for gamma in gammas:
    plt.plot(learning_rates, accuracy_per_gamma[gamma],
             marker='o', label=f'Gamma={gamma}')
# Add title and axis labels
plt.title('Accuracy vs Learning Rate for Different Gammas')
plt.xlabel('Learning Rate')
plt.ylabel('Accuracy')
plt.legend()
# Show the figure
plt.show()