# *-* coding: utf-8 *-*
"""Manage measurement configurations measurements.
"""
import itertools
import io
import numpy as np
import pandas as pd
import reda.utils.mpl
from reda.utils import opt_import
plt, mpl = reda.utils.mpl.setup()
class ConfigManager(object):
"""The class`ConfigManager` manages four-point measurement configurations.
"""
def __init__(self, nr_of_electrodes=None):
# store the configs as a Nx4 numpy array
self.configs = None
# each measurement can store additional data here
self.metadata = {}
# number of electrodes
self.nr_electrodes = nr_of_electrodes
[docs]
def abmn_to_dataframe(self):
"""Return a pandas.DataFrame containing the measurement configurations
Returns
-------
abmn_df : pandas.DataFrame
Configurations in a DataFrame (columns: a,b,m,n)
"""
abmn_df = pd.DataFrame(self.configs, columns=['a', 'b', 'm', 'n'])
return abmn_df
[docs]
def _get_next_index(self):
"""
"""
self.meas_counter += 1
return self.meas_counter
[docs]
def clear_configs(self):
"""Remove all configs. This implies deleting all measurements.
"""
del (self.configs)
self.configs = None
@property
def nr_of_configs(self):
"""Return number of configurations
Returns
-------
nr_of_configs: int
number of configurations stored in this instance
"""
if self.configs is None:
return 0
else:
return self.configs.shape[0]
[docs]
def _crmod_to_abmn(self, configs):
r"""convert crmod-style configurations to a Nx4 array
CRMod-style configurations merge A and B, and M and N, electrode
numbers into one large integer each:
.. math ::
AB = A \cdot 10^4 + B
MN = M \cdot 10^4 + N
Parameters
----------
configs: numpy.ndarray
Nx2 array holding the configurations to convert
Examples
--------
>>> import numpy as np
>>> from reda.configs.configManager import ConfigManager
>>> config = ConfigManager(nr_of_electrodes=5)
>>> crmod_configs = np.array((
... (10002, 40003),
... (10010, 30004),
... ))
>>> abmn = config._crmod_to_abmn(crmod_configs)
>>> print(abmn) # doctest: +NORMALIZE_WHITESPACE
[[ 1 2 4 3]
[ 1 10 3 4]]
"""
A = np.floor(configs[:, 0] / 1e4).astype(int)
B = (configs[:, 0] % 1e4).astype(int)
M = np.floor(configs[:, 1] / 1e4).astype(int)
N = (configs[:, 1] % 1e4).astype(int)
ABMN = np.hstack((
A[:, np.newaxis],
B[:, np.newaxis],
M[:, np.newaxis],
N[:, np.newaxis]
)).astype(int)
return ABMN
[docs]
def load_configs(self, filename):
"""Load configurations from a file with four columns: a b m n
"""
configs = np.loadtxt(filename)
self.add_to_configs(configs)
[docs]
def load_injections_from_mcf(self, filename):
"""Load injections from a mcf-file used by the sEIT-systems from FZJ;
Injections in the file must be formatted like the following:
SE 001 002
SE 002 001
.
.
.
Parameters
----------
filename: string
absolute or relative path to a mcf-file
Returns
-------
Nx2 numpy array with current injections
"""
injections = []
# load all injections
with open(filename, encoding="latin-1") as mcf:
for line in mcf:
if line[:2] == "SE":
injections.append((line[3:6], line[7:10]))
# convert to array
injections = np.asarray(injections, dtype=int)
# remove double entries because only one current injection
# is needed for configs
mask = np.less(injections[:, 0], injections[:, 1])
injections = injections[mask]
return(injections)
[docs]
def load_crmod_config(self, filename):
"""Load a CRMod configuration file
Parameters
----------
filename: string|io.BytesIO
absolute or relative path to a crmod config.dat file. Can also be a
BytesIO object
"""
if isinstance(filename, io.BytesIO):
fid = filename
else:
fid = open(filename, 'r')
nr_of_configs = int(fid.readline().strip())
configs = np.loadtxt(fid)
if not isinstance(filename, io.BytesIO):
fid.close()
print('loaded configs:', configs.shape)
if nr_of_configs != configs.shape[0]:
raise Exception(
'indicated number of measurements does not equal ' +
'to actual number of measurements')
ABMN = self._crmod_to_abmn(configs[:, 0:2])
self.configs = ABMN
[docs]
def load_crmod_or_4c_configs(self, filename):
"""Load configurations either from a CRMod measurement file, or from a
four-column file. Assume 1-indexed data (start with electrode 1).
Parameters
----------
filename : str
Path to file that shall be imported
Returns
-------
None
"""
# The first line decides
first_line = np.genfromtxt(filename, max_rows=1)
if first_line.size == 4:
self.load_configs(filename)
elif first_line.size == 1:
self.load_crmod_config(filename)
else:
raise Exception('File format not recognized')
# def load_crmod_volt(self, filename):
# """Load a CRMod measurement file (commonly called volt.dat)
# Parameters
# ----------
# filename: string
# path to filename
# Returns
# -------
# list
# list of measurement ids
# """
# with open(filename, 'r') as fid:
# nr_of_configs = int(fid.readline().strip())
# measurements = np.loadtxt(fid)
# if nr_of_configs != measurements.shape[0]:
# raise Exception(
# 'indicated number of measurements does not equal ' +
# 'to actual number of measurements')
# ABMN = self._crmod_to_abmn(measurements[:, 0:2])
# if self.configs is None:
# self.configs = ABMN
# else:
# # check that configs match
# if not np.all(ABMN == self.configs):
# raise Exception(
# 'previously stored configurations do not match new ' +
# 'configurations')
# # add data
# cid_mag = self.add_measurements(measurements[:, 2])
# cid_pha = self.add_measurements(measurements[:, 3])
# return [cid_mag, cid_pha]
[docs]
def _get_crmod_abmn(self):
"""return a Nx2 array with the measurement configurations formatted
CRTomo style
"""
ABMN = np.vstack((
self.configs[:, 0] * 1e4 + self.configs[:, 1],
self.configs[:, 2] * 1e4 + self.configs[:, 3],
)).T.astype(int)
return ABMN
[docs]
def write_crmod_volt(self, filename, mid):
"""Write the measurements to the output file in the volt.dat file
format that can be read by CRTomo.
Data is written in utf-8 encoding.
Parameters
----------
filename: string|io.BytesIO
output filename
mid: int or [int, int]
measurement ids of magnitude and phase measurements. If only one ID
is given, then the phase column is filled with zeros
"""
ABMN = self._get_crmod_abmn()
if isinstance(mid, (list, tuple)):
mag_data = self.measurements[mid[0]]
pha_data = self.measurements[mid[1]]
else:
mag_data = self.measurements[mid]
pha_data = np.zeros(mag_data.shape)
all_data = np.hstack(
(ABMN, mag_data[:, np.newaxis], pha_data[:, np.newaxis])
)
if isinstance(filename, io.BytesIO):
fid = filename
else:
fid = open(filename, 'wb')
fid.write(
bytes(
'{0}\n'.format(ABMN.shape[0]),
'utf-8',
)
)
np.savetxt(fid, all_data, fmt='%i %i %f %f')
if not isinstance(filename, io.BytesIO):
fid.close()
[docs]
def write_crmod_volt_with_individual_errors(
self, filename, data_mids, error_mids, norm_mag=1, norm_pha=1,
):
"""Write the measurements and individual errors to the output file in
the volt.dat file format that can be read by CRTomo.
Data is written in utf-8 encoding.
Parameters
----------
filename: string|io.BytesIO
output filename
data_mids: [int, int]
measurement ids of magnitude and phase measurements
error_mids: [int, int]
ids of magnitude and phase error estimates
norm_mag: float
Normalization factor for magnitude errors
norm_pha: float
Normalization factor for magnitude errors
"""
assert len(data_mids) == 2, "data_mids must be length 2"
assert len(error_mids) == 2, "error_mids must be length 2"
ABMN = self._get_crmod_abmn()
if isinstance(data_mids, (list, tuple)):
mag_data = self.measurements[data_mids[0]]
pha_data = self.measurements[data_mids[1]]
mag_error = self.measurements[error_mids[0]]
pha_error = self.measurements[error_mids[1]]
else:
raise Exception(
'Individual errors for mag-only not implemented yet')
mag_data = self.measurements[data_mids]
pha_data = np.zeros(data_mids.shape)
all_data = np.hstack(
(
ABMN,
mag_data[:, np.newaxis],
pha_data[:, np.newaxis],
mag_error[:, np.newaxis],
pha_error[:, np.newaxis],
)
)
if isinstance(filename, io.BytesIO):
fid = filename
else:
fid = open(filename, 'wb')
fid.write(
bytes(
'{0} T\n'.format(ABMN.shape[0]),
'utf-8',
)
)
np.savetxt(fid, all_data, fmt='%i %i %f %f %f %f')
fid.write(
bytes(
'{} {}\n'.format(norm_mag, norm_pha),
'utf-8',
)
)
if not isinstance(filename, io.BytesIO):
fid.close()
[docs]
def write_crmod_config(self, filename):
"""Write the configurations to a configuration file in the CRMod format
All configurations are merged into one previor to writing to file
Parameters
----------
filename: string
absolute or relative path to output filename (usually config.dat)
"""
ABMN = self._get_crmod_abmn()
with open(filename, 'wb') as fid:
fid.write(bytes(
'{0}\n'.format(ABMN.shape[0]),
'utf-8',
))
np.savetxt(fid, ABMN.astype(int), fmt='%i %i')
[docs]
def gen_dipole_dipole(self,
skipc,
skipv=None,
stepc=1,
stepv=1,
nr_voltage_dipoles=10,
before_current=False,
allow_crossings=False,
start_skip=0,
N=None):
"""Generate dipole-dipole configurations
Parameters
----------
skipc: int
number of electrode positions that are skipped between electrodes
of a given dipole
skipv: int
steplength between subsequent voltage dipoles. A steplength of 0
will produce increments by one, i.e., 3-4, 4-5, 5-6 ...
stepc: int
steplength between subsequent current dipoles. A steplength of 0
will produce increments by one, i.e., 3-4, 4-5, 5-6 ...
stepv: int
steplength between subsequent voltage dipoles. A steplength of 0
will produce increments by one, i.e., 3-4, 4-5, 5-6 ...
nr_voltage_dipoles: int
the number of voltage dipoles to generate for each current
injection dipole
before_current: bool, optional
if set to True, also generate voltage dipoles in front of current
dipoles.
allow_crossings : bool, optional
Allow potential dipoles to cross current injection dipoles. Note
that any quadpole generated by crossing the current electrodes is
NOT counted into nr_voltage_dipoles.
start_skip: int, optional
how many electrode to skip before/after the first/second current
electrode.
N: int, optional
number of electrodes, must be given if not already known by the
config instance
Examples
--------
>>> from reda.configs.configManager import ConfigManager
>>> config = ConfigManager(nr_of_electrodes=10)
>>> config.gen_dipole_dipole(skipc=2)
array([[ 1, 4, 5, 8],
[ 1, 4, 6, 9],
[ 1, 4, 7, 10],
[ 2, 5, 6, 9],
[ 2, 5, 7, 10],
[ 3, 6, 7, 10]])
>>> from reda.configs.configManager import ConfigManager
>>> config = ConfigManager(nr_of_electrodes=10)
>>> config.gen_dipole_dipole(
... skipc=2, before_current=True, allow_crossings=True)
array([[ 1, 4, 2, 5],
[ 1, 4, 3, 6],
[ 1, 4, 5, 8],
[ 1, 4, 6, 9],
[ 1, 4, 7, 10],
[ 2, 5, 1, 4],
[ 2, 5, 3, 6],
[ 2, 5, 4, 7],
[ 2, 5, 6, 9],
[ 2, 5, 7, 10],
[ 3, 6, 1, 4],
[ 3, 6, 2, 5],
[ 3, 6, 4, 7],
[ 3, 6, 5, 8],
[ 3, 6, 7, 10]])
>>> from reda.configs.configManager import ConfigManager
>>> config = ConfigManager(nr_of_electrodes=10)
>>> config.gen_dipole_dipole(skipc=1, before_current=True)
array([[ 1, 3, 4, 6],
[ 1, 3, 5, 7],
[ 1, 3, 6, 8],
[ 1, 3, 7, 9],
[ 1, 3, 8, 10],
[ 2, 4, 5, 7],
[ 2, 4, 6, 8],
[ 2, 4, 7, 9],
[ 2, 4, 8, 10],
[ 3, 5, 6, 8],
[ 3, 5, 7, 9],
[ 3, 5, 8, 10],
[ 4, 6, 1, 3],
[ 4, 6, 7, 9],
[ 4, 6, 8, 10],
[ 5, 7, 2, 4],
[ 5, 7, 1, 3],
[ 5, 7, 8, 10]])
"""
if N is None and self.nr_electrodes is None:
raise Exception('You must provide the number of electrodes')
elif N is None:
N = self.nr_electrodes
# by default, current voltage dipoles have the same size
if skipv is None:
skipv = skipc
configs = []
# current dipoles
for a in range(0, N - skipv - skipc - 3, stepc):
b = a + skipc + 1
nr = 0
# potential dipoles before current injection
if before_current:
for n in range(a - start_skip - 1, -1, -stepv):
nr += 1
if nr > nr_voltage_dipoles:
continue
m = n - skipv - 1
if m < 0:
continue
quadpole = np.array((a, b, m, n)) + 1
configs.append(quadpole)
if allow_crossings:
# backward crossings
electrodes_between_injections = list(range(a + 1, b))
for n in electrodes_between_injections:
m = n - skipv - 1
if m < 0:
continue
quadpole = np.array((a, b, m, n)) + 1
configs.append(quadpole)
if allow_crossings:
# forward crossings
electrodes_between_injections = list(range(a + 1, b))
for m in electrodes_between_injections:
n = m + skipv + 1
quadpole = np.array((a, b, m, n)) + 1
configs.append(quadpole)
# potential dipoles after current injection
nr = 0
for m in range(b + start_skip + 1, N - skipv - 1, stepv):
nr += 1
if nr > nr_voltage_dipoles:
continue
n = m + skipv + 1
quadpole = np.array((a, b, m, n)) + 1
configs.append(quadpole)
configs = np.array(configs)
# now add to the instance
if self.configs is None:
self.configs = configs
else:
self.configs = np.vstack((self.configs, configs))
return configs
[docs]
def gen_inner_gradients(self, injections, skips=None):
"""
Given a set of current injections, generate voltage dipoles between the
current electrodes
Examples
--------
>>> import reda
>>> cfg = reda.ConfigManager(nr_of_electrodes=10)
>>> cfg.gen_inner_gradients([1, 10], skips=[2, ])
array([[ 1, 10, 2, 5],
[ 1, 10, 3, 6],
[ 1, 10, 4, 7],
[ 1, 10, 5, 8],
[ 1, 10, 6, 9]])
Parameters
----------
injections : Nx2 numpy.ndarray
Current injections
skips : None or iterable
The skips to generate. None will generate all possible skips
Returns
-------
configs : Nx4 numpy.ndarray
The generated configurations
"""
configs_raw = []
for ab in np.atleast_2d(injections):
a = min(ab)
b = max(ab)
if b - a < 3:
continue
max_skip = b - a - 2
if skips is None:
use_skips = range(0, max_skip)
else:
use_skips = [x for x in skips if x <= max_skip]
for skip in use_skips:
for m in range(a + 1, b - 1 - skip, 1):
n = m + skip + 1
configs_raw.append(
(ab[0], ab[1], m, n)
)
configs = np.array(configs_raw)
self.add_to_configs(configs)
return configs
[docs]
def gen_gradient(self, skip=0, step=1, vskip=0, vstep=1):
"""Generate gradient measurements
Parameters
----------
skip: int
distance between current electrodes
step: int
steplength between subsequent current dipoles
vskip: int
distance between voltage electrodes
vstep: int
steplength between subsequent voltage dipoles
"""
N = self.nr_electrodes
quadpoles = []
for a in range(1, N - skip, step):
b = a + skip + 1
for m in range(a + 1, b - vskip - 1, vstep):
n = m + vskip + 1
quadpoles.append((a, b, m, n))
configs = np.array(quadpoles)
if configs.size == 0:
return None
self.add_to_configs(configs)
return configs
[docs]
def gen_all_voltages_for_injections(self, injections_raw):
r"""For a given set of current injections AB, generate all possible
unique potential measurements.
After Noel and Xu, 1991, for N electrodes, the number of possible
voltage dipoles for a given current dipole is :math:`(N - 2)(N - 3) /
2`. This includes normal and reciprocal measurements.
If current dipoles are generated with
ConfigManager.gen_all_current_dipoles(), then :math:`N \cdot (N - 1) /
2` current dipoles are generated. Thus, this function will produce
:math:`(N - 1)(N - 2)(N - 3) / 4` four-point configurations ABMN, half
of which are reciprocals (Noel and Xu, 1991).
All generated measurements are added to the instance.
Use ConfigManager.split_into_normal_and_reciprocal() to split the
configurations into normal and reciprocal measurements.
Parameters
----------
injections : numpy.ndarray
Kx2 array holding K current injection dipoles A-B
Returns
-------
configs : numpy.ndarray
Nax4 array holding all possible measurement configurations
"""
if isinstance(injections_raw, (list, tuple)):
injections = np.atleast_2d(np.array(injections_raw)).astype(int)
else:
injections = np.atleast_2d(injections_raw).astype(int)
N = self.nr_electrodes
all_quadpoles = []
for idipole in injections:
# sort current electrodes and convert to array indices
Ipoles = np.sort(idipole) - 1
# voltage electrodes
velecs = list(range(1, N + 1))
# remove current electrodes
del (velecs[Ipoles[1]])
del (velecs[Ipoles[0]])
# permutate remaining
voltages = itertools.permutations(velecs, 2)
for voltage in voltages:
all_quadpoles.append((idipole[0], idipole[1], voltage[0],
voltage[1]))
configs_unsorted = np.array(all_quadpoles)
# sort AB and MN
configs_sorted = np.hstack((
np.sort(configs_unsorted[:, 0:2], axis=1),
np.sort(configs_unsorted[:, 2:4], axis=1),
))
configs = self.remove_duplicates(configs_sorted)
self.add_to_configs(configs)
self.remove_duplicates()
return configs
[docs]
def gen_all_current_dipoles(self):
r"""Generate all possible current dipoles for the given number of
electrodes (self.nr_electrodes). Duplicates are removed in the process.
After Noel and Xu, 1991, for N electrodes, the number of possible
unique configurations is :math:`N \cdot (N - 1) / 2`. This excludes
duplicates in the form of switches current/voltages electrodes, as well
as reciprocal measurements.
Returns
-------
configs : Nx2 numpy.ndarray
all possible current dipoles A-B
"""
N = self.nr_electrodes
celecs = list(range(1, N + 1))
AB_list = itertools.permutations(celecs, 2)
AB = np.array([ab for ab in AB_list])
AB.sort(axis=1)
# now we need to filter duplicates
AB = np.unique(AB.view(AB.dtype.descr * 2)).view(AB.dtype).reshape(
-1, 2)
return AB
[docs]
def remove_duplicates(self, configs=None):
"""remove duplicate entries from 4-point configurations. If no
configurations are provided, then use self.configs. Unique
configurations are only returned if configs is not None.
Parameters
----------
configs: Nx4 numpy.ndarray, optional
remove duplicates from these configurations instead from
self.configs.
Returns
-------
configs_unique: Kx4 numpy.ndarray
unique configurations. Only returned if configs is not None
"""
if configs is None:
c = self.configs
else:
c = configs
struct = c.view(c.dtype.descr * 4)
configs_unique = np.unique(struct).view(c.dtype).reshape(-1, 4)
if configs is None:
self.configs = configs_unique
else:
return configs_unique
[docs]
def gen_schlumberger(self, M, N, a=None):
"""generate one Schlumberger sounding configuration, that is, one set
of configurations for one potential dipole MN.
Parameters
----------
M: int
electrode number for the first potential electrode
N: int
electrode number for the second potential electrode
a: int, optional
stepping between subsequent voltage electrodes. If not set,
determine it as a = abs(M - N)
Returns
-------
configs: Kx4 numpy.ndarray
array holding the configurations
Examples
--------
import reda.configs.configManager as CRconfig
config = CRconfig.ConfigManager(nr_of_electrodes=40)
config.gen_schlumberger(M=20, N=21)
"""
if a is None:
a = np.abs(M - N)
nr_of_steps_left = int(min(M, N) - 1 / a)
nr_of_steps_right = int((self.nr_electrodes - max(M, N)) / a)
configs = []
for i in range(0, min(nr_of_steps_left, nr_of_steps_right)):
A = min(M, N) - (i + 1) * a
B = max(M, N) + (i + 1) * a
configs.append((A, B, M, N))
configs = np.array(configs)
self.add_to_configs(configs)
return configs
[docs]
def gen_wenner(self, a):
"""Generate Wenner measurement configurations.
Parameters
----------
a: int
distance (in electrodes) between subsequent electrodes of each
four-point configuration.
Returns
-------
configs: Kx4 numpy.ndarray
array holding the configurations
"""
configs = []
for i in range(1, self.nr_electrodes - 3 * a + 1):
configs.append((i, i + 3 * a, i + 1 * a, i + 2 * a), )
configs = np.array(configs)
self.add_to_configs(configs)
return configs
[docs]
def add_to_configs(self, configs):
"""Add one or more measurement configurations to the stored
configurations
Parameters
----------
configs : list or numpy.ndarray
list or array of configurations
Returns
-------
configs : Kx4 numpy.ndarray
array holding all configurations of this instance
"""
if len(configs) == 0:
return None
if self.configs is None:
self.configs = np.atleast_2d(configs)
else:
configs = np.atleast_2d(configs)
self.configs = np.vstack((self.configs, configs))
# make sure we store the configs as ints
self.configs = self.configs.astype(int)
return self.configs
[docs]
def split_into_normal_and_reciprocal(self, pad=False,
return_indices=False):
"""Split the stored configurations into normal and reciprocal
measurements
** *Rule 1: the normal configuration contains the smallest electrode
number of the four involved electrodes in the current dipole* **
Parameters
----------
pad: bool, optional
if True, add numpy.nan values to the reciprocals for non-existent
measuremnts
return_indices: bool, optional
if True, also return the indices of normal and reciprocal
measurments. This can be used to extract corresponding
measurements.
Returns
-------
normal: numpy.ndarray
Nnx4 array. If pad is True, then Nn == N (total number of
unique measurements). Otherwise Nn is the number of normal
measurements.
reciprocal: numpy.ndarray
Nrx4 array. If pad is True, then Nr == N (total number of
unique measurements). Otherwise Nr is the number of reciprocal
measurements.
nor_indices: numpy.ndarray, optional
Nnx1 array containing the indices of normal measurements. Only
returned if return_indices is True.
rec_indices: numpy.ndarray, optional
Nrx1 array containing the indices of normal measurements. Only
returned if return_indices is True.
"""
# for simplicity, we create an array where AB and MN are sorted
configs = np.hstack((np.sort(self.configs[:, 0:2], axis=1),
np.sort(self.configs[:, 2:4], axis=1)))
ab_min = configs[:, 0]
mn_min = configs[:, 2]
# rule 1
indices_normal = np.where(ab_min < mn_min)[0]
# now look for reciprocals
indices_used = []
normal = []
normal_indices = []
reciprocal_indices = []
reciprocal = []
duplicates = []
for index in indices_normal:
indices_used.append(index)
normal.append(self.configs[index, :])
normal_indices.append(index)
# look for reciprocal configuration
index_rec = np.where(
# A == M, B == N, M == A, N == B
(configs[:, 0] == configs[index, 2]) &
(configs[:, 1] == configs[index, 3]) &
(configs[:, 2] == configs[index, 0]) &
(configs[:, 3] == configs[index, 1]))[0]
if len(index_rec) == 0 and pad:
reciprocal.append(np.ones(4) * np.nan)
elif len(index_rec) == 1:
reciprocal.append(self.configs[index_rec[0], :])
indices_used.append(index_rec[0])
reciprocal_indices.append(index_rec[0])
elif len(index_rec > 1):
# take the first one
reciprocal.append(self.configs[index_rec[0], :])
reciprocal_indices.append(index_rec[0])
duplicates += list(index_rec[1:])
indices_used += list(index_rec)
# now determine all reciprocal-only parameters
set_all_indices = set(list(range(0, configs.shape[0])))
set_used_indices = set(indices_used)
reciprocal_only_indices = set_all_indices - set_used_indices
for index in reciprocal_only_indices:
if pad:
normal.append(np.ones(4) * np.nan)
reciprocal.append(self.configs[index, :])
normals = np.array(normal)
reciprocals = np.array(reciprocal)
if return_indices:
return normals, reciprocals, normal_indices, reciprocal_indices
else:
return normals, reciprocals
[docs]
def gen_reciprocals(self, append=False):
""" Generate reciprocal configurations, sort by AB, and optionally
append to configurations.
Parameters
----------
append : bool
Append reciprocals to configs (the default is False).
Examples
--------
>>> cfgs = ConfigManager(nr_of_electrodes=5)
>>> nor = cfgs.gen_dipole_dipole(skipc=0)
>>> rec = cfgs.gen_reciprocals(append=True)
>>> print(cfgs.configs)
[[1 2 3 4]
[1 2 4 5]
[2 3 4 5]
[3 4 1 2]
[4 5 1 2]
[4 5 2 3]]
"""
# Switch AB and MN
reciprocals = self.configs.copy()[:, ::-1]
reciprocals[:, 0:2] = np.sort(reciprocals[:, 0:2], axis=1)
reciprocals[:, 2:4] = np.sort(reciprocals[:, 2:4], axis=1)
# # Sort by current dipoles
ind = np.lexsort((reciprocals[:, 3], reciprocals[:, 2],
reciprocals[:, 1], reciprocals[:, 0]))
reciprocals = reciprocals[ind]
if append:
self.configs = np.vstack((self.configs, reciprocals))
return reciprocals
[docs]
def gen_configs_permutate(self,
injections_raw,
only_same_dipole_length=False,
ignore_crossed_dipoles=False,
silent=False):
""" Create measurement configurations out of a pool of current
injections. Use only the provided dipoles for potential dipole
selection. This means that we have always reciprocal measurements.
Remove quadpoles where electrodes are used both as current and voltage
dipoles.
Parameters
----------
injections_raw : Nx2 array
current injections
only_same_dipole_length : bool, optional
if True, only generate permutations for the same dipole length
ignore_crossed_dipoles : bool, optional
If True, potential dipoles will be ignored that lie between current
dipoles, e.g. 1-4 3-5. In this case it is possible to not have
full normal-reciprocal coverage.
silent: bool, optional
if True, do not print information on ignored configs (default:
False)
Returns
-------
configs : Nx4 array
quadrupoles generated out of the current injections
"""
injections = np.atleast_2d(injections_raw).astype(int)
N = injections.shape[0]
measurements = []
for injection in range(0, N):
dipole_length = np.abs(injections[injection][1] -
injections[injection][0])
# select all dipole EXCEPT for the injection dipole
for i in set(range(0, N)) - set([injection]):
test_dipole_length = np.abs(injections[i, :][1] -
injections[i, :][0])
if (only_same_dipole_length
and test_dipole_length != dipole_length): # noqa: W503
continue
quadpole = np.array(
[injections[injection, :], injections[i, :]]).flatten()
if ignore_crossed_dipoles is True:
# check if we need to ignore this dipole
# Note: this could be wrong if electrode number are not
# ascending!
if (quadpole[2] > quadpole[0]
and quadpole[2] < quadpole[1]): # noqa: W503
if not silent:
print('A - ignoring', quadpole)
elif (quadpole[3] > quadpole[0]
and quadpole[3] < quadpole[1]): # noqa: W503
if not silent:
print('B - ignoring', quadpole)
else:
measurements.append(quadpole)
else:
# add very quadpole
measurements.append(quadpole)
# check and remove double use of electrodes
filtered = []
for quadpole in measurements:
if (not set(quadpole[0:2]).isdisjoint(set(quadpole[2:4]))):
if not silent:
print('Ignoring quadrupole because of ',
'repeated electrode use:', quadpole)
else:
filtered.append(quadpole)
self.add_to_configs(filtered)
return np.array(filtered)
[docs]
def remove_max_dipole_sep(self, maxsep=10):
"""Remove configurations with dipole separations higher than `maxsep`.
Parameters
----------
maxsep : int
Maximum separation between both dipoles (the default is 10).
"""
sep = np.abs(self.configs[:, 1] - self.configs[:, 2])
self.configs = self.configs[sep <= maxsep]
[docs]
def write_configs(self, filename):
"""Write configs to file in four columns
"""
np.savetxt(filename, self.configs, fmt='%i %i %i %i')
[docs]
def to_pg_scheme(self, container=None, positions=None):
"""Convert the configuration to a pygimli measurement scheme
Parameters
----------
container: reda.containers.ERT.ERT
an ERT data container (we take the electrode positions from here)
positions = None
Returns
-------
data: pybert.DataContainerERT
Examples
--------
import numpy as np
from reda.configs.configManager import ConfigManager
configs = ConfigManager(nr_of_electrodes=48)
new_configs = configs.gen_dipole_dipole(skipc=2)
x = np.arange(0, 48, 1)
z = np.ones(48) * -1
y = np.zeros(48)
xyz = np.vstack((x, y, z)).T
scheme = configs.to_pg_scheme(positions=xyz)
print(scheme)
"""
if container is None and positions is None:
raise Exception('electrode positions are required for BERT export')
if container is not None and container.electrodes is None:
raise Exception('container does not contain electrode positions')
if container is not None and positions is not None:
raise Exception(
'only one of container OR positions must be provided')
if container is not None:
elec_positions = container.electrodes.values
elif positions is not None:
elec_positions = positions
opt_import("pybert", requiredFor="")
import pybert
# Initialize BERT DataContainer
data = pybert.DataContainerERT()
# Define electrodes (48 electrodes spaced by 0.5 m)
for nr, (x, y, z) in enumerate(elec_positions):
data.createSensor((x, y, z))
# Define number of measurements
data.resize(self.configs.shape[0])
for index, token in enumerate("abmn"):
data.set(token, self.configs[:, index].tolist())
# account for zero indexing
for token in "abmn":
data.set(token, data(token) - 1)
# np.vstack([data.get(x).array() for x in ("abmn")]).T
return data
[docs]
def to_iris_syscal(self, filename, spacing=1):
"""Export to IRIS Instrument configuration file
Parameters
----------
filename : string
Path to output filename
"""
with open(filename, 'w') as fid:
# fprintf(fod, '#\t X\t Y\t Z\n');
fid.write('#\t X\t Y\t Z\n')
# fprintf(fod, '%d\t %.1f\t %d\t %d\n', D');
# loop over electrodes and assign increasing x-positions
# TODO: use proper electrode positions, if available
for nr in range(0, self.configs.max()):
fid.write('{} {} 0 0\n'.format(nr + 1, nr * spacing))
# fprintf(fod, '#\t A\t B\t M\t N\n');
fid.write('#\t A\t B\t M\t N\n')
# fprintf(fod, '%d\t %d\t %d\t %d\t %d\n', C');
for nr, config in enumerate(self.configs):
fid.write('{} {} {} {} {}\n'.format(nr + 1, *config))
[docs]
def get_unique_current_injections(self):
"""Return all unique current injection electrode pairs
Examples
--------
>>> from reda.configs.configManager import ConfigManager
>>> cfg = ConfigManager()
>>> cfg.nr_electrodes
>>> cfg.nr_electrodes = 10
>>> cfg.add_to_configs((
... (1, 2, 3, 4),
... (1, 2, 5, 6),
... (7, 8, 9, 10)
... ))
array([[ 1, 2, 3, 4],
[ 1, 2, 5, 6],
[ 7, 8, 9, 10]])
>>> cfg.get_unique_current_injections()
array([[1, 2],
[7, 8]])
cfg = ConfigManager()
cfg.get_unique_current_injections()
Returns
-------
unique_injections : Nx2 numpy.ndarray
Current injections
"""
if self.configs is None:
return None
return np.unique(self.configs[:, 0:2], axis=0)
@property
def unique_injections(self):
return self.get_unique_current_injections()
[docs]
def remove_reciprocals(self, configs=None):
"""
"""
if configs is None:
configs_to_use = self.configs
else:
configs_to_use = configs
indices_to_delete = []
configs_ar = np.array(configs_to_use)
configs_ar[:, 0:2] = np.sort(configs_ar[:, 0:2], axis=1)
configs_ar[:, 2:4] = np.sort(configs_ar[:, 2:4], axis=1)
for index in reversed(range(configs_ar.shape[0])):
# print('Checking', index, configs_ar[index, :])
# normalized reciprocal
ar = configs_ar[index, 2]
br = configs_ar[index, 3]
mr = configs_ar[index, 0]
nr = configs_ar[index, 1]
# we only search in lower indices
subdata = configs_ar[0:index, :]
search = np.where(
(
(subdata[:, 0] == ar) &
(subdata[:, 1] == br) &
(subdata[:, 2] == mr) &
(subdata[:, 3] == nr)
)
)[0]
# print('search', search)
if len(search) > 0:
indices_to_delete.append(index)
# print(indices_to_delete)
for index in reversed(np.sort(indices_to_delete)):
configs_ar = np.delete(configs_ar, index, axis=0)
if configs is None:
# write back
self.configs = configs_ar
return configs_ar