完成了对一般模型的攻击复现,结果同论文相同

This commit is contained in:
MuJ 2024-01-13 19:35:48 +08:00
parent b4431d20ab
commit 111afc5b4c
10 changed files with 12265 additions and 263 deletions

Binary file not shown.

83
attack_nomal/data_load.py Normal file
View File

@ -0,0 +1,83 @@
import pandas as pd
import numpy as np
from keras.utils import to_categorical
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
def data_format(data_path, is_column=False, rate=0.25):
    """Load a CSV dataset, split into train/test sets and scale features.

    Args:
        data_path (str): path to the CSV data file.
        is_column (bool, optional): drop NaN-containing columns instead of
            rows during cleaning. Defaults to False.
        rate (float, optional): fraction of samples held out for testing.
            Defaults to 0.25.

    Returns:
        tuple: (X_train, X_test, Y_train, Y_test) as np.ndarray, with
        features scaled to [-1, 1].
    """
    X, Y = data_load(data_path, is_column)
    # Split first (75% train / 25% test by default), then scale.
    X_train, X_test, Y_train, Y_test = train_test_split(
        X, Y, test_size=rate, random_state=7)
    # BUG FIX: fit the scaler on the training portion only. The original
    # called fit_transform on the full X before splitting, leaking test-set
    # statistics into the scaling of the training data.
    sc = MinMaxScaler(feature_range=(-1, 1))
    X_train = sc.fit_transform(X_train)
    X_test = sc.transform(X_test)
    return X_train, X_test, Y_train, Y_test
def data_load(data_path, is_column=False):
    """Read a CSV file and split it into features and labels.

    Args:
        data_path (str): path to the CSV data file.
        is_column (bool, optional): drop NaN-containing columns instead of
            rows. Defaults to False.

    Returns:
        tuple: (X, Y) as np.ndarray — X holds the first 128 value columns
        per row, Y the integer label taken from column index 128.
    """
    df = pd.read_csv(data_path)
    # BUG FIX: data_clean returns a new DataFrame (dropna is not in-place);
    # the original discarded the return value, so no cleaning ever happened.
    df = data_clean(df, is_column)
    # Drop the first column (presumably a row-index column written by the
    # CSV exporter — confirm against the dataset).
    df = df.drop(df.columns[0], axis=1)
    X, Y = [], []
    for _, row in df.iterrows():
        # The first 128 entries are the signal samples; entry 128 is the
        # class label.
        X.append(row.iloc[0:128])
        Y.append(int(row.iloc[128]))
    return np.array(X), np.array(Y)
def data_clean(data, is_column=False):
    """Drop NaN-containing rows (default) or columns from *data*.

    Args:
        data (pd.DataFrame): raw CSV data.
        is_column (bool, optional): when True drop columns that contain
            NaN; otherwise drop rows. Defaults to False.

    Returns:
        pd.DataFrame: a new, cleaned DataFrame (input is not modified).
    """
    drop_axis = 1 if is_column else 0
    return data.dropna(axis=drop_axis)
if __name__ == '__main__':
    # Smoke test: load the dataset and report the resulting split shapes.
    splits = data_format(
        'data/archive/PowerQualityDistributionDataset1.csv')
    X_train, X_test, Y_train, Y_test = splits
    print(X_train.shape, X_test.shape, Y_train.shape, Y_test.shape)

99
attack_nomal/main.py Normal file
View File

@ -0,0 +1,99 @@
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import keras
from data_load import data_format

