U
    Ã@ÛfDf  ã                   @   sX   d dl mZ d dlmZ d dlZeƒ ZG dd„ deƒZG dd„ deƒZ	G dd	„ d	ƒZ
dS )
é    )ÚEvent)ÚgreenthreadNc                   @   s   e Zd ZdZdS )Ú	Collisiona  
    DAGPool raises Collision when you try to launch two greenthreads with the
    same key, or post() a result for a key corresponding to a greenthread, or
    post() twice for the same key. As with KeyError, str(collision) names the
    key in question.
    N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__© r	   r	   úJ/var/www/html/chatgpt/venv/lib/python3.8/site-packages/eventlet/dagpool.pyr      s   r   c                       s(   e Zd ZdZ‡ fdd„Zdd„ Z‡  ZS )ÚPropagateErrora8  
    When a DAGPool greenthread terminates with an exception instead of
    returning a result, attempting to retrieve its value raises
    PropagateError.

    Attributes:

    key
        the key of the greenthread which raised the exception

    exc
        the exception object raised by the greenthread
    c                    s>   d  ||jj|¡}tƒ  |¡ || _||f| _|| _|| _d S )NzPropagateError({}): {}: {})	ÚformatÚ	__class__r   ÚsuperÚ__init__ÚmsgÚargsÚkeyÚexc)Úselfr   r   r   ©r   r	   r
   r   '   s      ÿ
zPropagateError.__init__c                 C   s   | j S ©N)r   ©r   r	   r	   r
   Ú__str__3   s    zPropagateError.__str__)r   r   r   r   r   r   Ú__classcell__r	   r	   r   r
   r      s   r   c                   @   sô   e Zd ZdZe dd¡Zi fdd„Zdd„ Ze	fdd	„Z
e	fd
d„Ze	fdd„Ze	fdd„Zdd„ Zdd„ Zedd„ ƒZdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zd4d!d"„Zd#d$„ Zd5d&d'„Zd(d)„ Zd*d+„ Zd,d-„ Zd.d/„ Zd0d1„ Ze	fd2d3„Zd%S )6ÚDAGPoola¬
  
    A DAGPool is a pool that constrains greenthreads, not by max concurrency,
    but by data dependencies.

    This is a way to implement general DAG dependencies. A simple dependency
    tree (flowing in either direction) can straightforwardly be implemented
    using recursion and (e.g.)
    :meth:`GreenThread.imap() <eventlet.greenthread.GreenThread.imap>`.
    What gets complicated is when a given node depends on several other nodes
    as well as contributing to several other nodes.

    With DAGPool, you concurrently launch all applicable greenthreads; each
    will proceed as soon as it has all required inputs. The DAG is implicit in
    which items are required by each greenthread.

    Each greenthread is launched in a DAGPool with a key: any value that can
    serve as a Python dict key. The caller also specifies an iterable of other
    keys on which this greenthread depends. This iterable may be empty.

    The greenthread callable must accept (key, results), where:

    key
        is its own key

    results
        is an iterable of (key, value) pairs.

    A newly-launched DAGPool greenthread is entered immediately, and can
    perform any necessary setup work. At some point it will iterate over the
    (key, value) pairs from the passed 'results' iterable. Doing so blocks the
    greenthread until a value is available for each of the keys specified in
    its initial dependencies iterable. These (key, value) pairs are delivered
    in chronological order, *not* the order in which they are initially
    specified: each value will be delivered as soon as it becomes available.

    The value returned by a DAGPool greenthread becomes the value for its
    key, which unblocks any other greenthreads waiting on that key.

    If a DAGPool greenthread terminates with an exception instead of returning
    a value, attempting to retrieve the value raises :class:`PropagateError`,
    which binds the key of the original greenthread and the original
    exception. Unless the greenthread attempting to retrieve the value handles
    PropagateError, that exception will in turn be wrapped in a PropagateError
    of its own, and so forth. The code that ultimately handles PropagateError
    can follow the chain of PropagateError.exc attributes to discover the flow
    of that exception through the DAG of greenthreads.

    External greenthreads may also interact with a DAGPool. See :meth:`wait_each`,
    :meth:`waitall`, :meth:`post`.

    It is not recommended to constrain external DAGPool producer greenthreads
    in a :class:`GreenPool <eventlet.greenpool.GreenPool>`: it may be hard to
    provably avoid deadlock.

    .. automethod:: __init__
    .. automethod:: __getitem__
    Ú_Coro)r   Úpendingc                 C   sB   z|  ¡ }W n tk
