extras-buildsys/server ArchJob.py,1.18,1.19

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sun Nov 13 02:27:41 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv23155/server

Modified Files:
	ArchJob.py 
Log Message:
2005-11-12  Dan Williams  <dcbw at redhat.com>

    * server/ArchJob.py
        - Lock access to self._downloads
        - Make more instance attributes private to ArchJob
        - Use named constants for the download dict keys and status
            values to reduce the possibility of typing mistakes




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- ArchJob.py	1 Nov 2005 14:46:18 -0000	1.18
+++ ArchJob.py	13 Nov 2005 02:27:39 -0000	1.19
@@ -25,6 +25,14 @@
 from plague import FileDownloader
 from plague import CommonErrors
 
+DL_RETRIES = 'retries'
+DL_STATUS = 'status'
+DL_WAIT_TIME = 'wait_time'
+
+STATUS_WAITING = 'waiting'
+STATUS_ERROR = 'error'
+STATUS_INPROGRESS = 'in-progress'
+STATUS_DONE = 'done'
 
 class ArchJob:
     """ Tracks a single build instance for a single arch on a builder """
@@ -35,23 +43,27 @@
         self._server = server
         self._use_ssl = cfg.get_bool("Builders", "use_ssl")
         self.jobid = jobid
-        self.status = 'running'
-        self.builder_status = ''
+        self._status = 'running'
+        self._builder_status = ''
         self._failure_noticed = False
         self._download_failed = False
         self._target_dict = target_dict
         self._builder_gone = False
-        self.downloads = {}
-        self.starttime = time.time()
-        self.endtime = 0
+        self._download_lock = threading.Lock()
+        self._downloads = {}
+        self._starttime = time.time()
+        self._endtime = 0
         self._die = False
         self._die_lock = threading.Lock()
 
         # SSL certificate and key filenames
-        self._certs = {}
-        self._certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
-        self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
-        self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+        if self._use_ssl:
+            self._certs = {}
+            self._certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+            self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+            self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+        else:
+            self._certs = None
 
     def failure_noticed(self):
         return self._failure_noticed
@@ -60,12 +72,12 @@
         self._failure_noticed = True
 
     def _builder_finished(self):
-        if self.builder_status == 'done' or self.builder_status == 'killed' or self.builder_status == 'failed' or self.builder_status == 'orphaned':
+        if self._builder_status == 'done' or self._builder_status == 'killed' or self._builder_status == 'failed' or self._builder_status == 'orphaned':
             return True
         return False
 
     def builder_failed(self):
-        if self.builder_status == 'killed' or self.builder_status == 'failed':
+        if self._builder_status == 'killed' or self._builder_status == 'failed':
             return True
         return False
 
@@ -73,7 +85,7 @@
         return self._download_failed
 
     def builder_prepping(self):
-        if self.builder_status == 'prepping':
+        if self._builder_status == 'prepping':
             return True
         return False
 
@@ -94,28 +106,27 @@
         host_port, path = urllib.splithost(addr)
         host, port = urllib.splitport(host_port)
         attrdict['builder_addr'] = host
-        attrdict['status'] = self.status
-        attrdict['builder_status'] = self.builder_status
-        attrdict['starttime'] = self.starttime
-        attrdict['endtime'] = self.endtime
+        attrdict['status'] = self._status
+        attrdict['builder_status'] = self._builder_status
+        attrdict['starttime'] = self._starttime
+        attrdict['endtime'] = self._endtime
         return attrdict
 
-    def set_builder_job_status(self, status):
-        if status != 'idle':            
-            oldstatus = self.builder_status
-            self.builder_status = status
-            if oldstatus != self.builder_status:
-                attrdict = self._to_dict()
-                self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
-                del attrdict
+    def set_builder_job_status(self, builder_status):
+        oldstatus = self._builder_status
+        self._builder_status = builder_status
+        if oldstatus != self._builder_status:
+            attrdict = self._to_dict()
+            self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
+            del attrdict
 
-        if status == 'killed' or status == 'failed':
+        if builder_status == 'killed' or builder_status == 'failed':
             self.par_job.wake()
 
     def _set_status(self, status):
-        oldstatus = self.status
-        self.status = status
-        if oldstatus != self.status:
+        oldstatus = self._status
+        self._status = status
+        if oldstatus != self._status:
             attrdict = self._to_dict()
             self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
             del attrdict
@@ -142,133 +153,147 @@
             pass
         return files
 
