source: spip-zone/_plugins_/job_queue/inc/queue.php @ 39423

Last change on this file since 39423 was 39423, checked in by cedric@…, 10 years ago

optimisation : passer le champ status en int avec un index pour accelerer les traitements et eviter les slow queries

File size: 12.1 KB
Line 
1<?php
2/*
3 * Plugin Job Queue
4 * (c) 2009 Cedric&Fil
5 * Distribue sous licence GPL
6 *
7 */
8
9@define('_JQ_SCHEDULED',1);
10@define('_JQ_PENDING',0);
11
12/**
13 * Add a job to the queue. The function added will be called in the order it
14 * was added during cron.
15 *
16 * @param $function
17 *   The function name to call.
18 * @param $description
19 *   A human-readable description of the queued job.
20 * @param $arguments
21 *   Optional array of arguments to pass to the function.
22 * @param $file
23 *   Optional file path which needs to be included for $fucntion.
24 * @param $no_duplicate
25 *   If TRUE, do not add the job to the queue if one with the same function and
26 *   arguments already exists.
27 *       If 'function_only' test of existence is only on function name (for cron job)
28 * @param $time
29 *              time for starting the job. If 0, job will start as soon as possible
30 * @param $priority
31 *              -10 (low priority) to +10 (high priority), 0 is the default
32 * @return int
33 *      id of job
34 */
35function queue_add_job($function, $description, $arguments = array(), $file = '', $no_duplicate = false, $time=0, $priority=0){
36        include_spip('base/abstract_sql');
37
38        // cas pourri de ecrire/action/editer_site avec l'option reload=oui
39        if (defined('_GENIE_SYNDIC_NOW'))
40                $arguments['id_syndic'] = _GENIE_SYNDIC_NOW;
41
42        // serialiser les arguments
43        $arguments = serialize($arguments);
44        $md5args = md5($arguments);
45
46        // si option ne pas dupliquer, regarder si la fonction existe deja
47        // avec les memes args et file
48        if (
49                        $no_duplicate
50                AND
51                        sql_countsel('spip_jobs',
52                                'status='.intval(_JQ_SCHEDULED).' AND fonction='.sql_quote($function)
53                                .(($no_duplicate==='function_only')?'':
54                                 ' AND md5args='.sql_quote($md5args).' AND inclure='.sql_quote($file)))
55                )
56                return false;
57
58        // si pas de date programee, des que possible
59        if (!$time)
60                $time = time();
61        $date = date('Y-m-d H:i:s',$time);
62
63        $id_job = sql_insertq('spip_jobs',array(
64                        'fonction'=>$function,
65                        'descriptif'=>$description,
66                        'args'=>$arguments,
67                        'md5args'=>$md5args,
68                        'inclure'=>$file,
69                        'priorite'=>max(-10,min(10,intval($priority))),
70                        'date'=>$date,
71                        'status'=>_JQ_SCHEDULED,
72                ));
73
74        if (defined('_JQ_INSERT_CHECK_ARGS') AND $id_job) {
75                $args = sql_getfetsel('args', 'spip_jobs', 'id_job='.intval($id_job));
76                if ($args!==$arguments) {
77                        spip_log('arguments job errones / longueur '.strlen($args)." vs ".strlen($arguments).' / valeur : '.var_export($arguments,true),'queue');
78                }
79        }
80
81        if ($id_job){
82                queue_update_next_job_time($time);
83        }
84
85        return $id_job;
86
87}
88
89
90/**
91 * Remove a job from the queue.
92 * @param int $id_job
93 *  id of jonb to delete
94 * @return bool
95 */
96function queue_remove_job($id_job){
97        include_spip('base/abstract_sql');
98
99        if ($row = sql_fetsel('fonction,inclure,date','spip_jobs','id_job='.intval($id_job))
100         AND $res = sql_delete('spip_jobs','id_job='.intval($id_job))){
101                queue_unlink_job($id_job);
102                // est-ce une tache cron qu'il faut relancer ?
103                if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
104                        // relancer avec les nouveaux arguments de temps
105                        include_spip('inc/genie');
106                        // relancer avec la periode prevue
107                        queue_genie_replan_job($row['fonction'],$periode,strtotime($row['date']));
108                }
109                queue_update_next_job_time();
110        }
111        return $res;
112}
113
114/**
115 * Link a job with SPIP objects
116 *
117 *
118 * @param int $id_job
119 *      id of job to link
120 * @param array $objets
121 *  can be a simple array('objet'=>'article','id_objet'=>23)
122 *  or an array of simple array to link multiples objet in one time
123 */
124function queue_link_job($id_job,$objets){
125        include_spip('base/abstract_sql');
126
127        if (is_array($objets) AND count($objets)){
128                if (is_array(reset($objets))){
129                        foreach($objets as $k=>$o){
130                                $objets[$k]['id_job'] = $id_job;
131                        }
132                        sql_insertq_multi('spip_jobs_liens',$objets);
133                }
134                else
135                        sql_insertq('spip_jobs_liens',array_merge(array('id_job'=>$id_job),$objets));
136        }
137}
138
139/**
140 * Unlink job with SPIP objects
141 *
142 * @param int $id_job
143 *      id of job to unlink ibject with
144 * @return int/bool
145 *      result of sql_delete
146 */
147function queue_unlink_job($id_job){
148        return sql_delete("spip_jobs_liens","id_job=".intval($id_job));
149}
150
151/**
152 * Start a job described by array $row
153 * @param array $row
154 *      describe the job, with field of table spip_jobs
155 * @return mixed
156 *      return the result of job
157 */
158function queue_start_job($row){
159
160// deserialiser les arguments
161        $args = unserialize($row['args']);
162        if ($args===false){
163                spip_log('arguments job errones '.var_export($row,true),'queue');
164                $args = array();
165        }
166
167        $fonction = $row['fonction'];
168        if (strlen($inclure = trim($row['inclure']))){
169                if (substr($inclure,-1)=='/'){ // c'est un chemin pour charger_fonction
170                        $f = charger_fonction($fonction,rtrim($inclure,'/'),false);
171                        if ($f)
172                                $fonction = $f;
173                }
174                else
175                        include_spip($inclure);
176        }
177
178        if (!function_exists($fonction)){
179                spip_log("fonction $fonction ($inclure) inexistante ".var_export($row,true),'queue');
180                return false;
181        }
182
183        switch (count($args)) {
184                case 0: $res = $fonction(); break;
185                case 1: $res = $fonction($args[0]); break;
186                case 2: $res = $fonction($args[0],$args[1]); break;
187                case 3: $res = $fonction($args[0],$args[1], $args[2]); break;
188                case 4: $res = $fonction($args[0],$args[1], $args[2], $args[3]); break;
189                case 5: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4]); break;
190                case 6: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5]); break;
191                case 7: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6]); break;
192                case 8: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7]); break;
193                case 9: $res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8]); break;
194                case 10:$res = $fonction($args[0],$args[1], $args[2], $args[3], $args[4], $args[5], $args[6], $args[7], $args[8], $args[9]); break;
195                default:
196                        $res = call_user_func_array($fonction, $args);
197        }
198        return $res;
199
200}
201
202/**
203 * Ordonanceur
204 * Evite les requetes sql a chaque appel
205 * en memorisant en meta la date du prochain job
206 */
207function queue_schedule($force_jobs = null){
208        $time = time();
209        if (defined('_DEBUG_BLOCK_QUEUE'))
210                return;
211
212        // rien a faire si le prochain job est encore dans le futur
213        if ($GLOBALS['meta']['queue_next_job_time']>$time AND (!$force_jobs OR !count($force_jobs)))
214                return;
215
216        include_spip('base/abstract_sql');
217
218        if (!defined('_JQ_MAX_JOBS_TIME_TO_EXECUTE')){
219                $max_time = ini_get('max_execution_time')/2;
220                // valeur conservatrice si on a pas reussi a lire le max_execution_time
221                if (!$max_time) $max_time=5;
222                define('_JQ_MAX_JOBS_TIME_TO_EXECUTE',min($max_time,15)); // une valeur maxi en temps.
223        }
224        $end_time = $time + _JQ_MAX_JOBS_TIME_TO_EXECUTE;
225
226        #spip_log("JQ schedule $time / $end_time",'jq');
227
228        if (!defined('_JQ_MAX_JOBS_EXECUTE'))
229                define('_JQ_MAX_JOBS_EXECUTE',200);
230        $nbj=0;
231        // attraper les jobs
232        // dont la date est passee (echus en attente),
233        // par odre :
234        //      - de priorite
235        //      - de date
236        // lorsqu'un job cron n'a pas fini, sa priorite est descendue
237        // pour qu'il ne bloque pas les autres jobs en attente
238        if (is_array($force_jobs) AND count($force_jobs))
239                $cond = "status=".intval(_JQ_SCHEDULED)." AND ".sql_in("id_job", $force_jobs);
240        else {
241                $now = date('Y-m-d H:i:s',$time);
242                $cond = "status=".intval(_JQ_SCHEDULED)." AND date<".sql_quote($now);
243        }
244
245        register_shutdown_function('queue_error_handler'); // recuperer les erreurs auant que possible
246        $res = sql_select('*','spip_jobs',$cond,'','priorite DESC,date','0,'.(_JQ_MAX_JOBS_EXECUTE+1));
247        do {
248                if ($row = sql_fetch($res)){
249                        $nbj++;
250                        // il faut un verrou, a base de sql_delete
251                        if (sql_delete('spip_jobs',"id_job=".intval($row['id_job']."AND status=".intval(_JQ_SCHEDULED)))){
252                                #spip_log("JQ schedule job ".$nbj." OK",'jq');
253                                // on reinsert dans la base aussitot avec un status=_JQ_PENDING
254                                $row['status'] = _JQ_PENDING;
255                                $row['date'] = $time;
256                                sql_insertq('spip_jobs', $row);
257       
258                                // on a la main sur le job :
259                                // l'executer
260                                $result = queue_start_job($row);
261
262                                $time = time();
263                                queue_close_job($row, $time, $result);
264                        }
265                }
266                #spip_log("JQ schedule job end time ".$time,'jq');
267        } while ($nbj<_JQ_MAX_JOBS_EXECUTE AND $row AND $time<$end_time);
268
269        #spip_log("JQ schedule end time ".time(),'jq');
270       
271        if ($row = sql_fetch($res)){
272                queue_update_next_job_time(0); // on sait qu'il y a encore des jobs a lancer ASAP
273                #spip_log("JQ encore !",'jq');
274        }
275        else
276                queue_update_next_job_time();
277
278}
279
280/**
281 * Terminer un job au status _JQ_PENDING :
282 *  - le reprogrammer si c'est un cron
283 *  - supprimer ses liens
284 *  - le detruire en dernier
285 *
286 * @param array $row
287 * @param int $time
288 * @param int $result
289 */
290function queue_close_job(&$row,$time,$result=0){
291        // est-ce une tache cron qu'il faut relancer ?
292        if ($periode = queue_is_cron_job($row['fonction'],$row['inclure'])){
293                // relancer avec les nouveaux arguments de temps
294                include_spip('inc/genie');
295                if ($result<0)
296                        // relancer tout de suite, mais en baissant la priorite
297                        queue_genie_replan_job($row['fonction'],$periode,0-$result/*last*/,0/*ASAP*/,$row['priorite']-1);
298                else
299                        // relancer avec la periode prevue
300                        queue_genie_replan_job($row['fonction'],$periode,$time);
301        }
302        // purger ses liens eventuels avec des objets
303        sql_delete("spip_jobs_liens","id_job=".intval($row['id_job']));
304        // supprimer le job fini
305        sql_delete('spip_jobs','id_job='.intval($row['id_job']));
306}
307
308/**
309 * Recuperer des erreurs auant que possible
310 * en terminant la gestion de la queue
311 */
312function queue_error_handler(){
313        // se remettre dans le bon dossier, car Apache le change parfois (toujours?)
314        chdir(_ROOT_CWD);
315
316        queue_update_next_job_time();
317}
318
319
320/**
321 * Test if a job in queue is periodic cron
322 *
323 * @param <type> $function
324 * @param <type> $inclure
325 * @return <type>
326 */
327function queue_is_cron_job($function,$inclure){
328        static $taches = null;
329        if (strncmp($inclure,'genie/',6)==0){
330                if (is_null($taches)){
331                        include_spip('inc/genie');
332                        $taches = taches_generales();
333                }
334                if (isset($taches[$function]))
335                        return $taches[$function];
336        }
337        return false;
338}
339
340/**
341 * Mettre a jour la date du prochain job a lancer
342 * Si une date est fournie (au format time unix)
343 * on fait simplement un min entre la date deja connue et celle fournie
344 * (cas de l'ajout simple
345 * ou cas $next_time=0 car l'on sait qu'il faut revenir ASAP)
346 *
347 * @param int $next_time
348 *      temps de la tache ajoutee ou 0 pour ASAP
349 */
350function queue_update_next_job_time($next_time=null){
351        static $nb_jobs_scheduled = null;
352        static $deja_la = false;
353        // prendre le min des $next_time que l'on voit passer ici, en cas de reentrance
354        static $next = null;
355        if (!is_null($next_time)){
356                if (is_null($next) OR $next>$next_time)
357                        $next = $next_time;
358        }
359        // queue_close_job peut etre reentrant ici
360        if ($deja_la) return;
361        $deja_la = true;
362
363        include_spip('base/abstract_sql');
364        $time = time();
365
366        // traiter les jobs morts au combat (_JQ_PENDING depuis plus de 180s)
367        // pour cause de timeout ou autre erreur fatale
368        $res = sql_select("*","spip_jobs","status=".intval(_JQ_PENDING)." AND date<".sql_quote(date('Y-m-d H:i:s',$time-180)));
369        while ($row = sql_fetch($res))
370                queue_close_job($row,$time);
371
372        // chercher la date du prochain job si pas connu
373        if (is_null($next) OR !isset($GLOBALS['meta']['queue_next_job_time'])){
374                $date = sql_getfetsel('date','spip_jobs',"status=".intval(_JQ_SCHEDULED),'','date','0,1');
375                $next = strtotime($date);
376        }
377        else {
378                if ($next){
379                        if (is_null($nb_jobs_scheduled))
380                                $nb_jobs_scheduled = sql_countsel('spip_jobs',"status=".intval(_JQ_SCHEDULED)." AND date<".sql_quote(date('Y-m-d H:i:s',$time)));
381                        elseif ($next<=$time)
382                                $nb_jobs_scheduled++;
383                        // si trop de jobs en attente, on force la purge en fin de hit
384                        // pour assurer le coup
385                        if ($nb_jobs_scheduled>_JQ_NB_JOBS_OVERFLOW)
386                                define('_DIRECT_CRON_FORCE',true);
387                }
388        }
389        // toujours relire la table pour comparer, pour tenir compte des maj concourrantes
390        // et ne mettre a jour que si il y a un interet a le faire
391        $curr_next = sql_getfetsel('valeur','spip_meta',"nom='queue_next_job_time'");
392        if (
393                        ($curr_next<$time AND $next>$time) // le prochain job est dans le futur mais pas la date planifiee actuelle
394                        OR $curr_next>$next // le prochain job est plus tot que la date planifiee actuelle
395                ) {
396                include_spip('inc/meta');
397                ecrire_meta('queue_next_job_time',$next);
398        }
399        $deja_la = false;
400}
401?>
Note: See TracBrowser for help on using the repository browser.