extras-buildsys/server BuildMaster.py, 1.22, 1.23 UserInterface.py, 1.28, 1.29

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Mon Jul 25 21:44:54 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv29134/server

Modified Files:
	BuildMaster.py UserInterface.py 
Log Message:
2005-07-25  Dan Williams <dcbw at redhat.com>

    * client/client.py
      server/UserInterface.py
      server/BuildMaster.py
        - Add a "requeue" command to restart a failed or killed job




Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.22
retrieving revision 1.23
diff -u -r1.22 -r1.23
--- BuildMaster.py	25 Jul 2005 19:47:15 -0000	1.22
+++ BuildMaster.py	25 Jul 2005 21:44:52 -0000	1.23
@@ -111,6 +111,9 @@
         self._done_queue_lock = threading.Lock()
         self._new_queue = []
         self._new_queue_lock = threading.Lock()
+        self._restart_queue = []
+        self._restart_queue_lock = threading.Lock()
+
         self._status_updates = {}
         self._status_updates_lock = threading.Lock()
         self._archjob_status_updates = {}
@@ -127,16 +130,45 @@
         self.curs = self.dbcx.cursor()
         ensure_job_db_tables(self.dbcx)
 
-        self._restart_interrupted_jobs()
+        self._requeue_interrupted_jobs()
 
         threading.Thread.__init__(self)
 
     def __del__(self):
         self.dbcx.close()
 
-    def _restart_interrupted_jobs(self):
+    def _requeue_interrupted_jobs(self):
         """ Restart interrupted jobs from our db. """
-        self.curs.execute('SELECT * FROM jobs WHERE (status!="needsign" AND status!="failed" AND status!="killed" AND status!="initialize") ORDER BY uid')
+        self.curs.execute('SELECT uid FROM jobs WHERE (status!="needsign" AND status!="failed" AND status!="killed" AND status!="initialize") ORDER BY uid')
+        self.dbcx.commit()
+        uids = self.curs.fetchall()
+
+        if len(uids) == 0:
+            return
+
+        for item in uids:
+            self.requeue_job(item[0])
+
+    def requeue_job(self, uid):
+        self._restart_queue_lock.acquire()
+        self._restart_queue.append(int(uid))
+        self._restart_queue_lock.release()
+
+    def _start_requeued_jobs(self):
+        uids = ''
+        self._restart_queue_lock.acquire()
+        for uid in self._restart_queue:
+            if len(uids) == 0:
+                uids = uids + "uid=%d" % uid
+            else:
+                uids = uids + " OR uid=%d" % uid
+        self._restart_queue = []
+        self._restart_queue_lock.release()
+
+        if len(uids) == 0:
+            return
+
+        self.curs.execute('SELECT * FROM jobs WHERE %s ORDER BY uid' % uids)
         self.dbcx.commit()
         jobs = self.curs.fetchall()
 
@@ -310,13 +342,13 @@
                     ' cvs_tag, target, buildreq, starttime, endtime, status)' \
                     ' VALUES (NULL, "%s", "%s", "%s", "%s", "%s", %d, 0, "%s")' \
                     % (item['email'], item['package'], locator, item['target'], \
-                    item['buildreq'], item['time'], 'waiting'))
+                    item['buildreq'], item['time'], 'initialize'))
             self.dbcx.commit()
 
             # Find the UID
             self.curs.execute('SELECT uid FROM jobs WHERE username="%s" AND' \
                     ' package="%s" AND cvs_tag="%s" AND target="%s" AND' \
-                    ' buildreq="%s" AND starttime=%d AND status="waiting"' \
+                    ' buildreq="%s" AND starttime=%d AND status="initialize"' \
                     % (item['email'], item['package'], locator, item['target'], \
                     item['buildreq'], item['time']))
             self.dbcx.commit()
@@ -349,6 +381,13 @@
         if have_work:
             return True
 
+        self._restart_queue_lock.acquire()
+        if len(self._restart_queue) > 0:
+            have_work = True
+        self._restart_queue_lock.release()
+        if have_work:
+            return True
+
         self._status_updates_lock.acquire()
         if len(self._status_updates) > 0:
             have_work = True
@@ -390,6 +429,7 @@
 
             # Start any new jobs
             self._start_new_jobs()
+            self._start_requeued_jobs()
 
             last_time = time.time()
             while not self._have_work() and time.time() <= last_time + 5:


Index: UserInterface.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/UserInterface.py,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -r1.28 -r1.29
--- UserInterface.py	25 Jul 2005 19:47:15 -0000	1.28
+++ UserInterface.py	25 Jul 2005 21:44:52 -0000	1.29
@@ -153,6 +153,35 @@
             self._bm.enqueue_srpm(email, package, srpm_file, real_target, buildreq, time.time())
             return (0, "Success: package has been queued.")
 
+    def requeue(self, email, uid):
+        uid = validate_uid(uid)
+        if not uid:
+            return (-1, "Error: Invalid job UID.")
+
+        sql = 'SELECT uid, username, status FROM jobs WHERE uid=%d' % uid
+
+        # Run the query for the job
+        try:
+            dbcx, curs = get_dbcx()
+        except sqlite.DatabaseError, e:
+            return (-1, "Unable to access job database.")
+        curs.execute(sql)
+        job = curs.fetchone()
+        if not job:
+            return (-1, "Error: Invalid job UID.")
+
+        # Ensure matching usernames
+        if job[1] != email:
+            return (-1, "Error: You are not the original submitter for Job %d." % uid)
+
+        # Ensure the job failed or was killed
+        if job[2] != 'failed' and job[2] != 'killed':
+            return (-1, "Error: Job %d must be either 'failed' or 'killed' to requeue." % uid)
+
+        self._bm.requeue_job(uid)
+        return (0, "Success: Job %d has been requeued." % uid)
+        
+
     def _kill_job(self, email, job, jobid):
         if not job:
             return (-1, "Job %s does not exist." % jobid)
@@ -373,6 +402,11 @@
             return (-1, "Insufficient privileges.")
         return UserInterface.enqueue_srpm(self, user.email, package, srpm_file, target, buildreq)
 
+    def requeue(self, uid):
+        user = AuthedXMLRPCServer.get_authinfo()
+        if not user or not user.own_jobs:
+            return (-1, "Insufficient privileges.")
+        return UserInterface.requeue(self, user.email, uid)
 
     def kill_job(self, email, jobid):
         user = AuthedXMLRPCServer.get_authinfo()




More information about the fedora-extras-commits mailing list