Source code for reda.containers.SIP

"""Container for Spectral Induced Polarization (SIP) measurements
"""
import numpy as np
import pandas as pd

import reda.utils.mpl

from reda.containers.BaseContainer import ImportersBase
from reda.containers.BaseContainer import BaseContainer

import reda.importers.sip04 as reda_sip04
import reda.importers.mpt_das1 as reda_mpt
# from reda.importers.crtomo import load_mod_file

from reda.utils.decorators_and_managers import append_doc_of
from reda.utils.decorators_and_managers import LogDataChanges

plt, mpl = reda.utils.mpl.setup()


class SIPImporters(ImportersBase):
    """This class provides wrappers for most of the importer functions, and
    is meant to be inherited by the data containers
    """
    # @append_doc_of(reda_sip04.import_sip04)
    def import_sip04(self, filename, timestep=None):
        """SIP04 data import

        Parameters
        ----------
        filename : string
            Path to .mat or .csv file containing SIP-04 measurement results
        timestep : optional
            If provided, add a 'timestep' column with this value to the
            imported data

        Examples
        --------

        ::

            import tempfile
            import reda
            with tempfile.TemporaryDirectory() as fid:
                reda.data.download_data('sip04_fs_01', fid)
                sip = reda.SIP()
                sip.import_sip04(fid + '/sip_dataA.mat')

        """
        df = reda_sip04.import_sip04_data(filename)
        if timestep is not None:
            print('adding timestep')
            df['timestep'] = timestep

        self._add_to_container(df)
        print('Summary:')
        self._describe_data(df)
    @append_doc_of(reda_mpt.import_das1_td)
    def import_mpt(self, filename, **kwargs):
        """MPT DAS-1 FD importer

        Parameters
        ----------
        timestep : int or :class:`datetime.datetime`, optional
            If provided, use this value to set the 'timestep' column of the
            produced dataframe. Default: 0

        """
        timestep = kwargs.pop('timestep', None)

        self.logger.info('MPT DAS-1 import')
        with LogDataChanges(self, filter_action='import'):
            data, electrodes, topography = reda_mpt.import_das1_sip(
                filename, **kwargs)
            if timestep is not None:
                data['timestep'] = timestep
            self._add_to_container(data)

        if kwargs.get('verbose', False):
            print('Summary:')
            self._describe_data(data)

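# Usage sketch for import_mpt (not taken from the reda documentation): the
# file name below is hypothetical, and any keyword arguments are passed
# through to reda.importers.mpt_das1.import_das1_sip.
#
#   sip = reda.SIP()
#   sip.import_mpt('das1_measurement.Data', timestep=0)
#   print(sip.data.head())
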
class SIP(BaseContainer, SIPImporters):
    """Container for Spectral Induced Polarization (SIP) measurements."""

    def __init__(self, data=None, electrode_positions=None, topography=None):
        """."""
        self.setup_logger()
        self.required_columns = [
            'a',
            'b',
            'm',
            'n',
            'r',
            'frequency',
            'rpha',
            'Zt',
        ]
        self.plot_columns = [
            'frequency',
            'Zt',
        ]
        self.data = self.check_dataframe(data)
        self.electrode_positions = electrode_positions
        self.topography = topography
    def check_dataframe(self, dataframe):
        """Check the given dataframe for the required type and columns
        """
        if dataframe is None:
            return None

        # is this a DataFrame
        if not isinstance(dataframe, pd.DataFrame):
            raise Exception(
                'The provided dataframe object is not a pandas.DataFrame'
            )

        for column in self.required_columns:
            if column not in dataframe:
                raise Exception(
                    'Required column not in dataframe: {0}'.format(column)
                )

        return dataframe
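    # Illustration (a sketch, not part of the class): check_dataframe raises
    # if any of the required columns is missing from the given DataFrame.
    #
    #   sip = SIP()
    #   incomplete = pd.DataFrame({'a': [1], 'b': [2], 'm': [3], 'n': [4]})
    #   sip.check_dataframe(incomplete)
    #   # -> Exception: Required column not in dataframe: r
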
    def reduce_duplicate_frequencies(self):
        """In case multiple frequencies were measured, average them and
        compute std, min, max values for zt.

        In case timesteps were added (i.e., multiple separate measurements),
        group over those and average for each timestep.

        Examples
        --------

        ::

            import tempfile
            import reda
            with tempfile.TemporaryDirectory() as fid:
                reda.data.download_data('sip04_fs_06', fid)
                sip = reda.SIP()
                sip.import_sip04(fid + '/sip_dataA.mat', timestep=0)
                # well, add the spectrum again as another timestep
                sip.import_sip04(fid + '/sip_dataA.mat', timestep=1)
                df = sip.reduce_duplicate_frequencies()

        """
        group_keys = ['frequency', ]
        if 'timestep' in self.data.columns:
            group_keys = group_keys + ['timestep', ]

        g = self.data.groupby(group_keys)

        def group_apply(item):
            y = item[['zt_1', 'zt_2', 'zt_3']].values.flatten()
            zt_imag_std = np.std(np.imag(y))
            zt_real_std = np.std(np.real(y))
            zt_imag_min = np.min(np.imag(y))
            zt_real_min = np.min(np.real(y))
            zt_imag_max = np.max(np.imag(y))
            zt_real_max = np.max(np.real(y))
            zt_imag_mean = np.mean(np.imag(y))
            zt_real_mean = np.mean(np.real(y))
            dfn = pd.DataFrame(
                {
                    'zt_real_mean': zt_real_mean,
                    'zt_real_std': zt_real_std,
                    'zt_real_min': zt_real_min,
                    'zt_real_max': zt_real_max,
                    'zt_imag_mean': zt_imag_mean,
                    'zt_imag_std': zt_imag_std,
                    'zt_imag_min': zt_imag_min,
                    'zt_imag_max': zt_imag_max,
                },
                index=[0, ]
            )
            dfn['count'] = len(y)
            dfn.index.name = 'index'
            return dfn

        p = g.apply(group_apply)
        p.index = p.index.droplevel('index')
        if len(group_keys) > 1:
            p = p.swaplevel(0, 1).sort_index()
        return p
    def frequencies(self):
        """Return all frequencies"""
        return np.array(list(self.data.groupby('frequency').groups.keys()))
    def export_specs_to_ascii(self, frequency_file, data_file):
        """Export the frequencies and the (r, rpha) spectra, pivoted over
        frequency, to two separate ASCII files.
        """
        subdata = pd.pivot_table(
            self.data.sort_values('frequency'),
            index=['a', 'b', 'm', 'n'],
            columns='frequency',
            values=['r', 'rpha'],
        )
        np.savetxt(frequency_file, self.frequencies())
        np.savetxt(data_file, subdata.values)
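
# Minimal end-to-end sketch (not part of the reda package): build a SIP
# container from a hand-made DataFrame and export its spectra to ASCII files.
# All numbers are made up for illustration; rpha is assumed here to be the
# phase in mrad, and Zt the corresponding complex transfer impedance.
if __name__ == '__main__':
    _demo = pd.DataFrame({
        'a': [1, 1],
        'b': [2, 2],
        'm': [3, 3],
        'n': [4, 4],
        'frequency': [1.0, 10.0],
        'r': [10.5, 9.8],
        'rpha': [-5.1, -12.3],
        'Zt': [
            10.5 * np.exp(1j * -5.1 / 1000),
            9.8 * np.exp(1j * -12.3 / 1000),
        ],
    })
    demo_sip = SIP(data=_demo)
    print('frequencies:', demo_sip.frequencies())
    demo_sip.export_specs_to_ascii('frequencies.dat', 'spectra.dat')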