extras-buildsys/server ArchJob.py,1.18,1.19
Daniel Williams (dcbw)
fedora-extras-commits at redhat.com
Sun Nov 13 02:27:41 UTC 2005
Author: dcbw
Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv23155/server
Modified Files:
ArchJob.py
Log Message:
2005-11-12 Dan Williams <dcbw at redhat.com>
* server/ArchJob.py
- Lock access to self._downloads
- Make more stuff private to ArchJob
- Use module-level constants for download dict keys and status
values to reduce the possibility of typos
Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- ArchJob.py 1 Nov 2005 14:46:18 -0000 1.18
+++ ArchJob.py 13 Nov 2005 02:27:39 -0000 1.19
@@ -25,6 +25,14 @@
from plague import FileDownloader
from plague import CommonErrors
+DL_RETRIES = 'retries'
+DL_STATUS = 'status'
+DL_WAIT_TIME = 'wait_time'
+
+STATUS_WAITING = 'waiting'
+STATUS_ERROR = 'error'
+STATUS_INPROGRESS = 'in-progress'
+STATUS_DONE = 'done'
class ArchJob:
""" Tracks a single build instance for a single arch on a builder """
@@ -35,23 +43,27 @@
self._server = server
self._use_ssl = cfg.get_bool("Builders", "use_ssl")
self.jobid = jobid
- self.status = 'running'
- self.builder_status = ''
+ self._status = 'running'
+ self._builder_status = ''
self._failure_noticed = False
self._download_failed = False
self._target_dict = target_dict
self._builder_gone = False
- self.downloads = {}
- self.starttime = time.time()
- self.endtime = 0
+ self._download_lock = threading.Lock()
+ self._downloads = {}
+ self._starttime = time.time()
+ self._endtime = 0
self._die = False
self._die_lock = threading.Lock()
# SSL certificate and key filenames
- self._certs = {}
- self._certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
- self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
- self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ if self._use_ssl:
+ self._certs = {}
+ self._certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+ self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+ else:
+ self._certs = None
def failure_noticed(self):
return self._failure_noticed
@@ -60,12 +72,12 @@
self._failure_noticed = True
def _builder_finished(self):
- if self.builder_status == 'done' or self.builder_status == 'killed' or self.builder_status == 'failed' or self.builder_status == 'orphaned':
+ if self._builder_status == 'done' or self._builder_status == 'killed' or self._builder_status == 'failed' or self._builder_status == 'orphaned':
return True
return False
def builder_failed(self):
- if self.builder_status == 'killed' or self.builder_status == 'failed':
+ if self._builder_status == 'killed' or self._builder_status == 'failed':
return True
return False
@@ -73,7 +85,7 @@
return self._download_failed
def builder_prepping(self):
- if self.builder_status == 'prepping':
+ if self._builder_status == 'prepping':
return True
return False
@@ -94,28 +106,27 @@
host_port, path = urllib.splithost(addr)
host, port = urllib.splitport(host_port)
attrdict['builder_addr'] = host
- attrdict['status'] = self.status
- attrdict['builder_status'] = self.builder_status
- attrdict['starttime'] = self.starttime
- attrdict['endtime'] = self.endtime
+ attrdict['status'] = self._status
+ attrdict['builder_status'] = self._builder_status
+ attrdict['starttime'] = self._starttime
+ attrdict['endtime'] = self._endtime
return attrdict
- def set_builder_job_status(self, status):
- if status != 'idle':
- oldstatus = self.builder_status
- self.builder_status = status
- if oldstatus != self.builder_status:
- attrdict = self._to_dict()
- self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
- del attrdict
+ def set_builder_job_status(self, builder_status):
+ oldstatus = self._builder_status
+ self._builder_status = builder_status
+ if oldstatus != self._builder_status:
+ attrdict = self._to_dict()
+ self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
+ del attrdict
- if status == 'killed' or status == 'failed':
+ if builder_status == 'killed' or builder_status == 'failed':
self.par_job.wake()
def _set_status(self, status):
- oldstatus = self.status
- self.status = status
- if oldstatus != self.status:
+ oldstatus = self._status
+ self._status = status
+ if oldstatus != self._status:
attrdict = self._to_dict()
self.par_job.bm.queue_archjob_status_update(self.jobid, attrdict)
del attrdict
@@ -142,133 +153,147 @@
pass
return files
- def process(self):
- if self.status == 'done':
- return
-
- # If we're supposed to die, tell the builder and clean up
- self._die_lock.acquire()
- should_die = self._die
- self._die_lock.release()
- if should_die:
- self._server.die(self.jobid)
- self._set_status('done')
- return
-
- if self.status == 'running':
- # Builders pause before they enter the 'prep' state (which accesses
- # the repo for this target), and wait for the server to allow them
- # to proceed when the repo is unlocked.
- if self.builder_status == 'downloaded':
- if not self.par_job.repo.locked():
- self._send_repo_unlocked()
-
- # if the builder is done, grab list of files to download
- if self._builder_finished():
- self._set_status('downloading')
- for f in self._dl_files():
- uf = urllib.unquote(f)
- dl_dict = {}
- dl_dict['status'] = 'waiting'
- dl_dict['retries'] = 0
- dl_dict['wait_time'] = 0
- self.downloads[uf] = dl_dict
- elif self.status == 'downloading':
- # Start grabbing the next undownloaded file, but only
- # if we aren't already pulling one down
- undownloaded = False
- failed = False
- for url in self.downloads.keys():
- dl_dict = self.downloads[url]
- dl_status = dl_dict['status']
- if dl_status == 'waiting':
- # If the download got retried due to a previous
- # download error, we may have to wait a bit
- if dl_dict['wait_time'] > 0:
- dl_dict['wait_time'] = dl_dict['wait_time'] - 1
- undownloaded = True
- continue
-
- # Otherwise, spawn the download thread to grab the file
- target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
- if not os.path.exists(target_dir):
- os.makedirs(target_dir)
- try:
- if self._use_ssl:
- dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
- target_dir, ['.rpm', '.log'], self._certs)
- else:
- dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
- target_dir, ['.rpm', '.log'], None)
- dl_thread.start()
- undownloaded = True
- dl_dict['status'] = 'in-progress'
- except FileDownloader.FileNameException, e:
- print "%s (%s/%s): [ %s ] Bad file name error when getting %s: '%s'" % (self.par_job.uid,
- self.par_job.package, self._target_dict['arch'], self.bci.address(), url, e)
- # Hard error, we don't retry this one
- dl_dict['status'] = 'error'
- break
- elif dl_status == 'in-progress':
- undownloaded = True
- break
- elif dl_status == 'error':
- failed = True
- continue
- elif dl_status == 'done':
- continue
+ def _is_done_status(self):
+ if self._status == 'done':
+ return True
+ return False
- # All done downloading?
- if not undownloaded:
- self._print_downloaded_files()
- self.endtime = time.time()
- if failed:
- self._download_failed = True
- self._set_status('done')
- self.par_job.wake()
+ def _status_running(self):
+ # Builders pause before they enter the 'prep' state (which accesses
+ # the repo for this target), and wait for the server to allow them
+ # to proceed when the repo is unlocked.
+ if self._builder_status == 'downloaded':
+ if not self.par_job.repo.locked():
+ self._send_repo_unlocked()
+
+ # if the builder is done, grab list of files to download
+ if self._builder_finished():
+ self._set_status('downloading')
+ for f in self._dl_files():
+ uf = urllib.unquote(f)
+ dl_dict = {}
+ dl_dict[DL_STATUS] = STATUS_WAITING
+ dl_dict[DL_RETRIES] = 0
+ dl_dict[DL_WAIT_TIME] = 0
+ self._downloads[uf] = dl_dict
def dl_callback(self, status, cb_data):
url = cb_data
- dl_dict = self.downloads[url]
+ self._download_lock.acquire()
+ dl_dict = self._downloads[url]
if status == 'done':
- dl_dict['status'] = 'done'
+ dl_dict[DL_STATUS] = STATUS_DONE
elif status == 'failed':
# Retry the download up to 3 times, then fail it
- if dl_dict['retries'] >= 3:
- dl_dict['status'] = 'error'
+ if dl_dict[DL_RETRIES] >= 3:
+ dl_dict[DL_STATUS] = STATUS_ERROR
else:
- dl_dict['status'] = 'waiting'
- dl_dict['wait_time'] = 5 # Wait a bit before trying again
- dl_dict['retries'] = dl_dict['retries'] + 1
+ dl_dict[DL_STATUS] = STATUS_WAITING
+ dl_dict[DL_WAIT_TIME] = 5 # Wait a bit before trying again
+ dl_dict[DL_RETRIES] = dl_dict[DL_RETRIES] + 1
+ self._download_lock.release()
def _print_downloaded_files(self):
file_string = ""
- ndownloads = len(self.downloads.keys())
- for url in self.downloads.keys():
+ ndownloads = len(self._downloads.keys())
+ for url in self._downloads.keys():
filename = os.path.basename(url)
string = "'" + filename + "'"
- dl_dict = self.downloads[url]
- if dl_dict['status'] == 'error':
+ dl_dict = self._downloads[url]
+ if dl_dict[DL_STATUS] == STATUS_ERROR:
string = string + " (failed)"
file_string = file_string + string
- if url != self.downloads.keys()[ndownloads - 1]:
+ if url != self._downloads.keys()[ndownloads - 1]:
file_string = file_string + ", "
print "%s (%s/%s): Build result files - [ %s ]" % (self.par_job.uid,
self.par_job.package, self._target_dict['arch'], file_string)
+ def _status_downloading(self):
+ # Start grabbing the next undownloaded file, but only
+ # if we aren't already pulling one down
+ undownloaded = False
+ failed = False
+ self._download_lock.acquire()
+ for url in self._downloads.keys():
+ dl_dict = self._downloads[url]
+ dl_status = dl_dict[DL_STATUS]
+ if dl_status == STATUS_WAITING:
+ # If the download got retried due to a previous
+ # download error, we may have to wait a bit
+ if dl_dict[DL_WAIT_TIME] > 0:
+ dl_dict[DL_WAIT_TIME] = dl_dict[DL_WAIT_TIME] - 1
+ undownloaded = True
+ continue
+
+ # Otherwise, spawn the download thread to grab the file
+ target_dir = os.path.join(self.par_job.get_stage_dir(), self._target_dict['arch'])
+ if not os.path.exists(target_dir):
+ os.makedirs(target_dir)
+
+ try:
+ dl_thread = FileDownloader.FileDownloader(self.dl_callback, url, url,
+ target_dir, ['.rpm', '.log'], self._certs)
+ dl_thread.start()
+ undownloaded = True
+ dl_dict[DL_STATUS] = STATUS_INPROGRESS
+ except FileDownloader.FileNameException, e:
+ print "%s (%s/%s): [ %s ] Bad file name error when getting %s: '%s'" % (self.par_job.uid,
+ self.par_job.package, self._target_dict['arch'], self.bci.address(), url, e)
+ # Hard error, we don't retry this one
+ dl_dict[DL_STATUS] = STATUS_ERROR
+ break
+ elif dl_status == STATUS_INPROGRESS:
+ undownloaded = True
+ break
+ elif dl_status == STATUS_ERROR:
+ failed = True
+ continue
+ elif dl_status == STATUS_DONE:
+ continue
+ self._download_lock.release()
+
+ # All done downloading?
+ if not undownloaded:
+ self._print_downloaded_files()
+ self._endtime = time.time()
+ if failed:
+ self._download_failed = True
+ self._set_status('done')
+ self.par_job.wake()
+
+ def process(self):
+ if self._is_done_status():
+ return
+
+ # If we're supposed to die, tell the builder and clean up
+ self._die_lock.acquire()
+ should_die = self._die
+ self._die_lock.release()
+ if should_die:
+ self._server.die(self.jobid)
+ self._set_status('done')
+ return
+
+ try:
+ func = getattr(self, "_status_%s" % self._status)
+ func()
+ except AttributeError:
+ print "ERROR: internal archjob inconsistency, didn't recognize status '%s'.\n" % self._status
+ self._set_status('failed')
+
def get_status(self):
- return self.status
+ return self._status
def get_files(self):
""" Return a list of base filenames we got from the builder """
files = []
- for url in self.downloads.keys():
+ for url in self._downloads.keys():
try:
fname = FileDownloader.get_base_filename_from_url(url, ['.rpm', '.log'])
- dl_dict = self.downloads[url]
- dl_status = dl_dict['status']
- if dl_status == 'done':
+ dl_dict = self._downloads[url]
+ dl_status = dl_dict[DL_STATUS]
+ if dl_status == STATUS_DONE:
files.append(fname)
except FileDownloader.FileNameException, e:
# Just ignore the file then
@@ -278,14 +303,14 @@
return files
def builder_gone(self):
- if self.status != 'done':
- self.builder_status = 'orphaned'
+ if self._status != 'done':
+ self._builder_status = 'orphaned'
self._set_status('done')
self.par_job.remove_arch_job(self)
def die(self):
# Can be called from other threads
- if self.status == 'initialize' or self.status == 'running':
+ if self._status == 'running':
self._die_lock.acquire()
self._die = True
self._die_lock.release()
More information about the fedora-extras-commits
mailing list