extras-buildsys/builder Builder.py, 1.12, 1.13 BuilderMock.py, 1.6, 1.7 Config.py, 1.3, 1.4

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Mon May 15 17:26:52 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/builder
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv8745/builder

Modified Files:
	Builder.py BuilderMock.py Config.py 
Log Message:
2006-05-15  Dan Williams  <dcbw at redhat.com>

    * builder/Builder.py
      builder/BuilderMock.py
        - (untested) Make passive builders work on the builder side
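
For reference, the passive builder side of this change boils down to a single
request() XML-RPC method that exchanges serialized command streams with the
build server.  A minimal sketch of what a server-side poll could look like,
assuming a hypothetical builder URL and a plain (non-SSL) transport; the
Commands helpers are the ones used in the diff below:

    import xmlrpclib
    from plague import Commands

    def poll_passive_builder(builder_url):
        # builder_url is hypothetical, e.g. "http://builder.example.com:8888"
        builder = xmlrpclib.ServerProxy(builder_url)
        # Nothing to ask this time around; send an empty command stream
        outgoing = Commands.serialize_to_command_stream([])
        incoming = builder.request(outgoing)
        # The builder always replies with its default commands: supported
        # targets, free/max build slots, and the currently-building jobs
        return Commands.deserialize_command_stream(incoming)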




Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/Builder.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -r1.12 -r1.13
--- Builder.py	14 May 2006 16:13:06 -0000	1.12
+++ Builder.py	15 May 2006 17:26:45 -0000	1.13
@@ -24,12 +24,15 @@
 import string
 import xmlrpclib
 import OpenSSL
+import shutil
 from plague import Commands
-from plague import AuthedXMLRPCServer
+from plague.AuthedXMLRPCServer import AuthedSSLXMLRPCServer
+from plague.AuthedXMLRPCServer import AuthedXMLRPCServer
 from plague import HTTPServer
 from plague import XMLRPCServerProxy
 from plague import FileDownloader
 from plague import FileUploader
+from plague import FileTransfer
 
 import Config
 import BuilderMock
@@ -38,15 +41,17 @@
 class BuilderInitException(Exception):
     pass
 
-def get_hostname(cfg, bind_all):
-    cfg_hostname = cfg.get_str("Passive", "hostname")
+def get_hostname(cfg, bind_all=False):
+    """Get the builder's hostname, optionally returning a hostname
+    suitable for binding to all active interfaces."""
+    cfg_hostname = cfg.get_str("General", "hostname")
     if cfg_hostname and len(cfg_hostname):
         return cfg_hostname
     elif bind_all:
         return ''
     return socket.gethostname()
 
-def determine_max_jobs():
+def _determine_max_jobs():
     """ Simple max job calculator based on number of CPUs """
 
     import commands
@@ -61,25 +66,51 @@
     return max_jobs
 
 
+def prefix_url(url, use_ssl):
+    """Convenience function to add correct URL method
+    for the security method we're using."""
+    if use_ssl:
+        return "https://" + url
+    return "http://" + url
+
+
 class Builder(object):
     """ Abstract builder base object """
     def __init__(self, cfg):
         self._cfg = cfg
-        self._max_slots = determine_max_jobs()
+        self._max_slots = _determine_max_jobs()
         self._seq_gen = Commands.SequenceGenerator()
+        self._queued_cmds = []
+
+        self._stop = False
+        self._stopped = False
 
         self._building_jobs_lock = threading.Lock()
         self._building_jobs = []
         self._all_jobs = {}
 
+        # Strip any URL scheme and path from the server option, leaving just the hostname
+        name = cfg.get_str("General", "server")
+        slash_idx = name.find("//")
+        if slash_idx > 0:
+            name = name[slash_idx + 2:]
+        slash_idx = name.find("/")
+        if slash_idx > 0:
+            name = name[:slash_idx]
+        self._server_hostname = name
+
+        self._server_ip = None
+        self._get_server_ip()
+
         self._certs = None
-        if cfg.get_bool("SSL", "use_ssl"):
-            hostname = get_hostname(self._cfg, False)
-            key_file = os.path.join(cfg.get_str("SSL", "builder_key_and_cert_dir"), "%s.pem" % hostname)
+        self._use_ssl = cfg.get_bool("SSL", "use_ssl")
+        if self._use_ssl:
             self._certs = {}
-            self._certs['key_and_cert'] = key_file
+            hostname = get_hostname(self._cfg)
+            builder_cert = cfg.get_str("SSL", "builder_key_and_cert_dir")
+            self._certs['key_and_cert'] = os.path.join(builder_cert, "%s.pem" % hostname)
             self._certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
-            self._certs['peer_ca_cert'] = self._certs['ca_cert']
+            self._certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
 
         build_arches = []
         for target in self._cfg.targets():
@@ -88,6 +119,14 @@
                     build_arches.append(arch)
         self._log("Available architectures:  [%s]" % string.join(build_arches, ", "))
 
+    def _get_server_ip(self):
+        try:
+            self._server_ip = socket.gethostbyname(self._server_hostname)
+            return True
+        except Exception:
+            pass
+        return False
+
     def _log(self, msg, newline=True):
         if self._cfg.get_bool("General", "debug"):
             if newline:
@@ -96,6 +135,7 @@
             sys.stdout.flush()
 
     def new_builder(cfg, btype):
+        """Create and return a new builder object of the requested type."""
         if btype == 'passive':
             return PassiveBuilder(cfg)
         elif btype == 'active':
@@ -104,10 +144,19 @@
             return None
     new_builder = staticmethod(new_builder)
 
-    def stop(self):
-        pass
+    def work(self):
+        """Builder work loop, starts the builder object's thread
+        and sleeps until it's done."""
+        self.start()
+        try:
+            while not self._stop:
+                time.sleep(60)
+        except KeyboardInterrupt:
+            pass
 
     def cleanup(self):
+        """Clean up the builder by killing all running jobs and waiting
+        for them to complete."""
         self._building_jobs_lock.acquire()
         for job in self._building_jobs:
             job.die()
@@ -162,6 +211,58 @@
 
         return target_cfg
 
+    def _get_default_commands(self):
+        """Return a python list of serialized commands that the builder
+        sends to the server every time it contacts the server."""
+
+        defcmds = []
+
+        # always send a target list
+        next_seq = self._seq_gen.next()
+        cmd = Commands.PlgCommandTargets(self.supported_targets(), next_seq)
+        defcmds.append(cmd)
+
+        # always send free & max slots
+        next_seq = self._seq_gen.next()
+        (free_slots, max_slots) = self.slots()
+        cmd = Commands.PlgCommandSlots(free_slots, max_slots, next_seq)
+        defcmds.append(cmd)
+
+        defcmds.append(self._get_building_jobs_cmd())
+
+        return defcmds
+
+    def _dispatch_server_command(self, cmd):
+        """Process a single command from the server."""
+
+        if isinstance(cmd, Commands.PlgCommandNewJobReq):
+            (uniqid, msg) = self._start_new_job(cmd)
+            ack = Commands.PlgCommandNewJobAck(uniqid, msg, cmd.seq(), self._seq_gen.next())
+            self._queued_cmds.append(ack)
+        elif isinstance(cmd, Commands.PlgCommandUnlockRepo):
+            self._handle_unlock_repo_request(cmd)
+        elif isinstance(cmd, Commands.PlgCommandJobStatus):
+            reply = self._handle_job_status_request(cmd)
+            if reply:
+                self._queued_cmds.append(reply)
+        elif isinstance(cmd, Commands.PlgCommandJobFiles):
+            reply = self._handle_job_files_request(cmd)
+            if reply:
+                self._queued_cmds.append(reply)
+        elif isinstance(cmd, Commands.PlgCommandKillJob):
+            self._handle_kill_job_command(cmd)
+
+    def _process_server_commands(self, cmd_list):
+        """Process the server's command stream."""
+
+        if not cmd_list:
+            # Something went wrong...
+            return
+
+        cmds = Commands.deserialize_command_stream(cmd_list)
+        for cmd in cmds:
+            self._dispatch_server_command(cmd)
+
     def _new_job_for_arch(self, target_cfg, buildarch, srpm_url, uniqid):
         """Creates a new mock build job given a particular build architecture."""
 
@@ -252,130 +353,184 @@
             pass
 
 
-class PassiveBuilderRequestHandler:
-    def __init__(self, cfg, builder):
-        self._builder = builder
-        self._all_jobs = {} # unique id => awclass instance
-        self._building_jobs_lock = threading.Lock()
-        self._building_jobs = []
-        self._cfg = cfg
-
-    def _log(self, msg):
-        if self._cfg.get_bool("General", "debug"):
-            print msg
-
-    def die(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            job.die()
-        except KeyError:
-            pass
-        return 0
-
-    def files(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            return job.files()
-        except KeyError:
-            pass
-        return []
-    
-    def repo_unlocked(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            job.unlock_repo()
-        except KeyError:
-            pass
-        return 0
-
-    def building_jobs(self):
-        jobs = {}
-        self._building_jobs_lock.acquire()
-        building = 0
-        for job in self._building_jobs:
-            jobs[job.uniqid()] = job.status()
-            building = building + 1
-        free = self._max_jobs - building
-        self._building_jobs_lock.release()
-        return (jobs, free)
-
-    def num_slots(self):
-        (free_slots, max_slots) = self._builder.slots()
-        return max_slots
-
-    def job_status(self, uniqid):
-        try:
-            job = self._all_jobs[uniqid]
-            return job.status()
-        except KeyError:
-            pass
-        return ''
-
-    def supported_targets(self):
-        return self._builder.supported_targets()
-            
-
-class PassiveBuilder(Builder):
+class PassiveBuilder(Builder, threading.Thread):
     """
     Passive builders initiate no communication of their own.  They wait
     for the build server to contact them, and therefore may not be used
     behind a firewall without holes being punched through it.
+
+    The class structure here is somewhat convoluted, since PassiveBuilder
+    is actually an XML-RPC request handler as well as a subclass of Builder.
+    It's still its own thread for housekeeping purposes, but most of its
+    functions get called from a thread spawned by the XML-RPC server to handle
+    requests.
     """
     def __init__(self, cfg):
         Builder.__init__(self, cfg)
         self._http_server = None
         self._xmlrpc_server = None
+        self._work_dir = os.path.abspath(cfg.get_str("Directories", "builder_work_dir"))
+        threading.Thread.__init__(self)
 
-    def _start_servers(self):
-        # Start up the HTTP server thread which the build server
-        # pulls completed RPMs from
-        hostname = get_hostname(self._cfg, True)
-        port = self._cfg.get_int("Passive", "fileserver_port")
-        work_dir = self._cfg.get_str("Directories", "builder_work_dir")
-        self._http_server = HTTPServer.PlgHTTPServerManager((hostname, port), work_dir, self._certs)
-        self._http_server.start()
+        self._init_servers()
 
+    def _init_servers(self):
+        """Startup HTTP and XML-RPC servers which the build server uses
+        to talk to us."""
+        hostname = get_hostname(self._cfg, bind_all=True)
         self._log("Binding to address '%s'\n" % hostname)
+
+        port = self._cfg.get_int("Passive", "fileserver_port")
+        self._http_server = HTTPServer.PlgHTTPServerManager((hostname, port),
+                self._work_dir, self._certs)
+        self._http_server.set_POST_handler('/upload', self.upload_callback)
+
         xmlrpc_port = self._cfg.get_int("Passive", "xmlrpc_port")
         try:
-            if self._cfg.get_bool("SSL", "use_ssl") == True:
-                self._xmlrpc_server = AuthedXMLRPCServer.AuthedSSLXMLRPCServer((hostname, xmlrpc_port), None, self._certs)
+            if self._use_ssl:
+                self._xmlrpc_server = AuthedSSLXMLRPCServer((hostname, xmlrpc_port), None, self._certs)
             else:
-                self._xmlrpc_server = AuthedXMLRPCServer.AuthedXMLRPCServer((hostname, xmlrpc_port), None)
+                self._xmlrpc_server = AuthedXMLRPCServer((hostname, xmlrpc_port), None)
         except socket.error, exc:
             if exc[0] == 98:
                 raise BuilderInitException("Error: couldn't bind to address '%s:%s'.  "  \
                            "Is the builder already running?\n" % (hostname, xmlrpc_port))
 
-        brh = PassiveBuilderRequestHandler(self._cfg, self)
-        self._xmlrpc_server.register_instance(brh)
+        self._xmlrpc_server.register_instance(self)
 
-    def work(self):
-        self._start_servers()
-        try:
-            self._xmlrpc_server.serve_forever()
-        except KeyboardInterrupt:
-            pass
+    def _get_workdir_for_job(self, archjob_id):
+        return os.path.join(self._work_dir, archjob_id)
 
-    def download_srpm(self, url, target_dir, dl_callback, cb_data):
-        """For passive builders, the server uploads the RPM to the builder.
-        Therefore, we already have it.  Move it from the HTTP server's upload
-        directory to the requested target_dir, if the SRPM exists."""
-        pass
+    def upload_callback(self, request_handler, fs):
+        """Handle SRPM uploads from the server."""
+        # Ensure we know the server
+        if not self._server_ip:
+            self._get_server_ip()
+        ip = request_handler.client_address[0]
+        if not self._server_ip or self._server_ip != ip:
+            request_handler.send_error(403, "Unauthorized")
+            return
+
+        # Search for filename
+        fslist = [fs]
+        if not fs.name and not fs.filename and fs.value:
+            fslist = fs.value
+        jobid = filename = tmpfile = None
+        for item in fslist:
+            if item.name == 'archjob_id':
+                try:
+                    jobid = urllib.unquote(str(item.value))
+                    # Ensure archjob_id is exactly as long as a sha1 hash
+                    if len(jobid) != 40:
+                        jobid = None
+                except ValueError:
+                    pass
+            elif item.name == 'filedata':
+                filename = item.filename
+                tmpfile = item.file
+
+        if jobid and filename and tmpfile:
+            upload_dir = os.path.join(self._get_workdir_for_job(jobid), "source")
+            destpath = os.path.join(upload_dir, urllib.unquote(filename))
+            dest = file(destpath, "w+b")
+            shutil.copyfileobj(tmpfile, dest)
+            dest.close()
+            request_handler.send_response(200, "Success")
+            request_handler.send_header("Content-type", "text/html")
+            request_handler.end_headers()
+            request_handler.wfile.write("<html><body>Success!</body></html>")
+        else:
+            request_handler.send_error(400, "Invalid request for %s" % request_handler.path)
 
-    def _stop_servers(self):
+    def run(self):
+        """Main builder loop.  Sit around and serve requests."""
+        self._http_server.start()
+        # We sit in serve_forever() until stopped
+        # FIXME: how do we stop this?  server_close() doesn't seem to
+        self._xmlrpc_server.serve_forever()
+        self._stopped = True
+
+    def stop(self):
+        """Tear down HTTP and XML-RPC servers and cleanup their resources."""
         self._http_server.stop()
         self._xmlrpc_server.stop()
-        try:
-            time.sleep(1)
-        except KeyboardInterrupt:
-            pass
         self._xmlrpc_server.server_close()
+        # FIXME: server_close() doesn't seem to stop the serve_forever()
+        return
 
-    def stop(self):
-        Builder.stop(self)
-        self._stop_servers()
+        while not self._stopped:
+            try:
+                time.sleep(0.2)
+            except KeyboardInterrupt:
+                pass
+
+    ###################################################################
+    # Code below called by XML-RPC request handler from request thread
+    ###################################################################
+
+    def request(self, cmd_list):
+        """Main XML-RPC handler, called by the build server.  Dispatches
+        the build server's requests and returns our response."""
+        # _process_server_commands() deserializes the command stream itself
+        self._process_server_commands(cmd_list)
+        cmds_for_server = self._get_default_commands()
+        cmds_for_server = cmds_for_server + self._queued_cmds
+        self._queued_cmds = []
+        cmd_stream = Commands.serialize_to_command_stream(cmds_for_server)
+        return cmd_stream
+
+    def download_srpm(self, archjob_id, url, target_dir, dl_callback, cb_data=None):
+        """For passive builders, the server uploads the RPM to the builder.
+        Therefore, we already have it.  Move it from the HTTP server's upload
+        directory to the requested target_dir, if the SRPM exists."""
+
+        filename = os.path.basename(urllib.unquote(url))
+        source_dir = os.path.join(self._get_workdir_for_job(archjob_id), "source")
+        target_file = os.path.join(target_dir, filename)
+
+        result = FileTransfer.FT_RESULT_FAILED
+        msg = "Failed"
+
+        # Usually the upload dir will be the same as the archjob's target dir,
+        # but if it's not, copy the file over
+        if source_dir != target_dir:
+            try:
+                shutil.move(os.path.join(source_dir, filename), target_file)
+                result = FileTransfer.FT_RESULT_SUCCESS
+            except IOError, exc:
+                msg = str(exc)
+        elif os.access(target_file, os.R_OK):
+            # Otherwise make sure the file is where the archjob wants it
+            result = FileTransfer.FT_RESULT_SUCCESS
+
+        if result == FileTransfer.FT_RESULT_SUCCESS:
+            msg = "Success"        
+        dl_callback(result, cb_data, msg)
+        return None
+
+    def upload_files(self, archjob_id, files, ul_callback, cb_data=None):
+        """For passive builders, the build server retrieves the result
+        files from the builder.  So we pretty much do nothing here, since
+        the work_dir is already the HTTP download dir."""
+        work_dir = self._get_workdir_for_job(archjob_id)
+        result = FileTransfer.FT_RESULT_FAILED
+        msg = "Failed"
+        for fpath in files:
+            if fpath.startswith(work_dir):
+                continue
+            last_part = fpath[len(work_dir):]
+            new_path = os.path.join(work_dir, last_part)
+            try:
+                shutil.move(fpath, new_path)
+                result = FileTransfer.FT_RESULT_SUCCESS
+            except IOError, exc:
+                msg = str(exc)
+                break
 
+        if result == FileTransfer.FT_RESULT_SUCCESS:
+            msg = "Success"
+        ul_callback(result, cb_data, msg)
+        return None
 
 
 # HACK: This class is a hack to work around SSL hanging issues,
@@ -412,35 +567,16 @@
 
     def __init__(self, cfg):
         Builder.__init__(self, cfg)
-        self._stop = False
-        self._stopped = False
         self._last_comm = time.time() - self._SERVER_CONTACT_INTERVAL - 1
-        self._queued_cmds = []
         self._xmlrpc_address = self._get_server_address(cfg.get_str("Active", "xmlrpc_port"))
         self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._xmlrpc_address, self._certs, timeout=20)
         threading.Thread.__init__(self)
 
     def _get_server_address(self, port):
-        addr = self._cfg.get_str("Active", "server")
-        if addr.startswith("http://"):
-            addr = addr[7:]
-        elif addr.startswith("https://"):
-            addr = addr[8:]
-        if self._cfg.get_bool("SSL", "use_ssl"):
-            addr = "https://" + addr
-        else:
-            addr = "http://" + addr
+        addr = prefix_url(self._server_hostname, self._use_ssl)
         return addr + ":" + port
 
-    def work(self):
-        self.start()
-        try:
-            while not self._stop:
-                time.sleep(60)
-        except KeyboardInterrupt:
-            pass
-
-    def download_srpm(self, url, target_dir, dl_callback, cb_data=None):
+    def download_srpm(self, archjob_id, url, target_dir, dl_callback, cb_data=None):
         """Download an SRPM from the build server.  Only used by BuilderMock
         objects."""
         downloader = FileDownloader.FileDownloader(url, target_dir, ['.src.rpm'],
@@ -450,14 +586,9 @@
         return downloader
 
     def upload_files(self, archjob_id, files, ul_callback, cb_data=None):
-        server = self._cfg.get_str("Active", "server")
-        (urltype, urlrest) = urllib.splittype(server)
-        (ignore, server) = urllib.splithost(urlrest)
-        if self._cfg.get_bool("SSL", "use_ssl"):
-            url = "https://" + server
-        else:
-            url = "http://" + server
-        url = url + ":%d/upload" % self._cfg.get_int("Active", "fileserver_port")
+        port = self._cfg.get_int("Active", "fileserver_port")
+        url = "%s:%d/upload" % (self._server_hostname, port)
+        url = prefix_url(url, self._use_ssl)
         data = [("archjob_id", archjob_id)]
         uploader = FileUploader.FileUploader(url, files, 'filedata', data,
                 self._certs)
@@ -479,27 +610,6 @@
             urls.append("file:///%s" % os.path.basename(fpath))
         return Commands.PlgCommandJobFilesAck(archjob_id, urls, cmd.seq(), self._seq_gen.next())
 
-    def _get_default_commands(self):
-        """Return a python list of serialized commands that the builder
-        sends to the server every time it contacts the server."""
-
-        defcmds = []
-
-        # always send a target list
-        next_seq = self._seq_gen.next()
-        cmd = Commands.PlgCommandTargets(self.supported_targets(), next_seq)
-        defcmds.append(cmd)
-
-        # always send free & max slots
-        next_seq = self._seq_gen.next()
-        (free_slots, max_slots) = self.slots()
-        cmd = Commands.PlgCommandSlots(free_slots, max_slots, next_seq)
-        defcmds.append(cmd)
-
-        defcmds.append(self._get_building_jobs_cmd())
-
-        return defcmds
-
     def _send_commands(self):
         """Send default commands, and any commands that we've queued up
         since the last time we sent commands to the server."""
@@ -524,37 +634,6 @@
             return req.response
         return None
 
-    def _dispatch_server_command(self, cmd):
-        """Process a single command from the server."""
-
-        if isinstance(cmd, Commands.PlgCommandNewJobReq):
-            (uniqid, msg) = self._start_new_job(cmd)
-            ack = Commands.PlgCommandNewJobAck(uniqid, msg, cmd.seq(), self._seq_gen.next())
-            self._queued_cmds.append(ack)
-        elif isinstance(cmd, Commands.PlgCommandUnlockRepo):
-            self._handle_unlock_repo_request(cmd)
-        elif isinstance(cmd, Commands.PlgCommandJobStatus):
-            reply = self._handle_job_status_request(cmd)
-            if reply:
-                self._queued_cmds.append(reply)
-        elif isinstance(cmd, Commands.PlgCommandJobFiles):
-            reply = self._handle_job_files_request(cmd)
-            if reply:
-                self._queued_cmds.append(reply)
-        elif isinstance(cmd, Commands.PlgCommandKillJob):
-            self._handle_kill_job_command(cmd)
-
-    def _process_server_response(self, response):
-        """Process the server's response command stream."""
-
-        if not response:
-            # Something went wrong...
-            return
-
-        cmds = Commands.deserialize_command_stream(response)
-        for cmd in cmds:
-            self._dispatch_server_command(cmd)
-
     def run(self):
         """Main builder loop, send commands to and receive commands from
         the server every so often."""
@@ -563,7 +642,7 @@
             if self._last_comm < time.time() - self._SERVER_CONTACT_INTERVAL:
                 self._last_comm = time.time()
                 resp = self._send_commands()
-                self._process_server_response(resp)
+                self._process_server_commands(resp)
             time.sleep(1)
         self._stopped = True
 


Index: BuilderMock.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/BuilderMock.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- BuilderMock.py	14 May 2006 05:43:06 -0000	1.6
+++ BuilderMock.py	15 May 2006 17:26:45 -0000	1.7
@@ -27,7 +27,9 @@
 import errno
 import exceptions
 import time
+
 import Builder
+import Config
 
 from plague import ExecUtils
 from plague import FileDownloader
@@ -43,12 +45,9 @@
         return None
     file_part = file_path[len(work_dir) + 1:]
     port = "%s" % cfg.get_int("Network", "fileserver_port")
-    if cfg.get_bool("SSL", "use_ssl"):
-        method = "https://"
-    else:
-        method = "http://"
-    hostname = Builder.get_hostname(cfg, False)
-    full_url = "%s%s:%s/%s" % (method, hostname, port, file_part)
+    hostname = Builder.get_hostname(cfg)
+    full_url = "%s:%s/%s" % (hostname, port, file_part)
+    full_url = Builder.prefix_url(full_url, cfg.get_bool("SSL", "use_ssl"))
     return urllib.quote(full_url)
 
 
@@ -82,7 +81,12 @@
         self._uploader = None
         self.arch_command = ""
 
-        self._work_dir = self._builder_cfg.get_str("Directories", "builder_work_dir")
+        self._work_dir = os.path.abspath(self._builder_cfg.get_str("Directories", "builder_work_dir"))
+
+        self._source_dir = os.path.join(self._work_dir, self._uniqid, "source")
+        if not os.path.exists(self._source_dir):
+            os.makedirs(self._source_dir)
+
         self._result_dir = os.path.join(self._work_dir, self._uniqid, "result")
         if not os.path.exists(self._result_dir):
             os.makedirs(self._result_dir)
@@ -286,29 +290,27 @@
         return contents
 
     def dl_callback(self, dl_status, cb_data, err_msg=None):
-        url = cb_data
-        if dl_status == FileTransfer.FT_RESULT_SUCCESS:
-            self._status = 'downloaded'
-            self._log("Retrieved %s.\n" % url)
-        elif dl_status == FileTransfer.FT_RESULT_FAILED:
-            # If job was cancelled, just return
-            if self.is_done_status():
-                return
-            self._done_status = 'failed'
-            self._log("ERROR: Failed to retrieve %s.\n" % url)
-            self._post_cleanup()
-        elif dl_status == FileTransfer.FT_RESULT_CANCELED:
-            # Ignore cancelation
-            pass
+        if not self.is_done_status():
+            url = cb_data
+            if dl_status == FileTransfer.FT_RESULT_SUCCESS:
+                self._status = 'downloaded'
+                self._log("Retrieved %s.\n" % url)
+            elif dl_status == FileTransfer.FT_RESULT_FAILED:
+                self._done_status = 'failed'
+                self._log("ERROR: Failed to retrieve '%s' because: %s\n" % (url, err_msg))
+                self._post_cleanup()
+            elif dl_status == FileTransfer.FT_RESULT_CANCELED:
+                # Ignore cancelation
+                pass
         self._downloader = None
 
     def _status_init(self):
         self._log("Starting download of %s.\n" % self._srpm_url)
         self._status = 'downloading'
-        target_dir = os.path.dirname(self._srpm_path)
         try:
-            self._downloader = self._controller.download_srpm(self._srpm_url,
-                    target_dir, self.dl_callback, self._srpm_url)
+            target_dir = os.path.dirname(self._srpm_path)
+            self._downloader = self._controller.download_srpm(self._uniqid,
+                    self._srpm_url, target_dir, self.dl_callback, self._srpm_url)
         except FileDownloader.FileNameException, exc:
             self._done_status = 'failed'
             self._log("ERROR: Failed to begin SRPM download.  Error: '%s'  URL: %s\n" % (exc, self._srpm_url))
@@ -410,10 +412,10 @@
 
     def _post_cleanup(self):
         if self._done_status is not 'killed':
+            self._status = "uploading"
             self._files = self._find_files()
             self._uploader = self._controller.upload_files(self._uniqid, self._files,
                     self.ul_callback, None)
-            self._status = "uploading"
         else:
             self._status = self._done_status        
 
@@ -445,18 +447,18 @@
 
     def run(self):
         # Print out a nice message at the start of the job
-        target_dict = self._target_cfg.target_dict()
-        target_str = "%s-%s-%s-%s" % (target_dict['distro'], target_dict['target'], target_dict['arch'], target_dict['repo'])
+        target_str = Config.make_target_string_from_dict(self._target_cfg.target_dict())
+        time_str = time.asctime(time.localtime(self._starttime))
         self._log("""Starting job:
    Time: %s
    Target: %s
    UID: %s
    Architecture: %s
-   SRPM: %s\n\n""" % (time.asctime(time.localtime(self._starttime)), target_str, self._uniqid, self._buildarch, self._srpm_url))
+   SRPM: %s\n\n""" % (time_str, target_str, self._uniqid, self._buildarch, self._srpm_url))
 
         try:
             srpm_filename = FileDownloader.get_base_filename_from_url(self._srpm_url, ['.src.rpm'])
-            self._srpm_path = os.path.join(self._work_dir, self._uniqid, "source", srpm_filename)
+            self._srpm_path = os.path.join(self._source_dir, srpm_filename)
         except FileDownloader.FileNameException, exc:
             self._done_status = 'failed'
             self._log("ERROR: SRPM file name was invalid.  Message: '%s'\n" % exc)


Index: Config.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/builder/Config.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- Config.py	28 Apr 2006 03:17:35 -0000	1.3
+++ Config.py	15 May 2006 17:26:45 -0000	1.4
@@ -25,6 +25,8 @@
 def make_target_string(distro, target, arch, repo):
     return "%s-%s-%s-%s" % (distro, target, arch, repo)
 
+def make_target_string_from_dict(target_dict):
+    return make_target_string(target_dict['distro'], target_dict['target'], target_dict['arch'], target_dict['repo'])
 
 class BuilderConfig(BaseConfig.BaseConfig):
     def __init__(self, filename):
@@ -82,18 +84,18 @@
         self.set_option("General", "builder_cmd", "/usr/bin/mock")
         self.set_option("General", "builder_user", "plague-builder")
         self.set_option("General", "comm_type", "active")
+        self.set_option("General", "hostname", "localhost")
+        self.set_option("General", "server", "")
 
         self.add_section("Directories")
         self.set_option("Directories", "builder_work_dir", "/tmp/builder_work")
         self.set_option("Directories", "target_configs_dir", "/etc/plague/builder/targets")
 
         self.add_section("Active")
-        self.set_option("Active", "server", "")
         self.set_option("Active", "xmlrpc_port", "8886")
         self.set_option("Active", "fileserver_port", "8887")
 
         self.add_section("Passive")
-        self.set_option("Passive", "hostname", "")
         self.set_option("Passive", "xmlrpc_port", "8888")
         self.set_option("Passive", "fileserver_port", "8889")
 



