
Source Code for Module web2py.gluon.scheduler

  1  #### WORK IN PROGRESS... NOT SUPPOSED TO WORK YET 
  2   
  3  USAGE = """ 
  4  ## Example 
  5   
  6  For any existing app 
  7   
  8  Create File: app/models/scheduler.py ====== 
  9  from gluon.scheduler import Scheduler 
 10   
 11  def demo1(*args,**vars): 
 12      print 'you passed args=%s and vars=%s' % (args, vars) 
 13      return 'done!' 
 14   
 15  def demo2(): 
 16      1/0 
 17   
 18  scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2)) 
 19  ## run worker nodes with: 
 20   
 21     cd web2py 
 22     python gluon/scheduler.py -u sqlite://storage.sqlite \ 
 23                               -f applications/myapp/databases/ \ 
 24                               -t mytasks.py 
 25  (-h for info) 
 26  python scheduler.py -h 
 27   
 28  ## schedule jobs using 
 29  http://127.0.0.1:8000/scheduler/appadmin/insert/db/scheduler_task 
 30   
 31  ## monitor scheduled jobs 
 32  http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_task.id>0 
 33   
 34  ## view completed jobs 
 35  http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_run.id>0 
 36   
 37  ## view workers 
 38  http://127.0.0.1:8000/scheduler/appadmin/select/db?query=db.scheduler_worker.id>0 
 39   
 40  ## Comments 
 41  """ 
 42   
 43  import os 
 44  import time 
 45  import multiprocessing 
 46  import sys 
 47  import cStringIO 
 48  import threading 
 49  import traceback 
 50  import signal 
 51  import socket 
 52  import datetime 
 53  import logging 
 54  import optparse 
 55   
 56  try: 
 57      from gluon.contrib.simplejson import loads, dumps 
 58  except: 
 59      from simplejson import loads, dumps 
 60   
 61  if 'WEB2PY_PATH' in os.environ: 
 62      sys.path.append(os.environ['WEB2PY_PATH']) 
 63  else: 
 64      os.environ['WEB2PY_PATH'] = os.getcwd() 
 65   
 66  from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET 
 67  from gluon.utils import web2py_uuid 
 68   
 69  QUEUED = 'QUEUED' 
 70  ASSIGNED = 'ASSIGNED' 
 71  RUNNING = 'RUNNING' 
 72  COMPLETED = 'COMPLETED' 
 73  FAILED = 'FAILED' 
 74  TIMEOUT = 'TIMEOUT' 
 75  STOPPED = 'STOPPED' 
 76  ACTIVE = 'ACTIVE' 
 77  INACTIVE = 'INACTIVE' 
 78  DISABLED = 'DISABLED' 
 79  SECONDS = 1 
 80  HEARTBEAT = 3*SECONDS 
 81   
 82  class Task(object): 
 83      def __init__(self,app,function,timeout,args='[]',vars='{}',**kwargs): 
 84          logging.debug(' new task allocated: %s.%s' % (app,function)) 
 85          self.app = app 
 86          self.function = function 
 87          self.timeout = timeout 
 88          self.args = args # json 
 89          self.vars = vars # json 
 90          self.__dict__.update(kwargs) 
 91      def __str__(self): 
 92          return '<Task: %s>' % self.function 
 93   
 94  class TaskReport(object): 
 95      def __init__(self,status,result=None,output=None,tb=None): 
 96          logging.debug(' new task report: %s' % status) 
 97          if tb: 
 98              logging.debug(' traceback: %s' % tb) 
 99          else: 
100              logging.debug(' result: %s' % result) 
101          self.status = status 
102          self.result = result 
103          self.output = output 
104          self.tb = tb 
105      def __str__(self): 
106          return '<TaskReport: %s>' % self.status 
107   
108  def demo_function(*argv,**kwargs): 
109      """ test function """ 
110      for i in range(argv[0]): 
111          print 'click',i 
112          time.sleep(1) 
113      return 'done' 
114   
115  # the two functions below deal with simplejson decoding strings as unicode, 
116  # which breaks their subsequent use as function keyword argument names: unicode names won't work 
117  # borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python 
118  def _decode_list(lst): 
119      newlist = [] 
120      for i in lst: 
121          if isinstance(i, unicode): 
122              i = i.encode('utf-8') 
123          elif isinstance(i, list): 
124              i = _decode_list(i) 
125          newlist.append(i) 
126      return newlist 
127   
128  def _decode_dict(dct): 
129      newdict = {} 
130      for k, v in dct.iteritems(): 
131          if isinstance(k, unicode): 
132              k = k.encode('utf-8') 
133          if isinstance(v, unicode): 
134              v = v.encode('utf-8') 
135          elif isinstance(v, list): 
136              v = _decode_list(v) 
137          newdict[k] = v 
138      return newdict 
139   
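A quick illustration of why the two decode helpers above exist (a sketch, not part of the module; it assumes web2py is on sys.path so that gluon.scheduler and gluon.contrib.simplejson are importable):

    # sketch: compare plain simplejson decoding with the _decode_dict hook
    from gluon.contrib.simplejson import loads
    from gluon.scheduler import _decode_dict

    raw = '{"name": "web2py"}'
    plain = loads(raw)                              # keys/values come back as unicode
    decoded = loads(raw, object_hook=_decode_dict)  # keys/values are utf-8 byte strings

    def greet(name='world'):
        return 'hello %s' % name

    # on older Python 2 interpreters greet(**plain) raises
    # "TypeError: ... keywords must be strings"; the decoded dict works everywhere
    print greet(**decoded)   # prints: hello web2py
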
140  def executor(queue,task): 
141      """ the background process """ 
142      logging.debug(' task started') 
143      stdout, sys.stdout = sys.stdout, cStringIO.StringIO() 
144      try: 
145          if task.app: 
146              os.chdir(os.environ['WEB2PY_PATH']) 
147              from gluon.shell import env 
148              from gluon.dal import BaseAdapter 
149              from gluon import current 
150              level = logging.getLogger().getEffectiveLevel() 
151              logging.getLogger().setLevel(logging.WARN) 
152              _env = env(task.app,import_models=True) 
153              logging.getLogger().setLevel(level) 
154              scheduler = current._scheduler 
155              scheduler_tasks = current._scheduler.tasks 
156              _function = scheduler_tasks[task.function] 
157              globals().update(_env) 
158              args = loads(task.args) 
159              vars = loads(task.vars, object_hook=_decode_dict) 
160              result = dumps(_function(*args,**vars)) 
161          else: 
162              ### for testing purpose only 
163              result = eval(task.function)(*loads(task.args),**loads(task.vars, object_hook=_decode_dict)) 
164          stdout, sys.stdout = sys.stdout, stdout 
165          queue.put(TaskReport(COMPLETED, result,stdout.getvalue())) 
166      except BaseException,e: 
167          sys.stdout = stdout 
168          tb = traceback.format_exc() 
169          queue.put(TaskReport(FAILED,tb=tb)) 
170   
171  class MetaScheduler(threading.Thread): 
172      def __init__(self): 
173          threading.Thread.__init__(self) 
174          self.process = None # the background process 
175          self.have_heartbeat = True # set to False to kill 
176      def async(self,task): 
177          """ 
178          starts the background process and returns a TaskReport with status: 
179          COMPLETED (with result and output) 
180          FAILED (with traceback) 
181          TIMEOUT 
182          STOPPED 
183          """ 
184          queue = multiprocessing.Queue(maxsize=1) 
185          p = multiprocessing.Process(target=executor,args=(queue,task)) 
186          self.process = p 
187          logging.debug(' task starting') 
188          p.start() 
189          try: 
190              p.join(task.timeout) 
191          except: 
192              p.terminate() 
193              p.join() 
194              self.have_heartbeat = False 
195              logging.debug(' task stopped') 
196              return TaskReport(STOPPED) 
197          if p.is_alive(): 
198              p.terminate() 
199              p.join() 
200              logging.debug(' task timeout') 
201              return TaskReport(TIMEOUT) 
202          elif queue.empty(): 
203              self.have_heartbeat = False 
204              logging.debug(' task stopped') 
205              return TaskReport(STOPPED) 
206          else: 
207              logging.debug(' task completed or failed') 
208              return queue.get() 
209   
210      def die(self): 
211          logging.info('die!') 
212          self.have_heartbeat = False 
213          self.terminate_process() 
214   
215      def terminate_process(self): 
216          try: 
217              self.process.terminate() 
218          except: 
219              pass # no process to terminate 
220   
221      def run(self): 
222          """ the thread that sends heartbeat """ 
223          counter = 0 
224          while self.have_heartbeat: 
225              self.send_heartbeat(counter) 
226              counter += 1 
227   
228      def start_heartbeats(self): 
229          self.start() 
230   
231      def send_heartbeat(self,counter): 
232          print 'thum' 
233          time.sleep(1) 
234   
235      def pop_task(self): 
236          return Task( 
237              app = None, 
238              function = 'demo_function', 
239              timeout = 7, 
240              args = '[2]', 
241              vars = '{}') 
242   
243      def report_task(self,task,task_report): 
244          print 'reporting task' 
245          pass 
246   
247      def sleep(self): 
248          pass 
249   
250      def loop(self): 
251          try: 
252              self.start_heartbeats() 
253              while self.have_heartbeat: 
254                  logging.debug('looping...') 
255                  task = self.pop_task() 
256                  if task: 
257                      self.report_task(task,self.async(task)) 
258                  else: 
259                      logging.debug('sleeping...') 
260                      self.sleep() 
261          except KeyboardInterrupt: 
262              self.die() 
263   
264   
265  TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) 
266  RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED) 
267  WORKER_STATUS = (ACTIVE,INACTIVE,DISABLED) 
268   
269  class TYPE(object): 
270      """ 
271      validator that checks whether a field contains valid json and validates its type 
272      """ 
273   
274      def __init__(self,myclass=list,parse=False): 
275          self.myclass = myclass 
276          self.parse = parse 
277   
278      def __call__(self,value): 
279          from gluon import current 
280          try: 
281              obj = loads(value) 
282          except: 
283              return (value,current.T('invalid json')) 
284          else: 
285              if isinstance(obj,self.myclass): 
286                  if self.parse: 
287                      return (obj,None) 
288                  else: 
289                      return (value,None) 
290              else: 
291                  return (value,current.T('Not of type: %s') % self.myclass) 
292   
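A brief sketch of how the TYPE validator above behaves; it follows the usual web2py validator convention of returning a (value, error) tuple, and the error branches assume a web2py environment where current.T is available:

    # sketch: TYPE(dict) accepts a json object, rejects other json or non-json
    requires = TYPE(dict)
    print requires('{"a": 1}')     # ('{"a": 1}', None)
    print requires('[1, 2, 3]')    # ('[1, 2, 3]', <'Not of type' message>)
    print requires('not json')     # ('not json', <'invalid json' message>)

    # with parse=True the decoded object is returned instead of the raw string
    print TYPE(list, parse=True)('[1, 2, 3]')   # ([1, 2, 3], None)
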
293  class Scheduler(MetaScheduler): 
294      def __init__(self,db,tasks={},migrate=True, 
295                   worker_name=None,group_names=None,heartbeat=HEARTBEAT): 
296   
297          MetaScheduler.__init__(self) 
298   
299          self.db = db 
300          self.db_thread = None 
301          self.tasks = tasks 
302          self.group_names = group_names or ['main'] 
303          self.heartbeat = heartbeat 
304          self.worker_name = worker_name or socket.gethostname()+'#'+str(web2py_uuid()) 
305   
306          from gluon import current 
307          current._scheduler = self 
308   
309          self.define_tables(db,migrate=migrate) 
310   
311      def define_tables(self,db,migrate): 
312          from gluon import current 
313          logging.debug('defining tables (migrate=%s)' % migrate) 
314          now = datetime.datetime.now() 
315          db.define_table( 
316              'scheduler_task', 
317              Field('application_name',requires=IS_NOT_EMPTY(), 
318                    default=None,writable=False), 
319              Field('task_name',requires=IS_NOT_EMPTY()), 
320              Field('group_name',default='main',writable=False), 
321              Field('status',requires=IS_IN_SET(TASK_STATUS), 
322                    default=QUEUED,writable=False), 
323              Field('function_name', 
324                    requires=IS_IN_SET(sorted(self.tasks.keys()))), 
325              Field('args','text',default='[]',requires=TYPE(list)), 
326              Field('vars','text',default='{}',requires=TYPE(dict)), 
327              Field('enabled','boolean',default=True), 
328              Field('start_time','datetime',default=now), 
329              Field('next_run_time','datetime',default=now), 
330              Field('stop_time','datetime',default=now+datetime.timedelta(days=1)), 
331              Field('repeats','integer',default=1,comment="0=unlimited"), 
332              Field('period','integer',default=60,comment='seconds'), 
333              Field('timeout','integer',default=60,comment='seconds'), 
334              Field('times_run','integer',default=0,writable=False), 
335              Field('last_run_time','datetime',writable=False,readable=False), 
336              Field('assigned_worker_name',default='',writable=False), 
337              migrate=migrate,format='%(task_name)s') 
338          if hasattr(current,'request'): 
339              db.scheduler_task.application_name.default=current.request.application 
340   
341          db.define_table( 
342              'scheduler_run', 
343              Field('scheduler_task','reference scheduler_task'), 
344              Field('status',requires=IS_IN_SET(RUN_STATUS)), 
345              Field('start_time','datetime'), 
346              Field('stop_time','datetime'), 
347              Field('output','text'), 
348              Field('result','text'), 
349              Field('traceback','text'), 
350              Field('worker_name',default=self.worker_name), 
351              migrate=migrate) 
352   
353          db.define_table( 
354              'scheduler_worker', 
355              Field('worker_name'), 
356              Field('first_heartbeat','datetime'), 
357              Field('last_heartbeat','datetime'), 
358              Field('status',requires=IS_IN_SET(WORKER_STATUS)), 
359              migrate=migrate) 
360          db.commit() 
361   
362      def loop(self,worker_name=None): 
363          MetaScheduler.loop(self) 
364   
365      def pop_task(self): 
366          now = datetime.datetime.now() 
367          db, ts = self.db, self.db.scheduler_task 
368          try: 
369              logging.debug(' grabbing all queued tasks') 
370              all_available = db(ts.status.belongs((QUEUED,RUNNING)))\ 
371                  ((ts.times_run<ts.repeats)|(ts.repeats==0))\ 
372                  (ts.start_time<=now)\ 
373                  (ts.stop_time>now)\ 
374                  (ts.next_run_time<=now)\ 
375                  (ts.enabled==True)\ 
376                  (ts.assigned_worker_name.belongs((None,'',self.worker_name))) #None? 
377              number_grabbed = all_available.update( 
378                  assigned_worker_name=self.worker_name,status=ASSIGNED) 
379              db.commit() 
380          except: 
381              db.rollback() 
382          logging.debug(' grabbed %s tasks' % number_grabbed) 
383          if number_grabbed: 
384              grabbed = db(ts.assigned_worker_name==self.worker_name)\ 
385                  (ts.status==ASSIGNED) 
386              task = grabbed.select(limitby=(0,1), orderby=ts.next_run_time).first() 
387   
388              logging.debug(' releasing all but one (running)') 
389              if task: 
390                  task.update_record(status=RUNNING,last_run_time=now) 
391                  grabbed.update(assigned_worker_name='',status=QUEUED) 
392              db.commit() 
393          else: 
394              return None 
395          next_run_time = task.last_run_time + datetime.timedelta(seconds=task.period) 
396          times_run = task.times_run + 1 
397          if times_run < task.repeats or task.repeats==0: 
398              run_again = True 
399          else: 
400              run_again = False 
401          logging.debug(' new scheduler_run record') 
402          while True: 
403              try: 
404                  run_id = db.scheduler_run.insert( 
405                      scheduler_task = task.id, 
406                      status=RUNNING, 
407                      start_time=now, 
408                      worker_name=self.worker_name) 
409                  db.commit() 
410                  break 
411              except: 
412                  db.rollback() 
413          logging.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task) 
414          return Task( 
415              app = task.application_name, 
416              function = task.function_name, 
417              timeout = task.timeout, 
418              args = task.args, #in json 
419              vars = task.vars, #in json 
420              task_id = task.id, 
421              run_id = run_id, 
422              run_again = run_again, 
423              next_run_time=next_run_time, 
424              times_run = times_run) 
425   
426      def report_task(self,task,task_report): 
427          logging.debug(' recording task report in db (%s)' % task_report.status) 
428          db = self.db 
429          db(db.scheduler_run.id==task.run_id).update( 
430              status = task_report.status, 
431              stop_time = datetime.datetime.now(), 
432              result = task_report.result, 
433              output = task_report.output, 
434              traceback = task_report.tb) 
435          if task_report.status == COMPLETED: 
436              d = dict(status = task.run_again and QUEUED or COMPLETED, 
437                       next_run_time = task.next_run_time, 
438                       times_run = task.times_run, 
439                       assigned_worker_name = '') 
440          else: 
441              d = dict( 
442                  assigned_worker_name = '', 
443                  status = {'FAILED':'FAILED', 
444                            'TIMEOUT':'TIMEOUT', 
445                            'STOPPED':'QUEUED'}[task_report.status]) 
446          db(db.scheduler_task.id==task.task_id)\ 
447              (db.scheduler_task.status==RUNNING).update(**d) 
448          db.commit() 
449          logging.info('task completed (%s)' % task_report.status) 
450   
451      def send_heartbeat(self,counter): 
452          if not self.db_thread: 
453              logging.debug('thread building own DAL object') 
454              self.db_thread = DAL(self.db._uri,folder = self.db._adapter.folder) 
455              self.define_tables(self.db_thread,migrate=False) 
456          try: 
457              db = self.db_thread 
458              sw, st = db.scheduler_worker, db.scheduler_task 
459              now = datetime.datetime.now() 
460              expiration = now-datetime.timedelta(seconds=self.heartbeat*3) 
461              # record heartbeat 
462              logging.debug('........recording heartbeat') 
463              if not db(sw.worker_name==self.worker_name)\ 
464                     .update(last_heartbeat = now, status = ACTIVE): 
465                  sw.insert(status = ACTIVE,worker_name = self.worker_name, 
466                            first_heartbeat = now,last_heartbeat = now) 
467              if counter % 10 == 0: 
468                  # deallocate jobs assigned to inactive workers and requeue them 
469                  logging.debug(' freeing workers that have not sent heartbeat') 
470                  inactive_workers = db(sw.last_heartbeat<expiration) 
471                  db(st.assigned_worker_name.belongs( 
472                      inactive_workers._select(sw.worker_name)))\ 
473                      (st.status.belongs((RUNNING,ASSIGNED,QUEUED)))\ 
474                      .update(assigned_worker_name='',status=QUEUED) 
475                  inactive_workers.delete() 
476              db.commit() 
477          except: 
478              db.rollback() 
479          time.sleep(self.heartbeat) 
480   
481      def sleep(self): 
482          time.sleep(self.heartbeat) # should only sleep until next available task 
483   
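For reference, besides the appadmin insert page mentioned in USAGE, a task can be queued directly from application code once the Scheduler has defined its tables. A minimal sketch for a model file, reusing the demo1 task name from the USAGE example (the insert's field values follow define_tables above):

    # app/models/scheduler.py (sketch)
    from gluon.scheduler import Scheduler

    def demo1(*args, **vars):
        return 'args=%s vars=%s' % (args, vars)

    scheduler = Scheduler(db, dict(demo1=demo1))

    # queue one run of demo1('a', b=2); args and vars are stored as json text
    db.scheduler_task.insert(application_name=request.application,
                             task_name='demo run',
                             function_name='demo1',
                             args='["a"]',
                             vars='{"b": 2}',
                             timeout=60,
                             repeats=1,
                             period=60)
    db.commit()

A running worker then grabs the record in pop_task, executes it in a background process, and logs the outcome in scheduler_run.
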
484  def main(): 
485      """ 
486      allows running a worker without "python web2py.py ...", simply: python scheduler.py 
487      """ 
488      parser = optparse.OptionParser() 
489      parser.add_option( 
490          "-w", "--worker_name", dest="worker_name", default=None, 
491          help="start a worker with name") 
492      parser.add_option( 
493          "-b", "--heartbeat",dest="heartbeat", type='int', default = 10, 
494          help="heartbeat time in seconds (default 10)") 
495      parser.add_option( 
496          "-L", "--logger_level",dest="logger_level", 
497          default = 'INFO', 
498          help="level of logging (DEBUG, INFO, WARNING, ERROR)") 
499      parser.add_option( 
500          "-g", "--group_names",dest="group_names", 
501          default = 'main', 
502          help="comma separated list of groups to be picked by the worker") 
503      parser.add_option( 
504          "-f", "--db_folder",dest="db_folder", 
505          default = '/Users/mdipierro/web2py/applications/scheduler/databases', 
506          help="location of the dal database folder") 
507      parser.add_option( 
508          "-u", "--db_uri",dest="db_uri", 
509          default = 'sqlite://storage.sqlite', 
510          help="database URI string (web2py DAL syntax)") 
511      parser.add_option( 
512          "-t", "--tasks",dest="tasks",default=None, 
513          help="file containing the tasks; must define " + \ 
514               "tasks = {'task_name':(lambda: 'output')} or similar set of tasks") 
515      (options, args) = parser.parse_args() 
516      if not options.tasks or not options.db_uri: 
517          print USAGE 
518      if options.tasks: 
519          path,filename = os.path.split(options.tasks) 
520          if filename.endswith('.py'): 
521              filename = filename[:-3] 
522          sys.path.append(path) 
523          print 'importing tasks...' 
524          tasks = __import__(filename, globals(), locals(), [], -1).tasks 
525          print 'tasks found: '+', '.join(tasks.keys()) 
526      else: 
527          tasks = {} 
528      group_names = [x.strip() for x in options.group_names.split(',')] 
529   
530      logging.getLogger().setLevel(logging.DEBUG) 
531   
532      print 'groups for this worker: '+', '.join(group_names) 
533      print 'connecting to database in folder: ' + (options.db_folder or './') 
534      print 'using URI: '+options.db_uri 
535      db = DAL(options.db_uri,folder=options.db_folder) 
536      print 'instantiating scheduler...' 
537      scheduler=Scheduler(db = db, 
538                          worker_name = options.worker_name, 
539                          tasks = tasks, 
540                          migrate = True, 
541                          group_names = group_names, 
542                          heartbeat = options.heartbeat) 
543      print 'starting main worker loop...' 
544      scheduler.loop() 
545   
546  if __name__=='__main__': 
547      main() 
548   
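Finally, a sketch of what a tasks file for the standalone worker's -t option can look like (mytasks.py is the hypothetical name used in the USAGE example; the worker simply imports the file and reads its tasks dict, then looks up each scheduler_task.function_name in it):

    # mytasks.py -- used as: python gluon/scheduler.py -u sqlite://storage.sqlite \
    #                              -f applications/myapp/databases/ -t mytasks.py
    import time

    def slow_count(n=3):
        for i in range(int(n)):
            print 'tick', i
            time.sleep(1)
        return 'counted to %s' % n

    # the worker executes whichever callable matches the queued function_name
    tasks = {'slow_count': slow_count,
             'echo': (lambda *a, **v: 'args=%s vars=%s' % (a, v))}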