(Advanced) Async
Async Nodes implement prep_async()
, exec_async()
, exec_fallback_async()
, and/or post_async()
. This is useful for:
- prep_async()
- For fetching/reading data (files, APIs, DB) in an I/O-friendly way.
- exec_async()
- Typically used for async LLM calls.
- post_async()
- For awaiting user feedback, coordinating across multi-agents or any additional async steps after
exec_async()
.
- For awaiting user feedback, coordinating across multi-agents or any additional async steps after
Note: AsyncNode
must be wrapped in AsyncFlow
. AsyncFlow
can also include regular (sync) nodes.
Example
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())