Map Reduce

Process large inputs by splitting them into chunks and handling each chunk in a BatchNode (the map step), then combining the per-chunk results in a regular Node (the reduce step).

Example: Document Summarization

class MapSummaries(BatchNode):
    """Map step: cut the input text into chunks and summarize each chunk.

    Reads ``shared["text"]`` and writes the collected results to
    ``shared["summaries"]``.
    """

    # Maximum number of characters per chunk. Must be a positive integer
    # (it is used as the step of ``range``). Override on a subclass or an
    # instance to change how much text goes into each LLM call.
    CHUNK_SIZE = 10000

    def prep(self, shared):
        """Return ``shared["text"]`` split into consecutive CHUNK_SIZE slices.

        The last slice may be shorter; an empty text yields an empty list.
        """
        text = shared["text"]
        size = self.CHUNK_SIZE
        return [text[start:start + size] for start in range(0, len(text), size)]

    def exec(self, chunk):
        """Return the result of asking ``call_llm`` to summarize ``chunk``.

        NOTE(review): presumably invoked by BatchNode once per item returned
        from ``prep`` -- confirm against the framework.
        """
        return call_llm(f"Summarize this chunk: {chunk}")

    def post(self, shared, prep_res, exec_res_list):
        """Store ``exec_res_list`` under ``shared["summaries"]``."""
        shared["summaries"] = exec_res_list

class ReduceSummaries(Node):
    """Reduce step: merge the stored chunk summaries into one final summary.

    Reads ``shared["summaries"]`` and writes the combined result to
    ``shared["final_summary"]``.
    """

    def prep(self, shared):
        """Return the summaries stored under ``shared["summaries"]``."""
        return shared["summaries"]

    def exec(self, summaries):
        """Return the result of asking ``call_llm`` to combine ``summaries``.

        ``summaries`` is expected to be an iterable of summary texts.
        """
        # Join as plain text before building the prompt: interpolating the
        # list itself would insert its Python repr (brackets, quotes and
        # escaped newlines) instead of the summary text.
        combined = "\n\n".join(str(summary) for summary in summaries)
        return call_llm(f"Combine these summaries: {combined}")

    def post(self, shared, prep_res, exec_res):
        """Store ``exec_res`` under ``shared["final_summary"]``."""
        shared["final_summary"] = exec_res

# Connect nodes: instantiate the map and reduce steps and link them with `>>`
# (presumably this registers reduce_node as the step that follows map_node --
# confirm against the framework).
map_node = MapSummaries()
reduce_node = ReduceSummaries()
map_node >> reduce_node

# Create the flow with the map node as its starting point, then run it.
# NOTE(review): `shared` is not defined anywhere in this snippet; the caller
# must provide it. MapSummaries.prep reads shared["text"], so presumably it is
# a dict such as {"text": <document to summarize>} -- confirm before running.
summarize_flow = Flow(start=map_node)
summarize_flow.run(shared)