DataChain 是一个专为人工智能设计的现代 Python 数据框架库。它用于将非结构化数据组织成数据集,并在本地计算机上大规模处理它。 Datachain不会抽象或隐藏AI模型和API调用,而是有助于将它们集成到后现代数据堆栈中。
存储作为事实来源。
处理非结构化数据,无需来自 S3、GCP、Azure 和本地文件系统的冗余副本。
多模式数据支持:图像、视频、文本、PDF、JSON、CSV、镶木地板。
将文件和元数据合并成持久的、版本化的柱状数据集。
Python 友好的数据管道。
对 Python 对象和对象字段进行操作。
内置并行化和内存不足计算,无需 SQL 或 Spark。
数据丰富和处理。
使用本地 AI 模型和 LLM API 生成元数据。
按元数据过滤、联接和分组。按向量嵌入搜索。
将数据集传递到 Pytorch 和 Tensorflow,或将它们导出回存储。
效率。
并行化、内存不足的工作负载和数据缓存。
Python 对象字段的向量化运算:sum、count、avg 等。
优化的矢量搜索。
$ pip 安装数据链
存储由猫和狗的图像(dog.1048.jpg、cat.1009.jpg)组成,以“json-pairs”格式用真实值和模型推理进行注释,其中每个图像都有一个匹配的 JSON 文件,如 cat。 1009.json:
{“class”:“cat”,“id”:“1009”,“num_annotators”:8,“inference”:{“class”:“dog”,“confidence”:0.68} }
使用 JSON 元数据仅下载“高置信度猫”推断图像的示例:
from datachain import Column, DataChainmeta = DataChain.from_json("gs://datachain-demo/dogs-and-cats/*json", object_name="meta")images = DataChain.from_storage("gs://datachain-demo /dogs-and-cats/*jpg")images_id = images.map(id=lambda file: file.path.split('.')[-2]) 注释= images_id.merge(meta, on="id", right_on="meta.id")likely_cats = annotated.filter((Column("meta.inference.confidence") > 0.93) & (Column("meta.inference.class_") == "cat"))likely_cats.export_files("high-confidence-cats/", signal="file")
使用 Transformers 库通过简单的情感模型进行批量推理:
点安装变压器
下面的代码从云端下载文件,并对每个文件应用一个用户定义的函数。然后,检测到的所有具有积极情绪的文件都会被复制到本地目录。
from Transformers import pipelinefrom datachain import DataChain, Columnclassifier = pipeline("sentiment-analysis", device="cpu",model="distilbert/distilbert-base-uncased-finetuned-sst-2-english")def is_positive_dialogue_ending(file) - > bool:dialogue_ending = file.read()[-512:]返回分类器(dialogue_ending)[0]["label"] == "正"链 = ( DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file", type="text") .settings(并行=8,缓存=True) .map(is_positive=is_positive_dialogue_ending) .save("文件响应") )positive_chain = chain.filter(Column("is_positive") == True)positive_chain.export_files("./output")print(f"{positive_chain.count()} 个文件已导出")
导出了 13 个文件
$ ls 输出/datachain-demo/chatbot-KiT/ 15.txt 20.txt 24.txt 27.txt 28.txt 29.txt 33.txt 37.txt 38.txt 43.txt ... $ ls 输出/datachain-demo/chatbot-KiT/ |厕所-l 13
法学硕士可以作为通用分类器。在下面的示例中,我们使用 Mistral 的免费 API 来判断公开可用的聊天机器人对话框。请在 https://console.mistral.ai 获取免费的 Mistral API 密钥
$ pip install Mistralai (需要版本 >=1.0.0) $ 导出 MISTRAL_API_KEY=_your_key_
DataChain可以并行化API调用;免费的 Mistral 层最多同时支持 4 个请求。
from Mistralai import Mistralfrom datachain import File, DataChain, ColumnPROMPT = "此对话框成功吗?用一个词回答:成功或失败。"def eval_dialogue(file: File) -> bool: client = Mistral() response = client.chat .complete( model="open-mixtral-8x22b", messages=[{"角色": "系统", "内容": 提示}, {"角色": "用户", "内容": file.read()}]) result = response.choices[0].message.content return result.lower().startswith("success")chain = ( DataChain .from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file") .settings(并行=4,缓存=True) .map(is_success=eval_dialogue) .save("mistral_files") )successful_chain = chain.filter(Column("is_success") == True)successful_chain.export_files("./output_mistral")print(f"{successful_chain.count()} 个文件已导出")
根据上述说明,Mistral 模型考虑 31/50 个文件来保存成功的对话:
$ ls output_mistral/datachain-demo/chatbot-KiT/ 1.txt 15.txt 18.txt 2.txt 22.txt 25.txt 28.txt 33.txt 37.txt 4.txt 41.txt ... $ ls output_mistral/datachain-demo/chatbot-KiT/ |厕所-l 31
LLM 响应可能包含对分析有价值的信息,例如使用的令牌数量或模型性能参数。
DataChain 可以将整个 LLM 响应序列化到内部数据库,而不是从 Mistral 响应数据结构(类 ChatCompletionResponse)中提取此信息:
from mistralai import Mistralfrom mistralai.models import ChatCompletionResponsefrom datachain import File, DataChain, ColumnPROMPT = "此对话框成功吗?用一个词回答:成功或失败。"def eval_dialog(file: File) -> ChatCompletionResponse: client = MistralClient()返回 client.chat( model="open-mixtral-8x22b", messages=[{"role": "system", “内容”:提示}, {"角色": "用户", "内容": file.read()}])chain = ( DataChain.from_storage("gs://datachain-demo/chatbot-KiT/", object_name="file") .settings(并行=4,缓存=True) .map(响应=eval_dialog) .map(状态=lambda响应:response.choices[0].message.content.lower()[:7]) .save("响应") )chain.select("file.name", "status", "response.usage").show(5)success_rate = chain.filter(Column("status") == "success").count() / chain .count()print(f"{100*success_rate:.1f}% 对话成功")
输出:
文件状态 响应 响应 响应 名称 用法 用法 用法 提示令牌 总计令牌 完成令牌 0 1.txt 成功 547 548 1 1 10.txt 失败 3576 3578 2 2 11.txt 失败 626 628 2 3 12.txt失败1144 1182 38 4 13.txt 成功 1100 1101 1 [仅限5行] 64.0% 对话成功
在前面的示例中,数据集保存在嵌入式数据库中(工作目录的 .datachain 文件夹中的 SQLite)。这些数据集自动进行版本控制,并且可以使用 DataChain.from_dataset("dataset_name") 进行访问。
以下是如何检索保存的数据集并迭代对象:
chain = DataChain.from_dataset("response")# 逐一迭代:支持内存不足的工作流程for file, response in chain.limit(5).collect("file", "response"):# 验证收集的Python对象assert isinstance(response, ChatCompletionResponse)status = response.choices[0].message.content[:7]tokens = response.usage.total_tokensprint(f"{file.get_uri()}: {status}, 文件大小: {file.size}, 令牌: {tokens}")
输出:
gs://datachain-demo/chatbot-KiT/1.txt:成功,文件大小:1776,代币:548 gs://datachain-demo/chatbot-KiT/10.txt:失败,文件大小:11576,令牌:3578 gs://datachain-demo/chatbot-KiT/11.txt:失败,文件大小:2045,令牌:628 gs://datachain-demo/chatbot-KiT/12.txt:失败,文件大小:3833,令牌:1207 gs://datachain-demo/chatbot-KiT/13.txt:成功,文件大小:3657,代币:1101
某些操作可以在数据库内部运行而无需反序列化。例如,让我们计算使用 LLM API 的总成本,假设 Mixtral 调用每 1M 个输入代币的成本为 2 美元,每 1M 个输出代币的成本为 6 美元:
chain = DataChain.from_dataset("mistral_dataset")cost = chain.sum("response.usage.prompt_tokens")*0.000002 + chain.sum("response.usage.completion_tokens")*0.000006print(f"在 {chain.count()} 调用上花费了 ${cost:.2f}")
输出:
50 次通话花费了 0.08 美元
链结果可以导出或直接传递到 PyTorch 数据加载器。例如,如果我们有兴趣传递图像和基于文件名后缀的标签,则以下代码将执行此操作:
from torch.utils.data import DataLoaderfrom Transformers import CLIPProcessorfrom datachain import C, DataChainprocessor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")chain = (DataChain.from_storage("gs://datachain-demo/dogs) -和-猫/”,类型=“图像”) .map(label=lambda 名称: name.split(".")[0], params=["file.name"]) .select("文件", "标签").to_pytorch(transform=processor.image_processor,tokenizer=processor.tokenizer, ) )loader = DataLoader(链,batch_size=1)
入门
多式联运(在 Colab 中尝试)
LLM 评估(在 Colab 中尝试)
读取 JSON 元数据(在 Colab 中尝试)
非常欢迎您的贡献。要了解更多信息,请参阅贡献者指南。
文档
如果您遇到任何问题,请提出问题
不和谐聊天
电子邮件
叽叽喳喳