from threading import Thread import functools def timeout(timeout): def deco(func): @functools.wraps(func) def wrapper(*args, **kwargs): res = [Exception('function [%s] timeout [%s seconds] exceeded!' % (func.__name__, timeout))] def newFunc(): try: res[0] = func(*args, **kwargs) except Exception as e: res[0] = e t = Thread(target=newFunc) t.daemon = True try: t.start() t.join(timeout) except Exception as je: print ('error starting thread') raise je ret = res[0] if isinstance(ret, BaseException): raise ret return ret return wrapper return deco if __name__ == "__main__": @timeout(timeout=5) def long_running_function(): import time time.sleep(10) return "Function completed successfully" try: result = long_running_function() print(result) except TimeoutError as e: print(e)