# Copyright (c) 2017 - 2020, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
# All rights reserved
#
# Implemented as part of the PhD thesis
# "Precise Simulations of Multibunches in High Intensity Cyclotrons"
#
# This file is part of pyOPALTools.
#
# pyOPALTools is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# You should have received a copy of the GNU General Public License
# along with pyOPALTools. If not, see <https://www.gnu.org/licenses/>.
import json
import os
import numpy as np
import re
from .BaseParser import BaseParser
class OptimizerParser(BaseParser):
    """Parsing of JSON files generated by the OPAL optimizer.

    Attributes
    ----------
    __ids : list
        List of all individual IDs of a generation
    __dvar_values : numpy.ndarray
        Design variable values, one row per individual
        (row order corresponds to __ids)
    __obj_values : numpy.ndarray
        Objective values, one row per individual
        (row order corresponds to __ids)
    __obj_names : list
        Specifies columns in self.__obj_values
    __dvar_names : list
        Specifies columns in self.__dvar_values
    __pareto : str
        Substring identifying pareto front files ('ParetoFront'); it is
        also the file name prefix used when a pareto file is read

    Notes
    -----
    Generation files are looked up as ``<gen>_<name>_<opt>.json`` and
    pareto front files as ``ParetoFront_<name>_<opt>.json``.

    Supports following JSON files:

    Files without an ``"OPAL version"`` entry (list of ``"solutions"``):

    .. code-block:: JSON

        {
            "name": "opt-pilot",
            "dvar-bounds": {
                "benergy": [ 0.071, 0.072 ],
                "phiinit": [ 106, 114 ],
                "prinit": [ -0.02, -0.01 ],
                "rinit": [ 2000, 2060 ]
            },
            "constraints": [
                "dpeak1 > 0.0",
                "dpeak2 > 0.0"
            ],
            "solutions": [
                {
                    "ID": 0,
                    "obj": {
                        "dpeak1": 10.37,
                        "dpeak2": 5.43,
                        "dpeak3_5": 14.1095
                    },
                    "dvar": {
                        "benergy": 0.0718672,
                        "phiinit": 113.653,
                        "prinit": -0.0191565,
                        "rinit": 2018.26
                    }
                },
                {
                    "ID": 18,
                    "obj": {
                        "dpeak1": 5.95,
                        "dpeak2": 4.75,
                        "dpeak3_5": 17.6668
                    },
                    "dvar": {
                        "benergy": 0.0715704,
                        "phiinit": 110.517,
                        "prinit": -0.0122177,
                        "rinit": 2035.32
                    }
                },
                {
                    "ID": 2,
                    "obj": {
                        "dpeak1": 1.84,
                        "dpeak2": 6.04,
                        "dpeak3_5": 19.2461
                    },
                    "dvar": {
                        "benergy": 0.0711942,
                        "phiinit": 107.097,
                        "prinit": -0.0198867,
                        "rinit": 2034.75
                    }
                }
            ]
        }

    Files with an ``"OPAL version"`` entry (``"population"`` keyed by ID).
    The version has to be at least 2.1.0, older versions are rejected:

    .. code-block:: JSON

        {
            "name": "opt-pilot",
            "OPAL version": "2.1.0",
            "git revision": "1849b7e5130657e8be50d524de0f6c50134f330a",
            "dvar-bounds": {
                "MX": "[ 16, 32 ]",
                "nstep": "[ 10, 40 ]"
            },
            "constraints": [
                "statVariableAt('rms_x',0.0) > 0",
                "statVariableAt('rms_x',0.0) > 0.001"
            ],
            "population": {
                "0": {
                    "obj": {
                        "dpeak1": 10.37,
                        "dpeak2": 5.43,
                        "dpeak3_5": 14.1095
                    },
                    "dvar": {
                        "benergy": 0.0718672,
                        "phiinit": 113.653,
                        "prinit": -0.0191565,
                        "rinit": 2018.26
                    }
                },
                "18": {
                    "obj": {
                        "dpeak1": 5.95,
                        "dpeak2": 4.75,
                        "dpeak3_5": 17.6668
                    },
                    "dvar": {
                        "benergy": 0.0715704,
                        "phiinit": 110.517,
                        "prinit": -0.0122177,
                        "rinit": 2035.32
                    }
                },
                "2": {
                    "obj": {
                        "dpeak1": 1.84,
                        "dpeak2": 6.04,
                        "dpeak3_5": 19.2461
                    },
                    "dvar": {
                        "benergy": 0.0711942,
                        "phiinit": 107.097,
                        "prinit": -0.0198867,
                        "rinit": 2034.75
                    }
                }
            }
        }

    Examples
    --------
    Every call below raises an exception on failure; wrap the calls in
    ``try`` / ``except`` to report the error instead of aborting.

    >>> import OptimizerParser as optreader

    1. Find all .json files of a directory, e.g. "./"

    >>> optjson = optreader.OptimizerParser()
    >>> optjson.parse("./")

    2. Read in a generation file, e.g. 1

    >>> optjson.readGeneration(1)

    3. Get all design variables

    >>> dvars = optjson.getDesignVariables()
    >>> print ( "Design variables: ", dvars )

    4. Get design variable bounds

    >>> bounds = optjson.getBounds()
    >>> print ( "Bounds: ", bounds )

    4a. Get only bounds of a specific design variable

    >>> bound = optjson.getBounds(dvars[2])
    >>> lower = bound[0]
    >>> upper = bound[1]
    >>> print ( "dvar: " + dvars[2] )
    >>> print ( "     lower bound: " + str(lower) )
    >>> print ( "     upper bound: " + str(upper) )

    5. Get all objectives

    >>> objs = optjson.getObjectives()
    >>> print ( "Objectives: ", objs )

    6. Get all constraints

    >>> constr = optjson.getConstraints()
    >>> print ("Constraints: " + constr[0] + " " + constr[1])

    7. Get an individual

    >>> print ( optjson.getIndividual(0) )

    8. Get an individual with specific ID

    >>> print ( optjson.getIndividualWithID(18) )
    """

    def __init__(self):
        """Constructor.

        Starts with an empty directory and no loaded file; call
        :meth:`parse` before reading generations.
        """
        self.clear()

    @property
    def num_optimizers(self):
        """int: Number of optimizers found by :meth:`parse`
        (-1 before any directory was parsed)."""
        return self.__nOptimizers

    def getNumOfGenerations(self):
        """Obtain the number of generations.

        Returns
        -------
        int
            Number of generation files divided by the number of optimizers
        """
        return self.__nJsonFiles // self.__nOptimizers

    def readGeneration(self, gen, opt=0, pareto=False):
        """Parse a generation file

        Parameters
        ----------
        gen : int
            The generation number (ignored if `pareto` is True)
        opt : int, optional
            The optimizer number (default: 0)
        pareto : bool, optional
            Read the pareto front file of optimizer `opt` instead of a
            generation file (default: False)

        Notes
        -----
        Raises an IOError if the requested file doesn't exist. In that
        case the previously loaded data stays untouched.

        A generation that is already loaded is not parsed again; pareto
        front files are always re-read.
        """
        if pareto:
            filename = os.path.join(
                self.__directory,
                self.__pareto + self.__basename + str(opt) + '.json')
            loaded_state = (-1, -1, opt)
        else:
            if self.__loaded_generation == gen and \
               self.__loaded_optimizer == opt:
                # requested generation is already in memory
                return
            filename = os.path.join(
                self.__directory,
                str(gen) + self.__basename + str(opt) + '.json')
            loaded_state = (gen, opt, -1)

        if not os.path.isfile(filename):
            raise IOError("File '" + filename + "' does not exist.")

        # Mark "nothing loaded" while parsing so that a failing parse can
        # never leave half-filled containers flagged as a valid generation.
        self.__loaded_generation = -1
        self.__loaded_optimizer = -1
        self.__loaded_pareto_front = -1

        # clear old data
        self.__clear_container()

        self.__actual_parse(filename)

        # commit the new state only after the file was read successfully
        (self.__loaded_generation,
         self.__loaded_optimizer,
         self.__loaded_pareto_front) = loaded_state

    def getIndividual(self, ind):
        """Get individual in a list of [dvars, objs, ID]

        Parameters
        ----------
        ind : int
            The individual number (not the ID)

        Returns
        -------
        list
            A list containing [design variables, objectives, ID]

        Notes
        -----
        Raises an index error if individual number is out of bounds and
        a runtime error if no file is loaded
        """
        self.__hasLoaded()

        if not 0 <= ind < len(self.__ids):
            raise IndexError("Individual number is out of bounds.")

        data = list(self.__dvar_values[ind, :]) + list(self.__obj_values[ind, :])
        data.append(self.__ids[ind])
        return data

    def getIndividualWithID(self, ID):
        """Get individual with ID

        Parameters
        ----------
        ID : int
            ID for which we want the data

        Returns
        -------
        list
            A list containing [design variables, objectives, ID]

        Notes
        -----
        Raises a runtime error if there's no individual with given ID
        """
        return self.getIndividual(self.getIndexOfID(ID))

    def getIndexOfID(self, ID):
        """Get Index of ID

        Parameters
        ----------
        ID : int
            ID of the individual

        Returns
        -------
        int
            The index of the individual with certain ID

        Notes
        -----
        Raises a runtime error if there's no individual with given ID
        """
        self.__hasLoaded()

        if ID not in self.__ids:
            raise RuntimeError("An individual with ID " + str(ID) + " is not present.")

        return self.__ids.index(ID)

    def getDesignVariables(self):
        """Obtain names of all design variables

        Returns
        -------
        list
            A list containing all design variable names
        """
        self.__hasLoaded()
        return self.__dvar_names

    def getObjectives(self):
        """Obtain names of all objectives

        Returns
        -------
        list
            A list containing all objective names
        """
        self.__hasLoaded()
        return self.__obj_names

    def getBounds(self, dvar=''):
        """Obtain the design variable bounds

        Parameters
        ----------
        dvar : str, optional
            Design variable or ''

        Returns
        -------
        dict or array
            If `dvar` == '' it returns the whole dictionary of
            design variables and their bounds otherwise
            it returns the bounds of the given design variable

        Notes
        -----
        Raises a runtime error if design variable unknown
        """
        self.__hasLoaded()

        if dvar == '':
            return self.__dvarBounds

        if dvar not in self.__dvarBounds:
            raise RuntimeError("Unknown design variable '" + dvar + "'.")

        return self.__dvarBounds[dvar]

    def getConstraints(self):
        """Obtain the constraints of the simulation. They are
        formulas written as strings

        Returns
        -------
        list [str]
            A list of strings containing the constraints
        """
        self.__hasLoaded()
        return self.__constraints

    def getAllOutput(self):
        """Obtain all objective output.

        Returns
        -------
        numpy.ndarray
            A ndarray where each column corresponds to the values
            of an objective
        """
        self.__hasLoaded()
        return self.__obj_values

    def getIDs(self):
        """Obtain all individual IDs.

        Returns
        -------
        list [int]
            A list of IDs
        """
        self.__hasLoaded()
        return self.__ids

    @staticmethod
    def __fill_row(matrix, names, row, mapping):
        """Store the values of one individual in a row of `matrix`.

        Parameters
        ----------
        matrix : numpy.ndarray
            Array to be filled (one row per individual)
        names : list
            Receives the (sorted) keys of `mapping` when `row` is 0
        row : int
            Row index of the individual
        mapping : dict
            Name -> value pairs of the individual
        """
        # sort by name to get the same column order for every individual
        for col, (name, value) in enumerate(sorted(mapping.items())):
            matrix[row, col] = float(value)
            if row == 0:
                names.append(name)

    def __parse_version_2_1_0(self, data):
        """Read in data of the "population" file layout

        Parameters
        ----------
        data : dict
            Content of a generation file
        """
        self.__dvarBounds = data["dvar-bounds"]
        self.__constraints = data["constraints"]

        population = data["population"]

        # the first individual defines the number of columns
        first_ind = next(iter(population))

        nDvars = len(population[first_ind]['dvar'])
        nObjs = len(population[first_ind]['obj'])
        nInds = len(population)

        self.__dvar_values = np.zeros((nInds, nDvars))
        self.__obj_values = np.zeros((nInds, nObjs))

        # The keys are strings, hence the row order is lexicographic
        # (e.g. "18" before "2"); __ids keeps the matching order.
        for i, ind in enumerate(sorted(population.keys())):
            self.__ids.append(int(ind))
            self.__fill_row(self.__dvar_values, self.__dvar_names,
                            i, population[ind]['dvar'])
            self.__fill_row(self.__obj_values, self.__obj_names,
                            i, population[ind]['obj'])

    def __parse_version_2_0_0(self, data):
        """Read in data of the "solutions" file layout

        Parameters
        ----------
        data : dict
            Content of a generation file

        Notes
        -----
        Entries of an individual other than 'dvar', 'obj' and 'ID'
        are ignored.
        """
        self.__dvarBounds = data["dvar-bounds"]
        self.__constraints = data["constraints"]

        individuals = data["solutions"]

        nDvars = len(individuals[0]['dvar'])
        nObjs = len(individuals[0]['obj'])
        nInds = len(individuals)

        self.__dvar_values = np.zeros((nInds, nDvars))
        self.__obj_values = np.zeros((nInds, nObjs))

        for i, entry in enumerate(individuals):
            if 'dvar' in entry:
                self.__fill_row(self.__dvar_values, self.__dvar_names,
                                i, entry['dvar'])
            if 'obj' in entry:
                self.__fill_row(self.__obj_values, self.__obj_names,
                                i, entry['obj'])
            if 'ID' in entry:
                self.__ids.append(int(entry['ID']))

    def __hasLoaded(self):
        """Throw error if no generation or pareto front file loaded.
        """
        if self.__loaded_generation < 0 and self.__loaded_pareto_front < 0:
            raise RuntimeError("Neither generation nor pareto front file loaded.")

    def __actual_parse(self, filename):
        """Parse an optimizer JSON file

        Parameters
        ----------
        filename : str
            JSON file to be loaded

        Notes
        -----
        Raises an IOError if the file states an OPAL version older
        than 2.1.0
        """
        # context manager: the file handle is closed even if decoding fails
        with open(filename) as json_file:
            data = json.load(json_file)

        tag = 'OPAL version'

        if tag in data:
            version = data[tag]

            # Compare component-wise; joining the digits to one number
            # would rank e.g. "3.0" (30) below "2.1.0" (210).
            parts = [int(part) for part in version.split('.')]
            parts += [0] * (3 - len(parts))

            if tuple(parts) < (2, 1, 0):
                raise IOError('No version ' + version + ' supported.')

            self.__parse_version_2_1_0(data)
        else:
            self.__parse_version_2_0_0(data)

    def parse(self, directory):
        """Collect information about optimizer JSON files.

        Parameters
        ----------
        directory : str
            Where the .json files are

        Notes
        -----
        Raises a runtime error if
            - there are several different basenames
            - no json file is found

        Generation files are expected to be named
        ``<gen>_<name>_<opt>.json``. The name is split at every
        underscore, i.e. ``<name>`` itself must not contain one.
        """
        self.__directory = directory

        self.__reset_attributes()

        for f in os.listdir(self.__directory):
            if not f.endswith(".json"):
                continue

            if self.__pareto in f:
                self.__nParetoFiles += 1
                continue

            self.__nJsonFiles += 1

            # raw string with escaped dot: split at '_' and at the
            # literal '.json' suffix only
            split = re.split(r'_|\.json', os.path.basename(f))
            tbasename = '_' + split[1] + '_'
            self.__nOptimizers = max(self.__nOptimizers, int(split[2]))

            # check if only one type of simulation
            if self.__basename != "" and tbasename != self.__basename:
                raise RuntimeError("Several simulations with different name!")

            self.__basename = tbasename

        if self.__nJsonFiles == 0:
            raise RuntimeError("No json file found in directory '" + self.__directory + "'.")

        # optimizer numbering starts at zero
        self.__nOptimizers += 1

        print ( "Found " + str(self.__nJsonFiles) + " json files from " +
                str(self.__nOptimizers) + " optimizers and " + str(self.__nParetoFiles) +
                " pareto front files")

    def __reset_attributes(self):
        """Reset attributes to default values.
        """
        self.__nJsonFiles = 0
        self.__basename = ""
        self.__nOptimizers = -1
        self.__nParetoFiles = 0
        self.__pareto = 'ParetoFront'
        # negative values mean "nothing loaded" (see __hasLoaded)
        self.__loaded_generation = -1
        self.__loaded_optimizer = -1
        self.__loaded_pareto_front = -1
        self.__clear_container()

    def __clear_container(self):
        """Clear old attribute data
        """
        self.__ids = []
        self.__dvar_values = np.empty((0, 0))
        self.__dvar_names = []
        self.__obj_values = np.empty((0, 0))
        self.__obj_names = []
        self.__dvarBounds = {}      # for each generation file the same
        self.__constraints = []     # for each generation file the same

    def clear(self):
        """Clear data.
        """
        self.__directory = ""
        self.__reset_attributes()

    def check_file(self, filename):
        """Check if a file is really an optimizer output

        Parameters
        ----------
        filename : str
            JSON file to be loaded

        Returns
        -------
        bool
            True if an optimizer file, otherwise False

        Notes
        -----
        The parser is cleared afterwards in any case, i.e. a previously
        parsed directory and loaded generation are discarded.
        """
        try:
            self.__actual_parse(filename)
        except Exception:
            # any parsing problem means "not an optimizer file";
            # KeyboardInterrupt / SystemExit are deliberately not caught
            return False
        finally:
            self.clear()
        return True