import warnings

import matplotlib
matplotlib.use('Agg')  # select the non-interactive Agg backend before pyplot is imported
import matplotlib.pyplot as plt
import numpy as np
import torch
import pandas as pd
import ruptures as rpt
from scipy.optimize import differential_evolution
from sklearn.metrics import mean_squared_error
from query_probability import query_one, load_ucr
from pymoo.core.problem import ElementwiseProblem
from pymoo.algorithms.soo.nonconvex.pso import PSO
from pymoo.optimize import minimize
from pymoo.core.callback import Callback

warnings.filterwarnings('ignore')

def detect_change_points(data):
    models = ['l2', 'rbf', 'l1']  # run three cost models
    all_bkps = set()  # a set removes duplicate breakpoints automatically

    for model in models:
        algo = rpt.Binseg(model=model, min_size=1, jump=1).fit(data)
        bkps = algo.predict(pen=1)
        all_bkps.update(bkps)  # add the detected change points to the set

    return sorted(all_bkps)  # return the sorted list of unique change points

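# Usage sketch (illustrative only, on a synthetic series; not part of the attack pipeline):
#
#   toy = np.concatenate([np.zeros(50), np.ones(50)])  # one obvious level shift
#   print(detect_change_points(toy))                   # e.g. [50, 100]
#
# Note that ruptures reports len(signal) as the final breakpoint, which is why the caller
# below shifts every index down by one before using it as an array position.
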
# def process_change_points(length, change_points, window_size):
#     # initialize the set of secondary change points
#     all_important_nodes = set()
#     # iterate over every primary change point
#     for bkp in change_points:
#         # compute the window boundaries around this change point
#         start = max(0, int(bkp - window_size/2))
#         end = min(length, int(bkp + window_size/2))
#         a = 0.4
#         b = 0.6
#         prob = a + (b - a) * np.random.random()
#         # iterate over every point inside the window
#         for i in range(start, end):
#             # draw a random number in [0, 1]
#             random_value = np.random.random()
#             # if the random number exceeds 0.4, add the point to the secondary set
#             if random_value >= 0.4:
#                 all_important_nodes.add(i)
#
#     return all_important_nodes

def process_change_points(ori_ts, length, change_points, window_size):
    # measure how sharp the change is at each change point
    changes_intensity = [(bkp, abs(ori_ts[bkp] - ori_ts[bkp - 1])) for bkp in change_points]
    # sort the primary change points by intensity, strongest first
    changes_intensity.sort(key=lambda x: x[1], reverse=True)

    # initialize the set of secondary change points
    all_important_nodes = set()

    # assign a sampling threshold to each change point
    # num_points = len(changes_intensity)
    # thresholds = {bkp: 0.7 - 0.4 * ((rank) / (num_points - 1))**2 for rank, (bkp, _) in enumerate(changes_intensity)}
    # empty dict mapping each change point to its threshold
    thresholds = {}

    # total number of change points
    num_points = len(changes_intensity)
    # iterate over the sorted change points together with their rank
    for rank, (bkp, _) in enumerate(changes_intensity):
        # normalized rank in [0, 1]: 0 for the most intense point, 1 for the least
        # (guard against division by zero when there is a single change point)
        normalized_rank = rank / (num_points - 1) if num_points > 1 else 0.0
        # threshold increment: scaling factor times the squared normalized rank
        decrement = 0.4 * (normalized_rank ** 2)
        # compute and set the threshold of the current change point
        thresholds[bkp] = 0.3 + decrement
    # at this point, thresholds maps every change point to its threshold

    # iterate over every primary change point
    for bkp, intensity in changes_intensity:
        # compute the window boundaries around this change point
        start = max(0, int(bkp - window_size / 2))
        end = min(length, int(bkp + window_size / 2))
        threshold = thresholds[bkp]

        # iterate over every point inside the window
        for i in range(start, end):
            # draw a random number in [0, 1]
            random_value = np.random.random()
            # if it exceeds the assigned threshold, keep the point as a secondary change point
            if random_value >= threshold:
                all_important_nodes.add(i)

    return all_important_nodes

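# Threshold sketch (illustrative, assuming five detected change points): with num_points = 5
# the loop above assigns 0.3 + 0.4 * (rank / 4) ** 2, i.e. thresholds 0.30, 0.325, 0.40,
# 0.525 and 0.70 from the most to the least intense change point. A window point survives
# when random_value >= threshold, so the sharpest change keeps roughly 70% of its window
# while the weakest keeps roughly 30%.
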
def get_magnitude(run_tag, factor, normalize, gpu):
    '''
    :param run_tag: dataset name
    :param factor: scaling factor applied to the mean peak-to-peak range
    :return: Perturbed Magnitude
    '''
    data = load_ucr('data/' + run_tag + '/' + run_tag + '_attack' + gpu + '.txt', normalize=normalize)
    X = data[:, 1:]

    max_magnitude = X.max(1)
    min_magnitude = X.min(1)
    mean_magnitude = np.mean(max_magnitude - min_magnitude)

    perturbed_mag = mean_magnitude * factor
    print('Perturbed Magnitude:', perturbed_mag)

    return perturbed_mag

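# Magnitude sketch (illustrative numbers): if the attack set has a mean peak-to-peak range
# of 2.0 and factor = 0.04, then perturbed_mag = 0.08, and every perturbed coordinate in the
# PSO search below is constrained to the box [-0.08, 0.08].
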
class Attacker:

    def __init__(self, run_tag, model_type, cuda, normalize, e, device, gpu, classes):
        self.run_tag = run_tag
        self.model_type = model_type
        self.cuda = cuda
        # self.intervals = get_attack_position(self.run_tag, self.top_k)
        self.normalize = normalize
        self.e = e
        self.device = device
        self.gpu = gpu
        self.classes = classes

    def perturb_ts(self, perturbations, ts, attack_pos):
        '''
        :param perturbations: one perturbation value (float) per attacked position
        :param ts: time series
        :param attack_pos: 0/1 mask marking which positions may be perturbed
        :return: perturbed ts
        '''
        # first we copy the ts
        ts_tmp = np.copy(ts)

        coordinate = 0  # index into the perturbations array
        for i in range(len(attack_pos)):
            if attack_pos[i] == 1:
                ts_tmp[i] += perturbations[coordinate]
                coordinate += 1
        # for interval in self.intervals:
        #     for i in range(int(interval[0]), int(interval[1])):
        #         ts_tmp[i] += perturbations[coordinate]
        #         coordinate += 1
        return ts_tmp

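    # Mapping sketch (illustrative): with attack_pos = [0, 1, 1, 0, 1] the optimizer supplies a
    # 3-element perturbation vector, and perturb_ts adds its entries to ts[1], ts[2] and ts[4]
    # in that order, leaving the masked-out positions untouched.
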
    def plot_per(self, perturbations, ts, target_class, sample_idx, attack_pos, prior_probs, attack_probs, factor):

        # Obtain the perturbed ts
        ts_tmp = np.copy(ts)
        ts_perturbed = self.perturb_ts(perturbations=perturbations, ts=ts, attack_pos=attack_pos)
        # Start to plot
        plt.figure(figsize=(6, 4))
        plt.plot(ts_tmp, color='b', label='Original %.2f' % prior_probs)
        plt.plot(ts_perturbed, color='r', label='Perturbed %.2f' % attack_probs)
        plt.xlabel('Time', fontsize=12)

        if target_class == -1:
            plt.title('Untargeted: Sample %d, eps_factor=%.3f' %
                      (sample_idx, factor), fontsize=14)
        else:
            plt.title('Targeted(%d): Sample %d, eps_factor=%.3f' %
                      (target_class, sample_idx, factor), fontsize=14)

        plt.legend(loc='upper right', fontsize=8)
        plt.savefig('result_' + str(factor) + '_' + str(self.model_type) + '/'
                    + self.run_tag + '/figures' + self.gpu + '/' + self.run_tag + '_' + str(sample_idx) + '.png')
        # plt.show()

    def fitness(self, device, perturbations, ts, sample_idx, queries, attack_pos, target_class=-1):
        # device = torch.device("cuda:0" if self.cuda else "cpu")
        perturbations = torch.tensor(perturbations)
        # ts = torch.tensor(ts, device='cuda')
        # perturbations = torch.tensor(perturbations, dtype=torch.float32)
        # perturbations = perturbations.to(device)

        queries[0] += 1
        ts_perturbed = self.perturb_ts(perturbations, ts, attack_pos=attack_pos)
        mse = mean_squared_error(ts, ts_perturbed)

        prob, _, _, _, _ = query_one(run_tag=self.run_tag, device=device, idx=sample_idx, attack_ts=ts_perturbed,
                                     target_class=target_class, normalize=self.normalize,
                                     cuda=self.cuda, model_type=self.model_type, e=self.e, gpu=self.gpu, n_class=self.classes)
        # decimal_places = -int(np.floor(np.log10(prob))) + 1
        # # compute the scale factor used to normalize the number
        # scale_factor = 10 ** decimal_places
        # # normalize the target number
        # normalized = np.round(scale_factor * mse, 0) / scale_factor
        # # make sure the normalized number is at least 0.01
        # normalized = max(normalized, 0.01)
        prob = torch.tensor(prob)

        if target_class != -1:
            prob = 1 - prob

        return prob  # The fitness function is to minimize the fitness value

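    # Fitness sketch (illustrative, assuming query_one returns the probability of the class under
    # attack): for an untargeted run with a true-class probability of 0.85 the fitness is 0.85;
    # for a targeted run where the target class currently scores 0.10 the fitness is 1 - 0.10 = 0.90.
    # Minimizing the fitness therefore erodes the true-class confidence or grows the target-class
    # confidence, respectively.
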
    def attack_success(self, device, perturbations, ts, sample_idx, attack_pos, iterations, target_class=-1,
                       verbose=True):
        iterations[0] += 1
        print('The %d iteration' % iterations[0])
        ts_perturbed = self.perturb_ts(perturbations, ts, attack_pos)
        # ts_perturbed = torch.tensor(ts_perturbed, device='cuda')
        # Obtain the perturbed probability vector and the prior probability vector
        prob, prob_vector, prior_prob, prior_prob_vec, real_label = query_one(self.run_tag, device, idx=sample_idx,
                                                                              attack_ts=ts_perturbed,
                                                                              target_class=target_class,
                                                                              normalize=self.normalize,
                                                                              verbose=verbose, cuda=self.cuda,
                                                                              model_type=self.model_type,
                                                                              e=self.e, gpu=self.gpu, n_class=self.classes)

        predict_class = torch.argmax(prob_vector).to(device)
        prior_class = torch.argmax(prior_prob_vec).to(device)
        real_label = real_label.to(device)

        # Early-termination heuristic (empirical) that saves attack time.
        # It may misjudge a sample, which can slightly lower the attack success rate.
        if (iterations[0] > 20 and prob > 0.9):

            print('The %d sample is not expected to successfully attack.' % sample_idx)
            print('prob: ', prob)
            return True

        if prior_class != real_label:
            print('The %d sample cannot be classified correctly, no need to attack' % sample_idx)
            return True

        if prior_class == target_class:
            print(
                'The true label of %d sample equals to target label, no need to attack' % sample_idx)
            return True

        if verbose:
            print('The Confidence of current iteration: %.4f' % prob)
            print('########################################################')

        # The criterion of a successful attack:
        # Untargeted attack: the predicted label differs from the original label.
        # Targeted attack: the predicted label equals the target label.
        if ((target_class == -1 and predict_class != prior_class) or
                (target_class != -1 and predict_class == target_class)):
            print('##################### Attack Successfully! ##########################')

            return True

        return False  # the attack has not succeeded yet

    def attack(self, sample_idx, device, target_class=-1, factor=0.04,
               max_iteration=50, popsize=200, verbose=True):

        class MyProblem(ElementwiseProblem):
            def __init__(self, outer, ts, sample_idx, queries, attack_pos, target_class, device, bounds, perturbed_magnitude):
                # mirrors fitness(self, device, perturbations, ts, sample_idx, queries, attack_pos, target_class=-1)
                self.perturbed_magnitude = perturbed_magnitude
                super().__init__(n_var=len(bounds), n_obj=1, n_constr=0,
                                 xl=-perturbed_magnitude * np.ones(len(bounds)),  # lower bound
                                 xu=perturbed_magnitude * np.ones(len(bounds)))   # upper bound
                self.outer = outer
                self.ts = ts
                self.sample_idx = sample_idx
                self.queries = queries
                self.attack_pos = attack_pos
                self.target_class = target_class
                self.device = device

            def _evaluate(self, x, out, *args, **kwargs):
                f = self.outer.fitness(self.device, x, self.ts, self.sample_idx, self.queries, self.attack_pos,
                                       self.target_class)
                # fitness(self, device, perturbations, ts, sample_idx, queries, attack_pos, target_class=-1)
                out["F"] = np.array([f])

        class MyCallback(Callback):
            def __init__(self, outer, device, ts, sample_idx, queries, attack_pos, target_class, verbose, iterations):
                super().__init__()
                self.outer = outer
                self.device = device
                self.ts = ts
                self.sample_idx = sample_idx
                self.queries = queries
                self.attack_pos = attack_pos
                self.target_class = target_class
                self.iterations = iterations
                self.verbose = verbose
                self.generation = 0

            def notify(self, algorithm):
                x = algorithm.pop.get("X")[np.argmin(algorithm.pop.get("F"))]  # Current best solution
                ifsuccess = self.outer.attack_success(self.device, x, self.ts,
                                                      self.sample_idx, self.attack_pos,
                                                      self.iterations, self.target_class,
                                                      self.verbose)
                # attack_success(self, device, perturbations, ts, sample_idx, attack_pos, iterations, target_class=-1, verbose=True)
                # best_f = min(algorithm.pop.get("F"))
                # best_x = algorithm.pop.get("X")[np.argmin(algorithm.pop.get("F"))]
                # print(f"Generation {self.generation}: Best solution so far: {best_x}, Fitness: {best_f}")
                # print('-------', should_terminate)
                if ifsuccess:
                    algorithm.termination.force_termination = True
                self.generation += 1

        test = load_ucr('data/' + self.run_tag + '/' + self.run_tag + '_attack' + self.gpu + '.txt',
                        normalize=self.normalize)
        # attack_poses = np.loadtxt('data/' + self.run_tag + '/' + self.run_tag + '_attackPos'+self.gpu+'.txt')

        ori_ts = test[sample_idx][1:]

        attacked_probs, attacked_vec, prior_probs, prior_vec, real_label = query_one(self.run_tag, device, idx=sample_idx,
                                                                                     attack_ts=ori_ts,
                                                                                     target_class=target_class,
                                                                                     normalize=self.normalize,
                                                                                     verbose=False,
                                                                                     cuda=self.cuda, e=self.e,
                                                                                     model_type=self.model_type, gpu=self.gpu, n_class=self.classes)
        prior_class = torch.argmax(prior_vec).to(device)
        if prior_class != real_label:
            print('The %d sample cannot be classified correctly, no need to attack' % sample_idx)
            return ori_ts, ori_ts, [prior_probs, attacked_probs, 0, 0, 0, 0, 0, 'WrongSample']
        # Get the maximum perturbed magnitude
        perturbed_magnitude = get_magnitude(self.run_tag, factor, normalize=self.normalize, gpu=self.gpu)
        bounds = []
        # locate candidate intervals via change point detection
        length = ori_ts.shape
        all_bkps = detect_change_points(ori_ts)
        all_bkps = [x - 1 for x in all_bkps]
        window_size = int(length[0] / len(all_bkps))  # window size around each change point

        sequence = np.zeros(len(ori_ts), dtype=int)
        # secondary_changes = process_change_points(length[0], all_bkps, window_size)
        # sequence[list(secondary_changes)] = 1
        # sequence[list(all_bkps)] = 1
        # expand the change points into attackable positions
        if len(all_bkps) < 0.7 * length[0]:
            secondary_changes = process_change_points(ori_ts, length[0], all_bkps, window_size)
            sequence[list(secondary_changes)] = 1
            sequence[list(all_bkps)] = 1
        else:
            sequence[list(all_bkps)] = 1
        attack_pos = sequence

        steps_count = attack_pos.sum()
        for i in range(len(attack_pos)):
            if attack_pos[i] == 1:
                bounds.append((-1 * perturbed_magnitude, perturbed_magnitude))

        print('The length of shapelet interval', steps_count)
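        # Search-space sketch (illustrative): if attack_pos ends up with 37 ones, bounds has 37
        # entries and MyProblem below searches a 37-dimensional box [-perturbed_magnitude,
        # perturbed_magnitude]^37, one coordinate per attackable time step.
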
        if False:  # flip to True manually to test random change points
            # generate random change points
            # the length of ori_ts is taken from shape[0]
            length = ori_ts.shape[0]

            # draw the same number of random positions as steps_count
            random_indices = np.random.choice(length, size=int(steps_count), replace=False)

            random_pos = np.zeros(length, dtype=int)
            random_pos[random_indices] = 1

            # perturbation bounds for the random positions
            random_bounds = []
            for i in range(len(random_pos)):
                if random_pos[i] == 1:
                    random_bounds.append((-1 * perturbed_magnitude, perturbed_magnitude))

            print('The length of random interval', random_pos.sum())

            # verify that the number of random positions matches the original count
            assert random_pos.sum() == steps_count
            # overwrite the change-point positions
            attack_pos = random_pos

        # print('The length of bounds', len(bounds))
        popmul = max(1, popsize // len(bounds))
        # Record of the number of iterations
        iterations = [0]
        queries = [0]
        problem = MyProblem(self, ori_ts, sample_idx, queries, attack_pos, target_class, device, bounds, perturbed_magnitude)
        algorithm = PSO(pop_size=50, w=0.9, c1=1.2, c2=2.2)
        callback1 = MyCallback(self, device, ori_ts, sample_idx, queries, attack_pos, target_class, verbose, iterations)
        res = minimize(problem, algorithm, ('n_gen', max_iteration), callback=callback1, seed=1, verbose=verbose)

        attack_result = res.X  # best solution found by the PSO
        # attack_result = differential_evolution(func=fitness_fn, bounds=bounds
        #                                        , maxiter=max_iteration, popsize=popmul
        #                                        , recombination=0.7, callback=callback_fn,
        #                                        atol=-1, polish=False)

        attack_ts = self.perturb_ts(attack_result, ori_ts, attack_pos)

        mse = mean_squared_error(ori_ts, attack_ts)

        attacked_probs, attacked_vec, prior_probs, prior_vec, real_label = query_one(self.run_tag, device, idx=sample_idx,
                                                                                     attack_ts=attack_ts,
                                                                                     target_class=target_class,
                                                                                     normalize=self.normalize,
                                                                                     verbose=False,
                                                                                     cuda=self.cuda, e=self.e,
                                                                                     model_type=self.model_type, gpu=self.gpu, n_class=self.classes)

        predicted_class = torch.argmax(attacked_vec).to(device)
        prior_class = torch.argmax(prior_vec).to(device)

        if prior_class != real_label:
            success = 'WrongSample'

        elif prior_class == target_class:
            success = 'NoNeedAttack'

        else:
            if (predicted_class.item() != prior_class.item() and target_class == -1) \
                    or (predicted_class.item() == target_class and target_class != -1):
                success = 'Success'
            else:
                success = 'Fail'

        if success == 'Success':
            self.plot_per(perturbations=attack_result, ts=ori_ts, target_class=target_class,
                          sample_idx=sample_idx, attack_pos=attack_pos, prior_probs=prior_probs, attack_probs=attacked_probs, factor=factor)

        return ori_ts, attack_ts, [prior_probs, attacked_probs, prior_class.item(),
                                   predicted_class.item(), queries[0], mse, iterations[0], success]
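
# Usage sketch (illustrative; the concrete argument values are assumptions, not project defaults):
#
#   device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
#   attacker = Attacker(run_tag=run_tag, model_type=model_type, cuda=True, normalize=True,
#                       e=e, device=device, gpu=gpu, classes=n_classes)
#   ori_ts, adv_ts, info = attacker.attack(sample_idx=0, device=device,
#                                          target_class=-1, factor=0.04)
#   # info = [prior_prob, attacked_prob, prior_class, predicted_class,
#   #         queries, mse, iterations, status]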