
    ,iTH                     n   d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z! d dl"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d dl)m*Z*  ejV                  e,      Z-e* G d dee             Z.defdZ/ G d dee      Z0 G d d      Z1y)    N)Any	AwaitableCallable	CoroutineListOptionalUnion)PubSubHandler)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)AsyncDatabase	Databases)AsyncFailureDetector)HealthCheckHealthCheckPolicy)BackgroundScheduler)AsyncCoreCommandsAsyncRedisModuleCommands)CircuitBreaker)State)NoValidDatabaseExceptionUnhealthyDatabaseException)ChannelT
EncodableTKeyT)experimentalc                   T   e Zd ZdZdefdZd+dZd Zd Zde	fdZ
d	edd
fdZd	efdZdedefdZd	efdZd	edefdZdefdZdefdZd Zd Zd
dd
ddedgeeee   f   f   dedee   ded ee   f
d!Z d" Z!	 d,d#eee"ge#eed
f   f      fd$Z$d	edefd%Z%d&e&d'e'd(e'fd)Z(d* Z)y
)-MultiDBClientzx
    Client that operates on multiple logical Redis databases.
    Should be used in Active-Active database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                  |j                  |j                        | _        |j                  s|j                         n|j                  | _        |j                  |j!                         n|j                  | _        | j"                  j%                  | j                         |j&                  | _        |j*                  | _        |j.                  | _        | j0                  j3                  t4        g       t7        | j                  | j                  | j0                  | j"                  |j8                  |j:                  | j,                  | j(                        | _        d| _        tA        jB                         | _"        tG               | _$        || _%        d | _&        g | _'        d | _(        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF))r#   
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_delay_health_check_policyr"   default_failure_detectors_failure_detectorsr%   default_failover_strategy_failover_strategyset_databasesr)   _auto_fallback_intervalr(   _event_dispatcherr$   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r&   r'   command_executorinitializedasyncioLock_hc_lockr   _bg_scheduler_config_recurring_hc_task	_hc_tasks_half_open_state_task)selfr    s     a/var/www/html/langgraph-service/venv/lib/python3.12/site-packages/redis/asyncio/multidb/client.py__init__zMultiDBClient.__init__   s    **, '' ((*%% 	 '-&B&B#7=7Q7Q7W7W&&(A(A8
!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4LM 6"55oo--"55$66!00!33#'#?#?	!
 !02"&%)"    returnc                 Z   K   | j                   s| j                          d {    | S 7 wN)r@   
initializerI   s    rJ   
__aenter__zMultiDBClient.__aenter__K   s)     //### $s    +)+c                    K   | j                   r| j                   j                          | j                  r| j                  j                          | j                  D ]  }|j                           y wrO   )rF   cancelrH   rG   )rI   exc_type	exc_value	tracebackhc_tasks        rJ   	__aexit__zMultiDBClient.__aexit__P   sY     ""##**,%%&&--/~~ 	GNN	s   A/A1c                   K   d }| j                  |       d{    t        j                  | j                  j	                  | j
                  | j                               | _        d}| j                  D ]z  \  }}|j                  j                  | j                         |j                  j                  t        j                  k(  sS|rV| j                  j                  |       d{    d}| |st!        d      d| _        y7 7 w)zT
        Perform initialization of databases to define their initial state.
        c                    K   | wrO    )errors    rJ   raise_exception_on_failed_hcz>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hc]   s     K   )on_errorNFTz4Initial connection failed - no active database found)_check_databases_healthrA   create_taskrD   run_recurring_asyncr/   rF   r*   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr?   set_active_databaser   r@   )rI   r^   is_active_db_founddatabaseweights        rJ   rP   zMultiDBClient.initializeX   s    
	 **4P*QQQ #*"5"522++,,#
 # $ 	*Hf--d.T.TU %%7@R++??III%)"	* "*F   5 	R$ Js.   DDB,D	DD*D
