
    ,i.                     2   d dl mZmZ d dlmZmZ d dlmZmZmZm	Z	 d dl
mZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZmZmZmZ d d
lm Z m!Z!m"Z"m#Z#m$Z$ d dl%m&Z& d dl'm(Z(  G d de      Z) G d de)      Z* G d de)      Z+ G d de+e*      Z,y)    )ABCabstractmethod)datetime	timedelta)AnyCallableListOptional)PipelinePubSubPubSubWorkerThread)EventDispatcherInterfaceOnCommandsFailEvent)State)DEFAULT_AUTO_FALLBACK_INTERVAL)Database	DatabasesSyncDatabase)ActiveDatabaseChanged&CloseConnectionOnActiveDatabaseChangedRegisterCommandFailure"ResubscribeOnActiveDatabaseChanged)DEFAULT_FAILOVER_ATTEMPTSDEFAULT_FAILOVER_DELAYDefaultFailoverStrategyExecutorFailoverStrategyFailoverStrategyExecutor)FailureDetector)Retryc                   d    e Zd Zeedefd              Zej                  ededdfd              Zy)CommandExecutorreturnc                      y)zReturns auto-fallback interval.N selfs    c/var/www/html/langgraph-service/venv/lib/python3.12/site-packages/redis/multidb/command_executor.pyauto_fallback_intervalz&CommandExecutor.auto_fallback_interval        	    r(   Nc                      y)zSets auto-fallback interval.Nr$   r&   r(   s     r'   r(   z&CommandExecutor.auto_fallback_interval"   r)   r*   )__name__
__module____qualname__propertyr   floatr(   setterr$   r*   r'   r!   r!      sS        ""U t   #r*   r!   c                   h    e Zd ZefdefdZedefd       Zej                  de	ddfd       ZddZ
y)	BaseCommandExecutorr(   c                     || _         |  y N_auto_fallback_intervalr,   s     r'   __init__zBaseCommandExecutor.__init__*   s     (>$r*   r"   c                     | j                   S r6   r7   r%   s    r'   r(   z*BaseCommandExecutor.auto_fallback_interval1   s    +++r*   Nc                     || _         y r6   r7   r,   s     r'   r(   z*BaseCommandExecutor.auto_fallback_interval5   s
    '=$r*   c                     | j                   dk  ry t        j                         t        | j                         z   | _        y )Nr   )seconds)r8   r   nowr   _next_fallback_attemptr%   s    r'   _schedule_next_fallbackz+BaseCommandExecutor._schedule_next_fallback9   s6    ''!+&.llny008
 '
#r*   )r"   N)r-   r.   r/   r   r1   r9   r0   r(   r2   intr@   r$   r*   r'   r4   r4   )   s]     )G. %. , , , "">S >T > #>
r*   r4   c                      e Zd Zeedefd              Zeedee   fd              Z	ededdfd       Z
eedee   fd              Zej                  ededdfd	              Zeedee   fd
              Zej                  ededdfd              Zeedefd              Zeedefd              Zed        Zed        Zedefd       Zedeegdf   fd       Zedefd       Zedede fd       Z!y)SyncCommandExecutorr"   c                      y)zReturns a list of databases.Nr$   r%   s    r'   	databaseszSyncCommandExecutor.databasesC   r)   r*   c                      y)z$Returns a list of failure detectors.Nr$   r%   s    r'   failure_detectorsz%SyncCommandExecutor.failure_detectorsI   r)   r*   failure_detectorNc                      y)z=Adds a new failure detector to the list of failure detectors.Nr$   r&   rH   s     r'   add_failure_detectorz(SyncCommandExecutor.add_failure_detectorO        	r*   c                      y)z"Returns currently active database.Nr$   r%   s    r'   active_databasez#SyncCommandExecutor.active_databaseT   r)   r*   databasec                      y)z#Sets the currently active database.Nr$   )r&   rO   s     r'   rN   z#SyncCommandExecutor.active_databaseZ   r)   r*   c                      y)z Returns currently active pubsub.Nr$   r%   s    r'   active_pubsubz!SyncCommandExecutor.active_pubsub`   r)   r*   pubsubc                      y)zSets currently active pubsub.Nr$   r&   rS   s     r'   rR   z!SyncCommandExecutor.active_pubsubf   r)   r*   c                      y)z#Returns failover strategy executor.Nr$   r%   s    r'   failover_strategy_executorz.SyncCommandExecutor.failover_strategy_executorl   r)   r*   c                      y)zReturns command retry object.Nr$   r%   s    r'   command_retryz!SyncCommandExecutor.command_retryr   r)   r*   c                      y)z:Initializes a PubSub object on a currently active databaseNr$   )r&   kwargss     r'   rS   zSyncCommandExecutor.pubsubx   rL   r*   c                      y)z*Executes a command and returns the result.Nr$   )r&   argsoptionss      r'   execute_commandz#SyncCommandExecutor.execute_command}   rL   r*   command_stackc                      y)z)Executes a stack of commands in pipeline.Nr$   )r&   r`   s     r'   execute_pipelinez$SyncCommandExecutor.execute_pipeline   rL   r*   transactionc                      y)z1Executes a transaction block wrapped in callback.Nr$   )r&   rc   watchesr^   s       r'   execute_transactionz'SyncCommandExecutor.execute_transaction   s    
 	r*   method_namec                      y)z*Executes a given method on active pub/sub.Nr$   )r&   rg   r]   r[   s       r'   execute_pubsub_methodz)SyncCommandExecutor.execute_pubsub_method   rL   r*   
sleep_timec                      y)z!Executes pub/sub run in a thread.Nr$   )r&   rj   r[   s      r'   execute_pubsub_runz&SyncCommandExecutor.execute_pubsub_run   rL   r*   )"r-   r.   r/   r0   r   r   rE   r	   r   rG   rK   r
   r   rN   r2   r   r   rR   r   rW   r   rY   rS   r_   tuplerb   r   r   rf   strri   r1   r   rl   r$   r*   r'   rC   rC   B   s   9    4#8    _    (!3         x/    F t    ,D    u        e   #XJ$45     U   r*   rC   c                       e Zd Zeeefdee   dede	de
dedededef fd	Zed
efd       Zed
ee   fd       Zded
dfdZed
e	fd       Zed
ee   fd       Zej.                  ded
dfd       Zed
ee   fd       Zej.                  ded
dfd       Zed
efd       Zd ZdefdZdee gdf   fdZ!d Z"de#fdZ$d)d Z%d*d!ed"efd#Z&d$ Z'd% Z(d&efd'Z)d( Z* xZ+S )+DefaultCommandExecutorrG   rE   rY   failover_strategyevent_dispatcherfailover_attemptsfailover_delayr(   c	                    t         
|   |       |D ]  }	|	j                  |         || _        || _        || _        t        |||      | _        || _        d| _	        d| _
        i | _        | j                          | j                          y)a  
        Initialize the DefaultCommandExecutor instance.

        Args:
            failure_detectors: List of failure detector instances to monitor database health
            databases: Collection of available databases to execute commands on
            command_retry: Retry policy for failed command execution
            failover_strategy: Strategy for handling database failover
            event_dispatcher: Interface for dispatching events
            failover_attempts: Number of failover attempts
            failover_delay: Delay between failover attempts
            auto_fallback_interval: Time interval in seconds between attempts to fall back to a primary database
        )command_executorN)superr9   set_command_executor
