
    ,iH                     j   d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZmZmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z(  e jR                  e*      Z+e( G d dee             Z,defdZ- G d dee      Z. G d d      Z/y)    N)as_completed)ThreadPoolExecutor)AnyCallableListOptional)BackgroundScheduler)PubSubWorkerThread)CoreCommandsRedisModuleCommands)CircuitBreaker)State)DefaultCommandExecutor)DEFAULT_GRACE_PERIODMultiDbConfig)Database	DatabasesSyncDatabase)NoValidDatabaseExceptionUnhealthyDatabaseException)FailureDetector)HealthCheckHealthCheckPolicy)experimentalc                       e Zd ZdZdefdZd ZdefdZde	ddfd	Z
de	fd
Zde	de	fdZdefdZde	defdZdefdZdefdZd Zd Zdedgdf   fdZd Zde	defdZd#deegdf   fdZdeded efd!Z d" Z!y)$MultiDBClientzx
    Client that operates on multiple logical Redis databases.
    Should be used in Active-Active database setups.
    configc           
         |j                         | _        |j                  s|j                         n|j                  | _        |j
                  | _        |j                  j                  |j                  |j                        | _        |j                  s|j                         n|j                  | _        |j                  |j!                         n|j                  | _        | j"                  j%                  | j                         |j&                  | _        |j*                  | _        |j.                  | _        | j0                  j3                  t4        f       t7        | j                  | j                  | j0                  | j"                  |j8                  |j:                  | j,                  | j(                        | _        d| _        tA        jB                         | _"        tG               | _$        || _%        y )N)failure_detectors	databasescommand_retryfailover_strategyfailover_attemptsfailover_delayevent_dispatcherauto_fallback_intervalF)&r    
_databaseshealth_checksdefault_health_checks_health_checkshealth_check_interval_health_check_intervalhealth_check_policyvaluehealth_check_probeshealth_check_probes_delay_health_check_policyr   default_failure_detectors_failure_detectorsr"   default_failover_strategy_failover_strategyset_databasesr&   _auto_fallback_intervalr%   _event_dispatcherr!   _command_retryupdate_supported_errorsConnectionRefusedErrorr   r#   r$   command_executorinitialized	threadingRLock_hc_lockr	   _bg_scheduler_config)selfr   s     Y/var/www/html/langgraph-service/venv/lib/python3.12/site-packages/redis/multidb/client.py__init__zMultiDBClient.__init__   s    **, '' ((*%% 	
 '-&B&B#7=7Q7Q7W7W&&(H(H8
!
 ++ ,,.)) 	 ''/ ,,.)) 	
 	--doo>'-'D'D$!'!8!8$22335K4MN 6"55oo--"55$66!00!33#'#?#?	!
 !!)02    c                    d }| j                  |       | j                  j                  | j                  | j                          d}| j                  D ]h  \  }}|j
                  j                  | j                         |j
                  j                  t        j                  k(  sS|rV|| j                  _        d}j |st        d      d| _        y)zT
        Perform initialization of databases to define their initial state.
        c                     | N )errors    rD   raise_exception_on_failed_hcz>MultiDBClient.initialize.<locals>.raise_exception_on_failed_hcM   s    KrF   )on_errorFTz4Initial connection failed - no active database foundN)_check_databases_healthrA   run_recurringr,   r'   circuiton_state_changed!_on_circuit_state_change_callbackstateCBStateCLOSEDr<   active_databaser   r=   )rC   rL   is_active_db_founddatabaseweights        rD   
initializezMultiDBClient.initializeH   s    
	 	$$.J$K 	((''((	

 # $ 	*Hf--d.T.TU %%7@R8@%%5%)"	* "*F   rF   returnc                     | j                   S )zE
        Returns a sorted (by weight) list of all databases.
        )r'   rC   s    rD   get_databaseszMultiDBClient.get_databasesk   s     rF   rX   Nc                 F   d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                  |       |j                  j                  t
        j                  k(  r3| j                   j                  d      d   \  }}|| j                  _	        yt        d      )zL
        Promote one of the existing databases to become an active.
        NT/Given database is not a member of database list   r   z1Cannot set active database, database is unhealthy)r'   
ValueError_check_db_healthrP   rS   rT   rU   	get_top_nr<   rV   r   )rC   rX   existsexisting_db_highest_weighted_dbs         rD   set_active_databasez!MultiDBClient.set_active_databaseq   s     "oo 	NKh&	
 NOOh'!!W^^3%)__%>%>q%A!%D"4<D!!1&?
 	