+D
Dc                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r*   rQ   s    rJ   get_databaseszMultiDBClient.get_databases}   s     rL   rl   Nc                   K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  |       d{    |j                  j                  t
        j                  k(  rE| j                   j                  d      d   \  }}| j                  j                  |       d{    yt        d      7 {7 w)zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r*   
ValueError_check_db_healthrd   rg   rh   ri   	get_top_nr?   rj   r   )rI   rl   existsexisting_db_highest_weighted_dbs         rJ   rj   z!MultiDBClient.set_active_database   s      "oo 	NKh&	
 NOO##H---!!W^^3%)__%>%>q%A!%D"'';;HEEE&?
 	
 	. Fs)   C&CB?A*C.C/CCc                 P  K   | j                   D ]  \  }}||k(  st        d       | j                  |       d{    | j                   j                  d      d   \  }}| j                   j	                  ||j
                         | j                  ||       d{    y7 f7 w)z;
        Adds a new database to the database list.
        zGiven database already existsNrr   r   )r*   rs   rt   ru   addrm   _change_active_database)rI   rl   rw   rx   ry   highest_weights         rJ   add_databasezMultiDBClient.add_database   s      #oo 	BNKh& !@AA	B ##H---.2oo.G.G.J1.M+^Hhoo6**85HIII	 	. 	Js'   B& B&B"A B&B$B&$B&new_databasehighest_weight_databasec                    K   |j                   |j                   kD  rL|j                  j                  t        j                  k(  r$| j
                  j                  |       d {    y y y 7 wrO   )rm   rd   rg   rh   ri   r?   rj   )rI   r   r   s      rJ   r|   z%MultiDBClient._change_active_database   s`      "9"@"@@$$**gnn<'';;LIII = A Js   AA+!A)"A+c                 *  K   | j                   j                  |      }| j                   j                  d      d   \  }}||k  rL|j                  j                  t
        j                  k(  r$| j                  j                  |       d{    yyy7 w)z<
        Removes a database from the database list.
        rr   r   N)	r*   removeru   rd   rg   rh   ri   r?   rj   )rI   rl   rm   ry   r}   s        rJ   remove_databasezMultiDBClient.remove_database   s      ''1.2oo.G.G.J1.M+^ f$#++11W^^C'';;<OPPP D % Qs   BB	B
Brm   c                 $  K   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       d{    y7 w)z<
        Updates a database from the database list.
        NTrq   rr   r   )r*   rs   ru   update_weightrm   r|   )rI   rl   rm   rv   rw   rx   ry   r}   s           rJ   update_database_weightz$MultiDBClient.update_database_weight   s      "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 **85HIIIs   BA+BB	Bfailure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r6   append)rI   r   s     rJ   add_failure_detectorz"MultiDBClient.add_failure_detector   s     	&&'78rL   healthcheckc                    K   | j                   4 d{    | j                  j                  |       ddd      d{    y7 07 # 1 d{  7  sw Y   yxY ww)z:
        Adds a new health check to the database.
        N)rC   r-   r   )rI   r   s     rJ   add_health_checkzMultiDBClient.add_health_check   sR      == 	4 	4&&{3	4 	4 	4 	4 	4 	4 	4sA   AAAAAAAAAAAAc                    K   | j                   s| j                          d{     | j                  j                  |i | d{   S 7 (7 w)zB
        Executes a single command and return its result.
        N)r@   rP   r?   execute_commandrI   argsoptionss      rJ   r   zMultiDBClient.execute_command   sK      //###:T**::DLGLLL $Ls!    AA#AAAAc                     t        |       S )z:
        Enters into pipeline mode of the client.
        )PipelinerQ   s    rJ   pipelinezMultiDBClient.pipeline   s     ~rL   F
shard_hintvalue_from_callablewatch_delayfuncr   watchesr   r   r   c                   K   | j                   s| j                          d{     | j                  j                  |g||||d d{   S 7 .7 w)z3
        Executes callable as transaction.
        Nr   )r@   rP   r?   execute_transaction)rI   r   r   r   r   r   s         rJ   transactionzMultiDBClient.transaction   sg      //###>T**>>

 " 3#
 
 	
 $
s!    AA)AAAAc                 n   K   | j                   s| j                          d{    t        | fi |S 7 w)z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        N)r@   rP   PubSub)rI   kwargss     rJ   pubsubzMultiDBClient.pubsub  s6      //###d%f%% $s    535r`   c           	        K   	 | j                   D cg c])  \  }}t        j                  | j                  |            + c}}| _        t        j
                  t        j                  | j                  ddi| j                         d{   }|D ]t  }t        |t              s|j                  }t        j                  |j                  _        t         j#                  d|j$                         |sc ||j$                         v yc c}}w 7 # t        j                  $ r t        j                  d      w xY ww)zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        return_exceptionsT)timeoutNz4Health check execution exceeds health_check_intervalz%Health check failed, due to exception)exc_info)r*   rA   rb   rt   rG   wait_forgatherr/   TimeoutError
