'''
MIT License
Copyright (c) [2022] [Temitope Ajayi]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
'''
from pytoughreact.constants.format_specifications import t2react_format_specification, \
t2react_extra_precision_format_specification
from pytoughreact.constants.defaults_constants import DEFAULT_REACT
from pytoughreact.constants.defaults_constants import DEFAULT_PARAMETERS
from pytoughreact.constants.sections import t2react_sections
from fixed_format_file import default_read_function, fixed_format_file
from pytoughreact.wrapper.reactgrid import T2ReactGrid
from pytoughreact.wrapper.reactblock import T2Block
from pytoughreact.exceptions.custom_error import NotFoundError
from pytoughreact.wrapper.reactzone import T2Zone
from copy import deepcopy
from t2data import t2data, fix_blockname, rocktype
from t2grids import t2connection
import numpy as np
import os
import sys
from pathlib import Path
from os.path import splitext, basename
from os import devnull, remove
from t2data import trim_trailing_nones
from mulgrids import padstring
from subprocess import call
import typing
[docs]
class T2ReactParser(fixed_format_file):
"""Class for parsing REACTION data file."""
[docs]
def __init__(self, filename, mode, specification=t2react_format_specification, read_function=default_read_function,
location=None):
# super(T2ReactParser, self).__init__(filename, mode,
# t2react_format_specification, read_function)
self.specification = specification
self.read_function = read_function
self.preprocess_specification()
print(os.getcwd())
if location is None:
self.file = open(filename, mode)
else:
os.chdir(location)
self.file = open(filename, mode)
[docs]
class T2React(t2data):
[docs]
def __init__(self, filename='', meshfilename='', read_function=default_read_function):
"""
Main class for structuring the writing , reading and running of reaction simulations
"""
super().__init__(filename='', meshfilename='', read_function=default_read_function)
self.react = DEFAULT_REACT.copy()
self.grid = T2ReactGrid()
self.title = ''
self.simulator = ''
self.parameter = deepcopy(DEFAULT_PARAMETERS)
self._more_option_str = '0' * 21,
self.more_option = np.zeros(22, int)
self.multi = {}
self.start = False
self.relative_permeability = {}
self.capillarity = {}
self.lineq = {}
self.output_times = {}
self.generatorlist = []
self.generator = {}
self.short_output = {}
self.incon = {}
self.solver = {}
self.history_block = []
self.history_connection = []
self.history_generator = []
self.indom = {}
self.noversion = False
self.diffusion = []
self.selection = {}
self.meshmaker = []
self._sections = []
self.end_keyword = 'ENDCY'
self._extra_precision, self._echo_extra_precision = [], True
self.update_read_write_functions()
self.read_function = read_function
# if self.filename:
# self.read(filename, meshfilename)
self.minerals = []
self.all_species = []
[docs]
def read_connections(self, infile):
"""Reads grid connections"""
self.grid.connectionlist, self.grid.connection = [], {}
line = padstring(infile.readline())
while line.strip() and not line.startswith('+++'):
[name1, name2, nseq,
nad1, nad2, isot, d1, d2,
areax, betax, sigx] = infile.parse_string(line, 'connections')
name1, name2 = fix_blockname(name1), fix_blockname(name2)
if nseq == 0:
nseq = None
if nad1 == 0:
nad1 = None
if nad2 == 0:
nad2 = None
self.grid.add_connection(t2connection([self.grid.block[name1],
self.grid.block[name2]],
isot, [d1, d2], areax, betax,
sigx, nseq, nad1, nad2))
line = padstring(infile.readline())
[docs]
def read_rocktypes(self, infile):
""" Reads grid rock types
Parameters
-----------
infile : str
Input file processor
Returns
--------
rocktype: self
adds rocktype information to grid
"""
self.grid.rocktypelist = []
self.grid.rocktype = {}
line = padstring(infile.readline())
while line.strip():
[name, nad, density, porosity,
k1, k2, k3, conductivity, specific_heat] = infile.parse_string(line, 'rocks1')
self.grid.add_rocktype(rocktype(name, nad,
density, porosity,
[k1, k2, k3],
conductivity, specific_heat))
if nad is None:
nad = 0
if nad >= 1: # additional lines:
infile.read_value_line(self.grid.rocktype[name].__dict__, 'rocks1.1')
if nad >= 2:
vals = infile.read_values('rocks1.2')
self.grid.rocktype[name].relative_permeability['type'] = vals[0]
self.grid.rocktype[name].relative_permeability['parameters'] = vals[2: -1]
vals = infile.read_values('rocks1.3')
self.grid.rocktype[name].capillarity['type'] = vals[0]
self.grid.rocktype[name].capillarity['parameters'] = vals[2: -1]
line = padstring(infile.readline())
zone = T2Zone(name)
self.grid.add_zone(zone)
[docs]
def read_blocks(self, infile):
""" Reads grid blocks
Parameters
-----------
infile : str
Input file processor
Returns
--------
parameter: self
reads block information
"""
self.grid.block, self.grid.blocklist = {}, []
line = padstring(infile.readline())
while line.strip():
[name, nseq, nadd, rockname,
volume, ahtx, pmx, x, y, z] = infile.parse_string(line, 'blocks')
name = fix_blockname(name)
if rockname in self.grid.rocktype:
rocktype = self.grid.rocktype[rockname]
if rockname in self.grid.zone:
zonename = self.grid.zone[rockname]
elif rockname.strip() == '' and self.grid.num_rocktypes > 0:
rocktype = self.grid.rocktypelist[0] # default
else:
try: # check if rocktype index specified:
rockindex = int(rockname) - 1
rocktype = self.grid.rocktypelist[rockindex]
except Exception:
raise RuntimeError("Unknown rocktype " + rockname + " in block " + name)
if (x is not None) and (y is not None) and (z is not None):
centre = np.array([x, y, z])
else:
centre = None
if nseq == 0:
nseq = None
if nadd == 0:
nadd = None
self.grid.add_block(T2Block(name, volume, rocktype, blockzone=zonename,
centre=centre, ahtx=ahtx,
pmx=pmx, nseq=nseq, nadd=nadd))
line = padstring(infile.readline())
[docs]
def update_read_write_functions(self):
"""Updates functions for reading and writing sections of data file."""
self.read_fn = dict(zip(
t2react_sections,
[self.read_simulator,
self.read_rocktypes,
self.read_parameters,
self.read_react,
self.read_more_options,
self.read_start,
self.read_noversion,
self.read_rpcap,
self.read_lineq,
self.read_solver,
self.read_multi,
self.read_times,
self.read_selection,
self.read_diffusion,
self.read_blocks,
self.read_connections,
self.read_meshmaker,
self.read_generators,
self.read_short_output,
self.read_history_blocks,
self.read_history_connections,
self.read_history_generators,
self.read_incons,
self.read_indom]))
self.write_fn = dict(zip(
t2react_sections,
[self.write_simulator,
self.write_rocktypes,
self.write_parameters,
self.write_react,
self.write_more_options,
self.write_start,
self.write_noversion,
self.write_rpcap,
self.write_lineq,
self.write_solver,
self.write_multi,
self.write_times,
self.write_selection,
self.write_diffusion,
self.write_blocks,
self.write_connections,
self.write_meshmaker,
self.write_generators,
self.write_short_output,
self.write_history_blocks,
self.write_history_connections,
self.write_history_generators,
self.write_incons,
self.write_indom]))
[docs]
def get_present_sections(self):
""" Returns a list of TOUGH2 section keywords for which there are
corresponding data in the t2react object.
Parameters
-----------
Returns
--------
parameters : list
list of present sections
"""
data_present = dict(zip(
t2react_sections,
[self.simulator,
self.grid and self.grid.rocktypelist,
self.parameter,
self.react,
np.any(self.more_option),
self.start,
self.noversion,
self.relative_permeability or self.capillarity,
self.lineq,
self.solver,
self.multi,
self.output_times,
self.selection,
self.diffusion,
self.grid,
self.grid,
self.meshmaker,
self.generatorlist,
self.short_output,
self.history_block,
self.history_connection,
self.history_generator,
self.incon,
self.indom]))
return [keyword for keyword in t2react_sections if data_present[keyword]]
present_sections = property(get_present_sections)
[docs]
def section_insertion_index(self, section):
""" Determines an appropriate position to insert the specified section
in the internal list of data file sections.
Parameters
-----------
Returns
--------
"""
try:
listindex = t2react_sections.index(section)
if listindex == 0:
return 0 # SIMUL section
else:
# first look for sections above the one specified,
# and put new one just after the last found:
for i in reversed(range(listindex)):
try:
section_index = self._sections.index(t2react_sections[i])
return section_index + 1
except ValueError:
pass
# look for sections below the one specified,
# and put new one just before the first found:
for i in range(listindex, len(t2react_sections)):
try:
section_index = self._sections.index(t2react_sections[i])
return section_index
except ValueError:
pass
return len(self._sections)
except ValueError:
return len(self._sections)
[docs]
def check_for_executable(self, executable_name, directory):
""" Check if the executable exists in the folder.
Parameters
-----------
executable_name : str
name of executable
directory : str
directory that contains executable
Returns
--------
exception : NotFoundError
returns error if executable not found
"""
path = Path(directory + '/' + executable_name)
output = path.is_file()
if output is True:
return
else:
raise NotFoundError('Tough React Executable', path.resolve())
[docs]
def check_for_thermodynamic_database(self, directory, t2solute):
""" Check if the thermodynamic database exists in the folder.
Parameters
-----------
t2solute : t2solute
t2solute class
directory : str
directory that contains thermodynamic database
Returns
--------
exception : NotFoundError
returns error if executable not found
"""
database_name = t2solute.readio['database']
path = Path(directory + '/' + database_name)
output = path.is_file()
if output is True:
return
else:
raise NotFoundError('Thermodynamic Database', path.resolve())
[docs]
def run(self, t2solute, save_filename='', incon_filename='', runlocation='', simulator='AUTOUGH2_2',
silent=False, output_filename=''):
""" Runs simulation using TOUGH2 ,AUTOUGH2, TOUGHREACT.
Parameters
-----------
save_filename : str
filename of SAVE file
incon_filename : str
filename of INCON file
run_location : str
path to Executable and required files
simulator : str
simulator executable name
silent: boolean
if screen messages should be written out
output_filename: str
name of output listing file (only TOUGH2)
Returns
--------
"""
if runlocation:
os.chdir(os.path.dirname(os.path.realpath(__file__)))
os.chdir(runlocation)
self.check_for_thermodynamic_database(runlocation, t2solute)
self.check_for_executable(simulator, runlocation)
if self.filename:
root_dir = os.path.abspath(os.curdir)
self.check_for_executable(simulator, root_dir)
self.check_for_thermodynamic_database(root_dir, t2solute)
datbase, ext = splitext(self.filename)
if (self.type == 'AUTOUGH2'):
if save_filename == '':
save_filename = datbase + '.save'
if incon_filename == '':
incon_filename = datbase + '.incon'
savebase, ext = splitext(save_filename)
inconbase, ext = splitext(incon_filename)
runfilename = datbase + '_' + basename(simulator) + '.in'
open(runfilename, 'w').write('\n'.join([savebase, inconbase, datbase]))
infile = open(runfilename, 'r')
cmd = [simulator]
if silent:
outfile = open(devnull, 'w')
else:
outfile = None
# run AUTOUGH2:
call(cmd, stdin=infile, stdout=outfile)
infile.close()
remove(runfilename)
else: # run TOUGH2 (need to specify simulator executable name)
if runlocation:
os.chdir(runlocation)
if os.path.exists("GENER"):
os.remove("GENER")
if os.path.exists("OUTPUT_ELEME.csv"):
os.remove("OUTPUT_ELEME.csv")
cmd = [simulator]
infile = open(self.filename, 'r')
if silent:
outfile = None
else:
if output_filename == '':
datbase = 'screen_info'
outfilename = datbase + '.out'
else:
outfilename = output_filename
outfile = open(outfilename, 'w')
# p = Popen(os.path.join(current_dir,">treacteos1<flow.inp"),cwd=current_dir)
status = call(cmd, stdin=infile, stdout=outfile)
print(status)
outfile.close()
infile.close()
# status_2 = run(["treacteos1.exe"],
# stdin=infile,
# stdout=PIPE,
# text=True)
# print(status_2)
[docs]
def read_parameters(self, infile):
""" Reads simulation parameters
Parameters
-----------
infile : str
Input file processor
Returns
--------
line: str
read parameters block to grid
"""
spec = ['param1', 'param1_autough2'][self.type == 'AUTOUGH2']
infile.read_value_line(self.parameter, spec)
mops = self.parameter['_option_str'].rstrip().ljust(24).replace(' ', '0')
self.parameter['option'] = np.array([0] + [int(mop) for mop in mops], int)
infile.read_value_line(self.parameter, 'param2')
if (self.parameter['print_block'] is not None) and (self.parameter['print_block'].strip() == ''):
self.parameter['print_block'] = None
self.read_timesteps(infile)
infile.read_value_line(self.parameter, 'param3')
for val in infile.read_values('default_incons'):
self.parameter['default_incons'].append(val)
self.parameter['default_incons'] = trim_trailing_nones(self.parameter['default_incons'])
# read any additional lines of default incons:
more = True
while more:
line = padstring(infile.readline())
if line.strip():
section = any([line.startswith(keyword) for keyword in t2react_sections])
if section:
more = False
else:
more_incons = infile.parse_string(line, 'default_incons')
more_incons = trim_trailing_nones(more_incons)
self.parameter['default_incons'] += more_incons
else:
more, line = False, None
return line
[docs]
def read_react(self, infile):
pass
[docs]
def write_react(self, outfile):
""" Writes React Parameters
Parameters
-----------
outfile : str
output file processor
Returns
--------
"""
outfile.write('REACT\n')
vals = self.react
outfile.write_values(vals, 'REACT')
[docs]
def read(self, filename='', meshfilename='', file_location: str = None):
""" Reads data from file. Mesh data can optionally be read from an
auxiliary file. Extra precision data will also be read from
an associated '.pdat' file, if it exists.
Reads data from file. Mesh data can optionally be read from an
auxiliary file. Extra precision data will also be read from
an associated '.pdat' file, if it exists.
Parameters
-----------
filename : str
file to read
meshfilename : str
Mesh file to read
file_location : str
Location where the file is on the local system
Returns
--------
"""
if file_location is None:
file_location = os.getcwd()
if filename:
self.filename = filename
mode = 'r' if sys.version_info > (3,) else 'rU'
infile = T2ReactParser(self.filename, mode, read_function=self.read_function, location=file_location)
self.read_title(infile)
self._sections = []
more = True
next_line = None
while more:
if next_line:
line = next_line
else:
line = infile.readline()
if line:
keyword = line[0: 5].strip()
if keyword in ['ENDCY', 'ENDFI']:
more = False
self.end_keyword = keyword
elif keyword in t2react_sections:
fn = self.read_fn[keyword]
next_line = None
if keyword == 'SHORT':
fn(infile, line)
elif keyword == 'PARAM':
next_line = fn(infile)
else:
fn(infile)
self._sections.append(keyword)
else:
more = False
infile.close()
if meshfilename and (self.grid.num_blocks == 0):
self.meshfilename = meshfilename
if isinstance(meshfilename, str):
mode = 'r' if sys.version_info > (3,) else 'rU'
meshfile = T2ReactParser(self.meshfilename, mode, read_function=self.read_function)
self.read_meshfile(meshfile)
meshfile.close()
elif isinstance(meshfilename, (list, tuple)):
if len(meshfilename) == 2:
self.read_binary_meshfiles()
else:
print('Mesh filename must be either a string or a two-element tuple or list.')
return self
[docs]
def write(self, filename='', meshfilename='', runlocation='',
extra_precision=None, echo_extra_precision=None):
""" Writes data to file. Mesh data can optionally be written to an
auxiliary file. For AUTOUGH2, if extra_precision is True or a
list of section names, the corresponding data sections will be
written to an extra precision file; otherwise, the same
sections (if any) that were read in as extra precision will
also be written out as extra precision. If
echo_extra_precision is True, the extra precision sections
will also be written to the main data file.
Parameters
-----------
filename : str
file to read
meshfilename : str
Mesh file to read
runlocation : str
Path containing executables
extra_precision: boolean
Required for AUTOUGH from PYTOUGH
echo_extra_precision: boolean
Required for AUTOUGH from PYTOUGH
Returns
--------
"""
if runlocation:
if not os.path.isdir(runlocation):
os.mkdir(runlocation)
os.chdir(runlocation)
if filename:
self.filename = filename
if self.filename == '':
self.filename = 't2react.dat'
self.update_sections()
mesh_sections = []
if meshfilename:
self.meshfilename = meshfilename
if self.meshfilename:
if isinstance(self.meshfilename, str):
meshfile = T2ReactParser(self.meshfilename, 'w')
self.write_blocks(meshfile)
self.write_connections(meshfile)
meshfile.close()
mesh_sections = ['ELEME', 'CONNE']
elif isinstance(self.meshfilename, (list, tuple)):
if len(self.meshfilename) == 2:
self.write_binary_meshfiles()
mesh_sections = ['ELEME', 'CONNE']
if self.type == 'AUTOUGH2':
self.write_extra_precision(extra_precision, echo_extra_precision)
outfile = T2ReactParser(self.filename, 'w')
self.write_title(outfile)
for keyword in self._sections:
if (keyword not in mesh_sections) and ((keyword not in self.extra_precision) or
(keyword in self.extra_precision and self.echo_extra_precision)):
self.write_fn[keyword](outfile)
outfile.write(self.end_keyword + '\n')
outfile.close()