本文导读

前言

看langgraph官方文档感觉human in the loop貌似还挺简单的,但实际上手时,那文档看得我云里雾里的。更详细的Guides和Reference,恕我能力有限,悲摧的也没看懂。作为试验,我想做一个功能:本地执行shell命令,每次执行前都要用户确认。左看官方文档, 右去西天请ChatGPT老祖。ChatGPT说得头头是道,Copilot也反复调试,但就是不能用。就这。。。看来碰到新东西AI就十分拉胯。最终,认真看了半天文档,没借助GPT,总算捣鼓出来一个最简版。

自从AI能力越来越强,大多时候自己更习惯直接让AI帮忙解决问题,越来越懒得看文档。自己找饭吃的能力还是得留着,不能光靠AI喂饭。

运行效果

试验嘛,交互就是命令行了。效果大概这样

$ python custom_workflow.pyAI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序User: 今天的日期是什么Assistant: 今天的日期是 2025-09-03。User: 合肥的天气怎么样Assistant: 合肥的天气总是阳光明媚!User: 查看下本地内存占用Assistant: Do you approve executing this command: free -h? Please answer 'yes' or 'no'.User: yesAssistant: 当前本地内存占用情况如下total        used        free      shared  buff/cache   available内存:          62Gi        10Gi        46Gi       157Mi       6.5Gi        52Gi交换:         3.8Gi          0B       3.8GiUser: disk呢?Assistant: Do you approve executing this command: df -h? Please answer 'yes' or 'no'.User: yesAssistant: 当前磁盘使用情况如下:文件系统                 大小  已用  可用 已用% 挂载点udev                      32G     0   32G    0% /devtmpfs                    6.3G  1.8M  6.3G    1% /run/dev/mapper/debian-root  234G   29G  194G   13% /tmpfs                     32G   37M   32G    1% /dev/shmefivarfs                 128K   40K   84K   32% /sys/firmware/efi/efivarstmpfs                    5.0M   12K  5.0M    1% /run/locktmpfs                    1.0M     0  1.0M    0% /run/credentials/systemd-journald.servicetmpfs                     32G   49M   32G    1% /tmp/dev/nvme1n1p1           989M  256M  666M   28% /boot/dev/mapper/debian-home  676G  196G  446G   31% /home/dev/nvme0n1p1           300M   39M  262M   13% /boot/efitmpfs                    6.3G  4.1M  6.3G    1% /run/user/1000User: 非常好Assistant: 谢谢!如果您有其他问题或需要进一步的帮助,请随时告诉我。?User: quitGoodbye!

Code

注释写得够详细的了,具体可以直接看注释。LLM用的是阿里千问,注意替换成自己的。

checkpointer用的是内存,在生产环境,可以把checkpointer换成sqlite、postgres、redis等。

log就是个写日志文件的模块,不输出到控制台,之前调试的时候用来发给LLM做诊断,比较简单就不贴了。

在python命令行交互程序中,最好引用下readline模块,不然输入中文会碰到退格键没法正常用的问题,而且方向键也没法用。

