# Source code for fredirc.task

# Copyright (c) 2014 Tobias Marquardt
#
# Distributed under terms of the (2-clause) BSD license.

""" This module provides a task class. """

__all__ = ['Task']

import asyncio
import types


class Task(object):
    """A Task schedules a function for execution by the event loop.

    .. note:: A task will be scheduled only if there is a running(!)
        :py:class:`.IRCClient` instance in the same process.

    There are two ways to use a Task:

    1. Subclass Task and overwrite its :py:meth:`run()<.Task.run>` method.
    2. Instantiate the task directly and provide a callable as parameter to
       the constructor.

    After initialization the Task must be started explicitly via
    :py:meth:`.start()`.

    Args:
        delay (float): Time (in seconds) to defer the execution of the task
            after it is started or the interval for its repeated execution
            if ``repeat=True``.
        repeat (bool): If ``True`` the task will run periodically until it
            is stopped.
        func (callable): Callable taking no arguments that will be called
            (the actual task). Any callable is accepted: plain functions,
            bound methods, ``functools.partial`` objects, etc.

    Raises:
        ValueError: If ``delay`` is negative.
        TypeError: If ``func`` is given but is not callable.
    """

    def __init__(self, delay, repeat=False, func=None):
        self._repeat = repeat
        self._delay = self._checked_delay(delay)
        self._loop = asyncio.get_event_loop()
        # Handle returned by loop.call_later() while an execution is pending.
        self._handler = None
        # Set by stop(); lets _run() notice a stop() issued from within run().
        self._stop_requested = False
        if func is not None:
            if not callable(func):
                raise TypeError('func is not a function type.')
            # Stored on the instance, so it is invoked without ``self``.
            self.run = func

    @staticmethod
    def _checked_delay(delay):
        """Return ``delay`` unchanged; raise ValueError if it is negative."""
        # Written as ``>=`` so that NaN is rejected as well.
        if delay >= 0.0:
            return delay
        raise ValueError('delay must not be negative.')

    def change_delay(self, delay):
        """Change the Task delay.

        The new delay will be applied to the next repeat of the task; an
        execution that is already scheduled keeps its old delay.

        Args:
            delay (float): New delay in seconds.

        Raises:
            ValueError: If ``delay`` is negative.
        """
        self._delay = self._checked_delay(delay)

    def run(self):
        """Method that is called on execution of the Task.

        Can be overwritten in subclasses or by passing a callable to the
        constructor.
        """
        pass

    def _run(self):
        """Execute the task once and re-arm it if it is repeating."""
        # The handle that triggered this call has fired; drop it.
        self._handler = None
        self._stop_requested = False
        self.run()
        # Honour a stop() issued from inside run(): cancelling the already
        # fired handle cannot prevent the reschedule, so check the flag.
        if self._repeat and not self._stop_requested:
            self.start()

    def start(self):
        """Start the task.

        It will be executed after the configured delay. If an execution is
        already pending it is cancelled and scheduled anew. A started task
        can be stopped by calling :py:meth:`.stop()`.
        """
        if self._handler:
            self._handler.cancel()
        self._stop_requested = False
        self._handler = self._loop.call_later(self._delay, self._run)

    def stop(self):
        """Stop the task.

        Will have no effect if the task is not running. When called from
        within :py:meth:`run()<.Task.run>` of a repeating task, the task is
        not rescheduled. The task might be started again later.
        """
        self._stop_requested = True
        if self._handler:
            self._handler.cancel()
            self._handler = None