
    wgI                     Z    d dl Z d dlZddlmZ 	 d dlZ G d de      Zy# e$ r dZY w xY w)    N   )AsyncPubSubManagerc                   P     e Zd ZdZdZ	 	 d
 fd	Zd Zd Zd Zd Z	d Z
d	 Z xZS )AsyncAioPikaManagera6  Client manager that uses aio_pika for inter-process messaging under
    asyncio.

    This class implements a client manager backend for event sharing across
    multiple processes, using RabbitMQ

    To use a aio_pika backend, initialize the :class:`Server` instance as
    follows::

        url = 'amqp://user:password@hostname:port//'
        server = socketio.Server(client_manager=socketio.AsyncAioPikaManager(
            url))

    :param url: The connection URL for the backend messaging queue. Example
                connection URLs are ``'amqp://guest:guest@localhost:5672//'``
                for RabbitMQ.
    :param channel: The channel name on which the server sends and receives
                    notifications. Must be the same in all the servers.
                    With this manager, the channel name is the exchange name
                    in rabbitmq
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    asyncaiopikac                     t         t        d      || _        t        j                         | _        d | _        d | _        d | _        t        | )  |||       y )NzRaio_pika package is not installed (Run "pip install aio_pika" in your virtualenv).)channel
write_onlylogger)aio_pikaRuntimeErrorurlasyncioLock_lockpublisher_connectionpublisher_channelpublisher_exchangesuper__init__)selfr   r	   r
   r   	__class__s        c/home/mcse/projects/flask/flask-venv/lib/python3.12/site-packages/socketio/async_aiopika_manager.pyr   zAsyncAioPikaManager.__init__(   s]      . / / \\^
$(!!%"&ZO    c                 \   K   t        j                  | j                         d {   S 7 wN)r   connect_robustr   )r   s    r   _connectionzAsyncAioPikaManager._connection5   s!     ,,TXX6666s   #,*,c                 >   K   |j                          d {   S 7 wr   )r	   )r   
connections     r   _channelzAsyncAioPikaManager._channel8   s     ''))))s   c                    K   |j                  | j                  t        j                  j                         d {   S 7 wr   )declare_exchanger	   r   ExchangeTypeFANOUT)r   r	   s     r   	_exchangezAsyncAioPikaManager._exchange;   s<     --dll.6.C.C.J.JL L 	L Ls   8A?Ac                    K   |j                  dddi       d {   }|j                  |       d {    |S 7 7 w)NFz	x-expiresi )durable	arguments)declare_queuebind)r   r	   exchangequeues       r   _queuezAsyncAioPikaManager._queue?   sM     ++E7BF6K , M Mjj"""M"s   ?;?=??c                   K   | j                   | j                  4 d {    | j                   m| j                          d {   | _         | j                  | j                          d {   | _        | j                  | j                         d {   | _        d d d       d {    d}	 	 | j                  j                  t        j                  t        j                  |      t        j                  j                        d       d {    y 7 7 7 7 7 v# 1 d {  7  sw Y   xY w7 $# t        j                  $ rH |r"| j                         j!                  d       d}n!| j                         j!                  d       Y y Y n4t        j"                  j$                  $ r t'        j(                         w xY ww)NT)bodydelivery_mode*)routing_keyz&Cannot publish to rabbitmq... retryingFz'Cannot publish to rabbitmq... giving up)r   r   r   r!   r   r&   r   publishr   MessagepickledumpsDeliveryMode
PERSISTENTAMQPException_get_loggererror
exceptionsChannelInvalidStateErrorr   CancelledError)r   dataretrys      r   _publishzAsyncAioPikaManager._publishE   s    $$,zz  ,,46:6F6F6H0HD-37==114 .D* 59NN..5 /D+  /--55$$#\\$/&.&;&;&F&F $'	 6    %0H./    )) $$&,, .8 9!E$$&,,AC	 
 &&?? /,,../% s   GDG D!D'D!+D,'D!D	D!G(D)G1A D8 D6D8 GD!D!D!G!D3'D*(D3/G6D8 8AGG1GGc           
       K   | j                          d {   4 d {   }| j                  |       d {   }|j                  d       d {    | j                  |       d {   }| j	                  ||       d {   }d}	 	 |j                         4 d {   }|2 3 d {   }|j                         4 d {    t        j                  |j                         d}d d d       d {    W7 7 7 7 7 7 7 i7 `7 I7 # 1 d {  7  sw Y   }xY w6 d d d       d {  7   n# 1 d {  7  sw Y   nxY wn# t        j                  $ r^ | j                         j                  dj                  |             t        j                   |       d {  7   t#        |dz  d      }Y n4t        j$                  j&                  $ r t        j(                         w xY wk# 1 d {  7  sw Y   y xY ww)Nr   )prefetch_countz3Cannot receive from rabbitmq... retrying in {} secs   <   )r   r!   set_qosr&   r.   iteratorprocessr6   loadsr0   r   r:   r;   r<   formatr   sleepminr=   r>   r?   )r   r    r	   r,   r-   retry_sleep
queue_itermessages           r   _listenzAsyncAioPikaManager._listeng   s    **,, 	3 	3 MM*55G///333!^^G44H++gx88EK3$~~/ 0 0:-7 0 0''.'8 0 0&,ll7<<&@ @./0 0 0 - 	35348
000 0 0 0 0 .80 0 0 0 0
  -- ;$$&,,..4f[.AC "--444"%kAor":K**CC 3!00223 	3 	3 	3sa  HD HDHG8DG8DG8+D,G8D
G8E"D#E&D<)D)-D
.D)1D<DD<
%D	/D<:D
;D< HHG8G8G8
G8ED)D<D<D&DD&"D<*E5D86E<E	EE	
EG8AG3+F.,G3 G81G33G88H
>H?H
H)z#amqp://guest:guest@localhost:5672//socketioFN)__name__
__module____qualname____doc__namer   r   r!   r&   r.   rB   rQ   __classcell__)r   s   @r   r   r      s;    2 D@>BP7*L /D3r   r   )r   r6   async_pubsub_managerr   r   ImportErrorr    r   r   <module>r\      s;      4
r3, r3	  Hs     **