# Load the dataset (75/25 train/test split, features scaled to [-1, 1]).
X_train, X_test, Y_train, Y_test = data_format('data/archive/PowerQualityDistributionDataset1.csv')
# Fix the random seeds for reproducibility; reseeding with the same value
# before each shuffle keeps X_test and Y_test aligned.
np.random.seed(7)
np.random.shuffle(X_test)
np.random.seed(7)
np.random.shuffle(Y_test)
tf.random.set_seed(7)
# Load the previously trained model from disk.
model = keras.models.load_model('model_nomal')
# Baseline accuracy on the clean (unperturbed) test set.
loss, accuracy = model.evaluate(X_test, Y_test)
print("Test original accuracy:", accuracy)
# Loss used to compute input gradients for the attack.
# NOTE(review): from_logits=True assumes the model's last layer emits raw
# scores, not softmax probabilities — confirm against the training script.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Convert the test data to tensors so GradientTape can watch the input.
# NOTE(review): float64 here may differ from the model's compute dtype
# (typically float32) — confirm there is no unintended casting.
X_test_tensor = tf.convert_to_tensor(X_test, dtype=tf.float64)
Y_test_tensor = tf.convert_to_tensor(Y_test, dtype=tf.int32)
# Accuracy results per gamma value (gradient-selection ratio).
accuracy_per_gamma = {}
# Sweep over the fraction of gradient entries used for the perturbation.
for gamma in [0.05, 0.1, 0.2, 0.4]:
    # Gradient of the loss w.r.t. the inputs. (Recomputed on every gamma
    # iteration even though the result does not depend on gamma.)
    with tf.GradientTape() as tape:
        tape.watch(X_test_tensor)
        predictions = model(X_test_tensor)
        loss = loss_fn(Y_test_tensor, predictions)
    gradients = tape.gradient(loss, X_test_tensor)
    # Flatten so the top gamma * |X| entries can be selected globally
    # across all samples and features.
    flattened_gradients = tf.reshape(gradients, [-1])
    num_gradients_to_select = int(gamma * tf.size(flattened_gradients, out_type=tf.dtypes.float32))
    top_gradients_indices = tf.argsort(flattened_gradients, direction='DESCENDING')[:num_gradients_to_select]
    # Start from a copy of the flattened gradients.
    updated_gradients = tf.identity(flattened_gradients)
    # Boolean mask: False at the selected (largest) entries, True elsewhere.
    mask = tf.ones_like(updated_gradients, dtype=bool)
    mask = tf.tensor_scatter_nd_update(mask, tf.expand_dims(top_gradients_indices, 1), tf.zeros_like(top_gradients_indices, dtype=bool))
    # Zero every gradient entry except the selected top ones.
    updated_gradients = tf.where(mask, tf.zeros_like(updated_gradients), updated_gradients)
    # Restore the original input shape.
    updated_gradients = tf.reshape(updated_gradients, tf.shape(gradients))
    # Accuracy per attack step size for this gamma.
    accuracy_list = []
    for learning_rate in [0.1, 0.2, 0.3, 0.4, 0.5]:
        # Scale the sparse gradient; 700 appears to be an empirical
        # amplification factor — TODO confirm its origin.
        scaled_gradients = (learning_rate * 700) * updated_gradients
        # Apply the perturbation to the test inputs.
        X_train_updated = tf.add(X_test_tensor, scaled_gradients)
        X_train_updated = X_train_updated.numpy()
        # Accuracy on the perturbed inputs.
        loss, accuracy = model.evaluate(X_train_updated, Y_test)
        print(f"Accuracy gamma: {gamma},learning:{learning_rate}", accuracy)
        accuracy_list.append(accuracy)
    accuracy_per_gamma[gamma] = accuracy_list

# Plot accuracy versus learning rate, one curve per gamma.
learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5]
gammas = [0.05, 0.1, 0.2, 0.4]
plt.figure(figsize=(10, 6))
for gamma in gammas:
    plt.plot(learning_rates, accuracy_per_gamma[gamma], marker='o', label=f'Gamma={gamma}')
plt.title('Accuracy vs Learning Rate for Different Gammas')
plt.xlabel('Learning Rate')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

View File

