USAGE = """
## Example

For any existing app

Create File: app/models/scheduler.py ======
from gluon.scheduler import Scheduler

def demo1(*args,**vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1/0

scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))

## run worker nodes with:

    cd web2py
    python gluon/scheduler.py -u sqlite://storage.sqlite \
                              -f applications/myapp/databases/ \
                              -t mytasks.py
(-h for info)
python scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0
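
## alternatively, schedule a job from application code (a sketch with
## illustrative values; it simply inserts a row in the scheduler_task
## table defined in this module)
db.scheduler_task.insert(task_name='my demo run',
                         function_name='demo1',
                         args='[1, 2]',      # json list of positional args
                         vars='{"a": 3}')    # json dict of keyword args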

## Comments
"""

import os
import time
import multiprocessing
import sys
import cStringIO
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse

try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET
from gluon.utils import web2py_uuid

QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
INACTIVE = 'INACTIVE'
DISABLED = 'DISABLED'
SECONDS = 1
HEARTBEAT = 3*SECONDS

class Task(object):
    def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs):
        logging.debug(' new task allocated: %s.%s' % (app,function))
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args
        self.vars = vars
        self.__dict__.update(kwargs)
    def __str__(self):
        return '<Task: %s>' % self.function

class TaskReport(object):
    def __init__(self,status,result=None,output=None,tb=None):
        logging.debug(' new task report: %s' % status)
        if tb:
            logging.debug(' traceback: %s' % tb)
        else:
            logging.debug(' result: %s' % result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb
    def __str__(self):
        return '<TaskReport: %s>' % self.status

def demo_function(*argv,**kwargs):  # test task; name and signature assumed, the original def line is missing in this copy
    """ test function """
    for i in range(argv[0]):
        print 'click',i
        time.sleep(1)
    return 'done'

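# The two helpers below convert unicode strings coming out of simplejson back
# into utf-8 encoded str objects.  This matters because the decoded vars dict
# is passed to the task function as **keyword arguments, and in Python 2
# keyword argument names must be plain str, not unicode.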
def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist

def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict

def executor(queue,task):
    """ the background process """
    logging.debug(' task started')
    stdout, sys.stdout = sys.stdout, cStringIO.StringIO()
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env
            from gluon.dal import BaseAdapter
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            _env = env(task.app,import_models=True)
            logging.getLogger().setLevel(level)
            scheduler = current._scheduler
            scheduler_tasks = current._scheduler.tasks
            _function = scheduler_tasks[task.function]
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args,**vars))
        else:
            # no app context: resolve the function by name (testing only)
            result = eval(task.function)(*loads(task.args),
                                         **loads(task.vars, object_hook=_decode_dict))
        stdout, sys.stdout = sys.stdout, stdout
        queue.put(TaskReport(COMPLETED, result, stdout.getvalue()))
    except BaseException, e:
        sys.stdout = stdout
        tb = traceback.format_exc()
        queue.put(TaskReport(FAILED, tb=tb))

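# NOTE: the body of the MetaScheduler class is not present in this copy of the
# file.  The minimal sketch below is a stand-in, not the original code: it only
# implements what the Scheduler subclass below relies on (a threading.Thread
# base, __init__, a loop() that pops/executes/reports tasks, and a heartbeat
# thread), running executor() in a child process and enforcing task.timeout.
class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None         # child process running the current task
        self.have_heartbeat = True  # set to False to stop the worker

    def async(self, task):
        """run executor(queue,task) in a child process, honoring task.timeout"""
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor, args=(queue, task))
        self.process = p
        p.start()
        p.join(task.timeout)
        if p.is_alive():
            # the task exceeded its timeout: kill it and report TIMEOUT
            p.terminate()
            p.join()
            self.process = None
            return TaskReport(TIMEOUT)
        self.process = None
        return queue.get()

    def run(self):
        """heartbeat thread; Scheduler.send_heartbeat sleeps between beats"""
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def loop(self):
        """main worker loop: start heartbeats, then pop, run and report tasks"""
        self.start()
        while self.have_heartbeat:
            task = self.pop_task()
            if task:
                self.report_task(task, self.async(task))
            else:
                time.sleep(SECONDS)
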
TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED)

class TYPE(object):
    """
    validator that checks whether the field contains valid json
    and that the decoded object is of the expected type
    """
    def __init__(self,myclass=list,parse=False):
        self.myclass = myclass
        self.parse = parse
    def __call__(self,value):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value,current.T('invalid json'))
        else:
            if isinstance(obj,self.myclass):
                if self.parse:
                    return (obj,None)
                else:
                    return (value,None)
            else:
                return (value,current.T('Not of type: %s') % self.myclass)

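# Illustrative behaviour of the TYPE validator (the input values are made up):
#   TYPE(list)('[1,2]')               -> ('[1,2]', None)      value kept as json text
#   TYPE(dict, parse=True)('{"x":3}') -> (decoded dict, None) value parsed
#   TYPE(dict)('not json')            -> ('not json', T('invalid json'))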
class Scheduler(MetaScheduler):
    def __init__(self,db,tasks={},migrate=True,
                 worker_name=None,group_names=None,heartbeat=HEARTBEAT):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names or ['main']
        self.heartbeat = heartbeat
        self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid())

        from gluon import current
        current._scheduler = self

        self.define_tables(db,migrate=migrate)

    def define_tables(self,db,migrate):
        from gluon import current
        logging.debug('defining tables (migrate=%s)' % migrate)
        now = datetime.datetime.now()
        db.define_table(
            'scheduler_task',
            Field('application_name',requires=IS_NOT_EMPTY(),
                  default=None,writable=False),
            Field('task_name',requires=IS_NOT_EMPTY()),
            Field('group_name',default='main',writable=False),
            Field('status',requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED,writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))),
            Field('args','text',default='[]',requires=TYPE(list)),
            Field('vars','text',default='{}',requires=TYPE(dict)),
            Field('enabled','boolean',default=True),
            Field('start_time','datetime',default=now),
            Field('next_run_time','datetime',default=now),
            Field('stop_time','datetime',default=now+datetime.timedelta(days=1)),
            Field('repeats','integer',default=1,comment="0=unlimited"),
            Field('period','integer',default=60,comment='seconds'),
            Field('timeout','integer',default=60,comment='seconds'),
            Field('times_run','integer',default=0,writable=False),
            Field('last_run_time','datetime',writable=False,readable=False),
            Field('assigned_worker_name',default='',writable=False),
            migrate=migrate,format='%(task_name)s')
        if hasattr(current,'request'):
            db.scheduler_task.application_name.default = current.request.application

        db.define_table(
            'scheduler_run',
            Field('scheduler_task','reference scheduler_task'),
            Field('status',requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time','datetime'),
            Field('stop_time','datetime'),
            Field('output','text'),
            Field('result','text'),
            Field('traceback','text'),
            Field('worker_name',default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name'),
            Field('first_heartbeat','datetime'),
            Field('last_heartbeat','datetime'),
            Field('status',requires=IS_IN_SET(WORKER_STATUS)),
            migrate=migrate)
        db.commit()

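    # How a worker coordinates through the tables above (summary of the code
    # below): a QUEUED task is claimed by setting assigned_worker_name and
    # status=ASSIGNED; all but one claimed task are released back to QUEUED,
    # the chosen one becomes RUNNING and gets a scheduler_run row; on completion
    # it is re-QUEUED (if it still has repeats left) or marked COMPLETED, FAILED
    # or TIMEOUT; workers that stop sending heartbeats are deleted and their
    # tasks are re-QUEUED.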
    def loop(self,worker_name=None):
        MetaScheduler.loop(self)

    def pop_task(self):
        now = datetime.datetime.now()
        db, ts = self.db, self.db.scheduler_task
        number_grabbed = None
        try:
            logging.debug(' grabbing all queued tasks')
            all_available = db(ts.status.belongs((QUEUED,RUNNING)))\
                ((ts.times_run<ts.repeats)|(ts.repeats==0))\
                (ts.start_time<=now)\
                (ts.stop_time>now)\
                (ts.next_run_time<=now)\
                (ts.enabled==True)\
                (ts.assigned_worker_name.belongs((None,'',self.worker_name)))
            number_grabbed = all_available.update(
                assigned_worker_name=self.worker_name,status=ASSIGNED)
            db.commit()
        except:
            db.rollback()
        logging.debug(' grabbed %s tasks' % number_grabbed)
        if number_grabbed:
            grabbed = db(ts.assigned_worker_name==self.worker_name)\
                (ts.status==ASSIGNED)
            task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first()

            logging.debug(' releasing all but one (running)')
            if task:
                task.update_record(status=RUNNING,last_run_time=now)
                grabbed.update(assigned_worker_name='',status=QUEUED)
            db.commit()
        else:
            return None
        if not task:
            # nothing was actually selected (e.g. grabbed by another worker)
            return None
        next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats==0:
            run_again = True
        else:
            run_again = False
        logging.debug(' new scheduler_run record')
        while True:
            try:
                run_id = db.scheduler_run.insert(
                    scheduler_task = task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                db.rollback()
        logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app = task.application_name,
            function = task.function_name,
            timeout = task.timeout,
            args = task.args,
            vars = task.vars,
            task_id = task.id,
            run_id = run_id,
            run_again = run_again,
            next_run_time = next_run_time,
            times_run = times_run)

    def report_task(self,task,task_report):
        logging.debug(' recording task report in db (%s)' % task_report.status)
        db = self.db
        db(db.scheduler_run.id==task.run_id).update(
            status = task_report.status,
            stop_time = datetime.datetime.now(),
            result = task_report.result,
            output = task_report.output,
            traceback = task_report.tb)
        if task_report.status == COMPLETED:
            d = dict(status = task.run_again and QUEUED or COMPLETED,
                     next_run_time = task.next_run_time,
                     times_run = task.times_run,
                     assigned_worker_name = '')
        else:
            d = dict(
                assigned_worker_name = '',
                status = {'FAILED':'FAILED',
                          'TIMEOUT':'TIMEOUT',
                          'STOPPED':'QUEUED'}[task_report.status])
        db(db.scheduler_task.id==task.task_id)\
            (db.scheduler_task.status==RUNNING).update(**d)
        db.commit()
        logging.info('task completed (%s)' % task_report.status)

    def send_heartbeat(self,counter):
        if not self.db_thread:
            logging.debug('thread building own DAL object')
            self.db_thread = DAL(self.db._uri,folder = self.db._adapter.folder)
            self.define_tables(self.db_thread,migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = datetime.datetime.now()
            expiration = now-datetime.timedelta(seconds=self.heartbeat*3)

            logging.debug('........recording heartbeat')
            if not db(sw.worker_name==self.worker_name)\
                    .update(last_heartbeat = now, status = ACTIVE):
                sw.insert(status = ACTIVE,worker_name = self.worker_name,
                          first_heartbeat = now,last_heartbeat = now)
            if counter % 10 == 0:
                # requeue tasks held by workers that stopped sending heartbeats
                logging.debug(' freeing workers that have not sent heartbeat')
                inactive_workers = db(sw.last_heartbeat<expiration)
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))\
                    (st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\
                    .update(assigned_worker_name='',status=QUEUED)
                inactive_workers.delete()
            db.commit()
        except:
            db.rollback()
        time.sleep(self.heartbeat)

def main():
    """
    allows running a worker without "python web2py.py ...": just run this
    file directly with python
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", default=10, type='int',
        help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default='INFO',
        help="level of logging (DEBUG, INFO, WARNING, ERROR)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing task files, must define " + \
             "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path,filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: '+', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(
        getattr(logging, options.logger_level.upper(), logging.DEBUG))

    print 'groups for this worker: '+', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: '+options.db_uri
    db = DAL(options.db_uri,folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler = Scheduler(db = db,
                          worker_name = options.worker_name,
                          tasks = tasks,
                          migrate = True,
                          group_names = group_names,
                          heartbeat = options.heartbeat)
    print 'starting main worker loop...'
    scheduler.loop()

if __name__=='__main__':
    main()