
    ga<                         d Z ddlZddlZddlZddlZddlZddlZddlZddlm	Z	 dZ
 G d d      Z e       Z e       Z G d dej                        Z G d	 d
      Zy)z8A thread-based worker pool.

.. spelling::

   joinable
    N)	pass_none)WorkerThread
ThreadPoolc                       e Zd ZdZd Zd Zy)	TrueyZerozDObject which equals and does math like the integer 0 but evals True.c                     |S N selfothers     //opt/Tautulli/lib/cheroot/workers/threadpool.py__add__zTrueyZero.__add__           c                     |S r	   r
   r   s     r   __radd__zTrueyZero.__radd__   r   r   N)__name__
__module____qualname____doc__r   r   r
   r   r   r   r      s    Nr   r   c                   4    e Zd ZdZdZ	 dZ	 dZ	 d Zd Zd Z	y)r   aF  Thread which continuously polls a Queue for Connection objects.

    Due to the timing issues of polling a Queue, a WorkerThread does not
    check its own 'ready' flag after it has started. To stop the thread,
    it is necessary to stick a _SHUTDOWNREQUEST object onto the Queue
    (one for each running WorkerThread).
    NFc                      d _         | _        d _        d _        d _        d _        d _         fd fd fd fdd d	 d
 _        t        j                  j                          y)zInitialize WorkerThread instance.

        Args:
            server (cheroot.server.HTTPServer): web server object
                receiving this request
        Fr   Nc                 z    j                   j                  d u xr t        xs j                  j                   z   S r	   )requests_seen
start_time	trueyzeroconnsr   s    r   <lambda>z'WorkerThread.__init__.<locals>.<lambda>I   s8    $"4"44' +99**# r   c                     j                   j                  d u xr t        xs  j                  j                  j                   z   S r	   )
bytes_readr   r   r   rfiler   s    r   r!   z'WorkerThread.__init__.<locals>.<lambda>N   s:    DOO4' .99??--% r   c                     j                   j                  d u xr t        xs  j                  j                  j                   z   S r	   )bytes_writtenr   r   r   wfiler   s    r   r!   z'WorkerThread.__init__.<locals>.<lambda>S   s<    t'9'94' 199??00( r   c                     j                   j                  d u xr t        xs! t        j                         j                  z
  z   S r	   )	work_timer   r   timer   s    r   r!   z'WorkerThread.__init__.<locals>.<lambda>X   s;    4>>4' 199;0$ r   c                 8     | d   |        | d   |       xs dz  S )N
Bytes Read	Work Timeư>r
   r    s    r   r!   z'WorkerThread.__init__.<locals>.<lambda>]   s)    <);+q!)T* r   c                 8     | d   |        | d   |       xs dz  S )NBytes Writtenr-   r.   r
   r/   s    r   r!   z'WorkerThread.__init__.<locals>.<lambda>`   s+    *<!O*<Q*?+q!)T+ r   )Requestsr,   r1   r-   zRead ThroughputzWrite Throughput)readyserverr   r#   r&   r   r)   stats	threadingThread__init__)r   r4   s   ` r   r8   zWorkerThread.__init__9   su     




 !1

8 	!!$'r   c                 ,   | j                   | j                  j                   d   | j                  <   d| _        	 | j	                          d	| _        y# t
        t        f$ rY}|j                  xs |}| j                  j                  d|t        j                         || j                  _        Y d}~kd}~wt        $ rG}| j                  j                  d|dt        j                  d       || j                  _         d}~ww xY w# d	| _        w xY w)
a0  Set up incoming HTTP connection processing loop.

        This is the thread's entry-point. It performs lop-layer
        exception handling and interrupt processing.
        :exc:`KeyboardInterrupt` and :exc:`SystemExit` bubbling up
        from the inner-layer code constitute a global server interrupt
        request. When they happen, the worker thread exits.

        :raises BaseException: when an unexpected non-interrupt
                               exception leaks from the inner layers

        # noqa: DAR401 KeyboardInterrupt SystemExit
        zWorker ThreadsTz%Setting the server interrupt flag to levelNzAA fatal exception happened. Setting the server interrupt flag to z and giving up.

Please, report this on the Cheroot tracker at <https://github.com/cherrypy/cheroot/issues/new/choose>, providing a full reproducer with as much context and details as possible.r;   	tracebackF)r5   r4   namer3   &_process_connections_until_interruptedKeyboardInterrupt
SystemExit	__cause__	error_logloggingDEBUG	interruptBaseExceptionCRITICAL)r   interrupt_excinterrupt_causeunderlying_excs       r   runzWorkerThread.runf   s
    :>*+DII6
	7792 DJ1 ":. 	4+55FOKK!!77KLmm "  %4DKK!! 	 KK!!%) *\\ && " 	 %3DKK!	" DJs7   A D AB4/D
 4D ADDD
 
	Dc           
      P
   	 | j                   j                  j                         }|t        u ry|| _        | j                   j
                  d   }|rt        j                         | _        d}	 |j                         }	 |r| j                   j                  |       n|j!                          |r| xj"                  |j"                  z  c_        | xj$                  |j&                  j$                  z  c_        | xj(                  |j*                  j(                  z  c_        | xj,                  t        j                         | j                  z
  z  c_        d| _        d| _        a# t        $ r9}d}| j                   j                  d|j                  d|j                  d|dt        j                  	       Y d}~|r| j                   j                  |       n|j!                          |r| xj"                  |j"                  z  c_        | xj$                  |j&                  j$                  z  c_        | xj(                  |j*                  j(                  z  c_        | xj,                  t        j                         | j                  z
  z  c_        d| _        d| _        d}~wt.        t0        f$ re}d}| j                   j                  d
