Batch

Batch makes it easier to handle large inputs in one Node or rerun a Flow multiple times. Handy for:

  • Chunk-based processing (e.g., splitting large texts).
  • Multi-file processing.
  • Iterating over lists of params (e.g., user queries, documents, URLs).

1. BatchNode

A BatchNode extends Node but changes prep() and exec():

  • prep(shared): returns an iterable (e.g., list, generator).
  • exec(item): called once per item in that iterable.
  • post(shared, prep_res, exec_res_list): called after all items are processed; receives the list of per-item results (exec_res_list) and returns an Action.

Example: Summarize a Large File

class MapSummaries(BatchNode):
    def prep(self, shared):
        # Split the big file into fixed-size chunks; each chunk
        # becomes one item for exec().
        content = shared["data"].get("large_text.txt", "")
        chunk_size = 10000
        return [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]

    def exec(self, chunk):
        # call_llm is a helper assumed to be defined elsewhere.
        prompt = f"Summarize this chunk in 10 words: {chunk}"
        return call_llm(prompt)

    def post(self, shared, prep_res, exec_res_list):
        # exec_res_list holds one summary per chunk, in order.
        combined = "\n".join(exec_res_list)
        shared.setdefault("summary", {})["large_text.txt"] = combined
        return "default"

# A minimal shared store for this example; real contents are app-specific.
shared = {"data": {"large_text.txt": "..."}, "summary": {}}

map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)

2. BatchFlow

A BatchFlow runs a Flow multiple times, each time with different params. Think of it as a loop that replays the Flow for each parameter set.

Example: Summarize Many Files

class SummarizeAllFiles(BatchFlow):
    def prep(self, shared):
        # Return a list of param dicts (one per file)
        filenames = list(shared["data"].keys())  # e.g., ["file1.txt", "file2.txt", ...]
        return [{"filename": fn} for fn in filenames]

# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce), sketched below:
summarize_file = SummarizeFile(start=load_file)

# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)
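
For reference, here is one way the per-file pieces could look. This is a minimal sketch, not the library's code: LoadFile, Summarize, and the shared key "current_text" are hypothetical, and the reduce stage is omitted. The essential point is that the sub-Flow reads the current filename from self.params (set by the BatchFlow), not from shared:

class LoadFile(Node):
    def prep(self, shared):
        # The current filename arrives via params, injected by the BatchFlow.
        return shared["data"][self.params["filename"]]

    def exec(self, content):
        return content

    def post(self, shared, prep_res, exec_res):
        shared["current_text"] = exec_res
        return "default"

class Summarize(Node):
    def prep(self, shared):
        return shared["current_text"]

    def exec(self, text):
        # call_llm is the same helper assumed in the earlier example.
        return call_llm(f"Summarize this file in 10 words: {text}")

    def post(self, shared, prep_res, exec_res):
        shared.setdefault("summary", {})[self.params["filename"]] = exec_res
        return "default"

class SummarizeFile(Flow):
    pass  # a plain Flow subclass; the wiring below does the work

load_file = LoadFile()
load_file >> Summarize()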

Under the Hood

  1. prep(shared) returns a list of param dicts, e.g., [{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...].
  2. The BatchFlow loops through each dict. For each one:
    • It merges the dict with the BatchFlow's own params.
    • It runs the sub-Flow on shared, passing the merged dict as the sub-Flow's params.
  3. This means the sub-Flow runs repeatedly, once per param dict (see the sketch below).
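
In pseudocode, the loop amounts to the following. This is a sketch of the mechanics, not the framework's actual source; run_sub_flow is a hypothetical stand-in for the internal orchestration:

# Illustrative pseudocode for the BatchFlow loop (not the real source).
def run_batch_flow(batch_flow, shared):
    for param_dict in batch_flow.prep(shared):
        # Per-item values are merged over the BatchFlow's own params
        # (assumed precedence: the per-item dict wins on key clashes).
        merged = {**batch_flow.params, **param_dict}
        run_sub_flow(batch_flow.start, shared, params=merged)  # hypothetical helper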

3. Nested or Multi-Level Batches

You can nest a BatchFlow in another BatchFlow. For instance:

  • Outer batch: returns a list of directory param dicts (e.g., {"directory": "/pathA"}, {"directory": "/pathB"}, …).
  • Inner batch: returns a list of per-file param dicts.

At each level, the BatchFlow merges its own param dict with the parent’s. By the time you reach the innermost node, the final params dict is the merged result of every parent in the chain, so a nested structure carries the entire context (e.g., directory + file name) at once, as the snippet below illustrates.
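
Concretely, each merge is an ordinary dict update; the values below are hypothetical:

outer = {"directory": "/path/to/dirA"}  # from the outer BatchFlow's prep
inner = {"filename": "report.txt"}      # from the inner BatchFlow's prep
merged = {**outer, **inner}
# The innermost node sees both keys in self.params:
# {"directory": "/path/to/dirA", "filename": "report.txt"}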

import os

class FileBatchFlow(BatchFlow):
    def prep(self, shared):
        # "directory" was injected by the parent DirectoryBatchFlow.
        directory = self.params["directory"]
        files = [f for f in os.listdir(directory) if f.endswith(".txt")]
        return [{"filename": f} for f in files]

class DirectoryBatchFlow(BatchFlow):
    def prep(self, shared):
        directories = ["/path/to/dirA", "/path/to/dirB"]
        return [{"directory": d} for d in directories]

inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)
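
Kicking off the outer flow then runs the whole cascade. A minimal sketch, assuming a shared store shaped like the earlier examples:

shared = {"data": {}, "summary": {}}  # hypothetical contents
outer_flow.run(shared)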