source: spip-zone/_plugins_/job_queue/inc/queue.php @ 38751

Last change on this file since 38751 was 38751, checked in by cedric@…, 10 years ago

les fonctions appelees au shutwodn (register_shutdown_function) peuvent s'executer dans un repertoire different.
On se remet dans le bon pour ne pas risquer de tout casser dans SPIP (mais il faudrait robustifier certains acces par un chemin absolu)

File size: 12.0 KB
Line 
1<?php
2/*
3 * Plugin Job Queue
4 * (c) 2009 Cedric&Fil
5 * Distribue sous licence GPL
6 *
7 */
8
9
10/**
11 * Add a job to the queue. The function added will be called in the order it
12 * was added during cron.
13 *
14 * @param $function
15 *   The function name to call.
16 * @param $description
17 *   A human-readable description of the queued job.
18 * @param $arguments
19 *   Optional array of arguments to pass to the function.
20 * @param $file
21 *   Optional file path which needs to be included for $fucntion.
22 * @param $no_duplicate
23 *   If TRUE, do not add the job to the queue if one with the same function and
24 *   arguments already exists.
25 *       If 'function_only' test of existence is only on function name (for cron job)
26 * @param $time
27 *              time for starting the job. If 0, job will start as soon as possible
28 * @param $priority
29 *              -10 (low priority) to +10 (high priority), 0 is the default
30 * @return int
31 *      id of job
32 */
33function queue_add_job($function, $description, $arguments = array(), $file = '', $no_duplicate = false, $time=0, $priority=0){
34        include_spip('base/abstract_sql');
35
36        // cas pourri de ecrire/action/editer_site avec l'option reload=oui
37        if (defined('_GENIE_SYNDIC_NOW'))
38                $arguments['id_syndic'] = _GENIE_SYNDIC_NOW;
39
40        // serialiser les arguments
41        $arguments = serialize($arguments);
42        $md5args = md5($arguments);
43
44        // si option ne pas dupliquer, regarder si la fonction existe deja
45        // avec les memes args et file
46        if (
47                        $no_duplicate
48                AND
49                        sql_countsel('spip_jobs',
50                                'status=\'scheduled\' AND fonction='.sql_quote($function)
51                                .(($no_duplicate==='function_only')?'':
52                                 ' AND md5args='.sql_quote($md5args).' AND inclure='.sql_quote($file)))
53                )
54                return false;
55
56        // si pas de date programee, des que possible
57        if (!$time)
58                $time = time();
59        $date = date('Y-m-d H:i:s',$time);
60
61        $id_job = sql_insertq('spip_jobs',array(
62                        'fonction'=>$function,
63                        'descriptif'=>$description,
64                        'args'=>$arguments,
65                        'md5args'=>$md5args,
66                        'inclure'=>$file,
67                        'priorite'=>max(-10,min(10,intval($priority))),
68                        'date'=>$date,
69                        'status'=>'scheduled',
70                ));
71
72        if (defined('_JQ_INSERT_CHECK_ARGS') AND $id_job) {
73                $args = sql_getfetsel('args', 'spip_jobs', 'id_job='.intval($id_job));
74                if ($args!==$arguments) {
75                        spip_log('arguments job errones / longueur '.strlen($args)." vs ".strlen($arguments).' / valeur : '.var_export($arguments,true),'queue');
76                }
77        }
78
79        if ($id_job){
80                queue_update_next_job_time($time);
81        }
82
83        return $id_job;
84
85}
86
87
88/**
89 * Remove a job from the queue.
90 * @param int $id_job
91 *  id of jonb to delete
92 * @return bool
93 */
94function queue_remove_job($id_job){
95        include_spip('base/abstract_sql');
96
97        if ($row = sql_fetsel('fonction,inclure,date','spip_jobs','id_job='.intval($id_job))
98         AND $res = sql_delete('spip_jobs','id_job='.intval($id_job))){
99                queue_unlink_job($id_job);
100                // est-ce une tache cron qu'il faut relancer ?
101                if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
102                        // relancer avec les nouveaux arguments de temps
103                        include_spip('inc/genie');
104                        // relancer avec la periode prevue
105                        queue_genie_replan_job($row['fonction'],$periode,strtotime($row['date']));
106                }
107                queue_update_next_job_time();
108        }
109        return $res;
110}
111
112/**
113 * Link a job with SPIP objects
114 *
115 *
116 * @param int $id_job
117 *      id of job to link
118 * @param array $objets
119 *  can be a simple array('objet'=>'article','id_objet'=>23)
120 *  or an array of simple array to link multiples objet in one time
121 */
122function queue_link_job($id_job,$objets){
123        include_spip('base/abstract_sql');
124
125        if (is_array($objets) AND count($objets)){
126                if (is_array(reset($objets))){
127                        foreach($objets as $k=>$o){
128                                $objets[$k]['id_job'] = $id_job;
129                        }
130                        sql_insertq_multi('spip_jobs_liens',$objets);
131                }
132                else
133                        sql_insertq('spip_jobs_liens',array_merge(array('id_job'=>$id_job),$objets));
134        }
135}
136
137/**
138 * Unlink job with SPIP objects
139 *
140 * @param int $id_job
141 *      id of job to unlink ibject with
142 * @return int/bool
143 *      result of sql_delete
144 */
145function queue_unlink_job($id_job){
146        return sql_delete("spip_jobs_liens","id_job=".intval($id_job));
147}
148
149/**
150 * Start a job described by array $row
151 * @param array $row
152 *      describe the job, with field of table spip_jobs
153 * @return mixed
154 *      return the result of job
155 */
156function queue_start_job($row){
157
158// deserialiser les arguments
159        $args = unserialize($row['args']);
160        if ($args===false){
161                spip_log('arguments job errones '.var_export($row,true),'queue');
162                $args = array();
163        }
164
165        $fonction = $row['fonction'];
166        if (strlen($inclure = trim($row['inclure']))){
167                if (substr($inclure,-1)=='/'){ // c'est un chemin pour charger_fonction
168                        $f = charger_fonction($fonction,rtrim($inclure,'/'),false);
169                        if ($f)
170                                $fonction = $f;
171                }
172                else
173                        include_spip($inclure);
174        }
175
176        if (!function_exists($fonction)){
177                spip_log("fonction $fonction ($inclure) inexistante ".var_export($row,true),'queue');
178                return false;
179        }
180
181        switch (count($args)) {
182                case 0: $res = $fonction(); break;
183                case 1: $res = $fonction($args[0]); break;
184                case 2: $res = $fonction($args[0],$args[1]); break;
185                case 3: $res = $fonction($args[0],$args[1], $args[2]); break;
186                case 4: $res = $fonction($args[0],$args[1], $args[2], $args[3]); break;
187                case 5: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4]); break;
188                case 6: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5]); break;
189                case 7: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6]); break;
190                case 8: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7]); break;
191                case 9: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8]); break;
192                case 10:$res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8], $args[9]); break;
193                default:
194                        $res = call_user_func_array($fonction, $args);
195        }
196        return $res;
197
198}
199
200/**
201 * Ordonanceur
202 * Evite les requetes sql a chaque appel
203 * en memorisant en meta la date du prochain job
204 */
205function queue_schedule($force_jobs = null){
206        $time = time();
207        if (defined('_DEBUG_BLOCK_QUEUE'))
208                return;
209
210        // rien a faire si le prochain job est encore dans le futur
211        if ($GLOBALS['meta']['queue_next_job_time']>$time AND (!$force_jobs OR !count($force_jobs)))
212                return;
213
214        include_spip('base/abstract_sql');
215
216        if (!defined('_JQ_MAX_JOBS_TIME_TO_EXECUTE')){
217                $max_time = ini_get('max_execution_time')/2;
218                // valeur conservatrice si on a pas reussi a lire le max_execution_time
219                if (!$max_time) $max_time=5;
220                define('_JQ_MAX_JOBS_TIME_TO_EXECUTE',min($max_time,15)); // une valeur maxi en temps.
221        }
222        $end_time = $time + _JQ_MAX_JOBS_TIME_TO_EXECUTE;
223
224        #spip_log("JQ schedule $time / $end_time",'jq');
225
226        if (!defined('_JQ_MAX_JOBS_EXECUTE'))
227                define('_JQ_MAX_JOBS_EXECUTE',200);
228        $nbj=0;
229        // attraper les jobs
230        // dont la date est passee (echus en attente),
231        // par odre :
232        //      - de priorite
233        //      - de date
234        // lorsqu'un job cron n'a pas fini, sa priorite est descendue
235        // pour qu'il ne bloque pas les autres jobs en attente
236        if (is_array($force_jobs) AND count($force_jobs))
237                $cond = "status='scheduled' AND ".sql_in("id_job", $force_jobs);
238        else {
239                $now = date('Y-m-d H:i:s',$time);
240                $cond = "status='scheduled' AND date<".sql_quote($now);
241        }
242
243        register_shutdown_function('queue_error_handler'); // recuperer les erreurs auant que possible
244        $res = sql_select('*','spip_jobs',$cond,'','priorite DESC,date','0,'.(_JQ_MAX_JOBS_EXECUTE+1));
245        do {
246                if ($row = sql_fetch($res)){
247                        $nbj++;
248                        // il faut un verrou, a base de sql_delete
249                        if (sql_delete('spip_jobs',"status='scheduled' AND id_job=".intval($row['id_job']))){
250                                #spip_log("JQ schedule job ".$nbj." OK",'jq');
251                                // on reinsert dans la base aussitot avec un status='pending'
252                                $row['status'] = 'pending';
253                                $row['date'] = $time;
254                                sql_insertq('spip_jobs', $row);
255       
256                                // on a la main sur le job :
257                                // l'executer
258                                $result = queue_start_job($row);
259
260                                $time = time();
261                                queue_close_job($row, $time, $result);
262                        }
263                }
264                #spip_log("JQ schedule job end time ".$time,'jq');
265        } while ($nbj<_JQ_MAX_JOBS_EXECUTE AND $row AND $time<$end_time);
266
267        #spip_log("JQ schedule end time ".time(),'jq');
268       
269        if ($row = sql_fetch($res)){
270                queue_update_next_job_time(0); // on sait qu'il y a encore des jobs a lancer ASAP
271                #spip_log("JQ encore !",'jq');
272        }
273        else
274                queue_update_next_job_time();
275
276}
277
278/**
279 * Terminer un job au status 'pending' :
280 *  - le reprogrammer si c'est un cron
281 *  - supprimer ses liens
282 *  - le detruire en dernier
283 *
284 * @param array $row
285 * @param int $time
286 * @param int $result
287 */
288function queue_close_job(&$row,$time,$result=0){
289        // est-ce une tache cron qu'il faut relancer ?
290        if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
291                // relancer avec les nouveaux arguments de temps
292                include_spip('inc/genie');
293                if ($result<0)
294                        // relancer tout de suite, mais en baissant la priorite
295                        queue_genie_replan_job($row['fonction'],$periode,0-$result/*last*/,0/*ASAP*/,$row['priorite']-1);
296                else
297                        // relancer avec la periode prevue
298                        queue_genie_replan_job($row['fonction'],$periode,$time);
299        }
300        // purger ses liens eventuels avec des objets
301        sql_delete("spip_jobs_liens","id_job=".intval($row['id_job']));
302        // supprimer le job fini
303        sql_delete('spip_jobs','id_job='.intval($row['id_job']));
304}
305
306/**
307 * Recuperer des erreurs auant que possible
308 * en terminant la gestion de la queue
309 */
310function queue_error_handler(){
311        // se remettre dans le bon dossier, car Apache le change parfois (toujours?)
312        chdir(_ROOT_CWD);
313
314        queue_update_next_job_time();
315}
316
317
318/**
319 * Test if a job in queue is periodic cron
320 *
321 * @param <type> $function
322 * @param <type> $inclure
323 * @return <type>
324 */
325function queue_is_cron_job($function,$inclure){
326        static $taches = null;
327        if (strncmp($inclure,'genie/',6)==0){
328                if (is_null($taches)){
329                        include_spip('inc/genie');
330                        $taches = taches_generales();
331                }
332                if (isset($taches[$function]))
333                        return $taches[$function];
334        }
335        return false;
336}
337
338/**
339 * Mettre a jour la date du prochain job a lancer
340 * Si une date est fournie (au format time unix)
341 * on fait simplement un min entre la date deja connue et celle fournie
342 * (cas de l'ajout simple
343 * ou cas $next_time=0 car l'on sait qu'il faut revenir ASAP)
344 *
345 * @param int $next_time
346 *      temps de la tache ajoutee ou 0 pour ASAP
347 */
348function queue_update_next_job_time($next_time=null){
349        static $nb_jobs_scheduled = null;
350        static $deja_la = false;
351        // prendre le min des $next_time que l'on voit passer ici, en cas de reentrance
352        static $next = null;
353        if (!is_null($next_time)){
354                if (is_null($next) OR $next>$next_time)
355                        $next = $next_time;
356        }
357        // queue_close_job peut etre reentrant ici
358        if ($deja_la) return;
359        $deja_la = true;
360
361        include_spip('base/abstract_sql');
362        $time = time();
363
364        // traiter les jobs morts au combat (pending depuis plus de 180s)
365        // pour cause de timeout ou autre erreur fatale
366        $res = sql_select("*","spip_jobs","status='pending' AND date<".sql_quote(date('Y-m-d H:i:s',$time-180)));
367        while ($row = sql_fetch($res))
368                queue_close_job($row,$time);
369
370        // chercher la date du prochain job si pas connu
371        if (is_null($next) OR !isset($GLOBALS['meta']['queue_next_job_time'])){
372                $date = sql_getfetsel('date','spip_jobs',"status='scheduled'",'','date','0,1');
373                $next = strtotime($date);
374        }
375        else {
376                if ($next){
377                        if (is_null($nb_jobs_scheduled))
378                                $nb_jobs_scheduled = sql_countsel('spip_jobs',"status='scheduled' AND date<".sql_quote(date('Y-m-d H:i:s',$time)));
379                        elseif ($next<=$time)
380                                $nb_jobs_scheduled++;
381                        // si trop de jobs en attente, on force la purge en fin de hit
382                        // pour assurer le coup
383                        if ($nb_jobs_scheduled>_JQ_NB_JOBS_OVERFLOW)
384                                define('_DIRECT_CRON_FORCE',true);
385                }
386        }
387        // toujours relire la table pour comparer, pour tenir compte des maj concourrantes
388        // et ne mettre a jour que si il y a un interet a le faire
389        $curr_next = sql_getfetsel('valeur','spip_meta',"nom='queue_next_job_time'");
390        if (
391                        ($curr_next<$time AND $next>$time) // le prochain job est dans le futur mais pas la date planifiee actuelle
392                        OR $curr_next>$next // le prochain job est plus tot que la date planifiee actuelle
393                ) {
394                include_spip('inc/meta');
395                ecrire_meta('queue_next_job_time',$next);
396        }
397        $deja_la = false;
398}
399?>
Note: See TracBrowser for help on using the repository browser.