r$   |}Y nX t|ƒ| _i | _tƒ | _dS )zé
        DAGPool can be prepopulated with an initial dict or iterable of (key,
        value) pairs. These (key, value) pairs are of course immediately
        available for any greenthread that depends on any of those keys.
        N)ÚitemsÚAttributeErrorÚdictÚvaluesÚcorosr   Úevent)r   ZpreloadÚ	iteritemsr	   r	   r
   r   t   s    

zDAGPool.__init__c                 C   s   |   ¡ S )aP  
        waitall() blocks the calling greenthread until there is a value for
        every DAGPool greenthread launched by :meth:`spawn`. It returns a dict
        containing all :class:`preload data <DAGPool>`, all data from
        :meth:`post` and all values returned by spawned greenthreads.

        See also :meth:`wait`.
        )Úwaitr   r	   r	   r
   Úwaitall‹   s    
zDAGPool.waitallc                 C   s   t |  |¡ƒS )a{  
        *keys* is an optional iterable of keys. If you omit the argument, it
        waits for all the keys from :class:`preload data <DAGPool>`, from
        :meth:`post` calls and from :meth:`spawn` calls: in other words, all
        the keys of which this DAGPool is aware.

        wait() blocks the calling greenthread until all of the relevant keys
        have values. wait() returns a dict whose keys are the relevant keys,
        and whose values come from the *preload* data, from values returned by
        DAGPool greenthreads or from :meth:`post` calls.

        If a DAGPool greenthread terminates with an exception, wait() will
        raise :class:`PropagateError` wrapping that exception. If more than
        one greenthread terminates with an exception, it is indeterminate
        which one wait() will raise.

        If an external greenthread posts a :class:`PropagateError` instance,
        wait() will raise that PropagateError. If more than one greenthread
        posts PropagateError, it is indeterminate which one wait() will raise.

        See also :meth:`wait_each_success`, :meth:`wait_each_exception`.
        )r   Ú	wait_each©r   Úkeysr	   r	   r
   r$   —   s    zDAGPool.waitc                 C   s   |   |  |¡¡S )a+  
        *keys* is an optional iterable of keys. If you omit the argument, it
        waits for all the keys from :class:`preload data <DAGPool>`, from
        :meth:`post` calls and from :meth:`spawn` calls: in other words, all
        the keys of which this DAGPool is aware.

        wait_each() is a generator producing (key, value) pairs as a value
        becomes available for each requested key. wait_each() blocks the
        calling greenthread until the next value becomes available. If the
        DAGPool was prepopulated with values for any of the relevant keys, of
        course those can be delivered immediately without waiting.

        Delivery order is intentionally decoupled from the initial sequence of
        keys: each value is delivered as soon as it becomes available. If
        multiple keys are available at the same time, wait_each() delivers
        each of the ready ones in arbitrary order before blocking again.

        The DAGPool does not distinguish between a value returned by one of
        its own greenthreads and one provided by a :meth:`post` call or *preload* data.

        The wait_each() generator terminates (raises StopIteration) when all
        specified keys have been delivered. Thus, typical usage might be:

        ::

            for key, value in dagpool.wait_each(keys):
                # process this ready key and value
            # continue processing now that we've gotten values for all keys

        By implication, if you pass wait_each() an empty iterable of keys, it
        returns immediately without yielding anything.

        If the value to be delivered is a :class:`PropagateError` exception object, the
        generator raises that PropagateError instead of yielding it.

        See also :meth:`wait_each_success`, :meth:`wait_each_exception`.
        )Ú
