Nantes Université

Skip to content
Extraits de code Groupes Projets
Vérifiée Valider 75d93c06 rédigé par Mickael Bonnin's avatar Mickael Bonnin
Parcourir les fichiers

Single 'spectro_extract' func in 'data_process'

parent 956fe263
Aucune branche associée trouvée
Aucune étiquette associée trouvée
2 requêtes de fusion!5Merge main to CH_master,!3Merge MB_master to main
......@@ -71,8 +71,8 @@ def get_fft(trace: op.core.trace.Trace, WINDOW_LENGTH: int,
return result, freqs
def spectro_extract_valid(data_dir: str, spectro_dir: str,
events_list: list) -> None:
def spectro_extract(data_dir: str, spectro_dir: str,
events_list: list = None) -> None:
"""
Compute the spectrograms that will be used for the validation.
The matrices are saved as NumPy objects.
......@@ -88,90 +88,10 @@ def spectro_extract_valid(data_dir: str, spectro_dir: str,
WINDOW_LENGTH = 1
OVERLAP = (1 - 0.75)
print(f'Number of events: {len(events_list)}')
nb_evt = 0
for a in range(len(events_list)):
nb_evt += 1
print('*****************')
print(f'EVENT {nb_evt} / {len(events_list)}')
time = events_list[a][0]
if not os.path.exists(f'{spectro_dir}/{time}'):
os.makedirs(f'{spectro_dir}/{time}')
list_stream = glob.glob(f'{data_dir}/{time}/*')
print(f'Number of stream: {len(list_stream)}')
nb_st = 0
for stream in list_stream:
nb_st += 1
print(f'Stream {nb_st} / {len(list_stream)}\r')
st = op.read(stream, dtype=float)
stream_name = (stream.split('/')[-1]).split('.mseed')[0]
st.detrend('demean')
st.taper(0.05)
st = st.filter('highpass', freq=2, corners=4, zerophase=True)
if st[0].stats.sampling_rate == 200:
st.decimate(2)
compo = []
for tr in st:
compo.append(tr.stats.component)
if len(compo) != 3:
continue
spectro = []
find = False
for c in compo:
trace = st.select(component=c)[0]
s_rate = trace.stats.sampling_rate
nb_pts = int(WINDOW_LENGTH * s_rate)
fft_list = []
time_used = []
start = trace.stats.starttime
END = trace.stats.endtime
while start + WINDOW_LENGTH <= END:
tr = trace.slice(starttime=start,
endtime=start + WINDOW_LENGTH)
mean_time = tr.stats.starttime + (WINDOW_LENGTH / 2)
time_used.append(mean_time - trace.stats.starttime)
start += (WINDOW_LENGTH * OVERLAP)
fft, _ = get_fft(tr, WINDOW_LENGTH, OVERLAP, nb_pts)
fft = np.array(fft)
fft_list.append(fft)
fft_list = np.array(fft_list)
if fft_list.shape == (237, 50): # OVERLAP 75% : (237,50)
fft_list /= fft_list.max()
spectro.append(fft_list)
find = True
if find == True and len(spectro) == 3:
spectro = np.array(spectro)
np.save(f'{spectro_dir}/{time}/{stream_name}.npy', spectro)
def spectro_extract_pred(data_dir: str, spectro_dir: str) -> None:
"""
Compute the spectrogramms that will be used for the prediction.
The spectrograms are saved as NumPy objects.
:type data_dir: str
:param data_dir: Absolute path the input data.
:type spectro_dir: str
:param spectro_dir: Absolute path where to save the output spectrograms.
"""
WINDOW_LENGTH = 1
OVERLAP = (1 - 0.75)
events = glob.glob(f'{data_dir}/*')
if events_list is None:
events = glob.glob(f'{data_dir}/*')
else:
events = events_list
print(f'Number of events: {len(events)}')
nb_evt = 0
......@@ -179,14 +99,18 @@ def spectro_extract_pred(data_dir: str, spectro_dir: str) -> None:
nb_evt += 1
print('*****************')
print(f'EVENT {nb_evt} / {len(events)}')
time = events[a].split('/')[-1]
if events_list is None:
time = events[a].split('/')[-1]
else:
time = events[a][0]
if not os.path.exists(f'{spectro_dir}/{time}'):
os.makedirs(f'{spectro_dir}/{time}')
list_stream = glob.glob(f'{data_dir}/{time}/*')
print(f'Number of stream: {len(list_stream)}')
print(f'Number of streams: {len(list_stream)}')
nb_st = 0
for stream in list_stream:
nb_st += 1
......
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter