Examples for PhreeqPy¶
Simple example¶
This is an example for the use PhreeqPy for one-dimensional advection. It is the example 11 from the PHREEQC users manual.
"""Advection with DLL or COM server.
Using MODFIY we update the concentration on every
time step. We shift by one cell per step.
"""
from __future__ import print_function
import sys
# Simple Python 3 compatibility adjustment.
if sys.version_info[0] == 2:
range = xrange
import os
import timeit
MODE = 'dll' # 'dll' or 'com'
if MODE == 'com':
import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)
def make_initial_conditions():
"""
Specify initial conditions data blocks.
Uniform initial conditions are assumed.
"""
initial_conditions = """
TITLE Example 11.--Transport and ion exchange.
SOLUTION 0 CaCl2
units mmol/kgw
temp 25.0
pH 7.0 charge
pe 12.5 O2(g) -0.68
Ca 0.6
Cl 1.2
SOLUTION 1 Initial solution for column
units mmol/kgw
temp 25.0
pH 7.0 charge
pe 12.5 O2(g) -0.68
Na 1.0
K 0.2
N(5) 1.2
END
EXCHANGE 1
equilibrate 1
X 0.0011
END
"""
return initial_conditions
def make_selected_output(components):
"""
Build SELECTED_OUTPUT data block
"""
headings = "-headings cb H O "
for i in range(len(components)):
headings += components[i] + "\t"
selected_output = """
SELECTED_OUTPUT
-reset false
USER_PUNCH
"""
selected_output += headings + "\n"
#
# charge balance, H, and O
#
code = '10 w = TOT("water")\n'
code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
#
# All other elements
#
lino = 30
for component in components:
code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
lino += 10
selected_output += code
return selected_output
def initialize(cells, first=False):
"""
Initialize IPhreeqc module
"""
phreeqc = phreeqc_mod.IPhreeqc()
phreeqc.load_database(r"phreeqc.dat")
initial_conditions = make_initial_conditions()
phreeqc.run_string(initial_conditions)
components = phreeqc.get_component_list()
selected_output = make_selected_output(components)
phreeqc.run_string(selected_output)
phc_string = "RUN_CELLS; -cells 0-1\n"
phreeqc.run_string(phc_string)
conc = get_selected_output(phreeqc)
inflow = {}
initial = {}
for name in conc:
if first:
inflow[name] = conc[name][0]
else:
inflow[name] = conc[name][1]
initial[name] = conc[name][1]
task = initial_conditions + "\n"
task += "COPY solution 1 %d-%d\n" % (cells[0], cells[1])
task += "COPY exchange 1 %d-%d\n" % (cells[0], cells[1])
task += "END\n"
task += "RUN_CELLS; -cells %d-%d\n" % (cells[0], cells[1])
task += selected_output
phreeqc.run_string(task)
conc = get_selected_output(phreeqc)
for name in conc:
value = [initial[name]] * len(conc[name])
conc[name] = value
return phreeqc, inflow, conc
def advect_step(phreeqc, inflow, conc, cells):
"""Advect by shifting concentrations from previous time step.
"""
all_names = conc.keys()
names = [name for name in all_names if name not in ('cb', 'H', 'O')]
for name in conc:
# shift one cell
conc[name][1:] = conc[name][:-1]
conc[name][0] = inflow[name]
modify = []
for index, cell in enumerate(range(cells[0], cells[1] + 1)):
modify.append("SOLUTION_MODIFY %d" % cell)
modify.append("\t-cb %e" % conc['cb'][index])
modify.append("\t-total_h %f" % conc['H'][index])
modify.append("\t-total_o %f" % conc['O'][index])
modify.append("\t-totals")
for name in names:
modify.append("\t\t%s\t%f" % (name, conc[name][index]))
modify.append("RUN_CELLS; -cells %d-%d\n" % (cells[0], cells[1]))
cmd = '\n'.join(modify)
phreeqc.run_string(cmd)
conc = get_selected_output(phreeqc)
return conc
def get_selected_output(phreeqc):
"""Return calculation result as dict.
Header entries are the keys and the columns
are the values as lists of numbers.
"""
output = phreeqc.get_selected_output_array()
header = output[0]
conc = {}
for head in header:
conc[head] = []
for row in output[1:]:
for col, head in enumerate(header):
conc[head].append(row[col])
return conc
def run(ncells, shifts, specie_names):
"""Do one run in one process.
"""
cells = (1, ncells)
phreeqc, inflow, conc = initialize(cells, first=True)
outflow = {}
for name in specie_names:
outflow[name] = []
for _counter in range(shifts):
# advect
conc = advect_step(phreeqc, inflow, conc, cells)
for name in specie_names:
outflow[name].append(conc[name][-1])
return outflow
def write_outflow(file_name, outflow):
"""Write the outflow values to a file.
"""
fobj = open(file_name, 'w')
header = outflow.keys()
for head in header:
fobj.write('%20s' % head)
fobj.write('\n')
for lineno in range(len(outflow[head])):
for head in header:
fobj.write('%20.17f' % outflow[head][lineno])
fobj.write('\n')
def main(ncells, shifts):
"""Run different versions with and without multiprocessing
"""
def measure_time(func, *args, **kwargs):
"""Convinience function to measure run times.
"""
start = timeit.default_timer()
result = func(*args, **kwargs)
return result, timeit.default_timer() - start
print('Dimensions')
print('==========')
print('number of cells: ', ncells)
print('number of shifts ', shifts)
specie_names = ('Ca', 'Cl', 'K', 'N', 'Na')
outflow, run_time = measure_time(run, ncells, shifts, specie_names)
if not os.path.exists('data'):
os.mkdir('data')
write_outflow('data/out.txt', outflow)
print('run time:', run_time)
print("Finished simulation\n")
if __name__ == '__main__':
main(ncells=40, shifts=120)
Parallel Example¶
This is an example for the use PhreeqPy for one-dimensional advection
running in parallel, using multiprocessing
.
# -*- coding: utf-8 -*-
"""A coupled advection-reaction model using PhreeqPy.
The sketch below shows the setup. The coupled model contains a advection and a
reaction model. The reaction model can have one or more Phreeqc calculators.
The calculators can work in parallel using the module `multiprocessing`.
+--------------------------------------------------------------------------+
| CoupledModel |
| +--------------------------------------------------------------------+ |
| | AdvectionModel | |
| | | |
| +--------------------------------------------------------------------+ |
| | ReactionModel | |
| | +--------------------+--------------------+--------------------+ | |
| | | PhreeqcCalculator1 | PhreeqcCalculator2 | PhreeqcCalculator3 | | |
+--------------------------------------------------------------------------+
Author: Mike Müller, mmueller@hydrocomputing.com
"""
import multiprocessing
import os
import timeit
import matplotlib.pyplot as plt
MODE = 'dll' # 'dll' or 'com'
if MODE == 'com':
import phreeqpy.iphreeqc.phreeqc_com as phreeqc_mod
elif MODE == 'dll':
import phreeqpy.iphreeqc.phreeqc_dll as phreeqc_mod
else:
raise Exception('Mode "%s" is not defined use "com" or "dll".' % MODE)
class CoupledModel(object):
"""This is a coupled advection model.
Since it is just a simple example, we use a 1D model.
The same approach can be applied to a 2d or 3D model
as long as the advection model supports it.
The PHREEQC part is the same.
Furthermore, instead of a simple advection model,
we can have a more sophisticated transport model.
"""
def __init__(self, ncells, nshifts, initial_conditions, processes):
self.nshifts = nshifts
self.reaction_model = ReactionModel(ncells, initial_conditions,
processes)
self.reaction_model.make_initial_state()
init_conc = dict([(name, [value] * ncells) for name, value in
self.reaction_model.init_conc.items()])
self.advection_model = AdvectionModel(init_conc,
self.reaction_model.inflow_conc)
self.component_names = self.reaction_model.component_names
self.results = {}
for name in self.component_names:
self.results[name] = []
def run(self):
"""Go over all time steps (shifts).
"""
for _ in range(self.nshifts):
self.advection_model.advect()
self.advection_model.save_results(self.results)
self.reaction_model.modify(self.advection_model.conc)
self.advection_model.update_conc(self.reaction_model.conc)
self.reaction_model.finish()
class AdvectionModel(object):
"""Very simple 1D advection model.
This model can be replaced by a more sophisticated transport
model with two or three dimensions.
This could be an external code such as MT3D or anything
else that is moving concentrations through the subsurface.
"""
def __init__(self, init_conc, inflow_conc):
"""Set the initial and inflow concentrations.
Both concentrations are dictionaries with the specie names as keys.
Values are 1D arrays for `init_conc` and scalars for `inflow_conc`.
"""
self.conc = init_conc
self.inflow_conc = inflow_conc
self.outflow = {}
def update_conc(self, new_conc):
"""Update the concentrations after the reactions step.
This is very simple but could be more involved
if the transport model is more complex.
"""
self.conc = new_conc
def advect(self):
"""Shift one cell.
"""
for name in self.conc:
self.outflow[name] = self.conc[name][-1]
self.conc[name][1:] = self.conc[name][:-1]
self.conc[name][0] = self.inflow_conc[name]
def save_results(self, results):
"""Save the calculation results.
Typically, we would write our results into a file.
For simplicity we just add the current outflow that
we stored in `self.outflow` and add it to `results`,
which is a dictionary with all specie names as keys
and lists as values.
"""
for name in self.conc:
results[name].append(self.outflow[name])
class ReactionModel(object):
"""Calculate reactions using PHREEQC as computational engine.
We have no direct contact with IPhreeqc here.
We make one or more instances of `PhreeqcCalculator`
that are actually using IPhreeqc.
We can use more than one processor with `multiprocessing`.
"""
def __init__(self, ncells, initial_conditions, processes):
if processes > ncells:
raise ValueError('Number of processes needs to be less or equal '
'than number of cells. %d processes %d cells.'
% (processes, ncells))
if processes < 1:
raise ValueError('Need at least one process got %d' % processes)
self.parallel = False
if processes > 1:
self.parallel = True
self.ncells = ncells
self.initial_conditions = initial_conditions
self.processes = processes
self.inflow_conc = {}
self.init_conc = {}
self.conc = {}
self.component_names = []
self.calculators = []
self.cell_ranges = []
self._init_calculators()
self.make_initial_state()
def _init_calculators(self):
"""If we are going parallel we need several calculators.
"""
if self.parallel:
# Domain decomposition.
slave_ncells, reminder = divmod(self.ncells, self.processes)
root_ncells = slave_ncells + reminder
current_cell = root_ncells
root_calculator = PhreeqcCalculator(root_ncells,
self.initial_conditions)
self.calculators = [root_calculator]
self.cell_ranges = [(0, root_ncells)]
for _ in range(self.processes - 1):
self.calculators.append(PhreeqcCalculatorProxy(slave_ncells,
self.initial_conditions))
self.cell_ranges.append((current_cell,
current_cell + slave_ncells))
current_cell += slave_ncells
assert current_cell == self.ncells
self.calculators.reverse()
self.cell_ranges.reverse()
else:
root_calculator = PhreeqcCalculator(self.ncells,
self.initial_conditions)
# Just one calculator and the entire range but still use a list
# to provide the same interface as the parallel case.
self.calculators = [root_calculator]
self.cell_ranges = [(0, self.ncells)]
def make_initial_state(self):
"""Get the initial values from the calculator(s).
"""
self.inflow_conc = self.calculators[0].inflow_conc
self.init_conc = self.calculators[0].init_conc
self.component_names = self.calculators[0].component_names
if self.parallel:
# Make sure all calculators are initialized the same.
for calculator in self.calculators[1:]:
assert self.inflow_conc == calculator.inflow_conc
assert self.init_conc == calculator.init_conc
assert self.component_names == calculator.component_names
def modify(self, new_conc):
"""Pass new conc after advection to the calculator.
"""
self.conc = {}
for name in self.component_names:
self.conc[name] = []
for cell_range, calculator in zip(self.cell_ranges, self.calculators):
current_conc = dict([(name, value[cell_range[0]:cell_range[1]]) for
name, value in new_conc.items()])
calculator.modify(current_conc)
for calculator in self.calculators:
conc = calculator.get_modified()
for name in self.component_names:
self.conc[name].extend(conc[name])
def finish(self):
"""This is necessary for multiprocessing.
Multiprocessing uses external processes. These need to be
explicitly closed to avoid hanging of the program at
the end.
"""
for calculator in self.calculators:
calculator.finish()
class PhreeqcCalculator(object):
"""All PHREEQC calculations happen here.
This is the only place where we interact wit IPhreeqc.
Each instance of this class might run in a different
process using `multiprocessing`.
"""
def __init__(self, ncells, initial_conditions):
"""
ncells - number of cells
initial_conditions - string containing PHREEQC input for
solution and exchange, see example below
"""
self.ncells = ncells
self.initial_conditions = initial_conditions
self.inflow_conc = {}
self.init_conc = {}
self.conc = {}
self.phreeqc = phreeqc_mod.IPhreeqc()
self.phreeqc.load_database(r"phreeqc.dat")
self.components = []
self.component_names = []
self._make_initial_state()
def _make_initial_state(self):
"""Copy solution to all cells and calculate initial conditions.
"""
self.phreeqc.run_string(self.initial_conditions)
self.components = self.phreeqc.get_component_list()
start = 1
end = self.ncells
code = ''
code += "COPY solution 1 %d-%d\n" % (start, end)
code += "COPY exchange 1 %d-%d\n" % (start, end)
code += "END\n"
code += "RUN_CELLS; -cells %d-%d\n" % (start, end)
code += self.make_selected_output(self.components)
self.phreeqc.run_string(code)
self.conc = self.get_selected_output()
all_names = self.conc.keys()
self.component_names = [name for name in all_names if name not in
('cb', 'H', 'O')]
code = ''
code += self.make_selected_output(self.components)
code += "RUN_CELLS; -cells 0-1\n"
self.phreeqc.run_string(code)
start_conc = self.get_selected_output()
for name in self.component_names:
self.inflow_conc[name] = start_conc[name][0]
self.init_conc[name] = start_conc[name][1]
def modify(self, new_conc):
"""Set new concentration after advection and re-calculate.
"""
conc = self.conc
end = self.ncells + 1
conc.update(new_conc)
modify = []
for index, cell in enumerate(range(1, end)):
modify.append("SOLUTION_MODIFY %d" % cell)
modify.append("\t-cb %e" % conc['cb'][index])
modify.append("\t-total_h %s" % conc['H'][index])
modify.append("\t-total_o %s" % conc['O'][index])
modify.append("\t-totals")
for name in self.component_names:
modify.append("\t\t%s\t%s" % (name, conc[name][index]))
modify.append("RUN_CELLS; -cells %d-%d\n" % (1, self.ncells))
code = '\n'.join(modify)
self.phreeqc.run_string(code)
self.conc = self.get_selected_output()
def get_modified(self):
"""Return calculated conc.
"""
return self.conc
@ staticmethod # this is just a function but belongs here
def make_selected_output(components):
"""
Build SELECTED_OUTPUT data block.
"""
headings = "-headings cb H O "
headings += '\t'.join(components)
selected_output = """
SELECTED_OUTPUT
-reset false
USER_PUNCH
"""
selected_output += headings + "\n"
# charge balance, H, and O
code = '10 w = TOT("water")\n'
code += '20 PUNCH CHARGE_BALANCE, TOTMOLE("H"), TOTMOLE("O")\n'
# All other elements
lino = 30
for component in components:
code += '%d PUNCH w*TOT(\"%s\")\n' % (lino, component)
lino += 10
selected_output += code
return selected_output
def get_selected_output(self):
"""Return calculation result as dict.
Header entries are the keys and the columns
are the values as lists of numbers.
"""
output = self.phreeqc.get_selected_output_array()
header = output[0]
conc = {}
for head in header:
conc[head] = []
for row in output[1:]:
for col, head in enumerate(header):
conc[head].append(row[col])
return conc
def finish(self):
"""Placeholder to give same interface as the multiprocessing version.
"""
pass
class PhreeqcCalculatorProxy(object):
"""Proxy that communicates with other processes.
We uses this proxy for parallel computations.
All code that is specific for parallel computing is located
in here.
"""
def __init__(self, ncells, initial_conditions):
"""Go parallel.
"""
self.in_queue = multiprocessing.JoinableQueue()
self.out_queue = multiprocessing.JoinableQueue()
self.process = multiprocessing.Process(
target=process_worker,
args=(ncells, initial_conditions, self.in_queue, self.out_queue))
self.process.start()
(self.inflow_conc,
self.init_conc,
self.component_names) = self.out_queue.get()
def modify(self, new_conc):
"""Run PHREEQC in another process.
"""
self.in_queue.put(new_conc)
def get_modified(self):
"""Return calculated conc.
"""
return self.out_queue.get()
def finish(self):
"""Terminate the process.
"""
self.in_queue.put(None)
self.process.join()
def process_worker(ncells, initial_conditions, in_queue, out_queue):
"""This runs in another process.
"""
print('Started process with ID', os.getpid())
calculator = PhreeqcCalculator(ncells, initial_conditions)
out_queue.put((calculator.inflow_conc, calculator.init_conc,
calculator.component_names))
while True:
new_conc = in_queue.get()
# None is the sentinel. We are done
if new_conc is None:
break
calculator.modify(new_conc)
out_queue.put(calculator.conc)
def plot(ncells, outflow, specie_names):
"""Plot the results.
"""
colors = {'Ca': 'r', 'Cl': 'b', 'K': 'g', 'N': 'y', 'Na': 'm'}
x = [i / float(ncells) for i in
range(1, len(outflow[specie_names[0]]) + 1)]
args = []
for name in specie_names:
args.extend([x, outflow[name], colors[name]])
plt.plot(*args)
plt.legend(specie_names, loc=(0.8, 0.5))
plt.ylabel('MILLIMOLES PER KILOGRAM WATER')
plt.xlabel('PORE VOLUME')
plt.savefig("ex11.png")
plt.show()
def measure_time(func, *args, **kwargs):
"""Convenience function to measure run times.
"""
start = timeit.default_timer()
result = func(*args, **kwargs)
return result, timeit.default_timer() - start
def main(ncells, nshifts, processes=2, show_plot=False):
"""
Specify initial conditions data blocks.
Uniform initial conditions are assumed.
"""
initial_conditions = """
TITLE Example 11.--Transport and ion exchange.
SOLUTION 0 CaCl2
units mmol/kgw
temp 25.0
pH 7.0 charge
pe 12.5 O2(g) -0.68
Ca 0.6
Cl 1.2
SOLUTION 1 Initial solution for column
units mmol/kgw
temp 25.0
pH 7.0 charge
pe 12.5 O2(g) -0.68
Na 1.0
K 0.2
N(5) 1.2
END
EXCHANGE 1
equilibrate 1
X 0.0011
END
"""
def run():
"""Do the work.
"""
model = CoupledModel(ncells, nshifts, initial_conditions,
processes)
model.run()
return model, model.results
(model, outflow), run_time = measure_time(run)
print('Statistics')
print('==========')
print('number of cells: ', ncells)
print('number of shifts: ', nshifts)
print('number of processes:', processes)
print('run_time: ', run_time)
print()
if show_plot:
plot(ncells, outflow, model.component_names)
def benchmark(ncells=400, nshifts=1200, process_range=range(1,9)):
"""Benchmark for diffrent numbers of processes"""
for processes in process_range:
main(ncells=ncells, nshifts=nshifts, processes=processes)
def plot_result(ncells=400, nshifts=1200, processes=4):
"""Also show the plot"""
main(ncells=ncells, nshifts=nshifts, processes=processes, show_plot=True)
if __name__ == '__main__':
benchmark()
plot_result()