rF   c                     | j                   D ]  \  }}||k(  st        d       | j                  |       | j                   j                  d      d   \  }}| j                   j	                  ||j
                         | j                  ||       y)z;
        Adds a new database to the database list.
        zGiven database already existsra   r   N)r'   rb   rc   rd   addrY   _change_active_database)rC   rX   rf   rg   rh   highest_weights         rD   add_databasezMultiDBClient.add_database   s     #oo 	BNKh& !@AA	B 	h'.2oo.G.G.J1.M+^Hhoo6$$X/BCrF   new_databasehighest_weight_databasec                     |j                   |j                   kD  r:|j                  j                  t        j                  k(  r|| j
                  _        y y y rI   )rY   rP   rS   rT   rU   r<   rV   )rC   ro   rp   s      rD   rl   z%MultiDBClient._change_active_database   sJ     "9"@"@@$$**gnn<4@D!!1 = ArF   c                     | j                   j                  |      }| j                   j                  d      d   \  }}||k  r:|j                  j                  t
        j                  k(  r|| j                  _        yyy)z<
        Removes a database from the database list.
        ra   r   N)	r'   removerd   rP   rS   rT   rU   r<   rV   )rC   rX   rY   rh   rm   s        rD   remove_databasezMultiDBClient.remove_database   sr     ''1.2oo.G.G.J1.M+^ f$#++11W^^C4GD!!1 D %rF   rY   c                    d}| j                   D ]  \  }}||k(  sd} n |st        d      | j                   j                  d      d   \  }}| j                   j                  ||       ||_        | j                  ||       y)z<
        Updates a database from the database list.
        NTr`   ra   r   )r'   rb   rd   update_weightrY   rl   )rC   rX   rY   re   rf   rg   rh   rm   s           rD   update_database_weightz$MultiDBClient.update_database_weight   s     "oo 	NKh&	
 NOO.2oo.G.G.J1.M+^%%h7 $$X/BCrF   failure_detectorc                 :    | j                   j                  |       y)z>
        Adds a new failure detector to the database.
        N)r3   append)rC   rx   s     rD   add_failure_detectorz"MultiDBClient.add_failure_detector   s     	&&'78rF   healthcheckc                 |    | j                   5  | j                  j                  |       ddd       y# 1 sw Y   yxY w)z:
        Adds a new health check to the database.
        N)r@   r*   rz   )rC   r|   s     rD   add_health_checkzMultiDBClient.add_health_check   s4     ]] 	4&&{3	4 	4 	4s   2;c                 r    | j                   s| j                           | j                  j                  |i |S )zB
        Executes a single command and return its result.
        )r=   rZ   r<   execute_commandrC   argsoptionss      rD   r   zMultiDBClient.execute_command   s5     OO4t$$44dFgFFrF   c                     t        |       S )z:
        Enters into pipeline mode of the client.
        )Pipeliner]   s    rD   pipelinezMultiDBClient.pipeline   s     ~rF   funcr   c                 x    | j                   s| j                           | j                  j                  |g|| S )z3
        Executes callable as transaction.
        )r=   rZ   r<   execute_transaction)rC   r   watchesr   s       rD   transactionzMultiDBClient.transaction   s:     OO8t$$88RR'RRrF   c                 R    | j                   s| j                          t        | fi |S )z
        Return a Publish/Subscribe object. With this object, you can
        subscribe to channels and listen for messages that get published to
        them.
        )r=   rZ   PubSub)rC   kwargss     rD   pubsubzMultiDBClient.pubsub   s'     OOd%f%%rF   c                 v   | j                   j                  | j                  |      }|sH|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S |rF|j                  j                  t
        j                  k7  rt
        j                  |j                  _        |S )zO
        Runs health checks on the given database until first failure.
        )r1   executer*   rP   rS   rT   OPENrU   )rC   rX   
is_healthys      rD   rc   zMultiDBClient._check_db_health   s    
 ..66t7J7JHU
%%5)0  &H,,22gnnD%,^^H"rF   rM   c           	      b   t        t        | j                              5 }| j                  D ch c]!  \  }}|j                  | j                  |      # }}}	 t        || j                        D ]  }	 |j                           	 ddd       yc c}}w # t        $ rj}|j                  }t        j                  |j                  _        t        j                  d|j                          |r ||j                          Y d}~d}~ww xY w# t"        $ r t#        d      w xY w# 1 sw Y   yxY w)zk
        Runs health checks as a recurring task.
        Runs health checks against all databases.
        )max_workers)timeoutz%Health check failed, due to exception)exc_infoNz4Health check execution exceeds health_check_interval)r   lenr'   submitrc   r   r,   resultr   rX   rT   r   rP   rS   logger	exceptionoriginal_exceptionTimeoutError)	rC   rM   executorrX   rg   futuresfutureeunhealthy_dbs	            rD   rN   z%MultiDBClient._check_databases_health  s    
  C,@A 	X $(??Ha  5 5x@G 
*T%@%@ ;F;	;	 	 6 
;'(zz5<\\,,2((C%&%9%9 ) 
 $$Q%9%9:
;   "J 1	 	sY   D%&BD%D5BDD%	D
 A D DD

DD""D%%D.rP   	old_state	new_statec                     |t         j                  k(  r| j                  |j                         y |t         j                  k(  r:|t         j
                  k(  r&| j                  j                  t        t        |       y y y rI   )
rT   	HALF_OPENrc   rX   rU   r   rA   run_oncer   _half_open_circuit)rC   rP   r   r   s       rD   rR   z/MultiDBClient._on_circuit_state_change_callback#  se     )))!!'"2"23&9+D''$&8' ,E&rF   c                     | j                   r| j                   j                          | j                  j                  r/| j                  j                  j                  j                          yy)z:
        Closes the client and all its resources.
        N)rA   stopr<   rV   clientcloser]   s    rD   r   zMultiDBClient.close/  sQ     ##%  00!!1188>>@ 1rF   rI   )"__name__
__module____qualname____doc__r   rE   rZ   r   r^   r   ri   rn   rl   r   rt   floatrw   r   r{   r   r~   r   r   r   r   r   boolrc   	ExceptionrN   r   rT   rR   r   rJ   rF   rD   r   r      s   
(} (T! Fy 
L 
T 
2D\ DA(ACOAH HD| DU D&9_ 94K 4GS*t); < S	& $   )d9J0K  D
%
29
FM
ArF   r   rP   c                 .    t         j                  | _        y rI   )rT   r   rS   )rP   s    rD   r   r   9  s    %%GMrF   c                   x    e Zd ZdZdefdZddZd Zd Zde	fdZ
defd	ZddZddZddZd Zdee   fdZy
)r   zG
    Pipeline implementation for multiple logical Redis databases.
    r   c                      g | _         || _        y rI   )_command_stack_client)rC   r   s     rD   rE   zPipeline.__init__B  s     rF   r[   c                     | S rI   rJ   r]   s    rD   	__enter__zPipeline.__enter__F      rF   c                 $    | j                          y rI   reset)rC   exc_type	exc_value	tracebacks       rD   __exit__zPipeline.__exit__I      

rF   c                 D    	 | j                          y # t        $ r Y y w xY wrI   r   r   r]   s    rD   __del__zPipeline.__del__L  s"    	JJL 		    	c                 ,    t        | j                        S rI   )r   r   r]   s    rD   __len__zPipeline.__len__R  s    4&&''rF   c                      y)z1Pipeline instances should always evaluate to TrueTrJ   r]   s    rD   __bool__zPipeline.__bool__U  s    rF   Nc                     g | _         y rI   )r   r]   s    rD   r   zPipeline.resetY  s
     rF   c                 $    | j                          y)zClose the pipelineNr   r]   s    rD   r   zPipeline.close\  s    

rF   c                 @    | j                   j                  ||f       | S )ar  
        Stage a command to be executed when execute() is next called

        Returns the current Pipeline object back so commands can be
        chained together, such as:

        pipe = pipe.set('foo', 'bar').incr('baz').decr('bang')

        At some other point, you can then run: pipe.execute(),
        which will execute all commands queued in the pipe.
        )r   rz   r   s      rD   pipeline_execute_commandz!Pipeline.pipeline_execute_command`  s!     	""D'?3rF   c                 &     | j                   |i |S )zAdds a command to the stack)r   rC   r   r   s      rD   r   zPipeline.execute_commando  s    ,t,,d=f==rF   c                    | j                   j                  s| j                   j                          	 | j                   j                  j	                  t        | j                              | j                          S # | j                          w xY w)z0Execute all the commands in the current pipeline)r   r=   rZ   r<   execute_pipelinetupler   r   r]   s    rD   r   zPipeline.executes  s_    ||''LL##%	<<00AAd))* JJLDJJLs   7A: :B)r[   r   r[   N)r   r   r   r   r   rE   r   r   r   intr   r   r   r   r   r   r   r   r   r   rJ   rF   rD   r   r   =  s^    } ( ($ !>
