
    2i'                     ~    d Z ddlZddlZddlmZmZmZmZmZm	Z	 ddl
m
Z
 ddlmZ ddlmZmZmZmZ  G d de      Zy)zd
Redis Checkpointer for LangGraph
Persists workflow state to Redis for human-in-the-loop interrupts
    N)AnyDictListOptionalSequenceTuple)datetime)BaseCheckpointSaver
CheckpointCheckpointMetadataCheckpointTuplec                       e Zd ZdZ	 	 	 d#dee   dedef fdZdej                  fdZ
d	ed
edefdZd	edefdZdeeef   dee   fdZ	 d$deeef   dededeeeef      deeef   f
dZdeeef   deeeef      deddfdZdddddeeef   deeeef      deeeef      dee   fdZdeeef   dee   fdZ	 d$deeef   dededeeeef      deeef   f
dZdeeef   deeeef      deddfdZdddddeeef   deeeef      deeeef      dee   fdZd	edefdZd%d edee   fd!Zd" Z  xZ!S )&RedisCheckpointerz
    Redis-based checkpoint saver for LangGraph workflows.

    Stores campaign state in Redis, enabling:
    - Human-in-the-loop approval workflows
    - Campaign pause/resume functionality
    - State persistence across service restarts
    N	redis_urlprefixttl_secondsc                     t         |           |xs t        j                  dd      | _        || _        || _        d | _        y )N	REDIS_URLzredis://localhost:6379/0)super__init__osgetenvr   r   r   _client)selfr   r   r   	__class__s       B/var/www/html/langgraph-service/checkpointer/redis_checkpointer.pyr   zRedisCheckpointer.__init__   sE     	" 
bii&'
 &.2    returnc                    K   | j                   't        j                  | j                  dd      | _         | j                   S w)zGet or create Redis clientzutf-8T)encodingdecode_responses)r   redisfrom_urlr   r   s    r   _get_clientzRedisCheckpointer._get_client-   s9     << >> !%DL
 ||s   A A	thread_idcheckpoint_idc                 (    | j                    | d| S )z#Generate Redis key for a checkpoint:r   )r   r&   r'   s      r   	_make_keyzRedisCheckpointer._make_key7   s    ++yk=/::r   c                 $    | j                    d| S )z&Generate Redis key for thread metadatathread:r*   )r   r&   s     r   _make_thread_keyz"RedisCheckpointer._make_thread_key;   s    ++gi[11r   configc                   K   |j                  di       j                  d      }|j                  di       j                  d      }|sy| j                          d{   }|s0| j                  |      }|j                  | d       d{   }|sy| j                  ||      }|j                  |       d{   }|sy	 t	        j
                  |      }t        |j                  dd      |d   |d	   |j                  d
i       |j                  di       |j                  di       |j                  dg             }	t        |j                  di       j                  dd      |j                  di       j                  dd      |j                  di       j                  di             }
t        ||	|
|j                  d            S 7 o7 C7 # t        j                  t        f$ r}t        d|        Y d}~yd}~ww xY ww)zLGet a checkpoint tuple by config (implements BaseCheckpointSaver.aget_tuple)configurabler&   r'   N:latestv   idtschannel_valueschannel_versionsversions_seenpending_sends)r3   r5   r6   r7   r8   r9   r:   metadatasourceunknownstepr   writesr<   r>   r?   parent_config)r/   
checkpointr;   rA   z0[RedisCheckpointer] Failed to parse checkpoint: )getr%   r.   r+   jsonloadsr   r   r   JSONDecodeErrorKeyErrorprint)r   r/   r&   r'   client
thread_keykeydataparsedrB   r;   es               r   
aget_tuplezRedisCheckpointer.aget_tuple?   s    JJ~r266{C	

>26::?K'')) ..y9J"(**
|7-C"DDM nnY6ZZ_$	ZZ%F#**S!$$<$<%zz*:B?!',>!C$jj"=$jj"=J *zz*b155h	JZZ
B/33FA>zz*b155hCH
 #%!$jj9	 ? *
 E
 %6 $$h/ 	DQCHI	sa   AHG
.H
G-H8G9HDG 	HHHH,G?:H?HHrB   r;   new_versionsc                 T  K   |j                  di       j                  d      }|st        d      |d   }| j                          d{   }|j                  dd      ||d   |j                  d	i       |j                  d
i       |j                  di       |j                  dg       |r|j                  dd      nd|r|j                  dd      nd|r|j                  di       ni d|j                  di       j                  d      t        j                         j                         d
}| j                  ||      }	| j                  |      }
|j                  |	t        j                  |      | j                         d{    |j                  |
 d|| j                         d{    |j                  |
 d|       d{    |j                  |
 ddd       d{    |j                  |
 d| j                         d{    d||diS 7 7 7 r7 W7 ;7 w)zSave a checkpointr1   r&   zthread_id required in configr5   Nr3   r4   r6   r7   r8   r9   r:   r<   r=   r>   r   r?   r@   rA   )
r3   r5   r6   r7   r8   r9   r:   r;   rA   saved_atexr2   :listc   r&   r'   )rC   
ValueErrorr%   r	   utcnow	isoformatr+   r.   setrD   dumpsr   lpushltrimexpire)r   r/   rB   r;   rP   r&   r'   rI   rL   rK   rJ   s              r   aputzRedisCheckpointer.aputp   s     JJ~r266{C	;<<"4('')) Q'T"(nn-=rB */A2 F'^^OR@'^^OR@?G(,,x;Y3;VQ/8@(,,x4b
 $ZZ;??P )335
" nnY6**95
 jjdjj.43C3CjDDD jjJ<w/4CSCSjTTT llj\/???llj\/B777mmzl%0$2B2BCCC &!.
 	
E *0 	E 	U 	@7Csm   AH(	H
D:H(H(H(-H .H(
H"H((H$)&H(H&H(H( H("H($H(&H(r?   task_idc                   K   |j                  di       j                  d      }|sy| j                          d{   }| j                  |      }||D cg c]	  \  }}||f c}}t        j                         j                         d}	|j                  | d| t        j                  |	t              | j                         d{    y7 c c}}w 7 w)z0Store pending writes (for interrupted workflows)r1   r&   N)ra   r?   
created_atz:pending_writes:)defaultrS   )rC   r%   r.   r	   rY   rZ   r[   rD   r\   strr   )
r   r/   r?   ra   r&   rI   rJ   channelvaluepending_writess
             r   aput_writeszRedisCheckpointer.aput_writes   s      JJ~r266{C	''))**95
 >DENGU'E"//+557
 jjl*7)4JJ~s3  
 	
 	
 *
 F	