|j                  d|j                  d|dt        j2                  	       t1        t5        |            |d}~wt6        $ r}| j                   j                  d|t        j8                  d       Y d}~|r| j                   j                  |       n|j!                          |r| xj"                  |j"                  z  c_        | xj$                  |j&                  j$                  z  c_        | xj(                  |j*                  j(                  z  c_        | xj,                  t        j                         | j                  z
  z  c_        d| _        d| _        7d}~ww xY w# |r| j                   j                  |       n|j!                          |r| xj"                  |j"                  z  c_        | xj$                  |j&                  j$                  z  c_        | xj(                  |j*                  j(                  z  c_        | xj,                  t        j                         | j                  z
  z  c_        d| _        d| _        w xY w)a  Process incoming HTTP connections in an infinite loop.

        Retrieves incoming connections from thread pool, processing
        them one by one.

        :raises SystemExit: on the internal requests to stop the
                            server instance
        TNEnabledFz8Got a connection error while handling a connection from :z ()r:   z?Got a server shutdown request while handling a connection from z8Unhandled error while processing an incoming connection r<   )r4   requestsget_SHUTDOWNREQUESTr   r5   r*   r   communicateConnectionErrorrC   remote_addrremote_portrD   INFOput_conncloser   r#   r$   r&   r'   r)   r@   rA   rE   strrG   ERROR)r   r   is_stats_enabledkeep_conn_openconnection_errorshutdown_requestunhandled_errors          r   r?   z3WorkerThread._process_connections_until_interrupted   s    ;;''++-D''DI#{{00;"&))+"N:!!%!1!1!3^ "KK((.JJL#&&$*<*<<&OOtzz'<'<<O&&$***B*BB&NNdiikDOO&CCN&*DO 	I  # !&%%''+'7'7&;1''+2.>-B!E ",,	 &  L "KK((.JJL#&&$*<*<<&OOtzz'<'<<O&&$***B*BB&NNdiikDOO&CCN&*DO 	_ &z2 (!&%%''+'7'7&;1''+2.>-B!E "--	 &  !()'( !  %%""1!57!--"	 &   "KK((.JJL#&&$*<*<<&OOtzz'<'<<O&&$***B*BB&NNdiikDOO&CCN&*DO 	G2 "KK((.JJL#&&$*<*<<&OOtzz'<'<<O&&$***B*BB&NNdiikDOO&CCN&*DO 	sK   -E# #
P=-AJ!8Q  !P=3A LP= /P8Q  8P==Q   C%T%)
r   r   r   r   r   r4   r3   r8   rL   r?   r
   r   r   r   r   %   s9     D?F8 E$+(Z*XM!r   r   c                       e Zd ZdZ	 	 ddZd Zed        Zd Zd Z	d Z
d Zd	 Zdd
Zeed               Zd Zed        Zy)r   zA Request Queue for an HTTPServer which pools threads.

    ThreadPool objects must provide min, get(), put(obj), start()
    and stop(timeout) attributes.
    c                    |dk  rt        d|d      |t        d      k(  rn4t        |t              r|dk(  rt	        d|d      |dk  rt        d      }||k  rt        d|d	|d
      || _        || _        || _        g | _        t        j                  |      | _        || _        | j                  j                  | _        t        j                         | _        y)an  Initialize HTTP requests queue instance.

        Args:
            server (cheroot.server.HTTPServer): web server object
                receiving this request
            min (int): minimum number of worker threads
            max (int): maximum number of worker threads (-1/inf for no max)
            accepted_queue_size (int): maximum number of active
                requests in queue
            accepted_queue_timeout (int): timeout for putting request
                into queue

        :raises ValueError: if the min/max values are invalid
        :raises TypeError: if the max is not an integer or inf
           zmin=z must be > 0infr   zIExpected an integer or the infinity value for the `max` argument but got .zmax=z must be > min=z (or infinity for no max))maxsizeN)
