Event scheduler¶
sched¶
Python possède depuis la versoin 2.6 un module qui permet de faire de la plannification d’évènement: un scheduler
Vous pouvez retrouver les informations de ce module dans la documentation, mais comme souvent un bon exemple permet de présenter le fonctionnement
import sched, time
s = sched.scheduler(time.time, time.sleep)
def print_time(a='default'):
print("From print_time", time.time(), a)
def print_some_times():
print(time.time())
s.enter(10, 1, print_time, ())
s.enter(5, 2, print_time, argument=('positional',))
s.enter(5, 1, print_time, ('test',))
s.run()
print(time.time())
print_some_times()
donne pour résultat
C:\Users\f.aoustin\Desktop>python test.py
1404999663.78
('From print_time', 1404999668.789, 'test')
('From print_time', 1404999668.789, 'positional')
('From print_time', 1404999673.796, 'default')
1404999673.8
ce qu’il faut noter:
- le premier évènement a bien été joué en dernier
- lors de la création d’un évènement la gestion des prioritées est bien prise en compte
- le lancement de l’objet scheduler ne rend la main qu’a la fin ... ce qui peut être problématique
Voiçi un programme qui permet:
- de supprimer des évènements intégrés au scheduler
- de threader le scheduler lui même
import sched
import threading
import time
scheduler = sched.scheduler(time.time, time.sleep)
# Set up a global to be modified by the threads
counter = 0
def increment_counter(name):
global counter
print 'EVENT:', time.time(), name
counter += 1
print 'NOW:', counter
print 'START:', time.time()
e1 = scheduler.enter(2, 1, increment_counter, ('E1',))
e2 = scheduler.enter(3, 1, increment_counter, ('E2',))
# Start a thread to run the events
t = threading.Thread(target=scheduler.run)
t.start()
# Back in the main thread, cancel the first scheduled event.
scheduler.cancel(e1)
# Wait for the scheduler to finish running in the thread
t.join()
print 'FINAL:', counter
C:\Users\f.aoustin\Desktop>python sched_cancel.py
START: 1361446610.65
EVENT: 1361446613.66 E2
NOW: 1
FINAL: 1
Advanced Python Scheduler¶
Il s’agit d’une module tier de python qui permet de faire d’un scheduler avancé, notament dans la gestion des répétitions et de la gestion des appels dans le temps.
pour l’installer on juste besoin de pip
pip install apscheduler
Warning
uen nouvelle version 3.0 est sortie et elle est incompréhensible donc lors de l’install pip install apscheduler==2.1.2
Et maintenant on peut faire des essais.
Lancer un job à une date précise (le 11 juillet 2014 à 11h49m05s)
from datetime import date, datetime
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# Define the function that is to be executed
def my_job(text):
print text
# The job will be executed on November 6th, 2009
exec_date = datetime(2014, 07, 11, 11, 49, 5)
# Store the job in a variable in case we want to cancel it
job = sched.add_date_job(my_job, exec_date, ['coucou'])
résultat
coucou
Note
il est possible de rajouter des noms aux jobs : sched.add_date_job(my_job, exec_date, name=”NameOfJob”, args=[‘coucou’])
Lancer un job toutes les 5 secondes à partir du 11 juillet 2014 à 11h51m05s)
from datetime import date, datetime
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# Define the function that is to be executed
def my_job(text):
print text
# The job will be executed on November 6th, 2009
exec_date = datetime(2014, 07, 11, 11, 51, 5)
# The same as before, but start after a certain time point
sched.add_interval_job(my_job, seconds=5, start_date=exec_date, args=['coucou'])
résultat
coucou
coucou
coucou
Lancer un job tout les lundi et vendredi à 5h30 (on utilise une notation cron-tab)
from datetime import date, datetime
from apscheduler.scheduler import Scheduler
# Start the scheduler
sched = Scheduler()
sched.start()
# Define the function that is to be executed
def my_job(text):
print text
# The job will be executed on November 6th, 2009
exec_date = datetime(2014, 07, 11, 11, 51, 5)
# The same as before, but start after a certain time point
sched.add_cron_job(my_job, day_of_week='mon-fri', hour=5, minute=30, args=['coucou'])
on peut aussi imaginer une tâche tout les jours à 5h30
sched.add_cron_job(my_job, day_of_week='*', hour=5, minute=30, args=['coucou'])
Il est possible d’obtenir la liste des jobs
sched.get_jobs()
Il est possible de “déscheduler” un job ou une fonction
sched.unschedule_func(func)
sched.unschedule_job(job)
On peut aussi rajouter un listener au scheduleur permetttant d’avoir les infos de démarrage et d’arret de tâche par exemple
def my_listener(event):
if event.exception:
print 'The job crashed :('
else:
print 'The job worked :)'
scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
liste des évènements dans le listener
Constant | Event class | Triggered when... |
---|---|---|
EVENT_SCHEDULER_START | SchedulerEvent | The scheduler is started |
EVENT_SCHEDULER_SHUTDOWN | SchedulerEvent | The scheduler is shut down |
EVENT_JOBSTORE_ADDED | JobStoreEvent | A job store is added to the scheduler |
EVENT_JOBSTORE_REMOVED | JobStoreEvent | A job store is removed from the scheduler |
EVENT_JOBSTORE_JOB_ADDED | JobStoreEvent | A job is added to a job store |
EVENT_JOBSTORE_JOB_REMOVED | JobStoreEvent | A job is removed from a job store |
EVENT_JOB_EXECUTED | JobEvent | A job is executed successfully |
EVENT_JOB_ERROR | JobEvent | A job raised an exception during execution |
EVENT_JOB_MISSED | JobEvent | A job execution time is missed |
Il est aussi possible de gérer par job le nombre d’instance, le temps avant suppression ...