'''
MIT License
Copyright (c) [2022] [Temitope Ajayi]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
import os
import numpy as np
import csv
import collections
import itertools
from pytoughreact.utilities.t2_utilities import T2Utilities
[docs]
class ResultTough3(object):
""" Class for processing results from Tough3 """
[docs]
def __init__(self, simulatortype, filelocation, filetitle=None, **kwargs):
"""Initialization of Parameters
Parameters
-----------
simulator_type : string
Type of simulator being run. Can either be 'tmvoc', 'toughreact' or 'tough3'.
Should be tough3 for this class
file_location : string
Location of results file on system
file_title : string
Title or name of the file. Example is 'kddconc.tec' or 'OUTPUT.csv'
kwargs: dict
1) generation (string) - if generation data exists in the results.
Returns
--------
"""
if filelocation is None:
self.filelocation = os.getcwd()
else:
self.filelocation = filelocation
os.chdir(self.filelocation)
self.filetitle = filetitle
self.simulatortype = simulatortype
self.generation = kwargs.get('generation')
self.file_as_list = []
def __repr__(self):
return 'Results from ' + self.filelocation + ' in ' + self.filetitle + ' for ' + self.simulatortype
[docs]
def read_file(self):
""" Read file specified in file_location and file_title
Parameters
-----------
Returns
--------
file_as_list : list
Results from file as list
"""
os.chdir(self.filelocation)
with open(self.filetitle) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',', quotechar='"')
self.file_as_list = []
for row in csv_reader:
self.file_as_list.append(row)
return self.file_as_list
[docs]
def get_times(self):
""" Get times stored for duration of the simulation
Parameters
-----------
grid_block_number : int
The grid block number for which to retrieve the results
format_of_date : str
Provides information to the method on format of the date. For example. year, hour, min or seconds
Returns
--------
unprocessed_time_data : list
Time data directly from file without processing.
"""
self.read_file()
time = []
unprocessed_time_data = []
if self.generation is True:
for i in range(1, len(self.file_as_list)):
unprocessed_time_data.append(float(self.file_as_list[i][0]))
else:
for i in range(len(self.file_as_list)):
if len(self.file_as_list[i]) == 1:
time.append(self.file_as_list[i])
for i in range(len(time)):
interim = time[i][0].split()
unprocessed_time_data.append(float(interim[2]))
return unprocessed_time_data
[docs]
def convert_times(self, format_of_date):
""" Convert time to desirable format e.g day, month, year
Parameters
-----------
format_of_date : str
Provides information to the method on format of the date. For example. year, hour, min or seconds
Returns
--------
processed_time_data : list
List of converted time
"""
intermediate = self.get_times()
utility_function = T2Utilities()
processed_time_data = utility_function.convert_times(intermediate, format_of_date)
return processed_time_data
[docs]
def get_time_index(self):
""" Get Index of Time
Parameters
-----------
Returns
--------
processed_time_data : list
Index the time
"""
self.read_file()
indexed_time = []
for index, value in enumerate(self.file_as_list):
if len(self.file_as_list[index]) == 1:
indexed_time.append(index)
indexed_time.append(len(self.file_as_list))
return indexed_time
[docs]
def get_generation_data(self, param):
""" Get data from generation.
Parameters
-----------
param: string
Parameter to be derive data
Returns
--------
result_array : list
Results from the generation.
"""
self.read_file()
result_array = []
heading = []
heading_first = self.file_as_list[0]
heading_first_modify = []
for i in heading_first:
heading_first_modify.append(i.upper())
for i in range(len(heading_first_modify)):
heading.append(heading_first_modify[i].lstrip())
index_param = heading.index(param.upper())
for i in range(1, len(self.file_as_list)):
result_array.append(float(self.file_as_list[i][index_param]))
return result_array
[docs]
def get_elements(self):
""" Get elements from the simulation
Parameters
-----------
Returns
--------
elements : list
Elements present in the result file.
"""
self.read_file()
indextime = self.get_time_index()
temp_file = self.file_as_list[indextime[0] + 1:indextime[1]]
elements = []
for i in range(len(temp_file)):
elements.append(temp_file[i][0])
return elements
[docs]
def get_parameters(self):
""" Remove space from parameters
Parameters
-----------
Returns
--------
parameter_list : list
Parameters with blanks removed.
"""
self.read_file()
parameter_list = self.file_as_list[0]
for i in range(len(parameter_list)):
parameter_list[i] = parameter_list[i].replace(" ", "")
return parameter_list
[docs]
def get_result_dictionary(self):
""" Results in dictionary form
Parameters
-----------
Returns
--------
result_dict : dict
Results dictionary
"""
self.read_file()
result_dict = {}
temp_dict = {}
index_time = self.get_time_index()
timeraw = self.get_times()
for i in range(len(index_time) - 1):
temp_dict[i] = self.file_as_list[index_time[i] + 1:index_time[i + 1]]
for i in range(len(timeraw)):
result_dict[timeraw[i]] = temp_dict[i]
return result_dict
[docs]
def get_timeseries_data(self, param, gridblocknumber):
""" Get Time series data
Parameters
-----------
grid_block_number : int
The grid block number for which to retrieve the results
param: string
Parameter to be derive data
Returns
--------
final_timeseries_data : list
Time series data for particular parameter.
"""
self.read_file()
results = self.get_result_dictionary()
resultarray = []
heading = []
heading_first = self.file_as_list[0]
heading_first_modify = []
for i in heading_first:
heading_first_modify.append(i.upper())
for i in range(len(heading_first_modify)):
heading.append(heading_first_modify[i].lstrip())
index_param = heading.index(param.upper())
for k in results.keys():
resultarray.append(results[k][gridblocknumber][index_param].lstrip())
final_timeseries_data = [float(x) for x in resultarray]
return final_timeseries_data
[docs]
def get_element_data(self, time, param):
""" Get Data for elements
Parameters
-----------
time : float
Time in which the data should be retrieved.
param: string
Parameter to be derive data
Returns
--------
final_element_data : list
Data for each of the elements.
"""
self.read_file()
timeraw = self.get_times()
results = self.get_result_dictionary()
heading = []
heading_first = self.file_as_list[0]
heading_first_modify = []
for i in heading_first:
heading_first_modify.append(i.upper())
for i in range(len(heading_first_modify)):
heading.append(heading_first_modify[i].lstrip())
index_param = heading.index(param.upper())
if time < timeraw[0]:
time = timeraw[0]
elif time > timeraw[-1]:
time = timeraw[-1]
else:
absolute_difference_function = lambda list_value: abs(list_value - time)
time = min(timeraw, key=absolute_difference_function)
results_specific = results[time]
data = []
for i in range(len(results_specific)):
data.append(results_specific[i][index_param].lstrip())
final_element_data = [float(x) for x in data]
return final_element_data
[docs]
def get_x_data(self, time):
""" Get X Axis Data
Parameters
-----------
time : float
Time in which the data should be retrieved.
Returns
--------
output : list
Data for the x axis.
"""
return self.get_element_data(time, 'x')
[docs]
def get_y_data(self, time):
""" Get Y Axis Data
Parameters
-----------
time : float
Time in which the data should be retrieved.
Returns
--------
output : list
Data for the y axis.
"""
return self.get_element_data(time, 'y')
[docs]
def get_z_data(self, time):
""" Get Z Axis Data
Parameters
-----------
time : float
Time in which the data should be retrieved.
Returns
--------
output : list
Data for the z axis.
"""
return self.get_element_data(time, 'z')
[docs]
def get_coord_data(self, direction, timer):
""" Get Coordinate Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
direction : string
Direction to get data. Can be 'X', 'Y', 'Z'
Returns
--------
direction_value_output : list
Data for the specified direction.
"""
if direction.lower() == 'x':
direction_value_output = self.get_x_data(timer)
elif direction.lower() == 'y':
direction_value_output = self.get_y_data(timer)
elif direction.lower() == 'z':
direction_value_output = self.get_z_data(timer)
else:
print("coordinates can either be X, Y or Z")
return direction_value_output
[docs]
def get_unique_x_data(self, timer):
""" Get Unique X Axis Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
Returns
--------
unique_x_output_data : list
Unique data for the x axis.
"""
original_array = self.get_coord_data('x', timer)
indices_array = []
for i in range(0, len(original_array)):
try:
if original_array[i] > original_array[i + 1]:
indices_array.append(i)
else:
continue
except Exception:
pass
unique_x_output_data = original_array[0:indices_array[0] + 1]
return unique_x_output_data
[docs]
def get_x_start_points(self, timer):
""" Get X Axis Start Point Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
Returns
--------
indices_array : list
X Axis Start Point Data.
"""
original_array = self.get_coord_data('x', timer)
indices_array = []
for i in range(0, len(original_array)):
try:
if original_array[i] > original_array[i + 1]:
indices_array.append(i)
else:
continue
except Exception:
pass
# output_data = ori_array[0:indices_array[0] + 1]
return indices_array
[docs]
def get_unique_y_data(self, timer):
""" Get Unique Y Axis Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
Returns
--------
unique_y_output_data : list
Unique data for the y axis.
"""
original_array = self.get_coord_data('y', timer)
unique_y_output_data = list(set(original_array))
return unique_y_output_data
[docs]
def get_unique_z_data(self, timer):
""" Get Unique Z Axis Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
Returns
--------
unique_z_output_data : list
Unique data for the z axis.
"""
original_array = self.get_coord_data('z', timer)
unique_z_output_data = list(set(original_array))
return unique_z_output_data
[docs]
def get_number_of_layers(self, direction):
""" Get Number of Layers
Parameters
-----------
direction : string
Direction to get data. Can be 'X', 'Y', 'Z'
Returns
--------
number_of_layers : int
Total number of layers.
"""
if direction.lower() == 'x':
direction_data_array = self.get_unique_x_data(0)
elif direction.lower() == 'y':
direction_data_array = self.get_unique_y_data(0)
elif direction.lower() == 'z':
direction_data_array = self.get_unique_z_data(0)
else:
print("coordinates can either be X, Y or Z")
number_of_layers = len(direction_data_array)
return number_of_layers
[docs]
def get_z_layer_data(self, layer_number, param, timer):
""" Get Z Layer Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
layer_num: int
Layer number in which to retrieve data
param: string
Parameter to be derive data
Returns
--------
z_layer_data_output : list
Data for the z direction.
"""
x_start = self.get_x_start_points(timer)
z_data = self.get_element_data(timer, param)
total_grid_in_z = self.get_number_of_layers('z')
if layer_number > 1:
begin_index = x_start[layer_number - 2] + 1
else:
begin_index = 0
if layer_number < total_grid_in_z:
end_index = x_start[layer_number - 1] + 1
else:
end_index = 50
z_layer_data_output = z_data[begin_index:end_index]
return z_layer_data_output
[docs]
def get_x_depth_data(self, line_number, param, timer):
""" Get X Axis And Depth Data
Parameters
-----------
timer : float
Time in which the data should be retrieved.
line_number: int
Line number to retrieve x depth data for
param: string
Parameter to be derive data
Returns
--------
x_depth_data_array : list
Data for the x depth.
"""
element_data = self.get_element_data(timer, param)
x_layers = self.get_number_of_layers('x')
z_layers = self.get_number_of_layers('z')
x_depth_data_array = []
for i in range(0, z_layers):
x_depth_data_array.append(element_data[line_number - 1])
line_number = line_number + x_layers
return x_depth_data_array
[docs]
def get_layer_data(self, direction, layer_number, timer, param):
""" Get Layer Data
Parameters
-----------
direction : string
Direction to get data. Can be 'X', 'Y', 'Z'
timer : float
Time in which the data should be retrieved.
layer_num: int
Layer number in which to retrieve data
param: string
Parameter to be derive data
Returns
--------
layer_data_array : list
Data for the specified direction.
"""
number_of_layers = self.get_number_of_layers(direction)
if layer_number > number_of_layers:
raise ValueError("The specified layer is more than the number of layers in the model")
else:
if direction.lower() == 'z':
layer_data_array = self.get_z_layer_data(layer_number, param, timer)
elif direction.lower() == 'x':
layer_data_array = self.get_x_depth_data(layer_number, param, timer)
return layer_data_array
[docs]
def get_unique_coord_data(self, direction, timer):
""" Get Unique Coordinate Data
Parameters
-----------
direction : string
Direction to get data. Can be 'X', 'Y', 'Z'
timer : float
Time in which the data should be retrieved.
Returns
--------
unique_coordinate_data : list
Data for the unique coordinate.
"""
if direction.lower() == 'x':
unique_coordinate_data = self.get_unique_x_data(timer)
elif direction.lower() == 'y':
unique_coordinate_data = self.get_unique_y_data(timer)
elif direction.lower() == 'z':
unique_coordinate_data = self.get_unique_z_data(timer)
else:
print("coordinates can either be X, Y or Z")
return unique_coordinate_data
[docs]
def remove_non_increasing(self, sequence_a, sequence_b):
""" Remove Non Increasing Data
Parameters
-----------
sequence_a: list
list containing first sequence
sequence_b: list
list containing second sequence
Returns
--------
sequenceA, sequenceB : list, list
Data for the unique coordinate.
"""
monotone = self.check_strictly_increasing(sequence_a)
if not monotone:
index = self.duplicate_index(sequence_a)
if len(index) > 0:
sequence_a = self.del_index(sequence_a, index)
indexes1 = self.duplicate_index(sequence_b)
if len(sequence_a) != len(sequence_b):
if len(indexes1) > 0:
sequence_b = self.del_index(sequence_b, indexes1)
return sequence_a, sequence_b
[docs]
def check_strictly_increasing(self, sequence):
""" Check for only strictly increasing data
Parameters
-----------
sequence: list
list containing data
Returns
--------
output : list
Retuns strictly increasing data
"""
dx = np.diff(sequence)
return np.all(dx > 0)
[docs]
def del_index(self, input_list, indexes):
""" Delete index in data
Parameters
-----------
input_list: list
Input list to remove indexes
indexes: list
indexes for which to remove data
Returns
--------
input_list : list
List after indexes have been removed
"""
for index in sorted(indexes, reverse=True):
del input_list[index]
return input_list
[docs]
def duplicate_index(self, sequence):
""" Duplicate index in sequence
Parameters
-----------
sequence: list
list containing data
Returns
--------
output : list
Output after indexes have been duplicated
"""
dicta = {}
indexes = []
duplicates = collections.defaultdict(list)
for i, e in enumerate(sequence):
duplicates[e].append(i)
for k, v in sorted(duplicates.items()):
if len(v) >= 2:
dicta[k] = v
for k, v in dicta.items():
indexes.append(v[1:])
return list(itertools.chain.from_iterable(indexes))