@ -0,0 +1,75 @@
from data_load import data_format
import tensorflow as tf
import numpy as np
from keras.layers import Dropout
from keras import regularizers
from keras.callbacks import TensorBoard, LearningRateScheduler
import keras
def model_train(X_train, X_test, Y_train, Y_test):
    """Train a dense classifier on the power-quality data and save it.

    Args:
        X_train (np.array): training features, expected scaled to [-1, 1].
        X_test (np.array): test features.
        Y_train (np.array): integer class labels for training.
        Y_test (np.array): integer class labels for evaluation.
    """
    # Shuffle features and labels with the same seed so they stay aligned.
    np.random.seed(7)
    np.random.shuffle(X_train)
    np.random.seed(7)
    np.random.shuffle(Y_train)
    tf.random.set_seed(7)
    # Two hidden ReLU layers with dropout, then the output layer.
    # NOTE(review): the output layer has n_classes + 1 units with a ReLU
    # activation, which is unusual for logits feeding
    # SparseCategoricalCrossentropy(from_logits=True) — confirm intended.
    model = tf.keras.models.Sequential([
        tf.keras.layers.Dense(10000, activation='relu'),  # hidden layer 1
        Dropout(0.2),
        tf.keras.layers.Dense(800, activation='relu'),  # hidden layer 2
        Dropout(0.2),
        tf.keras.layers.Dense(
            (len(np.unique(Y_train)) + 1), activation='relu', kernel_regularizer=regularizers.l2(0.01))
    ])
    # Loss on integer labels against the raw (unnormalised) outputs.
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(
        optimizer='SGD',
        loss=loss_fn,
        metrics=['accuracy'])

    def lr_schedule(epoch):
        """Exponential decay: lr = 0.01 * 0.1 ** (epoch / 1500)."""
        initial_learning_rate = 0.01
        decay_rate = 0.1
        decay_steps = 1500
        new_learning_rate = initial_learning_rate * \
            decay_rate ** (epoch / decay_steps)
        return new_learning_rate
    # Apply the decay schedule once per epoch.
    lr_scheduler = LearningRateScheduler(lr_schedule)
    # Log training curves for TensorBoard.
    log_dir = "logs/fit"
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
    model.fit(X_train, Y_train, epochs=1000,
              callbacks=[tensorboard_callback, lr_scheduler], batch_size=256)
    loss, accuracy = model.evaluate(X_test, Y_test)
    print("Test accuracy:", accuracy)
    # Persist the trained model.
    # NOTE(review): saved under 'model', but the attack script loads
    # 'model_nomal' — confirm which path is intended.
    keras.models.save_model(model, 'model')
if __name__ == "__main__":
    # Load the default dataset and run the full training pipeline.
    data_path = 'data/archive/PowerQualityDistributionDataset1.csv'
    model_train(*data_format(data_path))

File diff suppressed because it is too large Load Diff

263
main.py
View File

@ -1,263 +0,0 @@
from tqdm import tqdm
from sklearn.preprocessing import MinMaxScaler
from keras.layers import Dropout
from keras.callbacks import TensorBoard, LearningRateScheduler
from keras import regularizers
import matplotlib.pyplot as plt
import tensorflow as tf
import pandas as pd
import numpy as np
import os
import warnings
# Silence Python warnings and TensorFlow's pre-2.x logging so the training
# output stays readable.
warnings.filterwarnings('ignore')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
def data_read(data_address):
    """Read the voltage-quality CSV and reshape it into fixed-length series.

    Args:
        data_address: path (or file-like object) of a CSV with 'Voltage'
            and 'Problem' columns.

    Returns:
        tuple: (X, Y) as np.ndarray — X of shape (n, 34) holding
        consecutive voltage samples, Y holding the integer class label of
        each window's last sample.
    """
    df = pd.read_csv(data_address)
    # Map the string class labels to consecutive integer ids (order of
    # first appearance).
    label_mapping = {label: idx for idx,
                     label in enumerate(df['Problem'].unique())}
    df['Problem'] = df['Problem'].map(label_mapping)
    # Treat infinities as missing data and drop those rows.
    df.replace([np.inf, -np.inf], np.nan, inplace=True)
    df.dropna(inplace=True)
    X = np.array(df['Voltage'])
    Y = np.array(df['Problem'])
    # Slice the flat series into non-overlapping windows of 34 samples.
    time_steps = 34
    X_series, Y_series = [], []
    # BUG FIX: iterate only over complete windows. The original
    # `while i < len(X)` raised IndexError on Y[i + time_steps - 1] when
    # len(X) was not an exact multiple of time_steps; a trailing partial
    # window is now dropped instead.
    for i in range(0, len(X) - time_steps + 1, time_steps):
        X_series.append(X[i:(i + time_steps)])
        Y_series.append(Y[i + time_steps - 1])
    return np.array(X_series), np.array(Y_series)
