Granular
收藏Granular 数据集概述
概述
Granular 是一种数据集格式,适用于从简单到复杂的各种数据集。每个 Granular 数据集是由 [bag 文件格式][bag] 组成的链接文件集合,具有高性能的数据加载器。
特点
- 性能: 本地和云端的高读写吞吐量。
- 寻址: 通过数据点索引快速随机访问磁盘。
- 序列: 数据点可以包含可寻址的模态列表。
- 灵活性: 用户提供编码器和解码器;提供示例。
- 分片: 将数据集存储为分片以分割处理负载。
- 确定性: 每个 epoch 的全局洗牌是确定性和可恢复的。
- 正确性: 具有高代码覆盖率的单元测试套件。
快速开始
写入数据
python import pathlib import granular import numpy as np
directory = ./dataset
spec = { foo: int, # 整数 bar: utf8[], # 字符串列表 baz: msgpack, # 打包结构 abc: jpg, # 图像 xyz: array, # 数组 }
with granular.DatasetWriter(directory, spec, granular.encoders) as writer: for i in range(10): datapoint = { foo: i, bar: [hello] * i, baz: {a: 1}, abc: np.zeros((60, 80, 3), np.uint8), xyz: np.arange(0, 1 + i, np.float32), } writer.append(datapoint)
print(list(directory.glob(*)))
[spec.json, refs.bag, foo.bag, bar.bag, baz.bag, abc.bag, xyz.bag]
读取数据
python with granular.DatasetReader(directory, granular.decoders) as reader: print(reader.spec) # {foo: int, bar: utf8[], baz: msgpack, ...} print(reader.size) # 数据集大小(字节) print(len(reader)) # 数据点数量
datapoint = reader[2] print(datapoint[foo]) # 2 print(datapoint[bar]) # [hello, hello] print(datapoint[abc].shape) # (60, 80, 3)
加载数据
python def preproc(datapoint, seed): return {image: datapoint[abc], label: datapoint[foo]}
loader = granular.Loader( reader, batch=8, fns=[preproc], shuffle=True, workers=64, seed=0)
print(loader.spec)
{image: (np.uint8, (60, 80, 3)), label: (np.int64, ())}
dataset = iter(loader) for _ in range(100): batch = next(dataset) print(batch[image].shape) # (8, 60, 80, 3)
高级功能
文件系统
支持自定义文件系统,通过提供不同的 Path 实现。例如,在 Google Cloud 上可以使用 [elements][elements] 中优化的 Path:
python import elements # pip install elements
directory = elements.Path(gs://<bucket>/dataset)
reader = granular.DatasetReader(directory, ...) writer = granular.DatasetWriter(directory, ...)
格式
Granular 不强制用户使用特定的序列化解决方案。任何字符串都可以用作 spec 中的类型,只要提供了相应的编码器和解码器函数。
python import msgpack
encoders = { bytes: lambda x: x, utf8: lambda x: x.encode(utf-8), msgpack: msgpack.packb, }
decoders = { bytes: lambda x: x, utf8: lambda x: x.decode(utf-8), msgpack: msgpack.unpackb, }
恢复
数据加载器是完全确定性和可恢复的,只需提供步骤和种子整数。可以通过 loader.save() 保存状态字典,并在加载检查点时传递给 loader.load()。
python state = loader.save() print(state) # {step: 100, seed: 0} loader.load(state)
缓存
检索数据点需要先从 refs.bag 读取以找到其他 bag 文件的引用,然后从每个模态 bag 文件读取。如果某些模态足够小,可以通过设置 cache_keys 将其缓存在 RAM 中。
python reader = granular.DatasetReader( directory, decoders, cache_index=True, # 将所有 bag 文件的索引表缓存在内存中。 cache_keys=(refs, foo), # 完全缓存 refs.bag 和 foo.bag 在内存中。 )
掩码
可以仅加载数据点的子集键值。通过提供掩码和数据点索引来减少读取请求的数量:
python print(reader.spec) # {foo: int, bar: utf8, baz: array}
mask = {foo: True, baz: True} datapoint = reader[index, mask] print(foo in datapoint) # True print(bar in datapoint) # False print(baz in datapoint) # True
序列
每个数据集是数据点的列表。每个数据点是具有字符串键的字典,键值可以是单个字节值或字节值列表。要使用序列值,在 spec 中的类型后添加 [] 后缀:
python spec = { title: utf8, frames: jpg[], captions: utf8[], times: int[], }
序列字段不仅可以存储可变长度的值,还可以通过掩码读取值的范围而不需要从磁盘加载整个序列:
python available = reader.available(index) print(available)
{title: True, frames: range(54), captions: range(7), times: range(7)}
mask = { title: True, # 读取标题模态 frames: range(32, 42), # 读取10帧的范围。 captions: range(0, 7), # 读取所有字幕。 times: True, # 另一种读取完整列表的方式。 } datapoint = reader[index, mask] print(len(datapoint[frames])) # 10
分片
大型数据集可以存储为较小的数据集列表,以便轻松并行处理。分片长度指定每个分片的数据点数量。一个好的默认值是设置每个分片的大小约为10 Gb。
python
写入分片数据集。
writer = granular.ShardedDatasetWriter(directory, spec, encoders, shardlen=10000)
从分片数据集读取。
reader = granular.ShardedDatasetReader(directory, decoders)
分片数据集的文件结构是每个分片一个文件夹,以分片编号命名。每个分片本身是一个数据集,也可以使用非分片的 granular.DatasetReader 读取。
sh $ tree ./directory . ├── 000000 │ ├── spec.json │ ├── refs.bag │ ├── foo.bag │ ├── bar.bag │ └── baz.bag ├── 000001 │ ├── spec.json │ ├── refs.bag │ ├── foo.bag │ ├── bar.bag │ └── baz.bag └── ...
在处理具有大量分片的数据集时,指定 shardstart 和 shardstep,以便每个工作线程读取和写入其专用子集的分片。
python
写入分片数据集。
writer = granular.ShardedDatasetWriter( directory, spec, encoders, shardlen=10000, shardstart=worker_id, # 从此分片开始写入。 shardstep=num_workers, # 之后,跳过这么多分片。 )
从分片数据集读取。
reader = granular.ShardedDatasetReader( directory, decoders, shardstart=worker_id, # 从此分片开始读取。 shardstep=num_workers, # 之后,跳过这么多分片。 )




