a
    i3e
1                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlZddlZddlmZ ddlmZ ddlmZmZ zddlZW n ey   dZY n0 ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ e	j dkr&ddl!m"Z" nddlm"Z" dd Z#dd Z$dLddZ%dd Z&G dd deZ'G dd deZ(G dd dZ)G dd  d e)e(Z*d!d"d#d$Z+e,ed%r G d&d' d'ej-eZ.G d(d) d)e.eZ/G d*d+ d+e/Z0G d,d- d-e)e0Z1d.d/ Z2ej3d0d1 Z4ej3d!d"d2d3Z5ej3d4dd!d5d6d7Z6d8d9 Z7G d:d; d;ej8Z9G d<d= d=ej:Z;d>d? Z<G d@dA dAe=Z>dBdC Z?G dDdE dEej@Z@ej3dFdG ZAdHdI ZBdJdK ZCdS )MzUtilities shared by tests.    N)mock)
HTTPServer)WSGIRequestHandler
WSGIServer   )base_events)events)futures)	selectors)tasks)	coroutine)loggerwin32)
socketpairc                   C   s   t d u rd S t t jS d S N)ssl
SSLContextPROTOCOL_SSLv23 r   r   E/home/pi/bot/my_env/lib/python3.9/site-packages/asyncio/test_utils.pydummy_ssl_context)   s    r   c                 C   sH   t dd }| }| |}d|_z| | W |  n
|  0 d S )Nc                   S   s   d S r   r   r   r   r   r   once1   s    zrun_briefly.<locals>.onceF)r   Zcreate_taskZ_log_destroy_pendingrun_until_completeclose)loopr   gentr   r   r   run_briefly0   s    

r      c                 C   sP   t   | }| sL|d ur6|t    }|dkr6t | tjd| d qd S )Nr   gMbP?r   )timer	   TimeoutErrorr   r   sleep)r   predtimeoutdeadliner   r   r   	run_until?   s    r&   c                 C   s   |    |   dS )zloop.stop() schedules _raise_stop_error()
    and run_forever() runs until _raise_stop_error() callback.
    this wont work if test waits for some IO events, because
    _raise_stop_error() runs before any of io events callbacks.
    N)stopZrun_foreverr   r   r   r   run_onceI   s    r(   c                   @   s   e Zd Zdd Zdd ZdS )SilentWSGIRequestHandlerc                 C   s   t  S r   )ioStringIOselfr   r   r   
