Source code for crtomo.tdManager

# *-* coding: utf-8 *-*
"""A digital representation of a tomodir, i.e., a single-frequency inversion.

** Modelling **

* save to one large binary file

    * I suggest to use pytables (tables) for that.
    * need to figure out how to store the individual modules in there, and
      retrieve them later.

* save-to-tomodir

    * including modeling results

* on-demand generation of modeling results

* provide easy plotting of pseudo sections for voltages:

    * magnitude (linear/log)
    * phase (liner)
    * rho (analytic/numerical K factor)
    * sigma'
    * sigma''

  plots should be generated for dipole-dipole only, and all additional
  configurations that we can find reliable z-estimators for.

  Provide corresponding filter functions, or use reda for that (do we want to
  depend on reda? I think it's ok to do this).

  In case only a subset of data is plotted, note the number of missing values
  in the plot!

  We could also provide means to generate the (x/z) coordinates by means of
  averaging over sensitivity distributions (see old script for that).

* provide functionality to check voltages against sensitivity sums

* plot sensitivities

    * perhaps provide different transformation functions, for example the one
      from Johannes Kenkel

* plot potential distributions

    * also plot current lines via streamlines?
    * can we generate current densities? not sure how to mix element data
      (sigma) and potential data (nodes) in j = sigma E

"""
import time
import datetime
from glob import glob
import re
import os
import tempfile
import subprocess
import io
from io import StringIO
import itertools
import functools
import tarfile

import matplotlib.colors
import matplotlib.cm
import numpy as np
import pandas as pd
import reda
from reda.main import units as reda_units

import crtomo.mpl
from crtomo.mpl import get_mpl_version

import crtomo.binaries as CRBin
import crtomo.grid as CRGrid
import crtomo.nodeManager as nM
import crtomo.parManager as pM
import crtomo.configManager as cConf
import crtomo.cfg as CRcfg
import crtomo.plotManager as PlotManager

mpl_version = get_mpl_version()
plt, mpl = crtomo.mpl.setup()
mpl_version = crtomo.mpl.get_mpl_version()