"""Human in the loop 示例, 每当AI需要执行shell命令时, 都需要经过用户确认"""import osimport readline  # 引入readlint模块以增强命令行输入体验。Linux环境的Python标准库内置from datetime import datetimeimport subprocessimport tracebackfrom langchain_core.runnables import RunnableConfigfrom langchain_core.tools import toolfrom langchain_core.messages import HumanMessagefrom langchain_openai import ChatOpenAIfrom langgraph.checkpoint.memory import InMemorySaverfrom langgraph.graph import END, START, StateGraph, MessagesStatefrom langgraph.graph.state import CompiledStateGraphfrom langgraph.prebuilt import ToolNode, tools_conditionfrom langgraph.types import Command, interruptfrom langgraph.prebuilt import create_react_agent# 自定义一个简单的文件型日志记录器from log import logger# 设置API密钥os.environ["OPENAI_API_KEY"] = ""# 初始化语言模型llm = ChatOpenAI(    model="qwen-plus",    base_url="https://dashscope.*aliy*un*cs.com/compatible-mode/v1",)# 定义工具函数@tooldef get_date() -> str:    """获取今天的日期。        Returns:        str: 当前日期,格式为 YYYY-MM-DD    """    logger.info("Getting date")    return datetime.now().strftime("%Y-%m-%d")@tooldef get_weather(city: str) -> str:    """获取指定城市的天气信息。        Args:        city (str): 城市名称            Returns:        str: 天气信息描述    """    logger.info("Getting weather")    return f"It's always sunny in {city}!"@tooldef execute_command(command: str) -> str:    """本地执行shell命令, 每次执行前需要用户确认        Args:        command (str): 要执行的命令    Returns:        str: 命令执行结果或拒绝信息    """    # 使用interrupt函数暂停执行并请求用户确认    # interrupt会将控制权交还给用户,等待用户输入    decision = interrupt({"query": f"Do you approve executing this command: {command}? Please answer 'yes' or 'no'."})    logger.info(f"Decision: {decision}")        # 根据用户决策决定是否执行命令    if decision == "yes":        logger.info(f"Executing command, {command}")        try:            # 执行命令并获取结果            result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)            result = result.stdout or result.stderr            return result        except subprocess.TimeoutExpired:            return "Command timed out"        except Exception as e:            return f"Error executing command: {str(e)}"    else:        logger.info("Command execution denied by user")        return "Command execution denied by user"# 定义可用工具列表tools = [get_weather, get_date, execute_command]# 创建ReAct代理,它可以根据需要自动调用工具agent = create_react_agent(    model=llm,    tools=tools,    prompt="You are a helpful assistant.")# 创建工具节点,用于执行工具调用tool_node = ToolNode(tools=tools)# 创建内存检查点保存器,用于保存对话状态memory = InMemorySaver()# 配置运行时参数,使用固定的线程IDconfig = RunnableConfig(configurable={"thread_id": "1"})def create_graph() -> CompiledStateGraph:    """创建并返回工作流图。        Returns:        CompiledStateGraph: 编译后的工作流图    """    # 创建状态图,使用MessagesState作为状态类型    graph_builder = StateGraph(MessagesState)        # 添加节点    graph_builder.add_node("agent", agent)  # AI代理节点    graph_builder.add_node("tools", tool_node)  # 工具执行节点        # 添加边    graph_builder.add_edge(START, "agent")  # 从开始节点连接到代理节点    graph_builder.add_edge("tools", "agent")  # 从工具节点连接回代理节点        # 添加条件边,根据代理的决策决定下一步    graph_builder.add_conditional_edges(        "agent",        tools_condition,  # 条件函数,判断是否需要调用工具        {"tools": "tools", END: END}  # 映射:需要工具时转到工具节点,否则结束    )    # 编译图并返回,使用内存保存器来保存状态    return graph_builder.compile(checkpointer=memory)def handle_user_decision(user_input: str) -> bool:    """处理用户对中断的响应。        Args:        user_input (str): 用户的输入            Returns:        bool: 如果处理了中断返回True,否则返回False    """    # 创建图实例    graph = create_graph()        # 获取当前状态    current_state = graph.get_state(config)        # 检查是否有待处理的中断    if not current_state.next:        logger.warning("No pending interrupts to handle.")        return False  # 没有待处理的中断        # 根据用户输入决定如何响应中断    if user_input.lower() == "yes":        # 用户确认,继续执行        graph.invoke(Command(resume="yes"), config=config)    else:        # 用户拒绝,取消执行        graph.invoke(Command(resume="no"), config=config)            return True  # 处理了中断def graph_invoke(user_input: str):    """处理用户输入并执行相应操作。        Args:        user_input (str): 用户输入的文本    """    # 首先尝试处理用户对中断的响应    interrupt_handled = handle_user_decision(user_input)    # 如果已经处理了中断,则不再继续处理用户输入,而是显示结果    if interrupt_handled:        # 获取处理后的状态并显示结果        graph = create_graph()        current_state = graph.get_state(config)        if current_state.values and 'messages' in current_state.values:            # 显示最新的消息内容            messages = current_state.values['messages']            if messages:                last_message = messages[-1]                if hasattr(last_message, 'content'):                    print("Assistant:", last_message.content)        return    # 如果没有待处理的中断,则正常处理用户输入    graph = create_graph()    resp = graph.invoke({"messages": [HumanMessage(content=user_input)]}, config=config)        logger.debug(f"response: {resp}")        # 检查是否有中断需要处理    if "__interrupt__" in resp:        interrupt_data = resp["__interrupt__"]        interrupt = interrupt_data[0] if interrupt_data else None        if not interrupt or not hasattr(interrupt, "value"):            logger.error("Invalid interrupt data")            return                    interrupt_value = interrupt.value        # 显示中断请求给用户        print(f"Assistant: {interrupt_value['query']}")    else:        # 直接显示AI的响应        print("Assistant:", resp["messages"][-1].content)            logger.debug(f"Snapshot state: {graph.get_state(config)}")    logger.debug(f"Snapshot next: {graph.get_state(config).next}")# 程序入口点if __name__ == "__main__":    """主程序循环,处理用户输入并生成响应。"""        print("AI助手已启动,输入 'quit'、'exit' 或 'q' 退出程序")        while True:        try:            # 获取用户输入            user_input = input("User: ").strip()            logger.info(f"User input: {user_input}")                        # 检查退出命令            if user_input.lower() in ["quit", "exit", "q"]:                print("Goodbye!")                break                            # 处理用户输入            graph_invoke(user_input)                    except KeyboardInterrupt:            # 处理Ctrl+C中断            print("nGoodbye!")            break        except Exception as e:            # 记录并显示错误信息            logger.error(f"Error occurred: {traceback.format_exc()}")            print(f"Error: {traceback.format_exc()}")            break
本站提供的所有下载资源均来自互联网,仅提供学习交流使用,版权归原作者所有。如需商业使用,请联系原作者获得授权。 如您发现有涉嫌侵权的内容,请联系我们 邮箱:[email protected]