c 
rF   r   c                       e Zd ZdZdefdZddZddZddZdd	Z	e
defd
       Zd Zd Zd Zd Zd Zd Zd Z	 ddedefdZ	 ddedefdZ	 	 	 	 ddededee   deddf
dZy) r   z2
    PubSub object for multi database client.
    r   c                 ^    || _          | j                   j                  j                  di | y)zInitialize the PubSub object for a multi-database client.

        Args:
            client: MultiDBClient instance to use for pub/sub operations
            **kwargs: Additional keyword arguments to pass to the underlying pubsub implementation
        NrJ   )r   r<   r   )rC   r   r   s      rD   rE   zPubSub.__init__  s(     ,%%,,6v6rF   r[   c                     | S rI   rJ   r]   s    rD   r   zPubSub.__enter__  r   rF   Nc                 D    	 | j                          y # t        $ r Y y w xY wrI   r   r]   s    rD   r   zPubSub.__del__  s$    	 JJL 		r   c                 L    | j                   j                  j                  d      S )Nr   r   r<   execute_pubsub_methodr]   s    rD   r   zPubSub.reset  s    ||,,BB7KKrF   c                 $    | j                          y rI   r   r]   s    rD   r   zPubSub.close  r   rF   c                 V    | j                   j                  j                  j                  S rI   )r   r<   active_pubsub
subscribedr]   s    rD   r   zPubSub.subscribed  s    ||,,::EEErF   c                 P     | j                   j                  j                  dg| S )Nr   r   rC   r   s     rD   r   zPubSub.execute_command  s,    Bt||,,BB
 $
 	
