Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _read_general_header_1(fi):
    """
    Read the fields of general header block 1.

    Each field is obtained with one ``_read`` call; the arguments after
    ``fi`` are the position, the length and the encoding name
    (``'bcd'`` or ``'binary'``), plus an optional trailing flag.

    :param fi: file object forwarded unchanged to ``_read``.
    :return: dict mapping each field name to the value ``_read`` returned.
    """
    # (field name, arguments passed to _read after ``fi``), in read order
    layout = (
        ('file_number', (0, 2, 'bcd')),
        ('sample_format_code', (2, 2, 'bcd')),
        ('general_constant', (4, 6, 'bcd')),
        ('time_slice_year', (10, 1, 'bcd')),
        # the next two fields both start at position 11; julian_day passes
        # an extra False flag (presumably selecting the other half-byte --
        # confirm against _read)
        ('nbr_add_general_header', (11, 0.5, 'bcd')),
        ('julian_day', (11, 1.5, 'bcd', False)),
        ('time_slice', (13, 3, 'bcd')),
        ('manufacturer_code', (16, 1, 'bcd')),
        ('manufacturer_serial_number', (17, 2, 'bcd')),
        ('base_scan_interval', (22, 1, 'binary')),
        ('polarity_code', (23, 0.5, 'binary')),
        ('record_type', (25, 0.5, 'binary')),
        ('scan_type_per_record', (27, 1, 'bcd')),
        ('nbr_channel_set', (28, 1, 'bcd')),
        ('nbr_skew_block', (29, 1, 'bcd')),
    )
    return {field: _read(fi, *read_args) for field, read_args in layout}
def _read_extended_header_3(fi, start_byte):
    """
    Read the fields of extended header block number 3.

    All fields are read with the ``'binary'`` encoding at positions
    relative to ``start_byte``.

    :param fi: file object forwarded unchanged to ``_read``.
    :param start_byte: position of the first field of the block.
    :return: dict mapping each field name to the value ``_read`` returned.
    """
    return {
        # receiver line / point / point index
        'receiver_line_number': _read(fi, start_byte, 4, 'binary'),
        'receiver_point': _read(fi, start_byte + 4, 4, 'binary'),
        'receiver_point_index': _read(fi, start_byte + 8, 1, 'binary'),
        # first shot line / point / point index
        'first_shot_line': _read(fi, start_byte + 9, 4, 'binary'),
        'first_shot_point': _read(fi, start_byte + 13, 4, 'binary'),
        'first_shot_point_index': _read(fi, start_byte + 17, 1, 'binary'),
        # last shot line / point / point index
        'last_shot_line': _read(fi, start_byte + 18, 4, 'binary'),
        'last_shot_point': _read(fi, start_byte + 22, 4, 'binary'),
        'last_shot_point_index': _read(fi, start_byte + 26, 1, 'binary'),
    }
