five

SFT-Math-Code-1M

收藏
魔搭社区2025-12-03 更新2025-12-06 收录
下载链接:
https://modelscope.cn/datasets/PrimeIntellect/SFT-Math-Code-1M
下载链接
链接失效反馈
官方服务:
资源简介:
# PrimeIntellect/SFT-Math-Code-1M <!-- Provide a quick summary of the dataset. --> A curation of SFT traces of DeepSeek-R1-0528 on math and code problems. ## Generation This dataset was created by running ````bash uv run create_data.py --subset-size 500000 --repo-id PrimeIntellect/SFT-Math-Code-1M ```` ````python # create_data.py # /// script # requires-python = ">=3.12" # dependencies = ["datasets", "jinja2"] # /// import argparse import json import os import sys from pathlib import Path from typing import cast assert "HF_TOKEN" in os.environ, "HF_TOKEN is not set" os.environ["HF_HOME"] = "/workspace/hf" from datasets import Dataset, DatasetDict, load_dataset from huggingface_hub import DatasetCard, DatasetCardData def format_num(num: float | int, precision: int = 2) -> str: """ Format a number in human-readable format with abbreviations. """ sign = "-" if num < 0 else "" num = abs(num) if num < 1e3: return f"{sign}{num:.{precision}f}" if isinstance(num, float) else f"{sign}{num}" elif num < 1e6: return f"{sign}{num / 1e3:.{precision}f}K" elif num < 1e9: return f"{sign}{num / 1e6:.{precision}f}M" else: return f"{sign}{num / 1e9:.{precision}f}B" def subset_dataset(dataset: Dataset, subset_size: int) -> Dataset: if subset_size > 0: return dataset.select(range(min(subset_size, len(dataset)))) return dataset def prepare_nemotron_math(subset_size: int) -> Dataset: # Load Nemotron-Math dataset dataset_name = "nvidia/Nemotron-Post-Training-Dataset-v1" nemotron_math = cast(Dataset, load_dataset(dataset_name, split="math")) nemotron_math = subset_dataset(nemotron_math, subset_size) print(f"Loaded {len(nemotron_math):,} math samples from {dataset_name}") # Process Nemotron-Math dataset def prepare_messages(example: dict) -> dict: # From: https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v1 MATH_SYSTEM_PROMPT = ( "Solve the following math problem. Explain your reasoning and put the final answer in \\boxed{}." 
) assert len(example["messages"]) == 2 prompt, completion = example["messages"] assert len(prompt["tool_calls"]) == len(completion["tool_calls"]) == 0 del prompt["tool_calls"] del completion["tool_calls"] prompt = [{"role": "system", "content": MATH_SYSTEM_PROMPT}] + [prompt] return {"prompt": prompt, "completion": [completion]} def add_source(example: dict, index: int) -> dict: return {"source": {"dataset": "nvidia/Nemotron-Post-Training-Dataset-v1", "split": "math", "index": index}} return ( nemotron_math.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def prepare_nemotron_code(subset_size: int) -> Dataset: # Load Nemotron-Code dataset dataset_name = "nvidia/Nemotron-Post-Training-Dataset-v1" nemotron_code = cast(Dataset, load_dataset(dataset_name, split="code")) nemotron_code = subset_dataset(nemotron_code, subset_size) print(f"Loaded {len(nemotron_code):,} code samples from {dataset_name}") # Process Nemotron-Code dataset hf_datasets = { "taco": load_dataset("BAAI/TACO", trust_remote_code=True), "apps": load_dataset("codeparrot/apps", trust_remote_code=True), "code_contests": load_dataset("deepmind/code_contests"), "open-r1/codeforces": load_dataset("open-r1/codeforces"), } def get_question(ds_name, split, index): # From https://huggingface.co/datasets/nvidia/OpenCodeReasoning-2#how-to-use-it benchmark = hf_datasets[ds_name][split][int(index)] if ds_name == "code_contests": if not benchmark["description"]: return None return benchmark["description"] elif ds_name in ["taco", "apps"]: return benchmark["question"] elif ds_name == "open-r1/codeforces": if not benchmark["description"]: return None question = benchmark["description"] if benchmark["input_format"]: question += "\n\nInput\n\n" + benchmark["input_format"] if benchmark["output_format"]: question += "\n\nOutput\n\n" + benchmark["output_format"] if benchmark["examples"]: question += "\n\nExamples" for example in benchmark["examples"]: if 
"input" in example: question += "\n\nInput\n\n" + example["input"] if "output" in example: question += "\n\nOutput\n\n" + example["output"] if benchmark["note"]: question += "\n\nNote\n\n" + benchmark["note"] return question return None def prepare_messages(example: dict) -> dict: # Extract prompt from external dataset metadata = json.loads(example["metadata"]) assert "dataset" in metadata and "split" in metadata and "index" in metadata ds_name, split, index = metadata["dataset"], metadata["split"], int(metadata["index"]) assert ds_name in list(hf_datasets.keys()) question = get_question(ds_name, split, index) assert question is not None assert example["messages"][0]["content"] == "-" # Prepare prompt and completion CODE_SYSTEM_PROMPT = "Write a solution for the following programming challenge. Provide a brief explanation of your approach, followed by the complete code." prompt = [{"role": "system", "content": CODE_SYSTEM_PROMPT}, {"role": "user", "content": question}] completion = example["messages"][1] assert len(completion["tool_calls"]) == 0 del completion["tool_calls"] return {"prompt": prompt, "completion": [completion]} def add_source(example: dict, index: int) -> dict: return {"source": {"dataset": "nvidia/OpenCodeReasoning-2", "split": "code", "index": index}} return ( nemotron_code.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def prepare_am(subset_size: int) -> Dataset: # Load AM dataset # dataset_name = "a-m-team/AM-DeepSeek-R1-0528-Distilled" # am: Dataset = load_dataset(dataset_name, split="train") # if SUBSET: # am = am.select(range(1000)) # print(f"Loaded {len(am):,} samples from {dataset_name}") # Process AM dataset # TODO(Mika): Somehow cannot load the AM dataset with error # `DatasetGenerationError: An error occurred while generating the dataset`` return Dataset.from_dict({}) def prepare_swe_swiss(subset_size: int) -> Dataset: # Load SWE-Swiss dataset swe_swiss = cast(Dataset, 
load_dataset("SWE-Swiss/SWESwiss-SFT-Merged-10K", split="train")) swe_swiss = subset_dataset(swe_swiss, subset_size) print(f"Loaded {len(swe_swiss):,} samples") def prepare_messages(example: dict) -> dict: assert len(example["messages"]) == 2 prompt, completion = example["messages"] assert prompt["name"] == completion["name"] == "" del prompt["name"], completion["name"] del prompt["loss_mask"], completion["loss_mask"] return {"prompt": [prompt], "completion": [completion]} def add_source(_example: dict, index: int) -> dict: return {"source": {"dataset": "SWE-Swiss/SWESwiss-SFT-Merged-10K", "split": "train", "index": index}} return ( swe_swiss.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def main(args: argparse.Namespace): # Prepare datasets nemotron_math = prepare_nemotron_math(args.subset_size) nemotron_code = prepare_nemotron_code(args.subset_size) # am = prepare_am(args.subset_size) # swe_swiss = prepare_swe_swiss(args.subset_size) # Combine datasets dataset_dict = DatasetDict( { "nemotron_math": nemotron_math, "nemotron_code": nemotron_code, # "am": am, # "swe_swiss": swe_swiss, } ) # Create dataset card card_meta = DatasetCardData( pretty_name=args.repo_id, language=["en"], license="apache-2.0", ) card = DatasetCard.from_template( card_data=card_meta, template_path="SIMPLE_TEMPLATE.md", dataset_name=args.repo_id, dataset_summary=args.summary, cmd=f"uv run create_data.py {' '.join(sys.argv[1:])}", source=Path(__file__).read_text(encoding="utf-8", errors="replace"), ) for split in dataset_dict.keys(): print(f"{split}: {format_num(len(dataset_dict[split]))} samples") # Push to hub print(f"Pushing to `{args.repo_id}`") dataset_dict.push_to_hub(args.repo_id, private=args.private) card.push_to_hub(args.repo_id) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--subset-size", type=int, default=-1) parser.add_argument("--private", action="store_true") 
parser.add_argument( "--summary", type=str, default="A curation of SFT traces of DeepSeek-R1-0528 on math and code problems." ) parser.add_argument("--repo-id", type=str, default="PrimeIntellect/SFT-Math-Code") args = parser.parse_args() assert "HF_TOKEN" in os.environ, "HF_TOKEN should be set" main(args) ````

# PrimeIntellect/SFT-Math-Code-1M <!-- 请简要概述该数据集。 --> 本数据集收录了DeepSeek-R1-0528在数学与代码问题上的监督微调(Supervised Fine-Tuning,SFT)轨迹。 ## 数据生成流程 本数据集通过执行以下命令生成: ````bash uv run create_data.py --subset-size 500000 --repo-id PrimeIntellect/SFT-Math-Code-1M ```` ````python # create_data.py # /// script # requires-python = ">=3.12" # dependencies = ["datasets", "jinja2"] # /// import argparse import json import os import sys from pathlib import Path from typing import cast assert "HF_TOKEN" in os.environ, "HF_TOKEN 未设置" os.environ["HF_HOME"] = "/workspace/hf" from datasets import Dataset, DatasetDict, load_dataset from huggingface_hub import DatasetCard, DatasetCardData def format_num(num: float | int, precision: int = 2) -> str: """ 将数字格式化为带缩写的人类可读形式。 """ sign = "-" if num < 0 else "" num = abs(num) if num < 1e3: return f"{sign}{num:.{precision}f}" if isinstance(num, float) else f"{sign}{num}" elif num < 1e6: return f"{sign}{num / 1e3:.{precision}f}K" elif num < 1e9: return f"{sign}{num / 1e6:.{precision}f}M" else: return f"{sign}{num / 1e9:.{precision}f}B" def subset_dataset(dataset: Dataset, subset_size: int) -> Dataset: if subset_size > 0: return dataset.select(range(min(subset_size, len(dataset)))) return dataset def prepare_nemotron_math(subset_size: int) -> Dataset: # 加载Nemotron-Math数据集 dataset_name = "nvidia/Nemotron-Post-Training-Dataset-v1" nemotron_math = cast(Dataset, load_dataset(dataset_name, split="math")) nemotron_math = subset_dataset(nemotron_math, subset_size) print(f"已加载 {len(nemotron_math):,} 个数学样本,来自数据集 {dataset_name}") # 处理Nemotron-Math数据集 def prepare_messages(example: dict) -> dict: # 参考自 https://huggingface.co/datasets/nvidia/Nemotron-Post-Training-Dataset-v1 MATH_SYSTEM_PROMPT = ( "解决以下数学问题。请阐述推理过程,并将最终答案置于\\boxed{}中。" ) assert len(example["messages"]) == 2 prompt, completion = example["messages"] assert len(prompt["tool_calls"]) == len(completion["tool_calls"]) == 0 del prompt["tool_calls"] del completion["tool_calls"] prompt = [{"role": "system", "content": 
MATH_SYSTEM_PROMPT}] + [prompt] return {"prompt": prompt, "completion": [completion]} def add_source(example: dict, index: int) -> dict: return {"source": {"dataset": "nvidia/Nemotron-Post-Training-Dataset-v1", "split": "math", "index": index}} return ( nemotron_math.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def prepare_nemotron_code(subset_size: int) -> Dataset: # 加载Nemotron-Code数据集 dataset_name = "nvidia/Nemotron-Post-Training-Dataset-v1" nemotron_code = cast(Dataset, load_dataset(dataset_name, split="code")) nemotron_code = subset_dataset(nemotron_code, subset_size) print(f"已加载 {len(nemotron_code):,} 个代码样本,来自数据集 {dataset_name}") # 加载外部代码数据集 hf_datasets = { "taco": load_dataset("BAAI/TACO", trust_remote_code=True), "apps": load_dataset("codeparrot/apps", trust_remote_code=True), "code_contests": load_dataset("deepmind/code_contests"), "open-r1/codeforces": load_dataset("open-r1/codeforces"), } def get_question(ds_name, split, index): # 参考自 https://huggingface.co/datasets/nvidia/OpenCodeReasoning-2#how-to-use-it benchmark = hf_datasets[ds_name][split][int(index)] if ds_name == "code_contests": if not benchmark["description"]: return None return benchmark["description"] elif ds_name in ["taco", "apps"]: return benchmark["question"] elif ds_name == "open-r1/codeforces": if not benchmark["description"]: return None question = benchmark["description"] if benchmark["input_format"]: question += "\n\n输入格式\n\n" + benchmark["input_format"] if benchmark["output_format"]: question += "\n\n输出格式\n\n" + benchmark["output_format"] if benchmark["examples"]: question += "\n\n示例" for example in benchmark["examples"]: if "input" in example: question += "\n\n输入\n\n" + example["input"] if "output" in example: question += "\n\n输出\n\n" + example["output"] if benchmark["note"]: question += "\n\n备注\n\n" + benchmark["note"] return question return None def prepare_messages(example: dict) -> dict: # 从外部数据集提取提示词 metadata = json.loads(example["metadata"]) 
assert "dataset" in metadata and "split" in metadata and "index" in metadata ds_name, split, index = metadata["dataset"], metadata["split"], int(metadata["index"]) assert ds_name in list(hf_datasets.keys()) question = get_question(ds_name, split, index) assert question is not None assert example["messages"][0]["content"] == "-" # 准备提示词与补全内容 CODE_SYSTEM_PROMPT = "为以下编程挑战编写解决方案。请先简要阐述你的解题思路,再给出完整代码。" prompt = [{"role": "system", "content": CODE_SYSTEM_PROMPT}, {"role": "user", "content": question}] completion = example["messages"][1] assert len(completion["tool_calls"]) == 0 del completion["tool_calls"] return {"prompt": prompt, "completion": [completion]} def add_source(example: dict, index: int) -> dict: return {"source": {"dataset": "nvidia/OpenCodeReasoning-2", "split": "code", "index": index}} return ( nemotron_code.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def prepare_am(subset_size: int) -> Dataset: # 加载AM数据集 # dataset_name = "a-m-team/AM-DeepSeek-R1-0528-Distilled" # am: Dataset = load_dataset(dataset_name, split="train") # if SUBSET: # am = am.select(range(1000)) # print(f"已加载 {len(am):,} 个样本,来自数据集 {dataset_name}") # 处理AM数据集 # TODO(Mika):目前无法加载AM数据集,报错信息为:DatasetGenerationError: An error occurred while generating the dataset return Dataset.from_dict({}) def prepare_swe_swiss(subset_size: int) -> Dataset: # 加载SWE-Swiss数据集 swe_swiss = cast(Dataset, load_dataset("SWE-Swiss/SWESwiss-SFT-Merged-10K", split="train")) swe_swiss = subset_dataset(swe_swiss, subset_size) print(f"已加载 {len(swe_swiss):,} 个样本") def prepare_messages(example: dict) -> dict: assert len(example["messages"]) == 2 prompt, completion = example["messages"] assert prompt["name"] == completion["name"] == "" del prompt["name"], completion["name"] del prompt["loss_mask"], completion["loss_mask"] return {"prompt": [prompt], "completion": [completion]} def add_source(_example: dict, index: int) -> dict: return {"source": {"dataset": 
"SWE-Swiss/SWESwiss-SFT-Merged-10K", "split": "train", "index": index}} return ( swe_swiss.map(prepare_messages) .map(add_source, with_indices=True) .select_columns(["source", "prompt", "completion"]) ) def main(args: argparse.Namespace): # 准备数据集 nemotron_math = prepare_nemotron_math(args.subset_size) nemotron_code = prepare_nemotron_code(args.subset_size) # am = prepare_am(args.subset_size) # swe_swiss = prepare_swe_swiss(args.subset_size) # 合并数据集 dataset_dict = DatasetDict( { "nemotron_math": nemotron_math, "nemotron_code": nemotron_code, # "am": am, # "swe_swiss": swe_swiss, } ) # 创建数据集卡片 card_meta = DatasetCardData( pretty_name=args.repo_id, language=["en"], license="apache-2.0", ) card = DatasetCard.from_template( card_data=card_meta, template_path="SIMPLE_TEMPLATE.md", dataset_name=args.repo_id, dataset_summary=args.summary, cmd=f"uv run create_data.py {' '.join(sys.argv[1:])}", source=Path(__file__).read_text(encoding="utf-8", errors="replace"), ) for split in dataset_dict.keys(): print(f"{split}: {format_num(len(dataset_dict[split]))} 个样本") # 推送至Hugging Face Hub print(f"正在推送至仓库 `{args.repo_id}`") dataset_dict.push_to_hub(args.repo_id, private=args.private) card.push_to_hub(args.repo_id) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--subset-size", type=int, default=-1) parser.add_argument("--private", action="store_true") parser.add_argument( "--summary", type=str, default="本数据集收录了DeepSeek-R1-0528在数学与代码问题上的监督微调(Supervised Fine-Tuning,SFT)轨迹。" ) parser.add_argument("--repo-id", type=str, default="PrimeIntellect/SFT-Math-Code") args = parser.parse_args() assert "HF_TOKEN" in os.environ, "HF_TOKEN 应已设置" main(args) ````
提供机构:
maas
创建时间:
2025-08-16
5,000+
优质数据集
54 个
任务类型
进入经典数据集
二维码
社区交流群

面向社区/商业的数据集话题

二维码
科研交流群

面向高校/科研机构的开源数据集话题

数据驱动未来

携手共赢发展

商业合作