s.   8CCCC$A&C
CCC)filterbeforelimitrj   rk   rl   c               Z  K   |j                  di       j                  d      }|sy| j                          d{   }| j                  |      }|j                  | dd|xs ddz
         d{   }|D ]*  }	d||	di}
| j	                  |
       d{   }|s&| , y7 j7 67 w)	zList checkpoints for a threadr1   r&   NrU   r   d   r4   rW   )rC   r%   r.   lrangerO   )r   r/   rj   rk   rl   r&   rI   rJ   checkpoint_idsr'   checkpoint_configresults               r   alistzRedisCheckpointer.alist   s      JJ~r266{C	''))**95
  &}}l% \cQ 
 
 , 		M!*%2!!  ??+<==F		 *
 >s9   8B+B%5B+0B'1$B+B)B+	B+'B+)B+c                     t        d      )z<Sync version of aget - raises NotImplementedError, use asynczUse async method aget() insteadNotImplementedError)r   r/   s     r   rC   zRedisCheckpointer.get   s    !"CDDr   c                     t        d      )z<Sync version of aput - raises NotImplementedError, use asynczUse async method aput() insteadru   )r   r/   rB   r;   rP   s        r   putzRedisCheckpointer.put   s     ""CDDr   c                     t        d      )zCSync version of aput_writes - raises NotImplementedError, use asyncz&Use async method aput_writes() insteadru   )r   r/   r?   ra   s       r   
put_writeszRedisCheckpointer.put_writes   s     ""JKKr   c                    t        d      )z=Sync version of alist - raises NotImplementedError, use asyncz Use async method alist() insteadru   )r   r/   rj   rk   rl   s        r   listzRedisCheckpointer.list   s     ""DEEr   c                   K   | j                          d{   }| j                  |      }|j                  | ddd       d{   }|D cg c]  }| j                  ||       }}|j	                  | d| dg       |j                  | d       d{   }|j	                  |       |r |j                  |  d{    y7 7 c c}w 7 87 w)z#Delete all checkpoints for a threadNrU   r   r2   z:pending_writes:*T)r%   r.   ro   r+   extendkeysdelete)r   r&   rI   rJ   rp   cidkeys_to_deletepending_keyss           r   delete_threadzRedisCheckpointer.delete_thread  s     ''))**95
  &}}
|5-A1bII 7E
/2DNN9c*
 
 	l'"l% 
 	 $[[J<7H)IJJl+&--000- * J
 K 1sQ   CC	.CCCC'2CC)CCCCCCpatternc                   K   | j                          d{   }|j                  | j                   d| d       d{   }|D cg c]1  }|j                  | j                   dd      j                  dd      3 c}S 7 j7 Cc c}w w)z%Get all thread IDs matching a patternNr-   r2    )r%   r   r   replace)r   r   rI   r   rK   s        r   get_thread_idsz RedisCheckpointer.get_thread_ids   s     ''))[[DKK=y!HII 
 KK4;;-w/4<<YK
 	
 *I
s2   B
B(B
B B
6B>B
B
B
c                 |   K   | j                   r*| j                   j                          d{    d| _         yy7 w)zClose Redis connectionN)r   closer$   s    r   r   zRedisCheckpointer.close)  s2     <<,,$$&&&DL &s   *<:<)Nzlanggraph:checkpoint:i ' )N)*)"__name__
__module____qualname____doc__r   re   intr   r"   Redisr%   r+   r.   r   r   r   rO   r   r   r`   r   r   ri   rs   rC   rx   rz   r|   boolr   r   r   r   __classcell__)r   s   @r   r   r      s    $(-%	3C=3 3 	35;; ;3 ;s ;s ;2# 2# 2/tCH~ /(?:S /l 264
S#X4
 4
 %	4

 tCH~.4
 
c3h4
l
S#X
 sCx)
 	

 

< ,0+/# S#X  c3h(	 
 c3h(  } FE$sCx. EXo-F E 26ES#XE E %	E
 tCH~.E 
c3hELS#XL sCx)L 	L
 
L ,0+/#	FS#X	F c3h(		F
 c3h(	F }	FS T 4
C 
$s) 
 r   r   )r   rD   r   typingr   r   r   r   r   r   r	   redis.asyncioasyncior"   langgraph.checkpoint.baser
   r   r   r   r    r   r   <module>r      s7   
  	 = =   Y + Y r   