util.py (6185B)
1""" 2Miscellaneous Utilities 3 4This module provides asyncio utilities and compatibility wrappers for 5Python 3.6 to provide some features that otherwise become available in 6Python 3.7+. 7 8Various logging and debugging utilities are also provided, such as 9`exception_summary()` and `pretty_traceback()`, used primarily for 10adding information into the logging stream. 11""" 12 13import asyncio 14import sys 15import traceback 16from typing import ( 17 Any, 18 Coroutine, 19 Optional, 20 TypeVar, 21 cast, 22) 23 24 25T = TypeVar('T') 26 27 28# -------------------------- 29# Section: Utility Functions 30# -------------------------- 31 32 33async def flush(writer: asyncio.StreamWriter) -> None: 34 """ 35 Utility function to ensure a StreamWriter is *fully* drained. 36 37 `asyncio.StreamWriter.drain` only promises we will return to below 38 the "high-water mark". This function ensures we flush the entire 39 buffer -- by setting the high water mark to 0 and then calling 40 drain. The flow control limits are restored after the call is 41 completed. 42 """ 43 transport = cast(asyncio.WriteTransport, writer.transport) 44 45 # https://github.com/python/typeshed/issues/5779 46 low, high = transport.get_write_buffer_limits() # type: ignore 47 transport.set_write_buffer_limits(0, 0) 48 try: 49 await writer.drain() 50 finally: 51 transport.set_write_buffer_limits(high, low) 52 53 54def upper_half(func: T) -> T: 55 """ 56 Do-nothing decorator that annotates a method as an "upper-half" method. 57 58 These methods must not call bottom-half functions directly, but can 59 schedule them to run. 60 """ 61 return func 62 63 64def bottom_half(func: T) -> T: 65 """ 66 Do-nothing decorator that annotates a method as a "bottom-half" method. 67 68 These methods must take great care to handle their own exceptions whenever 69 possible. If they go unhandled, they will cause termination of the loop. 70 71 These methods do not, in general, have the ability to directly 72 report information to a caller’s context and will usually be 73 collected as a Task result instead. 74 75 They must not call upper-half functions directly. 76 """ 77 return func 78 79 80# ------------------------------- 81# Section: Compatibility Wrappers 82# ------------------------------- 83 84 85def create_task(coro: Coroutine[Any, Any, T], 86 loop: Optional[asyncio.AbstractEventLoop] = None 87 ) -> 'asyncio.Future[T]': 88 """ 89 Python 3.6-compatible `asyncio.create_task` wrapper. 90 91 :param coro: The coroutine to execute in a task. 92 :param loop: Optionally, the loop to create the task in. 93 94 :return: An `asyncio.Future` object. 95 """ 96 if sys.version_info >= (3, 7): 97 if loop is not None: 98 return loop.create_task(coro) 99 return asyncio.create_task(coro) # pylint: disable=no-member 100 101 # Python 3.6: 102 return asyncio.ensure_future(coro, loop=loop) 103 104 105def is_closing(writer: asyncio.StreamWriter) -> bool: 106 """ 107 Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper. 108 109 :param writer: The `asyncio.StreamWriter` object. 110 :return: `True` if the writer is closing, or closed. 111 """ 112 if sys.version_info >= (3, 7): 113 return writer.is_closing() 114 115 # Python 3.6: 116 transport = writer.transport 117 assert isinstance(transport, asyncio.WriteTransport) 118 return transport.is_closing() 119 120 121async def wait_closed(writer: asyncio.StreamWriter) -> None: 122 """ 123 Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper. 124 125 :param writer: The `asyncio.StreamWriter` to wait on. 126 """ 127 if sys.version_info >= (3, 7): 128 await writer.wait_closed() 129 return 130 131 # Python 3.6 132 transport = writer.transport 133 assert isinstance(transport, asyncio.WriteTransport) 134 135 while not transport.is_closing(): 136 await asyncio.sleep(0) 137 138 # This is an ugly workaround, but it's the best I can come up with. 139 sock = transport.get_extra_info('socket') 140 141 if sock is None: 142 # Our transport doesn't have a socket? ... 143 # Nothing we can reasonably do. 144 return 145 146 while sock.fileno() != -1: 147 await asyncio.sleep(0) 148 149 150def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T: 151 """ 152 Python 3.6-compatible `asyncio.run` wrapper. 153 154 :param coro: A coroutine to execute now. 155 :return: The return value from the coroutine. 156 """ 157 if sys.version_info >= (3, 7): 158 return asyncio.run(coro, debug=debug) 159 160 # Python 3.6 161 loop = asyncio.get_event_loop() 162 loop.set_debug(debug) 163 ret = loop.run_until_complete(coro) 164 loop.close() 165 166 return ret 167 168 169# ---------------------------- 170# Section: Logging & Debugging 171# ---------------------------- 172 173 174def exception_summary(exc: BaseException) -> str: 175 """ 176 Return a summary string of an arbitrary exception. 177 178 It will be of the form "ExceptionType: Error Message", if the error 179 string is non-empty, and just "ExceptionType" otherwise. 180 """ 181 name = type(exc).__qualname__ 182 smod = type(exc).__module__ 183 if smod not in ("__main__", "builtins"): 184 name = smod + '.' + name 185 186 error = str(exc) 187 if error: 188 return f"{name}: {error}" 189 return name 190 191 192def pretty_traceback(prefix: str = " | ") -> str: 193 """ 194 Formats the current traceback, indented to provide visual distinction. 195 196 This is useful for printing a traceback within a traceback for 197 debugging purposes when encapsulating errors to deliver them up the 198 stack; when those errors are printed, this helps provide a nice 199 visual grouping to quickly identify the parts of the error that 200 belong to the inner exception. 201 202 :param prefix: The prefix to append to each line of the traceback. 203 :return: A string, formatted something like the following:: 204 205 | Traceback (most recent call last): 206 | File "foobar.py", line 42, in arbitrary_example 207 | foo.baz() 208 | ArbitraryError: [Errno 42] Something bad happened! 209 """ 210 output = "".join(traceback.format_exception(*sys.exc_info())) 211 212 exc_lines = [] 213 for line in output.split('\n'): 214 exc_lines.append(prefix + line) 215 216 # The last line is always empty, omit it 217 return "\n".join(exc_lines[:-1])