_wait_eachÚ_get_keyset_for_wait_eachr'   r	   r	   r
   r&   ±   s    'zDAGPool.wait_eachc                 c   s2   |   |  |¡¡D ]\}}t|tƒs||fV  qdS )aÈ  
        wait_each_success() filters results so that only success values are
        yielded. In other words, unlike :meth:`wait_each`, wait_each_success()
        will not raise :class:`PropagateError`. Not every provided (or
        defaulted) key will necessarily be represented, though naturally the
        generator will not finish until all have completed.

        In all other respects, wait_each_success() behaves like :meth:`wait_each`.
        N©Ú_wait_each_rawr*   Ú
isinstancer   ©r   r(   r   Úvaluer	   r	   r
   Úwait_each_successÚ   s    

zDAGPool.wait_each_successc                 c   s2   |   |  |¡¡D ]\}}t|tƒr||fV  qdS )aó  
        wait_each_exception() filters results so that only exceptions are
        yielded. Not every provided (or defaulted) key will necessarily be
        represented, though naturally the generator will not finish until
        all have completed.

        Unlike other DAGPool methods, wait_each_exception() simply yields
        :class:`PropagateError` instances as values rather than raising them.

        In all other respects, wait_each_exception() behaves like :meth:`wait_each`.
        Nr+   r.   r	   r	   r
   Úwait_each_exceptionè   s    
zDAGPool.wait_each_exceptionc                 C   s0   |t k	rt|ƒS t| j ¡ ƒt| j ¡ ƒB S dS )ax  
        wait_each(), wait_each_success() and wait_each_exception() promise
        that if you pass an iterable of keys, the method will wait for results
        from those keys -- but if you omit the keys argument, the method will
        wait for results from all known keys. This helper implements that
        distinction, returning a set() of the relevant keys.
        N)Ú_MISSINGÚsetr!   r(   r    r'   r	   r	   r
   r*   ø   s    z!DAGPool._get_keyset_for_wait_eachc                 c   s(   |   |¡D ]\}}||  |¡fV  q
dS )z£
        When _wait_each() encounters a value of PropagateError, it raises it.

        In all other respects, _wait_each() behaves like _wait_each_raw().
        N)r,   Ú_value_or_raise©r   r   r   r/   r	   r	   r
   r)     s    zDAGPool._wait_eachc                 C   s   t | tƒr| ‚| S r   )r-   r   )r/   r	   r	   r
   r4     s    
zDAGPool._value_or_raisec                 c   sN   |  ¡ D ].}| j |t¡}|tk	r| |¡ ||fV  q|s>qJ| j ¡  q dS )a°  
        pending is a set() of keys for which we intend to wait. THIS SET WILL
        BE DESTRUCTIVELY MODIFIED: as each key acquires a value, that key will
        be removed from the passed 'pending' set.

        _wait_each_raw() does not treat a PropagateError instance specially:
        it will be yielded to the caller like any other value.

        In all other respects, _wait_each_raw() behaves like wait_each().
        N)Úcopyr    Úgetr2   Úremover"   r$   r5   r	   r	   r
   r,     s    
zDAGPool._wait_each_rawc                 O   sZ   || j ks|| jkrt|ƒ‚t|ƒ}tj| j|||  |¡f|ž|Ž}|  ||¡| j |< dS )aÉ  
        Launch the passed *function(key, results, ...)* as a greenthread,
        passing it:

        - the specified *key*
        - an iterable of (key, value) pairs
        - whatever other positional args or keywords you specify.

        Iterating over the *results* iterable behaves like calling
        :meth:`wait_each(depends) <DAGPool.wait_each>`.

        Returning from *function()* behaves like
        :meth:`post(key, return_value) <DAGPool.post>`.

        If *function()* terminates with an exception, that exception is wrapped
        in :class:`PropagateError` with the greenthread's *key* and (effectively) posted
        as the value for that key. Attempting to retrieve that value will
        raise that PropagateError.

        Thus, if the greenthread with key 'a' terminates with an exception,
        and greenthread 'b' depends on 'a', when greenthread 'b' attempts to
        iterate through its *results* argument, it will encounter
        PropagateError. So by default, an uncaught exception will propagate
        through all the downstream dependencies.

        If you pass :meth:`spawn` a key already passed to spawn() or :meth:`post`, spawn()
        raises :class:`Collision`.
        N)	r!   r    r   r3   r   ÚspawnÚ_wrapperr)   r   )r   r   ÚdependsÚfunctionr   Úkwdsr   Znewcoror	   r	   r
   r9   4  s    ÿþþzDAGPool.spawnc              
   O   s~   zHz|||f|ž|Ž}W n, tk
