@@ -7,6 +7,7 @@
 
 import concurrent.futures
 import multiprocessing as mp
+import sys
 from datetime import datetime, timedelta
 from functools import partial
 
@@ -34,7 +35,7 @@ def get_acoustic_data(
     mseed_file_limit=None,
     large_gap_limit=1800.0,
     obspy_merge_method=0,
-    gapless_merge=False,
+    gapless_merge=True,
 ):
     """
     Get broadband acoustic data for specific time frame and sensor node. The
@@ -75,12 +76,6 @@ def get_acoustic_data(
         and end in case of boundary gaps in data. Default is True
     verbose : bool, optional
         specifies whether print statements should occur or not
-    data_gap_mode : int, optional
-        How gaps in the raw data will be handled. Options are:
-        '0': gaps will be linearly interpolated
-        '1': no interpolation; mask array is returned
-        '2': subtract mean of data and fill gap with zeros; mask array
-        is returned
     mseed_file_limit : int, optional
         If the number of mseed traces to be merged exceeds this value, the
         function returns None. For some days the mseed files contain
@@ -107,12 +102,11 @@ def get_acoustic_data(
         these were saved as separate mseed files. After 2023 (and in some cases,
         but not all retroactively), 5 minute mseed files contain many fragmented
         traces. These traces are essentially not possible to merge with
-        obspy.merge. If True, then experimental method to merge traces without
+        obspy.merge. If True, then a method to merge traces without
         consideration of gaps will be attempted. This will only be done if there
         is full data coverage over 5 min file length, but could still result in
-        unalligned data.
-        This is an experimental feature and should be used with
-        caution.
+        unaligned data. Default value is True. You should probably not use
+        this method for data before June 2023, because it will likely cause an error.
 
     Returns
     -------
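For context, the gapless merge described in this docstring amounts to the following: if the fragmented traces in a 5 minute file together hold exactly 5 minutes of samples, their data arrays are concatenated end to end and the reported gap metadata is ignored. A minimal sketch of that idea, assuming obspy and numpy; the helper name and the `expected_seconds` parameter are illustrative, not part of the library:

```python
import numpy as np
from obspy import Stream, Trace

def gapless_merge_sketch(st: Stream, sampling_rate: float, expected_seconds: float = 300.0):
    """Sketch only: concatenate fragmented traces into one trace, ignoring gap metadata."""
    npts_total = sum(tr.stats.npts for tr in st)
    if npts_total / sampling_rate != expected_seconds:
        return None  # fragments do not cover the full window; caller handles them another way
    data_cat = np.concatenate([tr.data for tr in st])
    stats = st[0].stats.copy()
    stats.npts = len(data_cat)
    return Stream(traces=Trace(data_cat, header=stats))
```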
@@ -184,11 +178,16 @@ def get_acoustic_data(
                 data_url_list[i + 1].split("YDH")[1][1:].split(".mseed")[0]
             )
         else:
-            utc_time_url_stop = UTCDateTime(data_url_list[i].split("YDH")[1][1:].split(".mseed")[0])
-            utc_time_url_stop.hour = 23
-            utc_time_url_stop.minute = 59
-            utc_time_url_stop.second = 59
-            utc_time_url_stop.microsecond = 999999
+            base_time = UTCDateTime(data_url_list[i].split("YDH")[1][1:].split(".mseed")[0])
+            utc_time_url_stop = UTCDateTime(
+                year=base_time.year,
+                month=base_time.month,
+                day=base_time.day,
+                hour=23,
+                minute=59,
+                second=59,
+                microsecond=999999,
+            )
 
         # if current segment contains desired data, store data segment
         if (
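The rewritten `else` branch above is needed because `UTCDateTime` objects are immutable on current obspy versions, so the old attribute assignments (`utc_time_url_stop.hour = 23`, ...) fail; the end-of-day timestamp has to be built with a fresh constructor call. A standalone illustration (the example timestamp is made up):

```python
from obspy import UTCDateTime

base_time = UTCDateTime("2023-06-15T04:35:00")  # hypothetical file timestamp
# base_time.hour = 23 would raise an error on an immutable UTCDateTime
end_of_day = UTCDateTime(
    year=base_time.year,
    month=base_time.month,
    day=base_time.day,
    hour=23,
    minute=59,
    second=59,
    microsecond=999999,
)
print(end_of_day)  # 2023-06-15T23:59:59.999999Z
```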
@@ -271,12 +270,11 @@ def get_acoustic_data(
         __read_mseed, valid_data_url_list, verbose=verbose, max_workers=max_workers
     )
 
+    st_list_new = []
     # combine traces from single files into one trace if gapless merge is set to true
+    # if a single 5 minute file is not compatible with gapless merge, it is currently removed
     if gapless_merge:
         for k, st in enumerate(st_list):
-            # check if multiple traces in stream
-            if len(st) == 1:
-                continue
 
             # count total number of points in stream
             npts_total = 0
@@ -286,15 +284,16 @@ def get_acoustic_data(
             # if valid npts, merge traces w/o consideration to gaps
             if npts_total / sampling_rate in [
                 300,
-                299.999,
-                300.001,
+                # 299.999,
+                # 300.001,
             ]:  # must be 5 minutes of samples
                 # NOTE it appears that npts_total is nondeterministically off by ± 64 samples. I have
                 # no idea why, but am catching this here. Unknown what downstream effects this could
                 # have
 
-                if verbose:
-                    print(f"gapless merge for {valid_data_url_list[k]}")
+                # if verbose:
+                #     print(f"gapless merge for {valid_data_url_list[k]}")
+
                 data = []
                 for tr in st:
                     data.append(tr.data)
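With the two float entries commented out, the membership test above now accepts only an exact 5 minute sample count. If near-misses ever need to be tolerated again, an explicit tolerance reads more directly than enumerating float values; this is a sketch of an alternative, not part of this PR:

```python
import math

def covers_five_minutes(npts_total: int, sampling_rate: float, tol_seconds: float = 0.0) -> bool:
    """True if the traces sum to 300 s of samples, within an absolute tolerance in seconds."""
    return math.isclose(npts_total / sampling_rate, 300.0, rel_tol=0.0, abs_tol=tol_seconds)
```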
@@ -304,15 +303,28 @@ def get_acoustic_data(
                 stats["starttime"] = UTCDateTime(valid_data_url_list[k][-33:-6])
                 stats["endtime"] = UTCDateTime(stats["starttime"] + timedelta(minutes=5))
                 stats["npts"] = len(data_cat)
-
-                st_list[k] = Stream(traces=Trace(data_cat, header=stats))
+                st_list_new.append(Stream(traces=Trace(data_cat, header=stats)))
             else:
-                if verbose:
-                    print(
-                        f"Data segment {valid_data_url_list[k]}, \
-                        with npts {npts_total}, is not compatible with gapless merge"
-                    )
-                _ = st_list.pop(k)
+                # if verbose:
+                #     print(
+                #         f"Data segment {valid_data_url_list[k]}, \
+                #         with npts {npts_total}, is not compatible with gapless merge"
+                #     )
+
+                # check if start times contain unique values
+                start_times = []
+                for tr in st_list[k]:
+                    start_times.append(tr.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S"))
+                un_starttimes = set(start_times)
+                if len(un_starttimes) == len(st_list[k]):
+                    if verbose:
+                        print("file fragmented but timestamps are unique. Segment kept")
+                    st_list_new.append(st_list[k])
+                else:
+                    if verbose:
+                        print("file fragmented and timestamps are corrupt. Segment thrown out")
+                    pass
+        st_list = st_list_new
 
     # check if number of traces in st_list exceeds limit
     if mseed_file_limit is not None:
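The new `else` branch boils down to a triage rule: a fragmented file whose traces all carry distinct start times can still be handled downstream and is kept, while duplicated start times indicate corrupt timestamps and the whole segment is dropped. The rule in isolation, as a condensed sketch (the helper name is illustrative):

```python
from obspy import Stream

def keep_fragmented_segment(st: Stream) -> bool:
    """True if every trace in the stream has a unique second-resolution start time."""
    start_times = [tr.stats.starttime.strftime("%Y-%m-%dT%H:%M:%S") for tr in st]
    return len(set(start_times)) == len(st)
```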
@@ -597,19 +609,20 @@ def __map_concurrency(func, iterator, args=(), max_workers=-1, verbose=False):
     if max_workers == -1:
         max_workers = 2 * mp.cpu_count()
 
-    results = []
+    results = [None] * len(iterator)
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # Start the load operations and mark each future with its URL
-        future_to_url = {executor.submit(func, i, *args): i for i in iterator}
+        # Start the load operations and mark each future with its index
+        future_to_index = {executor.submit(func, i, *args): idx for idx, i in enumerate(iterator)}
         # Disable progress bar
         is_disabled = not verbose
         for future in tqdm(
-            concurrent.futures.as_completed(future_to_url),
+            concurrent.futures.as_completed(future_to_index),
             total=len(iterator),
             disable=is_disabled,
+            file=sys.stdout,
         ):
-            data = future.result()
-            results.append(data)
+            idx = future_to_index[future]
+            results[idx] = future.result()
 
     return results
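The pattern introduced in `__map_concurrency` — preallocate the results list and key each future by its input index — guarantees that output order matches input order even though `as_completed` yields futures in completion order; the added `file=sys.stdout` simply routes the progress bar to stdout, which is why `import sys` appears at the top of the diff. A self-contained sketch of the same ordering pattern, without the tqdm progress bar:

```python
import concurrent.futures

def ordered_concurrent_map(func, items, max_workers=4):
    """Apply func to items concurrently while preserving input order in the output."""
    results = [None] * len(items)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        # key each future by the index of the item it was submitted with
        future_to_index = {executor.submit(func, item): idx for idx, item in enumerate(items)}
        for future in concurrent.futures.as_completed(future_to_index):
            results[future_to_index[future]] = future.result()
    return results

print(ordered_concurrent_map(lambda x: x * x, [1, 2, 3, 4]))  # [1, 4, 9, 16]
```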