-    def process(self):
-        if self.status == 'done':
-            return
-
-        # If we're supposed to die, tell the builder and clean up
-        self._die_lock.acquire()
-        should_die = self._die
-        self._die_lock.release()
-        if should_die:
-            self._server.die(self.jobid)
-            self._set_status('done')
-            return
-
-        if self.status == 'running':
-            # Builders pause before they enter the 'prep' state (which accesses
-            # the repo for this target), and wait for the server to allow them
-            # to proceed when the repo is unlocked.
-            if self.builder_status == 'downloaded':
-                if not self.par_job.repo.locked():
-                    self._send_repo_unlocked()
-
-            # if the builder is done, grab list of files to download
-            if self._builder_finished():
-                self._set_status('downloading')
-                for f in self._dl_files():
-                    uf = urllib.unquote(f)
-                    dl_dict = {}
-                    dl_dict['status'] = 'waiting'
-                    dl_dict['retries'] = 0
-                    dl_dict['wait_time'] = 0
-                    self.downloads[uf] = dl_dict
-        elif self.status == 'downloading':
-            # Start grabbing the next undownloaded file, but only
-            # if we aren't already pulling one down
-            undownloaded = False
-            failed = False
-            for url in self.downloads.keys():
-                dl_dict = self.downloads[url]
-                dl_status = dl_dict['status']
-                if dl_status == 'waiting':
-                    # If the download got retried due to a previous
-                    # download error, we may have to wait a bit
-                    if dl_dict['wait_time'] > 0:
-                        dl_dict['wait_time'] = dl_dict['wait_time'] - 1
-                        undownloaded = True
-                        continue
-
-                    # Otherwise, spawn the download thread to grab the file
-                    target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
-                    if not os.path.exists(target_dir):
-                        os.makedirs(target_dir)
-                    try:
-                        if self._use_ssl:
-                            dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
-                                        target_dir, ['.rpm', '.log'], self._certs)
-                        else:
-                            dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
-                                        target_dir, ['.rpm', '.log'], None)
-                        dl_thread.start()
-                        undownloaded = True
-                        dl_dict['status'] = 'in-progress'
-                    except FileDownloader.FileNameException, e:
-                        print "%s (%s/%s): [ %s ] Bad file name error when getting %s: '%s'" % (self.par_job.uid,
-                                    self.par_job.package, self._target_dict['arch'], self.bci.address(), url, e)
-                        # Hard error, we don't retry this one
-                        dl_dict['status'] = 'error'
-                    break
-                elif dl_status == 'in-progress':
-                    undownloaded = True
-                    break
-                elif dl_status == 'error':
-                    failed = True
-                    continue
-                elif dl_status == 'done':
-                    continue
+    def _is_done_status(self):
+        if self._status == 'done':
+            return True
+        return False
 
-            # All done downloading?
-            if not undownloaded:
-                self._print_downloaded_files()
-                self.endtime = time.time()
-                if failed:
-                    self._download_failed = True
-                self._set_status('done')
-                self.par_job.wake()
+    def _status_running(self):
+        # Builders pause before they enter the 'prep' state (which accesses
+        # the repo for this target), and wait for the server to allow them
+        # to proceed when the repo is unlocked.
+        if self._builder_status == 'downloaded':
+            if not self.par_job.repo.locked():
+                self._send_repo_unlocked()
+
+        # if the builder is done, grab list of files to download
+        if self._builder_finished():
+            self._set_status('downloading')
+            for f in self._dl_files():
+                uf = urllib.unquote(f)
+                dl_dict = {}
+                dl_dict[DL_STATUS] = STATUS_WAITING
+                dl_dict[DL_RETRIES] = 0
+                dl_dict[DL_WAIT_TIME] = 0
+                self._downloads[uf] = dl_dict
 
     def dl_callback(self, status, cb_data):
         url = cb_data
-        dl_dict = self.downloads[url]
+        self._download_lock.acquire()
+        dl_dict = self._downloads[url]
         if status == 'done':
-            dl_dict['status'] = 'done'
+            dl_dict[DL_STATUS] = STATUS_DONE
         elif status == 'failed':
             # Retry the download up to 3 times, then fail it
-            if dl_dict['retries'] >= 3:
-                dl_dict['status'] = 'error'
+            if dl_dict[DL_RETRIES] >= 3:
+                dl_dict[DL_STATUS] = STATUS_ERROR
             else:
-                dl_dict['status'] = 'waiting'
-                dl_dict['wait_time'] = 5  # Wait a bit before trying again
-                dl_dict['retries'] = dl_dict['retries'] + 1
+                dl_dict[DL_STATUS] = STATUS_WAITING
+                dl_dict[DL_WAIT_TIME] = 5  # Wait a bit before trying again
+                dl_dict[DL_RETRIES] = dl_dict[DL_RETRIES] + 1
+        self._download_lock.release()
 
     def _print_downloaded_files(self):
         file_string = ""