rD } zt||ƒ}W 5 d}~X Y nX W 5 | j |= X z|  ||¡ W n tk
rx   Y nX |S )z™
        This wrapper runs the top-level function in a DAGPool greenthread,
        posting its return value (or PropagateError) to the DAGPool.
        N)r!   Ú	Exceptionr   Úpostr   )r   r<   r   Úresultsr   r=   ÚresultÚerrr	   r	   r
   r:   `  s     
zDAGPool._wrapperc                 O   s,   |  ¡ D ]\}}| j|||f|ž|Ž qdS )a~  
        spawn_many() accepts a single *function* whose parameters are the same
        as for :meth:`spawn`.

        The difference is that spawn_many() accepts a dependency dict
        *depends*. A new greenthread is spawned for each key in the dict. That
        dict key's value should be an iterable of other keys on which this
        greenthread depends.

        If the *depends* dict contains any key already passed to :meth:`spawn`
        or :meth:`post`, spawn_many() raises :class:`Collision`. It is
        indeterminate how many of the other keys in *depends* will have
        successfully spawned greenthreads.
        N)r   r9   )r   r;   r<   r   r=   r   Údepsr	   r	   r
   Ú
spawn_many{  s    zDAGPool.spawn_manyc                 C   s   | j | j ¡  | j |= dS )zŽ
        Kill the greenthread that was spawned with the specified *key*.

        If no such greenthread was spawned, raise KeyError.
        N)r!   r   Úkill)r   r   r	   r	   r
   rE     s    zDAGPool.killFc                 C   sb   | j  |t¡}|tk	r,|jt ¡ k	r,t|ƒ‚|| jkrB|sBt|ƒ‚|| j|< | j ¡  t	ƒ | _dS )aV  
        post(key, value) stores the passed *value* for the passed *key*. It
        then causes each greenthread blocked on its results iterable, or on
        :meth:`wait_each(keys) <DAGPool.wait_each>`, to check for new values.
        A waiting greenthread might not literally resume on every single
        post() of a relevant key, but the first post() of a relevant key
        ensures that it will resume eventually, and when it does it will catch
        up with all relevant post() calls.

        Calling post(key, value) when there is a running greenthread with that
        same *key* raises :class:`Collision`. If you must post(key, value) instead of
        letting the greenthread run to completion, you must first call
        :meth:`kill(key) <DAGPool.kill>`.

        The DAGPool implicitly post()s the return value from each of its
        greenthreads. But a greenthread may explicitly post() a value for its
        own key, which will cause its return value to be discarded.

        Calling post(key, value, replace=False) (the default *replace*) when a
        value for that key has already been posted, by any means, raises
        :class:`Collision`.

        Calling post(key, value, replace=True) when a value for that key has
        already been posted, by any means, replaces the previously-stored
        value. However, that may make it complicated to reason about the
        behavior of greenthreads waiting on that key.

        After a post(key, value1) followed by post(key, value2, replace=True),
        it is unspecified which pending :meth:`wait_each([key...]) <DAGPool.wait_each>`
        calls (or greenthreads iterating over *results* involving that key)
        will observe *value1* versus *value2*. It is guaranteed that
        subsequent wait_each([key...]) calls (or greenthreads spawned after
        that point) will observe *value2*.

        A successful call to
        post(key, :class:`PropagateError(key, ExceptionSubclass) <PropagateError>`)
        ensures that any subsequent attempt to retrieve that key's value will
        raise that PropagateError instance.
        N)
r!   r7   r2   r   Z
getcurrentr   r    r"   Úsendr   )r   r   r/   ÚreplaceÚcoror	   r	   r
   r?   š  s    ,

zDAGPool.postc                 C   s    |   |f¡D ]\}}|  S dS )zw
        __getitem__(key) (aka dagpool[key]) blocks until *key* has a value,
        then delivers that value.
        N)r&   )r   r   Ú_r/   r	   r	   r
   Ú__getitem__à  s    zDAGPool.__getitem__Nc                 C   s   |   | j ||¡¡S )zy
        get() returns the value for *key*. If *key* does not yet have a value,
        get() returns *default*.
        )r4   r    r7   )r   r   Údefaultr	   r	   r
   r7   ë  s    zDAGPool.getc                 C   s   t | j ¡ ƒS )zU
        Return a snapshot tuple of keys for which we currently have values.
        )Útupler    r(   r   r	   r	   r
   r(   ò  s    zDAGPool.keysc                    s   t ‡ fdd„ˆ j ¡ D ƒƒS )zT
        Return a snapshot tuple of currently-available (key, value) pairs.
        c                 3   s    | ]\}}|ˆ   |¡fV  qd S r   )r4   )Ú.0r   r/   r   r	   r
   Ú	<genexpr>   s   ÿz DAGPool.items.<locals>.<genexpr>)rL   r    r   r   r	   r   r
   r   ú  s    ÿzDAGPool.itemsc                 C   s
   t | jƒS )zÞ
        Return number of running DAGPool greenthreads. This includes
        greenthreads blocked while iterating through their *results* iterable,
        that is, greenthreads waiting on values from other keys.
        )Úlenr!   r   r	   r	   r
   Úrunning  s    zDAGPool.runningc                 C   s   t | j ¡ ƒS )zÝ
        Return keys for running DAGPool greenthreads. This includes
        greenthreads blocked while iterating through their *results* iterable,
        that is, greenthreads waiting on values from other keys.
        )rL   r!   r(   r   r	   r	   r
   Úrunning_keys  s    zDAGPool.running_keysc                 C   s   t |  ¡ ƒS )a  
        Return number of waiting DAGPool greenthreads, that is, greenthreads
        still waiting on values from other keys. This explicitly does *not*
        include external greenthreads waiting on :meth:`wait`,
        :meth:`waitall`, :meth:`wait_each`.
        )rO   Úwaiting_forr   r	   r	   r
   Úwaiting  s    zDAGPool.waitingc                    sh   t | j ¡ ƒ‰ |tk	rF| j |t¡}|tkr<| j|  t ƒ S |jˆ  S dd„ ‡ fdd„| j ¡ D ƒD ƒS )aö  
        waiting_for(key) returns a set() of the keys for which the DAGPool
        greenthread spawned with that *key* is still waiting. If you pass a
        *key* for which no greenthread was spawned, waiting_for() raises
        KeyError.

        waiting_for() without argument returns a dict. Its keys are the keys
        of DAGPool greenthreads still waiting on one or more values. In the
        returned dict, the value of each such key is the set of other keys for
        which that greenthread is still waiting.

        This method allows diagnosing a "hung" DAGPool. If certain
        greenthreads are making no progress, it's possible that they are
        waiting on keys for which there is no greenthread and no :meth:`post` data.
        c                 S   s   i | ]\}}|r||“qS r	   r	   )rM   r   r   r	   r	   r
   Ú
<dictcomp>V  s   ý z'DAGPool.waiting_for.<locals>.<dictcomp>c                 3   s    | ]\}}||j ˆ  fV  qd S r   )r   )rM   r   rH   ©Ú	availabler	   r
   rN   W  s   ÿz&DAGPool.waiting_for.<locals>.<genexpr>)r3   r    r(   r2   r!   r7   r   r   )r   r   rH   r	   rU   r
   rR   $  s    


ÿÿzDAGPool.waiting_for)F)N) r   r   r   r   ÚcollectionsÚ
namedtupler   r   r%   r2   r$   r&   r0   r1   r*   r)   Ústaticmethodr4   r,   r9   r:   rD   rE   r?   rJ   r7   r(   r   rP   rQ   rS   rR   r	   r	   r	   r
   r   7   s4   :)	
,
F
	
r   )Zeventlet.eventr   Zeventletr   rW   Úobjectr2   r>   r   r   r   r	   r	   r	   r
   Ú<module>   s   