[docs] class noise_model(object): """noise model as contained in the crt.noisemod file Notes ----- 1. line: 1 # Ensemble seed 2. line: 0.300 # Relative error resistance A (noise) [%] dR=AR+B 3. line: 0.100E-01 # Absolute errior resistance B (noise) [Ohm m] 4. line: 0.00 # Phase error parameter A1 [mRad/Ohm/m] dp=A1*R^B1+A2*p+p0 5. line: 0.00 # Phase error parameter B1 (noise) [] 6. line: 0.00 # Relative phase error A2 (noise) [%] 7. line: 0.00 # Absolute phase error p0 (noise) [mRad] """ def __init__( self, seed, mag_rel=0, mag_abs=0, pha_a1=0, pha_b1=0, pha_rel=0, pha_abs=0): self.seed = seed self.mag_rel = mag_rel self.mag_abs = mag_abs self.pha_a1 = pha_a1 self.pha_b1 = pha_b1 self.pha_rel = pha_rel self.pha_abs = pha_abs
[docs] def write_crt_noisemod(self, filename): with open(filename, 'w') as fid: for value in (self.seed, self.mag_rel, self.mag_abs, self.pha_a1, self.pha_b1, self.pha_rel, self.pha_abs): fid.write('{}\n'.format(value))
[docs] class tdMan(object): """Manage tomodirs """
[docs] def __init__(self, **kwargs): """The following initialization permutations are possible: * initialize empty * initialize from an existing tomodir * tomodir: [tomodir path] * supply one or more of the building blocks of a tomodir * grid: crtomo.grid.crt_grid instance * crmod_cfg: crtomo.cfg.crmod_cfg instance * crtomo_cfg: crmod.cfg.crtomo_cfg instance .. blockdiag:: diagram { inv_cjg -> NI; cov -> NI; cov_mag_fpi -> NI; ata_diag -> NI; ata_reg_diag -> NI; cov1_m_diag -> NI; cov2_m_diag -> NI; res_m_diag -> tdMan_parset_resm; eps -> tdMan_eps_data; inv_ctr -> tdMan_inv_stats; modl -> NI; mag -> tdMan_inv_mag; pha -> tdMan_inv_pha; run_ctr -> NI; volt_files -> NI; cre_cim -> tdMag_inv_cre_cim; tdMan_inv_stats[label="tdMan.inv_stats"]; tdMan_eps_data[label="tdMan.eps_data"]; tdMan_parset_resm[label="tdMan.parman.parsets[tdMan.a['res_m']]"]; inv_cjg[label="conjugate gradient information"]; NI[label="not implemented"]; } http://www2.geo.uni-bonn.de/~mweigand/dashboard/content/crtomo_doc/crtomo/files.html#inv Keyword Arguments ----------------- grid: crtomo.grid.crt_grid A fully initialized grid object tempdir : string|None, optional if set, use this directory to create all temporary directories in. For example, settings this to /dev/shm can result in faster generation of large Jacobians volt_file : None|str|numpy.ndarray if this is None, assume we didn't get any measurement data. If this is a str, assume it to be the filename to a CRTomo volt.dat file. If it is a numpy array, assume 6 columns: a,b,m,n,rmag,rpha volt_data : synonym for volt_file parameter see description for volt_file """ # these variables will be filled later self.grid = None self.crmod_cfg = None self.crtomo_cfg = None self.nodeman = None self.parman = None self.configs = None self.plot = None self.crmod_cfg = None self.crtomo_cfg = None self.tempdir = kwargs.get('tempdir', None) # we need a struct to organize the assignments self.assignments = { # should contain a two-item list with ids in parman 'forward_model': None, # should contain a two-item list with ids of magnitude and phase # measurements (which are stored in self.configs) 'measurements': None, # should contain a two-item tuple with ids of data errors (stored # in self.configs) 'measurement_errors': None, # these are the normalization factors for the individual errors 'error_norm_factor_mag': None, 'error_norm_factor_pha': None, # store sensitivity cids here 'sensitivities': None, # store potential nids here 'potentials': None, # last iteration of inversion results 'inversion': None, # if not None, this should be a tuple (pid_rmag, pid_rpha) # containing pids to the prior model 'prior_model': None, } # short-cut self.a = self.assignments # if set, use this class for the decoupled error model self.noise_model = kwargs.get('noise_model', None) # indicates if all information for modeling are present self.can_model = False # indicates if all information for inversion are present self.can_invert = False self.eps_data = None self.inv_stats = None # additional information from inversion results self.header_res_m = None self.header_l1_dw_log10_norm = None # if we did measure electrode capacitances, store the average electrode # capacitance here # See Zimmermann et al. 2018 for more information # For now this is only one value [S/m] = omega * C and will be # multiplied for each electrode self.electrode_admittance = None # when calling CRTomo, store output here self.crtomo_error_msg = None self.crtomo_output = None self._initialize_components(kwargs)
def __repr__(self): """Return meaningful information on current state of the object""" str_list = [] str_list.append(80 * '-') str_list.append('tdMan instance') str_list.append(80 * '-') # status of grid str_list.append('GRID:') if self.grid is None: str_list.append('no grid loaded') else: str_list.append(self.grid.__repr__()) # status of configs str_list.append('') str_list.append('CONFIGS:') if self.configs.configs is None: str_list.append('no configs present') else: str_list.append( '{} configs present'.format(self.configs.configs.shape[0]) ) # status of parsets str_list.append('') str_list.append('PARSETS:') str_list.append( '{} parsets loaded'.format(len(self.parman.parsets.keys())) ) str_list.append('') str_list.append('ASSIGNMENTS:') str_list.append('{}'.format(self.assignments)) str_list.append(80 * '-') return '\n'.join(str_list)
[docs] def _initialize_components(self, kwargs): r"""initialize the various components using the supplied \*\*kwargs Parameters ---------- kwargs : dict kwargs dict as received by __init__() """ tomodir = None # load/assign grid if 'tomodir' in kwargs: # load grid tomodir = kwargs.get('tomodir') print('importing tomodir {}'.format(tomodir)) assert os.path.isdir(tomodir) grid = CRGrid.crt_grid( tomodir + os.sep + 'grid' + os.sep + 'elem.dat', tomodir + os.sep + 'grid' + os.sep + 'elec.dat', ) self.grid = grid elif 'grid' in kwargs: self.grid = kwargs.get('grid') elif 'elem_file' in kwargs and 'elec_file' in kwargs: grid = CRGrid.crt_grid() grid.load_grid( kwargs['elem_file'], kwargs['elec_file'], ) self.grid = grid else: raise Exception( 'You must provide either a grid instance or ' 'elem_file/elec_file file paths' ) crmod_cfg = kwargs.get('crmod_cfg', CRcfg.crmod_config()) self.crmod_cfg = crmod_cfg crtomo_cfg = kwargs.get('crtomo_cfg', CRcfg.crtomo_config()) self.crtomo_cfg = crtomo_cfg parman = kwargs.get('parman', pM.ParMan(self.grid)) self.parman = parman nodeman = kwargs.get('nodeman', nM.NodeMan(self.grid)) self.nodeman = nodeman configs_abmn = kwargs.get('configs_abmn', None) config = cConf.ConfigManager( nr_of_electrodes=self.grid.nr_of_electrodes ) if configs_abmn is not None: config.add_to_configs(configs_abmn) self.configs = config config_file = kwargs.get('config_file', None) if config_file is not None: self.configs.load_crmod_or_4c_configs(config_file) # we can load data either from file, or directly from a numpy array voltage_file = kwargs.get('volt_file', None) voltage_data = kwargs.get('volt_data', voltage_file) # did we get either a file name OR data? then import it if voltage_data is not None: cids = self.configs.load_crmod_data(voltage_data) self.assignments['measurements'] = cids resistances = kwargs.get('resistances', None) if resistances is not None: cid_mag = self.configs.add_measurements(resistances) self.register_measurements(mag=cid_mag) self.plot = PlotManager.plotManager( grid=self.grid, nm=self.nodeman, pm=self.parman, ) # store decoupling data for the inversion here # will become a Yx3 array self.decouplings = None # if we load from a tomodir, also load configs and inversion results if tomodir is not None: print('importing tomodir results') # if present, read crtomo.cfg file crtomo_cfg_file = tomodir + os.sep + 'exe' + os.sep + 'crtomo.cfg' if os.path.isfile(crtomo_cfg_file): self.crtomo_cfg.import_from_file(crtomo_cfg_file) # forward configurations config_file = tomodir + os.sep + 'config' + os.sep + 'config.dat' if os.path.isfile(config_file): self.configs.load_crmod_config(config_file) # forward model rho_file = tomodir + os.sep + 'rho' + os.sep + 'rho.dat' if os.path.isfile(rho_file): pid_mag, pid_pha = self.parman.load_from_rho_file(rho_file) self.register_forward_model(pid_mag, pid_pha) # load data/modeling results self._read_modeling_results(tomodir + os.sep + 'mod') # load inversion results self.read_inversion_results(tomodir) self.read_decouplings_file( tomodir + os.sep + 'exe' + os.sep + 'decouplings.dat' )
# if tomodir_tarxz is not None: # # read from a tarxz file/BytesIO file # raise Exception('Reading from tar.xz files is not supported yet')
[docs] def read_decouplings_file(self, filename): """Import decoupling data for the inversion. This is usally a file called decouplings.dat in the exe/ directory of a tomodir, but we can also read from an BytesIO object. Do nothing if the file does not exist Overwrite any existing decouplings """ if not isinstance(filename, io.BytesIO): if not os.path.isfile(filename): return decouplings = np.loadtxt(filename, skiprows=1) assert decouplings.shape[1] == 3 if self.decouplings is not None: print('WARNING: overwriting existing decouplings') self.decouplings = decouplings
[docs] def save_decouplins_file(self, filename): if self.decouplings is None: return if isinstance(filename, io.BytesIO): fid = filename else: fid = open(filename, 'w') fid.write('{}\n'.format(self.decouplings.shape[0])) np.savetxt(fid, self.decouplings, fmt='%i %i %f') if not isinstance(filename, io.BytesIO): fid.close()
[docs] def add_to_decouplings(self, new_decouplings): assert new_decouplings.shape[1] == 3 if self.decouplings is None: self.decouplings = new_decouplings self.decouplings = np.vstack(( self.decouplings, new_decouplings ))
[docs] def inv_get_last_pid(self, parameter): """Return the pid of the parameter set corresponding to the final inversion results of a given parameter. Return None if the parameter type does not exist, or no inversion result was registered. Parameters ---------- parameter : str The requested parameter type: cre, cim, rmag, rpha Returns ------- pid : int|None The parameter id, or None """ if ('inversion' in self.a and parameter in self.a['inversion'] and len(self.a['inversion'][parameter]) > 0): pid = self.a['inversion'][parameter][-1] return pid return None
[docs] def inv_last_rmag_parset(self): """Return the resistivity magnitude of the last iteration. None if no inversion data exists. Example ------- >>> import crtomo ... tdm = crtomo.tdMan('tomodir/') ... tdm.inv_last_rmag_parset() Returns ------- inv_last_rmag : numpy.ndarray|None """ pid = self.inv_get_last_pid('rmag') if pid is None: return None else: return self.parman.parsets[pid]
[docs] def inv_last_rpha_parset(self): """Return the phase magnitude of the last inversion iteration. None if no inversion data exists. Returns ------- inv_last_rpha : numpy.ndarray|None """ pid = self.inv_get_last_pid('rpha') if pid is None: return None else: return self.parman.parsets[pid]
[docs] def inv_last_cre_parset(self): """Return the real part of the complex resistivity of the last inversion iteration. None if no inversion data exists. Returns ------- inv_last_cre : numpy.ndarray|None """ pid = self.inv_get_last_pid('cre') if pid is None: return None else: return self.parman.parsets[pid]
[docs] def inv_last_cim_parset(self): """Return the imaginary part of the complex resistivity of the last inversion iteration.None if no inversion data exists. Returns ------- inv_last_cim : numpy.ndarray|None """ pid = self.inv_get_last_pid('cim') if pid is None: return None else: return self.parman.parsets[pid]
[docs] def reset_data(self): """Attempts to reset (delete) all inversion data currently stored in the tdMan instance. This is mostly attempted for the impedance data (magnitudes, phases, conductivity real and imaginary parts), but could be extended to other data (this is currently not done due to complexity and missing demand). Forward models are also deleted. """ # deletes data actually stored self.parman.reset() if 'inversion' in self.a and self.a['inversion'] is not None: for key in ('rmag', 'rpha', 'cre', 'cim', 'cre_cim'): self.a['inversion'][key] = {}
[docs] def create_tomodir(self, directory): """Create a tomodir subdirectory structure in the given directory """ pwd = os.getcwd() if not os.path.isdir(directory): os.makedirs(directory) os.chdir(directory) directories = ( 'config', 'exe', 'grid', 'mod', 'mod/pot', 'mod/sens', 'rho', ) for directory in directories: if not os.path.isdir(directory): os.makedirs(directory) os.chdir(pwd)
[docs] def _check_state(self): """Check if this instance can model and/or can invert """ if (self.grid is not None and self.configs.configs is not None and self.assignments['forward_model'] is not None): self.can_model = True if (self.grid is not None and self.assignments['measurements'] is not None): self.can_invert = True
[docs] def load_parset_from_file(self, filename, columns=0): """ Parameters ---------- filename : string filename to rho.dat file columns : int|list of int which columns to use/treat as parameter sets. This settings uses zero-indexing, i.e., the first column is 0. Default: column 0 Returns ------- pids : int|list the parameter ids of the imported files """ pids = self.parman.load_model_from_file(filename, columns=columns) return pids
[docs] def load_rho_file(self, filename): """Load a forward model from a rho.dat file Parameters ---------- filename : string filename to rho.dat file Returns ------- pid_mag : int parameter id for the magnitude model pid_pha : int parameter id for the phase model """ pids = self.parman.load_from_rho_file(filename) self.register_magnitude_model(pids[0]) self.register_phase_model(pids[1]) return pids
[docs] def save_to_tarfile(self): """Save the current modeling/inversion data into a tarfile. Return the file as an io.BytesIO object. The tar file is generated completely in memory - no files are written to the disc Returns ------- tomodir_tar : io.BytesIO Tomodir stored in tar file """ tomodir = io.BytesIO() tar = tarfile.open(fileobj=tomodir, mode='w:xz') # prepare buffers and write them to the tar file elem_data = io.BytesIO() self.grid.save_elem_file(elem_data) info = tarfile.TarInfo() info.name = 'grid/elem.dat' info.mtime = time.time() info.size = elem_data.tell() info.type = tarfile.REGTYPE elem_data.seek(0) tar.addfile(info, elem_data) elec_data = io.BytesIO() self.grid.save_elec_file(elec_data) info = tarfile.TarInfo() info.name = 'grid/elec.dat' info.mtime = time.time() info.size = elec_data.tell() info.type = tarfile.REGTYPE elec_data.seek(0) tar.addfile(info, elec_data) crtomo_cfg = io.BytesIO() self.crtomo_cfg.write_to_file(crtomo_cfg) info = tarfile.TarInfo() info.name = 'exe/crtomo.cfg' info.mtime = time.time() info.size = crtomo_cfg.tell() info.type = tarfile.REGTYPE crtomo_cfg.seek(0) tar.addfile(info, crtomo_cfg) decouplings = io.BytesIO() self.save_decouplins_file(decouplings) info = tarfile.TarInfo() info.name = 'exe/decouplings.dat' info.mtime = time.time() info.size = crtomo_cfg.tell() info.type = tarfile.REGTYPE decouplings.seek(0) tar.addfile(info, decouplings) volt_data = io.BytesIO() self.save_measurements(volt_data) info = tarfile.TarInfo() info.name = 'mod/volt.dat' info.mtime = time.time() info.size = volt_data.tell() info.type = tarfile.REGTYPE volt_data.seek(0) tar.addfile(info, volt_data) tar.close() tomodir.seek(0) return tomodir # modelling """ config/config.dat rho/rho.dat exe/crmod.cfg exe/crtomo.cfg exe/crt.noisemod exe/electrode_capactitances.dat exe/decouplings.dat exe/prior.model [TODO]exe/crt.lamnull mod/sens/* [TODO] mod/volt.dat [TODO] mod/pot/* inv/cjg.ctr inv/coverage.mag coverage.mag.fpi [TODO] inv/ata.diag [TODO] inv/ata_reg.diag [TODO] inv/cov1_m.diag [TODO] inv/cov2_m.diag [TODO] inv/res_m.diag [TODO] inv/eps.ctr [TODO] inv/inv.ctr [TODO] inv/*.model [TODO] inv/*.mag [TODO] inv/*.pha [TODO] inv/run.ctr [TODO] inv/voltXX.dat """
# file1 = io.BytesIO() # content_file1 = 'Hi there\nNew line\n'.encode('utf-8') # file1.write(content_file1) # file1.seek(0) # tar = tarfile.open(fileobj=target_file, mode='w:xz') # info1 = tarfile.TarInfo() # info1.name = 'subdir/File1.txt' # info1.mtime = time.time() # info1.size = len(content_file1) # info1.type = tarfile.REGTYPE # tar.addfile(info1, file1) # tar.close() # target_file.seek(0) # print('Reading tar file from memory:') # print( # target_file.read() # ) # with open('test.tar.xz', 'wb') as fid: # target_file.seek(0) # fid.write(target_file.read()) # # extract with tar xvJf test.tar.xz # ## Reading # tar = tarfile.open(fileobj=target_file, mode='r')
[docs] def save_to_tomodir(self, directory, only_for_inversion=False): """Save the tomodir instance to a directory structure. At this point forward modeling results (voltages, potentials and sensitivities) will be saved. Inversion results will, at this stage, not be saved (TODO!!!). Parameters ---------- directory : str Output directory (tomodir). Will be created of it does not exists, but otherwise files we be overwritten in any existing directories only_for_inversion : bool, optional If True, save only files required for an inversion (i.e., omit any forward modeling files and results not necessary). Default: False Note ---- Test cases: * modeling only * inversion only * modeling and inversion """ self.create_tomodir(directory) self.grid.save_elem_file( directory + os.sep + 'grid' + os.sep + 'elem.dat' ) self.grid.save_elec_file( directory + os.sep + 'grid' + os.sep + 'elec.dat' ) # modeling if not only_for_inversion: if (self.configs.configs is not None and self.assignments['forward_model'] is not None): self.configs.write_crmod_config( directory + os.sep + 'config' + os.sep + 'config.dat' ) if self.assignments['forward_model'] is not None: self.parman.save_to_rho_file( directory + os.sep + 'rho' + os.sep + 'rho.dat', self.assignments['forward_model'][0], self.assignments['forward_model'][1], ) self.crmod_cfg.write_to_file( directory + os.sep + 'exe' + os.sep + 'crmod.cfg' ) if self.assignments['sensitivities'] is not None: self._save_sensitivities( directory + os.sep + 'mod' + os.sep + 'sens', ) if self.assignments['potentials'] is not None: self._save_potentials( directory + os.sep + 'mod' + os.sep + 'pot', ) # we always want to save the measurements self.save_measurements( directory + os.sep + 'mod' + os.sep + 'volt.dat' ) # inversion self.crtomo_cfg.write_to_file( directory + os.sep + 'exe' + os.sep + 'crtomo.cfg' ) if self.electrode_admittance is not None: self._write_el_admittance(directory) if self.noise_model is not None: self.noise_model.write_crt_noisemod( directory + os.sep + 'exe' + os.sep + 'crt.noisemod' ) self.save_decouplins_file( directory + os.sep + 'exe' + os.sep + 'crt.noisemod' ) if not os.path.isdir(directory + os.sep + 'inv'): os.makedirs(directory + os.sep + 'inv') if self.a['prior_model'] is not None: self.parman.save_to_rho_file( directory + os.sep + 'inv/prior.model', *self.a['prior_model'] )
[docs] def _write_el_admittance(self, directory): filename = 'electrode_capacitances.dat' with open(directory + os.sep + 'exe' + os.sep + filename, 'w') as fid: fid.write('{}\n'.format(self.grid.nr_of_electrodes)) np.savetxt( fid, np.ones_like( self.grid.nr_of_electrodes ) * self.electrode_admittance )
[docs] def save_measurements(self, filename): """Save measurements in a file. Use the CRTomo format. Do not save anything if no measurements are present in this tdMan instance. Parameters ---------- filename : string path to output filename """ if self.assignments['measurements'] is not None: if self.assignments['measurement_errors'] is not None: # save individual errors self.configs.write_crmod_volt_with_individual_errors( filename, self.assignments['measurements'], self.assignments['measurement_errors'], ) else: self.configs.write_crmod_volt( filename, self.assignments['measurements'] )
[docs] def _save_sensitivities(self, directory): """save sensitivities to a directory """ print('saving sensitivities') digits = int(np.ceil(np.log10(self.configs.configs.shape[0]))) for i in range(0, self.configs.configs.shape[0]): sens_data, meta_data = self.get_sensitivity(i) filename_raw = 'sens{0:0' + '{0}'.format(digits) + '}.dat' filename = directory + os.sep + filename_raw.format(i + 1) grid_xz = self.grid.get_element_centroids() all_data = np.vstack(( grid_xz[:, 0], grid_xz[:, 0], sens_data[0], sens_data[1], )).T with open(filename, 'wb') as fid: fid.write(bytes( '{0} {1}\n'.format(meta_data[0], meta_data[1]), 'utf-8' )) np.savetxt(fid, all_data)
[docs] def _save_potentials(self, directory): """save potentials to a directory """ print('saving potentials') digits = int(np.ceil(np.log10(self.configs.configs.shape[0]))) for i in range(0, self.configs.configs.shape[0]): pot_data = self.get_potential(i) filename_raw = 'pot{0:0' + '{0}'.format(digits) + '}.dat' filename = directory + os.sep + filename_raw.format(i + 1) nodes = self.grid.nodes['sorted'][:, 1:3] all_data = np.hstack(( nodes, pot_data[0][:, np.newaxis], pot_data[1][:, np.newaxis], )) with open(filename, 'wb') as fid: np.savetxt(fid, all_data)
[docs] def clear_measurements(self): """Forget any previous measurements """ mid_list = self.assignments.get('measurements', None) if mid_list is not None: for mid in mid_list: self.configs.delete_measurements(mid=mid) self.assignments['measurements'] = None
[docs] def measurements(self, silent=False): """Return the measurements associated with this instance. If measurements are not present, check if we can model, and then run CRMod to load the measurements. Parameters ---------- silent : bool, optional If False, suppress CRMod output """ # check if we have measurements mid = self.assignments.get('measurements', None) if mid is None: return_value = self.model( voltages=True, sensitivities=False, potentials=False, silent=silent, ) if return_value is None: print('cannot model') return # retrieve measurements cids = self.assignments['measurements'] measurements = np.vstack(( self.configs.measurements[cids[0]], self.configs.measurements[cids[1]], )).T return measurements
[docs] def _read_modeling_results(self, mod_directory, silent=False): """Read modeling results from a given mod/ directory. Possible values to read in are: * voltages * potentials * sensitivities Parameters ---------- mod_directory : str Path to the directory containing the volt.dat file silent : bool, optional If True, suppress some debug output Returns ------- None """ voltage_file = mod_directory + os.sep + 'volt.dat' if os.path.isfile(voltage_file): if not silent: print('reading voltages') self.read_voltages(voltage_file) sens_files = sorted(glob( mod_directory + os.sep + 'sens' + os.sep + 'sens*.dat') ) # check if there are sensitivity files, and that the nr corresponds to # the nr of configs if (len(sens_files) > 0 and len(sens_files) == self.configs.nr_of_configs): print('reading sensitivities') self._read_sensitivities(mod_directory + os.sep + 'sens') # same for potentials pot_files = sorted(glob( mod_directory + os.sep + 'pot' + os.sep + 'pot*.dat') ) # check if there are sensitivity files, and that the nr corresponds to # the nr of configs if (len(pot_files) > 0 and len(pot_files) == self.configs.nr_of_configs): print('reading potentials') self._read_potentials(mod_directory + os.sep + 'pot')
[docs] def _read_sensitivities(self, sens_dir): """import sensitivities from a directory Note ---- * check that signs are correct in case CRMod switches potential electrodes """ if self.assignments['sensitivities'] is not None: print('Sensitivities already imported. Will not overwrite!') return else: self.assignments['sensitivities'] = {} sens_files = sorted(glob(sens_dir + os.sep + 'sens*.dat')) for nr, filename in enumerate(sens_files): with open(filename, 'r') as fid: metadata = np.fromstring( fid.readline().strip(), sep=' ', count=2 ) meta_re = metadata[0] meta_im = metadata[1] sens_data = np.loadtxt(fid) cids = self.parman.add_data( sens_data[:, 2:4], [meta_re, meta_im], ) # store cids for later retrieval self.assignments['sensitivities'][nr] = cids
[docs] def _read_potentials(self, pot_dir): """import potentials from a directory """ if self.assignments['potentials'] is not None: print('Potentials already imported. Will not overwrite!') return else: self.assignments['potentials'] = {} pot_files = sorted(glob(pot_dir + os.sep + 'pot*.dat')) for nr, filename in enumerate(pot_files): with open(filename, 'r') as fid: pot_data = np.loadtxt(fid) nids = self.nodeman.add_data( pot_data[:, 2:4], ) # store cids for later retrieval self.assignments['potentials'][nr] = nids
[docs] def get_potential(self, config_nr): """Return potential data for a given measurement configuration. Parameters ---------- config_nr: int Number of the configurations. Starts at 0 Returns ------- pot_data: list with two numpy.ndarrays First array: magnitude potentials, second array: phase potentials """ if self.assignments['potentials'] is None: self._check_state() if self.can_model: self.model(potentials=True) nids = self.assignments['potentials'][config_nr] pot_data = [self.nodeman.nodevals[nid] for nid in nids] return pot_data
[docs] def get_sensitivity(self, config_nr): """return a sensitivity, as well as corresponding metadata, for a given measurement configuration. Indices start at zero. """ if self.assignments['sensitivities'] is None: self._check_state() if self.can_model: self.model(sensitivities=True, silent=True) cids = self.assignments['sensitivities'][config_nr] sens_data = [self.parman.parsets[cid] for cid in cids] meta_data = [self.parman.metadata[cid] for cid in cids] return sens_data, meta_data
[docs] def plot_sensitivity(self, config_nr=None, sens_data=None, mag_only=False, absv=False, **kwargs): """Create a nice looking plot of the sensitivity distribution for the given configuration nr. Configs start at 1! Parameters ---------- config_nr : int, optional The configuration number (starting with 0) to compute the sensitivity for. sens_data : Nx2 numpy.ndarray, optional If provided, use this data as sensitivity data (do not compute anything) mag_only : bool, optional Plot only the magnitude sensitivities absv : bool, optional If true, plot absolute values of sensitivity Returns ------- fig ax Examples -------- .. plot:: import crtomo.debug import crtomo grid = crtomo.debug.get_grid(key=20) td = crtomo.tdMan(grid=grid) td.configs.add_to_configs([1, 5, 9, 13]) cid_mag, cid_pha = td.add_homogeneous_model(25, 0) td.register_forward_model(cid_mag, cid_pha) td.model(sensitivities=True) fig, axes = td.plot_sensitivity(0) fig.tight_layout() fig.savefig('sens_plot.pdf', bbox_inches='tight') """ def _rescale_sensitivity(sens_data): norm_value = np.abs(sens_data).max() if norm_value == 0: sens_normed = sens_data * 0 else: sens_normed = sens_data / norm_value indices_gt_zero = sens_data > 1e-5 indices_lt_zero = sens_data < -1e-5 # map all values greater than zero to the range [0.5, 1] x = np.log10(sens_normed[indices_gt_zero]) # log_norm_factor = np.abs(x).max() log_norm_factor = -5 y1 = 1 - x / (2 * log_norm_factor) # map all values smaller than zero to the range [0, 0.5] x = np.log10(np.abs(sens_normed[indices_lt_zero])) y = x / (2 * log_norm_factor) y2 = np.abs(y) # reassign values sens_data[:] = 0.5 sens_data[indices_gt_zero] = y1 sens_data[indices_lt_zero] = y2 return sens_data assert config_nr is not None or sens_data is not None assert not (config_nr is not None and sens_data is not None) if config_nr is not None: cids = self.assignments['sensitivities'][config_nr] sens_mag = self.parman.parsets[cids[0]] sens_pha = self.parman.parsets[cids[1]] else: sens_mag = sens_data[:, 0] sens_pha = sens_data[:, 1] if absv: sens_mag = np.log10(np.abs(sens_mag) / np.abs(sens_mag).max()) sens_pha = np.log10(np.abs(sens_pha) / np.abs(sens_pha).max()) cbmin = sens_mag.min() cbmax = sens_mag.max() else: _rescale_sensitivity(sens_mag) _rescale_sensitivity(sens_pha) cbmin = 0 cbmax = 1 # https://matplotlib.org/stable/api/prev_api_changes/api_changes_3.9.0.html#top-level-cmap-registration-and-access-functions-in-mpl-cm if mpl_version[0] <= 3 and mpl_version[1] < 9: cmap_jet = mpl.cm.get_cmap('jet') else: cmap_jet = mpl.colormaps['jet'] colors = [cmap_jet(i) for i in np.arange(0, 1.1, 0.1)] cmap = matplotlib.colors.LinearSegmentedColormap.from_list( 'jetn9', colors, N=9) over = kwargs.get('over', 'orange') under = kwargs.get('under', 'cyan') bad = kwargs.get('bad', 'white') cmap.set_over(over) cmap.set_under(under) cmap.set_bad(bad) if mag_only: Nx = 1 else: Nx = 2 axes = kwargs.get('axes', None) if axes: # assume axes to plot to were provided if Nx == 2: assert len(axes) == 2, "We need two axes" fig = axes[0].get_figure() else: fig = axes.get_figure() else: fig, axes = plt.subplots(1, Nx, figsize=(15 / 2.54, 12 / 2.54)) axes = np.atleast_1d(axes) # magnitude ax = axes[0] cid = self.parman.add_data(sens_mag) fig, ax, cnorm, cmap, cb, sM = self.plot.plot_elements_to_ax( cid=cid, ax=ax, plot_colorbar=True, # cmap_name='seismic', cmap_name='jet_r', cbsegments=18, cbmin=cbmin, cbmax=cbmax, bad='white', # cbmin=-cblim, # cbmax=cblim, # converter=converter_pm_log10, # norm = colors.SymLogNorm( # linthresh=0.03, # linscale=0.03, # vmin=-1.0, # vmax=1.0 # ), # xmin=-0.25, # xmax=10, # zmin=-2, ) if not absv: cb.set_ticks([0, 0.25, 0.5, 0.75, 1]) cb.set_ticklabels([ '-1', r'$-10^{-2.5}$', '0', r'$10^{-2.5}$', '1', ]) # self.plot.plot_elements_to_ax( # cid=cids[0], # ax=ax, # ) if not mag_only: cid = self.parman.add_data(sens_pha) # plot phase ax = axes[1] fig, ax, cnorm, cmap, cb, sM = self.plot.plot_elements_to_ax( cid=cid, ax=ax, plot_colorbar=True, # cmap_name='seismic', cmap_name='jet_r', cbsegments=18, cbmin=cbmin, cbmax=cbmax, bad='white', ) if not absv: cb.set_ticks([0, 0.25, 0.5, 0.75, 1]) cb.set_ticklabels([ '-1', r'$-10^{-2.5}$', '0', r'$10^{-2.5}$', '1', ]) fig.tight_layout() return fig, axes
[docs] def read_voltages(self, voltage_file): """Import voltages from a volt.dat file Parameters ---------- voltage_file : str Path to volt.dat file """ if isinstance(voltage_file, (StringIO, )): fid = voltage_file fid.seek(0) else: fid = open(voltage_file, 'r') items_first_line = fid.readline().strip().split(' ') # rewind for reading of complete file later on fid.seek(0) if int(items_first_line[0]) == 0: # empty file return individual_errors = False if len(items_first_line) == 1: # regular volt.dat file measurements_raw = np.loadtxt( fid, skiprows=1, ) elif len(items_first_line) == 2 and items_first_line[1] == 'T': individual_errors = True # Individual data errors measurements_raw = np.genfromtxt( fid, skip_header=1, max_rows=int(items_first_line[0]), ) fid.seek(0) (norm_mag, norm_pha) = np.genfromtxt(fid, max_rows=1) fid.close() measurements = np.atleast_2d(measurements_raw) # extract measurement configurations A = (measurements[:, 0] / 1e4).astype(int) B = (measurements[:, 0] % 1e4).astype(int) M = (measurements[:, 1] / 1e4).astype(int) N = (measurements[:, 1] % 1e4).astype(int) ABMN = np.vstack((A, B, M, N)).T # it may happen that we need to switch signs of measurements switch_signs = [] if self.configs.configs is None: self.configs.configs = ABMN else: # configurations don't match if not np.all(ABMN == self.configs.configs): for nr, (old_config, new_config) in enumerate(zip( self.configs.configs, ABMN)): if np.all(old_config == new_config): continue # check polarity current_electrodes_are_equal = np.all( old_config[0:2] == new_config[0:2] ) voltage_electrodes_are_switched = np.all( old_config[2:4] == new_config[4:1:-1] ) if (current_electrodes_are_equal and voltage_electrodes_are_switched): if len(self.configs.measurements.keys()) > 0: # print('asdasd') # import IPython # IPython.embed() # raise Exception( # 'need to switch electrode polarity, but ' + # 'there are already measurements stored for ' # + 'the old configuration!') # exit() switch_signs += [nr] else: # switch M/N in configurations self.configs.configs[nr, :] = new_config else: raise Exception( 'There was an error matching configurations of ' + 'voltages with configurations already imported' ) # change sign of R measurements that require it measurements[switch_signs, 2] *= -1 # add measurements to the config instance mid_mag = self.configs.add_measurements(measurements[:, 2]) if measurements.shape[1] >= 4: mid_pha = self.configs.add_measurements(measurements[:, 3]) else: mid_pha = None # register those measurements as 'the measurements', used, e.g., for a # subsequent inversion self.assignments['measurements'] = [mid_mag, mid_pha] if individual_errors: mid_mag_error = self.configs.add_measurements(measurements[:, 4]) mid_pha_error = self.configs.add_measurements(measurements[:, 5]) self.register_data_errors( mid_mag_error, mid_pha_error, norm_mag, norm_pha )
[docs] def _model(self, voltages, sensitivities, potentials, tempdir, silent=False): self._check_state() if self.can_model: if not silent: print('attempting modeling') pwd = os.getcwd() os.chdir(tempdir) # store the configuration temporarily cfg_save = self.crmod_cfg.copy() self.crmod_cfg['write_volts'] = voltages self.crmod_cfg['write_pots'] = potentials self.crmod_cfg['write_sens'] = sensitivities self.save_to_tomodir('.') os.chdir('exe') binary = CRBin.get('CRMod') return_text = subprocess.check_output( binary, shell=True, stderr=subprocess.STDOUT, ) if not silent: print(return_text) # print('Return text:', return_text) # restore the configuration self.crmod_cfg = cfg_save # if return_code != 0: # raise Exception('There was an error using CRMod') os.chdir(pwd) self._read_modeling_results( tempdir + os.sep + 'mod', silent=silent) return 1 else: print( 'Sorry, no measurements present, cannot model yet' ) return None
[docs] def model(self, voltages=True, sensitivities=False, potentials=False, output_directory=None, silent=False, ): """Forward model the tomodir and read in the results Note that this function will always do the full modeling. No caching involved. Please use .measurements to accessed cached measurements. Parameters ---------- voltages : bool, optional if True, compute voltages for registered quadrupoles. Default: True sensitivities : bool, optional if True, compute sensitivities for registered quadrupoles. Default: False potentials : bool, optional if True, compute potential fields for all current injections. Default: False TODO: check if implemented in the Python wrapper output_directory : str|None, optional if this is a string, treat it as an output directory in which the tomodir used for the modeling is saved. Will raise an exception if the output directory already exists. Default: None silent : bool, optional if True, suppress most of the output. Default: False Returns ------- None """ self._check_state() if self.can_model: if output_directory is not None: if not os.path.isdir(output_directory): os.makedirs(output_directory) tempdir = output_directory self._model(voltages, sensitivities, potentials, tempdir) else: raise IOError( 'output directory already exists: {0}'.format( output_directory ) ) else: with tempfile.TemporaryDirectory(dir=self.tempdir) as tempdir: self._model( voltages, sensitivities, potentials, tempdir, silent=silent ) return 1 else: print('Sorry, not all required information to model are present') print('Check:') print('1) configurations present: self.configs.configs') print('2) is a model present') return None
[docs] def _invert(self, tempdir, catch_output=True, **kwargs): """Internal function than runs an inversion using CRTomo. Parameters ---------- tempdir : str directory which to use as a tomodir catch_output : bool, optional if True, catch all outputs of the CRTomo call (default: True) cores : int, optional how many cores to use. (default 2) Returns ------- success: bool False if an error was detected error_msg: str Error message. None if not error was encountered. output: str Output of the actual CRTomo call """ nr_cores = kwargs.get('cores', 2) print('Attempting inversion in directory: {0}'.format(tempdir)) pwd = os.getcwd() os.chdir(tempdir) self.save_to_tomodir('.') os.chdir('exe') binary = CRBin.get('CRTomo') print('Using binary: {0}'.format(binary)) print('Calling CRTomo') # store env variable env_omp = os.environ.get('OMP_NUM_THREADS', '') os.environ['OMP_NUM_THREADS'] = '{0}'.format(nr_cores) output = None if catch_output: output = subprocess.check_output( binary, shell=True, stderr=subprocess.STDOUT, ) else: subprocess.call( binary, shell=True, ) # reset environment variable os.environ['OMP_NUM_THREADS'] = env_omp print('Inversion attempt finished') if os.path.isfile('error.dat'): error_message = open('error.dat', 'r').read() success = False else: success = True error_message = None os.chdir(pwd) if success: print('Attempting to import the results') self.read_inversion_results(tempdir) print('Statistics of last iteration:') print(self.inv_stats.iloc[-1]) else: print('There was an error while trying to invert') print('Please see .crtomo_error_msg and .crtomo_output') self.crtomo_output = output self.crtomo_error_msg = error_message return success
[docs] def invert(self, output_directory=None, catch_output=True, **kwargs): """Invert this instance, and import the result files No directories/files will be overwritten. Raise an IOError if the output directory exists. Parameters ---------- output_directory : string, optional use this directory as output directory for the generated tomodir. If None, then a temporary directory will be used that is deleted after data import. catch_output : bool, optional if True, do not show CRTomo output. Default:True cores : int, optional how many cores to use for CRTomo **kwargs : Are supplied to :py:meth:`crtomo.tdMan._invert` Returns ------- return_code : bool Return 0 if the inversion completed successfully. Return 1 if no measurements are present. """ self._check_state() if self.can_invert: if output_directory is not None: if not os.path.isdir(output_directory): os.makedirs(output_directory) tempdir = output_directory success = self._invert( tempdir, catch_output, **kwargs ) else: raise IOError( 'Output directory already exists: {0}'.format( output_directory ) ) else: with tempfile.TemporaryDirectory(dir=self.tempdir) as tempdir: success = self._invert( tempdir, catch_output, **kwargs ) if success: return 0 else: return 1 else: print( 'Sorry, no measurements present, cannot model yet' ) return 1
[docs] def invert_with_crhydra(self): import cr_hydra.settings import crh_add import crh_retrieve import hashlib import platform cr_hydra_config, error_code = cr_hydra.settings.get_config(True) if error_code != 0: assert ('No cr_hydra config file found. Cannot proceed') print('cr_hydra config found. Proceeding') print('Creating in-memory .tar.xz file of tomodir') tarxz = self.save_to_tarfile() # upload the simulation crh_settings = { 'datetime_init': '{}'.format( datetime.datetime.now(tz=datetime.timezone.utc) ), } crh_settings['source_computer'] = platform.node() crh_settings['sim_type'] = 'inv' crh_settings['crh_file'] = 'jupyter-lab' crh_settings['username'] = 'crtomo-tools' engine = crh_add._get_db_engine(cr_hydra_config) connection = engine.connect() archive_file = 'dasdasd' file_id = crh_add._upload_binary_data( tarxz, archive_file, connection ) crh_settings['tomodir_unfinished_file'] = file_id sim_id = crh_add._upload_simulation(crh_settings, connection) crh_settings['sim_id'] = sim_id crh_add._activate_simulation(sim_id, connection) print('cr_hydra simulation id:', sim_id) # wait until the inversion finished is_finished = None while is_finished is None: print('Waiting for inversion to finish') time.sleep(5) is_finished = crh_retrieve._is_finished(sim_id, connection) print('Inversion finished') print('Downloading') # retrieve the inversion # NOTE: This should be called from crh_retrieve result = connection.execute( crh_retrieve.text( 'select hash, data from binary_data where index=:data_id;' ), parameters={ 'data_id': is_finished, }, ) assert result.rowcount == 1 file_hash, binary_data = result.fetchone() # check hash m = hashlib.sha256() m.update(binary_data) assert file_hash == m.hexdigest(), "sha256 does not match" result_data = io.BytesIO(bytes(binary_data)) with open('output.tar.xz', 'wb') as fid: fid.write(result_data.read())
[docs] def read_inversion_results(self, tomodir): """Import inversion results from a tomodir into this instance .. warning:: This function is not finished yet and does not import ALL crtomo information yet. Parameters ---------- tomodir : str Path to tomodir """ print('Reading inversion results') self._read_inversion_results(tomodir) self._read_inversion_fwd_responses(tomodir) self.inv_stats = self._read_inv_ctr(tomodir) self._read_resm_m(tomodir) self._read_l1_coverage(tomodir) self._read_l2_coverage(tomodir) self.eps_data = self._read_eps_ctr(tomodir) # for simplicity, add configurations to psi data has_eps_data = False if isinstance(self.eps_data, list): if len(self.eps_data) > 1: has_eps_data = True if self.configs.configs is not None and len( self.configs.configs) > 0 and has_eps_data: for iteration in range(1, len(self.eps_data)): for index, key in enumerate('abmn'): self.eps_data[ iteration ][key] = self.configs.configs[:, index]
[docs] def _read_inversion_fwd_responses(self, tomodir): """Import the forward responses for all iterations of a given inversion Parameters ---------- tomodir : str Path to tomodir """ basedir = tomodir + os.sep + 'inv' + os.sep volt_files = sorted(glob(basedir + 'volt*.dat')) pids_rmag = [] pids_rpha = [] pids_wdfak = [] for filename in volt_files: pids = self.configs.load_crmod_data( filename, is_forward_response=True, try_fix_signs=True) pids_rmag.append(pids[0]) pids_rpha.append(pids[1]) pids_wdfak.append(pids[2]) # self.a['inversions'] already created by .read_inversion_results self.assignments['inversion']['fwd_response_rmag'] = pids_rmag self.assignments['inversion']['fwd_response_rpha'] = pids_rpha self.assignments['inversion']['fwd_response_wdfak'] = pids_wdfak
[docs] def _read_inversion_results(self, tomodir): """Import resistivity magnitude/phase and real/imaginary part of conductivity for all iterations Parameters ---------- tomodir : str Path to tomodir """ basedir = tomodir + os.sep + 'inv' + os.sep inv_mag = sorted(glob(basedir + 'rho*.mag')) inv_pha = sorted(glob(basedir + 'rho*.pha')) inv_sig = sorted(glob(basedir + 'rho*.sig')) assert len(inv_pha) == 0 or len(inv_mag) == len(inv_pha) assert len(inv_sig) == 0 or len(inv_mag) == len(inv_sig) pids_mag = [ self.parman.load_inv_result(filename, is_log10=True) for filename in inv_mag ] pids_pha = [self.parman.load_inv_result(filename) for filename in inv_pha] pids_sig = [ self.parman.load_inv_result( filename, columns=[0, 1], is_log10=False ) for filename in inv_sig ] if len(pids_sig) > 0: pids_cre = [x[0] for x in pids_sig] pids_cim = [x[1] for x in pids_sig] else: if len(pids_pha) > 0: pids_cre = [] pids_cim = [] for pid_rmag, pid_rpha in zip(pids_mag, pids_pha): # compute the admittances by hand impedance = self.parman.parsets[pid_rmag] * np.exp( 1j * self.parman.parsets[pid_rpha] / 1000 ) admittance = 1 / impedance pids_cre += [self.parman.add_data(np.real(admittance))] pids_cim += [self.parman.add_data(np.imag(admittance))] else: pids_cre = None pids_cim = None self.assignments['inversion'] = { 'rmag': pids_mag, 'rpha': pids_pha, 'cre_cim': pids_sig, 'cre': pids_cre, 'cim': pids_cim, }
[docs] def plot_eps_data_hist(self, filename=None): """Plot histograms of data residuals and data error weighting TODO : * add percentage of data below/above the RMS value Parameters ---------- filename : string|None if not None, then save plot to this file Returns ------- figure : matplotlib.Figure|None if filename is None, then return the generated figure """ assert self.eps_data is not None dfs = self.eps_data # check if this is a DC inversion if 'datum' in dfs[0]: dc_inv = True else: dc_inv = False nr_y = len(dfs) size_y = 5 / 2.54 * nr_y if dc_inv: nr_x = 1 else: nr_x = 3 size_x = 15 / 2.54 fig, axes = plt.subplots(nr_y, nr_x, figsize=(size_x, size_y)) axes = np.atleast_2d(axes) # plot initial data errors df = dfs[0] if dc_inv: ax = axes[0, 0] ax.hist( df['datum'] / df['eps_r'], 100, ) ax.set_xlabel(r'$-log(|R|) / \epsilon_r$') ax.set_ylabel(r'count') else: # complex inversion ax = axes[0, 0] ax.hist( df['-log(|R|)'] / df['eps'], 100, ) ax.set_xlabel(r'$-log(|R|)$') ax.set_ylabel(r'count') ax = axes[0, 1] ax.hist( df['-log(|R|)'] / df['eps_r'], 100, ) ax.set_xlabel(r'$-log(|R|) / \epsilon_r$') ax.set_ylabel(r'count') ax = axes[0, 2] phase_data = df['-Phase(rad)'] / df['eps_p'] if not np.all(np.isinf(phase_data) | np.isnan(phase_data)): ax.hist( phase_data, 100, ) ax.set_xlabel(r'$-\phi[rad] / \epsilon_p$') ax.set_ylabel(r'count') # iterations for it, df in enumerate(dfs[1:]): ax = axes[1 + it, 0] ax.hist( df['psi'], 100 ) rms = np.sqrt( 1 / df['psi'].shape[0] * np.sum( df['psi'] ** 2 ) ) ax.axvline(rms, color='k', linestyle='dashed') ax.set_title('iteration: {0}'.format(it)) ax.set_xlabel('psi') ax.set_ylabel(r'count') ax = axes[1 + it, 1] Rdat = df['Re(d)'] Rmod = df['Re(f(m))'] ax.scatter( Rdat, Rmod, ) ax.set_xlabel(r'$log(R_{data}~[\Omega])$') ax.set_ylabel(r'$log(R_{mod}~[\Omega])$') ax = axes[1 + it, 2] phidat = df['Im(d)'] phimod = df['Im(f(m))'] ax.scatter( phidat, phimod, ) ax.set_xlabel(r'$\phi_{data}~[mrad]$') ax.set_ylabel(r'$\phi_{mod}~[mrad]$') fig.tight_layout() if filename is not None: fig.savefig(filename, dpi=300) else: return fig
[docs] def plot_eps_data(self, filename=None): """Plot data residuals and data error weighting Parameters ---------- filename : string|None if not None, then save plot to this file Returns ------- figure : matplotlib.Figure|None if filename is None, then return the generated figure """ assert self.eps_data is not None dfs = self.eps_data # check if this is a DC inversion if 'datum' in dfs[0]: dc_inv = True else: dc_inv = False nr_y = len(dfs) size_y = 5 / 2.54 * nr_y if dc_inv: nr_x = 1 else: nr_x = 3 size_x = 15 / 2.54 fig, axes = plt.subplots(nr_y, nr_x, figsize=(size_x, size_y)) axes = np.atleast_2d(axes) # plot initial data errors df = dfs[0] if dc_inv: ax = axes[0, 0] ax.scatter( df['datum'] / df['eps_r'], df['eps_r'], ) ax.set_xlabel(r'$-log(|R|)$') ax.set_ylabel(r'$eps_r$') else: # complex inversion ax = axes[0, 0] ax.scatter( df['-log(|R|)'] / df['eps'], df['eps'], ) ax.set_xlabel(r'$-log(|R|)$') ax.set_ylabel(r'$eps$') ax = axes[0, 1] ax.scatter( df['-log(|R|)'] / df['eps_r'], df['eps_p'], ) ax.set_xlabel(r'$-log(|R|)$') ax.set_ylabel(r'$eps$') ax = axes[0, 2] ax.scatter( df['-Phase(rad)'] / df['eps_p'], df['eps_p'], ) ax.set_xlabel(r'$-log(|R|)$') ax.set_ylabel(r'$eps$') # iterations for it, df in enumerate(dfs[1:]): ax = axes[1 + it, 0] ax.scatter( range(0, df.shape[0]), df['psi'], ) rms = np.sqrt( 1 / df['psi'].shape[0] * np.sum( df['psi'] ** 2 ) ) ax.axhline(rms, color='k', linestyle='dashed') ax.set_title('iteration: {0}'.format(it)) ax.set_xlabel('config nr') ax.set_ylabel(r'$\Psi$') fig.tight_layout() if filename is not None: fig.savefig(filename, dpi=300) else: return fig
[docs] @staticmethod def _read_eps_ctr(tomodir): """Parse a CRTomo eps.ctr file. TODO: change parameters to only provide eps.ctr file Parameters ---------- tomodir : string Path to directory path Returns ------- """ epsctr_file = tomodir + os.sep + 'inv' + os.sep + 'eps.ctr' if not os.path.isfile(epsctr_file): print('eps.ctr not found: {0}'.format(epsctr_file)) print(os.getcwd()) return 1 with open(epsctr_file, 'r') as fid: lines = fid.readlines() group = itertools.groupby(lines, lambda x: x == '\n') dfs = [] # group for x in group: # print(x) if not x[0]: data = [y for y in x[1]] if data[0].startswith('IT') or data[0].startswith('PIT'): del (data[0]) data[0] = data[0].replace('-Phase (rad)', '-Phase(rad)') tfile = StringIO(''.join(data)) df = pd.read_csv( tfile, sep=r'\s+', # delim_whitespace=True, na_values=['Infinity'], ) dfs.append(df) return dfs
[docs] def _read_inv_ctr(self, tomodir): """Read in selected results of the inv.ctr file Parameters ---------- tomodir : string directory path to a tomodir Returns ------- inv_ctr : ? structure containing inv.ctr data """ invctr_file = tomodir + os.sep + 'inv' + os.sep + 'inv.ctr' if not os.path.isfile(invctr_file): print('inv.ctr not found: {0}'.format(invctr_file)) print(os.getcwd()) return 1 # read header with open(invctr_file, 'r') as fid: lines = fid.readlines() # check for robust inversion is_robust_inversion = False nr_of_data_points = None for i, line in enumerate(lines): if line.startswith('***PARAMETERS***'): raw_value = lines[i + 7].strip()[0] if raw_value == 'T': is_robust_inversion = True if line.startswith('# Data points'): nr_of_data_points = int(line[15:].strip()) print('is robust', is_robust_inversion) # find section that contains the iteration data for i, line in enumerate(lines): if line.strip().startswith('ID it.'): break # TODO: check for robust iteration # we have three types of lines: # 1. first iteration line # 2. other main iteration lines # 3. update lines # prepare regular expressions for these three types, each in two # flavors: robust and non-robust """ ! first iteration, robust 100 FORMAT (t1,a3,t5,i3,t11,g10.4,t69,g10.4,t81,g10.4,t93,i4,t105,g9.3) ! first iteration, non-robust 101 FORMAT (t1,a3,t5,i3,t11,g10.4,t69,g10.4,t81,g10.4,t93,i4) ! other iterations, robust 110 FORMAT (t1,a3,t5,i3,t11,g10.4,t23,g10.4,t34,g10.4,t46,g10.4,t58,& i6,t69,g10.4,t81,g10.4,t93,i4,t105,g9.3,t117,f5.3) ! other iterations, non-robust 111 FORMAT (t1,a3,t5,i3,t11,g10.4,t23,g10.4,t34,g10.4,t46,g10.4,t58,& i6,t69,g10.4,t81,g10.4,t93,i4,t105,f5.3) ! update iterations, non-robust 105 FORMAT (t1,a3,t5,i3,t11,g10.4,t23,g9.3,t34,g10.4,t46,g10.4,t58,& i6,t105,f5.3) ! update iterations, robust 106 FORMAT (t1,a3,t5,i3,t11,g10.4,t23,g9.3,t34,g10.4,t46,g10.4,t58,& i6,t105,g9.3,t117,f5.3) """ # this identifies a float number, or a NaN value reg_float = ''.join(( r'((?:[-+]?(?:\d+(?:\.\d*)?|\.\d+)(?:[eE][-+]?\d+)?)', '|', '(?:NaN))' )) reg_int = r'(\d{1,3})' # (t1,a3,t5,i3,t11,g10.4,t69,g10.4,t81,g10.4,t93,i4) # first iteration line of non-robust inversion reg_it1_norob = ''.join(( r'([a-zA-Z]{1,3})', ' *' + reg_int, ' *' + reg_float, ' *' + reg_float, ' *' + reg_float, ' *' + reg_int, )) # first iteration line of robust inversion reg_it1_robust = ''.join(( '([a-zA-Z]{1,3})', r' *(\d{1,3})', ' *' + reg_float, # data RMS ' *' + reg_float, # mag RMS ' *' + reg_float, # pha RMS ' *' + reg_int, # nr excluded data ' *' + reg_float, # L1-ratio )) # second-to-last iterations, robust reg_it2plus_rob = ''.join(( r'([a-zA-Z]{1,3})', r' *(\d{1,3})', ' *' + reg_float, # data RMS ' *' + reg_float, # stepsize ' *' + reg_float, # lambda ' *' + reg_float, # roughness ' *' + reg_int, # CG-steps ' *' + reg_float, # mag RMS ' *' + reg_float, # pha RMS ' *' + reg_int, # nr excluded data ' *' + reg_float, # l1-ratio ' *' + reg_float, # steplength )) # second-to-last iterations, non-robustk # (t1,a3,t5,i3,t11,g10.4,t23,g10.4,t34,g10.4,t46,g10.4,t58,& # i6,t69,g10.4,t81,g10.4,t93,i4,t105,f5.3) reg_it2plus_norob = ''.join(( '([a-zA-Z]{1,3})', r' *(\d{1,3})', ' *' + reg_float, # data RMS ' *' + reg_float, # stepsize ' *' + reg_float, # lambda ' *' + reg_float, # roughness ' *' + reg_int, # CG-steps ' *' + reg_float, # mag RMS ' *' + reg_float, # pha RMS ' *' + reg_int, # nr excluded data ' *' + reg_float, # steplength )) # update robust reg_update_rob = ''.join(( '([a-zA-Z]{1,3})', r' *(\d{1,3})', ' *' + reg_float, # data RMS ' *' + reg_float, # stepsize ' *' + reg_float, # lambda ' *' + reg_float, # roughness ' *' + reg_int, # CG-steps ' *' + reg_float, # l1ratio )) # update non-robust reg_update_norob = ''.join(( '([a-zA-Z]{1,3})', r' *(\d{1,3})', ' *' + reg_float, # data RMS ' *' + reg_float, # stepsize ' *' + reg_float, # lambda ' *' + reg_float, # roughness ' *' + reg_int, # CG-steps ' *' + reg_float, # steplength )) # iteration counter current_iteration = 0 iterations = [] for line in lines[i:]: linec = line.strip() if linec.startswith('IT') or linec.startswith('PIT'): if linec[0:3].strip() == 'IT': it_type = 'DC/IP' else: it_type = 'FPI' values = None # main iterations if is_robust_inversion: if current_iteration == 0: # first iteration, robust g = re.compile(reg_it1_robust).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('magRMS', float), ('phaRMS', float), ('nrdata', int), ('l1ratio', float), ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) else: # second-to-last iterations, robust g = re.compile( reg_it2plus_rob ).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('stepsize', float), ('lambda', float), ('roughness', float), ('cgsteps', int), ('magRMS', float), ('phaRMS', float), ('nrdata', int), ('l1ratio', float), ('steplength', float), ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) values['type'] = 'main' values['main_iteration'] = current_iteration values['it_type'] = it_type iterations.append(values) current_iteration += 1 else: if current_iteration == 0: # non-robust, first iteration g = re.compile(reg_it1_norob).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('magRMS', float), ('phaRMS', float), ('nrdata', int) ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) else: g = re.compile( reg_it2plus_norob ).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('stepsize', float), ('lambda', float), ('roughness', float), ('cgsteps', int), ('magRMS', float), ('phaRMS', float), ('nrdata', int), ('steplength', float), ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) values['type'] = 'main' values['it_type'] = it_type values['main_iteration'] = current_iteration iterations.append(values) current_iteration += 1 elif linec.startswith('UP'): # update iterations if is_robust_inversion: # robust g = re.compile( reg_update_rob ).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('stepsize', float), ('lambda', float), ('roughness', float), ('cgsteps', int), ('l1-ratio', float), ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) else: g = re.compile( reg_update_norob ).search(linec).groups() keyfuncs = [ (None, None), ('iteration', int), ('dataRMS', float), ('stepsize', float), ('lambda', float), ('roughness', float), ('cgsteps', int), ('steplength', float), ] values = {} for value, (key, func) in zip(g, keyfuncs): if key is not None: values[key] = func(value) values['type'] = 'update' values['it_type'] = it_type values['main_iteration'] = current_iteration iterations.append(values) df = pd.DataFrame(iterations) df = df.reindex([ 'iteration', 'main_iteration', 'it_type', 'type', 'dataRMS', 'magRMS', 'phaRMS', 'lambda', 'roughness', 'cgsteps', 'nrdata', 'steplength', 'stepsize', 'l1ratio', ], axis=1) df['nrdata'] = nr_of_data_points - df['nrdata'] return df
[docs] def plot_inversion_evolution(self, filename=None): """Plot the evolution of inversion properties (lambda, RMS, ...) Parameters ---------- filename : string|None if not None, save plot to this file Returns ------- fig : matplotlib.Figure|None if filename is None, return the figure object """ assert self.inv_stats is not None df = self.inv_stats fig, ax = plt.subplots(figsize=(16 / 2.54, 9 / 2.54), dpi=300) plot_objs = [] # main iterations main = df.query('type == "main"') obj = ax.plot( main['main_iteration'], main['lambda'], '.-', color='k', label=r'$\lambda$' ) # import IPython # IPython.embed() plot_objs.append(obj[0]) ax.set_ylabel(r'$\lambda$') ax2 = ax.twinx() obj = ax2.plot( main['main_iteration'], main['dataRMS'], '.-', color='r', label='dataRMS', ) plot_objs.append(obj[0]) ax2.set_ylabel('data RMS') ax3 = ax.twinx() ax3.spines['right'].set_position(('axes', 1.2)) # update iterations g = df.groupby('main_iteration') # https://matplotlib.org/stable/api/prev_api_changes/api_changes_3.9.0.html#top-level-cmap-registration-and-access-functions-in-mpl-cm if mpl_version[0] <= 3 and mpl_version[1] < 9: cm = mpl.cm.get_cmap('jet') else: cm = mpl.colormaps['jet'] SM = mpl.cm.ScalarMappable(norm=None, cmap=cm) colors = SM.to_rgba(np.linspace(0, 1, g.ngroups)) for color, (name, group) in zip(colors, g): # plot update evolution updates = group.query('type == "update"') print('#############################') print(name) print(updates) obj = ax3.scatter( range(0, updates.shape[0]), updates['lambda'], color=color, label='it {}'.format(name), ) plot_objs.append(obj) ax3.set_ylabel('update lambdas') ax.set_xlabel('iteration number') ax.grid(None) ax2.grid(None) ax3.grid(None) # added these three lines labs = [label.get_label() for label in plot_objs] ax.legend(plot_objs, labs, loc=0, fontsize=6.0) fig.tight_layout() if filename is None: return fig else: fig.savefig(filename, dpi=300)
[docs] def _read_l1_coverage(self, tomodir): """Read in the L1 data-weighted coverage (or cumulated sensitivity) of an inversion Parameters ---------- tomodir : str directory path to a tomodir """ l1_dw_coverage_file = os.sep.join( (tomodir, 'inv', 'coverage.mag') ) if not os.path.isfile(l1_dw_coverage_file): print( 'Info: coverage.mag not found: {0}'.format(l1_dw_coverage_file) ) print(os.getcwd()) return 1 try: nr_cells, max_sens = np.loadtxt(l1_dw_coverage_file, max_rows=1) l1_dw_log10_norm = np.loadtxt( l1_dw_coverage_file, skiprows=1)[:, 2] except Exception: # maybe old format - ignore for now return self.header_l1_dw_log10_norm = { 'max_value': max_sens, } pid = self.parman.add_data(l1_dw_log10_norm) if 'inversion' not in self.a: self.a['inversion'] = {} self.a['inversion']['l1_dw_log10_norm'] = pid
[docs] def _read_l2_coverage(self, tomodir): """Read in the L2 data-weighted coverage (or cumulated sensitivity) of an inversion Parameters ---------- tomodir : str directory path to a tomodir """ l2_dw_coverage_file = os.sep.join( (tomodir, 'inv', 'ata.diag') ) if not os.path.isfile(l2_dw_coverage_file): print( 'Info: ata.diag not found: {0}'.format(l2_dw_coverage_file) ) print(os.getcwd()) return 1 nr_cells, min_l2, max_l2 = np.loadtxt(l2_dw_coverage_file, max_rows=1) l2_dw_log10_norm = np.loadtxt(l2_dw_coverage_file, skiprows=1)[:, 1] self.header_l1_dw_log10_norm = { 'min_value': min_l2, 'max_value': max_l2, } pid = self.parman.add_data(l2_dw_log10_norm) if 'inversion' not in self.a: self.a['inversion'] = {} self.a['inversion']['l2_dw_log10_norm'] = pid
[docs] def _read_resm_m(self, tomodir): """Read in the diagonal entries of the resolution matrix of an inversion Parameters ---------- tomodir : str directory path to a tomodir """ resm_file = tomodir + os.sep + 'inv' + os.sep + 'res_m.diag' if not os.path.isfile(resm_file): print('Info: res_m.diag not found: {0}'.format(resm_file)) print(os.getcwd()) return 1 with open(resm_file, 'rb') as fid: first_line = fid.readline().strip() header_raw = np.fromstring(first_line, count=4, sep=' ') # nr_cells = int(header_raw[0]) self.header_res_m = { 'r_lambda': float(header_raw[1]), 'r_min': float(header_raw[2]), 'r_max': float(header_raw[3]), } subdata = np.genfromtxt(fid) pid = self.parman.add_data(subdata[:, 0]) if 'inversion' not in self.a: self.a['inversion'] = {} self.a['inversion']['resm'] = pid
[docs] def register_measurements(self, mag, pha=None): """Register measurements as magnitude/phase measurements used for the inversion Parameters ---------- mag: int|numpy.ndarray magnitude measurement id for the corresponding measurement data in self.configs.measurements. If mag is a numpy.ndarray, assume mag to be the data itself an register it pha: int, optional phase measurement id for the corresponding measurement data in self.configs.measurements. If not present, a new measurement set will be added with zeros only. """ if isinstance(mag, np.ndarray): # make sure that this array is 1D at the most # the 0 indicates only one measurement assert len(mag.squeeze().shape) in (0, 1) mid_mag = self.configs.add_measurements(mag) else: mid_mag = mag if pha is not None: if isinstance(pha, np.ndarray): mid_pha = self.configs.add_measurements(pha) else: mid_pha = pha else: mid_pha = self.configs.add_measurements( np.zeros_like( self.configs.measurements[mid_mag] ) ) self.assignments['measurements'] = [mid_mag, mid_pha]
[docs] def register_forward_model(self, pid_mag, pid_pha): """Register parameter sets as the forward models for magnitude and phase Parameters ---------- pid_mag: int parameter id corresponding to the magnitude model pid_pha: int parameter id corresponding to the phase model """ self.register_magnitude_model(pid_mag) self.register_phase_model(pid_pha)
[docs] def register_magnitude_model(self, pid): """Set a given parameter model to the forward magnitude model """ if self.assignments['forward_model'] is None: self.assignments['forward_model'] = [None, None] self.assignments['forward_model'][0] = pid
[docs] def register_phase_model(self, pid): """Set a given parameter model to the forward phase model """ if self.assignments['forward_model'] is None: self.assignments['forward_model'] = [None, None] self.assignments['forward_model'][1] = pid
[docs] def add_homogeneous_model(self, magnitude, phase=0): """Add a homogeneous resistivity model to the tomodir. This is useful for synthetic measurements. Parameters ---------- magnitude : float magnitude [Ohm m] value of the homogeneous model phase : float, optional phase [mrad] value of the homogeneous model Returns ------- pid_mag : int ID value of the parameter set of the magnitude model pid_pha : int ID value of the parameter set of the phase model Note that the parameter sets are automatically registered as the forward models for magnitude and phase values. """ if self.assignments['forward_model'] is not None: print('model already set, will overwrite') # generate distributions magnitude_model = np.ones(self.grid.nr_of_elements) * magnitude phase_model = np.ones(self.grid.nr_of_elements) * phase pid_mag = self.parman.add_data(magnitude_model) pid_pha = self.parman.add_data(phase_model) self.assignments['forward_model'] = [pid_mag, pid_pha] return pid_mag, pid_pha
[docs] def check_measurements_against_sensitivities( self, magnitude, phase=0, return_plot=False): """Check for all configurations if the sensitivities add up to a given homogeneous model Parameters ---------- magnitude : float magnitude used for the homogeneous model phase : float, optional, default=0 phase value used for the homogeneous model return_plot : bool, optional, default=False create a plot analyzing the differences Returns ------- results : Nx6 numpy.ndarray Results of the analysis. * magnitude measurement [Ohm] * sum of sensitivities [Volt] * relative deviation of sensitivity-sum from measurement [in percent] fig : matplotlib.figure, optional figure object. Only returned of return_plot=True axes : list list of axes corresponding to the figure Examples -------- >>> #!/usr/bin/python import crtomo.tdManager as CRtdMan tdm = CRtdMan.tdMan( elem_file='grid/elem.dat', elec_file='grid/elec.dat', config_file='config/config.dat', ) results, fig, axes = tdm.check_measurements_against_sensitivities( magnitude=100, phase=-10, return_plot=True ) fig.savefig('sensitivity_comparison.png', dpi=300) """ # generate a temporary tdMan instance tdm = tdMan( grid=self.grid, configs=self.configs, ) tdm.add_homogeneous_model(magnitude, phase) measurements = tdm.measurements() Z = measurements[:, 0] * np.exp(1j * measurements[:, 1] / 1000) results = [] for nr in range(0, tdm.configs.nr_of_configs): sensitivities = tdm.get_sensitivity(nr) sens_re = sensitivities[0][0] sens_im = sensitivities[0][1] sens_mag = 1.0 / measurements[nr, 0] * ( np.real(Z[nr]) * sens_re + np.imag(Z[nr]) * sens_im ) V_mag_from_sens = sens_mag.sum() / magnitude if phase != 0: outer = 1 / (1 + (np.imag(Z[nr]) / np.real(Z[nr])) ** 2) inner1 = - sens_re / np.real(Z[nr]) ** 2 * np.imag(Z[nr]) inner2 = sens_im * np.real(Z[nr]) sens_pha = outer * (inner1 + inner2) V_pha_from_sens = sens_pha.sum() / phase else: V_pha_from_sens = None print( 'WARNING: We still do not know where the minus sign comes ' + 'from!' ) V_mag_from_sens *= -1 results.append(( measurements[nr][0], V_mag_from_sens, (measurements[nr][0] - V_mag_from_sens) / measurements[nr][0] * 100, measurements[nr][1], V_pha_from_sens, (measurements[nr][1] - V_mag_from_sens) / measurements[nr][1] * 100, )) results = np.array(results) if return_plot: nr_x = 2 if phase == 0: nr_x = 1 fig, axes = plt.subplots(1, nr_x, figsize=(15 / 2.54, 7 / 2.54)) fig.suptitle('Comparison sum of sensitivities to measurements') # plot phase first if phase != 0: ax = axes[1] ax.plot(results[:, 5], '.') ax.set_xlabel('configuration number') ax.set_ylabel( r'$\frac{V_i^{\mathrm{pha}} - ' + r' \sum s_{ij}^{\mathrm{pha}} \cdot ' + r'\phi_0}{V_i}~[\%]$' ) # set ax for magnitude plot ax = axes[0] else: ax = axes ax.plot(results[:, 2], '.') ax.set_xlabel('configuration number') # ax.set_ylabel('deviation from magnitude measurement [\%]') ax.set_ylabel( r'$\frac{V_i^{\mathrm{mag}} - ' + r'\sum s_{ij}^{\mathrm{mag}} \cdot ' + r'\sigma_0}{V_i}~[\%]$' ) fig.tight_layout() return results, fig, axes else: return results
[docs] def show_parset(self, pid, **kwargs): """Plot a given parameter set. Thin wrapper around :py:meth:`crtomo.plotManager.plotManager.plot_elements_to_ax`. kwargs will be directly supplied to the plot function """ if 'ax' in kwargs: ax = kwargs.get('ax') kwargs.pop('ax') fig = ax.get_figure() else: fig, ax = plt.subplots() self.plot.plot_elements_to_ax(pid, ax=ax, **kwargs) return fig, ax
[docs] def plot_forward_models(self): """Plot the forward models (requires the forward models to be loaded) """ pids_rho = self.assignments.get('forward_model', None) if pids_rho is None: raise Exception('you need to load the forward model first') fig, axes = plt.subplots(1, 2, figsize=(16 / 2.54, 8 / 2.54)) ax = axes[0] self.plot.plot_elements_to_ax( pids_rho[0], ax=ax, plot_colorbar=True, cblabel=r'$|\rho| [\Omega m]$', ) ax = axes[1] self.plot.plot_elements_to_ax( pids_rho[1], ax=ax, plot_colorbar=True, cblabel=r'$\phi [mrad]$', ) fig.tight_layout() return fig, axes
@functools.wraps(pM.ParMan.extract_points) def extract_points(self, pid, points): values = self.parman.extract_points(pid, points) return values @functools.wraps(pM.ParMan.extract_along_line) def extract_along_line(self, pid, xy0, xy1, N=10): values = self.parman.extract_along_line(pid, xy0, xy1, N) return values
[docs] def set_prior_model(self, pid_rmag, pid_rpha): self.assignments['prior_model'] = (pid_rmag, pid_rpha) self.crtomo_cfg['prior_model'] = '../inv/prior.model'
[docs] def set_starting_model(self, pid_rmag, pid_rpha): self.set_prior_model(pid_rmag, pid_rpha)
[docs] def register_data_errors( self, mid_rmag_error, mid_rpha_error=None, norm_mag=1, norm_pha=1): """Register individual data errors. The normalization factors are used as follows: mag_error -> mag_error / norm_mag ** 2 pha_error -> pha_error / norm_pha ** 2 That means that you need to decrease the values in order to increase individual errors. Parameters ---------- mid_mag_error : int ID to the corresponding measurement in tdMan.configs holding the magnitude errors (linear) mid_pha_error : int ID to the corresponding measurement in tdMan.configs holding the phase errors (linear) norm_mag : float Magnitude errors can be normalized by the square of this value. norm_pha : float Phase errors can be normalized by the square of this value. """ self.assignments['measurement_errors'] = [ mid_rmag_error, mid_rpha_error ] self.assignments['error_norm_factor_mag'] = norm_mag self.assignments['error_norm_factor_pha'] = norm_pha
[docs] def copy(self): """Provide a copy of yourself. Do not copy modeling or inversion results, but copy everything that can be used for modeling or inversion """ tdm_copy = tdMan(grid=self.grid) tdm_copy.crtomo_cfg = self.crtomo_cfg.copy() tdm_copy.crmod_cfg = self.crmod_cfg.copy() # configs # forward model if self.a['forward_model'] is not None: raise Exception('not implemented yet') # data if len(self.a['measurements']) > 1: raise Exception('not implemented yet') return tdm_copy
[docs] def sensitivity_center_of_masses(self, mode='none'): """ """ assert 'sensitivities' in self.a, \ "This function requires sensitivities" mag_sens_indices = [ self.a['sensitivities'][key][0] for key in sorted( self.a['sensitivities'].keys() ) ] coms = self.parman.center_of_mass_value_multiple( mag_sens_indices, mode=mode, ) return coms
[docs] def plot_inversion_result_rmag( self, figsize=None, log10=False, overlay_coverage=False, **kwargs): """Plot the final inversion results, magnitude-results only Parameters ---------- figsize: (float, float)|None Figure size of the matplotlib figure in inches. log10: bool, default: False Shortcut to force a log10 conversion of the resistivity data overlay_coverage: bool, default: False If True, use the cumulated coverage to adjust the transparency of the plot **kwargs: dict will be propagated into self.plot.plot_elements_to_ax Returns ------- fig: matplotlib.Figure The created figure ax: matplotlib.Axes Plot axes """ assert self.assignments['inversion'] is not None, \ 'need inversion results to plot anything' pid_mag = self.assignments['inversion']['rmag'][-1] if 'plot_colorbar' not in kwargs: kwargs['plot_colorbar'] = True if 'cmap_name' not in kwargs: kwargs['cmap_name'] = 'turbo' if 'cblabel' not in kwargs: kwargs['cblabel'] = reda_units.get_label('r', log10=log10) if log10: kwargs['converter'] = PlotManager.converter_abs_log10 if figsize is None: figsize = (16 / 2.54, 8 / 2.54) if overlay_coverage: key = 'l1_dw_log10_norm' if key in self.a['inversion']: abscov = np.abs( self.parman.parsets[self.a['inversion'][key]] ) normcov = np.divide(abscov, 3) normcov[np.where(normcov > 1)] = 1 alpha_channel = np.subtract(1, normcov) kwargs['cid_alpha'] = alpha_channel fig, ax = plt.subplots(1, 1, figsize=figsize) self.plot.plot_elements_to_ax( pid_mag, ax=ax, **kwargs, ) fig.tight_layout() return fig, ax
[docs] def plot_coverage(self, figsize=None, **kwargs): """Plot the final cumulated coverage Parameters ---------- figsize: (float, float)|None Figure size of the matplotlib figure in inches. **kwargs: dict will be propagated into self.plot.plot_elements_to_ax Returns ------- fig: matplotlib.Figure The created figure ax: matplotlib.Axes Plot axes """ assert self.assignments['inversion'] is not None, \ 'need inversion results to plot anything' if 'plot_colorbar' not in kwargs: kwargs['plot_colorbar'] = True if 'cmap_name' not in kwargs: kwargs['cmap_name'] = 'turbo' # if 'cblabel' not in kwargs: # kwargs['cblabel'] = r'$|\rho| [\Omega m]$' if figsize is None: figsize = (16 / 2.54, 8 / 2.54) key = 'l1_dw_log10_norm' if key in self.a['inversion']: pid_l1_cov = self.a['inversion'][key] fig, ax = plt.subplots(1, 1, figsize=figsize) self.plot.plot_elements_to_ax( pid_l1_cov, ax=ax, **kwargs, ) fig.tight_layout() return fig, ax
[docs] def plot_decouplings_to_ax(self, ax, plot_transistions=True): """Visualize decouplings. Usually you may want to plot the mesh to this axis first Parameters ---------- ax: matplotlib.Axes Axis to plot to plot_transistions: bool (True) If True, then also visualize the decoupled transitions """ if self.decouplings is None: return if plot_transistions: centroids = self.grid.get_element_centroids() for A, B, strength in self.decouplings: A = int(A) B = int(B) ax.plot( [centroids[A][0], centroids[B][0]], [centroids[A][1], centroids[B][1]], color='green', linewidth=1.0 * strength + 0.1, ) for element1, element2, strength in self.decouplings: element1 = int(element1) element2 = int(element2) nodes = np.intersect1d( self.grid.elements[element1], self.grid.elements[element2] ) (a_x, a_y) = self.grid.nodes['presort'][nodes[0]][1:3] (b_x, b_y) = self.grid.nodes['presort'][nodes[1]][1:3] ax.plot([a_x, b_x], [a_y, b_y], color='r')
[docs] def plot_inversion_misfit_pseudosection(self): if self.eps_data is None: return psi = self.eps_data[-1][['a', 'b', 'm', 'n', 'psi']] rms = self.inv_stats.iloc[-1]['dataRMS'] from reda.plotters.pseudoplots import plot_pseudosection_type2 fig, ax, cb = plot_pseudosection_type2( psi, 'psi', markersize=100, cmap='seismic', cbmin=0, cbmax=2, title='RMS: {} Mag-Error: {} % + {}'.format( rms, self.crtomo_cfg['mag_rel'], self.crtomo_cfg['mag_abs'], ), ) # import numpy as np # import matplotlib.pylab as plt # rms = 1 / np.sqrt( # psi.shape[0]) * np.sqrt(np.sum(psi['psi'] ** 2)) # print('error weighted RMS:', rms) # fig, ax = plt.subplots() # _ = ax.hist(psi['psi'], 100) return fig, ax, cb
[docs] def get_fwd_reda_container(self): """Return a REDA container, either reda.ERT, or reda.CR, with modeled data Returns ------- container : reda.ERT|reda.CR|None """ m_ids = self.a['measurements'] if m_ids is None: return None if len(m_ids) == 1: # ERT ert = reda.ERT() data = np.hstack(( self.configs.configs, self.configs.measurements[m_ids[0]][:, np.newaxis], )) df = pd.DataFrame( data, columns=['a', 'b', 'm', 'n', 'r', 'rpha'], ) ert.add_dataframe(df) return ert elif len(m_ids) == 2: # Complex-resistivity cr = reda.CR() data = np.hstack(( self.configs.configs, self.configs.measurements[m_ids[0]][:, np.newaxis], self.configs.measurements[m_ids[1]][:, np.newaxis], )) df = pd.DataFrame( data, columns=['a', 'b', 'm', 'n', 'r', 'rpha'], ) cr.add_dataframe(df) return cr return None
[docs] def plot_pseudo_locs(self, spacing=1.0): """Plot pseudo-locations of measurement configurations. This function does not take into account real electrode locations. However, for surface configurations a 'spacing' parameter can be provided Parameters ---------- spacing: float, default=1.0 Electrode spacing """ ert = reda.ERT() data = np.hstack(( self.configs.configs, np.ones(self.configs.configs.shape[0])[:, np.newaxis], )) df = pd.DataFrame( data, columns=['a', 'b', 'm', 'n', 'r'], ) ert.add_dataframe(df) fig, ax, cb = ert.pseudosection_type2( markersize=100, spacing=spacing, xlabel='X-Center [m]', ylabel='Pseudodepth [m]', ) cb.remove() return fig, ax