# *-* coding: utf-8 *-*
"""Manage parameter sets for a given grid. A parameter set is a set of values
that correspond to the elements of the grid. Usually this is a resistivity or
phase distribution, but can also be a cumulated sensitivity distribution.
"""
import os
from collections.abc import Iterable
from scipy.stats import multivariate_normal
import scipy.interpolate as spi
import numpy as np
import shapely.geometry as shapgeo
[docs]
class ParMan(object):
"""manage one or more parameter sets for a given
:class:`crtomo.grid.crt_grid` object
"""
[docs]
def __init__(self, grid_obj):
"""
Parameters
----------
grid_obj: crtomo.grid.crt_grid
The FE grid that the parameter sets refer to
"""
# check if grid is already initialized
if grid_obj.grid is None:
raise Exception('Grid object is not initialized!')
self.grid = grid_obj
# we store the parameter sets in here
self.parsets = {}
self.metadata = {}
# we assign indices to each data set stored in the manager. This index
# should be unique over the life time of each instance. Therefore we
# increase the counter for each added data set. This way we also
# prevent conflicts when data sets are removed.
self.index = -1
[docs]
def _get_next_index(self):
self.index += 1
return self.index
[docs]
def reset(self):
"""Resets the ParMan instance. This process deletes all data and
metadata, and resets the index variable
"""
self.index = -1
del (self.parsets)
self.parsets = {}
del (self.metadata)
self.metadata = {}
[docs]
def add_data(self, data, metadata=None):
"""Add data to the parameter set
Parameters
----------
data : numpy.ndarray
one or more parameter sets. It must either be 1D or 2D, with the
first dimension the number of parameter sets (K), and the second
the number of elements (N): K x N
metadata : object, optional
the provided object will be stored in in the metadata dict and can
be received with the ID that is returned. If multiple (K) datasets
are added at ones, provide a list of objects with len K.
Returns
-------
int, ID
ID which can be used to access the parameter set
Examples
--------
>>> # suppose that grid is a fully initialized grid oject with 100
# elements
parman = ParMan(grid)
#
one_data_set = np.ones(100)
cid = parman.add_data(one_data_set)
print(parman.parsets[cid])
two_data_sets = np.ones((2, 100))
cids = parman.add_data(two_data_sets)
print(cids)
[0, ]
[1, 2]
"""
subdata = np.atleast_2d(data)
# we try to accommodate transposed input
if subdata.shape[1] != self.grid.nr_of_elements:
if subdata.shape[0] == self.grid.nr_of_elements:
subdata = subdata.T
else:
raise Exception(
'Number of values does not match the number of '
'elements in the grid'
)
# now make sure that metadata can be zipped with the subdata
K = subdata.shape[0]
if metadata is not None:
if K > 1:
if (not isinstance(
metadata, (list, tuple)) or len(metadata) != K):
raise Exception('metadata does not fit the provided data')
else:
# K == 1
metadata = [metadata, ]
if metadata is None:
metadata = [None for i in range(0, K)]
return_ids = []
for dataset, meta in zip(subdata, metadata):
cid = self._get_next_index()
self.parsets[cid] = dataset
self.metadata[cid] = meta
return_ids.append(cid)
if len(return_ids) == 1:
return return_ids[0]
else:
return return_ids
[docs]
def load_from_rho_file(self, filename):
"""Convenience function that loads two parameter sets from a rho.dat
file, as used by CRMod for forward resistivity/phase models.
Parameters
----------
filename: string, file path
filename of rho.dat file
Returns
-------
cid_mag: int
ID of magnitude parameter set
cid_phase: int
ID of phase parameter set
"""
data = np.loadtxt(filename, skiprows=1)
cid_mag = self.add_data(data[:, 0])
cid_pha = self.add_data(data[:, 1])
return cid_mag, cid_pha
[docs]
def load_inv_result(self, filename, columns=2, is_log10=False):
"""Load one parameter set from a rho*.mag or rho*.pha file produced by
CRTomo.
Parameters
----------
filename : str, file path
Filename to loaded data from
columns : int or iterable of ints, optional
column(s) to add to the manager. Defaults to 2 (third column).
is_log10 : bool, optional
If set to True, assume values to be in log10 and convert the
imported values to linear before importing
Returns
-------
pid : int or list of ints
ID(s) of parameter set
"""
assert os.path.isfile(filename)
try:
iterator = iter(columns)
except TypeError:
# not iterable
iterator = [columns, ]
data = np.loadtxt(filename, skiprows=1)
pid_list = []
for column in iterator:
if is_log10:
data[:, column] = 10 ** data[:, column]
pid = self.add_data(data[:, column])
pid_list.append(pid)
if len(pid_list) == 1:
return pid_list[0]
else:
return pid_list
[docs]
def load_model_from_file(self, filename, columns=0):
"""Load one parameter set from a file which contains one value per line
No row is skipped.
Parameters
----------
filename : string, file path
Filename to loaded data from
columns : int or iterable of ints, optional
column(s) to add to the manager. Defaults to 0 (first column)
Returns
-------
pid : int or list of ints
ID(s) of parameter set
"""
try:
iterator = iter(columns)
except TypeError:
# not iterable
iterator = [columns, ]
assert os.path.isfile(filename)
data = np.loadtxt(filename).squeeze()
# make sure data is NxC, where C is the number of datasets (1-?)
if len(data.shape) == 1:
data = data[:, np.newaxis]
# check that all columns can be added
assert data.shape[1] > np.max(columns)
pid_list = []
for column in iterator:
pid = self.add_data(data[:, column])
pid_list.append(pid)
if len(pid_list) == 1:
return pid_list[0]
else:
return pid_list
[docs]
def load_from_sens_file(self, filename):
"""Load real and imaginary parts from a sens.dat file generated by
CRMod
Parameters
----------
filename: string
filename of sensitivity file
Returns
-------
nid_re: int
ID of real part of sensitivities
nid_im: int
ID of imaginary part of sensitivities
"""
sens_data = np.loadtxt(filename, skiprows=1)
nid_re = self.add_data(sens_data[:, 2])
nid_im = self.add_data(sens_data[:, 3])
return nid_re, nid_im
[docs]
def save_to_rho_file(self, filename, cid_mag, cid_pha=None):
"""Save one or two parameter sets in the rho.dat forward model format
Parameters
----------
filename: string (file path)
output filename
cid_mag: int
ID of magnitude parameter set
cid_pha: int, optional
ID of phase parameter set. If not set, will be set to zeros.
"""
mag_data = self.parsets[cid_mag]
if cid_pha is None:
pha_data = np.zeros(mag_data.shape)
else:
pha_data = self.parsets[cid_pha]
with open(filename, 'wb') as fid:
fid.write(
bytes(
'{0}\n'.format(self.grid.nr_of_elements),
'utf-8',
)
)
np.savetxt(
fid,
np.vstack((
mag_data,
pha_data,
)).T,
fmt='%f %f'
)
[docs]
def add_empty_dataset(self, value=1):
"""Create an empty data set. Empty means: all elements have the same
value.
Parameters
----------
value: float, optional
which value to assign all element parameters. Default is one.
"""
subdata = np.ones(self.grid.nr_of_elements) * value
pid = self.add_data(subdata)
return pid
[docs]
def _clean_pid(self, pid):
"""if pid is a number, don't do anything. If pid is a list with one
entry, strip the list and return the number. If pid contains more than
one entries, do nothing.
"""
if isinstance(pid, (list, tuple)):
if len(pid) == 1:
return pid[0]
else:
return pid
return pid
[docs]
def modify_area(self, pid, xmin, xmax, zmin, zmax, value):
"""Modify the given dataset in the rectangular area given by the
parameters and assign all parameters inside this area the given value.
Partially contained elements are treated as INSIDE the area, i.e., they
are assigned new values.
Parameters
----------
pid: int
id of the parameter set to modify
xmin: float
smallest x value of the area to modify
xmax: float
largest x value of the area to modify
zmin: float
smallest z value of the area to modify
zmin: float
largest z value of the area to modify
value: float
this value is assigned to all parameters of the area
Examples
--------
>>> import crtomo.tdManager as CRtdm
tdman = CRtdm.tdMan(
elem_file='GRID/elem.dat',
elec_file='GRID/elec.dat',
)
pid = tdman.parman.add_empty_dataset(value=1)
tdman.parman.modify_area(
pid,
xmin=0,
xmax=2,
zmin=-2,
zmin=-0.5,
value=2,
)
fig, ax = tdman.plot.plot_elements_to_ax(pid)
fig.savefig('out.png')
"""
area_polygon = shapgeo.Polygon(
((xmin, zmax), (xmax, zmax), (xmax, zmin), (xmin, zmin))
)
self.modify_polygon(pid, area_polygon, value)
[docs]
def modify_polygon(self, pid, polygon, value):
"""Modify parts of a parameter set by modifying all elements within a
provided :class:`shapely.geometry.Polygon` instance. Hereby, an element
is modified if its center lies within the polygon.
Parameters
----------
pid: int
id of parameter set to vary
polygon: :class:`shapely.geometry.Polygon` instance
polygon that determines the area to modify
value: float
value that is assigned to all elements in the polygon
Examples
--------
>>> import shapely.geometry
polygon = shapely.geometry.Polygon((
(2, 0), (4, -1), (2, -1)
))
tdman.parman.modify_polygon_centroids(pid, polygon, 3)
"""
centroids = self.grid.get_element_centroids()
# now determine elements in area
elements_in_area = []
for nr, centroid in enumerate(centroids):
if polygon.contains(shapgeo.Point(centroid)):
elements_in_area.append(nr)
# change the values
pid_clean = self._clean_pid(pid)
self.parsets[pid_clean][elements_in_area] = value
[docs]
def modify_polygon_old(self, pid, polygon, value):
"""Modify parts of a parameter set by setting all parameters located
in, or touching, the provided :class:`shapely.geometry.Polygon`
instance.
WARNING: This implementation often leads to ragged borders in the
selected polygons. Use the new modify_polygon function!
Parameters
----------
pid: int
id of parameter set to vary
polygon: :class:`shapely.geometry.Polygon` instance
polygon that determines the area to modify
value: float
value that is assigned to all elements in the polygon
Examples
--------
>>> import shapely.geometry
polygon = shapely.geometry.Polygon((
(2, 0), (4, -1), (2, -1)
))
tdman.parman.modify_polygon(pid, polygon, 3)
"""
# create grid polygons
grid_polygons = []
for x, z in zip(self.grid.grid['x'], self.grid.grid['z']):
coords = [(a, b) for a, b in zip(x, z)]
grid_polygons.append(
shapgeo.Polygon(coords)
)
# now determine elements in area
elements_in_area = []
for nr, element in enumerate(grid_polygons):
if polygon.contains(element):
elements_in_area.append(nr)
elif polygon.equals(element):
elements_in_area.append(nr)
elif polygon.crosses(element):
elements_in_area.append(nr)
# only take crossed elements with at least A % overlap
# int_area = polygon.intersect(element).area
# print('overlap: ',
# int_area,
# element.area,
# element.area / int_area
# )
# change the values
pid_clean = self._clean_pid(pid)
self.parsets[pid_clean][elements_in_area] = value
[docs]
def modify_pixels(self, pid, pixels, new_value):
"""Replace the value of a given pixel by a new one.
Parameters
----------
pid : int
Parset id
pixel : int|itearble
Pixel index (zero-indexed)
new_value : float
New value that is assigned to the pixel
"""
if not isinstance(pixels, Iterable):
pixels = [pixels, ]
for pixel in pixels:
self.parsets[pid][pixel] = new_value
[docs]
def create_parset_with_gaussian_anomaly(
self, center, width, max_value, background):
"""
Parameters
----------
center : [float, float]
Center of the anomaly
width : float| [float, float]
The spatial width of the anomaly (x/z directions). Equivalent to
the standard deviations of the underlying distribution.
max_value : float
The maximum value that the anomaly is normalized to
background : float
Background value
Returns
-------
pid : int
ID of newly created parameter set
"""
pid = self.add_empty_dataset(background)
self.add_gaussian_anomaly_to_parset(
pid, center, np.sqrt(width), max_value
)
return pid
[docs]
def add_gaussian_anomaly_to_parset(
self, pid, center, width, max_value):
"""
Parameters
----------
pid : int
Id of parameter set
center : [float, float]
Center of the anomaly
width : float| [float, float]
The spatial width of the anomaly (x/z directions). Equivalent to
the standard deviations of the underlying distribution.
max_value : float
The maximum value that the anomaly is normalized to
"""
xy = self.grid.get_element_centroids()
# generates a 2D Gaussian distribution with
covariances = np.array(width) ** 2
rv = multivariate_normal(
center,
covariances,
)
grid_values = rv.pdf(xy)
if isinstance(width, (list, tuple)):
assert len(width) == 2, "need either one number or two"
std_x = width[0]
std_z = width[1]
else:
std_x = width
std_z = width
# peak value of stretched normal distribution
peak_x = 1 / (np.sqrt(2 * np.pi) * std_x)
peak_z = 1 / (np.sqrt(2 * np.pi) * std_z)
# resulting 2D peak
peak_value = peak_x * peak_z
grid_values /= peak_value
grid_values *= max_value
self.parsets[pid] = self.parsets[pid] + grid_values
[docs]
def add_gaussian_line(
self, pid,
p0,
length,
rotation,
spacing,
anomaly_peak,
anomaly_std,
):
xpos = np.arange(p0[0], length, step=spacing)
zpos = np.ones_like(xpos) * p0[1]
# rotate
xr = xpos * np.cos(
rotation * np.pi / 180) - zpos * np.sin(rotation * np.pi / 180)
zr = zpos * np.cos(
rotation * np.pi / 180) + xpos * np.sin(rotation * np.pi / 180)
for x, z in zip(xr, zr):
self.add_gaussian_anomaly_to_parset(
pid,
[x, z],
width=anomaly_std,
max_value=anomaly_peak,
)
return xr, zr
[docs]
def add_2d_cos_anomaly_line(
self, pid, p0, anomaly_width, anomaly_height, peak_value,
area='only_one_y',
only_one_y_line=True, only_one_x_line=False, whole_mesh=False):
"""Add one line of cos(x)cos(y) anomalies to a given parameter set. The
wavelength refers to half a period, with the maximum of the anomaly
centered on p0=[x0, z0].
Parameters
----------
whole_mesh : bool, optional
If True, then fill the whole mesh with a cos(x)cos(y) pattern.
"""
coords = self.grid.get_element_centroids()
norm = 2 * np.pi
wavelength_x = anomaly_width * 2
wavelength_z = anomaly_height * 2
if area == 'all':
indices = np.arange(0, coords.shape[0]).astype(int)
elif area == 'only_one_y':
boundary_y_min = p0[1] - (anomaly_height / 2)
boundary_y_max = p0[1] + (anomaly_height / 2)
# restrict to to within a rectangular area
indices = np.where(
(coords[:, 1] > boundary_y_min) &
(coords[:, 1] < boundary_y_max)
)
elif area == 'only_one_x':
boundary_x_min = p0[0] - (anomaly_width / 2)
boundary_x_max = p0[0] + (anomaly_width / 2)
# restrict to to within a rectangular area
indices = np.where(
(coords[:, 0] > boundary_x_min) &
(coords[:, 0] < boundary_x_max)
)
anomaly = np.cos(
(coords[indices, 0] - p0[0]) * norm / wavelength_x
) * np.cos(
(coords[indices, 1] - p0[1]) * norm / wavelength_z
) * peak_value
paradd = np.zeros(coords.shape[0])
paradd[indices] += anomaly.squeeze()
self.parsets[pid] += paradd
[docs]
def add_checkerboard_pattern(
self, pid, p0, anomaly_width, anomaly_height, peak_value,
):
"""
Note that if p0 is larger than a few anomaly sizes its possible that
the checkerboard pattern will only be applied to parts of the mesh!
"""
xlims, zlims = self.grid.get_minmax()
x_positions = np.arange(
p0[0],
np.abs(xlims[1] - xlims[0]) + 4 * anomaly_width, anomaly_width * 2
) - (
anomaly_width * 2
) * np.ceil(np.abs(xlims[0] - p0[0]) / (anomaly_width * 2))
depth_sign = np.sign(zlims[0] - zlims[1])
z_positions = np.arange(
p0[1],
(zlims[0] - zlims[1]) +
depth_sign * 4 * anomaly_height, depth_sign * anomaly_height * 2
)
print(z_positions)
offset = (anomaly_height * 2) * np.ceil(np.abs(zlims[0] - p0[1]) / (
anomaly_height * 2))
print(offset)
coords = self.grid.get_element_centroids()
paradd = np.zeros(coords.shape[0])
for pos_x in x_positions:
for pos_z in z_positions:
indices = np.where(
(coords[:, 0] >= pos_x) &
(coords[:, 0] <= pos_x + anomaly_width) &
(coords[:, 1] <= pos_z) &
(coords[:, 1] >= (pos_z - anomaly_height))
)
paradd[indices] = peak_value
self.parsets[pid] += paradd
[docs]
def _com_data_trafo_mode(self, subdata, mode):
if mode == "none":
# just take the absolute values
s = np.abs(subdata)
elif mode == "log10":
# log10 values of absolutes
s = np.abs(np.log10(np.abs(subdata)))
elif mode == "sqrt":
# square roots of absolutes
s = np.sqrt(np.abs(subdata))
return s
[docs]
def center_of_mass_value(self, pid, mode='log10'):
"""Compute the center of mass value of a given parameter set.
"""
centroids = self.grid.get_element_centroids()
# compute the integrative value
subdata = self.parsets[pid]
s = self._com_data_trafo_mode(subdata, mode)
com_x = np.sum(centroids[:, 0] * s) / np.sum(s)
com_y = np.sum(centroids[:, 1] * s) / np.sum(s)
# to check: do we need to norm here?
return [com_x, com_y]
[docs]
def center_of_mass_value_multiple(self, pid_list, mode='none'):
print('center_of_mass_value_multiple')
centroids = self.grid.get_element_centroids()
values = np.vstack(
[self.parsets[index] for index in pid_list]
)
s = self._com_data_trafo_mode(values, mode)
xy = np.dot(s, centroids)
s_sum = np.sum(s, axis=1)
s_sum_stacked = np.tile(s_sum, (2, 1))
s_sum_T = s_sum_stacked.T
com = xy / s_sum_T
return com
# import IPython
# IPython.embed()