diff options
Diffstat (limited to 'stator')
| -rw-r--r-- | stator/graph.py | 3 | ||||
| -rw-r--r-- | stator/runner.py | 52 | 
2 files changed, 34 insertions, 21 deletions
diff --git a/stator/graph.py b/stator/graph.py index 436638b..424ea49 100644 --- a/stator/graph.py +++ b/stator/graph.py @@ -104,6 +104,9 @@ class State:      def __repr__(self):          return f"<State {self.name}>" +    def __str__(self): +        return self.name +      def __eq__(self, other):          if isinstance(other, State):              return self is other diff --git a/stator/runner.py b/stator/runner.py index 48549bc..c78437d 100644 --- a/stator/runner.py +++ b/stator/runner.py @@ -52,28 +52,11 @@ class StatorRunner:                      Config.system = await Config.aload_system()                      print(f"{self.handled} tasks processed so far")                      print("Running cleaning and scheduling") -                    for model in self.models: -                        asyncio.create_task(model.atransition_clean_locks()) -                        asyncio.create_task(model.atransition_schedule_due()) -                    self.last_clean = time.monotonic() -                # Calculate space left for tasks +                    await self.run_cleanup() +                  self.remove_completed_tasks() -                space_remaining = self.concurrency - len(self.tasks) -                # Fetch new tasks -                for model in self.models: -                    if space_remaining > 0: -                        for instance in await model.atransition_get_with_lock( -                            number=min(space_remaining, self.concurrency_per_model), -                            lock_expiry=( -                                timezone.now() -                                + datetime.timedelta(seconds=self.lock_expiry) -                            ), -                        ): -                            self.tasks.append( -                                asyncio.create_task(self.run_transition(instance)) -                            ) -                            self.handled += 1 -                            space_remaining -= 1 +                await self.fetch_and_process_tasks() +                  # Are we in limited run mode?                  if self.run_for and (time.monotonic() - self.started) > self.run_for:                      break @@ -92,6 +75,33 @@ class StatorRunner:          print("Complete")          return self.handled +    async def run_cleanup(self): +        """ +        Do any transition cleanup tasks +        """ +        for model in self.models: +            asyncio.create_task(model.atransition_clean_locks()) +            asyncio.create_task(model.atransition_schedule_due()) +        self.last_clean = time.monotonic() + +    async def fetch_and_process_tasks(self): +        # Calculate space left for tasks +        space_remaining = self.concurrency - len(self.tasks) +        # Fetch new tasks +        for model in self.models: +            if space_remaining > 0: +                for instance in await model.atransition_get_with_lock( +                    number=min(space_remaining, self.concurrency_per_model), +                    lock_expiry=( +                        timezone.now() + datetime.timedelta(seconds=self.lock_expiry) +                    ), +                ): +                    self.tasks.append( +                        asyncio.create_task(self.run_transition(instance)) +                    ) +                    self.handled += 1 +                    space_remaining -= 1 +      async def run_transition(self, instance: StatorModel):          """          Wrapper for atransition_attempt with fallback error handling  | 
