
    wgT	                     ~    d dl Z d dlZ	 d dlZddlmZ  e j                  d      Z G d de      Zy# e$ r dZY -w xY w)    N   )PubSubManagersocketioc                   >     e Zd ZdZdZ	 	 d fd	Zd Zd Zd Z xZ	S )KafkaManagera  Kafka based client manager.

    This class implements a Kafka backend for event sharing across multiple
    processes.

    To use a Kafka backend, initialize the :class:`Server` instance as
    follows::

        url = 'kafka://hostname:port'
        server = socketio.Server(client_manager=socketio.KafkaManager(url))

    :param url: The connection URL for the Kafka server. For a default Kafka
                store running on the same host, use ``kafka://``. For a highly
                available deployment of Kafka, pass a list with all the
                connection URLs available in your cluster.
    :param channel: The channel name (topic) on which the server sends and
                    receives notifications. Must be the same in all the
                    servers.
    :param write_only: If set to ``True``, only initialize to emit events. The
                       default of ``False`` initializes the class for emitting
                       and receiving.
    kafkac                 f   t         t        d      t        |   ||       t	        |t
              r|gn|}|D cg c]  }|dk7  r|dd  nd c}| _        t        j                  | j                        | _        t        j                  | j                  | j                        | _        y c c}w )NzZkafka-python package is not installed (Run "pip install kafka-python" in your virtualenv).)channel
write_onlyzkafka://   zlocalhost:9092)bootstrap_servers)r   RuntimeErrorsuper__init__
isinstancestr
kafka_urlsKafkaProducerproducerKafkaConsumerr
   consumer)selfurlr
   r   urls	__class__s        [/home/mcse/projects/flask/flask-venv/lib/python3.12/site-packages/socketio/kafka_manager.pyr   zKafkaManager.__init__'   s    =  . / / 	Z@"3,u#&*," '*Z&73qr7=MM ,++dooN++DLL>BooO,s   B.c                     | j                   j                  | j                  t        j                  |             | j                   j                          y )N)value)r   sendr
   pickledumpsflush)r   datas     r   _publishzKafkaManager._publish7   s6    4<<v||D/AB    c              #   8   K   | j                   E d {    y 7 wN)r   )r   s    r   _kafka_listenzKafkaManager._kafka_listen;   s     ==  s   c              #      K   | j                         D ]=  }|j                  | j                  k(  st        j                  |j
                         ? y wr'   )r(   topicr
   r    loadsr   )r   messages     r   _listenzKafkaManager._listen>   sA     ))+ 	2G}},ll7==11	2s
   -A$A)zkafka://localhost:9092r   F)
__name__
__module____qualname____doc__namer   r$   r(   r-   __classcell__)r   s   @r   r   r      s*    , D=G!O !2r%   r   )	loggingr    r   ImportErrorpubsub_managerr   	getLoggerloggerr    r%   r   <module>r:      sL      *			:	&32= 32  Es   2 <<