_databases_failure_detectors_command_retryr   _failover_strategy_executor_event_dispatcher_active_database_active_pubsub_active_pubsub_kwargs_setup_event_dispatcherr@   )r&   rG   rE   rY   rq   rr   rs   rt   r(   fd	__class__s             r'   r9   zDefaultCommandExecutor.__init__   s    0 	/0# 	;B##T#:	; $"3++J0.,
( "24804%'"$$&$$&r*   r"   c                     | j                   S r6   )ry   r%   s    r'   rE   z DefaultCommandExecutor.databases   s    r*   c                     | j                   S r6   )rz   r%   s    r'   rG   z(DefaultCommandExecutor.failure_detectors   s    &&&r*   rH   Nc                 :    | j                   j                  |       y r6   )rz   appendrJ   s     r'   rK   z+DefaultCommandExecutor.add_failure_detector   s    &&'78r*   c                     | j                   S r6   )r{   r%   s    r'   rY   z$DefaultCommandExecutor.command_retry       """r*   c                     | j                   S r6   )r~   r%   s    r'   rN   z&DefaultCommandExecutor.active_database   s    $$$r*   rO   c                     | j                   }|| _         |A||ur<| j                  j                  t        || j                   | fi | j                         y y y r6   )r~   r}   dispatchr   r   )r&   rO   
old_actives      r'   rN   z&DefaultCommandExecutor.active_database   sf    **
 (!j&@""++%)) 00	 'A!r*   c                     | j                   S r6   r   r%   s    r'   rR   z$DefaultCommandExecutor.active_pubsub   r   r*   rS   c                     || _         y r6   r   rU   s     r'   rR   z$DefaultCommandExecutor.active_pubsub   s
    $r*   c                     | j                   S r6   )r|   r%   s    r'   rW   z1DefaultCommandExecutor.failover_strategy_executor   s    ///r*   c                 :      fd} j                  |      S )Nc                  v     j                   j                  j                  i } j                         | S r6   )r~   clientr_   _register_command_execution)responser]   r^   r&   s    r'   callbackz8DefaultCommandExecutor.execute_command.<locals>.callback   s9    Ct,,33CCTUWUH,,T2Or*   _execute_with_failure_detection)r&   r]   r^   r   s   ``` r'   r_   z&DefaultCommandExecutor.execute_command   s    	
 33HdCCr*   r`   c                 6      fd} j                  |      S )Nc                      j                   j                  j                         5 } D ]  \  }} | j                  |i |  | j	                         }j                         |cd d d        S # 1 sw Y   y xY wr6   )r~   r   pipeliner_   executer   )pipecommandr^   r   r`   r&   s       r'   r   z9DefaultCommandExecutor.execute_pipeline.<locals>.callback   sy    &&--668  D(5 >$GW(D(('=W=>  <<>00?     s   ?A//A8r   )r&   r`   r   s   `` r'   rb   z'DefaultCommandExecutor.execute_pipeline   s    	  33HmLLr*   rc   c                 <      fd} j                  |      S )Nc                  ~     j                   j                  j                  gi } j                  d       | S Nr$   )r~   r   rc   r   )r   r^   r&   rc   re   s    r'   r   z<DefaultCommandExecutor.execute_transaction.<locals>.callback	  sI    ?t,,33??%)0H ,,R0Or*   r   )r&   rc   re   r^   r   s   ```` r'   rf   z*DefaultCommandExecutor.execute_transaction  s    	 33H==r*   c                 4      fd} j                  |      S )Nc                      j                   2 j                  j                  j                  di  _          _        y r   )r   r~   r   rS   r   )r[   r&   s   r'   r   z/DefaultCommandExecutor.pubsub.<locals>.callback  s>    ""*&Id&;&;&B&B&I&I&SF&S#-3*r*   r   )r&   r[   r   s   `` r'   rS   zDefaultCommandExecutor.pubsub  s    	 33H==r*   rg   c                 @      fd}  j                   |g S )Nc                  f    t        j                        }  | i }j                         |S r6   )getattrrR   r   )methodr   r]   r[   rg   r&   s     r'   r   z>DefaultCommandExecutor.execute_pubsub_method.<locals>.callback  s7    T//=Ft.v.H,,T2Or*   r   )r&   rg   r]   r[   r   s   ```` r'   ri   z,DefaultCommandExecutor.execute_pubsub_method  s"    	 4t33HDtDDr*   c                 8      fd} j                  |      S )Nc                  >     j                   j                  fi  S r6   )r   run_in_thread)r[   r&   rj   s   r'   r   z;DefaultCommandExecutor.execute_pubsub_run.<locals>.callback%  s!    44&&44ZJ6JJr*   r   )r&   rj   r[   r   s   ``` r'   rl   z)DefaultCommandExecutor.execute_pubsub_run$  s    	K 33H==r*   r   cmdsc                 \      fd j                   j                  fd fd      S )zO
        Execute a commands execution callback with failure detection.
        c                  2    j                                   S r6   )_check_active_database)r   r&   s   r'   wrapperzGDefaultCommandExecutor._execute_with_failure_detection.<locals>.wrapper/  s    ''):r*   c                               S r6   r$   )r   s   r'   <lambda>zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>5  s	    GI r*   c                 *     j                   | g S r6   )_on_command_fail)errorr   r&   s    r'   r   zHDefaultCommandExecutor._execute_with_failure_detection.<locals>.<lambda>6  s    /$//== r*   )r{   call_with_retry)r&   r   r   r   s   ```@r'   r   z6DefaultCommandExecutor._execute_with_failure_detection*  s)    
	
 ""22=
 	