get_stderrU   s    z#SilentWSGIRequestHandler.get_stderrc                 G   s   d S r   r   )r-   formatargsr   r   r   log_messageX   s    z$SilentWSGIRequestHandler.log_messageN)__name__
__module____qualname__r.   r1   r   r   r   r   r)   S   s   r)   c                       s(   e Zd ZdZ fddZdd Z  ZS )SilentWSGIServer   c                    s"   t   \}}|| j ||fS r   superget_request
settimeoutrequest_timeoutr-   requestclient_addr	__class__r   r   r9   `   s    zSilentWSGIServer.get_requestc                 C   s   d S r   r   r-   r=   client_addressr   r   r   handle_errore   s    zSilentWSGIServer.handle_error)r2   r3   r4   r;   r9   rC   __classcell__r   r   r?   r   r5   \   s   r5   c                   @   s   e Zd Zdd ZdS )SSLWSGIServerMixinc                 C   s   t jt jtdd}t j|s>t jt jt jdd}t j|d}t j|d}tj|||dd}z| |||  |	  W n t
y   Y n0 d S )	Nz..teststestZtest_asynciozssl_key.pemzssl_cert.pemT)keyfilecertfileserver_side)ospathjoindirname__file__isdirr   wrap_socketRequestHandlerClassr   OSError)r-   r=   rB   hererH   rI   Zssockr   r   r   finish_requestk   s"    z!SSLWSGIServerMixin.finish_requestN)r2   r3   r4   rU   r   r   r   r   rE   i   s   rE   c                   @   s   e Zd ZdS )SSLWSGIServerNr2   r3   r4   r   r   r   r   rV      s   rV   F)use_sslc                 #   s   dd }|r|n|}|| t   |  j _tj fddd}|  z" V  W       |	  n      |	  0 d S )Nc                 S   s   d}dg}||| dgS )Nz200 OK)zContent-typez
text/plains   Test messager   )environZstart_responsestatusheadersr   r   r   app   s    
z_run_test_server.<locals>.appc                      s    j ddS )Ng?)poll_interval)serve_foreverr   Zhttpdr   r   <lambda>       z"_run_test_server.<locals>.<lambda>)target)
r)   Zset_appserver_addressaddress	threadingThreadstartshutdownserver_closerM   )rd   rX   
server_clsserver_ssl_clsr\   Zserver_classZserver_threadr   r_   r   _run_test_server   s"    



rl   AF_UNIXc                   @   s   e Zd Zdd ZdS )UnixHTTPServerc                 C   s   t j|  d| _d| _d S )N	127.0.0.1P   )socketserverUnixStreamServerserver_bindZserver_nameZserver_portr,   r   r   r   rs      s    zUnixHTTPServer.server_bindN)r2   r3   r4   rs   r   r   r   r   rn      s   rn   c                       s(   e Zd ZdZdd Z fddZ  ZS )UnixWSGIServerr6   c                 C   s   t |  |   d S r   )rn   rs   Zsetup_environr,   r   r   r   rs      s    
zUnixWSGIServer.server_bindc                    s"   t   \}}|| j |dfS )N)ro    r7   r<   r?   r   r   r9      s    zUnixWSGIServer.get_request)r2   r3   r4   r;   rs   r9   rD   r   r   r?   r   rt      s   rt   c                   @   s   e Zd Zdd ZdS )SilentUnixWSGIServerc                 C   s   d S r   r   rA   r   r   r   rC      s    z!SilentUnixWSGIServer.handle_errorN)r2   r3   r4   rC   r   r   r   r   rv      s   rv   c                   @   s   e Zd ZdS )UnixSSLWSGIServerNrW   r   r   r   r   rw      s   rw   c                  C   s2   t  } | jW  d    S 1 s$0    Y  d S r   )tempfileNamedTemporaryFilename)filer   r   r   gen_unix_socket_path   s    
r|   c                  c   s\   t  } z,| V  W zt|  W qX ty0   Y qX0 n$zt|  W n tyT   Y n0 0 d S r   )r|   rK   unlinkrS   )rL   r   r   r   unix_socket_path   s    r~   c                 c   s@   t  &}t|| ttdE d H  W d    n1 s20    Y  d S N)rd   rX   rj   rk   )r~   rl   rv   rw   )rX   rL   r   r   r   run_test_unix_server   s
    r   ro   hostportrX   c                 c   s   t | |f|ttdE d H  d S r   )rl   r5   rV   r   r   r   r   run_test_server   s    
r   c                 C   sL   i }t | D ](}|dr&|dr&qtd d||< qtd| f| j | S )N__return_valueZTestProtocol)dir
startswithendswithMockCallbacktype	__bases__)basedctrz   r   r   r   make_test_protocol   s    r   c                   @   s6   e Zd Zdd ZdddZdd Zdd	 Zd
d ZdS )TestSelectorc                 C   s
   i | _ d S r   keysr,   r   r   r   __init__   s    zTestSelector.__init__Nc                 C   s   t |d||}|| j|< |S )Nr   )r
   SelectorKeyr   )r-   fileobjr   datakeyr   r   r   register   s    
zTestSelector.registerc                 C   s   | j |S r   )r   pop)r-   r   r   r   r   
unregister   s    zTestSelector.unregisterc                 C   s   g S r   r   )r-   r$   r   r   r   select   s    zTestSelector.selectc                 C   s   | j S r   r   r,   r   r   r   get_map  s    zTestSelector.get_map)N)r2   r3   r4   r   r   r   r   r   r   r   r   r   r      s
   
r   c                       s   e Zd ZdZd! fdd	Zdd Zdd Z fd	d
Zdd Zdd Z	dd Z
dd Zdd Zdd Zdd Z fddZ fddZdd Zdd  Z  ZS )"TestLoopa  Loop for unittests.

    It manages self time directly.
    If something scheduled to be executed later then
    on next loop iteration after all ready handlers done
    generator passed to __init__ is calling.

    Generator should be like this:

        def gen():
            ...
            when = yield ...
            ... = yield time_advance

    Value returned by yield is absolute time of next scheduled handler.
    Value passed to yield is time advance to move loop's time forward.
    Nc                    sl   t    |d u r"dd }d| _nd| _| | _t| j d| _d| _g | _t | _	i | _
i | _|   d S )Nc                   s   s
   d V  d S r   r   r   r   r   r   r     s    zTestLoop.__init__.<locals>.genFTr   g&.>)r8   r   _check_on_close_gennext_timeZ_clock_resolution_timersr   	_selectorreaderswritersreset_counters)r-   r   r?   r   r   r     s    

zTestLoop.__init__c                 C   s   | j S r   r   r,   r   r   r   r    -  s    zTestLoop.timec                 C   s   |r|  j |7  _ dS )zMove test time forward.Nr   )r-   advancer   r   r   advance_time0  s    zTestLoop.advance_timec                    s@   t    | jr<z| jd W n ty2   Y n
0 tdd S )Nr   zTime generator is not finished)r8   r   r   r   sendStopIterationAssertionErrorr,   r?   r   r   r   5  s    
zTestLoop.closec                 G   s   t ||| | j|< d S r   )r   Handler   r-   fdcallbackr0   r   r   r   
add_reader?  s    zTestLoop.add_readerc                 C   s0   | j |  d7  < || jv r(| j|= dS dS d S Nr   TF)remove_reader_countr   r-   r   r   r   r   remove_readerB  s
    
zTestLoop.remove_readerc                 G   s^   || j v sJ d|| j | }|j|ks>J d|j||j|ksZJ d|j|d S Nzfd {} is not registeredz{!r} != {!r})r   r/   	_callback_argsr-   r   r   r0   handler   r   r   assert_readerJ  s    
zTestLoop.assert_readerc                 G   s   t ||| | j|< d S r   )r   r   r   r   r   r   r   
add_writerR  s    zTestLoop.add_writerc                 C   s0   | j |  d7  < || jv r(| j|= dS dS d S r   )remove_writer_countr   r   r   r   r   remove_writerU  s
    
zTestLoop.remove_writerc                 G   s^   || j v sJ d|| j | }|j|ks>J d|j||j|ksZJ d|j|d S r   )r   r/   r   r   r   r   r   r   assert_writer]  s    
zTestLoop.assert_writerc                 C   s   t t| _t t| _d S r   )collectionsdefaultdictintr   r   r,   r   r   r   r   e  s    zTestLoop.reset_countersc                    s6   t    | jD ]}| j|}| | qg | _d S r   )r8   	_run_oncer   r   r   r   )r-   whenr   r?   r   r   r   i  s
    

zTestLoop._run_oncec                    s"   | j | t j||g|R  S r   )r   appendr8   call_at)r-   r   r   r0   r?   r   r   r   p  s    zTestLoop.call_atc                 C   s   d S r   r   )r-   Z
event_listr   r   r   _process_eventst  s    zTestLoop._process_eventsc                 C   s   d S r   r   r,   r   r   r   _write_to_selfw  s    zTestLoop._write_to_self)N)r2   r3   r4   __doc__r   r    r   r   r   r   r   r   r   r   r   r   r   r   r   rD   r   r   r?   r   r     s    
r   c                  K   s   t jf ddgi| S )Nspec__call__)r   Mock)kwargsr   r   r   r   {  s    r   c                   @   s   e Zd ZdZdd ZdS )MockPatternzA regex based str with a fuzzy __eq__.

    Use this helper with 'mock.assert_called_with', or anywhere
    where a regex comparison between strings is needed.

    For instance:
       mock_call.assert_called_with(MockPattern('spam.*ham'))
    c                 C   s   t tt| |tjS r   )boolresearchstrS)r-   otherr   r   r   __eq__  s    zMockPattern.__eq__N)r2   r3   r4   r   r   r   r   r   r   r     s   r   c                 C   s$   t | }|d u r td| f |S )Nzunable to get the source of %r)r   Z_get_function_source
ValueError)funcsourcer   r   r   get_function_source  s    
r   c                   @   s,   e Zd ZddddZd
ddZdd	 ZdS )TestCaseT)cleanupc                C   s*   |d usJ t d  |r&| |j d S r   )r   set_event_loopZ
addCleanupr   )r-   r   r   r   r   r   r     s    
zTestCase.set_event_loopNc                 C   s   t |}| | |S r   )r   r   )r-   r   r   r   r   r   new_test_loop  s    
zTestCase.new_test_loopc                 C   s   t d  | t d d S )N)NNN)r   r   assertEqualsysexc_infor,   r   r   r   tearDown  s    
zTestCase.tearDown)N)r2   r3   r4   r   r   r   r   r   r   r   r     s   
r   c               	   c   s<   t j} z$t tjd  dV  W t |  nt |  0 dS )zrContext manager to disable asyncio logger.

    For example, it can be used to ignore warnings in debug mode.
    r   N)r   levelsetLevelloggingCRITICAL)Z	old_levelr   r   r   disable_logger  s
    r   c                  C   s   t tj} d| j_| S )z'Create a mock of a non-blocking socket.g        )r   r   socket
gettimeoutr   )sockr   r   r   mock_nonblocking_socket  s    r   c                   C   s   t jdddS )Nz'asyncio.sslproto._is_sslproto_availableFr   )r   patchr   r   r   r   force_legacy_ssl_support  s    r   )r   )Dr   r   
contextlibr*   r   rK   r   r   rq   r   rx   re   r    Zunittestr   Zhttp.serverr   Zwsgiref.simple_serverr   r   r   ImportErrorru   r   r   r	   r
   r   Z
coroutinesr   logr   platformZwindows_utilsr   r   r   r&   r(   r)   r5   rE   rV   rl   hasattrrr   rn   rt   rv   rw   r|   contextmanagerr~   r   r   r   BaseSelectorr   ZBaseEventLoopr   r   r   r   r   r   r   r   r   r   r   r   r   <module>   s|   



	

v
