maxseats/aihub-464-bracket-test-tmp
收藏Hugging Face2024-06-12 更新2024-06-29 收录
下载链接:
https://hf-mirror.com/datasets/maxseats/aihub-464-bracket-test-tmp
下载链接
链接失效反馈官方服务:
资源简介:
---
dataset_info:
features:
- name: input_features
sequence:
sequence: float32
- name: labels
dtype: string
splits:
- name: train
num_bytes: 1271628697
num_examples: 1324
- name: test
num_bytes: 159433806
num_examples: 166
- name: valid
num_bytes: 158474204
num_examples: 165
download_size: 343908842
dataset_size: 1589536707
configs:
- config_name: default
data_files:
- split: train
path: data/train-*
- split: test
path: data/test-*
- split: valid
path: data/valid-*
---
# 실행한 코드는 다음과 같아요
```
import os
import json
from pydub import AudioSegment
from tqdm import tqdm
import re
from datasets import Audio, Dataset, DatasetDict
from transformers import WhisperFeatureExtractor, WhisperTokenizer
import pandas as pd
# 사용자 지정 변수를 설정해요.
DATA_DIR = '/mnt/a/maxseats/(주의-원본-680GB)주요 영역별 회의 음성인식 데이터' # 데이터셋이 저장된 폴더
# 원천, 라벨링 데이터 폴더 지정
json_base_dir = DATA_DIR
audio_base_dir = DATA_DIR
output_dir = '/mnt/a/maxseats/(주의-원본)clips' # 가공된 데이터셋이 저장될 폴더
token = "hf_!" # 허깅페이스 토큰
CACHE_DIR = '/mnt/a/maxseats/.cache' # 허깅페이스 캐시 저장소 지정
dataset_name = "maxseats/aihub-464-preprocessed-680GB" # 허깅페이스에 올라갈 데이터셋 이름
model_name = "SungBeom/whisper-small-ko" # 대상 모델 / "openai/whisper-base"
DATA_DIR = '/mnt/a/maxseats-git/New_Sample'
json_base_dir = DATA_DIR
audio_base_dir = DATA_DIR
output_dir = '/mnt/a/maxseats-git/clips'
dataset_name = "maxseats/aihub-464-bracket-test-tmp"
'''
데이터셋 경로를 지정해서
하나의 폴더에 mp3, txt 파일로 추출해요.
추출 과정에서 원본 파일은 자동으로 삭제돼요. (저장공간 절약을 위해)
'''
def bracket_preprocess(text):
# 정규 표현식을 사용하여 패턴 제거
text = re.sub(r'/\([^\)]+\)', '', text) # /( *) 패턴 제거, /(...) 형식 제거
text = re.sub(r'[()]', '', text) # 개별적으로 등장하는 ( 및 ) 제거
return text.strip()
def process_audio_and_subtitle(json_path, audio_base_dir, output_dir):
# JSON 파일 읽기
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 메타데이터에서 오디오 파일 이름 추출
title = data['metadata']['title']
# 각 TS, VS 폴더에서 해당 오디오 파일을 찾기
audio_file = None
for root, _, files in os.walk(audio_base_dir):
for file in files:
if file == title + '.wav':
audio_file = os.path.join(root, file)
break
if audio_file:
break
# 오디오 파일 로드
if not audio_file or not os.path.exists(audio_file):
print(f"Audio file {audio_file} does not exist.")
return
audio = AudioSegment.from_mp3(audio_file)
# 발화 데이터 처리
for utterance in data['utterance']:
start_time = float(utterance['start']) * 1000 # 밀리초로 변환
end_time = float(utterance['end']) * 1000 # 밀리초로 변환
text = bracket_preprocess(utterance['form']) # 괄호 전처리
if not text: # 비어 있으면 수행 x
continue
# 오디오 클립 추출
audio_clip = audio[start_time:end_time]
# 파일 이름 설정
clip_id = utterance['id']
audio_output_path = os.path.join(output_dir, clip_id + '.mp3')
text_output_path = os.path.join(output_dir, clip_id + '.txt')
# 오디오 클립 저장
audio_clip.export(audio_output_path, format='mp3')
# 괄호 전처리 텍스트 파일 저장
with open(text_output_path, 'w', encoding='utf-8') as f:
f.write(text)
# 오디오 파일 삭제
os.remove(audio_file)
os.remove(audio_file.replace('.wav', '.txt'))
print(f"Deleted audio file: {audio_file}")
def process_all_files(json_base_dir, audio_base_dir, output_dir):
json_files = []
# JSON 파일 목록 생성
for root, dirs, files in os.walk(json_base_dir):
for file in files:
if file.endswith('.json'):
json_files.append(os.path.join(root, file))
# JSON 파일 처리
for json_file in tqdm(json_files, desc="Processing JSON files"):
process_audio_and_subtitle(json_file, audio_base_dir, output_dir)
# 완료 후 JSON 파일 삭제
os.remove(json_file)
print(f"Deleted JSON file: {json_file}")
# 디렉토리 생성
os.makedirs(output_dir, exist_ok=True)
# 프로세스 실행
process_all_files(json_base_dir, audio_base_dir, output_dir)
'''
가공된 mp3, txt 데이터를 학습 가능한 허깅페이스 데이터셋 형태로 변환해요.
'''
# 캐시 디렉토리 설정
os.environ['HF_HOME'] = CACHE_DIR
os.environ["HF_DATASETS_CACHE"] = CACHE_DIR
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name, cache_dir=CACHE_DIR)
tokenizer = WhisperTokenizer.from_pretrained(model_name, language="Korean", task="transcribe", cache_dir=CACHE_DIR)
def exclude_json_files(file_names: list) -> list:
# .json으로 끝나는 원소 제거
return [file_name for file_name in file_names if not file_name.endswith('.json')]
def get_label_list(directory):
# 빈 리스트 생성
label_files = []
# 디렉토리 내 파일 목록 불러오기
for filename in os.listdir(directory):
# 파일 이름이 '.txt'로 끝나는지 확인
if filename.endswith('.txt'):
label_files.append(os.path.join(directory, filename))
return label_files
def get_audio_list(directory):
# 빈 리스트 생성
audio_files = []
# 디렉토리 내 파일 목록 불러오기
for filename in os.listdir(directory):
# 파일 이름이 '.wav'나 '.mp3'로 끝나는지 확인
if filename.endswith('.wav') or filename.endswith('mp3'):
audio_files.append(os.path.join(directory, filename))
return audio_files
def prepare_dataset(batch):
# 오디오 파일을 16kHz로 로드
audio = batch["audio"]
# input audio array로부터 log-Mel spectrogram 변환
batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]
# target text를 label ids로 변환
# batch["labels"] = tokenizer(batch["transcripts"]).input_ids
batch["labels"] = batch["transcripts"]
# 'input_features'와 'labels'만 포함한 새로운 딕셔너리 생성
return {"input_features": batch["input_features"], "labels": batch["labels"]}
label_data = get_label_list(output_dir)
audio_data = get_audio_list(output_dir)
transcript_list = []
for label in tqdm(label_data):
with open(label, 'r', encoding='UTF8') as f:
line = f.readline()
transcript_list.append(line)
df = pd.DataFrame(data=transcript_list, columns = ["transcript"]) # 정답 label
df['audio_data'] = audio_data # 오디오 파일 경로
# 오디오 파일 경로를 dict의 "audio" 키의 value로 넣고 이를 데이터셋으로 변환
# 이때, Whisper가 요구하는 사양대로 Sampling rate는 16,000으로 설정한다.
ds = Dataset.from_dict(
{"audio": [path for path in df["audio_data"]],
"transcripts": [transcript for transcript in df["transcript"]]}
).cast_column("audio", Audio(sampling_rate=16000))
# 데이터셋을 훈련 데이터와 테스트 데이터, 밸리데이션 데이터로 분할
train_testvalid = ds.train_test_split(test_size=0.2)
test_valid = train_testvalid["test"].train_test_split(test_size=0.5)
datasets = DatasetDict(
{"train": train_testvalid["train"],
"test": test_valid["test"],
"valid": test_valid["train"]}
)
datasets = datasets.map(prepare_dataset, num_proc=2)
datasets = datasets.remove_columns(['audio', 'transcripts']) # 불필요한 부분 제거
print('-'*48)
print(type(datasets))
print(datasets)
print('-'*48)
'''
허깅페이스 로그인 후, 최종 데이터셋을 업로드해요.
'''
datasets.save_to_disk('/mnt/a/maxseats/preprocessed_cache.arrow')
# datasets.push_to_hub(dataset_name, token=token)
while True:
if token =="exit":
break
try:
datasets.push_to_hub(dataset_name, token=token)
print(f"Dataset {dataset_name} pushed to hub successfully. 넘나 축하.")
break
except Exception as e:
print(f"Failed to push dataset: {e}")
token = input("Please enter your Hugging Face API token: ")
```
---
dataset_info: 数据集信息
特征(features):
- 名称:input_features,类型为序列,序列元素为float32格式
- 名称:labels,数据类型为字符串(string)
数据集划分(splits):
- 划分名称:train,占用字节数:1271628697,样本数量:1324
- 划分名称:test,占用字节数:159433806,样本数量:166
- 划分名称:valid,占用字节数:158474204,样本数量:165
下载大小(download_size):343908842
数据集总大小(dataset_size):1589536707
配置项(configs):
- 配置名称:default,数据文件路径:
- 训练集(train):data/train-*
- 测试集(test):data/test-*
- 验证集(valid):data/valid-*
---
# 执行代码如下
python
import os
import json
from pydub import AudioSegment
from tqdm import tqdm
import re
from datasets import Audio, Dataset, DatasetDict
from transformers import WhisperFeatureExtractor, WhisperTokenizer
import pandas as pd
# 设置自定义变量
DATA_DIR = '/mnt/a/maxseats/(注意-原始-680GB)主要领域会议语音识别数据' # 数据集存储目录
# 指定原始数据与标注数据目录
json_base_dir = DATA_DIR
audio_base_dir = DATA_DIR
output_dir = '/mnt/a/maxseats/(注意-原始)clips' # 加工后数据集存储目录
token = "hf_!" # Hugging Face令牌
CACHE_DIR = '/mnt/a/maxseats/.cache' # 指定Hugging Face缓存目录
dataset_name = "maxseats/aihub-464-preprocessed-680GB" # 将上传至Hugging Face的数据集名称
model_name = "SungBeom/whisper-small-ko" # 目标模型 / 可选"openai/whisper-base"
DATA_DIR = '/mnt/a/maxseats-git/New_Sample'
json_base_dir = DATA_DIR
audio_base_dir = DATA_DIR
output_dir = '/mnt/a/maxseats-git/clips'
dataset_name = "maxseats/aihub-464-bracket-test-tmp"
'''
指定数据集路径,将数据提取为单个目录下的mp3与txt文件。
提取过程中原文件将自动删除,以节省存储空间。
'''
def bracket_preprocess(text):
# 使用正则表达式移除指定模式文本
text = re.sub(r'/([^)]+)', '', text) # 移除形如/(...)的文本模式
text = re.sub(r'[()]', '', text) # 移除单独出现的左右括号
return text.strip()
def process_audio_and_subtitle(json_path, audio_base_dir, output_dir):
# 读取JSON文件
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
# 从元数据中提取音频文件名
title = data['metadata']['title']
# 遍历TS、VS子目录,查找对应音频文件
audio_file = None
for root, _, files in os.walk(audio_base_dir):
for file in files:
if file == title + '.wav':
audio_file = os.path.join(root, file)
break
if audio_file:
break
# 加载音频文件
if not audio_file or not os.path.exists(audio_file):
print(f"Audio file {audio_file} does not exist.")
return
audio = AudioSegment.from_mp3(audio_file)
# 处理每一段发言数据
for utterance in data['utterance']:
start_time = float(utterance['start']) * 1000 # 转换为毫秒单位
end_time = float(utterance['end']) * 1000 # 转换为毫秒单位
text = bracket_preprocess(utterance['form']) # 执行括号预处理
if not text: # 若文本为空则跳过
continue
# 提取音频片段
audio_clip = audio[start_time:end_time]
# 设置输出文件名
clip_id = utterance['id']
audio_output_path = os.path.join(output_dir, clip_id + '.mp3')
text_output_path = os.path.join(output_dir, clip_id + '.txt')
# 保存音频片段为mp3格式
audio_clip.export(audio_output_path, format='mp3')
# 保存预处理后的文本为txt文件
with open(text_output_path, 'w', encoding='utf-8') as f:
f.write(text)
# 删除原始音频文件
os.remove(audio_file)
os.remove(audio_file.replace('.wav', '.txt'))
print(f"Deleted audio file: {audio_file}")
def process_all_files(json_base_dir, audio_base_dir, output_dir):
json_files = []
# 生成所有JSON文件列表
for root, dirs, files in os.walk(json_base_dir):
for file in files:
if file.endswith('.json'):
json_files.append(os.path.join(root, file))
# 批量处理所有JSON文件
for json_file in tqdm(json_files, desc="Processing JSON files"):
process_audio_and_subtitle(json_file, audio_base_dir, output_dir)
# 处理完成后删除原始JSON文件
os.remove(json_file)
print(f"Deleted JSON file: {json_file}")
# 创建输出目录
os.makedirs(output_dir, exist_ok=True)
# 执行批量处理流程
process_all_files(json_base_dir, audio_base_dir, output_dir)
'''
将加工后的mp3与txt数据转换为可用于训练的Hugging Face数据集格式
'''
# 设置缓存目录
os.environ['HF_HOME'] = CACHE_DIR
os.environ["HF_DATASETS_CACHE"] = CACHE_DIR
feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name, cache_dir=CACHE_DIR)
tokenizer = WhisperTokenizer.from_pretrained(model_name, language="Korean", task="transcribe", cache_dir=CACHE_DIR)
def exclude_json_files(file_names: list) -> list:
# 移除列表中以.json结尾的文件
return [file_name for file_name in file_names if not file_name.endswith('.json')]
def get_label_list(directory):
# 初始化空列表
label_files = []
# 遍历目录获取所有文件
for filename in os.listdir(directory):
# 筛选后缀为txt的文件作为标签文件
if filename.endswith('.txt'):
label_files.append(os.path.join(directory, filename))
return label_files
def get_audio_list(directory):
# 初始化空列表
audio_files = []
# 遍历目录获取所有文件
for filename in os.listdir(directory):
# 筛选后缀为wav或mp3的文件作为音频文件
if filename.endswith('.wav') or filename.endswith('mp3'):
audio_files.append(os.path.join(directory, filename))
return audio_files
def prepare_dataset(batch):
# 加载音频文件并统一采样率为16kHz
audio = batch["audio"]
# 将音频数组转换为log-Mel频谱图作为输入特征
batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]
# 将目标文本转换为标签ID(此处临时保留原始文本作为标签)
# batch["labels"] = tokenizer(batch["transcripts"]).input_ids
batch["labels"] = batch["transcripts"]
# 仅保留输入特征与标签,构建新的批次数据
return {"input_features": batch["input_features"], "labels": batch["labels"]}
label_data = get_label_list(output_dir)
audio_data = get_audio_list(output_dir)
transcript_list = []
for label in tqdm(label_data):
with open(label, 'r', encoding='UTF8') as f:
line = f.readline()
transcript_list.append(line)
df = pd.DataFrame(data=transcript_list, columns = ["transcript"]) # 构建存储转录文本的DataFrame
df['audio_data'] = audio_data # 添加音频文件路径列
# 将音频路径转换为数据集,并按照Whisper要求将采样率统一为16000Hz
ds = Dataset.from_dict(
{"audio": [path for path in df["audio_data"]],
"transcripts": [transcript for transcript in df["transcript"]]}
).cast_column("audio", Audio(sampling_rate=16000))
# 将数据集划分为训练集、测试集与验证集
train_testvalid = ds.train_test_split(test_size=0.2)
test_valid = train_testvalid["test"].train_test_split(test_size=0.5)
datasets = DatasetDict(
{"train": train_testvalid["train"],
"test": test_valid["test"],
"valid": test_valid["train"]}
)
datasets = datasets.map(prepare_dataset, num_proc=2) # 并行执行数据集预处理映射
datasets = datasets.remove_columns(['audio', 'transcripts']) # 移除不必要的列
print('-'*48)
print(type(datasets))
print(datasets)
print('-'*48)
'''
登录Hugging Face后,上传最终数据集
'''
datasets.save_to_disk('/mnt/a/maxseats/preprocessed_cache.arrow')
# datasets.push_to_hub(dataset_name, token=token)
while True:
if token =="exit":
break
try:
datasets.push_to_hub(dataset_name, token=token)
print(f"Dataset {dataset_name} pushed to hub successfully. 恭喜!")
break
except Exception as e:
print(f"Failed to push dataset: {e}")
token = input("请输入您的Hugging Face API令牌:")
提供机构:
maxseats