# 编写绘图函数,画出训练集电压数据
def _save_voltage_plot(series, out_dir, filename, gamma, learning_rate, label):
    """Plot one 34-step voltage series and save it as out_dir/filename."""
    plt.figure()
    time_step = list(range(0, 34))
    plt.plot(time_step, series)
    plt.title(
        f'gamma:{gamma},learning_rate:{learning_rate},Y:{label}')
    plt.xlabel('time_step')
    plt.ylabel('voltage')
    # Replaces the original try/except FileExistsError around makedirs.
    os.makedirs(out_dir, exist_ok=True)
    plt.savefig(f'{out_dir}/{filename}')
    plt.close()
    plt.clf()


def plant_for_voltage(x_train_new, x_train_orginal, gamma, learning_rate, y_train, different_location):
    """Save before/after voltage plots for a sample of perturbed indices.

    The original body duplicated the whole plotting passage for the
    perturbed and the original series; both now go through
    _save_voltage_plot.

    Args:
        x_train_new: perturbed series, indexable as x_train_new[i].
        x_train_orginal: original (unperturbed) series, same indexing.
        gamma: gradient-selection ratio (used in title and output path).
        learning_rate: attack step size (used in title and output path).
        y_train: labels; only used in the plot title.
        different_location: indices whose series changed. Only every 100th
            index (i % 100 == 0) is plotted to limit output volume.
    """
    out_dir = f'Liu/picture/gamma{gamma} learningrate{learning_rate}'
    for i in different_location:
        if i % 100 == 0:
            # Perturbed series.
            _save_voltage_plot(x_train_new[i], out_dir,
                               f'X_train_new_{i}.png',
                               gamma, learning_rate, y_train[i])
            # Original series for comparison.
            _save_voltage_plot(x_train_orginal[i], out_dir,
                               f'X_train_{i}.png',
                               gamma, learning_rate, y_train[i])
def if_diff(a, b):
    """Return the row indices at which two arrays differ.

    Args:
        a, b: np.ndarray of identical shape (2-D in this script's usage).

    Returns:
        list: the row index of every differing element — one entry per
        element, so a row with several changed columns appears repeatedly.
    """
    # np.where already yields per-axis index arrays; the original rebuilt
    # (row, col) tuples in a Python loop only to discard the column part.
    diff = np.where(a != b)
    return list(diff[0])
# Load the training and test series.
# NOTE(review): the first path uses backslashes while the second uses
# forward slashes — the backslash form only resolves on Windows; confirm
# and unify.
X_train, Y_train = data_read(
    'Liu\data\VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main\Voltage Quality.csv')
X_test, Y_test = data_read(
    'Liu/data/VOLTAGE-QUALITY-CLASSIFICATION-MODEL--main/Voltage Quality Test.csv')
# Scale features to [-1, 1]; the scaler is fitted on the training data only.
sc = MinMaxScaler(feature_range=(-1, 1))
X_train = sc.fit_transform(X_train)
X_test = sc.transform(X_test)
# NOTE(review): ndarray.reshape returns a new array and the result is
# discarded here, so these two lines are no-ops — confirm whether they
# were meant to be assignments (X_train = X_train.reshape(...)).
X_train.reshape(-1, 34, 1)
X_test.reshape(-1, 34, 1)
# Shuffle features and labels with the same seed so they stay aligned.
np.random.seed(7)
np.random.shuffle(X_train)
np.random.seed(7)
np.random.shuffle(Y_train)
tf.random.set_seed(7)
# Number of target classes.
n_classes = len(np.unique(Y_train))
# Loss on integer labels against raw model outputs.
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
# Two stacked SimpleRNN layers with dropout, then a dense output layer.
# NOTE(review): the output layer uses a ReLU activation although the loss
# is configured with from_logits=True — confirm intended.
model = tf.keras.models.Sequential([
    tf.keras.layers.SimpleRNN(100, return_sequences=True, input_shape=(34, 1)),
    Dropout(0.2),
    tf.keras.layers.SimpleRNN(100),
    Dropout(0.2),
    tf.keras.layers.Dense(n_classes, activation='relu',
                          )  # kernel_regularizer=regularizers.l2(0.3)
])
model.compile(
    optimizer='SGD',
    loss=loss_fn,
    metrics=['accuracy'])


