Batch
Batch makes it easier to handle large inputs in one Node or rerun a Flow multiple times. Handy for:
- Chunk-based processing (e.g., splitting large texts).
- Multi-file processing.
- Iterating over lists of params (e.g., user queries, documents, URLs).
1. BatchNode
A BatchNode extends Node but changes `prep()` and `exec()`:

- `prep(shared)`: returns an iterable (e.g., list, generator).
- `exec(item)`: called once per item in that iterable.
- `post(shared, prep_res, exec_res_list)`: called after all items are processed; receives the list of per-item results (`exec_res_list`) and returns an Action.
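The three hooks above can be sketched as a plain-Python loop. This is a minimal illustration, not the framework's actual implementation; names like `SketchBatchNode` and `Doubler` are made up for the example:

```python
# Sketch of the lifecycle a BatchNode runs: prep() yields items,
# exec() runs once per item, post() receives all collected results.
class SketchBatchNode:
    def run(self, shared):
        items = self.prep(shared)                      # iterable of work items
        results = [self.exec(item) for item in items]  # exec() once per item
        return self.post(shared, items, results)       # post() sees all results

class Doubler(SketchBatchNode):
    def prep(self, shared):
        return shared["numbers"]

    def exec(self, item):
        return item * 2

    def post(self, shared, prep_res, exec_res_list):
        shared["doubled"] = exec_res_list
        return "default"

shared = {"numbers": [1, 2, 3]}
action = Doubler().run(shared)
# shared["doubled"] is now [2, 4, 6]; action is "default"
```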
Example: Summarize a Large File

```python
class MapSummaries(BatchNode):
    def prep(self, shared):
        # Suppose we have a big file; chunk it
        content = shared["data"].get("large_text.txt", "")
        chunk_size = 10000
        chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
        return chunks

    def exec(self, chunk):
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        summary = call_llm(prompt)
        return summary

    def post(self, shared, prep_res, exec_res_list):
        combined = "\n".join(exec_res_list)
        shared["summary"]["large_text.txt"] = combined
        return "default"

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)
```
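The fixed-size slicing used in `prep()` can be checked on its own. Here `chunk_text` is a hypothetical standalone helper, named only for this illustration:

```python
def chunk_text(content, chunk_size):
    # Fixed-size slicing; the last chunk may be shorter than chunk_size
    return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

chunks = chunk_text("a" * 25, 10)
# chunk lengths: [10, 10, 5]
```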
2. BatchFlow
A BatchFlow runs a Flow multiple times, each time with different `params`. Think of it as a loop that replays the Flow for each parameter set.
Example: Summarize Many Files

```python
class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of param dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)

# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
```
Under the Hood
- `prep(shared)` returns a list of param dicts, e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
- The BatchFlow loops through each dict. For each one:
  - It merges the dict with the BatchFlow's own `params`.
  - It calls `flow.run(shared)` using the merged result.
- This means the sub-Flow is run repeatedly, once for every param dict.
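The loop above can be sketched in plain Python. `run_batch_flow` and `run_once` are illustrative stand-ins, not the framework's API; `run_once` plays the role of rerunning the sub-Flow:

```python
# Sketch of BatchFlow's loop: merge the flow's own params with each
# dict from prep(), then rerun the sub-Flow with the merged result.
def run_batch_flow(own_params, param_dicts, run_once):
    for d in param_dicts:
        merged = {**own_params, **d}  # per-item keys override shared ones
        run_once(merged)

seen = []
run_batch_flow(
    {"model": "gpt"},
    [{"filename": "file1.txt"}, {"filename": "file2.txt"}],
    run_once=seen.append,
)
# seen == [{"model": "gpt", "filename": "file1.txt"},
#          {"model": "gpt", "filename": "file2.txt"}]
```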
3. Nested or Multi-Level Batches
You can nest a BatchFlow in another BatchFlow. For instance:
- Outer batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, …).
- Inner batch: returns a list of per-file param dicts.
At each level, the BatchFlow merges its own param dict with the parent's. By the time you reach the innermost node, the final `params` are the merged result of all parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
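The merge itself is just a dict union, with the child's keys layered over the parent's. A minimal illustration (the paths and keys mirror the example below):

```python
# Each nesting level contributes its own dict; the innermost
# node sees the union of every level in the chain.
outer = {"directory": "/pathA"}
inner = {"filename": "doc.txt"}
final_params = {**outer, **inner}
# final_params == {"directory": "/pathA", "filename": "doc.txt"}
```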
```python
import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        directory = self.params["directory"]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

# The inner nodes receive both "directory" and "filename" in params
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
outer_flow.run(shared)
```