diff options
Diffstat (limited to 'stator')
-rw-r--r-- | stator/graph.py | 3 | ||||
-rw-r--r-- | stator/runner.py | 52 |
2 files changed, 34 insertions, 21 deletions
diff --git a/stator/graph.py b/stator/graph.py index 436638b..424ea49 100644 --- a/stator/graph.py +++ b/stator/graph.py @@ -104,6 +104,9 @@ class State: def __repr__(self): return f"<State {self.name}>" + def __str__(self): + return self.name + def __eq__(self, other): if isinstance(other, State): return self is other diff --git a/stator/runner.py b/stator/runner.py index 48549bc..c78437d 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -52,28 +52,11 @@ class StatorRunner: Config.system = await Config.aload_system() print(f"{self.handled} tasks processed so far") print("Running cleaning and scheduling") - for model in self.models: - asyncio.create_task(model.atransition_clean_locks()) - asyncio.create_task(model.atransition_schedule_due()) - self.last_clean = time.monotonic() - # Calculate space left for tasks + await self.run_cleanup() + self.remove_completed_tasks() - space_remaining = self.concurrency - len(self.tasks) - # Fetch new tasks - for model in self.models: - if space_remaining > 0: - for instance in await model.atransition_get_with_lock( - number=min(space_remaining, self.concurrency_per_model), - lock_expiry=( - timezone.now() - + datetime.timedelta(seconds=self.lock_expiry) - ), - ): - self.tasks.append( - asyncio.create_task(self.run_transition(instance)) - ) - self.handled += 1 - space_remaining -= 1 + await self.fetch_and_process_tasks() + # Are we in limited run mode? if self.run_for and (time.monotonic() - self.started) > self.run_for: break @@ -92,6 +75,33 @@ class StatorRunner: print("Complete") return self.handled + async def run_cleanup(self): + """ + Do any transition cleanup tasks + """ + for model in self.models: + asyncio.create_task(model.atransition_clean_locks()) + asyncio.create_task(model.atransition_schedule_due()) + self.last_clean = time.monotonic() + + async def fetch_and_process_tasks(self): + # Calculate space left for tasks + space_remaining = self.concurrency - len(self.tasks) + # Fetch new tasks + for model in self.models: + if space_remaining > 0: + for instance in await model.atransition_get_with_lock( + number=min(space_remaining, self.concurrency_per_model), + lock_expiry=( + timezone.now() + datetime.timedelta(seconds=self.lock_expiry) + ), + ): + self.tasks.append( + asyncio.create_task(self.run_transition(instance)) + ) + self.handled += 1 + space_remaining -= 1 + async def run_transition(self, instance: StatorModel): """ Wrapper for atransition_attempt with fallback error handling |