51 lines
1.4 KiB
Python
51 lines
1.4 KiB
Python
import time
|
|
import datetime
|
|
import logging
|
|
|
|
# Configure the root logger once at import time: emit everything from DEBUG
# upwards, prefixed with timestamp, logger name and severity.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.DEBUG,
)
|
|
|
|
|
|
class Job:
    """A callable scheduled to run at a fixed interval.

    Attributes:
        target: Zero-argument callable invoked on every run.
        interval: ``datetime.timedelta`` added to the start time of a run to
            obtain the next due time.
        next_run: Naive ``datetime.datetime`` at which the job becomes due.
        times: Remaining number of runs. ``0`` means the job is exhausted;
            any negative value means "run indefinitely".
        last_run: Start time of the most recent run, or ``None`` if the job
            has never run.
    """

    def __init__(self, target, interval, times: int):
        """Create a job whose first run is due ``interval`` from now.

        Args:
            target: Zero-argument callable to invoke.
            interval: ``datetime.timedelta`` between runs.
            times: How many times to run; negative for no limit.
        """
        self.target = target
        self.interval = interval
        self.next_run = datetime.datetime.now() + interval
        self.times = times
        self.last_run = None

    def __repr__(self):
        """Return a readable description, used by the log line in ``run``."""
        target_name = getattr(self.target, "__name__", None) or repr(self.target)
        return (
            f"{type(self).__name__}(target={target_name}, "
            f"interval={self.interval!r}, times={self.times!r}, "
            f"next_run={self.next_run!r})"
        )

    @property
    def should_run(self):
        """Return True when the job has runs left and its due time has passed."""
        if self.times == 0:
            return False
        return datetime.datetime.now() >= self.next_run

    def run(self):
        """Invoke the target once and schedule the following run.

        The next due time is measured from the moment this run started, not
        from when the target finished. If the target raises, the exception
        propagates and neither ``times`` nor ``next_run`` is updated, so the
        job stays due.
        """
        logging.info("Running job %s", self)
        self.last_run = datetime.datetime.now()
        self.target()
        # Only count down finite jobs; a negative value is the "run forever"
        # sentinel and must stay stable instead of drifting to -2, -3, ...
        if self.times > 0:
            self.times -= 1
        self.next_run = self.last_run + self.interval
        logging.debug("Last run %s", self.last_run)
        logging.debug("Next run %s", self.next_run)
        logging.debug("Should run %s", self.should_run)
|
|
|
|
|
|
class Scheduler:
    """Keep a list of jobs and run the ones that are due.

    ``run`` performs a single pass over the registered jobs; it does not
    loop or sleep on its own, so the caller is expected to invoke it
    repeatedly.

    Attributes:
        jobs: The jobs registered on this scheduler, in insertion order.
    """

    def __init__(self):
        """Create a scheduler with no registered jobs."""
        # Per-instance list: a class-level ``jobs = []`` would be one list
        # shared by every Scheduler, so a job registered on one instance
        # would be run by all of them.
        self.jobs: "list[Job]" = []

    def every(self, interval: datetime.timedelta, target, times=-1):
        """Register ``target`` to run every ``interval``.

        Args:
            interval: Time between runs; the first run is due one interval
                from now.
            target: Zero-argument callable to invoke.
            times: Maximum number of runs; the default ``-1`` means no limit.
        """
        self.jobs.append(Job(target, interval, times))

    def at(self, time, target):
        """Register ``target`` to run once at the given moment.

        Args:
            time: ``datetime.datetime`` at which to run. It is subtracted
                from ``datetime.datetime.now()``, so it must be a naive
                datetime. A moment in the past yields a negative interval,
                which makes the job due on the next ``run`` pass.
                (The parameter name shadows the ``time`` module inside this
                method; it is kept for caller compatibility.)
            target: Zero-argument callable to invoke.
        """
        interval = time - datetime.datetime.now()
        self.jobs.append(Job(target, interval, 1))

    def run(self):
        """Run every registered job that is currently due, in order.

        Exceptions raised by a job are not caught here; they propagate and
        abort the rest of the pass.
        """
        for job in self.jobs:
            if job.should_run:
                job.run()
|