ValueErrorfloat
isinstanceint	TypeErrorr4   minmax_threadsqueueQueue_queue_queue_put_timeoutrR   collectionsdeque_pending_shutdowns)r   r4   rm   rn   accepted_queue_sizeaccepted_queue_timeouts         r   r8   zThreadPool.__init__   s    & 7tC7,788%,C%$$'7!-  1W,C9sg_SG3LM  kk*=>"8;;??"-"3"3"5r   c                 h    | j                   rt        d      | j                  | j                         y)zaStart the pool of threads.

        :raises RuntimeError: if the pool is already started
        z%Threadpools can only be started once.N)ro   RuntimeErrorgrowrm   r   s    r   startzThreadPool.start  s'    
 ==FGG		$((r   c                     t        | j                  D cg c]  }|j                  | c}      }t        |t        | j                        z
  d      S c c}w )z3Number of worker threads which are idle. Read-only.r   )lenro   r   rn   rv   )r   tidless      r   idlezThreadPool.idle   sH     @1Q@A53t6677;; As
   AAc                 T    | j                   j                  |d| j                         y)zPut request into queue.

        Args:
            obj (:py:class:`~cheroot.server.HTTPConnection`): HTTP connection
                waiting to be processed
        T)blocktimeoutN)rr   putrs   )r   objs     r   r   zThreadPool.put&  s      	41H1HIr   c                     | j                   D cg c]  }|j                         r| c}D ]8  }| j                   j                  |       	 | j                  j	                          : y c c}w # t
        $ r Y Lw xY wr	   )ro   is_aliveremoverv   popleft
IndexError)r   r   s     r   _clear_dead_threadszThreadPool._clear_dead_threads/  sh    !]]?!**,!? 	AMM  #''//1	?  s   A(A(A--	A98A9c                 l   t        | j                   t        | j                        z
  d      }t        ||      }t	        |      D cg c]  }| j                          }}|D ]1  }|j                  rt        j                  d       |j                  s"3 | j                  j                  |       yc c}w )z.Spawn new worker threads (not above self.max).r   g?N)
rn   r   ro   rm   range_spawn_workerr3   r*   sleepextend)r   amountbudgetn_newiworkersworkers          r   r{   zThreadPool.grow8  s    TXXDMM 22A6FF#16u>A4%%'>> 	Fll

2 ll	 	W%	 ?s   B1c                     t        | j                        }dj                  |j                        |_        |j	                          |S )NzCP Server {worker_name!s})worker_name)r   r4   formatr>   r}   )r   r   s     r   r   zThreadPool._spawn_workerC  s:    dkk*'Fv{{F+ 	 	r   c                 d   |t        | j                        z  }| j                          |dk  ryt        t        | j                        | j
                  z
  d      }t        ||      }t        |      D ]<  }| j                  j                  d       | j                  j                  t               > y)z-Kill off worker threads (not below self.min).r   N)r   rv   r   rn   ro   rm   r   appendrr   r   rS   )r   r   n_extran_to_remove_s        r   shrinkzThreadPool.shrinkL  s     	#d--..  "Q; c$--(4883Q7 &'*
 {# 	.A##**40KKOO,-	.r   c                    ||dk  rd}t        j                  dd       |t        j                         |z   }| j                  D ]!  }| j                  j                  t               # t        t        f}| j                         D ]j  }|xr t        j                         z
  }	 |j                  |       |j                         r+| j                  |j                         |j                          l y# |$ r Y uw xY w)z|Terminate all worker threads.

        Args:
            timeout (int): time to wait for threads to stop gracefully
        Nr   zZIn the future, negative timeouts to Server.stop() will be equivalent to a timeout of zero.   )
stacklevel)warningswarningr*   ro   rr   r   rS   rz   r@   _clear_threadsjoinr   _force_closer   )r   r   endtimer   ignored_errorsremaining_times         r   stopzThreadPool.stopb  s     7Q;G; iikG+G mm 	.FKKOO,-	. 
 ))+ 		F$>499;)>NN+??$%%fkk2KKM		 " s   %AC44C<;C<c                     | j                   j                  ry 	 	 | j                  j                  t        j                         y # t
        $ r | j                  j                          Y y w xY w# t        $ r Y y w xY wr	   )r$   closedsocketshutdownSHUT_RDrl   OSError)r   s    r   r   zThreadPool._force_close  sh     ::		'$$V^^4 '$$&'  	 	s)   )A #A*'A- )A**A- -	A98A9c                 R    | j                   dd g c}| j                   dd d |D        S )z3Clear self._threads and yield all joinable threads.Nc              3   L   K   | ]  }|t        j                         ur|  y wr	   )r6   current_thread).0threads     r   	<genexpr>z,ThreadPool._clear_threads.<locals>.<genexpr>  s)      
Y5577 
s   "$)ro   )r   threadss     r   r   zThreadPool._clear_threads  s4     %)MM!$4b!q!
!
 	
r   c                 6    | j                   j                         S )zReturn the queue size.)rr   qsizer|   s    r   r   zThreadPool.qsize  s     {{  ""r   N)
   r   r   )   )r   r   r   r   r8   r}   propertyr   r   r   r{   r   r   r   staticmethodr   r   r   r   r
   r   r   r   r      s     ?A#%,6\ < <
J	&.,)V   
 # #r   r   )r   rt   rD   r6   r*   r   r   rp   jaraco.functoolsr   __all__r   r   objectrS   r7   r   r   r
   r   r   <module>r      sg           & )  K	8 z!9## z!zH# H#r   