import asyncio
import datetime
import time
import traceback
import uuid
from typing import List, Optional, Set, Type

from django.utils import timezone

from core import exceptions
from core.models import Config
from stator.models import StatorModel


class StatorRunner:
    """
    Runs tasks on models that are looking for state changes.
    Designed to run either indefinitely, or just for a few seconds.
    """

    def __init__(
        self,
        models: List[Type[StatorModel]],
        concurrency: int = 50,
        concurrency_per_model: int = 10,
        liveness_file: Optional[str] = None,
        schedule_interval: int = 30,
        lock_expiry: int = 300,
        run_for: int = 0,
    ):
        """
        Store the runner configuration.

        models: model classes whose transitions this runner drives.
        concurrency: maximum number of transition tasks in flight at once.
        concurrency_per_model: maximum instances requested from one model
            per fetch pass.
        liveness_file: stored on the instance only.
        schedule_interval: seconds between cleaning/scheduling passes.
        lock_expiry: seconds from "now" used as the lock expiry passed to
            atransition_get_with_lock.
        run_for: if non-zero, stop the main loop after this many seconds.
        """
        self.models = models
        self.runner_id = uuid.uuid4().hex
        self.concurrency = concurrency
        self.concurrency_per_model = concurrency_per_model
        # NOTE(review): liveness_file is stored but nothing in this class
        # reads it - confirm whether a liveness write is meant to happen.
        self.liveness_file = liveness_file
        self.schedule_interval = schedule_interval
        self.lock_expiry = lock_expiry
        self.run_for = run_for
        # Runtime state; run() resets these at the start of every run, but
        # they exist up front so the helper methods are usable on their own.
        self.handled = 0
        self.tasks: List[asyncio.Task] = []
        # Strong references to fire-and-forget maintenance tasks. The event
        # loop only keeps weak references to tasks, so without this they
        # could be garbage collected before they finish.
        self._background_tasks: Set[asyncio.Task] = set()

    async def run(self):
        """
        Run the main loop until interrupted (or until run_for seconds have
        elapsed, when set), then wait for outstanding tasks.

        Returns the number of transition tasks launched during this run.
        """
        self.handled = 0
        self.started = time.monotonic()
        # Backdate the last clean so the first loop iteration cleans at once
        self.last_clean = time.monotonic() - self.schedule_interval
        self.tasks = []
        # For the first time period, launch tasks
        print("Running main task loop")
        try:
            while True:
                # Do we need to do cleaning?
                if (time.monotonic() - self.last_clean) >= self.schedule_interval:
                    # Refresh the config
                    Config.system = await Config.aload_system()
                    print(f"{self.handled} tasks processed so far")
                    print("Running cleaning and scheduling")
                    await self.run_cleanup()

                self.remove_completed_tasks()
                await self.fetch_and_process_tasks()

                # Are we in limited run mode?
                if self.run_for and (time.monotonic() - self.started) > self.run_for:
                    break

                # Prevent busylooping
                await asyncio.sleep(0.5)
        except KeyboardInterrupt:
            pass

        # Wait for tasks to finish. return_exceptions=True means one failed
        # or cancelled task cannot abort the wait for the others.
        print("Waiting for tasks to complete")
        await asyncio.gather(
            *self.tasks,
            *self._background_tasks,
            return_exceptions=True,
        )
        self.remove_completed_tasks()
        print("Complete")
        return self.handled

    async def run_cleanup(self):
        """
        Do any transition cleanup tasks
        """
        for model in self.models:
            # These run in the background so the main loop is not held up
            self._spawn_background(model.atransition_clean_locks())
            self._spawn_background(model.atransition_schedule_due())
        self.last_clean = time.monotonic()

    def _spawn_background(self, coro):
        """
        Starts coro as a task, retaining a reference until it completes
        """
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_task_done)
        return task

    def _background_task_done(self, task):
        """
        Drops a finished background task and reports its failure, if any
        """
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also stops asyncio from warning that it
        # was never retrieved.
        exc = task.exception()
        if exc is not None:
            traceback.print_exception(type(exc), exc, exc.__traceback__)

    async def fetch_and_process_tasks(self):
        """
        Fills any spare concurrency slots with new transition tasks
        """
        # Calculate space left for tasks
        space_remaining = self.concurrency - len(self.tasks)
        # Fetch new tasks
        for model in self.models:
            if space_remaining <= 0:
                # No capacity left, so the remaining models can be skipped
                break
            lock_expiry = timezone.now() + datetime.timedelta(
                seconds=self.lock_expiry
            )
            instances = await model.atransition_get_with_lock(
                number=min(space_remaining, self.concurrency_per_model),
                lock_expiry=lock_expiry,
            )
            for instance in instances:
                self.tasks.append(
                    asyncio.create_task(self.run_transition(instance))
                )
                self.handled += 1
                space_remaining -= 1

    async def run_transition(self, instance: StatorModel):
        """
        Wrapper for atransition_attempt with fallback error handling
        """
        try:
            print(
                f"Attempting transition on {instance._meta.label_lower}#{instance.pk} from state {instance.state}"
            )
            await instance.atransition_attempt()
        except Exception as e:
            # Deliberately not BaseException: task cancellation,
            # KeyboardInterrupt and SystemExit must propagate rather than be
            # reported as transition failures and swallowed.
            await exceptions.acapture_exception(e)
            traceback.print_exc()

    def remove_completed_tasks(self):
        """
        Removes all completed asyncio.Tasks from our local in-progress list
        """
        self.tasks = [t for t in self.tasks if not t.done()]