Source code for reda.importers.iris_syscal_pro

# *-* coding: utf-8 *-*
"""Read binary data from the IRIS Instruments Syscal Pro system

"""
import struct
from io import StringIO
import io
import logging

import pandas as pd
import numpy as np

import reda
from reda.importers.utils.decorators import enable_result_transforms

logger = logging.getLogger(__name__)


[docs] @enable_result_transforms def import_txt(filename, **kwargs): """Import Syscal measurements from a text file, exported as 'Spreadsheet'. At this point we expect only x-coordinates to be exported. As such the columns Spa.1 to Spa.4 are assumed to be x coordinates for a,b,m,n. Parameters ---------- filename: str Input filename assume_regular_electrodes_x : None|tuple(nr_electrodes, spacing) If not None, then assume measurements were taken using a profile of regular electrodes in x directions. Fill in any electrodes not used in the data to align the logical electrode numbers with the physical electrode numbers. Use the electrode spacing set in the system (usually a default of 1 m is used), and then change this to the real spacing using the 'elecs_transform_reg_spacing_x' parameter. elecs_transform_reg_spacing_x : tuple(old, new)|None, optional If not None, then assume a regular electrode spacing in x direction with spacing 'old'. This is often 1 m, a default of the systems. However, in reality often other spacings are used, and this parameter will lead to a transformation of the old (device-specific) spacing to the true spacing. shift_by_xyz : tuple|list|numpy.ndarray of size 1 or 2 or 3, optional If set, shift electrode positions by adding this vector Length of 1 assumes that only the x coordinate of the vector differs from zero. Length of 2 assume a shift in (x,z) direction. This parameter is evaluated after the 'elecs_transform_reg_spacing_x' parameter - as such you can and must use real spacings in this case. reciprocals: int, optional if provided, then assume that this is a reciprocal measurements where only the electrode cables were switched. The provided number N is treated as the maximum electrode number, and denotations are renamed according to the equation :math:`X_n = N - (X_a - 1)` Returns ------- data: :py:class:`pandas.DataFrame` Contains the measurement data elec_mgr : :py:class:`reda.utils.electrode_manager.electrode_manager` Electrode manager that manages the electrode positions topography: None No topography information is contained in the text files, so we always return None """ # read in text file into a buffer with open(filename, 'r') as fid: text = fid.read() strings_to_replace = { 'Mixed / non conventional': 'Mixed/non-conventional', 'Date': 'Date Time AM-PM', } for key in strings_to_replace.keys(): text = text.replace(key, strings_to_replace[key]) buffer = StringIO(text) # read data file data_raw = pd.read_csv( buffer, # sep='\t', sep=r'\s+', # delim_whitespace=True, ) # clean up column names data_raw.columns = [x.strip() for x in data_raw.columns.tolist()] # convert coordinates to logical electrode numbers elec_mgr = reda.electrode_manager() elec_mgr.set_ordering_to_sort_zyx() elec_mgr.add_by_position(data_raw[['Spa.1', ]].values) elec_mgr.add_by_position(data_raw[['Spa.2', ]].values) elec_mgr.add_by_position(data_raw[['Spa.3', ]].values) elec_mgr.add_by_position(data_raw[['Spa.4', ]].values) assume_regular_electrodes_x = kwargs.get( 'assume_regular_electrodes_x', None) if assume_regular_electrodes_x is not None: nr_electrodes = assume_regular_electrodes_x[0] spacing = assume_regular_electrodes_x[1] assert isinstance(nr_electrodes, int) assert isinstance(spacing, float) elec_mgr.conform_to_regular_x_spacing(spacing, nr_electrodes) data = pd.DataFrame() data['a'] = elec_mgr.get_electrode_numbers_for_positions( np.atleast_2d(data_raw['Spa.1'].values).T ) data['b'] = elec_mgr.get_electrode_numbers_for_positions( np.atleast_2d(data_raw['Spa.2'].values).T ) data['m'] = elec_mgr.get_electrode_numbers_for_positions( np.atleast_2d(data_raw['Spa.3'].values).T ) data['n'] = elec_mgr.get_electrode_numbers_for_positions( np.atleast_2d(data_raw['Spa.4'].values).T ) transform_regular_spacing_x = kwargs.get( 'elecs_transform_reg_spacing_x', None) if transform_regular_spacing_x is not None: assert len(transform_regular_spacing_x) == 2, \ 'spacing transform must be of length 2' old_spacing = transform_regular_spacing_x[0] new_spacing = transform_regular_spacing_x[1] logger.debug('transform_regular_spacing_x from {} to {}') elec_mgr._electrode_positions['x'] = elec_mgr._electrode_positions[ 'x' ] / old_spacing * new_spacing shift_by_xyz = kwargs.get('shift_by_xyz', None) if shift_by_xyz is not None: elec_mgr.shift_positions_xyz(shift_by_xyz) # [mV] / [mA] data['r'] = data_raw['Vp'] / data_raw['In'] data['Vmn'] = data_raw['Vp'] data['Iab'] = data_raw['In'] # rename electrode denotations rec_max = kwargs.get('reciprocals', None) if rec_max is not None: logger.warning('renumbering electrode numbers') data[['a', 'b', 'm', 'n']] = rec_max + 1 - data[['a', 'b', 'm', 'n']] return data, elec_mgr, None
[docs] @enable_result_transforms def import_bin(filename, **kwargs): """Read a .bin file generated by the IRIS Instruments Syscal Systems and return a curated Pandas dataframe for further processing. This dataframe contains only information currently deemed important. Use the function reda.importers.iris_syscal_pro_binary._import_bin to extract ALL information from a given .bin file. Note that the .bin files contain electrodes positions for the electrodes. Electrode numbers are then inferred from those. Parameters ---------- filename : string|io.BytesIO path to input filename or io.BytesIO object containing the binary data assume_regular_electrodes_x : None|tuple(nr_electrodes, spacing) If not None, then assume measurements were taken using a profile of regular electrodes in x directions. Fill in any electrodes not used in the data to align the logical electrode numbers wit the physical electrode numbers. Use the electrode spacing set in the system (usually a default of 1 m is used), and then change this to the real spacing using the 'elecs_transform_reg_spacing_x' parameter. elecs_transform_reg_spacing_x : tuple(old, new)|None, optional If not None, then assume a regular electrode spacing in x direction with spacing 'old'. This is often 1 m, a default of the systems. However, in reality often other spacings are used, and this parameter will lead to a transformation of the old (device-specific) spacing to the true spacing. shift_by_xyz : tuple|list|numpy.ndarray of size 1 or 2 or 3, optional If set, shift electrode positions by adding this vector Length of 1 assumes that only the x coordinate of the vector differs from zero. Length of 2 assume a shift in (x,z) direction. This parameter is evaluated after the 'elecs_transform_reg_spacing_x' parameter - as such you can and must use real spacings in this case. reciprocals : int, optional if provided, then assume that this is a reciprocal measurement where only the electrode cables were switched. The provided number N is treated as the maximum electrode number, and denotations are renamed according to the equation :math:`X_n = N - (X_a - 1)` check_meas_nums : bool, optional if True, then check that the measurement numbers are consecutive. Don't return data after a jump to smaller measurement numbers (this usually indicates that more data points were downloaded than are part of a specific measurement. Default: True skip_rows : int, optional Ignore this number of rows at the beginning, e.g., because they were inadvertently imported from an earlier measurement. Default: 0 Returns ------- data : :py:class:`pandas.DataFrame` Contains the measurement data elec_mgr : :py:class:`reda.utils.electrode_manager.electrode_manager` Electrode manager that manages the electrode positions topography : None No topography information is contained in the text files, so we always return None """ metadata, data_raw = _import_bin(filename) skip_rows = kwargs.get('skip_rows', 0) if skip_rows > 0: data_raw.drop(data_raw.index[range(0, skip_rows)], inplace=True) data_raw = data_raw.reset_index() if kwargs.get( 'check_meas_nums', True) and 'measurement_num' in data_raw.columns: # check that first number is 0 if data_raw['measurement_num'].iloc[0] != 0: logger.warning( 'WARNING: Measurement numbers do not start with 0 ' + '(did you download ALL data?)' ) # check that all measurement numbers increase by one if not np.all(np.diff(data_raw['measurement_num'])) == 1: logger.warning(' '.join(( 'WARNING: ', 'Measurement numbers are not consecutive.', 'Perhaps the first measurement belongs to another' ' measurement?', ' Use the skip_rows parameter to skip those measurements' ))) # now check if there is a jump in measurement numbers somewhere # ignore first entry as this will always be nan diff = data_raw['measurement_num'].diff()[1:] jump = np.where(diff != 1)[0] if len(jump) > 0 and not np.all(data_raw['measurement_num'] == 0): logger.warning( 'WARNING: One or more jumps in measurement numbers detected') logger.warning('The jump indices are:') for jump_nr in jump: logger.warning(jump_nr) logger.info('Removing data points subsequent to the first jump') data_raw = data_raw.iloc[0:jump[0] + 1, :] if data_raw.shape[0] == 0: # no data present, return a bare DataFrame return pd.DataFrame(columns=['a', 'b', 'm', 'n', 'r']), None, None # convert coordinates to logical electrode numbers elec_mgr = reda.electrode_manager() elec_mgr.set_ordering_to_sort_zyx() elec_mgr.add_by_position(data_raw[['x_a', 'y_a', 'z_a']].values) elec_mgr.add_by_position(data_raw[['x_b', 'y_b', 'z_b']].values) elec_mgr.add_by_position(data_raw[['x_m', 'y_m', 'z_m']].values) elec_mgr.add_by_position(data_raw[['x_n', 'y_n', 'z_n']].values) assume_regular_electrodes_x = kwargs.get( 'assume_regular_electrodes_x', None) if assume_regular_electrodes_x is not None: nr_electrodes = assume_regular_electrodes_x[0] spacing = assume_regular_electrodes_x[1] assert isinstance(nr_electrodes, int) assert isinstance(spacing, float) elec_mgr.conform_to_regular_x_spacing(spacing, nr_electrodes) data = pd.DataFrame() data['a'] = elec_mgr.get_electrode_numbers_for_positions( data_raw[['x_a', 'y_a', 'z_a']].values ) data['b'] = elec_mgr.get_electrode_numbers_for_positions( data_raw[['x_b', 'y_b', 'z_b']].values ) data['m'] = elec_mgr.get_electrode_numbers_for_positions( data_raw[['x_m', 'y_m', 'z_m']].values ) data['n'] = elec_mgr.get_electrode_numbers_for_positions( data_raw[['x_n', 'y_n', 'z_n']].values ) transform_regular_spacing_x = kwargs.get( 'elecs_transform_reg_spacing_x', None) if transform_regular_spacing_x is not None: assert len(transform_regular_spacing_x) == 2, \ 'spacing transform must be of length 2' old_spacing = transform_regular_spacing_x[0] new_spacing = transform_regular_spacing_x[1] logger.debug('transform_regular_spacing_x from {} to {}') elec_mgr._electrode_positions['x'] = elec_mgr._electrode_positions[ 'x' ] / old_spacing * new_spacing shift_by_xyz = kwargs.get('shift_by_xyz', None) if shift_by_xyz is not None: elec_mgr.shift_positions_xyz(shift_by_xyz) # [mV] / [mA] data['r'] = data_raw['vp'] / data_raw['Iab'] data['Vmn'] = data_raw['vp'] data['Iab'] = data_raw['Iab'] if 'vab' in data_raw.columns: data['vab'] = data_raw['vab'] data['mdelay'] = data_raw['mdelay'] data['Tm'] = data_raw['Tm'] data['Mx'] = data_raw['Mx'] data['chargeability'] = data_raw['m'] data['q'] = data_raw['q'] # rename electrode denotations rec_max = kwargs.get('reciprocals', None) if rec_max is not None: logger.info( 'renumbering electrode numbers for reciprocal measurements') data[['a', 'b', 'm', 'n']] = rec_max + 1 - data[['a', 'b', 'm', 'n']] return data, elec_mgr, None
def _import_bin(filename): """Read a .bin file generated by the IRIS Instruments Syscal Pro System Parameters ---------- filename : string|io.BytesIO Path to input filename Returns ------- metadata : dict General information on the measurement df : :py:class:`pandas.DataFrame` dataframe containing all measurement data """ if isinstance(filename, io.BytesIO): fid = filename else: fid = open(filename, 'rb') def fget(fid, fmt, tsize): buffer = fid.read(tsize) result_raw = struct.unpack(fmt, buffer) if len(result_raw) == 1: return result_raw[0] else: return result_raw # determine overall size fid.seek(0, 2) total_size = fid.tell() # print('total size', total_size) # start from the beginning fid.seek(0) # read version buffer = fid.read(4) version = struct.unpack('I', buffer) # print('version', version) buffer = fid.read(1) typedesyscal = struct.unpack('c', buffer)[0] syscal_type = int.from_bytes(typedesyscal, 'big') # print('Syscal type: {}'.format(syscal_type)) # comment buffer = fid.read(1024) comment_raw = struct.iter_unpack('c', buffer) comment = ''.join([x[0].decode('utf-8') for x in comment_raw]) # print('comment', comment) metadata = { 'version': version, 'syscal_type': syscal_type, 'comment': comment, } measurements = [] # for each measurement counter = 0 while (fid.tell() < total_size): dataentry = {} # print('COUNTER', counter) buffer = fid.read(2) array_type = struct.unpack('h', buffer) array_type # print(array_type) # not used moretmeasure = fget(fid, 'h', 2) moretmeasure # print('moretmeasure', moretmeasure) # measurement time [ms] mtime = fget(fid, 'f', 4) # print('measurement time', mtime) # delay before IP measurements start [ms] mdelay = fget(fid, 'f', 4) # print('mdelay', mdelay) TypeCpXyz = fget(fid, 'h', 2) # our file format documentation always assumes this value to be == 1 assert TypeCpXyz == 1 # print('TypeCpXyz', TypeCpXyz) # ignore fget(fid, 'h', 2) # positions: a b m n [m] xpos = fget(fid, '4f', 16) # print('xpos', xpos) ypos = fget(fid, '4f', 16) # print('ypos', ypos) zpos = fget(fid, '4f', 16) # print('zpos', zpos) # self-potential [mV] sp = fget(fid, 'f', 4) # print('sp', sp) # measured voltage at voltage electrodes m and n [mV] vp = fget(fid, 'f', 4) # print('vp', vp) Iab = fget(fid, 'f', 4) # print('iab', Iab) rho = fget(fid, 'f', 4) # print('rho', rho) m = fget(fid, 'f', 4) # print('m', m) # standard deviation q = fget(fid, 'f', 4) # print('q', q) # timing windows Tm = fget(fid, '20f', 20 * 4) Tm = np.array(Tm) # print('Tm', Tm) # chargeabilities Mx = fget(fid, '20f', 20 * 4) Mx = np.array(Mx) dataentry['version'] = version dataentry['mtime'] = mtime dataentry['x_a'] = xpos[0] dataentry['x_b'] = xpos[1] dataentry['x_m'] = xpos[2] dataentry['x_n'] = xpos[3] dataentry['y_a'] = ypos[0] dataentry['y_b'] = ypos[1] dataentry['y_m'] = ypos[2] dataentry['y_n'] = ypos[3] dataentry['z_a'] = zpos[0] dataentry['z_b'] = zpos[1] dataentry['z_m'] = zpos[2] dataentry['z_n'] = zpos[3] dataentry['mdelay'] = mdelay dataentry['vp'] = vp dataentry['q'] = q dataentry['m'] = m dataentry['Tm'] = Tm dataentry['Mx'] = Mx dataentry['sp'] = sp dataentry['Iab'] = Iab dataentry['rho'] = rho if moretmeasure > 0: # print('Mx', Mx) # this is 4 bytes used to store information on the measured channel # Channel + NbChannel buffer = fid.read(1) buffer_bin = bin(ord(buffer))[2:].rjust(8, '0') # print(buffer_bin) channel = int(buffer_bin[4:], 2) channelnb = int(buffer_bin[0:4], 2) # print('ChannelNB:', channelnb) # print('Channel:', channel) # 4 binaries + unused buffer = fid.read(1) buffer_bin = bin(ord(buffer))[2:].rjust(8, '0') # print(buffer_bin) overload = bool(int(buffer_bin[4])) channel_valid = bool(int(buffer_bin[5])) channel_sync = bool(int(buffer_bin[6])) gap_filler = bool(int(buffer_bin[7])) # print(overload, channel_valid, channel_sync, gap_filler) measurement_num = fget(fid, 'H', 2) # print('measurement_num', measurement_num) filename = fget(fid, '12s', 12) # print('filename', filename) latitude = fget(fid, 'f', 4) # print('lat', latitude) longitude = fget(fid, 'f', 4) # print('long', longitude) # number of stacks NbCren = fget(fid, 'f', 4) # print('Stacks', NbCren) # RS check RsChk = fget(fid, 'f', 4) # print('RsChk', RsChk) dataentry['channel'] = channel dataentry['overload'] = overload dataentry['channel_valid'] = channel_valid dataentry['channel_sync'] = channel_sync dataentry['gap_filler'] = gap_filler dataentry['NbStacks'] = NbCren dataentry['RsCHk'] = RsChk, dataentry['latitude'] = latitude dataentry['longitude'] = longitude dataentry['channelnb'] = channelnb dataentry['measurement_num'] = measurement_num dataentry['nr'] = measurement_num if moretmeasure >= 2: # absolute applied voltage vab = fget(fid, 'f', 4) # print('Vab', vab) # tx battery voltage [V] batTX = fget(fid, 'f', 4) # print('batTX', batTX) # rx battery voltage [V] batRX = fget(fid, 'f', 4) # print('batRX', batRX) temperature = fget(fid, 'f', 4) # print('Temp.', temperature) dataentry['vab'] = vab dataentry['batTX'] = batTX dataentry['batRX'] = batRX dataentry['temperature'] = temperature if moretmeasure == 3: # TODO: date and time not analyzed b = struct.unpack('2f', fid.read(2 * 4)) # print('b', b) b measurements.append(dataentry) counter += 1 # only close the fid if we read from a file - leave BytesIO open for # further use if not isinstance(filename, io.BytesIO): fid.seek(0) else: # rewind fid.close() # create a dataframe with all primary data df = pd.DataFrame( measurements ).reset_index() return metadata, df