Ask HN: Why don't we have a proper DSL for data + embedding + API pipelines?
I've been working on a pretty common problem:
- I have structured data in JSONL files (in.jsonl, out.jsonl)
- I match lines by a key
- I transform them into (text, embedding) pairs
- I optionally filter/map them
- I batch them (into chunks of 50)
- I push each batch into an external system (e.g. a vector DB such as Chroma)
That's it. Sounds trivial. But it turns into ugly imperative Python code very quickly: nested for-loops, global indices, +=, manual batching, line-by-line handling, low-level JSON parsing.
Here's what it usually looks like in Python:
```python
import json

global_ids = 0
batch = []
with open("in.jsonl", "r") as fin:
    with open("out.jsonl", "r") as fout:
        for in_line, out_line in zip(fin, fout):
            in_data = json.loads(in_line)
            out_data = json.loads(out_line)
            if in_data["custom_id"] != out_data["custom_id"]:
                raise ValueError("custom_id mismatch between in.jsonl and out.jsonl")
            texts = in_data["body"]["input"]
            embeddings = [d["embedding"] for d in out_data["response"]["body"]["data"]]
            for i in range(len(texts)):
                doc = texts[i]
                emb = embeddings[i]
                metadata = {
                    "source": f"chunk-{global_ids}",
                }
                global_ids += 1
                batch.append((doc, emb, metadata))
                # ...plus manual batching by 50 and a push to the vector DB...
```
We're in 2025, and this is how we're wiring data into APIs.
---
Why do we tolerate this?
This is a declarative, streaming data-processing problem. Why aren't we using something more elegant? Something more composable, like functional pipelines?
I keep asking myself: why don't we have a composable, streaming, functional DSL for this kind of task?
---
Why not build it like Unix pipes?
What I want is something that feels like:
```bash
cat input.jsonl \
| match output.jsonl on custom_id \
| extract (text, embedding) \
| filter not-empty \
| batch 50 \
| send-to-chroma
```
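The closest thing in stock Python today is a chain of generator functions. A rough sketch under the same assumptions as the code above (the JSONL layout, a join key called custom_id, and a hypothetical send_to_chroma sink):

```python
import json
from itertools import islice

def read_jsonl(path):
    # Stream one parsed JSON object per line.
    with open(path) as f:
        for line in f:
            yield json.loads(line)

def match_on(key, left, right):
    # Pair records from two streams, insisting the join key agrees.
    for a, b in zip(left, right):
        if a[key] != b[key]:
            raise ValueError(f"{key} mismatch")
        yield a, b

def extract_pairs(matched):
    # Assumed layout: texts under body.input, embeddings under response.body.data[*].embedding.
    for a, b in matched:
        texts = a["body"]["input"]
        embs = (d["embedding"] for d in b["response"]["body"]["data"])
        yield from zip(texts, embs)

def batched(items, size):
    # Group a stream into lists of at most `size` items.
    it = iter(items)
    while chunk := list(islice(it, size)):
        yield chunk

pairs = extract_pairs(match_on("custom_id", read_jsonl("in.jsonl"), read_jsonl("out.jsonl")))
pairs = ((text, emb) for text, emb in pairs if text)   # filter not-empty
for batch in batched(pairs, 50):
    send_to_chroma(batch)                              # hypothetical sink
```

It streams and it composes, but it reads inside-out instead of left-to-right, which is exactly why it never feels like the shell pipeline above.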
---
In Lisp / Clojure:
```clojure
;; Sketch, assuming input and output are already parsed record seqs.
(->> (map vector input output)
     (filter (fn [[in out]] (= (:custom_id in) (:custom_id out))))
     (mapcat (fn [[in out]] (map vector (:input in) (:embedding out))))
     (partition-all 50)
     (run! send-to-chroma))
```
---
In Elixir + Broadway:
```elixir
Broadway
|> read_stream("in.jsonl", "out.jsonl")
|> match_on(:custom_id)
|> map(&{&1.text, &1.embedding})
|> batch_every(50)
|> send_to_chroma()
```
---
And now, back to Python...
We're stuck writing imperative soup or building hacky DSLs with things like:
```python
load_json_pairs() \
| where(is_valid) \
| select(to_embedding_record) \
| batch(50) \
| foreach(send_to_chroma)
```
...or, more realistically, writing thousands of lines of `with open(...) as f`.
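(For what it's worth, the pipe syntax above is usually faked by overloading the `|` operator via `__ror__`. A minimal sketch of that trick, where where/select/batch/foreach are illustrative names rather than an existing library:)

```python
from itertools import islice

class Pipe:
    # Wrap a unary function so that `iterable | Pipe(f)` calls f(iterable).
    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, iterable):
        return self.fn(iterable)

def where(pred):
    return Pipe(lambda items: (x for x in items if pred(x)))

def select(fn):
    return Pipe(lambda items: (fn(x) for x in items))

def batch(size):
    def chunks(items):
        it = iter(items)
        while chunk := list(islice(it, size)):
            yield chunk
    return Pipe(chunks)

def foreach(fn):
    return Pipe(lambda items: [fn(x) for x in items])

# Usage, assuming load_json_pairs / is_valid / to_embedding_record / send_to_chroma exist:
# load_json_pairs() | where(is_valid) | select(to_embedding_record) | batch(50) | foreach(send_to_chroma)
```

It works, but it's exactly the kind of ad-hoc plumbing this question is about.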
And even though libraries like tf.data.Dataset, dask.bag, pandas, or pipe exist, none of them really solves this use case in a cohesive, expressive way. They all focus on tabular data, big data, or ML input pipelines, not this "structured data -> transform -> push to API" pattern.
---
This is especially absurd now that everyone is doing RAG.
With Retrieval-Augmented Generation (RAG) becoming the norm, we're all parsing files, extracting embeddings, enriching metadata, batching, and inserting into vector stores.
Why are we all writing the same low-level, ad-hoc code to do this?
Shouldn't this entire category of work be covered by a proper DSL or framework?
---
Wouldn't it make sense to build...
- a functional DSL for JSON-to-embedding-to-API pipelines?
- or a Python library with proper map, filter, batch, pipe, and sink semantics (the sink end is sketched below)?
- or even a streaming runtime, like Elixir's Broadway or a minimal functional Rx-style graph?
Even R with dplyr has a more elegant way to express these transformations than what we do in Python for this kind of job.
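For concreteness, whatever DSL or library emerges, the sink it ultimately has to wrap is roughly chromadb's collection.add. A sketch of that last step, assuming batches of (text, embedding, metadata) tuples and a metadata field that can serve as the id:

```python
import chromadb

client = chromadb.Client()
collection = client.get_or_create_collection("docs")

def send_to_chroma(batch):
    # batch: a list of (text, embedding, metadata) tuples, e.g. 50 at a time.
    texts, embeddings, metadatas = zip(*batch)
    collection.add(
        ids=[m["source"] for m in metadatas],   # assumes "source" is unique per record
        documents=list(texts),
        embeddings=list(embeddings),
        metadatas=list(metadatas),
    )
```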
---
Am I missing something?
Is there a tool, language, or framework out there that actually solves this well?
Or is this just a gap in the tooling ecosystem that no one has filled yet?
I'd love to hear what others are doing, and even better if someone is already working on a solution like this.
Thanks.