import sys, os
import pyvisa as visa
# make modules importable from anywhere
# sys.path.append(r'C:\Users\Christopher\OneDrive - University College London\ipes_cdt\phd_project\projects\soa_driving\code\soa_driving\optimisation\python\\')
from soa import devices, signalprocessing, analyse, distort_tf
import time
import csv
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
import math
import multiprocessing
import pickle
from scipy import signal
[docs]class PSO:
"""
Optimises parameters using particle swarm optimisation (PSO) algorithm
"""
def __init__(self,
t,
init_OP,
n,
iter_max,
rep_max,
init_v_f,
max_v_f,
w_init=0.9,
w_final=None,
c1=0.2, c2=0.2,
adapt_accel=True,
sect_to_optimise='whole_signal',
areas_to_suppress='None',
off_suppress_f=0.2,
on_suppress_f=0.8,
embed_init_signal=True,
path_to_embedded_signal=None,
directory='Unknown',
cost_f='Unknown',
st_importance_factor=None,
sim_model=None,
awg=None,
osc=None,
awg_res=8,
min_val=-1.0,
max_val=1.0,
record_extra_info=False,
linux=True,
SP=None):
"""
Initialise pso parameters
Args:
- t (list of floats): time array for signals
- init_OP (array of floats): start OP from which to begin optimisation
- n (int): number of particles
- iter_max (int): max number of pso iterations
- rep_max (int): number of times to repeat PSO
- init_v_f (float): factor by which to multiply initial positions by to
get initial velocity for first iteration
- max_v_f (float): factor by which to multiply max param values by to get
max velocity for each iteration
- w_init (float): initial inertial weight value (0 <= w < 1) e.g. 0.9
- w_final (float): final intertia weight value e.g. 0.5 (0 <= w < 1)
- c1 (float): acceleration constant (0 <= c1 <= 1)
- c2 (float): pso acceleration constant (0 <= c2 <= 1)
- adapt_accel (boolean): whether want to use adaptive acceleration i.e.
adaptively adjust w, c1 and c2
- sect_to_optimise (str): specifies which part of signal to optimise
(enter as string from list of options. Must be 1 of: 'whole_signal'
- areas_to_suppress (str): specify which (if any) areas of signal to
restrict in terms of what value they particles can take. Must be 1 of:
'None', 'start_centre', 'start_centre_end', 'pisic_shape'
- off_suppress_f (float): factor by which to multiply initial guess by
when in 'off' state and add to this to get space constraint
- on_suppress_f (float): factor by which to multiply initial guess by when
in 'on' state to get on constraing for PSO particles
- embed_init_signal (boolean): specify whether want to embed the initial
signal amongst the initialised particle positions so that don't ever
optimise towards something that's worse than initial signal
- path_to_embedded_signal (str): give full path to where signal .csv file
of signal you want to embed is saved. If path_to_embedded_signal = None
and embed_init_signal = True, will embed init_OP amongst the initialised
particle positions
- directory (str): directory in which we're working (will save files here)
- cost_f (str): cost function to use to evaluate performance. Must be 1
of: 'mSE', 'st', 'mSE+st', 's_mse+st', 'mse+st+os', 'zlpc'
- st_importance_factor (float): factor by which want settling time to be
larger/smaller than mean squared error. Only (and must) specify if using
s_mse+st cost_f
- sim_model (obj): simulation model to use if using simulation. When model
is passed a list of inputs, it should be able to simulate list of outputs
- awg (obj): arbitrary waveform generator object if using experiment
- osc (obj): oscilliscope object if using experiment
- awg_res (int): resolution (bits) of awg used to generate signal
- min_val (float): min voltage value awg can take
- max_val (float): max voltage value awg can take
- record_extra_info (boolean) = If want to plot extra info about PSO.
- linux (bool): If True, file dirs have forward slash. If false, have backslash
- SP (list of floats): target SP PSO should try to achieve
"""
if linux:
self.slash = '/'
else:
self.slash = '\\'
self.t = t
self.init_OP = init_OP
self.n = n
self.iter_max = iter_max
self.rep_max = rep_max
self.init_v_f = init_v_f
self.max_v_f = max_v_f
self.w_init = w_init
self.w_final = w_final
self.c1 = c1
self.c2 = c2
self.adapt_accel = adapt_accel
self.sect_to_optimise = sect_to_optimise
self.areas_to_suppress = areas_to_suppress
self.off_suppress_f = off_suppress_f
self.on_suppress_f = on_suppress_f
self.embed_init_signal = embed_init_signal
self.path_to_embedded_signal = path_to_embedded_signal
self.directory = directory
self.cost_f = cost_f
self.st_importance_factor = st_importance_factor
self.sim_model = sim_model
self.awg = awg
self.osc = osc
self.awg_res = awg_res
self.min_val = min_val
self.max_val = max_val
self.SP = SP
self.record_extra_info = record_extra_info
self.num_points = len(self.init_OP)
if sim_model == None and awg == None or \
sim_model == None and osc == None or \
sim_model != None and awg != None and osc != None:
message = 'Must either use simulation or experimental env to perform \
pso in. sim_model, awg and osc cannot all be None /not None.'
sys.exit(message)
if self.adapt_accel == True and w_final == None:
sys.exit('Must specifiy w_final value if using adaptive accel.')
# get init output
if self.sim_model != None:
self.X0 = self.__find_x_init(self.sim_model)
self.init_PV = self.__getTransferFunctionOutput(self.sim_model,
self.init_OP,
self.t,
self.X0)
else:
self.init_PV = self.__getSoaOutput(self.init_OP)
# init params
self.K_index, self.K = self.__getSectionToOptimise() # opt indices
self.m = len(self.K) # num params to optimise
self.num_points = int(len(self.t)) # num points in signal
self.curr_iter = 0
self.x = np.zeros((self.n, self.m)) # current pop position array
self.x_value = np.zeros(self.n) # fitness vals of positions
self.pbest_value = np.copy(self.x_value) # best local fitness vals
self.min_cost_index = np.argmin(self.pbest_value) # index best fitness
self.gbest = np.copy(self.K) # global best positions
self.gbest_cost = self.pbest_value[self.min_cost_index] # global best val
self.awg_step_size = (self.max_val - self.min_val) / (2**self.awg_res)
if self.SP is None:
self.SP = analyse.ResponseMeasurements(self.init_PV, self.t).sp.sp
else:
self.SP = SP
# set particle position and velocity boundaries
self.LB = np.zeros(self.m) # lower bound on particle positions
self.UB = np.zeros(self.m) # upper bound on particle positions
for g in range(0, self.m):
self.LB[g] = self.min_val
self.UB[g] = self.max_val
self.v_LB = np.zeros(self.m) # lower bound on particle velocities
self.v_UB = np.zeros(self.m) # upper bound on particle velocities
for g in range(0, self.m):
self.v_UB[g] = self.UB[g] * self.max_v_f
self.v_LB[g] = self.v_UB[g] * (-1)
# initialise particle positions
for g in range(0, self.m):
if self.embed_init_signal == True:
if self.path_to_embedded_signal == None:
self.x[0, g] = self.init_OP[g] # embed init OP at start
else:
init_sig = self.__getInitialDrivingSignalGuess(self.path_to_embedded_signal)
self.x[0, g] = init_sig[g]
else:
self.x[0, g] = self.LB[g] + ( random.uniform(0, 1) * (self.UB[g] - self.LB[g]) )
self.x[0, :] = self.__discretiseParticlePosition(self.x[0, :])
self.x[0, :] = self.__suppressAreasOfSignal(self.x[0, :])
self.x[0, :] = self.__discretiseParticlePosition(self.x[0, :])
self.x[0, :] = self.__suppressAreasOfSignal(self.x[0, :])
self.x[0, :] = self.__discretiseParticlePosition(self.x[0, :])
for j in range(1, self.n):
for g in range(0, self.m):
self.x[j, g] = self.LB[g] + \
(random.uniform(0,1) * (self.UB[g] - self.LB[g]))
self.x[j, :] = self.__discretiseParticlePosition(self.x[j, :])
self.x[j, :] = self.__suppressAreasOfSignal(self.x[j, :])
self.x[j, :] = self.__discretiseParticlePosition(self.x[j, :])
self.x[j, :] = self.__suppressAreasOfSignal(self.x[j, :])
self.x[j, :] = self.__discretiseParticlePosition(self.x[j, :])
# initialise particle velocities
self.v = np.zeros((self.n, self.m))
for j in range(0, self.n):
for g in range(0, self.m):
self.v[j, g] = self.init_v_f * self.x[j, g]
self.pbest = np.copy(self.x)
self.iter_gbest_reached = [0] # for plotting
# set up dirs and load data if needed
self.path_to_data = self.directory + self.slash + 'data' + self.slash # path to save data
self.pso_dir_name = "n_" + str(self.n) + \
"_mxvl_" + str(self.max_val) + \
"_mnvl_" + str(self.min_val) + \
"_ivf_" + str(self.init_v_f) + \
"_mvf_" + str(self.max_v_f) + \
"_irmx_" + str(self.iter_max) + \
"_rm_" + str(self.rep_max) + \
"_cstf_" + str(self.cost_f) + \
'_sif_' + str(self.st_importance_factor) + \
"_wi_" + str(self.w_init) + \
"_wf_" + str(self.w_final) + \
"_c1_" + str(self.c1) + \
"_c2_" + str(self.c2) + \
'_aptacl_' + str(self.adapt_accel) + \
'_offsf_' + str(self.off_suppress_f) + \
'_onsf_' + str(self.on_suppress_f) + \
'_emb_' + str(self.embed_init_signal) + self.slash
self.path_to_pso_data = self.path_to_data + self.pso_dir_name
if os.path.exists(self.path_to_data) == False:
os.mkdir(self.path_to_data)
if os.path.exists(self.path_to_pso_data) == False:
os.mkdir(self.path_to_pso_data)
if os.path.exists(self.path_to_pso_data + 'curr_pos.csv') == True:
print('Programme detected a curr_pos.csv file in pso data directory ' + \
str(self.path_to_pso_data) + \
'. Continue from the last particle swarm position(s) saved?')
ans = input("y or n: ")
while ans != 'y' or ans != 'n':
ans = input('y or n')
if ans == 'n':
pass
elif ans == 'y':
message = 'Please check your folders in the path ' + \
str(self.path_to_data) + \
'. Please enter the full path of the folder from which you \
want to load your saved data. To find the path, open the \
folder and right click on any file saved inside it, \
click on properties and copy-past the path. This programme \
requires you to manually enter this path because you might \
have multiple iteration extension folders to choose from \
as a starting point.'
print(message)
self.path_to_pso_data = input('Enter path of folder data is saved\
in (NOT path to specific file): ') + self.slash
while os.path.exists(self.path_to_pso_data) == False:
print('Entered path not found. Please enter a valid path to \
your saved data')
self.path_to_pso_data = input('Enter path of folder data is \
saved in (NOT path to specific file): ') + self.slash
self.iter_max, \
self.path_to_pso_data, \
self.x, \
self.pbest, \
self.x_value, \
self.pbest_value, \
self.min_cost_index, \
self.gbest, \
self.gbest_cost, \
self.curr_iter = self.__loadPsoData()
os.mkdir(self.path_to_pso_data)
else:
# evaluate init particle positions
print('Initialising PSO...')
start_time = time.time()
self.x_value = self.__evaluateParticlePositions(np.copy(self.x),
curr_iter=self.curr_iter,
plot=True)
self.pbest_value = np.copy(self.x_value) # store local best vals
end_time = time.time()
time_all_particles = end_time - start_time
print("Time to evaluate " + str(self.n) + \
" particles == time per generation: " + str(time_all_particles) + \
" s | " + str(time_all_particles/60) + " mins | " + \
str((time_all_particles/60)/60) + " hrs")
time_all_generations = time_all_particles * self.iter_max
print("Time to evaluate " + str(self.iter_max) + \
" generations == estimated time to algorithm completion: " + \
str(time_all_generations) + " s | " + str(time_all_generations/60) \
+ " mins | " + str((time_all_generations/60)/60) + " hrs")
print("Time to complete all " + str(self.rep_max) + " PSO repetitions: " + \
str(time_all_generations*self.rep_max) + " s | " + \
str((time_all_generations*self.rep_max)/60) + " mins | " + \
str(((time_all_generations*self.rep_max/60))/60) + " hrs")
# init global cost history for plotting
self.gbest_cost_history = []
self.min_cost_index = np.argmin(self.pbest_value)
for g in range(0, self.m):
self.gbest[g] = self.pbest[self.min_cost_index, g]
self.gbest_cost = self.pbest_value[self.min_cost_index]
self.gbest_cost_history = np.append([self.gbest_cost_history],
[self.gbest_cost])
print('Costs: ' + str(self.pbest_value))
print('Best cost: ' + str(self.gbest_cost))
# init analysis vals
self.rt_st_os_analysis = [self.__analyseSignal(self.gbest, curr_iter=0)]
if self.record_extra_info == True:
# record init particle positions and pbest
self.__savePsoData(np.copy(self.x),
np.copy(self.x_value),
self.curr_iter,
np.copy(self.pbest),
self.gbest_cost_history,
[0],
self.rt_st_os_analysis)
# run pso algorithm
self.__runPsoAlgorithm()
def __analyseSignal(self, OP, curr_iter):
"""
This method analyses a signal to get its rise time, settling time and
overshoot. Will also save input and output data
Args:
- OP = driving signal whose resultant output we want to analyse
- curr_iter = current iteration. Use this for saving.
Returns:
- [rt, st, os] = table with rise time, settling time and overshoot of
output signal
"""
# GET OUTPUT
if self.sim_model != None:
PV = self.__getTransferFunctionOutput(self.sim_model,
OP,
self.t,
self.X0)
else:
PV = self.__getSoaOutput(OP)
OP_df = pd.DataFrame(OP)
OP_df.to_csv(self.path_to_pso_data + "OP_itr" + str(curr_iter) + ".csv",
index=None,
header=False)
PV_df = pd.DataFrame(PV)
PV_df.to_csv(self.path_to_pso_data + "PV_itr" + str(curr_iter) + ".csv",
index=None,
header=False)
responseMeasurementsObject = analyse.ResponseMeasurements(PV, self.t)
rt = responseMeasurementsObject.riseTime
st = responseMeasurementsObject.settlingTime
os = responseMeasurementsObject.overshoot
st_index = responseMeasurementsObject.settlingTimeIndex
return [rt, st, os, st_index]
def __getInitialDrivingSignalGuess(self, path_to_sig):
'''
This method loads a previously defined csv with the driving signal that
we want to use as our initial guess to embed amongst the initialised
particle positions
Args:
- path_to_sig = path to signal that want to use
Returns:
- init_sig = initial guess for signal to embed amongst particles
'''
init_sig_array = pd.read_csv(path_to_sig, header=None).values
init_sig = np.zeros(len(init_sig_array))
for i in range(0, len(init_sig_array)):
init_sig[i] = float(init_sig_array[i])
return init_sig
def __savePsoData(self,
x,
x_value,
curr_iter,
pbest,
gbest_cost_history,
iter_gbest_reached,
rt_st_os_analysis):
"""
This method is for saving pso progress so it can later be loaded
Args:
- x = particle positions to save
- x_value = fitness values for current positions
- pbest = personal historic best position for each particle
- curr_iter = current iteration reached so far
- gbest_cost_history = history of gbest reached
- iter_gbest_reached = corresponding iteration new gbest was found
- rt_st_os_analysis = hsitory of rise time, settling time and overshoot
throughout pso algoithm
Returns:
-
"""
iter_gbest_reached_df = pd.DataFrame(iter_gbest_reached)
gbest_cost_history_df = pd.DataFrame(gbest_cost_history)
rt_st_os_analysis_df = pd.DataFrame(rt_st_os_analysis)
curr_pos_df = pd.DataFrame(x)
curr_pos_df.to_csv(self.path_to_pso_data + 'curr_pos.csv',
index=None,
header=False)
curr_pos_val_df = pd.DataFrame(x_value)
curr_pos_val_df.to_csv(self.path_to_pso_data + 'curr_pos_values.csv',
index=None,
header=False)
iter_gbest_reached_df.to_csv(self.path_to_pso_data + "iter_gbest_reached.csv",
index=None,
header=False)
gbest_cost_history_df.to_csv(self.path_to_pso_data + "gbest_cost_history.csv",
index=None,
header=False)
rt_st_os_analysis_df.to_csv(self.path_to_pso_data + 'rt_st_os_analysis.csv',
index=None,
header=False)
f = open(self.path_to_pso_data + 'curr_iter.csv', 'w')
f.write(str(curr_iter))
if self.record_extra_info == True:
# save x and pbest for each particle
curr_pbest_df = pd.DataFrame(x)
curr_pbest_df.to_csv(self.path_to_pso_data + 'curr_inputs_' \
+ str(curr_iter) + '.csv',
index=None,
header=False)
curr_x_df = pd.DataFrame(pbest)
curr_x_df.to_csv(self.path_to_pso_data + 'pbest_pos_' \
+ str(curr_iter) + '.csv',
index=None,
header=False)
def __loadPsoData(self):
"""
This method loads previously saved pso data
Args:
-
Returns:
- iter_max = max number of pso iterations
- path_to_pso_data = path to where pso data will now be stored
- x = particle positions
- pbest = local particle position values
- x_value = fitness of particle positions
- pbest_value = local particle position best fitnesses
- min_cost_index = particle index of fittest particle
- gbest = global best particle
- gbest_cost = global best particle fitness
"""
# init params
iter_max = 0
path_to_pso_data = ''
curr_iter = 0
x = np.zeros(self.n)
x_value = np.zeros(self.n)
min_cost_index = 0
gbest = np.zeros(len(self.K))
# get where saved data is
self.path_to_pso_data = input('Enter folder path of data: ') + self.slash
while os.path.exists(self.path_to_pso_data) == False:
print('Entered path not found. Enter a valid path to your saved data')
path_to_pso_data = input('Enter folder path of data: ') + self.slash
curr_iter_array = pd.read_csv(path_to_pso_data + "curr_iter.csv",
header=None).values
curr_iter = curr_iter_array[0]
curr_iter = curr_iter[0]
# update
print('How many more generations would you like to do?')
extend_iterations_by = int(input('Number of extra generations (integer): '))
while type(extend_iterations_by) != int:
extend_iterations_by = int(input('Number of extra generations (integer): '))
iter_max = curr_iter + extend_iterations_by
pso_dir_name = "n_" + str(self.n) + \
"_mxvl_" + str(self.max_val) + \
"_mnvl_" + str(self.min_val) + \
"_ivf_" + str(self.init_v_f) + \
"_mvf_" + str(self.max_v_f) + \
"_irmx_" + str(self.iter_max) + \
"_rm_" + str(self.rep_max) + \
"_cstf_" + str(self.cost_f) + \
'_sif_' + str(self.st_importance_factor) + \
"_wi_" + str(self.w_init) + \
"_wf_" + str(self.w_final) + \
"_c1_" + str(self.c1) + \
"_c2_" + str(self.c2) + \
'_aptacl_' + str(self.adapt_accel) + \
'_offsf_' + str(self.off_suppress_f) + \
'_onsf_' + str(self.on_suppress_f) + \
'_embd_' + str(self.embed_init_signal) + self.slash
path_to_pso_data = self.path_to_data + pso_dir_name
# load positions
x_df = pd.read_csv(path_to_pso_data + "curr_pos.csv",
header=None)
x_array = x_df.values
for i in range(0, self.num_points):
x[i] = float(x_array[i])
pbest = np.copy(x)
# load position vals
x_value_df = pd.read_csv(path_to_pso_data + "curr_pos_values.csv",
header=None)
x_value_array = x_value_df.values
for i in range(0, self.n):
x_value[i] = float(x_value_array[i])
pbest_value = np.copy(self.x_value)
min_cost_index = np.argmin(pbest_value)
for g in range(0, len(self.K)):
gbest[g] = pbest[min_cost_index, g]
gbest_cost = pbest_value[min_cost_index]
data = (iter_max,
path_to_pso_data,
x,
pbest,
x_value,
pbest_value,
min_cost_index,
gbest,
gbest_cost,
curr_iter)
return data
def __find_x_init(self, tf):
"""
This method calculates the state-vector from a long -1 drive signal.
Must call before sending / receiving signals to / from transfer function
model
Args:
- tf = transfer function
Returns:
- X0 = system's state-vector result for steady state
"""
U = np.array([-1.0] * 480)
T = np.linspace(0, 40e-9, 480)
(_, _, xout) = signal.lsim2(tf, U=U, T=T, X0=None, atol=1e-13)
X0 = xout[-1]
return X0
def __getTransferFunctionOutput(self, tf, U, T, X0, atol=1e-12):
"""
This method sends a drive signal to a transfer function model and gets
the output
Args:
- tf = transfer function
- U = signal to drive transfer function with
- T = array of time values
- X0 = initial value
- atol = scipy ode func parameter
Returns:
- PV = resultant output signal of transfer function
"""
(_, PV, _) = signal.lsim2(tf, U, T, X0=X0, atol=atol)
# ensure lower point of signal >=0 (can occur for sims), otherwise
# will destroy st, os and rt analysis
min_PV = np.copy(min(PV))
if min_PV < 0:
for i in range(0, len(PV)):
PV[i] = PV[i] + abs(min_PV)
return PV
def __getSoaOutput(self, OP, channel=1):
"""
This method sends a drive signal to the SOA and gets an soa output
Args:
- OP = signal to send to AWG which will be used to drive SOA
- channel = channel osc reads soa output on
Returns:
- PV = soa output
"""
if type(OP[1]) == np.ndarray:
# must convert to float as osc only takes list of floats
flattened_OP = [val for sublist in OP for val in sublist]
else:
# no need to convert
flattened_OP = OP
self.awg.send_waveform(flattened_OP, suppress_messages=True)
time.sleep(3)
PV = self.osc.measurement(channel=channel)
return PV
def __getSectionToOptimise(self):
"""
This method sets the section of the signal that we want to optimise
Args:
-
Returns:
- K_index = column vector of particle indices
- K = column vector of particle values
"""
if self.sect_to_optimise == 'whole_signal':
start_opt_index = 0
end_opt_index = int(self.num_points)
# create index and value arrays of points (particles K) to optimise
K_index = np.asarray(list(range(start_opt_index, end_opt_index+1)))
K = np.asarray([self.init_OP[start_opt_index:end_opt_index]])
# transpose to column vectors
K_index = K_index.transpose()
K = K.transpose()
return K_index, K
def __discretiseParticlePosition(self, part_pos):
"""
This method discretises a drive signal/particle positions so that they
can be read by awg. This also reduces the size of the pso algorithm
search space of particle positions, which will speed up convergence
Args:
- part_pos = drive signal / particle position
Returns:
- discretised particle position
"""
for g in range(0, len(part_pos)):
if part_pos[g] % self.awg_step_size != 0:
# value not allowed therefore must discretise
part_pos[g] = int(part_pos[g] / self.awg_step_size) \
* self.awg_step_size
return part_pos
def __suppressAreasOfSignal(self, part_pos):
"""
This method suppresses (or doesn't) various areas of the drive signal
from being able to take certain values depending on what user has specified
Args:
- part_pos = drive signal / particle position
Returns:
- suppressed (or not suppressed) particle position
"""
if self.areas_to_suppress == 'None':
# no suppression
pass
elif self.areas_to_suppress == 'start_centre':
# get params
start_drive = self.init_OP[0]
end_drive = self.init_OP[-1]
max_drive = np.amax(self.init_OP)
for i in range(0, len(self.init_OP)):
if self.init_OP[i] > start_drive:
# get particle index signal turns on
K_index_signal_on = int(i - 0.5*(len(self.init_OP) - len(self.K)))
break
for i in range(int(len(self.init_OP)-1), K_index_signal_on, -1):
if self.init_OP[i] > end_drive:
# get particle index signal turns off
K_index_signal_off = int(i - 0.5*(len(self.init_OP) - len(self.K)))
# suppress start and centre
for index in range(0, K_index_signal_on):
if part_pos[index] > start_drive + abs(self.off_suppress_f*start_drive):
# suppress start of signal
part_pos[index] = start_drive + abs(self.off_suppress_f*start_drive)
for index in range(K_index_signal_on, len(self.K)):
if part_pos[index] < max_drive - abs(self.on_suppress_f*max_drive):
# suppress centre to end of signal
part_pos[index] = max_drive - abs(self.on_suppress_f*max_drive)
elif self.areas_to_suppress == 'start_centre_end':
# get params
start_drive = self.init_OP[0]
max_drive = np.amax(self.init_OP)
for i in range(0, len(self.init_OP)):
if self.init_OP[i] > start_drive:
K_index_signal_on = int(i - 0.5*(len(self.init_OP) - len(self.K)))
break
# suppress start, centre and end
for index in range(0, K_index_signal_on):
if part_pos[index] > start_drive + abs(self.off_suppress_f*start_drive):
# suppress start of signal
part_pos[index] = start_drive + abs(self.off_suppress_f*start_drive)
for index in range(K_index_signal_off, int(len(self.K))):
if part_pos[index] > end_drive + abs(self.off_suppress_f*start_drive):
# suppress end of signal
part_pos[index] = end_drive + abs(self.off_suppress_f*start_drive)
for index in range(K_index_signal_on, K_index_signal_off):
if part_pos[index] < max_drive - abs(self.on_suppress_f*max_drive):
# suppress centre of signal
part_pos[index] = max_drive - abs(self.on_suppress_f*max_drive)
elif self.areas_to_suppress == 'pisic_shape':
# suppress start and most of centre except for leading edge of
# drive signal, as this is an optional pisic area for PSO to fill
pisic_length_factor = 0.1
# get params
start_drive = self.init_OP[0]
end_drive = self.init_OP[-1]
max_drive = np.amax(self.init_OP)
for i in range(0, len(self.init_OP)):
if self.init_OP[i] > start_drive:
K_index_signal_on = int(i - 0.5*(len(self.init_OP) - len(self.K)))
break
for i in range(int(len(self.init_OP)-1), K_index_signal_on, -1):
if self.init_OP[i] > end_drive:
K_index_signal_off = int(i - 0.5*(len(self.init_OP) - len(self.K)))
# suppress start and centre
for index in range(0, K_index_signal_on):
if part_pos[index] > start_drive + abs(self.off_suppress_f*start_drive):
# suppress start of signal
part_pos[index] = start_drive + abs(self.off_suppress_f*start_drive)
for index in range(K_index_signal_on + int((len(self.K)*pisic_length_factor)),
len(self.K)):
if part_pos[index] > self.init_OP[-1]:
# suppress centre to below the step/initial input (except
# pisic area, whose length is defined by pisic_length_factor)
part_pos[index] = self.init_OP[-1]
return part_pos
def __evaluateParticlePositions(self, particles, curr_iter=None, plot=False):
"""
This method evaluates the positions of each particle in an array
Args:
- particles = array of particles to evaluate
- plot = set whether want to plot (and save) the resultant PV and OP when
each particle is used to drive SOA
- curr_iter = current iteration pso is on (only needed if plot == True)
Returns:
- x_value = array of fitness values for each particle position
"""
if self.record_extra_info == True:
curr_outputs = np.zeros((self.n, self.m))
if plot == True and curr_iter == None:
sys.exit('method requires arg curr_iter if want to plot')
if plot == True:
plt.figure(1)
plt.figure(2)
x_value = np.zeros(self.n) # int current particle fitnesses/costs storage
for j in range(0, self.n):
particle = particles[j, :]
OP = np.copy(self.init_OP)
particleIndex = 0
for signalIndex in range(self.K_index[0], self.K_index[-1]):
OP[signalIndex] = particle[particleIndex]
particleIndex += 1
if self.sim_model != None:
PV = self.__getTransferFunctionOutput(self.sim_model,
OP,
self.t,
self.X0)
else:
PV = self.__getSoaOutput(OP)
x_value[j] = signalprocessing.cost(self.t,
PV,
cost_function_label=self.cost_f,
st_importance_factor=self.st_importance_factor,
SP=self.SP).costEval
if self.record_extra_info == True:
# store particle output
curr_outputs[j, :] = PV
if plot == True:
plt.figure(1)
plt.plot(self.t, PV, c='b')
plt.figure(2)
plt.plot(self.t, OP, c='r')
if plot == True:
# get best fitness for analysis
min_cost_index = np.argmin(x_value)
if self.sim_model != None:
best_PV = self.__getTransferFunctionOutput(self.sim_model,
particles[min_cost_index,:],
self.t,
self.X0)
else:
best_PV = self.__getSoaOutput(particles[min_cost_index, :])
if plot == True:
# finalise and save plot
plt.figure(1)
plt.plot(self.t, self.SP, c='g', label='Target SP')
plt.plot(self.t, self.init_PV, c='r', label='Initial Output')
plt.plot(self.t, best_PV, c='c', label='Best fitness')
st_index = analyse.ResponseMeasurements(best_PV, self.t).settlingTimeIndex
plt.plot(self.t[st_index],
best_PV[st_index],
marker='x',
markersize=6,
color="red",
label='Settling Point')
plt.legend(loc='lower right')
plt.title('PSO-Optimised Output Signals After ' + str(curr_iter) + \
' Generations')
plt.xlabel('Time')
plt.ylabel('Amplitude')
plt.savefig(self.path_to_pso_data + str(curr_iter) + '_gen_outputs.png')
plt.close()
plt.figure(2)
plt.plot(self.t,
particles[min_cost_index, :],
c='c',
label='Best fitness')
plt.legend(loc='lower right')
plt.title('PSO-Optimised Input Signals After ' + str(curr_iter) + \
' Generations')
plt.xlabel('Time')
plt.ylabel('Voltage')
plt.savefig(self.path_to_pso_data + str(curr_iter) + '_gen_inputs.png')
plt.close()
if self.record_extra_info == True:
# save current particle outputs
curr_outputs_df = pd.DataFrame(curr_outputs)
curr_outputs_df.to_csv(self.path_to_pso_data + 'curr_outputs_' + \
str(curr_iter) + '.csv',
index=None,
header=False)
return x_value
def __runPsoAlgorithm(self):
"""
This method runs the pso algorithm
Args:
-
Returns:
-
"""
# INITIALISE COPIES OF PARAMETERS WE'RE CHANGING
# v = np.copy(self.v)
# x = np.copy(self.x)
# x_value = np.copy(self.x_value)
# pbest = np.copy(self.pbest)
# pbest_value = np.copy(self.pbest_value)
# gbest = np.copy(self.gbest)
# curr_iter = np.copy(self.curr_iter)
# gbest_cost = np.copy(self.gbest_cost)
# gbest_cost_history = np.copy(self.gbest_cost_history)
# iter_gbest_reached = np.copy(self.iter_gbest_reached)
print('################### RUNNING PSO ALGORITHM #####################')
v = np.copy(self.v)
x = np.copy(self.x)
x_value = np.copy(self.x_value)
pbest = np.copy(self.pbest)
pbest_value = np.copy(self.pbest_value)
gbest = np.copy(self.gbest)
curr_iter = self.curr_iter+1
gbest_cost = np.copy(self.gbest_cost)
gbest_cost_history = np.copy(self.gbest_cost_history)
rt_st_os_analysis = np.copy(self.rt_st_os_analysis)
iter_gbest_reached = np.copy(self.iter_gbest_reached)
meta_path_to_pso_data = self.path_to_pso_data
# # run thru pso multiple times
curr_rep = 1
while curr_rep <= self.rep_max:
self.path_to_pso_data = self.path_to_pso_data + 'rep' + \
str(curr_rep) + self.slash
os.mkdir(self.path_to_pso_data)
for g in range(0, self.m):
if self.embed_init_signal == True:
x[0, g] = gbest[g] # embed signal guess
w = np.ones(self.n) * self.w_init
c1 = np.ones(self.n) * self.c1
c2 = np.ones(self.n) * self.c2
if self.adapt_accel == True:
rel_improv = np.zeros(self.n)
c1_max = 2.5
c2_max = 2.5
c1_min = 0.1
c2_min = 0.1
pc_marker = int(0.05*self.iter_max) # for plotting/saving
if pc_marker == 0:
pc_marker = 1
while curr_iter <= self.iter_max:
if self.adapt_accel == True:
for j in range(0, self.n):
# update particle vals
rel_improv[j] = (pbest_value[j] - x_value[j]) \
/ (pbest_value[j] + x_value[j])
w[j] = self.w_init + ( (self.w_final - self.w_init) * \
((math.exp(rel_improv[j]) - 1) / (math.exp(rel_improv[j]) + 1)) )
c1[j] = ((c1_min + c1_max)/2) + ((c1_max - c1_min)/2) + \
(math.exp(-rel_improv[j]) - 1) / (math.exp(-rel_improv[j]) + 1)
c2[j] = ((c2_min + c2_max)/2) + ((c2_max - c2_min)/2) + \
(math.exp(-rel_improv[j]) - 1) / (math.exp(-rel_improv[j]) + 1)
# update particle velocities
for j in range(0, self.n):
for g in range(0, self.m):
v[j, g] = (w[j] * v[j, g]) + (c1[j] * random.uniform(0, 1) \
* (pbest[j, g] - x[j, g]) + (c2[j] * \
random.uniform(0, 1) * (gbest[g] - x[j,g])))
# handle velocity boundary violations
for j in range(0, self.n):
for g in range(0, self.m):
if v[j, g] > self.v_UB[g]:
v[j, g] = self.v_UB[g]
if v[j, g] < self.v_LB[g]:
v[j, g] = self.v_LB[g]
# update particle positions
for j in range(0, self.n):
x[j, :] = x[j, :] + v[j, :]
# handle position boundary violations
for j in range(0, self.n):
for g in range(0, self.m):
if x[j, g] < self.LB[g]:
x[j, g] = self.LB[g]
elif x[j, g] > self.UB[g]:
x[j, g] = self.UB[g]
# descretise
for j in range(0, self.n):
x[j, :] = self.__discretiseParticlePosition(x[j, :])
# suppress
for j in range(0, self.n):
x[j, :] = self.__suppressAreasOfSignal(x[j, :])
# eval particle positions
if curr_iter % pc_marker == 0 or curr_iter == self.iter_max:
# plot/save
x_value = self.__evaluateParticlePositions(x,
curr_iter=curr_iter,
plot=True)
else:
x_value = self.__evaluateParticlePositions(x,
curr_iter=curr_iter,
plot=False)
# update local best particle positions & fitness vals
for j in range(0, self.n):
if x_value[j] < pbest_value[j]:
pbest_value[j] = x_value[j]
for g in range(0, self.m):
pbest[j, g] = x[j, g]
# update global best particle positions & history
min_cost_index = np.argmin(pbest_value)
if pbest_value[min_cost_index] < gbest_cost_history[-1]:
for g in range(0, self.m):
gbest[g] = pbest[min_cost_index, g]
rt_st_os_analysis = np.vstack((rt_st_os_analysis,
self.__analyseSignal(gbest,
curr_iter)))
gbest_cost = pbest_value[min_cost_index]
gbest_cost_history = np.append([gbest_cost_history], [gbest_cost])
iter_gbest_reached = np.append([iter_gbest_reached], [curr_iter])
cost_reduction = ((gbest_cost_history[0] - gbest_cost) \
/ gbest_cost_history[0])*100
print('Reduced cost by ' + str(cost_reduction) + '% so far')
self.__savePsoData(x,
x_value,
curr_iter,
pbest,
gbest_cost_history,
iter_gbest_reached,
rt_st_os_analysis)
print('Num iterations completed: ' + str(curr_iter) + ' / ' + str(self.iter_max))
curr_iter += 1
# ensure cost and analysis table has final gbest val
gbest_cost_history = np.append([gbest_cost_history], [gbest_cost])
self.rt_st_os_analysis = np.vstack((rt_st_os_analysis,
self.__analyseSignal(gbest,
curr_iter)))
iter_gbest_reached = np.append([iter_gbest_reached],
[self.iter_max])
self.__getPsoPerformance(gbest,
iter_gbest_reached,
gbest_cost_history,
self.rt_st_os_analysis)
print('Num PSO repetitions completed: ' + str(curr_rep) + ' / ' + str(self.rep_max))
# reset for next pso run
v = np.copy(self.v)
x = np.copy(self.x)
x_value = np.copy(self.x_value)
pbest = np.copy(self.pbest)
pbest_value = np.copy(self.pbest_value)
gbest_cost_history = []
gbest_cost_history = np.append([gbest_cost_history], [gbest_cost])
rt_st_os_analysis = [self.__analyseSignal(gbest, curr_iter)]
iter_gbest_reached = np.copy(self.iter_gbest_reached)
self.path_to_pso_data = meta_path_to_pso_data
curr_iter = 1
curr_rep += 1
print('################### FINISHED PSO ALGORITHM ##################')
def __getPsoPerformance(self,
gbest,
iter_gbest_reached,
gbest_cost_history,
rt_st_os_analysis):
"""
This method analyses, plots and saves the pso algorithm's performance
Args:
- gbest = best global particle position
- iter_gbest_reached = array of iterations at which a new gbest was found
- gbest_cost_history = array of the history of gbest values that were found
Returns:
-
"""
self.gbest = [i[0] for i in gbest]
# get best particle position output signal
if self.sim_model != None:
self.gbest_PV = self.__getTransferFunctionOutput(self.sim_model,
self.gbest,
self.t,
self.X0)
else:
self.gbest_PV = self.__getSoaOutput(self.gbest)
# plot final output signal
plt.figure()
plt.plot(self.t, self.SP, c='g', label='Target SP')
plt.plot(self.t, self.init_PV, c='r', label='Initial Output')
plt.plot(self.t, self.gbest_PV, c='c', label='PSO-Optimised Output')
st_index = int(rt_st_os_analysis[len(rt_st_os_analysis)-1, 3])
plt.plot(self.t[st_index],
self.gbest_PV[st_index],
marker='x',
markersize=6,
color="red",
label='Settling Point')
plt.legend(loc='lower right')
plt.title('Final PSO-Optimised Output Signal')
plt.xlabel('Time')
plt.ylabel('Amplitude')
plt.savefig(self.path_to_pso_data + 'final_output.png')
plt.close()
# plot final driving signal
plt.figure()
plt.plot(self.t, self.init_OP, c='r', label='Initial Input')
plt.plot(self.t, self.gbest, c='c', label='PSO-Optimised Input')
plt.legend(loc='lower right')
plt.title('Final PSO-Optimised Input Signal')
plt.xlabel('Time')
plt.ylabel('Voltage')
plt.savefig(self.path_to_pso_data + 'final_input.png')
plt.close()
# plot learning curve
plt.figure()
plt.plot(iter_gbest_reached, gbest_cost_history)
plt.title('PSO Algorithm MSE Learning Curve')
plt.xlabel('No. Iterations')
plt.ylabel('Cost ' + str(self.cost_f))
plt.savefig(self.path_to_pso_data + 'final_learning_curve.png')
plt.close()
# plot how rt, st and os varied with iterations
max_rt, max_st, max_os = np.amax(rt_st_os_analysis[:,0]),\
np.amax(rt_st_os_analysis[:,1]),\
np.amax(rt_st_os_analysis[:,2])
plt.figure()
plt.plot(iter_gbest_reached,
rt_st_os_analysis[:,0]/max_rt,
label='Rise Time')
plt.plot(iter_gbest_reached,
rt_st_os_analysis[:,1]/max_st,
label='Settling Time')
plt.plot(iter_gbest_reached,
rt_st_os_analysis[:,2]/max_os,
label='Overshoot')
plt.title('PSO Algorithm RT-ST-OS Learning Curve')
plt.legend(loc='upper right')
plt.xlabel('No. Iterations')
plt.ylabel('Normalised RT/ST/OS')
plt.savefig(self.path_to_pso_data + 'rtstos_learning_curve.png')
plt.close()
# save key data
t_df = pd.DataFrame(self.t)
init_OP_df = pd.DataFrame(self.init_OP) #
OP_df = pd.DataFrame(self.gbest)
SP_df = pd.DataFrame(self.SP)
init_PV_df = pd.DataFrame(self.init_PV)
PV_df = pd.DataFrame(self.gbest_PV)
iter_gbest_reached_df = pd.DataFrame(iter_gbest_reached)
gbest_cost_history_df = pd.DataFrame(gbest_cost_history)
rt_st_os_analysis_df = pd.DataFrame(rt_st_os_analysis)
t_df.to_csv(self.path_to_pso_data + "time.csv",
index = None,
header=False)
init_OP_df.to_csv(self.path_to_pso_data + "initial_OP.csv",
index = None,
header=False)
OP_df.to_csv(self.path_to_pso_data + "optimised_OP.csv",
index = None,
header=False)
SP_df.to_csv(self.path_to_pso_data + "SP.csv",
index = None,
header=False)
init_PV_df.to_csv(self.path_to_pso_data + "initial_PV.csv",
index = None,
header=False)
PV_df.to_csv(self.path_to_pso_data + "optimised_PV.csv",
index = None,
header=False)
iter_gbest_reached_df.to_csv(self.path_to_pso_data + "iter_gbest_reached.csv",
index = None,
header=False)
gbest_cost_history_df.to_csv(self.path_to_pso_data + "gbest_cost_history.csv",
index = None,
header=False)
rt_st_os_analysis_df.to_csv(self.path_to_pso_data + "rt_st_os_analysis.csv",
index = None,
header=False)
[docs]def run_test(directory_for_run,
tf_for_run,
t,
init_OP,
n,
iter_max,
rep_max,
init_v_f,
max_v_f,
w_init,
w_final,
adapt_accel,
areas_to_suppress,
on_suppress_f,
embed_init_signal,
path_to_embedded_signal,
cost_f,
st_importance_factor,
record_extra_info,
linux,
sp,
pso_objs):
'''
This function defines the test we want to run for each job in a list of
python multiprocessing jobs. It is not essential to use this function
to run a loop of different PSO runs, but this will execute the PSO runs in
parallel and therefore significantly speed up your experiments.
'''
psoObject = PSO(t,
init_OP,
n,
iter_max,
rep_max,
init_v_f,
max_v_f,
w_init=w_init,
w_final=w_final,
adapt_accel=True,
areas_to_suppress='pisic_shape',
on_suppress_f=on_suppress_f,
embed_init_signal=True,
path_to_embedded_signal=None,
directory=directory_for_run,
cost_f=cost_f,
st_importance_factor=st_importance_factor,
sim_model=tf_for_run,
record_extra_info=True,
SP=sp)
pso_objs.append(psoObject)
if __name__ == '__main__':
'''
Below is an example implementation of the above PSO code. Note that there
are 2 main modes of using this PSO implementation:
1) Simulation (using a transfer function that simulates SOAs)
2) Experimental
To use the experimental setup, you will need all the same equipment, modules,
specific GPIB addresses etc. that were used in the Connet lab in UCL's
EEE Robert's building (contact the Optical Networks Group for more info).
Users outside of ONG will need to write code to interface with their
own equipment.
To use the simulation (i.e. the transfer function), the user should not need
to write any code themselves. Simply changing the below 'directory' variable
to point this programme to where to store data should be sufficient.
The below code runs a PSO simulation, where PSO is optimising 10 different
SOA transfer functions in parallel. Users can play around with the PSO
hyperparameters to control PSO performance, convergence properties,
run time etc., and can also distort the transfer function by adjusting
the distortion coefficients or even implement their own transfer functions
to simulate their custom SOAs. By optimising different transfer functions,
users will be able to see how well PSO is generalising to different SOAs.
While this code is not the 'cleanest', we have tried to insert clear comments
so that a user wishing to delve deeper into the use of this PSO implementation
(beyond running simple transfer function simulations) can follow the logic
and implement the same functionality in their own programmes.
As a general rule-of-thumb, increasing n (the number of particles) is a
reliable way to improve PSO performance and find more optimal solutions.
'''
import soa
# set dir to save data
directory = os.path.dirname(soa.__file__)+'/../data/'
# specify whether running simulation or experiment
sim = True
# specify if using linux (or mac) (for backslash or forward slash dirs)
linux = True
num_points = 240
time_start = 0.0
time_stop = 20e-9
t = np.linspace(time_start,time_stop,num_points)
init_OP = np.zeros(num_points)
# config pso params
n = 3
iter_max = 150
rep_max = 1
max_v_f = 0.05
init_v_f = max_v_f
cost_f = 'mSE'
st_importance_factor = None
w_init = 0.9
w_final = 0.5
on_suppress_f = 2.0
if sim == True:
# define transfer function(s)
# N.B. init_OP low must be -1 for tf
init_OP[:int(0.25*num_points)],init_OP[int(0.25*num_points):] = -1, 0.5
num = [2.01199757841099e85]
den = [
1.64898505756825e0,
4.56217233166632e10,
3.04864287973918e21,
4.76302109455371e31,
1.70110870487715e42,
1.36694076792557e52,
2.81558045148153e62,
9.16930673102975e71,
1.68628748250276e81,
2.40236028415562e90,
]
tf = signal.TransferFunction(num, den)
# set up simulation(s) you want to run
init_PV = distort_tf.getTransferFunctionOutput(tf,init_OP,t)
sp = analyse.ResponseMeasurements(init_PV, t).sp.sp
tfs, _ = distort_tf.gen_tfs(num_facs=[1.0,1.2,1.4],
a0_facs=[0.8],
a1_facs=[0.7,0.8,1.2],
a2_facs=[1.05,1.1,1.2],
all_combos=False)
pso_objs = multiprocessing.Manager().list()
jobs = []
test_nums = [test+1 for test in range(len(tfs))]
direcs = [directory + '/test_{}'.format(test_num) for test_num in test_nums]
for tf, direc in zip(tfs, direcs):
if os.path.exists(direc) == False:
os.mkdir(direc)
p = multiprocessing.Process(target=run_test,
args=(direc,
tf,
t,
init_OP,
n,
iter_max,
rep_max,
init_v_f,
max_v_f,
w_init,
w_final,
True,
'pisic_shape',
on_suppress_f,
True,
None,
cost_f,
st_importance_factor,
True,
linux,
sp,
pso_objs,))
jobs.append(p)
p.start()
for job in jobs:
job.join()
# plot composite graph
pso_objs = list(pso_objs)
plt.figure()
plt.plot(t, sp, color='green')
for pso_obj in pso_objs:
plt.plot(t, pso_obj.gbest_PV)
plt.show()
# pickle data
PIK = directory + '/pickle.dat'
data = pso_objs
with open(PIK, 'wb') as f:
pickle.dump(data, f)
else:
# set up experiment(s) you want to run
directory = r"C:\Users\onglab\Desktop\SOA_project\Chris\pso_no_fall_test_09012020"
awg = devices.TektronixAWG7122B("GPIB1::1::INSTR")
osc = devices.Agilent86100C("GPIB1::7::INSTR")
osc.set_acquire(average=True, count=30, points=num_points)
osc.set_timebase(position=4.2e-8, range_=time_stop-time_start)
init_OP[:60], init_OP[60:] = -0.5, 0.5
psoObject = pso(t,
init_OP,
n,
iter_max,
rep_max,
init_v_f,
max_v_f,
w_init=w_init,
w_final=w_final,
adapt_accel=True,
areas_to_suppress='start_centre',
on_suppress_f=on_suppress_f,
embed_init_signal=True,
path_to_embedded_signal=None,
directory=directory,
cost_f=cost_f,
st_importance_factor=st_importance_factor,
awg=awg,
osc=osc)