AsyncRedisSaver fails to aput #55

Closed
nkilleen-work opened this issue Jun 4, 2025 · 2 comments · Fixed by #56

nkilleen-work commented Jun 4, 2025

Simple reproduction steps:

Dependency versions from pip freeze:

redis==6.2.0
redisvl==0.7.0
langgraph-checkpoint==2.0.26
langgraph-checkpoint-redis==0.0.6

Using Redis Stack 8.0.0 in cluster configuration.

Set up the AsyncRedisSaver using a RedisCluster object directly:

import asyncio
from typing import TypedDict
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.redis.aio import AsyncRedisSaver
from redis.asyncio.cluster import RedisCluster, ClusterNode

class SimpleState(TypedDict):
    value: str

async def set_value_node(state: SimpleState):
    return {"value": "Hello Redis Cluster"}

async def main():
    redis_client = RedisCluster(
        startup_nodes=[
            ClusterNode(host="node1", port=6380),
            ClusterNode(host="node2", port=6380),
            ClusterNode(host="node3", port=6380),
        ],
        ssl=True,
    )

    checkpointer = AsyncRedisSaver(redis_client=redis_client)
    await checkpointer.asetup()

    builder = StateGraph(SimpleState)
    builder.add_node("setter", set_value_node)
    builder.set_entry_point("setter")
    builder.add_edge("setter", END)
    graph = builder.compile(checkpointer=checkpointer)

    thread_config = {"configurable": {"thread_id": "simple-thread-1"}}
    
    initial_input = {"value": "initial"}
    final_output = await graph.ainvoke(initial_input, config=thread_config)
    print(f"Graph output: {final_output}")

    saved_state = await graph.aget_state(thread_config)
    print(f"Saved state: {saved_state.values if saved_state else 'Not found'}")

    await redis_client.aclose()

if __name__ == "__main__":
    asyncio.run(main())

Throws the following exception:

  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2788, in ainvoke
    async for chunk in self.astream(
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/__init__.py", line 2596, in astream
    async with AsyncPregelLoop(
                 ^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1393, in __aexit__
    return await exit_task
           ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/contextlib.py", line 754, in __aexit__
    raise exc_details[1]
  File "/usr/local/lib/python3.12/contextlib.py", line 737, in __aexit__
    cb_suppress = await cb(*exc_details)
                  ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/executor.py", line 209, in __aexit__
    raise exc
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1260, in _checkpointer_put_after_previous
    await prev
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/pregel/loop.py", line 1262, in _checkpointer_put_after_previous
    await cast(BaseCheckpointSaver, self.checkpointer).aput(
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 585, in aput
    raise e
  File "/home/www/.local/lib/python3.12/site-packages/langgraph/checkpoint/redis/aio.py", line 505, in aput
    await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
          ^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/redismodules.py", line 24, in json
    jj = JSON(client=self, encoder=encoder, decoder=decoder)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/json/__init__.py", line 71, in __init__
    if get_protocol_version(self.client) in ["3", 3]:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/www/.local/lib/python3.12/site-packages/redis/commands/helpers.py", line 118, in get_protocol_version
    return client.nodes_manager.connection_kwargs.get("protocol")
           ^^^^^^^^^^^^^^^^^^^^
AttributeError: 'ClusterPipeline' object has no attribute 'nodes_manager'

The failing call is this line, https://github.com/redis-developer/langgraph-redis/blob/main/langgraph/checkpoint/redis/aio.py#L505:

await pipeline.json().set(checkpoint_key, "$", checkpoint_data)
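
To make the last frame of the trace easier to follow, here is a minimal sketch of the failure mode using stand-in classes. `FakeCluster`, `FakeNodesManager`, and `FakeClusterPipeline` are hypothetical and are not the redis-py implementations. Only the attribute access inside `get_protocol_version` is copied from the traceback. The sketch assumes the pipeline object wraps the cluster client without exposing `nodes_manager` itself, which is what the `AttributeError` suggests.

```python
# Stand-in classes only: these are NOT the redis-py implementations.
class FakeNodesManager:
    def __init__(self, **connection_kwargs):
        self.connection_kwargs = connection_kwargs


class FakeCluster:
    """Plays the role of RedisCluster: it owns a nodes_manager."""

    def __init__(self, **connection_kwargs):
        self.nodes_manager = FakeNodesManager(**connection_kwargs)

    def pipeline(self):
        return FakeClusterPipeline(self)


class FakeClusterPipeline:
    """Plays the role of ClusterPipeline (assumption: it keeps a reference
    to the cluster client but has no nodes_manager attribute of its own)."""

    def __init__(self, client):
        self._client = client


def get_protocol_version(client):
    # Same attribute access as the last frame of the traceback.
    return client.nodes_manager.connection_kwargs.get("protocol")


cluster = FakeCluster(protocol=2)
cluster_protocol = get_protocol_version(cluster)  # fine on the cluster stand-in

error_message = None
try:
    # Passing the pipeline instead of the cluster reproduces the error shape.
    get_protocol_version(cluster.pipeline())
except AttributeError as exc:
    error_message = str(exc)

print(cluster_protocol)
print(error_message)
```

If this reading is right, the helper works when handed the cluster client and fails only when handed the pipeline created from it, which matches where `pipeline.json()` sits in the trace.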

@nkilleen-work (Author)

I'm assuming based on the trace that this could very well be a redis-py issue.
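
If the problem does sit in the `json()` helper rather than in the saver, one possible caller-side workaround is to queue `JSON.SET` as a raw command and skip `pipeline.json()` entirely. The sketch below is an assumption, not the change made in #56. `queue_json_set`, `RecordingPipeline`, and the key name are hypothetical, and it assumes the real cluster pipeline accepts `execute_command` with raw arguments the way the stand-in does.

```python
import json


def queue_json_set(pipeline, key, value, path="$"):
    """Queue JSON.SET as a raw command instead of going through pipeline.json().

    Hypothetical helper: it only assumes `pipeline` has an
    execute_command(*args) method that queues the command.
    """
    return pipeline.execute_command("JSON.SET", key, path, json.dumps(value))


class RecordingPipeline:
    """Hypothetical stand-in that records queued commands for inspection."""

    def __init__(self):
        self.commands = []

    def execute_command(self, *args):
        self.commands.append(args)
        return self


pipe = RecordingPipeline()
# The key name is made up for illustration; the saver builds its own keys.
queue_json_set(pipe, "checkpoint:simple-thread-1", {"value": "Hello Redis Cluster"})
print(pipe.commands)
```

The trade-off is that the raw command bypasses whatever encoding and decoding the JSON helper normally applies, so the caller has to serialize the payload itself.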

@abrookins (Contributor)

Thank you for the repro example @nkilleen-work! I'm taking a look.