def lr_schedule(epoch):
    """Exponential decay: lr = 0.01 * 0.1 ** (epoch / 250)."""
    initial_learning_rate = 0.01
    decay_rate = 0.1
    decay_steps = 250
    new_learning_rate = initial_learning_rate * \
        decay_rate ** (epoch / decay_steps)
    return new_learning_rate


# Apply the decay schedule once per epoch.
lr_scheduler = LearningRateScheduler(lr_schedule)
# Log training curves for TensorBoard.
log_dir = "logs/fit"
tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)
model.fit(X_train, Y_train, epochs=500,
          batch_size=32, callbacks=[tensorboard_callback, lr_scheduler])
# Baseline accuracy on the clean test set.
loss, accuracy = model.evaluate(X_test, Y_test)
print("Test original accuracy:", accuracy)
# --- Craft the adversarial perturbation --------------------------------
# NOTE(review): despite the *_train names, these tensors hold the TEST set.
X_train_tensor = tf.convert_to_tensor(X_test, dtype=tf.float64)
Y_train_tensor = tf.convert_to_tensor(Y_test, dtype=tf.int32)
# Gradient of the loss with respect to the inputs.
with tf.GradientTape() as tape:
    # Make the tape watch the (non-variable) input tensor.
    tape.watch(X_train_tensor)
    # Forward pass.
    predictions = model(X_train_tensor)
    # Loss of the current predictions.
    loss = loss_fn(Y_train_tensor, predictions)
# Gradient w.r.t. the input X.
gradients = tape.gradient(loss, X_train_tensor)
# Accuracy results per gamma value (gradient-selection ratio).
accuracy_per_gamma = {}
# Flatten so the top gamma * |X| entries can be selected globally.
flattened_gradients = tf.reshape(gradients, [-1])
for gamma in [0.05, 0.1, 0.2, 0.4]:
    num_gradients_to_select = int(
        gamma * tf.size(flattened_gradients, out_type=tf.dtypes.float32))
    top_gradients_indices = tf.argsort(flattened_gradients, direction='DESCENDING')[
        :num_gradients_to_select]
    # Start from a copy of the flattened gradients.
    updated_gradients = tf.identity(flattened_gradients)
    # Boolean mask: False at the selected top gradients, True elsewhere.
    mask = tf.ones_like(updated_gradients, dtype=bool)
    mask = tf.tensor_scatter_nd_update(mask, tf.expand_dims(
        top_gradients_indices, 1), tf.zeros_like(top_gradients_indices, dtype=bool))
    # Zero every entry except the selected top gradients.
    updated_gradients = tf.where(mask, tf.zeros_like(
        updated_gradients), updated_gradients)
    # Restore the original input shape.
    updated_gradients = tf.reshape(updated_gradients, tf.shape(gradients))
    # Accuracy per attack step size for this gamma.
    accuracy_list = []
    for learning_rate in [0.1, 0.2, 0.3, 0.4, 0.5]:
        # Scale the sparse gradient; 17000 appears to be an empirical
        # amplification factor — TODO confirm its origin.
        scaled_gradients = (learning_rate * 17000) * updated_gradients
        # Apply the perturbation to the test inputs.
        X_train_updated = tf.add(X_train_tensor, scaled_gradients)
        X_train_updated = X_train_updated.numpy()
        # Optional visualisation of perturbed vs original series:
        # plant_for_voltage(X_train_updated, X_train, gamma, learning_rate, Y_train, list_diff)
        # Accuracy on the perturbed inputs.
        loss, accuracy = model.evaluate(X_train_updated, Y_test)
        print(f"Accuracy gamma: {gamma},learning:{learning_rate}", accuracy)
        accuracy_list.append(accuracy)
    accuracy_per_gamma[gamma] = accuracy_list
# Plot accuracy versus learning rate, one curve per gamma.
learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5]
gammas = [0.05, 0.1, 0.2, 0.4]
plt.figure(figsize=(10, 6))
for gamma in gammas:
    plt.plot(learning_rates,
             accuracy_per_gamma[gamma], marker='o', label=f'Gamma={gamma}')
plt.title('Accuracy vs Learning Rate for Different Gammas')
plt.xlabel('Learning Rate')
plt.ylabel('Accuracy')
plt.legend()
plt.show()

File diff suppressed because one or more lines are too long

BIN
model_nomal/saved_model.pb Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.