Source code for opal.parser.OptimizerParser

# Copyright (c) 2017 - 2020, Matthias Frey, Paul Scherrer Institut, Villigen PSI, Switzerland
# All rights reserved
#
# Implemented as part of the PhD thesis
# "Precise Simulations of Multibunches in High Intensity Cyclotrons"
#
# This file is part of pyOPALTools.
#
# pyOPALTools is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# You should have received a copy of the GNU General Public License
# along with pyOPALTools. If not, see <https://www.gnu.org/licenses/>.

import json
import os
import numpy as np
import re
from .BaseParser import BaseParser

class OptimizerParser(BaseParser):
    """Parsing of JSON files generated by the OPAL optimizer

    Attributes
    ----------
    __ids : list
        List of all individual IDs of a generation
    __dvar_values : numpy.ndarray
        Design variable values, one row per individual
        (row order corresponds to __ids)
    __obj_values : numpy.ndarray
        Objective values, one row per individual
        (row order corresponds to __ids)
    __obj_names : list
        Specifies columns in self.__obj_values
    __dvar_names : list
        Specifies columns in self.__dvar_values
    __pareto : str
        Marker ('ParetoFront') identifying pareto front files by name

    Notes
    -----
    Generation files are expected to be named
    ``<generation>_<name>_<optimizer>.json`` and pareto front files
    ``ParetoFront_<name>_<optimizer>.json``.

    Supports following JSON files:

    .. highlight:: JSON
    .. code-block:: JSON

        {
            "name": "opt-pilot",
            "dvar-bounds": {
                "benergy": [ 0.071, 0.072 ],
                "phiinit": [ 106, 114 ],
                "prinit": [ -0.02, -0.01 ],
                "rinit": [ 2000, 2060 ]
            },
            "constraints": [
                "dpeak1 > 0.0",
                "dpeak2 > 0.0"
            ],
            "solutions": [
                {
                    "ID": 0,
                    "obj": {
                        "dpeak1": 10.37,
                        "dpeak2": 5.43,
                        "dpeak3_5": 14.1095
                    },
                    "dvar": {
                        "benergy": 0.0718672,
                        "phiinit": 113.653,
                        "prinit": -0.0191565,
                        "rinit": 2018.26
                    }
                },
                {
                    "ID": 18,
                    "obj": {
                        "dpeak1": 5.95,
                        "dpeak2": 4.75,
                        "dpeak3_5": 17.6668
                    },
                    "dvar": {
                        "benergy": 0.0715704,
                        "phiinit": 110.517,
                        "prinit": -0.0122177,
                        "rinit": 2035.32
                    }
                },
                {
                    "ID": 2,
                    "obj": {
                        "dpeak1": 1.84,
                        "dpeak2": 6.04,
                        "dpeak3_5": 19.2461
                    },
                    "dvar": {
                        "benergy": 0.0711942,
                        "phiinit": 107.097,
                        "prinit": -0.0198867,
                        "rinit": 2034.75
                    }
                }
            ]
        }

    Files carrying an "OPAL version" tag must be of version 2.1.0 or newer:

    .. code-block:: JSON

        {
            "name": "opt-pilot",
            "OPAL version": "2.1.0",
            "git revision": "1849b7e5130657e8be50d524de0f6c50134f330a",
            "dvar-bounds": {
                "MX": "[ 16, 32 ]",
                "nstep": "[ 10, 40 ]"
            },
            "constraints": [
                "statVariableAt('rms_x',0.0) > 0",
                "statVariableAt('rms_x',0.0) > 0.001"
            ],
            "population": {
                "0": {
                    "obj": {
                        "dpeak1": 10.37,
                        "dpeak2": 5.43,
                        "dpeak3_5": 14.1095
                    },
                    "dvar": {
                        "benergy": 0.0718672,
                        "phiinit": 113.653,
                        "prinit": -0.0191565,
                        "rinit": 2018.26
                    }
                },
                "18": {
                    "obj": {
                        "dpeak1": 5.95,
                        "dpeak2": 4.75,
                        "dpeak3_5": 17.6668
                    },
                    "dvar": {
                        "benergy": 0.0715704,
                        "phiinit": 110.517,
                        "prinit": -0.0122177,
                        "rinit": 2035.32
                    }
                },
                "2": {
                    "obj": {
                        "dpeak1": 1.84,
                        "dpeak2": 6.04,
                        "dpeak3_5": 19.2461
                    },
                    "dvar": {
                        "benergy": 0.0711942,
                        "phiinit": 107.097,
                        "prinit": -0.0198867,
                        "rinit": 2034.75
                    }
                }
            }
        }

    .. highlight:: python3

    Examples
    --------
    .. code-block:: python

        import sys
        import OptimizerParser as optreader

        try:
            # 1. Find all .json files of a directory, e.g. "./"
            optjson = optreader.OptimizerParser()
            optjson.parse("./")

            # 2. Read in a generation file, e.g. 1
            optjson.readGeneration(1)

            # 3. Get all design variables
            dvars = optjson.getDesignVariables()
            print("Design variables: ", dvars)

            # 4. Get design variable bounds
            bounds = optjson.getBounds()
            print("Bounds: ", bounds)

            # 4a. Get only bounds of a specific design variable
            bound = optjson.getBounds(dvars[2])
            lower = bound[0]
            upper = bound[1]
            print("dvar: " + dvars[2] +
                  " lower bound: " + str(lower) +
                  " upper bound: " + str(upper))

            # 5. Get all objectives
            objs = optjson.getObjectives()
            print("Objectives: ", objs)

            # 6. Get all constraints
            constr = optjson.getConstraints()
            print("Constraints: " + constr[0] + " " + constr[1])

            # 7. Get an individual
            print(optjson.getIndividual(0))

            # 8. Get an individual with specific ID
            print(optjson.getIndividualWithID(18))
        except Exception:
            print(sys.exc_info()[1])
    """

    def __init__(self):
        """Constructor.
        """
        self.clear()

    @property
    def num_optimizers(self):
        """Number of optimizers as determined by `parse`."""
        return self.__nOptimizers

    def getNumOfGenerations(self):
        """Obtain the number of generations per optimizer

        Returns
        -------
        int
            Number of generation files divided by the number of optimizers
        """
        # integer division: both operands are file counts
        return self.__nJsonFiles // self.__nOptimizers

    def readGeneration(self, gen, opt=0, pareto=False):
        """Parse a generation file

        Parameters
        ----------
        gen : int
            The generation number
        opt : int, optional
            The optimizer number (default: 0)
        pareto : bool, optional
            Read a pareto file instead of a generation file
            (default: False)

        Notes
        -----
        Raises an IOError if the file doesn't exist. In that case the
        previously loaded data is left untouched.

        A generation that is already loaded is not read again.
        """
        if pareto:
            filename = os.path.join(
                self.__directory,
                self.__pareto + self.__basename + str(opt) + '.json')
            loaded = (-1, -1, opt)
        else:
            if self.__loaded_generation == gen and \
               self.__loaded_optimizer == opt:
                # requested data is already in memory
                return
            filename = os.path.join(
                self.__directory,
                str(gen) + self.__basename + str(opt) + '.json')
            loaded = (gen, opt, -1)

        if not os.path.isfile(filename):
            raise IOError("File '" + filename + "' does not exist.")

        # clear old data; nothing counts as loaded until parsing succeeded
        self.__clear_container()
        self.__loaded_generation = -1
        self.__loaded_optimizer = -1
        self.__loaded_pareto_front = -1

        self.__actual_parse(filename)

        self.__loaded_generation = loaded[0]
        self.__loaded_optimizer = loaded[1]
        self.__loaded_pareto_front = loaded[2]

    def getIndividual(self, ind):
        """Get individual in a list of [dvars, objs, ID]

        Parameters
        ----------
        ind : int
            The individual number (not the ID)

        Returns
        -------
        list
            A list containing [design variables, objectives, ID]

        Notes
        -----
        Raises an index error if individual number is out of bounds
        """
        self.__hasLoaded()

        if not 0 <= ind < len(self.__ids):
            raise IndexError("Individual number is out of bounds.")

        return self.__individual_at(ind)

    def getIndividualWithID(self, ID):
        """Get individual with ID

        Parameters
        ----------
        ID : int
            ID for which we want the data

        Returns
        -------
        list
            A list containing [design variables, objectives, ID]

        Notes
        -----
        Raises a runtime error if there's no individual with given ID
        """
        return self.__individual_at(self.getIndexOfID(ID))

    def getIndexOfID(self, ID):
        """Get index of ID

        Parameters
        ----------
        ID : int
            ID of the individual

        Returns
        -------
        int
            The index of the individual with certain ID

        Notes
        -----
        Raises a runtime error if there's no individual with given ID
        """
        self.__hasLoaded()

        try:
            return self.__ids.index(ID)
        except ValueError:
            raise RuntimeError("An individual with ID " + str(ID) +
                               " is not present.")

    def getDesignVariables(self):
        """Obtain names of all design variables

        Returns
        -------
        list
            A list containing all design variable names
        """
        self.__hasLoaded()
        return self.__dvar_names

    def getObjectives(self):
        """Obtain names of all objectives

        Returns
        -------
        list
            A list containing all objective names
        """
        self.__hasLoaded()
        return self.__obj_names

    def getBounds(self, dvar=''):
        """Obtain the design variable bounds

        Parameters
        ----------
        dvar : str, optional
            Design variable or ''

        Returns
        -------
        dict or array
            If `dvar` == '' it returns the whole dictionary of design
            variables and their bounds otherwise it returns the bounds
            of the given design variable

        Notes
        -----
        Raises a runtime error if the design variable is unknown.

        The bounds are returned exactly as stored in the JSON file.
        """
        self.__hasLoaded()

        if dvar == '':
            return self.__dvarBounds

        if dvar not in self.__dvarBounds:
            raise RuntimeError("Unknown design variable '" + dvar + "'.")

        return self.__dvarBounds[dvar]

    def getConstraints(self):
        """Obtain the constraints of the simulation.
        They are formulas written as strings

        Returns
        -------
        list [str]
            A list of strings containing the constraints
        """
        self.__hasLoaded()
        return self.__constraints

    def getAllInput(self):
        """Obtain all design variable input.

        Returns
        -------
        numpy.ndarray
            A ndarray where each column corresponds to the values of
            a design variable
        """
        self.__hasLoaded()
        return self.__dvar_values

    def getAllOutput(self):
        """Obtain all objective output.

        Returns
        -------
        numpy.ndarray
            A ndarray where each column corresponds to the values of
            an objective
        """
        self.__hasLoaded()
        return self.__obj_values

    def getIDs(self):
        """Obtain all individual IDs.

        Returns
        -------
        list [int]
            A list of IDs
        """
        self.__hasLoaded()
        return self.__ids

    def __individual_at(self, idx):
        """Assemble [design variables, objectives, ID] of one individual

        Parameters
        ----------
        idx : int
            Row index of the individual (not the ID)

        Returns
        -------
        list
            A list containing [design variables, objectives, ID]
        """
        data = list(self.__dvar_values[idx, :]) + \
            list(self.__obj_values[idx, :])
        data.append(self.__ids[idx])
        return data

    def __store_individuals(self, individuals):
        """Fill IDs, names and value arrays from a list of individuals

        Parameters
        ----------
        individuals : list
            A list of tuples (ID, dvars, objs) where dvars and objs are
            dictionaries mapping a name to a value

        Notes
        -----
        The columns are the alphabetically sorted names of the first
        individual. Values are looked up by name, hence a KeyError is
        raised if an individual lacks one of these names.
        """
        self.__ids = []
        self.__dvar_names = []
        self.__obj_names = []

        if individuals:
            self.__dvar_names = sorted(individuals[0][1])
            self.__obj_names = sorted(individuals[0][2])

        nInds = len(individuals)

        self.__dvar_values = np.zeros((nInds, len(self.__dvar_names)))
        self.__obj_values = np.zeros((nInds, len(self.__obj_names)))

        for i, (ID, dvars, objs) in enumerate(individuals):
            self.__ids.append(int(ID))

            for k, name in enumerate(self.__dvar_names):
                self.__dvar_values[i, k] = float(dvars[name])

            for k, name in enumerate(self.__obj_names):
                self.__obj_values[i, k] = float(objs[name])

    def __parse_version_2_1_0(self, data):
        """Read in data of files with a "population" dictionary

        Parameters
        ----------
        data : dict
            Content of a generation file

        Notes
        -----
        The individuals are stored in the order of their sorted
        (string) keys.
        """
        self.__dvarBounds = data["dvar-bounds"]
        self.__constraints = data["constraints"]

        population = data["population"]

        self.__store_individuals(
            [(ind, population[ind]['dvar'], population[ind]['obj'])
             for ind in sorted(population)])

    def __parse_version_2_0_0(self, data):
        """Read in data of files with a "solutions" list

        Parameters
        ----------
        data : dict
            Content of a generation file

        Notes
        -----
        The individuals are stored in the order of the file.
        """
        self.__dvarBounds = data["dvar-bounds"]
        self.__constraints = data["constraints"]

        self.__store_individuals(
            [(entry['ID'], entry['dvar'], entry['obj'])
             for entry in data["solutions"]])

    def __hasLoaded(self):
        """Throw error if no generation or pareto front file loaded.
        """
        if self.__loaded_generation < 0 and self.__loaded_pareto_front < 0:
            raise RuntimeError("Neither generation nor pareto front "
                               "file loaded.")

    def __actual_parse(self, filename):
        """Parse an optimizer JSON file

        Parameters
        ----------
        filename : str
            JSON file to be loaded

        Notes
        -----
        Raises an IOError if the file states an OPAL version older
        than 2.1.0
        """
        # context manager closes the file also in case of a parse error
        with open(filename) as f:
            data = json.load(f)

        tag = 'OPAL version'

        if tag not in data:
            self.__parse_version_2_0_0(data)
            return

        version = str(data[tag])

        # compare (major, minor, patch) numerically; missing components
        # count as zero such that e.g. "2.2" is treated as 2.2.0
        parts = [int(n) for n in re.findall(r'\d+', version)[:3]]
        parts += [0] * (3 - len(parts))

        if tuple(parts) < (2, 1, 0):
            raise IOError('No version ' + version + ' supported.')

        self.__parse_version_2_1_0(data)

    def parse(self, directory):
        """Collect information about optimizer JSON files.

        Parameters
        ----------
        directory : str
            Where the .json files are

        Notes
        -----
        Raises a runtime error if

        - there are several different basenames
        - no json file is found

        JSON files that are neither pareto front files nor named
        ``<generation>_<name>_<optimizer>.json`` are ignored.
        """
        self.__directory = directory
        self.__reset_attributes()

        # <generation>_<name>_<optimizer>.json; the name itself may
        # contain underscores
        pattern = re.compile(r'^[^_]+_(.+)_(\d+)\.json$')

        nJsonFiles = 0
        nParetoFiles = 0
        maxOptimizer = -1
        basename = ""

        for f in os.listdir(self.__directory):
            if not f.endswith(".json"):
                continue

            if self.__pareto in f:
                nParetoFiles += 1
                continue

            match = pattern.match(f)
            if match is None:
                # not an optimizer generation file
                continue

            tbasename = '_' + match.group(1) + '_'

            # check if only one type of simulation
            if basename != "" and tbasename != basename:
                raise RuntimeError("Several simulations with different name!")

            basename = tbasename
            nJsonFiles += 1
            maxOptimizer = max(maxOptimizer, int(match.group(2)))

        if nJsonFiles == 0:
            raise RuntimeError("No json file found in directory '" +
                               self.__directory + "'.")

        # commit only once the whole directory has been scanned
        self.__nJsonFiles = nJsonFiles
        self.__nParetoFiles = nParetoFiles
        self.__basename = basename
        # optimizer numbering starts at zero
        self.__nOptimizers = maxOptimizer + 1

        print ( "Found " + str(self.__nJsonFiles) + " json files from " +
                str(self.__nOptimizers) + " optimizers and " +
                str(self.__nParetoFiles) + " pareto front files")

    def __reset_attributes(self):
        """Reset attributes to default values.
        """
        self.__nJsonFiles = 0
        self.__basename = ""
        self.__nOptimizers = -1
        self.__nParetoFiles = 0
        self.__pareto = 'ParetoFront'
        self.__loaded_generation = -1
        self.__loaded_optimizer = -1
        self.__loaded_pareto_front = -1
        self.__clear_container()

    def __clear_container(self):
        """Clear old attribute data
        """
        self.__ids = []
        self.__dvar_values = np.empty((0, 0))
        self.__dvar_names = []
        self.__obj_values = np.empty((0, 0))
        self.__obj_names = []
        self.__dvarBounds = {}      # for each generation file the same
        self.__constraints = []     # for each generation file the same

    def clear(self):
        """Clear data.
        """
        self.__directory = ""
        self.__reset_attributes()

    def check_file(self, filename):
        """Check if a file is really an optimizer output

        Parameters
        ----------
        filename : str
            JSON file to be loaded

        Returns
        -------
        bool
            True if an optimizer file, otherwise False

        Notes
        -----
        All data of the parser is cleared by this check.
        """
        try:
            self.__actual_parse(filename)
        except Exception:
            # any parse failure means it's not an optimizer file
            return False
        finally:
            self.clear()
        return True