From 4d287d53311d4c5598d68b7249c79f14666dc061 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 5 May 2025 05:49:07 -0700 Subject: [PATCH 01/90] yodas2 config is added Signed-off-by: Sasha Meister --- .../multilingual/granary/yodas2.yaml | 385 ++++++++++++++++++ 1 file changed, 385 insertions(+) create mode 100644 dataset_configs/multilingual/granary/yodas2.yaml diff --git a/dataset_configs/multilingual/granary/yodas2.yaml b/dataset_configs/multilingual/granary/yodas2.yaml new file mode 100644 index 00000000..f81aab68 --- /dev/null +++ b/dataset_configs/multilingual/granary/yodas2.yaml @@ -0,0 +1,385 @@ +documentation: | + YODAS2 + ############ + Documentation is in progress. + +params: + source_lang: en + source_lang_full: English + min_audio_lid_probability: 0.7 + min_audio_duration: 0.1 + max_audio_duration: 40.0 #Add specific processor! + translation: + target_lang: it + target_lang_full: Italian + max_len_diff_ratio: 4 + min_hist_token_ratio: 0.8 + min_text_lid_probability: 0.3 + min_qe_score: 0.75 + save_disk_space: True + +processors_to_run: "9" +workspace_dir: ??? +install_requirements: True + +processors: + - _target_: sdp.processors.datasets.yodas2.ListYodas2Data + output_manifest_file: ${workspace_dir}/manifest_00.json + use_metadata: True + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_01.json + new_field: src_lang + expression: entry.lang_subset[:2] + + - _target_: sdp.processors.PreserveByValue + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_02.json + input_value_key: src_lang + target_value: ${params.source_lang} + + - _target_: sdp.processors.datasets.yodas2.DownloadYodas2Data + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_03.json + local_dir: ${workspace_dir}/${params.source_lang}/ + max_workers: 8 + + - _target_: sdp.processors.ExtractTar + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_04.json + field_to_tar_filepath: 'local_audio' + extraction_dir: ${workspace_dir}/${params.source_lang} + remove_source_tar: ${params.save_disk_space} + filepath_prefix_field: 'lang_subset' + output_filepath_field: 'extracted_audios' + get_extracted_filepaths: True + + - _target_: sdp.processors.datasets.yodas2.CreateInitialManifest + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_05.json + field_with_list: 'extracted_audios' + output_field: 'source_audio_filepath' + fields_to_save: + - lang_subset + - shard_id + - src_lang + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_06.json + new_field: 'audio_filepath' + expression: "'${workspace_dir}/${params.source_lang}/converted/' + entry.lang_subset + '/' + entry.shard_id + '/' + entry.yodas_id" + + - _target_: sdp.processors.FfmpegConvert + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_07.json + input_file_key: 'source_audio_filepath' + output_file_key: 'audio_filepath' + id_key: 'audio_filepath' + converted_audio_dir: '/' + target_samplerate: 16000 + target_nchannels: 1 + + - _target_: sdp.processors.RemoveFiles + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_08.json + filepath_field: 'source_audio_filepath' + should_run: ${params.save_disk_space} + + # Lang ID + - _target_: sdp.processors.FasterWhisperInference + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_09.json + model_size_or_path: 'base' + num_devices: -1 + output_dir: 
${workspace_dir}/${params.source_lang}/manifest_09 + language_detection_only: True + inference: + language_detection_segments: 7 + chunk_length: 30 + save_timestamps_separately: False + skip_corrupted_audios: True + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_10.json + new_field: 'lid_verified' + expression: (entry.language == "${params.source_lang}") & (entry.language_probability >= ${params.min_audio_lid_probability}) + filter: True + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_11.json + fields_to_drop: + - language + - language_probability + - lid_verified + + # Inference on long audio + - _target_: sdp.processors.FasterWhisperInference + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_12.json + model_size_or_path: 'base' + output_dir: ${workspace_dir}/${params.source_lang}/manifest_12 + inference: + batch_size: 16 + language: ${params.source_lang} + save_timestamps_separately: False + skip_corrupted: True + + - _target_: sdp.processors.ListToEntries + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_13.json + field_with_list: 'segments' + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_14.json + new_field: 'duration' + expression: entry.end - entry.start + + - _target_: sdp.processors.RenameFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_15.json + rename_fields: + start: offset + id: segment_id + + - _target_: sdp.processors.KeepOnlySpecifiedFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_16.json + fields_to_keep: + - lang_subset + - shard_id + - yodas_id + - src_lang + - audio_filepath + - segment_id + - offset + - duration + + - _target_: sdp.processors.FasterWhisperInference + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_17.json + model_size_or_path: 'base' + output_dir: ${workspace_dir}/${params.source_lang}/manifest_17 + inference: + batch_size: 16 + language: ${params.source_lang} + save_timestamps_separately: False + skip_corrupted: True + slice_by_offset: True + + - _target_: sdp.processors.KeepOnlySpecifiedFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_18.json + fields_to_keep: + - lang_subset + - shard_id + - yodas_id + - src_lang + - audio_filepath + - segment_id + - offset + - duration + - pred_text + + - _target_: sdp.processors.RenameFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_19.json + rename_fields: + pred_text: text + + - _target_: sdp.processors.DropIfRegexMatch + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_20.json + text_key: text + regex_patterns: + - "^\\s*$" + + - _target_: sdp.processors.WhisperHallucinationFeatures + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_21.json + text_field: text + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_22.json + new_field: is_hallucinated + expression: (not entry.hall_repeated_ngrams) & (not entry.hall_long_word) & (not entry.hall_frequent_single_word) + filter: True + + - _target_: sdp.processors.KeepOnlySpecifiedFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_23.json + fields_to_keep: + - lang_subset + - shard_id + - yodas_id + - src_lang + - audio_filepath + - 
segment_id + - offset + - duration + - text + + - _target_: sdp.processors.vLLMInference + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_24.json + generation_field: src_text + prompt_file: ./dataset_configs/multilingual/yodas2/partials/pr_recovery_prompts/${params.source_lang}.yaml + model: + model: "Qwen/Qwen2.5-7B-Instruct-1M" + tensor_parallel_size: 2 + max_model_len: 2048 + enable_chunked_prefill: True + max_num_batched_tokens: 1024 + enforce_eager: True + dtype: float16 + gpu_memory_utilization: 0.95 + max_num_seqs: 16 + inference: + temperature: 0.7 + top_p: 0.8 + repetition_penalty: 1.05 + max_tokens: 2048 + apply_chat_template: + tokenize: False + add_generation_prompt: True + + - _target_: sdp.processors.QwenGenerationFiltering + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_25.json + text_field: text + generation_field: src_text + + - _target_: sdp.processors.SubRegex + text_key: src_text + regex_params_yaml: ./dataset_configs/multilingual/yodas2/partials/subregex_params.yaml + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_26.json + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_27.json + fields_to_drop: + - text + + # AST + - _target_: sdp.processors.AddConstantFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_28.json + fields: + tgt_lang: ${params.translation.target_lang} + + - _target_: sdp.processors.vLLMInference + generation_field: tgt_text + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_29.json + prompt: + system: "" + user: | + Translate the following ${params.source_lang_full} source text to ${params.translation.target_lang_full}: + ${params.source_lang_full}: {src_text} + ${params.translation.target_lang_full}: + model: + model: "utter-project/EuroLLM-9B-Instruct" + dtype: float16 + inference: + best_of: 1 + temperature: 0.0 + top_p: 1.0 + max_tokens: 1280 + apply_chat_template: + max_length: 512 + tokenize: False + add_generation_prompt: True + + ## num_words and len_diff_ratio filtering + - _target_: sdp.processors.CountNumWords + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_30.json + text_key: src_text + num_words_key: num_words_src + + - _target_: sdp.processors.CountNumWords + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_31.json + text_key: tgt_text + num_words_key: num_words_tgt + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_32.json + new_field: num_words_filter + expression: (entry.num_words_src > 1) & (entry.num_words_tgt > 1) + filter: True + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_33.json + new_field: len_diff_ratio + expression: max(entry.num_words_src / entry.num_words_tgt, entry.num_words_tgt / entry.num_words_src) + + - _target_: sdp.processors.PreserveByValue + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_34.json + input_value_key: len_diff_ratio + operator: lt + target_value: ${params.translation.max_len_diff_ratio} + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_35.json + fields_to_drop: + - num_words_src + - num_words_tgt + - num_words_filter + - len_diff_ratio + + ## filtering based on character histograms + - _target_: sdp.processors.CharacterHistograms + 
output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_36.json + text_field: src_text + lang: ${params.source_lang} + output_score_field: src_hist_token_ratio + cache_dir: /data3/sdp_test/cache/histograms + + - _target_: sdp.processors.CharacterHistograms + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_37.json + text_field: tgt_text + lang: ${params.translation.target_lang} + output_score_field: tgt_hist_token_ratio + cache_dir: /data3/sdp_test/cache/histograms + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_38.json + new_field: len_diff_ratio_filter + expression: (entry.src_hist_token_ratio > ${params.translation.min_hist_token_ratio}) & (entry.tgt_hist_token_ratio > ${params.translation.min_hist_token_ratio}) + filter: True + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_39.json + fields_to_drop: + - src_hist_token_ratio + - tgt_hist_token_ratio + - len_diff_ratio_filter + + ## filtering based on Fasttext LID + - _target_: sdp.processors.FastTextClassifier + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_40.json + text_field: src_text + output_field: src_lid + model_name_or_path: lid.176.bin + cache_dir: /data3/sdp_test/cache + + - _target_: sdp.processors.FastTextClassifier + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_41.json + text_field: tgt_text + output_field: tgt_lid + model_name_or_path: lid.176.bin + cache_dir: /data3/sdp_test/cache + + - _target_: sdp.processors.LambdaExpression + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_42.json + new_field: lid_filter + expression: (entry.src_lid == '${params.source_lang}') & (entry.src_lid_prob > ${params.translation.min_text_lid_probability}) & (entry.tgt_lid == '${params.translation.target_lang}') & (entry.tgt_lid_prob > ${params.translation.min_text_lid_probability}) + filter: True + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_43.json + fields_to_drop: + - src_lid + - src_lid_prob + - tgt_lid + - tgt_lid_prob + - lid_filter + + ## filtering based on Cometoid QE + - _target_: sdp.processors.CometoidWMTQualityEstimation + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_44.json + source_text_field: src_text + target_text_field: tgt_text + model_name_or_path: cometoid-wmt23 + device_type: gpu + num_devices: 4 + chunksize: 10 + + - _target_: sdp.processors.PreserveByValue + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_45.json + input_value_key: cometoid_score + operator: gt + target_value: ${params.translation.min_qe_score} + + - _target_: sdp.processors.DropSpecificFields + output_manifest_file: ${workspace_dir}/${params.source_lang}/manifest_46.json + fields_to_drop: + - cometoid_score + + # - _target_: sdp.processors.ConvertToTarredAudioDataset \ No newline at end of file From dc6143250147b1e9ae6c7e6acbc5108545404a29 Mon Sep 17 00:00:00 2001 From: Sasha Meister Date: Mon, 5 May 2025 07:37:08 -0700 Subject: [PATCH 02/90] ListToEntries is added Signed-off-by: Sasha Meister --- build/lib/sdp/__init__.py | 13 + build/lib/sdp/logging.py | 18 + build/lib/sdp/processors/__init__.py | 134 ++ build/lib/sdp/processors/base_processor.py | 502 +++++++ build/lib/sdp/processors/datasets/__init__.py | 0 .../datasets/commoncrawl/__init__.py | 15 + 
.../datasets/commoncrawl/commoncrawl.py | 99 ++ .../datasets/commoncrawl/harv_utils.py | 45 + .../sdp/processors/datasets/coraa/__init__.py | 0 .../datasets/coraa/create_initial_manifest.py | 125 ++ .../processors/datasets/coraal/__init__.py | 16 + .../coraal/create_initial_manifest.py | 218 +++ .../processors/datasets/coraal/data_splits.py | 130 ++ .../processors/datasets/fleurs/__init__.py | 0 .../fleurs/create_initial_manifest.py | 150 ++ .../sdp/processors/datasets/ksc2/__init__.py | 0 .../datasets/ksc2/create_initial_manifest.py | 150 ++ build/lib/sdp/processors/datasets/lhotse.py | 83 ++ .../datasets/librispeech/__init__.py | 0 .../librispeech/create_initial_manifest.py | 140 ++ .../sdp/processors/datasets/masc/__init__.py | 18 + .../datasets/masc/aggregate_segments.py | 131 ++ .../masc/apply_reg_exp_on_vtt_entries.py | 74 + .../datasets/masc/create_initial_manifest.py | 174 +++ .../masc/get_caption_file_segments.py | 62 + .../lib/sdp/processors/datasets/masc/utils.py | 77 ++ .../sdp/processors/datasets/mcv/__init__.py | 0 .../datasets/mcv/create_initial_manifest.py | 142 ++ .../datasets/mediaspeech/__init__.py | 13 + .../mediaspeech/create_initial_manifest.py | 145 ++ .../sdp/processors/datasets/mls/__init__.py | 0 .../datasets/mls/create_initial_manifest.py | 180 +++ .../sdp/processors/datasets/mls/restore_pc.py | 606 ++++++++ .../sdp/processors/datasets/mtedx/__init__.py | 0 .../datasets/mtedx/create_initial_manifest.py | 84 ++ .../processors/datasets/slr102/__init__.py | 0 .../slr102/create_initial_manifest.py | 122 ++ .../processors/datasets/slr140/__init__.py | 0 .../slr140/create_initial_manifest.py | 213 +++ .../sdp/processors/datasets/slr83/__init__.py | 0 .../datasets/slr83/create_initial_manifest.py | 261 ++++ .../datasets/uzbekvoice/__init__.py | 13 + .../uzbekvoice/create_initial_manifest.py | 120 ++ .../processors/datasets/voxpopuli/__init__.py | 0 .../voxpopuli/create_initial_manifest.py | 155 +++ .../voxpopuli/normalize_from_non_pc_text.py | 170 +++ .../sdp/processors/huggingface/__init__.py | 0 .../huggingface/create_initial_manifest.py | 92 ++ .../huggingface/speech_recognition.py | 145 ++ build/lib/sdp/processors/langs/__init__.py | 13 + build/lib/sdp/processors/langs/arabic.py | 183 +++ build/lib/sdp/processors/langs/armenian.py | 95 ++ build/lib/sdp/processors/langs/kazakh.py | 67 + .../processors/modify_manifest/__init__.py | 13 + .../sdp/processors/modify_manifest/common.py | 403 ++++++ .../modify_manifest/create_manifest.py | 93 ++ .../modify_manifest/data_to_data.py | 1227 +++++++++++++++++ .../modify_manifest/data_to_dropbool.py | 907 ++++++++++++ .../make_letters_uppercase_after_period.py | 80 ++ build/lib/sdp/processors/nemo/__init__.py | 0 .../lib/sdp/processors/nemo/asr_inference.py | 78 ++ build/lib/sdp/processors/nemo/pc_inference.py | 111 ++ .../sdp/processors/nemo/transcribe_speech.py | 417 ++++++ build/lib/sdp/processors/toloka/__init__.py | 13 + build/lib/sdp/processors/toloka/accept_if.py | 155 +++ .../lib/sdp/processors/toloka/create_pool.py | 150 ++ .../sdp/processors/toloka/create_project.py | 128 ++ .../processors/toloka/create_sentence_set.py | 56 + .../sdp/processors/toloka/create_task_set.py | 160 +++ .../processors/toloka/download_responses.py | 244 ++++ build/lib/sdp/processors/toloka/reject_if.py | 160 +++ build/lib/sdp/run_processors.py | 253 ++++ build/lib/sdp/utils/__init__.py | 16 + build/lib/sdp/utils/bootstrap_estimates.py | 273 ++++ build/lib/sdp/utils/common.py | 111 ++ build/lib/sdp/utils/edit_spaces.py | 41 + 
build/lib/sdp/utils/get_diff.py | 81 ++ build/lib/sdp/utils/import_manager.py | 138 ++ build/lib/sdp/utils/metrics_computation.py | 63 + docs/src/sdp/api.rst | 3 + sdp/processors/__init__.py | 1 + .../yodas2/create_initial_manifest.py | 47 + sdp/processors/datasets/yodas2/download.py | 161 +++ .../modify_manifest/data_to_data.py | 154 +++ tests/test_data_to_data.py | 46 +- 85 files changed, 10966 insertions(+), 10 deletions(-) create mode 100644 build/lib/sdp/__init__.py create mode 100644 build/lib/sdp/logging.py create mode 100644 build/lib/sdp/processors/__init__.py create mode 100644 build/lib/sdp/processors/base_processor.py create mode 100644 build/lib/sdp/processors/datasets/__init__.py create mode 100644 build/lib/sdp/processors/datasets/commoncrawl/__init__.py create mode 100644 build/lib/sdp/processors/datasets/commoncrawl/commoncrawl.py create mode 100644 build/lib/sdp/processors/datasets/commoncrawl/harv_utils.py create mode 100644 build/lib/sdp/processors/datasets/coraa/__init__.py create mode 100644 build/lib/sdp/processors/datasets/coraa/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/coraal/__init__.py create mode 100644 build/lib/sdp/processors/datasets/coraal/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/coraal/data_splits.py create mode 100644 build/lib/sdp/processors/datasets/fleurs/__init__.py create mode 100644 build/lib/sdp/processors/datasets/fleurs/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/ksc2/__init__.py create mode 100644 build/lib/sdp/processors/datasets/ksc2/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/lhotse.py create mode 100644 build/lib/sdp/processors/datasets/librispeech/__init__.py create mode 100644 build/lib/sdp/processors/datasets/librispeech/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/masc/__init__.py create mode 100644 build/lib/sdp/processors/datasets/masc/aggregate_segments.py create mode 100644 build/lib/sdp/processors/datasets/masc/apply_reg_exp_on_vtt_entries.py create mode 100644 build/lib/sdp/processors/datasets/masc/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/masc/get_caption_file_segments.py create mode 100644 build/lib/sdp/processors/datasets/masc/utils.py create mode 100644 build/lib/sdp/processors/datasets/mcv/__init__.py create mode 100644 build/lib/sdp/processors/datasets/mcv/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/mediaspeech/__init__.py create mode 100644 build/lib/sdp/processors/datasets/mediaspeech/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/mls/__init__.py create mode 100644 build/lib/sdp/processors/datasets/mls/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/mls/restore_pc.py create mode 100644 build/lib/sdp/processors/datasets/mtedx/__init__.py create mode 100644 build/lib/sdp/processors/datasets/mtedx/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/slr102/__init__.py create mode 100644 build/lib/sdp/processors/datasets/slr102/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/slr140/__init__.py create mode 100644 build/lib/sdp/processors/datasets/slr140/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/slr83/__init__.py create mode 100644 build/lib/sdp/processors/datasets/slr83/create_initial_manifest.py create mode 100644 
build/lib/sdp/processors/datasets/uzbekvoice/__init__.py create mode 100644 build/lib/sdp/processors/datasets/uzbekvoice/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/voxpopuli/__init__.py create mode 100644 build/lib/sdp/processors/datasets/voxpopuli/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/datasets/voxpopuli/normalize_from_non_pc_text.py create mode 100644 build/lib/sdp/processors/huggingface/__init__.py create mode 100644 build/lib/sdp/processors/huggingface/create_initial_manifest.py create mode 100644 build/lib/sdp/processors/huggingface/speech_recognition.py create mode 100644 build/lib/sdp/processors/langs/__init__.py create mode 100644 build/lib/sdp/processors/langs/arabic.py create mode 100644 build/lib/sdp/processors/langs/armenian.py create mode 100644 build/lib/sdp/processors/langs/kazakh.py create mode 100644 build/lib/sdp/processors/modify_manifest/__init__.py create mode 100644 build/lib/sdp/processors/modify_manifest/common.py create mode 100644 build/lib/sdp/processors/modify_manifest/create_manifest.py create mode 100644 build/lib/sdp/processors/modify_manifest/data_to_data.py create mode 100644 build/lib/sdp/processors/modify_manifest/data_to_dropbool.py create mode 100644 build/lib/sdp/processors/modify_manifest/make_letters_uppercase_after_period.py create mode 100644 build/lib/sdp/processors/nemo/__init__.py create mode 100644 build/lib/sdp/processors/nemo/asr_inference.py create mode 100644 build/lib/sdp/processors/nemo/pc_inference.py create mode 100644 build/lib/sdp/processors/nemo/transcribe_speech.py create mode 100644 build/lib/sdp/processors/toloka/__init__.py create mode 100644 build/lib/sdp/processors/toloka/accept_if.py create mode 100644 build/lib/sdp/processors/toloka/create_pool.py create mode 100644 build/lib/sdp/processors/toloka/create_project.py create mode 100644 build/lib/sdp/processors/toloka/create_sentence_set.py create mode 100644 build/lib/sdp/processors/toloka/create_task_set.py create mode 100644 build/lib/sdp/processors/toloka/download_responses.py create mode 100644 build/lib/sdp/processors/toloka/reject_if.py create mode 100644 build/lib/sdp/run_processors.py create mode 100644 build/lib/sdp/utils/__init__.py create mode 100644 build/lib/sdp/utils/bootstrap_estimates.py create mode 100644 build/lib/sdp/utils/common.py create mode 100644 build/lib/sdp/utils/edit_spaces.py create mode 100644 build/lib/sdp/utils/get_diff.py create mode 100644 build/lib/sdp/utils/import_manager.py create mode 100644 build/lib/sdp/utils/metrics_computation.py create mode 100644 sdp/processors/datasets/yodas2/create_initial_manifest.py create mode 100644 sdp/processors/datasets/yodas2/download.py diff --git a/build/lib/sdp/__init__.py b/build/lib/sdp/__init__.py new file mode 100644 index 00000000..2db92b25 --- /dev/null +++ b/build/lib/sdp/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
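The ``ListToEntries`` processor named in this patch's subject is what the yodas2 config from the previous patch uses to expand the ``segments`` list returned by ``FasterWhisperInference`` into one manifest entry per segment. Judging by the diffstat it lands in ``sdp/processors/modify_manifest/data_to_data.py``; below is a minimal sketch of the intended semantics (illustrative only, not the shipped implementation):

    # Illustrative sketch: expand one manifest entry into many, one per item
    # of a list-valued field (e.g. field_with_list='segments').
    def list_to_entries(entry: dict, field_with_list: str) -> list[dict]:
        items = entry.pop(field_with_list)
        out = []
        for item in items:
            new_entry = dict(entry)      # keep the remaining top-level fields
            if isinstance(item, dict):
                new_entry.update(item)   # merge the segment's own keys (start, end, text, ...)
            else:
                new_entry[field_with_list] = item
            out.append(new_entry)
        return out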
diff --git a/build/lib/sdp/logging.py b/build/lib/sdp/logging.py new file mode 100644 index 00000000..10d6b14f --- /dev/null +++ b/build/lib/sdp/logging.py @@ -0,0 +1,18 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +# overriding with the library specific logger, so that it's possible to +# customize in any downstream applications +logger = logging.getLogger("sdp") diff --git a/build/lib/sdp/processors/__init__.py b/build/lib/sdp/processors/__init__.py new file mode 100644 index 00000000..6788c88f --- /dev/null +++ b/build/lib/sdp/processors/__init__.py @@ -0,0 +1,134 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# let's import all supported processors here to simplify target specification + +from sdp.processors.datasets.coraa.create_initial_manifest import ( + CreateInitialManifestCORAA, +) +from sdp.processors.datasets.coraal import ( + CreateInitialManifestCORAAL, + TrainDevTestSplitCORAAL, +) +from sdp.processors.datasets.fleurs.create_initial_manifest import ( + CreateInitialManifestFleurs, +) +from sdp.processors.datasets.uzbekvoice.create_initial_manifest import ( + CreateInitialManifestUzbekvoice, +) +from sdp.processors.datasets.ksc2.create_initial_manifest import ( + CreateInitialManifestKSC2, +) +from sdp.processors.datasets.lhotse import LhotseImport +from sdp.processors.datasets.librispeech.create_initial_manifest import ( + CreateInitialManifestLibrispeech, +) +from sdp.processors.datasets.masc import ( + CreateInitialManifestMASC, + AggregateSegments, + RegExpVttEntries, + GetCaptionFileSegments +) +from sdp.processors.datasets.mediaspeech.create_initial_manifest import CreateInitialManifestMediaSpeech +from sdp.processors.datasets.mcv.create_initial_manifest import CreateInitialManifestMCV +from sdp.processors.datasets.mls.create_initial_manifest import CreateInitialManifestMLS +from sdp.processors.datasets.mls.restore_pc import RestorePCForMLS +from sdp.processors.datasets.mtedx.create_initial_manifest import ( + CreateInitialManifestMTEDX, +) +from sdp.processors.datasets.slr83.create_initial_manifest import ( + CreateInitialManifestSLR83, + CustomDataSplitSLR83, +) +from sdp.processors.datasets.slr102.create_initial_manifest import ( + CreateInitialManifestSLR102, +) +from sdp.processors.datasets.slr140.create_initial_manifest import ( + CreateInitialManifestSLR140, + CustomDataSplitSLR140, +) +from sdp.processors.datasets.voxpopuli.create_initial_manifest import ( + CreateInitialManifestVoxpopuli, +) +from sdp.processors.datasets.voxpopuli.normalize_from_non_pc_text import ( + NormalizeFromNonPCTextVoxpopuli, +) +from sdp.processors.huggingface.speech_recognition import ASRTransformers +from sdp.processors.huggingface.create_initial_manifest import CreateInitialManifestHuggingFace + +from sdp.processors.modify_manifest.common import ( + AddConstantFields, + ApplyInnerJoin, + ChangeToRelativePath, + CombineSources, + DuplicateFields, + KeepOnlySpecifiedFields, + RenameFields, + SortManifest, + SplitOnFixedDuration, +) +from sdp.processors.modify_manifest.create_manifest import ( + CreateCombinedManifests, + CreateInitialManifestByExt, +) +from sdp.processors.modify_manifest.data_to_data import ( + ASRFileCheck, + CopyManifestData, + CountNumWords, + ExtractFromBrackets, + FfmpegConvert, + GetAudioDuration, + GetWER, + InsIfASRInsertion, + InverseNormalizeText, + NormalizeText, + MakeSentence, + ReadDocxLines, + ReadTxtLines, + SoxConvert, + SplitLineBySentence, + SubIfASRSubstitution, + SubMakeLowercase, + SubRegex, +) +from sdp.processors.modify_manifest.data_to_dropbool import ( + DropASRError, + DropASRErrorBeginningEnd, + DropDuplicates, + DropHighCER, + DropHighLowCharrate, + DropHighLowDuration, + DropHighLowWordrate, + DropHighWER, + DropIfNoneOfRegexMatch, + DropIfRegexMatch, + DropIfSubstringInInsertion, + DropLowWordMatchRate, + DropNonAlphabet, + DropOnAttribute, + PreserveByValue, + DropRepeatedFields, +) +from sdp.processors.modify_manifest.make_letters_uppercase_after_period import ( + MakeLettersUppercaseAfterPeriod, +) +from sdp.processors.nemo.asr_inference import ASRInference +from sdp.processors.nemo.pc_inference import PCInference +from 
sdp.processors.toloka.accept_if import AcceptIfWERLess
+from sdp.processors.toloka.create_pool import CreateTolokaPool
+from sdp.processors.toloka.create_project import CreateTolokaProject
+from sdp.processors.toloka.create_sentence_set import CreateSentenceSet
+from sdp.processors.toloka.create_task_set import CreateTolokaTaskSet
+from sdp.processors.toloka.download_responses import GetTolokaResults
+from sdp.processors.toloka.reject_if import RejectIfBanned
diff --git a/build/lib/sdp/processors/base_processor.py b/build/lib/sdp/processors/base_processor.py
new file mode 100644
index 00000000..6fc22ee8
--- /dev/null
+++ b/build/lib/sdp/processors/base_processor.py
@@ -0,0 +1,502 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import json
+import multiprocessing
+import os
+import time
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from itertools import chain
+from typing import Any, Dict, List, Optional, Union
+
+from tqdm import tqdm
+from tqdm.contrib.concurrent import process_map
+
+from sdp.logging import logger
+
+
+@dataclass
+class DataEntry:
+    """A wrapper for data entry + any additional metrics."""
+
+    data: Optional[Dict] # can be None to drop the entry
+    metrics: Any = None
+
+
+class BaseProcessor(ABC):
+    """Abstract class for SDP processors.
+
+    All processor classes inherit from the ``BaseProcessor`` class.
+    This is a simple abstract class which has 2 empty methods: :meth:`process`
+    and :meth:`test`.
+
+    These serve to remind us that SDP essentially just runs ``.test()`` on all
+    processors (to implement :ref:`run-time tests `),
+    and then ``.process()`` on all processors.
+
+    Args:
+        output_manifest_file (str): path of where the output manifest file will
+            be located. Cannot have the same value as ``input_manifest_file``.
+        input_manifest_file (str): path of where the input manifest file is
+            located. This arg is optional - some processors may not take in
+            an input manifest because they need to create an initial manifest
+            from scratch (ie from some transcript file that is in a format
+            different to the NeMo manifest format). Cannot have the same value
+            as ``output_manifest_file``.
+ """ + + def __init__(self, output_manifest_file: str, input_manifest_file: Optional[str] = None, **kwargs): + + if output_manifest_file and input_manifest_file and (output_manifest_file == input_manifest_file): + # we cannot have the same input and output manifest file specified because we need to be able to + # read from the input_manifest_file and write to the output_manifest_file at the same time + raise ValueError("A processor's specified input_manifest_file and output_manifest_file cannot be the same") + + self.output_manifest_file = output_manifest_file + self.input_manifest_file = input_manifest_file + + @abstractmethod + def process(self): + """Should be overriden by the child classes to implement some data processing.""" + pass + + def test(self): + """This method can be used to perform "runtime" tests. + + This can be any kind of self-consistency tests, but are usually + in the form of checking that provided input test data entries match + provided output test data entries. + + There are not tests by default. + """ + +class BaseParallelProcessor(BaseProcessor): + """ + A processor that performs per-entry processing in parallel (using Dask or multiprocessing). + + Args: + input_manifest_file (str): Path to the input manifest file. + output_manifest_file (str): Path where the output manifest file will be written. + max_workers (int): Maximum number of workers. + chunksize (int): Chunk size used for parallel routines. + in_memory_chunksize (int): Maximum number of entries to load at once. + test_cases (list[dict]): Optional list of test cases. + use_dask (bool): If True, use Dask for parallelization; otherwise, use multiprocessing. + dask_client: (Optional) An existing Dask client. + """ + + def __getstate__(self): + state = self.__dict__.copy() + # Remove the Dask client from state (it is not picklable) + if 'dask_client' in state: + state['dask_client'] = None + return state + + def __init__( + self, + input_manifest_file: Optional[str] = None, + output_manifest_file: Optional[str] = None, + max_workers: int = -1, + chunksize: int = 100, + in_memory_chunksize: int = 100000, + test_cases: Optional[List[Dict]] = None, + use_dask: bool = True, + dask_client=None, + **kwargs, + ): + kwargs.pop("use_dask", None) # + super().__init__(input_manifest_file=input_manifest_file, output_manifest_file=output_manifest_file, **kwargs) + if max_workers == -1: + max_workers = os.cpu_count() + self.max_workers = max_workers + self.chunksize = chunksize + self.in_memory_chunksize = in_memory_chunksize + self.number_of_entries = 0 + self.total_duration = 0 + self.start_time = time.time() + self.test_cases = test_cases or [] + self.use_dask = use_dask + self.dask_client = dask_client + + def prepare(self): + """Can be used in derived classes to prepare the processing. 
+ + """ + pass + + def process(self): + """A fork in the road to pick dask or classic processing + + """ + os.environ.setdefault("PATH", os.defpath) + + self.prepare() + + os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) + metrics = [] + + #Ability to work sa legacy and as dask + if self.use_dask: + self._process_with_dask(metrics) + else: + self._process_with_multiprocessing(metrics) + self.finalize(metrics) + + def _process_with_dask(self, metrics): + import dask.bag as db + from dask.distributed import Client + + if self.dask_client is None: + self.dask_client = Client() + client = self.dask_client + from sdp.logging import logger + logger.info(f"Using Dask client with dashboard at: {client.dashboard_link}") + + # Delegate manifest reading to read_manifest() which returns a Dask bag. + bag = self.read_manifest() + + if not isinstance(bag, db.Bag): + bag = db.from_sequence(bag) + total_entries = bag.count().compute() + + if total_entries == 0: + logger.info("No entries found in the manifest input. Proceeding to create an empty output manifest.") + results = [] + else: + processed_bag = bag.map(lambda entry: self.process_dataset_entry(entry)).flatten() + results = processed_bag.compute() + + with open(self.output_manifest_file, "wt", encoding="utf8") as fout: + for entry in results: + metrics.append(entry.metrics) + if entry.data is not None: + json.dump(entry.data, fout, ensure_ascii=False) + fout.write("\n") + self.number_of_entries += 1 + self.total_duration += entry.data.get("duration", 0) + logger.info(f"Processed {total_entries} entries using Dask.") + + def _process_with_multiprocessing(self, metrics): + with open(self.output_manifest_file, "wt", encoding="utf8") as fout: + for manifest_chunk in self._chunk_manifest(): + data = itertools.chain( + *process_map( + self.process_dataset_entry, + manifest_chunk, + max_workers=self.max_workers, + chunksize=self.chunksize, + ) + ) + for data_entry in tqdm(data): + metrics.append(data_entry.metrics) + if data_entry.data is None: + continue + json.dump(data_entry.data, fout, ensure_ascii=False) + fout.write("\n") + self.number_of_entries += 1 + self.total_duration += data_entry.data.get("duration", 0) + + def _chunk_manifest(self): + """Splits the input manifest into chunks of in_memory_chunksize size. + Only used in non-Dask (multiprocessing) mode. + """ + manifest_chunk = [] + # When use_dask is False, read_manifest() returns an iterator. + for idx, data_entry in enumerate(self.read_manifest(), 1): + manifest_chunk.append(data_entry) + if idx % self.in_memory_chunksize == 0: + yield manifest_chunk + manifest_chunk = [] + if manifest_chunk: + yield manifest_chunk + + def read_manifest(self): + """ + Reads entries from the input manifest. + + Behavior depends on the parallelization mode: + - When use_dask is True: + If the input_manifest_file exists and is non-empty, returns a Dask bag (reading in 256KB blocks). + Otherwise, logs the condition and returns an empty Dask bag. + - When use_dask is False: + If the input_manifest_file does not exist or is empty, logs the condition and returns an empty iterator. + Otherwise, opens the file in text mode, strips each line, and yields the parsed JSON from non-empty lines. + + This unified behavior lets the processor run even in manifest-creation mode. 
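+
+        In non-Dask mode the returned iterator can be consumed directly, for
+        example:
+
+            for entry in self.read_manifest():
+                ...  # each entry is one parsed JSON manifest line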
+ + """ + from sdp.logging import logger + if self.use_dask: + import dask.bag as db + if self.input_manifest_file and os.path.exists(self.input_manifest_file) and os.path.getsize(self.input_manifest_file) > 0: + bag = db.read_text(self.input_manifest_file, blocksize=2**18).map(json.loads) + return bag + else: + logger.info("No input manifest file provided or file is empty. Returning an empty Dask bag for manifest creation.") + return db.from_sequence([]) + else: + if not self.input_manifest_file or not os.path.exists(self.input_manifest_file): + logger.info("No input manifest file provided or file does not exist. Continuing with an empty manifest.") + return iter([]) + else: + #if use_dask = False, we get here + def generator(): #Reading manifest line by line, adding only non emply lines + with open(self.input_manifest_file, "rt", encoding="utf8") as fin: + for line in fin: + if line: + yield json.loads(line) + return generator() + + @abstractmethod + def process_dataset_entry(self, data_entry) -> List[Any]: + """ + Must be implemented in derived classes. + For each data entry, return a list of DataEntry objects. + """ + raise NotImplementedError("Derived classes must implement process_dataset_entry.") + + def finalize(self, metrics: List[Any]): + """Outputs metrics about the processed data.""" + from sdp.logging import logger + logger.info("Total number of entries after processing: %d", self.number_of_entries) + if self.total_duration: + logger.info("Total audio duration (hours) after processing: %.2f", self.total_duration / 3600) + else: + logger.info("Unable to calculate total audio duration (hours). Ensure that the manifest file includes a 'duration' key.") + elapsed = time.time() - self.start_time + logger.info("Processor completed in (seconds): %.2f", elapsed) + + def test(self): + """Applies processing to each test case and raises an error if the output does not match expected output.""" + for test_case in self.test_cases: + input_data = test_case["input"].copy() if isinstance(test_case["input"], dict) else test_case["input"] + generated_outputs = self.process_dataset_entry(input_data) + expected_outputs = [test_case["output"]] if not isinstance(test_case["output"], list) else test_case["output"] + for gen_out, exp_out in zip(generated_outputs, expected_outputs): + gen_data = gen_out.data if hasattr(gen_out, "data") else gen_out + if gen_data != exp_out: + raise RuntimeError( + "Runtime test failed.\nTest input: {}\nGenerated output: {}\nExpected output: {}" + .format(test_case["input"], gen_data, exp_out) + ) + + + +# ------------------ Legacy Parallel Processor ------------------ #Just for reference +class LegacyParallelProcessor(BaseProcessor): + """ + A legacy parallel processor implementation using multiprocessing and process_map. + + This class processes the manifest in chunks (using process_map) and is provided for compatibility. + Child classes must implement process_dataset_entry(). + + Args: + max_workers (int): maximum number of workers that will be spawned + during the parallel processing. + chunksize (int): the size of the chunks that will be sent to worker processes + during the parallel processing. + in_memory_chunksize (int): the maximum number of input data entries that will + be read, processed and saved at a time. + test_cases (list[dict]): an optional list of dicts containing test + cases for checking that the processor makes the changes that we + are expecting. 
+ + The dicts must have a key ``input``, the value of which is a dictionary + containing data which is our test's input manifest line, and a key + ``output``, the value of which is a dictionary containing data which is + the expected output manifest line. + """ + def __init__( + self, + max_workers: int = -1, + chunksize: int = 100, + in_memory_chunksize: int = 100000, + test_cases: Optional[List[Dict]] = None, + **kwargs, + ): + kwargs.pop("use_dask", None) # + super().__init__(**kwargs) + if max_workers == -1: + max_workers = multiprocessing.cpu_count() + self.max_workers = max_workers + self.chunksize = chunksize + self.in_memory_chunksize = in_memory_chunksize + self.number_of_entries = 0 + self.total_duration = 0 + self.start_time = time.time() + self.test_cases = test_cases or [] + + def process(self): + """Parallelized implementation of the data processing. + The execution flow of this method is the following. + 1. :meth:`prepare` is called. It's empty by default but can be used to + e.g. download the initial data files or compute some aggregates + required for subsequent processing. + 2. A for-loop begins that loops over all ``manifest_chunk`` lists yielded + by the :meth:`_chunk_manifest` method. :meth:`_chunk_manifest` reads data + entries yielded by :meth:`read_manifest` and yields lists containing + ``in_memory_chunksize`` data entries. + Inside the for-loop: + a) :meth:`process_dataset_entry` is called **in parallel** on each element + of the ``manifest_chunk`` list. + b) All metrics are aggregated. + c) All output data-entries are added to the contents of ``output_manifest_file``. + Note: + * The default implementation of :meth:`read_manifest` reads an input manifest file + and returns a list of dictionaries for each line (we assume a standard NeMo format + of one json per line). + * :meth:`process_dataset_entry` is called **in parallel** on each element + of the list created in the previous step. Note that you cannot create + any new counters or modify the attributes of this class in any way + inside that function as this will lead to an undefined behavior. + Each call to the :meth:`process_dataset_entry` returns a list of + ``DataEntry`` objects that are then aggregated together. ``DataEntry`` + simply defines a ``data`` and ``metrics`` keys. + * If ``data`` is set to None, the objects are ignored (metrics are still collected). + 3. All ``metrics`` keys that were collected in the for-loop above are passed over to + :meth:`finalize` for any desired metric aggregation and reporting. + Here is a diagram outlining the execution flow of this method: + .. can only be viewed in the online documentation + .. raw:: html +
+
+
+ """ + self.prepare() + os.makedirs(os.path.dirname(self.output_manifest_file), exist_ok=True) + metrics = [] + with open(self.output_manifest_file, "wt", encoding="utf8") as fout: + for manifest_chunk in self._chunk_manifest(): + # this will unroll all inner lists + data = itertools.chain( + *process_map( + self.process_dataset_entry, + manifest_chunk, + max_workers=self.max_workers, + chunksize=self.chunksize, + ) + ) + for data_entry in tqdm(data): + if data_entry.metrics is not None: + pass # optionally accumulate metrics here + if data_entry.data is None: + continue + json.dump(data_entry.data, fout, ensure_ascii=False) + self.number_of_entries += 1 + self.total_duration += data_entry.data.get("duration", 0) + fout.write("\n") + self.finalize(self.test_cases) + + def prepare(self): + """Can be used in derived classes to prepare the processing in any way. + E.g., download data or compute some aggregates. Will be called before + starting processing the data. + """ + + def _chunk_manifest(self): + """Splits the manifest into smaller chunks defined by ``in_memory_chunksize``.""" + manifest_chunk = [] + for idx, data_entry in enumerate(self.read_manifest(), 1): + manifest_chunk.append(data_entry) + if idx % self.in_memory_chunksize == 0: + yield manifest_chunk + manifest_chunk = [] + if manifest_chunk: + yield manifest_chunk + + def read_manifest(self): + """Reading the input manifest file. + .. note:: + This function should be overridden in the "initial" class creating + manifest to read from the original source of data. + """ + if not self.input_manifest_file: + raise NotImplementedError("Override this method if no input manifest file is used") + with open(self.input_manifest_file, "rt", encoding="utf8") as fin: + for line in fin: + yield json.loads(line) + + @abstractmethod + def process_dataset_entry(self, data_entry) -> List[DataEntry]: + """Needs to be implemented in the derived classes. + Each returned value should be a ``DataEntry`` object that will hold + a dictionary (or anything else that can be json-serialized) with + the actual data + any additional metrics required for statistics + reporting. Those metrics can be used in :meth:`finalize` to + prepare for final reporting. + ``DataEntry`` is a simple dataclass defined in the following way:: + @dataclass + class DataEntry: + # can be None to drop the entry + data: Optional[Dict] + # anything - you'd need to aggregate all + # values in the finalize method manually + metrics: Any = None + .. note:: + This method should always return a list of objects to allow a + one-to-many mapping. E.g., if you want to cut an utterance into + multiple smaller parts, you can return a list of all the produced + utterances and they will be handled correctly. + The many-to-one mapping is not currently supported by design of + this method (but can still be done if you don't inherit from + this class and process the data sequentially). + Args: + data_entry: most often, ``data_entry`` will be a dictionary + containing items which represent the JSON manifest entry. + Sometimes, such as in :class:`sdp.processors.CreateInitialManifestMLS`, + it will be a string containing a line for that utterance + from the original raw MLS transcript. In general it is an element + of the list returned from the :meth:`read_manifest` method. + """ + # TODO: it would be more straightforward to use a generator here, but + # seems that it's not supported with multiprocessing. Is there a + # way to make it work? 
+        raise NotImplementedError("Derived classes must implement `process_dataset_entry`.")
+
+    def finalize(self, metrics):
+        """Can be used to output statistics about the processed data.
+        By default outputs new number of entries/hours.
+
+        Args:
+            metrics (list): a list containing all ``metrics`` keys from the
+                data entries returned from the :meth:`process_dataset_entry`
+                method.
+        """
+        logger.info("Total number of entries after processing (legacy): %d", self.number_of_entries)
+        if self.total_duration:
+            logger.info("Total audio duration (hours) after processing (legacy): %.2f", self.total_duration / 3600)
+        else:
+            logger.info("Unable to calculate total audio duration (legacy). Please ensure that the manifest file includes a 'duration' key.")
+        elapsed = time.time() - self.start_time
+        logger.info("Legacy processor completed in (seconds): %.2f", elapsed)
+
+    def test(self):
+        """Applies processing to "test_cases" and raises an error in case of mismatch."""
+        for test_case in self.test_cases:
+            generated_outputs = self.process_dataset_entry(test_case["input"].copy())
+            expected_outputs = (
+                [test_case["output"]] if not isinstance(test_case["output"], list) else test_case["output"]
+            )
+
+            for generated_output, expected_output in zip(generated_outputs, expected_outputs):
+                generated_output = generated_output.data
+
+                if generated_output != expected_output:
+                    raise RuntimeError(
+                        "Runtime test failed.\n"
+                        f"Test input: {test_case['input']}\n"
+                        f"Generated output: {generated_output}\n"
+                        f"Expected output: {expected_output}"
+                    )
\ No newline at end of file
diff --git a/build/lib/sdp/processors/datasets/__init__.py b/build/lib/sdp/processors/datasets/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/build/lib/sdp/processors/datasets/commoncrawl/__init__.py b/build/lib/sdp/processors/datasets/commoncrawl/__init__.py
new file mode 100644
index 00000000..c3909eaa
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/commoncrawl/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .commoncrawl import SplitByVttSentence
diff --git a/build/lib/sdp/processors/datasets/commoncrawl/commoncrawl.py b/build/lib/sdp/processors/datasets/commoncrawl/commoncrawl.py
new file mode 100644
index 00000000..8a5cc2c6
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/commoncrawl/commoncrawl.py
@@ -0,0 +1,99 @@
+import os
+from typing import List
+
+import soundfile as sf
+from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
+from sdp.processors.datasets.commoncrawl.harv_utils import split_by_vtt
+
+
+class SplitByVttSentence(BaseParallelProcessor):
+    """
+    A class for splitting audio files based on VTT (WebVTT) sentence-level segmentation in a dataset.
+
+    Args:
+        splited_audio_dir (str): The directory to store the split audio files.
+        source_audio_field (str): The field in the dataset containing the path to the source audio files.
+        target_audio_field (str): The field to store the paths of the split audio files.
+        duration_field (str): The field to store the duration of each split audio segment.
+        text_field (str): The field to store the transcriptions corresponding to each split audio segment.
+        vtt_field (str): The field in the dataset containing the path to the VTT (WebVTT) files for segmentation.
+        additional_fields (List[str], optional): List of additional fields to copy from the original data entry to the split entries.
+            Defaults to an empty list.
+        duration_threshold (float, optional): The duration threshold in seconds for each split audio segment. Defaults to 10.0.
+    """
+
+    def __init__(
+        self,
+        splited_audio_dir: str,
+        source_audio_field: str,
+        target_audio_field: str,
+        duration_field: str,
+        text_field: str,
+        vtt_field: str,
+        additional_fields: List[str] = [],
+        duration_threshold: float = 10.0,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.splited_audio_dir = splited_audio_dir
+        self.source_audio_field = source_audio_field
+        self.target_audio_field = target_audio_field
+        self.duration_field = duration_field
+        self.text_field = text_field
+        self.vtt_field = vtt_field
+        self.duration_threshold = duration_threshold
+        self.additional_fields = additional_fields
+
+    def prepare(self):
+        os.makedirs(self.splited_audio_dir, exist_ok=True)
+
+    def process_dataset_entry(self, data_entry):
+        vtt_file = data_entry[self.vtt_field]
+        source_audio = data_entry[self.source_audio_field]
+        res_list = []
+
+        if os.path.isfile(source_audio):
+            data, samplerate = sf.read(source_audio)
+            text_list, start_s, end_s = split_by_vtt(vtt_file, samplerate)
+            text_c = ''
+            start_c, end_c = 0, 0
+            if text_list:
+                for text, start_sr, end_sr in zip(text_list, start_s, end_s):
+                    text_c += " " + text
+                    if start_c == 0:
+                        start_c = start_sr
+                    else:
+                        pass
+                    end_c = end_sr
+                    if len(text_c) > 0 and (
+                            end_c - start_c > self.duration_threshold * samplerate or
+                            text_c[-1] == "."
or text_c[-1] == "?"): + res_list.append( + self.makeDataEntry(data_entry, data, vtt_file, samplerate, text_c, start_c, end_c)) + text_c = '' + start_c, end_c = 0, 0 + else: + pass + if len(text_c) > 0 and start_c != 0: + res_list.append(self.makeDataEntry(data_entry, data, vtt_file, samplerate, text_c, start_c, end_c)) + + return res_list + + def makeDataEntry(self, data_entry, data, vtt_file, samplerate, text_c, start_c, end_c): + data_sample = data[start_c:end_c] + wav_save_file = os.path.join(self.splited_audio_dir, '/'.join(os.path.splitext(vtt_file)[0].split('/')[-2:]), + str(int(start_c / (samplerate / 1000))) + "-" + str( + int(end_c / (samplerate / 1000))) + ".wav") + if not os.path.isfile(wav_save_file): + os.makedirs(os.path.split(wav_save_file)[0], exist_ok=True) + sf.write(wav_save_file, data_sample, samplerate) + + data = {self.target_audio_field: wav_save_file, + self.duration_field: data_sample.shape[0] / samplerate, + self.text_field: text_c.strip(), + } + for field in self.additional_fields: + data[field] = data_entry[field] + return DataEntry(data=data) + diff --git a/build/lib/sdp/processors/datasets/commoncrawl/harv_utils.py b/build/lib/sdp/processors/datasets/commoncrawl/harv_utils.py new file mode 100644 index 00000000..24efe80e --- /dev/null +++ b/build/lib/sdp/processors/datasets/commoncrawl/harv_utils.py @@ -0,0 +1,45 @@ +import webvtt # pip install webvtt-py +from datetime import datetime +from sdp.logging import logger + + +def parse_hours(inp): + inp_list = inp.split(":") + if len(inp_list) == 3 and int(inp_list[0]) >= 24: + hours = int(inp_list[0]) % 24 + days = int(inp_list[0]) // 24 + if days < 31: + inp = str(1 + days) + ":" + str(hours) + ":" + ":".join(inp_list[1:]) + return datetime.strptime(inp, '%d:%H:%M:%S.%f') + else: + months = days // 31 + days = days % 31 + inp = str(1 + months) + "/" + str(1 + days) + " " + str(hours) + ":" + ":".join(inp_list[1:]) + return datetime.strptime(inp, '%m/%d %H:%M:%S.%f') + else: + return datetime.strptime(inp, '%H:%M:%S.%f') + + +def split_by_vtt(vtt_file, samplerate): + try: + _begin = datetime.strptime('00:00:00.000', '%H:%M:%S.%f') + text_list, start_s, end_s = [], [], [] + for caption in webvtt.read(vtt_file): + text = ' '.join(caption.text.split('\n')) + + _start = parse_hours(caption.start) + start = (_start - _begin).total_seconds() + start_sr = int(start * samplerate) + + _end = parse_hours(caption.end) + end = (_end - _begin).total_seconds() + end_sr = int(end * samplerate) + + text_list.append(text.strip()) + start_s.append(start_sr) + end_s.append(end_sr) + return text_list, start_s, end_s + except Exception as e: + logger.warning(str(e) + vtt_file) + return None, None, None + diff --git a/build/lib/sdp/processors/datasets/coraa/__init__.py b/build/lib/sdp/processors/datasets/coraa/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/build/lib/sdp/processors/datasets/coraa/create_initial_manifest.py b/build/lib/sdp/processors/datasets/coraa/create_initial_manifest.py new file mode 100644 index 00000000..5be1a8ab --- /dev/null +++ b/build/lib/sdp/processors/datasets/coraa/create_initial_manifest.py @@ -0,0 +1,125 @@ +import glob +import os +from pathlib import Path +from typing import List +import pandas as pd + +import rarfile #Needs to be installed +import sox +from sox import Transformer + +from sdp.processors.base_processor import BaseParallelProcessor, DataEntry +from sdp.utils.common import extract_archive + +class CreateInitialManifestCORAA(BaseParallelProcessor): + """ + Processor 
to create the initial manifest file for the CORAA ASR dataset
+
+    Dataset link: https://github.com/nilc-nlp/CORAA
+
+    Args:
+        raw_data_dir (str): the path to the directory in which all the data will be downloaded.
+        extract_archive_dir (str): directory where the extracted data will be saved.
+        data_split (str): "train", "dev" or "test".
+        resampled_audio_dir (str): the directory where the resampled wav files will be stored.
+        already_extracted (bool): if True, we will not try to extract the raw data.
+            Defaults to False.
+        already_downloaded (bool): if True, we will not try to download files.
+            Defaults to False.
+        target_samplerate (int): sample rate (Hz) to use for resampling.
+            Defaults to 16000.
+        target_nchannels (int): number of channels to create during resampling process.
+            Defaults to 1.
+        exclude_dataset (list): list of the dataset names that will be excluded when creating the initial manifest.
+            Options: 'SP2010', 'C-ORAL-BRASIL I', 'NURC-Recife', 'TEDx Talks', 'ALIP'.
+
+    """
+    def __init__(
+        self,
+        raw_data_dir: str,
+        extract_archive_dir: str,
+        data_split: str,
+        resampled_audio_dir: str,
+        already_extracted: bool = False,
+        already_downloaded: bool = False,
+        target_samplerate: int = 16000,
+        target_nchannels: int = 1,
+        exclude_dataset: list = [],
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.raw_data_dir = Path(raw_data_dir)
+        self.extract_archive_dir = extract_archive_dir
+        self.data_split = data_split
+        self.already_downloaded = already_downloaded
+        self.already_extracted = already_extracted
+        self.exclude_dataset = exclude_dataset
+        self.resampled_audio_dir = resampled_audio_dir
+        self.target_samplerate = target_samplerate
+        self.target_nchannels = target_nchannels
+
+    def prepare(self):
+        """Downloading and extracting data (unless already done)."""
+        os.makedirs(self.raw_data_dir, exist_ok=True)
+        os.makedirs(self.resampled_audio_dir, exist_ok=True)
+        os.makedirs(self.extract_archive_dir, exist_ok=True)
+        if not self.already_downloaded:
+            try:
+                from huggingface_hub import snapshot_download
+                snapshot_download(repo_id="gabrielrstan/CORAA-v1.1", repo_type='dataset', local_dir=self.raw_data_dir)
+            except ImportError:
+                raise ImportError(
+                    "huggingface_hub is required to download the dataset. "
+                    "Please install it with `pip install huggingface_hub`"
+                )
+        if not self.already_extracted:
+            if self.data_split == 'train':
+                # the train split is shipped as a set of multi-part rar archives
+                first_rar_file = glob.glob(str(self.raw_data_dir) + "/train_dividido" + f"/*{self.data_split}*1.rar")
+                if first_rar_file and not isinstance(first_rar_file, str):
+                    first_rar_file = first_rar_file[0]
+
+                if rarfile.is_rarfile(first_rar_file):
+                    rar = rarfile.RarFile(first_rar_file)
+                    rar.extractall(path=self.extract_archive_dir)
+            else:
+                zip_files = glob.glob(str(self.raw_data_dir) + f"/*{self.data_split}.zip")
+                if not zip_files:
+                    raise RuntimeError(
+                        f"Did not find any file matching {self.raw_data_dir}/*{self.data_split}.zip. "
+                        "Make sure your download passed successfully."
+ ) + elif len(zip_files) > 1: + raise RuntimeError( + f"Expecting exactly one {self.data_split}.zip file in directory {self.raw_data_dir}" + ) + + extract_archive(zip_files[0], self.extract_archive_dir) + self.transcription_file = self.raw_data_dir / f"metadata_{self.data_split}_final.csv" + self.audio_path_prefix = self.extract_archive_dir + + def read_manifest(self): + self.df = pd.read_csv(self.transcription_file) + data_entries = self.df[~self.df['dataset'].isin(self.exclude_dataset)][['file_path','text']] + res = [tuple(row[1]) for row in data_entries.iterrows()] + return res + + def process_dataset_entry(self, data_entry) -> List[DataEntry]: + + file_path, text = data_entry + file_name = os.path.splitext(os.path.basename(file_path))[0] + transcript_text = text.strip() + + audio_path = os.path.join(self.audio_path_prefix, file_path) + output_wav_path = os.path.join(self.resampled_audio_dir, file_name + ".wav") + + if not os.path.exists(output_wav_path): + tfm = Transformer() + tfm.rate(samplerate=self.target_samplerate) + tfm.channels(n_channels=self.target_nchannels) + tfm.build(input_filepath=audio_path, output_filepath=output_wav_path) + + data = { + "audio_filepath": output_wav_path, + "duration": float(sox.file_info.duration(output_wav_path)), + "text": transcript_text, + } + + return [DataEntry(data=data)] diff --git a/build/lib/sdp/processors/datasets/coraal/__init__.py b/build/lib/sdp/processors/datasets/coraal/__init__.py new file mode 100644 index 00000000..7d2fff52 --- /dev/null +++ b/build/lib/sdp/processors/datasets/coraal/__init__.py @@ -0,0 +1,16 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .create_initial_manifest import CreateInitialManifestCORAAL +from .data_splits import TrainDevTestSplitCORAAL diff --git a/build/lib/sdp/processors/datasets/coraal/create_initial_manifest.py b/build/lib/sdp/processors/datasets/coraal/create_initial_manifest.py new file mode 100644 index 00000000..16aa166a --- /dev/null +++ b/build/lib/sdp/processors/datasets/coraal/create_initial_manifest.py @@ -0,0 +1,218 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
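+#
+# Minimal usage sketch (illustrative only; the paths are placeholders and SDP
+# processors are normally driven from a YAML pipeline config rather than
+# instantiated by hand):
+#
+#   from sdp.processors.datasets.coraal import CreateInitialManifestCORAAL
+#
+#   CreateInitialManifestCORAAL(
+#       raw_data_dir="/data/coraal/raw",
+#       resampled_audio_dir="/data/coraal/wavs",
+#       output_manifest_file="/data/coraal/manifest_initial.json",
+#   ).process()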
+# + +import glob +import os +import urllib.request +from pathlib import Path + +import pandas as pd +from sox import Transformer + +from sdp.processors.base_processor import BaseParallelProcessor, DataEntry +from sdp.utils.common import download_file, extract_archive + + +def get_coraal_url_list(): + """Returns url list for CORAAL dataset. + + There are a few mistakes in the official url list that are fixed here. + Can be overridden by tests to select a subset of urls. + """ + dataset_url = "http://lingtools.uoregon.edu/coraal/coraal_download_list.txt" + urls = [] + for file_url in urllib.request.urlopen(dataset_url): + file_url = file_url.decode('utf-8').strip() + # fixing known errors in the urls + if file_url == 'http://lingtools.uoregon.edu/coraal/les/2021.07/LES_metadata_2018.10.06.txt': + file_url = 'http://lingtools.uoregon.edu/coraal/les/2021.07/LES_metadata_2021.07.txt' + if file_url == 'http://lingtools.uoregon.edu/coraal/vld/2021.07/VLD_metadata_2018.10.06.txt': + file_url = 'http://lingtools.uoregon.edu/coraal/vld/2021.07/VLD_metadata_2021.07.txt' + urls.append(file_url) + return urls + + +class CreateInitialManifestCORAAL(BaseParallelProcessor): + """Processor to create initial manifest for the Corpus of Regional African American Language (CORAAL) dataset. + + Dataset link: https://oraal.github.io/coraal + + Will download all files, extract tars and split wav files based on the + provided durations in the transcripts. + + Args: + raw_data_dir (str): where to put raw downloaded data. + resampled_audio_dir (str): where to put re-sampled and trimmed wav files. + target_samplerate (int): sample rate to resample to. Defaults to 16000. + target_nchannels (int): target number of channels. Defaults to 1. + drop_pauses (bool): if True, will drop all transcriptions that contain + only silence (indicated by ``(pause X)`` in the transcript). + Defaults to True. + group_duration_threshold (float): can be used to group consecutive + utterances from the same speaker to a longer duration. Set to 0 + to disable this grouping (but note that many utterances are + transcribed with only a few seconds, so grouping is generally + advised). Defaults to 20. 
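+            For example, with the default threshold of 20, a run of
+            consecutive 5-second utterances from one speaker is merged into
+            chunks of roughly 20 seconds, and a speaker change always starts
+            a new chunk.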
+
+    Returns:
+        This processor generates an initial manifest file with the following fields::
+
+            {
+                "audio_filepath": <path to the audio file>,
+                "duration": <duration of the audio in seconds>,
+                "text": <transcription>,
+                "original_file": <name of the original file>,
+                "speaker": <speaker id>,
+                "is_interviewee": <whether the speaker is the interviewee>,
+                "gender": <speaker gender>,
+                "age": <speaker age>,
+                "education": <speaker education>,
+                "occupation": <speaker occupation>,
+            }
+    """
+
+    def __init__(
+        self,
+        raw_data_dir: str,
+        resampled_audio_dir: str,
+        target_samplerate: int = 16000,
+        target_nchannels: int = 1,
+        drop_pauses: bool = True,
+        group_duration_threshold: float = 20.0,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.raw_data_dir = Path(raw_data_dir)
+        self.resampled_audio_dir = resampled_audio_dir
+        self.target_samplerate = target_samplerate
+        self.target_nchannels = target_nchannels
+        self.drop_pauses = drop_pauses
+        self.group_duration_threshold = group_duration_threshold
+
+    def prepare(self):
+        os.makedirs(self.raw_data_dir, exist_ok=True)
+        os.makedirs(self.resampled_audio_dir, exist_ok=True)
+
+        # downloading all files
+        for file_url in get_coraal_url_list():
+            download_file(file_url, str(self.raw_data_dir))
+
+        os.makedirs(self.raw_data_dir / "audio", exist_ok=True)
+        os.makedirs(self.raw_data_dir / "transcripts", exist_ok=True)
+        # extracting all files
+        for data_file in glob.glob(f'{self.raw_data_dir}/*_audio_*.tar.gz'):
+            # need to set force_extract=True, since there is no folder inside, just a list of files
+            # and we extract data from multiple tars. Ideally, should change the way we check
+            # for extracted data (currently there is an assumption that all data in archive is in a single folder)
+            extract_archive(data_file, self.raw_data_dir / "audio", force_extract=True)
+        for data_file in glob.glob(f'{self.raw_data_dir}/*_textfiles_*.tar.gz'):
+            extract_archive(data_file, self.raw_data_dir / "transcripts", force_extract=True)
+
+    def read_manifest(self):
+        dfs = []
+        for data_file in glob.glob(f'{self.raw_data_dir}/transcripts/*.txt'):
+            df = pd.read_csv(data_file, delimiter='\t')
+            df['Basefile'] = os.path.basename(data_file)[:-4]  # dropping the .txt extension
+
+            if self.drop_pauses:
+                df = df[~df['Content'].str.contains(r'\(pause \d+(?:\.\d+)?\)')]
+
+            # grouping consecutive segments from the same speaker
+            if self.group_duration_threshold > 0:
+                df['Duration'] = df['EnTime'] - df['StTime']
+                # puts each sequence of same-speaker utterances in a "bin"
+                speaker_bins = (~df['Spkr'].eq(df['Spkr'].shift())).cumsum()
+                # within each bin, computes cumulative duration and then int-divides by the threshold
+                df['ThresholdMult'] = df.groupby(speaker_bins)['Duration'].transform(
+                    lambda x: pd.Series.cumsum(x) // self.group_duration_threshold
+                )
+                # finally, we take all positions where the int-division changes,
+                # which indicates that the cumulative sum exceeded the threshold, and combine those
+                # with speaker-change positions to get the final groups for utterance merging
+                final_bins = (
+                    (~df['Spkr'].eq(df['Spkr'].shift())) | (~df['ThresholdMult'].eq(df['ThresholdMult'].shift()))
+                ).cumsum()
+                df = df.groupby(final_bins).agg(
+                    {
+                        'StTime': 'min',
+                        'EnTime': 'max',
+                        'Content': ' '.join,
+                        # will be the same in the group
+                        'Spkr': lambda x: x.iloc[0],
+                        'Basefile': lambda x: x.iloc[0],
+                    }
+                )
+            # assigning label for interviewee vs interviewer (can be used to select a subset later)
+            df['is_interviewee'] = df.apply(lambda x: x['Spkr'] in x['Basefile'], axis=1)
+
+            # matching with metadata (age, gender, etc.)
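+            # (each speaker row in the *_metadata_*.txt files carries Gender/Age/
+            # Education/Occupation; the merge below attaches those values to every
+            # utterance of that speaker via its 'Spkr' id)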
+ metadata_dfs = [] + for data_file in glob.glob(f'{self.raw_data_dir}/*_metadata_*.txt'): + metadata_dfs.append(pd.read_csv(data_file, delimiter='\t')) + metadata_df = pd.concat(metadata_dfs) + # only selecting a subset of columns - can be changed if more are needed + # dropping duplicates since there are multiple rows per speaker because of + # bit-rate, tar name and other file-specific information + metadata_df = metadata_df[['CORAAL.Spkr', 'Gender', 'Age', 'Education', 'Occupation']].drop_duplicates() + df = df.merge(metadata_df, left_on='Spkr', right_on='CORAAL.Spkr', how='left') + df = df.drop('CORAAL.Spkr', axis=1) + + dfs.append(df) + + df = pd.concat(dfs) + # would be better to keep it as df, but .values is way faster than .iterrows + return df.values + + def process_dataset_entry(self, data_entry): + ( + start_time, + end_time, + content, + speaker, + basefile, + is_interviewee, + gender, + age, + education, + occupation, + ) = data_entry + + src_file = str(self.raw_data_dir / 'audio' / (basefile + '.wav')) + output_wav_path = os.path.join( + self.resampled_audio_dir, + f"{basefile}_{int(start_time * 1000)}_{int(end_time * 1000)}.wav", + ) + + if not os.path.exists(output_wav_path): + tfm = Transformer() + tfm.trim(start_time, end_time) + tfm.rate(samplerate=self.target_samplerate) + tfm.channels(n_channels=self.target_nchannels) + tfm.build(input_filepath=src_file, output_filepath=output_wav_path) + + data = { + "audio_filepath": output_wav_path, + "duration": end_time - start_time, + "text": content.strip(), + "original_file": basefile, + "speaker": speaker, + "is_interviewee": is_interviewee, + "gender": gender, + "age": age, + "education": education, + "occupation": occupation, + } + + return [DataEntry(data=data)] diff --git a/build/lib/sdp/processors/datasets/coraal/data_splits.py b/build/lib/sdp/processors/datasets/coraal/data_splits.py new file mode 100644 index 00000000..82e2819d --- /dev/null +++ b/build/lib/sdp/processors/datasets/coraal/data_splits.py @@ -0,0 +1,130 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from sdp.processors.base_processor import BaseParallelProcessor, DataEntry + + +class TrainDevTestSplitCORAAL(BaseParallelProcessor): + """Custom train-dev-test split for CORAAL dataset. + + Split is done speaker-wise, so the same speakers don't appear in different + splits. + + Args: + data_split (str): train, dev or test. + + Returns: + All the same fields as in the input manifest, but only a subset of + the data is retained. 
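+
+        For example, with ``data_split="dev"`` only utterances whose
+        ``original_file`` maps to one of the seven dev speakers below are
+        retained (the mapping drops the part suffix, e.g.
+        ``DCA_se1_ag1_m_01_1`` -> ``DCA_se1_ag1_m``).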
+ """ + + def __init__( + self, + data_split: str, + **kwargs, + ): + super().__init__(**kwargs) + if data_split not in ["train", "dev", "test"]: + raise ValueError("data_split has to be either train, dev or test") + self.data_split = data_split + self.split_map = {} + self.split_map["train"] = set( + [ + 'ATL_se0_ag1_m', + 'DCA_se1_ag1_f', + 'DCA_se1_ag2_f', + 'DCA_se1_ag2_m', + 'DCA_se1_ag3_f', + 'DCA_se1_ag3_m', + 'DCA_se1_ag4_m', + 'DCA_se2_ag1_f', + 'DCA_se2_ag1_m', + 'DCA_se2_ag2_m', + 'DCB_se1_ag1_m', + 'DCB_se1_ag2_f', + 'DCB_se1_ag2_m', + 'DCB_se1_ag3_f', + 'DCB_se1_ag3_m', + 'DCB_se1_ag4_f', + 'DCB_se1_ag4_m', + 'DCB_se2_ag1_f', + 'DCB_se2_ag1_m', + 'DCB_se2_ag2_f', + 'LES_se0_ag2_f', + 'LES_se0_ag2_m', + 'PRV_se0_ag1_f', + 'PRV_se0_ag2_f', + 'ROC_se0_ag1_m', + 'ROC_se0_ag2_f', + 'VLD_se0_ag2_f', + 'VLD_se0_ag2_m', + ] + ) + self.split_map["dev"] = set( + [ + 'ATL_se0_ag1_f', + 'DCA_se1_ag1_m', + 'DCB_se1_ag1_f', + 'LES_se0_ag3_f', + 'PRV_se0_ag1_m', + 'ROC_se0_ag1_f', + 'VLD_se0_ag3_f', + ] + ) + self.split_map["test"] = set( + [ + 'ATL_se0_ag2_f', + 'ATL_se0_ag2_m', + 'DCA_se2_ag3_m', + 'DCA_se2_ag4_f', + 'DCA_se2_ag4_m', + 'DCA_se3_ag1_f', + 'DCA_se3_ag1_m', + 'DCA_se3_ag2_f', + 'DCA_se3_ag2_m', + 'DCA_se3_ag3_f', + 'DCA_se3_ag3_m', + 'DCA_se3_ag4_m', + 'DCB_se2_ag2_m', + 'DCB_se2_ag3_f', + 'DCB_se2_ag3_m', + 'DCB_se2_ag4_f', + 'DCB_se2_ag4_m', + 'DCB_se3_ag1_f', + 'DCB_se3_ag1_m', + 'DCB_se3_ag2_f', + 'DCB_se3_ag3_f', + 'DCB_se3_ag3_m', + 'DCB_se3_ag4_f', + 'DCB_se3_ag4_m', + 'LES_se0_ag3_m', + 'LES_se0_ag4_f', + 'LES_se0_ag4_m', + 'PRV_se0_ag2_m', + 'PRV_se0_ag3_f', + 'PRV_se0_ag3_m', + 'ROC_se0_ag2_m', + 'ROC_se0_ag3_f', + 'ROC_se0_ag3_m', + 'VLD_se0_ag3_m', + 'VLD_se0_ag4_f', + 'VLD_se0_ag4_m', + ] + ) + + def process_dataset_entry(self, data_entry): + if data_entry["original_file"][:-5] in self.split_map[self.data_split]: + return [DataEntry(data=data_entry)] + return [] diff --git a/build/lib/sdp/processors/datasets/fleurs/__init__.py b/build/lib/sdp/processors/datasets/fleurs/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/build/lib/sdp/processors/datasets/fleurs/create_initial_manifest.py b/build/lib/sdp/processors/datasets/fleurs/create_initial_manifest.py new file mode 100644 index 00000000..d571593a --- /dev/null +++ b/build/lib/sdp/processors/datasets/fleurs/create_initial_manifest.py @@ -0,0 +1,150 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
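+#
+# Minimal usage sketch (illustrative only; paths are placeholders and SDP
+# processors are normally run via a YAML pipeline config):
+#
+#   from sdp.processors.datasets.fleurs.create_initial_manifest import (
+#       CreateInitialManifestFleurs,
+#   )
+#
+#   CreateInitialManifestFleurs(
+#       lang="hy_am",
+#       split="dev",
+#       raw_data_dir="/data/fleurs/hy_am",
+#       output_manifest_file="/data/fleurs/hy_am/dev_manifest.json",
+#   ).process()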
+
+import fnmatch
+import glob
+import json
+import os
+import shutil
+import typing
+from urllib.parse import parse_qs, urlparse
+
+from sdp.processors.base_processor import BaseProcessor, DataEntry
+from sdp.utils.common import download_file, extract_archive
+
+
+def get_fleurs_url_list(lang: str, split: str) -> list[str]:
+    # examples:
+    # "https://huggingface.co/datasets/google/fleurs/resolve/main/data/hy_am/audio/dev.tar.gz",
+    # "https://huggingface.co/datasets/google/fleurs/resolve/main/data/hy_am/dev.tsv"
+
+    urls = []
+    base_url = "https://huggingface.co/datasets/google/fleurs/resolve/main/data"
+
+    base_lang_url = os.path.join(base_url, lang)
+    tsv_url = f"{base_lang_url}/{split}.tsv"
+    urls.append(tsv_url)
+
+    tar_gz_url = f"{base_lang_url}/audio/{split}.tar.gz"
+    urls.append(tar_gz_url)
+
+    return urls
+
+
+class CreateInitialManifestFleurs(BaseProcessor):
+    """
+    Processor to create initial manifest for the FLEURS dataset.
+
+    Dataset link: https://huggingface.co/datasets/google/fleurs
+
+    Will download all files, extract them, and create a manifest file with the
+    "audio_filepath" and "text" fields.
+
+    Args:
+        lang (str): Language to be processed, identified by a combination of ISO 639-1 and ISO 3166-1 alpha-2 codes.
+            Examples are:
+
+            - ``"hy_am"`` for Armenian
+            - ``"ko_kr"`` for Korean
+
+        split (str): Which dataset splits to process.
+            Options are:
+
+            - ``"test"``
+            - ``"train"``
+            - ``"dev"``
+
+        raw_data_dir (str): Path to the folder where the data archive should be downloaded and extracted.
+
+    Returns:
+        This processor generates an initial manifest file with the following fields::
+
+            {
+                "audio_filepath": <path to the audio file>,
+                "text": <transcription>,
+            }
+    """
+
+    def __init__(
+        self,
+        lang: str,
+        split: str,
+        raw_data_dir: str,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.lang = lang
+        self.split = split
+        self.raw_data_dir = raw_data_dir
+
+    def process_transcript(self, file_path: str) -> list[dict[str, typing.Any]]:
+        """
+        Parse transcript TSV file and put it inside manifest.
+        Assumes each row is tab-separated, with the audio file name in the
+        second column and the transcript in the third.
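+        For illustration, a row is expected to look roughly like
+        ``<id>\t<file name>.wav\t<raw transcription>\t<transcription>\t...``
+        (only the second and third fields are used).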
+        """
+
+        entries = []
+        root = os.path.dirname(file_path)
+
+        with open(file_path, encoding="utf-8") as fin:
+            for line in fin:
+                # split the line into tab-separated fields
+                parts = line.strip().split('\t')
+                if len(parts) < 3:  # skip lines that don't have both a file name and a transcript
+                    continue
+
+                file_name, transcript_text = parts[1], parts[2]
+                wav_file = os.path.join(root, file_name)
+
+                entry = {"audio_filepath": os.path.abspath(wav_file), "text": transcript_text}
+                entries.append(entry)
+
+        return entries
+
+    def process_data(self, data_folder: str, manifest_file: str) -> None:
+        entries = self.process_transcript(os.path.join(data_folder, self.split + "/" + self.split + ".tsv"))
+
+        with open(manifest_file, "w", encoding="utf-8") as fout:
+            for m in entries:
+                fout.write(json.dumps(m, ensure_ascii=False) + "\n")
+
+    def download_extract_files(self, dst_folder: str) -> None:
+        """Downloading and extracting files."""
+
+        os.makedirs(dst_folder, exist_ok=True)
+
+        # downloading all files
+        for file_url in get_fleurs_url_list(self.lang, self.split):
+            download_file(file_url, str(dst_folder))
+
+        extract_archive(f'{dst_folder}/{self.split}.tar.gz', str(dst_folder), force_extract=True)
+
+        # organizing files into their respective folders
+        target_folder = os.path.join(dst_folder, self.split)
+
+        file_name = f"{self.split}.tsv"
+
+        file_path = os.path.join(dst_folder, file_name)
+        dest_file_path = os.path.join(target_folder, file_name)
+
+        if not os.path.exists(dest_file_path):
+            shutil.move(file_path, dest_file_path)
+            print(f'Moved {file_path} to {dest_file_path}')
+        else:
+            os.remove(file_path)
+            print(f'File {file_name} already exists in {target_folder}, deleted from source.')
+
+    def process(self):
+        self.download_extract_files(self.raw_data_dir)
+        self.process_data(self.raw_data_dir, self.output_manifest_file)
diff --git a/build/lib/sdp/processors/datasets/ksc2/__init__.py b/build/lib/sdp/processors/datasets/ksc2/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/build/lib/sdp/processors/datasets/ksc2/create_initial_manifest.py b/build/lib/sdp/processors/datasets/ksc2/create_initial_manifest.py
new file mode 100644
index 00000000..3bde174b
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/ksc2/create_initial_manifest.py
@@ -0,0 +1,150 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
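+#
+# Minimal usage sketch (illustrative only; paths are placeholders and SDP
+# processors are normally run via a YAML pipeline config):
+#
+#   from sdp.processors.datasets.ksc2.create_initial_manifest import (
+#       CreateInitialManifestKSC2,
+#   )
+#
+#   CreateInitialManifestKSC2(
+#       raw_data_dir="/data/ksc2/raw",
+#       extract_archive_dir="/data/ksc2/extracted",
+#       resampled_audio_dir="/data/ksc2/wavs",
+#       data_split="train",
+#       output_manifest_file="/data/ksc2/train_manifest.json",
+#   ).process()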
+ +# To convert mp3 files to wav using sox, you must have installed sox with mp3 support +# For example sudo apt-get install libsox-fmt-mp3 +import csv +import glob +import os +from collections import defaultdict +from pathlib import Path +from typing import Dict, Tuple + +from sox import Transformer +from tqdm.contrib.concurrent import process_map + +from sdp.logging import logger +from sdp.processors.base_processor import BaseParallelProcessor, DataEntry +from sdp.utils.common import download_file, extract_archive + + +class CreateInitialManifestKSC2(BaseParallelProcessor): + """Processor to create initial manifest for the Kazakh Speech Corpus (KSC) 2. + + The dataset should be requested via Google Forms, which can be found here https://issai.nu.edu.kz/kz-speech-corpus/. + + Extracts raw data for the specified language and creates an initial manifest + using the transcripts provided in the raw data. + + Args: + raw_data_dir (str): the path to the directory containing the raw data archive file. + extract_archive_dir (str): directory where the extracted data will be saved. + resampled_audio_dir (str): directory where the resampled audio will be saved. + data_split (str): "train", "dev" or "test". + target_samplerate (int): sample rate (Hz) to use for resampling. + Defaults to 16000. + target_nchannels (int): number of channels to create during resampling process. + Defaults to 1. + Returns: + This processor generates an initial manifest file with the following fields: + + { + "audio_filepath": , + "text": , + "source": , + } + """ + + def __init__( + self, + raw_data_dir: str, + extract_archive_dir: str, + resampled_audio_dir: str, + data_split: str, + target_samplerate: int = 16000, + target_nchannels: int = 1, + **kwargs, + ): + super().__init__(**kwargs) + self.raw_data_dir = Path(raw_data_dir) + self.extract_archive_dir = extract_archive_dir + self.resampled_audio_dir = resampled_audio_dir + self.data_split = data_split + self.target_samplerate = target_samplerate + self.target_nchannels = target_nchannels + + def prepare(self): + """Extracting data (unless already done).""" + os.makedirs(self.raw_data_dir, exist_ok=True) + + tar_gz_files = glob.glob(str(self.raw_data_dir) + f"/*.tar.gz") + + if not tar_gz_files: + raise RuntimeError( + f"Did not find any file matching {self.raw_data_dir}/*.tar.gz. " + "For KSC2 dataset we cannot automatically download the data, so " + "make sure to get the data manually" + "and put it in the 'raw_data_dir' folder." 
+ ) + + elif len(tar_gz_files) > 1: + raise RuntimeError(f"Expecting exactly one *.tar.gz file in directory {self.raw_data_dir}") + + data_folder = extract_archive(tar_gz_files[0], self.extract_archive_dir) + + if self.data_split.capitalize() not in data_folder: + self.data_split_dir = Path(data_folder, self.data_split.capitalize()) + else: + self.data_split_dir = Path(data_folder) + + os.makedirs(self.resampled_audio_dir, exist_ok=True) + + def read_manifest(self): + if self.data_split_dir is None: + raise RuntimeError("self.process has to be called before processing the data.") + + dataset_entries = [] + + without_text = defaultdict(int) + + for audio_filepath in self.data_split_dir.rglob('*.flac'): + filename = audio_filepath.stem + source = audio_filepath.relative_to(self.data_split_dir).parents[0].as_posix() + + transcribed_filename = Path(audio_filepath.parent, filename).with_suffix('.txt') + + if transcribed_filename.exists(): + with open(transcribed_filename, "rt", encoding="utf8") as txtfile: + text = ' '.join(txtfile.readlines()) + elif transcribed_filename.with_suffix('.txt.txt').exists(): + transcribed_filename = transcribed_filename.with_suffix('.txt.txt') + with open(transcribed_filename, "rt", encoding="utf8") as txtfile: + text = ' '.join(txtfile.readlines()) + else: + without_text[audio_filepath.parent] += 1 + continue + + entry = {'audio_filepath': audio_filepath.as_posix(), 'text': text, 'source': source} + + dataset_entries.append(entry) + + logger.info(f"Without text entries -> {without_text}") + + return dataset_entries + + def process_dataset_entry(self, data_entry: Dict): + wav_source_dir = Path(self.resampled_audio_dir, data_entry['source']) + wav_source_dir.mkdir(exist_ok=True) + + output_wav_path = Path(wav_source_dir, Path(data_entry['audio_filepath']).stem).with_suffix(".wav") + + if not os.path.exists(output_wav_path): + tfm = Transformer() + tfm.rate(samplerate=self.target_samplerate) + tfm.channels(n_channels=self.target_nchannels) + tfm.build(input_filepath=data_entry['audio_filepath'], output_filepath=output_wav_path) + + data_entry['audio_filepath'] = output_wav_path.as_posix() + + return [DataEntry(data=data_entry)] diff --git a/build/lib/sdp/processors/datasets/lhotse.py b/build/lib/sdp/processors/datasets/lhotse.py new file mode 100644 index 00000000..01f54d44 --- /dev/null +++ b/build/lib/sdp/processors/datasets/lhotse.py @@ -0,0 +1,83 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json + +from sdp.processors.base_processor import BaseProcessor + + +class LhotseImport(BaseProcessor): + """Processor to create an initial manifest imported from a Lhotse CutSet. + The ``input_manifest_file`` is expected to point to a Lhotse CutSet manifest, + which usually has ``cuts`` in its name and a ``.jsonl`` or ``.jsonl.gz`` extension. 
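+    (for example, a Lhotse recipe might produce a file like ``cuts_dev.jsonl.gz``;
+    the exact naming depends on the recipe).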
+
+    Lhotse is a library for speech data processing and loading; see:
+
+    * https://github.com/lhotse-speech/lhotse
+    * https://lhotse.readthedocs.io
+
+    It can be installed using ``pip install lhotse``.
+
+    .. caution:: Currently we only support the importing of cut sets that represent
+        single-channel, single-audio-file-per-utterance datasets.
+
+    Returns:
+        This processor generates an initial manifest file with the following fields
+        (plus ``lhotse_cut_id`` and any metadata found on the cut's supervision)::
+
+            {
+                "audio_filepath": <path to the audio file>,
+                "duration": <duration of the audio in seconds>,
+                "text": <transcription>,
+            }
+    """
+
+    def process(self):
+        from lhotse import CutSet
+
+        cuts = CutSet.from_file(self.input_manifest_file)
+        with open(self.output_manifest_file, "w") as f:
+            for cut in cuts:
+                self.check_entry(cut)
+                data = {
+                    "audio_filepath": cut.recording.sources[0].source,
+                    "duration": cut.duration,
+                    "lhotse_cut_id": cut.id,
+                }
+                for meta in ("text", "speaker", "gender", "language"):
+                    if (item := getattr(cut.supervisions[0], meta)) is not None:
+                        data[meta] = item
+                if (custom := cut.supervisions[0].custom) is not None:
+                    data.update(custom)
+                print(json.dumps(data), file=f)
+
+    def check_entry(self, cut) -> None:
+        from lhotse import MonoCut
+
+        assert isinstance(
+            cut, MonoCut
+        ), f"Currently, only MonoCut import is supported. Received: {cut}"
+        assert (
+            cut.has_recording
+        ), f"Currently, we only support cuts with recordings. Received: {cut}"
+        assert (
+            cut.recording.num_channels == 1
+        ), f"Currently, we only support recordings with a single channel. Received: {cut}"
+        assert (
+            len(cut.recording.sources) == 1
+        ), f"Currently, we only support recordings with a single AudioSource. Received: {cut}"
+        assert (
+            cut.recording.sources[0].type == "file"
+        ), f"Currently, we only support AudioSources of type='file'. Received: {cut}"
+        assert (
+            len(cut.supervisions) == 1
+        ), f"Currently, we only support cuts with a single supervision. Received: {cut}"
diff --git a/build/lib/sdp/processors/datasets/librispeech/__init__.py b/build/lib/sdp/processors/datasets/librispeech/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/build/lib/sdp/processors/datasets/librispeech/create_initial_manifest.py b/build/lib/sdp/processors/datasets/librispeech/create_initial_manifest.py
new file mode 100644
index 00000000..83d42bde
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/librispeech/create_initial_manifest.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2023, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
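+#
+# Minimal usage sketch (illustrative only; paths are placeholders and SDP
+# processors are normally run via a YAML pipeline config):
+#
+#   from sdp.processors.datasets.librispeech.create_initial_manifest import (
+#       CreateInitialManifestLibrispeech,
+#   )
+#
+#   CreateInitialManifestLibrispeech(
+#       split="dev-clean",
+#       raw_data_dir="/data/librispeech",
+#       output_manifest_file="/data/librispeech/dev_clean_manifest.json",
+#   ).process()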
+ +import fnmatch +import glob +import json +import os +import typing + +from sdp.processors.base_processor import BaseProcessor +from sdp.utils.common import download_file, extract_archive + + +def get_librispeech_url_list(split: str) -> str: + urls = { + "dev-clean": "https://openslr.org/resources/12/dev-clean.tar.gz", + "dev-other": "https://openslr.org/resources/12/dev-other.tar.gz", + "test-clean": "https://openslr.org/resources/12/test-clean.tar.gz", + "test-other": "https://openslr.org/resources/12/test-other.tar.gz", + "train-clean-100": "https://openslr.org/resources/12/train-clean-100.tar.gz", + "train-clean-360": "https://openslr.org/resources/12/train-clean-360.tar.gz", + "train-other-500": "https://openslr.org/resources/12/train-other-500.tar.gz", + "dev-clean-2": "https://www.openslr.org/resources/31/dev-clean-2.tar.gz", + "train-clean-5": "https://www.openslr.org/resources/31/train-clean-5.tar.gz", + } + + if split not in urls: + valid_splits = ", ".join(urls.keys()) + raise ValueError(f"Invalid dataset split '{split}'. Valid options are: {valid_splits}") + + return urls[split] + + +class CreateInitialManifestLibrispeech(BaseProcessor): + """Processor to create initial manifest for the Librispeech dataset. + + Dataset link: https://openslr.org/12 + Dataset link: https://openslr.org/31 + + Will download all files, extract tars, and create a manifest file with the + "audio_filepath" and "text" fields. + + Args: + split (str): Which datasets or their combinations should be processed. + Options are: + + - ``"dev-clean"`` + - ``"dev-other"`` + - ``"test-clean"`` + - ``"test-other"`` + - ``"train-clean-100"`` + - ``"train-clean-360"`` + - ``"train-other-500"`` + - ``"dev-clean-2"`` + - ``"train-clean-5"`` + + raw_data_dir (str): Path to the folder where the data archive should be downloaded and extracted. + + Returns: + This processor generates an initial manifest file with the following fields:: + + { + "audio_filepath": , + "text": , + } + """ + + def __init__( + self, + split: str, + raw_data_dir: str, + **kwargs, + ): + super().__init__(**kwargs) + self.split = split + self.raw_data_dir = raw_data_dir + + def process_transcript(self, file_path: str) -> list[dict[str, typing.Any]]: + """Parse transcript file and put it inside manifest + We assume that flac files are located in the same directory as transcript file. 
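+        Each line of a ``*.trans.txt`` file pairs an utterance id with its
+        transcript, e.g. ``84-121123-0000 GO DO YOU HEAR``.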
+ """ + + entries = [] + root = os.path.dirname(file_path) + + print(f"Processing transcript file: {file_path}") + with open(file_path, encoding="utf-8") as fin: + for line in fin: + id, text = line[: line.index(" ")], line[line.index(" ") + 1 :] + transcript_text = text.strip() + + flac_file = os.path.join(root, id + ".flac") + + entry = {} + entry["audio_filepath"] = os.path.abspath(flac_file) + entry["text"] = transcript_text + entries.append(entry) + return entries + + def process_data(self, data_folder: str, manifest_file: str) -> None: + split_folder = os.path.join(data_folder, "LibriSpeech", self.split) + files = [] + entries = [] + if not os.path.exists(split_folder): + raise FileNotFoundError(f"Directory for split '{self.split}' not found at {split_folder}") + + for root, _, filenames in os.walk(split_folder): + for filename in fnmatch.filter(filenames, "*.trans.txt"): + files.append(os.path.join(root, filename)) + + for file in files: + entries.extend(self.process_transcript(file)) + + with open(manifest_file, "w") as fout: + for entry in entries: + fout.write(json.dumps(entry) + "\n") + + def download_extract_files(self, dst_folder: str) -> None: + """downloading and extracting files""" + + os.makedirs(dst_folder, exist_ok=True) + + download_file(get_librispeech_url_list(self.split), str(dst_folder)) + data_file = f'{dst_folder}/{self.split}.tar.gz' + extract_archive(str(data_file), str(dst_folder), force_extract=True) + + def process(self): + self.download_extract_files(self.raw_data_dir) + self.process_data(self.raw_data_dir, self.output_manifest_file) diff --git a/build/lib/sdp/processors/datasets/masc/__init__.py b/build/lib/sdp/processors/datasets/masc/__init__.py new file mode 100644 index 00000000..82fd7b35 --- /dev/null +++ b/build/lib/sdp/processors/datasets/masc/__init__.py @@ -0,0 +1,18 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .create_initial_manifest import CreateInitialManifestMASC +from .aggregate_segments import AggregateSegments +from .apply_reg_exp_on_vtt_entries import RegExpVttEntries +from .get_caption_file_segments import GetCaptionFileSegments diff --git a/build/lib/sdp/processors/datasets/masc/aggregate_segments.py b/build/lib/sdp/processors/datasets/masc/aggregate_segments.py new file mode 100644 index 00000000..8db51046 --- /dev/null +++ b/build/lib/sdp/processors/datasets/masc/aggregate_segments.py @@ -0,0 +1,131 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+from pydub import AudioSegment
+from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
+from sdp.processors.datasets.masc.utils import save_audio_segment
+
+class AggregateSegments(BaseParallelProcessor):
+    """
+    Aggregates short segments into segments with duration not longer than `max_duration`.
+    The algorithm iterates from left to right, merging consecutive segments into the
+    current segment until the total duration reaches `max_duration`.
+
+    Args:
+        output_audio_dir (str): Directory where aggregated audio segments will be stored, if
+            `save_aggregated_audio_segments` is True. If `save_aggregated_audio_segments` is False,
+            this path is only used to create the audio file paths in the manifest.
+        input_segments_key (str): The field name that contains the list of segments in the input manifest.
+            Defaults to "segments".
+        input_audio_filepath_key (str): The field name that contains paths to the audio files in the input manifest.
+            Defaults to "audio_filepath".
+        output_text_key (str): Field name where the aggregated segment text will be stored. Defaults to "text".
+        output_duration_key (str): Field name where aggregated segment durations will be stored. Defaults to "duration".
+        output_audio_filepath_key (str): Field name where aggregated segment audio file paths will be stored.
+            Defaults to "audio_filepath".
+        max_duration (float): Maximum duration of an aggregated segment. Defaults to 20.0s.
+        save_aggregated_audio_segments (bool): Flag indicating whether to crop audio files according to the
+            aggregated segments. Defaults to True.
+        verbose (bool): Set to True to enable more detailed logging. Defaults to False.
+    """
+    def __init__(
+        self,
+        output_audio_dir: str,
+        input_segments_key: str = "segments",
+        input_audio_filepath_key: str = "audio_filepath",
+        output_text_key: str = "text",
+        output_duration_key: str = "duration",
+        output_audio_filepath_key: str = "audio_filepath",
+        max_duration: float = 20.0,
+        save_aggregated_audio_segments: bool = True,
+        verbose: bool = False,
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.max_duration = max_duration
+        self.input_audio_filepath_key = input_audio_filepath_key
+        self.output_splitted_audio_filepath_key = output_audio_filepath_key
+        self.save_aggregated_audio_segments = save_aggregated_audio_segments
+        self.output_audio_dir = output_audio_dir
+        self.input_segments_key = input_segments_key
+        self.verbose = verbose
+        self.output_text_key = output_text_key
+        self.output_duration_key = output_duration_key
+
+    def prepare(self):
+        if self.save_aggregated_audio_segments and self.output_audio_dir:
+            os.makedirs(os.path.join(self.output_audio_dir), exist_ok=True)
+
+    def process_dataset_entry(self, data_entry: dict):
+        if self.input_segments_key not in data_entry:
+            if self.verbose:
+                logging.info(f"No segments in the sample {data_entry[self.input_audio_filepath_key]}.")
+            return []
+
+        segments = data_entry[self.input_segments_key]
+        if len(segments) == 0:
+            return []
+
+        audio = AudioSegment.from_wav(data_entry[self.input_audio_filepath_key])
+
+        audio_basename = os.path.basename(data_entry[self.input_audio_filepath_key]).split(".")[0]
+        agg_segments = []
+        aggregated_segment = {**segments[0]}
+        for segment in segments[1:]:
+            # skipping segments whose boundaries lie beyond the actual audio duration
+            if segment["end_time"] > audio.duration_seconds or segment["start_time"] > audio.duration_seconds:
+                continue
+
+            start_time = min(segment["start_time"], aggregated_segment["start_time"])
+            end_time = max(segment["end_time"], aggregated_segment["end_time"])
+            # if adding another segment makes the total duration reach max_duration,
+            # the current aggregated segment is finalized and a new one is started
+            if end_time - start_time >= self.max_duration:
+                agg_segments.append(aggregated_segment)
+                aggregated_segment = {**segment}
+            else:
+                # updating aggregated segment text with correct order of segments
+                if aggregated_segment["start_time"] < segment["start_time"]:
+                    aggregated_segment["text"] = f"{aggregated_segment['text']} {segment['text']}".strip()
+                else:
+                    aggregated_segment["text"] = f"{segment['text']} {aggregated_segment['text']}".strip()
+
+                aggregated_segment["start_time"] = start_time  # updating aggregated segment start time
+                aggregated_segment["end_time"] = end_time  # updating aggregated segment end time
+
+        # adding the last aggregated segment
+        if aggregated_segment not in agg_segments:
+            agg_segments.append(aggregated_segment)
+
+        valid_segments = []
+        for aggregated_segment in agg_segments:
+            aggregated_segment.update(data_entry)
+
+            start_time = aggregated_segment.pop("start_time")
+            end_time = aggregated_segment.pop("end_time")
+
+            aggregated_segment[self.output_duration_key] = end_time - start_time
+            aggregated_segment[self.output_splitted_audio_filepath_key] = os.path.join(
+                self.output_audio_dir, f"{audio_basename}_{start_time}_{end_time}.wav"
+            )
+
+            if self.save_aggregated_audio_segments:
+                try:
+                    save_audio_segment(
+                        audio=audio,
+                        start_time=start_time,
+                        end_time=end_time,
+                        output_audio_filepath=aggregated_segment[self.output_splitted_audio_filepath_key]
+                    )
+                    valid_segments.append(aggregated_segment)
+                except IndexError as e:
+                    if self.verbose:
+                        logging.warning(f"Invalid segment boundaries in {audio_basename} ({e}). Skipping...")
+            else:
+                # nothing to crop: keep the aggregated segment as-is in the manifest
+                valid_segments.append(aggregated_segment)
+
+        return [DataEntry(data=segment) for segment in valid_segments]
diff --git a/build/lib/sdp/processors/datasets/masc/apply_reg_exp_on_vtt_entries.py b/build/lib/sdp/processors/datasets/masc/apply_reg_exp_on_vtt_entries.py
new file mode 100644
index 00000000..541e98eb
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/masc/apply_reg_exp_on_vtt_entries.py
@@ -0,0 +1,74 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+import webvtt  # pip install webvtt-py
+from typing import Dict
+from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
+
+
+class RegExpVttEntries(BaseParallelProcessor):
+    """
+    Applies regular expressions on entries of a .vtt (WebVTT) file and stores the processed file
+    in the specified directory.
+
+    Args:
+        input_filepath_key (str): Key that stores the path to the input `.vtt` file.
+        output_filtered_vtt_dir (str): Directory where the processed `.vtt` files will be stored.
+        output_filepath_key (str): Key to store the output `.vtt` file path.
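+        regex_params (dict): Dictionary with the regular expression parameters
+            passed to ``re.sub``: ``pattern``, ``repl``, and optionally
+            ``count`` (defaults to 0, i.e. replace all occurrences).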
+
+    Returns:
+        Manifest with an additional field::
+
+            {
+                "filtered_vtt_filepath": <path to the processed .vtt file>
+            }
+
+        (the field name is configurable via ``output_filepath_key``)
+    """
+
+    def __init__(
+        self,
+        regex_params: Dict,
+        input_filepath_key: str = "vtt_filepath",
+        output_filtered_vtt_dir: str = "filtered_vtt_filepath",
+        output_filepath_key: str = "filtered_vtt_filepath",
+        **kwargs,
+    ):
+        super().__init__(**kwargs)
+        self.input_filepath_key = input_filepath_key
+        self.output_filepath_key = output_filepath_key
+        self.output_filtered_vtt_dir = output_filtered_vtt_dir
+        self.regex_params = regex_params
+
+    def prepare(self):
+        os.makedirs(self.output_filtered_vtt_dir, exist_ok=True)
+
+    def process_dataset_entry(self, data_entry):
+        try:
+            vtt = webvtt.read(data_entry[self.input_filepath_key])
+
+            for caption in vtt:
+                caption.text = re.sub(
+                    pattern=self.regex_params["pattern"],
+                    repl=self.regex_params["repl"],
+                    string=caption.text,
+                    count=self.regex_params.get("count", 0),
+                )
+
+            basename = os.path.basename(data_entry[self.input_filepath_key])
+            filtered_vtt_filepath = os.path.join(self.output_filtered_vtt_dir, basename)
+            data_entry[self.output_filepath_key] = filtered_vtt_filepath
+
+            vtt.save(filtered_vtt_filepath)
+            return [DataEntry(data=data_entry)]
+        except Exception:
+            # corrupted or unreadable .vtt files are dropped from the manifest
+            return [DataEntry(data=None)]
diff --git a/build/lib/sdp/processors/datasets/masc/create_initial_manifest.py b/build/lib/sdp/processors/datasets/masc/create_initial_manifest.py
new file mode 100644
index 00000000..9563f723
--- /dev/null
+++ b/build/lib/sdp/processors/datasets/masc/create_initial_manifest.py
@@ -0,0 +1,174 @@
+# Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import logging
+from pathlib import Path
+import pandas as pd
+from sox import Transformer
+
+from sdp.processors.base_processor import BaseParallelProcessor, DataEntry
+from sdp.utils.common import extract_archive
+
+class CreateInitialManifestMASC(BaseParallelProcessor):
+    """
+    Processor for creating an initial manifest for the Massive Arabic Speech Corpus (MASC).
+
+    Dataset link: https://ieee-dataport.org/open-access/masc-massive-arabic-speech-corpus.
+    Prior to calling the processor, download the tarred dataset and store it under `raw_data_dir/masc.tar.gz`.
+
+    Creates the manifest from the samples listed in `dataset_dir/subsets/{data_split}.csv`. All meta information is kept.
+
+    Args:
+        raw_data_dir (str): The root directory of the dataset.
+        extract_archive_dir (str): Directory where the extracted data will be saved.
+        resampled_audios_dir (str): Directory where the resampled audio will be saved.
+        data_split (str): Dataset split type.
+        already_extracted (bool): If True, we will not try to extract the raw data. Defaults to False.
+        target_samplerate (int): Sample rate (Hz) to use for resampling. Defaults to 16000.
+        target_nchannels (int): Number of channels to create during resampling process. Defaults to 1.
+        output_manifest_sample_id_key (str): The field name to store sample ID. Defaults to "sample_id".
+        output_manifest_vtt_filapath_key (str): The field name to store the vtt file path. Defaults to "vtt_filepath".
+        output_manifest_audio_filapath_key (str): The field name to store the audio file path. Defaults to "audio_filepath".
+        verbose (bool): Set to True for more detailed logging.
+        **kwargs: Additional keyword arguments to be passed to the base class `BaseParallelProcessor`.
+
+    Returns:
+        This processor generates an initial manifest file with the following fields::
+
+            {
+                "sample_id": <sample id>,
+                "audio_filepath": <path to the audio file>,
+                "vtt_filepath": <path to the vtt file>,
+                "category":