-        ndownloads = len(self.downloads.keys())
-        for url in self.downloads.keys():
+        ndownloads = len(self._downloads.keys())
+        for url in self._downloads.keys():
             filename = os.path.basename(url)
             string = "'" + filename + "'"
-            dl_dict = self.downloads[url]
-            if dl_dict['status'] == 'error':
+            dl_dict = self._downloads[url]
+            if dl_dict[DL_STATUS] == STATUS_ERROR:
                 string = string + " (failed)"
             file_string = file_string + string
-            if url != self.downloads.keys()[ndownloads - 1]:
+            if url != self._downloads.keys()[ndownloads - 1]:
                 file_string = file_string + ", "
 
         print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
                     self.par_job.package, self._target_dict['arch'], file_string)
 
+    def _status_downloading(self):
+        # Start grabbing the next undownloaded file, but only
+        # if we aren't already pulling one down
+        undownloaded = False
+        failed = False
+        self._download_lock.acquire()
+        for url in self._downloads.keys():
+            dl_dict = self._downloads[url]
+            dl_status = dl_dict[DL_STATUS]
+            if dl_status == STATUS_WAITING:
+                # If the download got retried due to a previous
+                # download error, we may have to wait a bit
+                if dl_dict[DL_WAIT_TIME] > 0:
+                    dl_dict[DL_WAIT_TIME] = dl_dict[DL_WAIT_TIME] - 1
+                    undownloaded = True
+                    continue
+
+                # Otherwise, spawn the download thread to grab the file
+                target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+                if not os.path.exists(target_dir):
+                    os.makedirs(target_dir)
+
+                try:
+                    dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
+                                    target_dir, ['.rpm', '.log'], self._certs)
+                    dl_thread.start()
+                    undownloaded = True
+                    dl_dict[DL_STATUS] = STATUS_INPROGRESS
+                except FileDownloader.FileNameException, e:
+                    print "%s (%s/%s): [ %s ] Bad file name error when getting %s: '%s'" % (self.par_job.uid,
+                                self.par_job.package, self._target_dict['arch'], self.bci.address(), url, e)
+                    # Hard error, we don't retry this one
+                    dl_dict[DL_STATUS] = STATUS_ERROR
+                break
+            elif dl_status == STATUS_INPROGRESS:
+                undownloaded = True
+                break
+            elif dl_status == STATUS_ERROR:
+                failed = True
+                continue
+            elif dl_status == STATUS_DONE:
+                continue
+        self._download_lock.release()
+
+        # All done downloading?
+        if not undownloaded:
+            self._print_downloaded_files()
+            self._endtime = time.time()
+            if failed:
+                self._download_failed = True
+            self._set_status('done')
+            self.par_job.wake()
+
+    def process(self):
+        if self._is_done_status():
+            return
+
+        # If we're supposed to die, tell the builder and clean up
+        self._die_lock.acquire()
+        should_die = self._die
+        self._die_lock.release()
+        if should_die:
+            self._server.die(self.jobid)
+            self._set_status('done')
+            return
+
+        try:
+            func = getattr(self, "_status_%s" % self._status)
+            func()
+        except AttributeError:
+            print "ERROR: internal archjob inconsistency, didn't recognize status '%s'.\n" % self._status
+            self._set_status('failed')
+
     def get_status(self):
-        return self.status
+        return self._status
 
     def get_files(self):
         """ Return a list of base filenames we got from the builder """
         files = []
-        for url in self.downloads.keys():
+        for url in self._downloads.keys():
             try:
                 fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
-                dl_dict = self.downloads[url]
-                dl_status = dl_dict['status']
-                if dl_status == 'done':
+                dl_dict = self._downloads[url]
+                dl_status = dl_dict[DL_STATUS]
+                if dl_status == STATUS_DONE:
                     files.append(fname)
             except FileDownloader.FileNameException, e:
                 # Just ignore the file then
@@ -278,14 +303,14 @@
         return files
 
     def builder_gone(self):
-        if self.status != 'done':
-            self.builder_status = 'orphaned'
+        if self._status != 'done':
+            self._builder_status = 'orphaned'
             self._set_status('done')
             self.par_job.remove_arch_job(self)
 
     def die(self):
         # Can be called from other threads
-        if self.status == 'initialize' or self.status == 'running':
+        if self._status == 'running':
             self._die_lock.acquire()
             self._die = True
             self._die_lock.release()




More information about the fedora-extras-commits mailing list