isinstancer   rl   rh   OPENrd   rg   logger	exceptionoriginal_exception)rI   r`   rl   rx   resultsresultunhealthy_dbs          rJ   ra   z%MultiDBClient._check_databases_health  s!    	 $(??Ha ##D$9$9($CDDN $,,^^&* 33 G  	8F&"<=%-4\\$$*  ;#66 ! 
 V667	8! ## 	&&F 	sG   D>D .D
A
D DD D>&AD>5D>
D )D;;D>c                   K   | j                   j                  | j                  |       d{   }|sH|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S |rF|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S 7 w)zO
        Runs health checks on the given database until first failure.
        N)r4   executer-   rd   rg   rh   r   ri   )rI   rl   
is_healthys      rJ   rt   zMultiDBClient._check_db_health6  s     
  44<<
 

 %%5)0  &H,,22gnnD%,^^H"
s   *CCBCrd   	old_state	new_statec                 @   t        j                         }|t        j                  k(  r4t        j                  | j                  |j                              | _        y |t        j                  k(  r0|t        j                  k(  r|j                  t        t        |       y y y rO   )rA   get_running_looprh   	HALF_OPENrb   rt   rl   rH   ri   r   
call_laterr   _half_open_circuit)rI   rd   r   r   loops        rJ   rf   z/MultiDBClient._on_circuit_state_change_callbackH  s~     '')))))0)<)<%%g&6&67*D& &9+DOO02DgN ,E&rL   c                    K   | j                   j                  r7| j                   j                  j                  j                          d {    y y 7 wrO   )r?   active_databaseclientacloserQ   s    rJ   r   zMultiDBClient.acloseV  sA       00''77>>EEGGG 1Gs   AA
AA)rI   r   rM   r   rO   )*__name__
__module____qualname____doc__r   rK   rR   rY   rP   r   ro   r   rj   r~   r|   r   floatr   r   r   r   r   r   r   r   r	   r   r   r   r   strboolr   r   	Exceptionr   ra   rt   r   rh   rf   r   r\   rL   rJ   r   r      s   
,*} ,*\
# Jy 
- 
D 
2J= JJ)JDQJQm QJ] JE J&95I 94+ 4M %)$)'+

|U3	#+>%??@
 
 SM	

 "
 e_
,	& PT$88YK3T>1J$JKL$8L}  $O%O29OFMOHrL   r   rd   c                 .    t         j                  | _        y rO   )rh   r   rg   )rd   s    rJ   r   r   [  s    %%GMrL   c                   ~    e Zd ZdZdefdZddZd Zd Zd Z	de
fd	Zdefd
ZddZddZddZd Zdee   fdZy)r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                      g | _         || _        y rO   )_command_stack_client)rI   r   s     rJ   rK   zPipeline.__init__d  s     rL   rM   c                    K   | S wrO   r\   rQ   s    rJ   rR   zPipeline.__aenter__h       r_   c                    K   | j                          d {    | j                  j                  |||       d {    y 7 *7 wrO   )resetr   rY   rI   rU   rV   rW   s       rJ   rY   zPipeline.__aexit__k  s:     jjlll$$Xy)DDD 	Ds   AA$AAAAc                 >    | j                         j                         S rO   )_async_self	__await__rQ   s    rJ   r   zPipeline.__await__o  s    !++--rL   c                    K   | S wrO   r\   rQ   s    rJ   r   zPipeline._async_selfr  r   r_   c                 ,    t        | j                        S rO   )lenr   rQ   s    rJ   __len__zPipeline.__len__u  s    4&&''rL   c                      y)z1Pipeline instances should always evaluate to TrueTr\   rQ   s    rJ   __bool__zPipeline.__bool__x  s    rL   Nc                    K   g | _         y wrO   )r   rQ   s    rJ   r   zPipeline.reset|  s      s   	c                 @   K   | j                          d{    y7 w)zClose the pipelineN)r   rQ   s    rJ   r   zPipeline.aclose  s     jjl   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   r   r   s      rJ   pipeline_execute_commandz!Pipeline.pipeline_execute_command  s!     	""D'?3rL   c                 &     | j                   |i |S )zAdds a command to the stack)r   rI   r   r   s      rJ   r   zPipeline.execute_command  s    ,t,,d=f==rL   c                 t  K   | j                   j                  s"| j                   j                          d{    	 | j                   j                  j	                  t        | j                               d{   | j                          d{    S 7 ]7 7 	# | j                          d{  7   w xY ww)z0Execute all the commands in the current pipelineN)r   r@   rP   r?   execute_pipelinetupler   r   rQ   s    rJ   r   zPipeline.execute  s     ||'',,))+++	66GGd))*  **, , $**,sV   4B8BB8;B 7B8B ;B8BB8B B8B5.B1/B55B8)rI   r   rM   r   rM   N)rM   r   )r   r   r   r   r   rK   rR   rY   r   r   intr   r   r   r   r   r   r   r   r   r   r\   rL   rJ   r   r   _  sd    } E.( ($ !>
tCy 
rL   r   c                       e Zd ZdZdefdZddZddZd Ze	de
fd	       Zd
efdZd
edefdZd
efdZd
edefdZd Z	 dde
dee   fdZddddeddfdZy)r   z2
    PubSub object for multi database client.
    r   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        Nr\   )r   r?   r   )rI   r   r   s      rJ   rK   zPubSub.__init__  s(     ,%%,,6v6rL   rM   c                    K   | S wrO   r\   rQ   s    rJ   rR   zPubSub.__aenter__  r   r_   Nc                 @   K   | j                          d {    y 7 wrO   )r   r   s       rJ   rY   zPubSub.__aexit__  s     kkmr   c                 h   K   | j                   j                  j                  d       d {   S 7 w)Nr   r   r?   execute_pubsub_methodrQ   s    rJ   r   zPubSub.aclose  s'     \\22HHRRRRs   )202c                 V    | j                   j                  j                  j                  S rO   )r   r?   active_pubsub
subscribedrQ   s    rJ   r   zPubSub.subscribed  s    ||,,::EEErL   r   c                 l   K    | j                   j                  j                  dg|  d {   S 7 w)Nr   r   rI   r   s     rJ   r   zPubSub.execute_command  s:     HT\\22HH
 $
 
 	
 
   +424r   c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscribeNr   r   s      rJ   r   zPubSub.psubscribe  sE      IT\\22HH

#)
 
 	
 
   .757c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscribeNr   r   s     rJ   r   zPubSub.punsubscribe  s=     
 IT\\22HH
!
 
 	
 
r   c                 r   K    | j                   j                  j                  dg|i | d{   S 7 w)aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscribeNr   r   s      rJ   r   zPubSub.subscribe  sE      IT\\22HH

"(
 
 	
 
r   c                 l   K    | j                   j                  j                  dg|  d{   S 7 w)zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscribeNr   r   s     rJ   r   zPubSub.unsubscribe  s=     
 IT\\22HH
 
 
 	
 
r   ignore_subscribe_messagesr   c                 n   K   | j                   j                  j                  d||       d{   S 7 w)a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number or None to wait indefinitely.
        get_message)r   r   Nr   )rI   r   r   s      rJ   r  zPubSub.get_message  s>      \\22HH&? I 
 
 	
 
   ,535g      ?)exception_handlerpoll_timeoutr  c                n   K   | j                   j                  j                  |||        d{   S 7 w)a  Process pub/sub messages using registered callbacks.

        This is the equivalent of :py:meth:`redis.PubSub.run_in_thread` in
        redis-py, but it is a coroutine. To launch it as a separate task, use
        ``asyncio.create_task``:

            >>> task = asyncio.create_task(pubsub.run())

        To shut it down, use asyncio cancellation:

            >>> task.cancel()
            >>> await task
        )
sleep_timer  r   N)r   r?   execute_pubsub_run)rI   r  r  s      rJ   runz
PubSub.run  s>     & \\22EE#7HQU F 
 
 	
 
r  )rM   r   r   )Fg        )r   r   r   r   r   rK   rR   rY   r   propertyr   r   r   r   r   r
   r   r   r   r   r   r   r   r  r  r\   rL   rJ   r   r     s    	7} 	7S FD F F
: 



h 

- 


 


X 

 


 SV
)-
@H
& !	
 	

 

rL   r   )2rA   loggingtypingr   r   r   r   r   r   r	   redis.asyncio.clientr
   &redis.asyncio.multidb.command_executorr   redis.asyncio.multidb.configr   r   redis.asyncio.multidb.databaser   r   &redis.asyncio.multidb.failure_detectorr   !redis.asyncio.multidb.healthcheckr   r   redis.backgroundr   redis.commandsr   r   redis.multidb.circuitr   r   rh   redis.multidb.exceptionr   r   redis.typingr   r   r   redis.utilsr   	getLoggerr   r   r   r   r   r   r\   rL   rJ   <module>r     s      M M M . I L C G L 0 F 0 2 X 3 3 $			8	$ AH,.? AH AHH
& &A'): AHq
 q
rL   