rF   c                 V     | j                   j                  j                  dg|i |S )aE  
        Subscribe to channel patterns. Patterns supplied as keyword arguments
        expect a pattern name as the key and a callable as the value. A
        pattern's callable will be invoked automatically when a message is
        received on that pattern rather than producing a message via
        ``listen()``.
        
psubscriber   r   s      rD   r   zPubSub.psubscribe  7     Ct||,,BB

#)
 	
rF   c                 P     | j                   j                  j                  dg| S )zj
        Unsubscribe from the supplied patterns. If empty, unsubscribe from
        all patterns.
        punsubscriber   r   s     rD   r   zPubSub.punsubscribe  /    
 Ct||,,BB
!
 	
rF   c                 V     | j                   j                  j                  dg|i |S )aR  
        Subscribe to channels. Channels supplied as keyword arguments expect
        a channel name as the key and a callable as the value. A channel's
        callable will be invoked automatically when a message is received on
        that channel rather than producing a message via ``listen()`` or
        ``get_message()``.
        	subscriber   r   s      rD   r   zPubSub.subscribe  s7     Ct||,,BB

"(
 	
rF   c                 P     | j                   j                  j                  dg| S )zi
        Unsubscribe from the supplied channels. If empty, unsubscribe from
        all channels
        unsubscriber   r   s     rD   r   zPubSub.unsubscribe  s(    
 Ct||,,BB=XSWXXrF   c                 V     | j                   j                  j                  dg|i |S )az  
        Subscribes the client to the specified shard channels.
        Channels supplied as keyword arguments expect a channel name as the key
        and a callable as the value. A channel's callable will be invoked automatically
        when a message is received on that channel rather than producing a message via
        ``listen()`` or ``get_sharded_message()``.
        
ssubscriber   r   s      rD   r   zPubSub.ssubscribe  r   rF   c                 P     | j                   j                  j                  dg| S )zu
        Unsubscribe from the supplied shard_channels. If empty, unsubscribe from
        all shard_channels
        sunsubscriber   r   s     rD   r   zPubSub.sunsubscribe  r   rF   ignore_subscribe_messagesr   c                 R    | j                   j                  j                  d||      S )a  
        Get the next message if one is available, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_messager   r   r   rC   r   r   s      rD   r   zPubSub.get_message  s0     ||,,BB&? C 
 	
rF   c                 R    | j                   j                  j                  d||      S )a&  
        Get the next message if one is available in a sharded channel, otherwise None.

        If timeout is specified, the system will wait for `timeout` seconds
        before returning. Timeout should be specified as a floating point
        number, or None, to wait indefinitely.
        get_sharded_messager   r   r   s      rD   r   zPubSub.get_sharded_message  s0     ||,,BB!&? C 
 	
rF   
sleep_timedaemonexception_handlersharded_pubsubr
   c                 V    | j                   j                  j                  |||| |      S )N)r   r   r   r   )r   r<   execute_pubsub_run)rC   r   r   r   r   s        rD   run_in_threadzPubSub.run_in_thread  s6     ||,,??/) @ 
 	
rF   )r[   r   r   )F        )r  FNF)r   r   r   r   r   rE   r   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rJ   rF   rD   r   r     s    	7} 	7L FD F F






Y


 IL
)-
@E
" IL
)-
@E
$  04$

 
 $H-	

 
 

rF   r   )0loggingr>   concurrent.futuresr   concurrent.futures.threadr   typingr   r   r   r   redis.backgroundr	   redis.clientr
   redis.commandsr   r   redis.multidb.circuitr   r   rT   redis.multidb.command_executorr   redis.multidb.configr   r   redis.multidb.databaser   r   r   redis.multidb.exceptionr   r   redis.multidb.failure_detectorr   redis.multidb.healthcheckr   r   redis.utilsr   	getLoggerr   r   r   r   r   r   rJ   rF   rD   <module>r     s      + 8 0 0 0 + < 0 2 A D D D X : D $			8	$ ^A' ^A ^AB	& &@"L @FU
 U
rF   