r*   c                 N    | j                   j                  t        ||             y r6   )r}   r   r   )r&   r   r]   s      r'   r   z'DefaultCommandExecutor._on_command_fail9  s    ''(;D%(HIr*   c                 @   | j                   a| j                   j                  j                  t        j                  k7  s0| j
                  dkD  rR| j                  t        j                         k  r0| j                  j                         | _        | j                          yyy)zB
        Checks if active a database needs to be updated.
        Nr   )r~   circuitstateCBStateCLOSEDr8   r?   r   r>   r|   r   rN   r@   r%   s    r'   r   z-DefaultCommandExecutor._check_active_database<  s    
 !!)$$,,22gnnD,,q0//8<<>A $(#C#C#K#K#MD ((*	 B 1r*   cmdc                 H    | j                   D ]  }|j                  |        y r6   )rz   register_command_execution)r&   r   detectors      r'   r   z2DefaultCommandExecutor._register_command_executionK  s%    // 	5H//4	5r*   c                     t        | j                        }t               }t               }| j                  j                  t        |gt        ||gi       y)z0
        Registers necessary listeners.
        N)r   rz   r   r   r}   register_listenersr   r   )r&   failure_listenerresubscribe_listenerclose_connection_listeners       r'   r   z.DefaultCommandExecutor._setup_event_dispatcherO  sW     2$2I2IJAC$J$L!11#&6%7%-((	
r*   )r"   r   )r$   ),r-   r.   r/   r   r   r   r	   r   r   r   r   r   rA   r1   r9   r0   rE   rG   rK   rY   r
   r   rN   r2   r   rR   r   rW   r_   rm   rb   r   r   rf   rS   rn   ri   rl   r   r   r   r   r   __classcell__)r   s   @r'   rp   rp      s    "; 6(F('0(' (' 	('
 ,(' 3(' (' (' !&('T 9   '4#8 ' '9_ 9 9 #u # # %,!7 % %     #x/ # # %F %t % % 0,D 0 0D
Me 
M
>#XJ$45
>>E E>
 
 
J+5u 5
r*   rp   N)-abcr   r   r   r   typingr   r   r	   r
   redis.clientr   r   r   redis.eventr   r   redis.multidb.circuitr   r   redis.multidb.configr   redis.multidb.databaser   r   r   redis.multidb.eventr   r   r   r   redis.multidb.failoverr   r   r   r   r   redis.multidb.failure_detectorr   redis.retryr   r!   r4   rC   rp   r$   r*   r'   <module>r      s{    # ( 0 0 = = E 2 ? D D   ; c 
/ 
2T/ TnE
02E E
r*   