def _read_channel_set(fi, start_byte):
"""
Extract information contained in the ith channel set descriptor.
"""
# NOTE(review): this definition is truncated in this view -- the dict( call
# opened below is never closed and no return statement is visible.
# nbr_32_ext is assigned here but not used in the visible lines.
nbr_32_ext = _read(fi, start_byte + 28, 0.5, 'binary', False)
channel_set = dict(
scan_type_number=_read(fi, start_byte, 1, 'bcd'),
channel_set_number=_read(fi, start_byte + 1, 1, 'bcd'),
# start and end times are scaled by 2e-3 (presumably to seconds -- confirm)
channel_set_start_time=_read(fi, start_byte + 2, 2, 'binary') * 2e-3,
channel_set_end_time=_read(fi, start_byte + 4, 2, 'binary') * 2e-3,
optionnal_MP_factor=_read(fi, start_byte + 6, 1, 'binary'),
mp_factor_descaler_multiplier=_read(fi, start_byte + 7, 1, 'binary'),
nbr_channels_in_channel_set=_read(fi, start_byte + 8, 2, 'bcd'),
channel_type_code=_read(fi, start_byte + 10, 0.5, 'binary'),
# the two fields below are both read at start_byte + 11 with length 0.5;
# they differ only in the trailing False argument (presumably selecting the
# other half-byte -- confirm against _read)
nbr_sub_scans=_read(fi, start_byte + 11, 0.5, 'bcd'),
gain_control_type=_read(fi, start_byte + 11, 0.5, 'bcd', False),
alias_filter_frequency=_read(fi, start_byte + 12, 2, 'bcd'),
alias_filter_slope=_read(fi, start_byte + 14, 2, 'bcd'),
low_cut_filter_freq=_read(fi, start_byte + 16, 2, 'bcd'),
low_cut_filter_slope=_read(fi, start_byte + 18, 2, 'bcd'),
# the three notch filter values are divided by 10 after decoding
notch_filter_freq=_read(fi, start_byte + 20, 2, 'bcd') / 10,
notch_2_filter_freq=_read(fi, start_byte + 22, 2, 'bcd') / 10,
notch_3_filter_freq=_read(fi, start_byte + 24, 2, 'bcd') / 10,
# NOTE(review): headless fragment -- the enclosing def line is not visible,
# and leg_clock_stop, stop_code and leg_freq_drift are used below without
# being defined in the visible lines.
# Each code is read as a 'binary' value and stringified so it can be used as
# a key into the matching leg_* lookup dict; a code that is missing from the
# dict raises KeyError.
drift_code = str(_read(fi, start_byte + 13, 1, 'binary'))
leg_oscillator_type = {'0': 'control board', '1': 'atomic',
'2': 'ovenized', '3': 'double ovenized',
'4': 'disciplined'}
oscillator_code = str(_read(fi, start_byte + 14, 1, 'binary'))
leg_data_collection = {'0': 'normal', '1': 'continuous',
'2': 'shot sliced with guard band'}
data_collection_code = str(_read(fi, start_byte + 15, 1, 'binary'))
leg_data_decimation = {'0': 'not decimated', '1': 'decimated data'}
decimation_code = str(_read(fi, start_byte + 28, 1, 'binary'))
extended_header_2 = dict(
# scaled by 1e-6 and 1e-9 respectively (presumably microseconds and
# nanoseconds converted to seconds -- confirm units)
acquisition_drift_window=_read(fi, start_byte, 4, 'IEEE') * 1e-6,
clock_drift=_read(fi, start_byte + 4, 8, 'binary') * 1e-9,
# numeric codes translated to their descriptions via the leg_* dicts
clock_stop_method=leg_clock_stop[stop_code],
frequency_drift=leg_freq_drift[drift_code],
oscillator_type=leg_oscillator_type[oscillator_code],
data_collection_method=leg_data_collection[data_collection_code],
nbr_time_slices=_read(fi, start_byte + 16, 4, 'binary'),
nbr_files=_read(fi, start_byte + 20, 4, 'binary'),
file_number=_read(fi, start_byte + 24, 4, 'binary'),
data_decimation=leg_data_decimation[decimation_code],
original_base_scan_interval=_read(fi, start_byte + 29, 1, 'binary'),
nbr_decimation_filter_coef=_read(fi, start_byte + 30, 2, 'binary'),
)
return extended_header_2
def _read_trace_header_7(fi, trace_block_start):
    """
    Read the fields of trace header 7.

    The block is located at ``trace_block_start + 20 + 32 * 6``. It holds
    six consecutive 4-byte ``'IEEE'`` values followed by two ``'binary'``
    values.

    :param fi: file object forwarded unchanged to ``_read``.
    :param trace_block_start: position where the trace block begins.
    :return: dict mapping each field name to the value ``_read`` returned.
    """
    # skip 20 positions plus six blocks of 32 (presumably the base trace
    # header and the preceding extension blocks -- confirm)
    base = trace_block_start + 20 + 32 * 6
    ieee_fields = (
        'tilt_matrix_vz',
        'azimuth_degree',
        'pitch_degree',
        'roll_degree',
        'remote_unit_temp',
        'remote_unit_humidity',
    )
    header = {
        field: _read(fi, base + 4 * slot, 4, 'IEEE')
        for slot, field in enumerate(ieee_fields)
    }
    header['orientation_matrix_version_nbr'] = _read(fi, base + 24, 4,
                                                     'binary')
    header['gimbal_corrections'] = _read(fi, base + 28, 1, 'binary')
    return header
def _read_trace_header_6(fi, trace_block_start):
    """
    Read the fields of trace header 6.

    The block is located at ``trace_block_start + 20 + 32 * 5`` and holds
    eight consecutive 4-byte ``'IEEE'`` values.

    :param fi: file object forwarded unchanged to ``_read``.
    :param trace_block_start: position where the trace block begins.
    :return: dict mapping each field name to the value ``_read`` returned.
    """
    # skip 20 positions plus five blocks of 32 (presumably the base trace
    # header and the preceding extension blocks -- confirm)
    base = trace_block_start + 20 + 32 * 5
    # field names in the order they are read
    ieee_fields = (
        'tilt_matrix_h1x',
        'tilt_matrix_h2x',
        'tilt_matrix_vx',
        'tilt_matrix_h1y',
        'tilt_matrix_h2y',
        'tilt_matrix_vy',
        'tilt_matrix_h1z',
        'tilt_matrix_h2z',
    )
    return {
        field: _read(fi, base + 4 * slot, 4, 'IEEE')
        for slot, field in enumerate(ieee_fields)
    }
# NOTE(review): headless fragment -- the enclosing def line is not visible and
# indentation has been lost; fi, tr_block_start, standard_orientation,
# sampling_rate and details are used below without being defined in the
# visible lines.
# map sampling rate to band code according to seed standard
# (only these four rates are mapped; any other sampling_rate raises KeyError
# at the band_map lookup below)
band_map = {2000: 'G', 1000: 'G', 500: 'D', 250: 'D'}
# geophone instrument code
instrument_code = 'P'
# mapping for "standard_orientation"
standard_component_map = {'2': 'Z', '3': 'N', '4': 'E'}
component = str(_read(fi, tr_block_start + 40, 1, 'binary'))
# when standard_orientation is truthy the numeric component code is replaced
# by its letter from standard_component_map
if standard_orientation:
component = standard_component_map[component]
# channel code = band code + instrument code + component
chan = band_map[sampling_rate] + instrument_code + component
npts = _read(fi, tr_block_start + 27, 3, 'binary')
# raw start time is divided by 1e6 (presumably microseconds to seconds --
# confirm units)
start_time = _read(fi, tr_block_start + 20 + 2 * 32, 8, 'binary') / 1e6
# time of the last sample: (npts - 1) sample intervals after start_time
end_time = start_time + (npts - 1) * (1 / sampling_rate)
network = _read(fi, tr_block_start + 20, 3, 'binary')
station = _read(fi, tr_block_start + 23, 3, 'binary')
location = _read(fi, tr_block_start + 26, 1, 'binary')
# network, station and location are stored as strings
statsdict = dict(starttime=UTCDateTime(start_time),
endtime=UTCDateTime(end_time),
sampling_rate=sampling_rate,
npts=npts,
network=str(network),
station=str(station),
location=str(location),
channel=chan)
# when details is truthy, the initial headers and the trace headers are added
# under statsdict['rg16']
if details:
statsdict['rg16'] = {}
statsdict['rg16']['initial_headers'] = {}
stats_initial_headers = statsdict['rg16']['initial_headers']
stats_initial_headers.update(_read_initial_headers(fi))
statsdict['rg16']['trace_headers'] = {}
stats_tr_headers = statsdict['rg16']['trace_headers']
stats_tr_headers.update(_read_trace_header(fi, tr_block_start))
# NOTE(review): from here on a different fragment starts -- a docstring and
# body with no def line, identical to the body of _read_general_header_1.
"""
Extract information contained in the general header block 1
"""
gen_head_1 = dict(
file_number=_read(fi, 0, 2, 'bcd'),
sample_format_code=_read(fi, 2, 2, 'bcd'),
general_constant=_read(fi, 4, 6, 'bcd'),
time_slice_year=_read(fi, 10, 1, 'bcd'),
nbr_add_general_header=_read(fi, 11, 0.5, 'bcd'),
julian_day=_read(fi, 11, 1.5, 'bcd', False),
time_slice=_read(fi, 13, 3, 'bcd'),
manufacturer_code=_read(fi, 16, 1, 'bcd'),
manufacturer_serial_number=_read(fi, 17, 2, 'bcd'),
base_scan_interval=_read(fi, 22, 1, 'binary'),
polarity_code=_read(fi, 23, 0.5, 'binary'),
record_type=_read(fi, 25, 0.5, 'binary'),
scan_type_per_record=_read(fi, 27, 1, 'bcd'),
nbr_channel_set=_read(fi, 28, 1, 'bcd'),
nbr_skew_block=_read(fi, 29, 1, 'bcd'),
)
return gen_head_1
# NOTE(review): headless fragment -- this repeats the body of
# _read_general_header_1 without a def line; fi is not defined in the
# visible lines.
gen_head_1 = dict(
file_number=_read(fi, 0, 2, 'bcd'),
sample_format_code=_read(fi, 2, 2, 'bcd'),
general_constant=_read(fi, 4, 6, 'bcd'),
time_slice_year=_read(fi, 10, 1, 'bcd'),
# the next two fields both start at position 11; julian_day passes an extra
# False flag (presumably selecting the other half-byte -- confirm against
# _read)
nbr_add_general_header=_read(fi, 11, 0.5, 'bcd'),
julian_day=_read(fi, 11, 1.5, 'bcd', False),
time_slice=_read(fi, 13, 3, 'bcd'),
manufacturer_code=_read(fi, 16, 1, 'bcd'),
manufacturer_serial_number=_read(fi, 17, 2, 'bcd'),
base_scan_interval=_read(fi, 22, 1, 'binary'),
polarity_code=_read(fi, 23, 0.5, 'binary'),
record_type=_read(fi, 25, 0.5, 'binary'),
scan_type_per_record=_read(fi, 27, 1, 'bcd'),
nbr_channel_set=_read(fi, 28, 1, 'bcd'),
nbr_skew_block=_read(fi, 29, 1, 'bcd'),
)
return gen_head_1
# NOTE(review): headless fragment -- the enclosing def line is not visible,
# and pos, leg_trace_clipped, clipped_code, leg_record_type and
# record_type_code are used below without being defined in the visible lines.
# Lookup from the stringified shot status code to its description; a code
# that is missing from the dict raises KeyError.
leg_shot_flag = {'0': 'normal', '1': 'bad-operator specified',
'2': 'bad-failed to QC test'}
shot_code = str(_read(fi, pos + 11, 1, 'binary'))
dict_header_4 = dict(
# pre shot guard band in seconds (raw value divided by 1e3)
pre_shot_guard_band=_read(fi, pos, 4, 'binary') / 1e3,
# post shot guard band in seconds (raw value divided by 1e3)
post_shot_guard_band=_read(fi, pos + 4, 4, 'binary') / 1e3,
# preamp gain in dB
preamp_gain=_read(fi, pos + 8, 1, 'binary'),
# numeric codes translated to their descriptions via the leg_* dicts
trace_clipped_flag=leg_trace_clipped[clipped_code],
record_type_code=leg_record_type[record_type_code],
shot_status_flag=leg_shot_flag[shot_code],
external_shot_id=_read(fi, pos + 12, 4, 'binary'),
post_processed_first_break_pick_time=_read(fi, pos + 24, 4, 'IEEE'),
post_processed_rms_noise=_read(fi, pos + 28, 4, 'IEEE'),
)
return dict_header_4