summaryrefslogtreecommitdiffstats
path: root/stator
diff options
context:
space:
mode:
Diffstat (limited to 'stator')
-rw-r--r--stator/graph.py3
-rw-r--r--stator/runner.py52
2 files changed, 34 insertions, 21 deletions
diff --git a/stator/graph.py b/stator/graph.py
index 436638b..424ea49 100644
--- a/stator/graph.py
+++ b/stator/graph.py
@@ -104,6 +104,9 @@ class State:
def __repr__(self):
return f"<State {self.name}>"
+ def __str__(self):
+ return self.name
+
def __eq__(self, other):
if isinstance(other, State):
return self is other
diff --git a/stator/runner.py b/stator/runner.py
index 48549bc..c78437d 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -52,28 +52,11 @@ class StatorRunner:
Config.system = await Config.aload_system()
print(f"{self.handled} tasks processed so far")
print("Running cleaning and scheduling")
- for model in self.models:
- asyncio.create_task(model.atransition_clean_locks())
- asyncio.create_task(model.atransition_schedule_due())
- self.last_clean = time.monotonic()
- # Calculate space left for tasks
+ await self.run_cleanup()
+
self.remove_completed_tasks()
- space_remaining = self.concurrency - len(self.tasks)
- # Fetch new tasks
- for model in self.models:
- if space_remaining > 0:
- for instance in await model.atransition_get_with_lock(
- number=min(space_remaining, self.concurrency_per_model),
- lock_expiry=(
- timezone.now()
- + datetime.timedelta(seconds=self.lock_expiry)
- ),
- ):
- self.tasks.append(
- asyncio.create_task(self.run_transition(instance))
- )
- self.handled += 1
- space_remaining -= 1
+ await self.fetch_and_process_tasks()
+
# Are we in limited run mode?
if self.run_for and (time.monotonic() - self.started) > self.run_for:
break
@@ -92,6 +75,33 @@ class StatorRunner:
print("Complete")
return self.handled
+ async def run_cleanup(self):
+ """
+ Do any transition cleanup tasks
+ """
+ for model in self.models:
+ asyncio.create_task(model.atransition_clean_locks())
+ asyncio.create_task(model.atransition_schedule_due())
+ self.last_clean = time.monotonic()
+
+ async def fetch_and_process_tasks(self):
+ # Calculate space left for tasks
+ space_remaining = self.concurrency - len(self.tasks)
+ # Fetch new tasks
+ for model in self.models:
+ if space_remaining > 0:
+ for instance in await model.atransition_get_with_lock(
+ number=min(space_remaining, self.concurrency_per_model),
+ lock_expiry=(
+ timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
+ ),
+ ):
+ self.tasks.append(
+ asyncio.create_task(self.run_transition(instance))
+ )
+ self.handled += 1
+ space_remaining -= 1
+
async def run_transition(self, instance: StatorModel):
"""
Wrapper for atransition_attempt with fallback error handling