python-codewars-solutions/prime_streaming/helper.py

41 lines
1.1 KiB
Python
Raw Normal View History

2024-01-23 13:32:21 +00:00
from threading import Thread
import functools
def timeout(timeout):
    """Build a decorator that bounds how long a call may block its caller.

    The decorated function runs in a daemon thread. The caller waits at most
    ``timeout`` seconds for it to finish (``None`` waits indefinitely, as
    with ``Thread.join``).

    Args:
        timeout: Maximum number of seconds to wait for the wrapped call.

    Returns:
        A decorator. The wrapped function returns whatever the original
        returns, including exception *instances* returned as ordinary values.

    Raises:
        TimeoutError: The call did not finish within ``timeout`` seconds.
            The worker thread is not stopped; it keeps running in the
            background as a daemon and its eventual result is discarded.
        BaseException: Anything raised by the wrapped function is re-raised
            in the calling thread.
    """
    def deco(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Separate slots for "returned" and "raised" so a function that
            # returns an exception object is not mistaken for a failure.
            outcome = {}

            def run_target():
                try:
                    outcome['value'] = func(*args, **kwargs)
                except BaseException as exc:
                    # Hand every failure (even SystemExit) back to the caller
                    # instead of letting it die silently in the worker and be
                    # misreported as a timeout.
                    outcome['error'] = exc

            worker = Thread(target=run_target)
            # Daemon: an overrunning call must not keep the interpreter alive.
            worker.daemon = True
            try:
                worker.start()
            except Exception:
                print('error starting thread')
                raise
            worker.join(timeout)

            if 'error' in outcome:
                raise outcome['error']
            if 'value' in outcome:
                return outcome['value']

            # Neither slot filled: the worker is still running past the limit.
            func_name = getattr(func, '__name__', repr(func))
            raise TimeoutError(
                'function [%s] timeout [%s seconds] exceeded!'
                % (func_name, timeout)
            )
        return wrapper
    return deco
if __name__ == "__main__":
    # Demo: the decorated function sleeps for 10 s under a 5 s limit.
    # A TimeoutError raised by the call is printed rather than propagated.
    @timeout(timeout=5)
    def long_running_function():
        from time import sleep
        sleep(10)
        return "Function completed successfully"

    try:
        outcome = long_running_function()
    except TimeoutError as exc:
        print(exc)
    else:
        print(outcome)