From 6c7ddedd342553b53dd98c8de9cbe9e8e2e8cd7c Mon Sep 17 00:00:00 2001
From: Michael Manfre
Date: Sun, 27 Nov 2022 13:09:46 -0500
Subject: Post editing

---
 stator/graph.py  |  3 +++
 stator/runner.py | 52 +++++++++++++++++++++++++++++++---------------------
 2 files changed, 34 insertions(+), 21 deletions(-)

(limited to 'stator')

diff --git a/stator/graph.py b/stator/graph.py
index 436638b..424ea49 100644
--- a/stator/graph.py
+++ b/stator/graph.py
@@ -104,6 +104,9 @@ class State:
     def __repr__(self):
         return f"<State {self.name}>"
 
+    def __str__(self):
+        return self.name
+
     def __eq__(self, other):
         if isinstance(other, State):
             return self is other
diff --git a/stator/runner.py b/stator/runner.py
index 48549bc..c78437d 100644
--- a/stator/runner.py
+++ b/stator/runner.py
@@ -52,28 +52,11 @@ class StatorRunner:
                     Config.system = await Config.aload_system()
                     print(f"{self.handled} tasks processed so far")
                     print("Running cleaning and scheduling")
-                    for model in self.models:
-                        asyncio.create_task(model.atransition_clean_locks())
-                        asyncio.create_task(model.atransition_schedule_due())
-                    self.last_clean = time.monotonic()
-                # Calculate space left for tasks
+                    await self.run_cleanup()
+
                 self.remove_completed_tasks()
-                space_remaining = self.concurrency - len(self.tasks)
-                # Fetch new tasks
-                for model in self.models:
-                    if space_remaining > 0:
-                        for instance in await model.atransition_get_with_lock(
-                            number=min(space_remaining, self.concurrency_per_model),
-                            lock_expiry=(
-                                timezone.now()
-                                + datetime.timedelta(seconds=self.lock_expiry)
-                            ),
-                        ):
-                            self.tasks.append(
-                                asyncio.create_task(self.run_transition(instance))
-                            )
-                            self.handled += 1
-                            space_remaining -= 1
+                await self.fetch_and_process_tasks()
+
                 # Are we in limited run mode?
                 if self.run_for and (time.monotonic() - self.started) > self.run_for:
                     break
@@ -92,6 +75,33 @@ class StatorRunner:
         print("Complete")
         return self.handled
 
+    async def run_cleanup(self):
+        """
+        Do any transition cleanup tasks
+        """
+        for model in self.models:
+            asyncio.create_task(model.atransition_clean_locks())
+            asyncio.create_task(model.atransition_schedule_due())
+        self.last_clean = time.monotonic()
+
+    async def fetch_and_process_tasks(self):
+        # Calculate space left for tasks
+        space_remaining = self.concurrency - len(self.tasks)
+        # Fetch new tasks
+        for model in self.models:
+            if space_remaining > 0:
+                for instance in await model.atransition_get_with_lock(
+                    number=min(space_remaining, self.concurrency_per_model),
+                    lock_expiry=(
+                        timezone.now() + datetime.timedelta(seconds=self.lock_expiry)
+                    ),
+                ):
+                    self.tasks.append(
+                        asyncio.create_task(self.run_transition(instance))
+                    )
+                    self.handled += 1
+                    space_remaining -= 1
+
     async def run_transition(self, instance: StatorModel):
         """
         Wrapper for atransition_attempt with fallback error handling
-- 
cgit v1.2.3