extras-buildsys/server BuildMaster.py, 1.39, 1.40 Builder.py, 1.33, 1.34 BuilderManager.py, 1.21, 1.22 Config.py, 1.16, 1.17 main.py, 1.20, 1.21

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Fri Apr 28 03:17:43 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv5292/server

Modified Files:
	BuildMaster.py Builder.py BuilderManager.py Config.py main.py 
Log Message:
2006-04-27  Dan Williams  <dcbw at redhat.com>

    Commit partial rework of builder<->server communication.




Index: BuildMaster.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuildMaster.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -r1.39 -r1.40
--- BuildMaster.py	24 Mar 2006 19:13:41 -0000	1.39
+++ BuildMaster.py	28 Apr 2006 03:17:41 -0000	1.40
@@ -375,9 +375,6 @@
                 have_work = True
             self._archjob_status_updates_lock.release()
 
-        if not have_work and self.builder_manager.have_work(self._paused):
-            have_work = True
-
         return have_work
 
     def get_job(self, uid):
@@ -401,9 +398,6 @@
             # Write update status for jobs to the database
             self._save_job_status()
 
-            if self.builder_manager.have_work(self._paused):
-                self.builder_manager.process(self._paused)
-
             # Clean up jobs that have finished
             self._process_finished_jobs()
 


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -r1.33 -r1.34
--- Builder.py	20 Mar 2006 12:43:21 -0000	1.33
+++ Builder.py	28 Apr 2006 03:17:41 -0000	1.34
@@ -22,6 +22,7 @@
 import os
 import urllib
 import threading
+from plague import Commands
 from plague import XMLRPCServerProxy
 from plague import CommonErrors
 import OpenSSL
@@ -34,6 +35,152 @@
 SUSPEND_TIMEOUT = 'timeout'
 SUSPEND_HARD_ERROR = 'hard-error'
 
+TYPE_PASSIVE = 1
+TYPE_ACTIVE = 2
+
+
+class Builder(threading.Thread):
+    """ Tracks all jobs on a builder instance """
+
+    def __init__(self, manager, cfg, address, weight, btype):
+        self._manager = manager
+        self._jobs = {}
+        self._free_slots = 0
+        self._num_slots = 0
+        self._address = address
+        self._available = False
+        self._suspend_reason = SUSPEND_NONE
+        self._stop = False
+        self._prepping_jobs = False
+        self._unavail_count = 0
+        self._target_list = []
+        self._when_died = 0
+        self._server_cfg = cfg
+        self._weight = weight
+        self._type = btype
+        self._seq_gen = Commands.SequenceGenerator()
+
+        try:
+            type, rest = urllib.splittype(address)
+            host, ignore = urllib.splithost(rest)
+            host, port = urllib.splitport(host)
+            self._ip = socket.gethostbyname(host)
+        except Exception, e:
+            print "Builder Error(%s): couldn't lookup builder's IP address." % address
+            raise Exception(e)
+
+        threading.Thread.__init__(self)
+        self.setName("Builder: %s" % address)
+
+    def _match_target_dict(self, td1, td2):
+        if td1['distro'] == td2['distro']:
+            if td1['target'] == td2['target']:
+                if td1['repo'] == td2['repo']:
+                    return True
+        return False
+
+    def arches(self, target_dict):
+        for td in self._target_list:
+            if self._match_target_dict(td, target_dict):
+                arches = []
+                for arch in td['supported_arches']:
+                    if not arch in arches:
+                        arches.append(arch)
+                return arches
+        return None
+
+    def can_build_for_target(self, target_dict):
+        for td in self._target_list:
+            if self._match_target_dict(td, target_dict):
+                if target_dict['arch'] in td['supported_arches']:
+                    return True
+        return False
+
+    def address(self):
+        return (self._ip, self._address)
+
+    def available(self):
+        """ Is the builder responding to requests? """
+        return self._available
+
+    def free_slots(self):
+        return self._free_slots
+
+    def weight(self):
+        return self._weight
+
+    def type(self):
+        return self._type
+
+    def stop(self):
+        self._stop = True
+
+    def _handle_builder_suspend(self, reason, msg):
+        for jobid in self._jobs.keys():
+            job = self._jobs[jobid]
+            job.builder_gone()
+            del self._jobs[jobid]
+        self._jobs = {}
+        self._available = False
+        self._suspend_reason = reason
+        self._unavail_count = 0
+        self._prepping_jobs = False
+        self._when_died = time.time()
+
+        # Notify admins
+        print "Suspending builder '%s'.  Reason: %s - %s." % (self._address, reason, msg)
+        subject = "Builder Suspended: %s" % self._address
+        msg = "The builder '%s' was suspended.  Reason: %s - %s." % (self._address, reason, msg)
+        sender = self._server_cfg.get_str("Email", "email_from")
+        for addr in self._server_cfg.get_list("Email", "admin_emails"):
+            EmailUtils.email_result(sender, addr, msg, subject)
+
+    def _handle_builder_reactivate(self, mail=False):
+        self._available = True
+        self._suspend_reason = SUSPEND_NONE
+
+        print "Re-activating builder '%s'." % self._address
+
+        if mail:
+            subject = "Builder Re-activated: %s" % self._address
+            msg = """The builder '%s' was re-activated.
+
+  Suspended at: %s
+  Re-Enabled at: %s
+""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
+            sender = self._server_cfg.get_str("Email", "email_from")
+            for addr in self._server_cfg.get_list("Email", "admin_emails"):
+                EmailUtils.email_result(sender, addr, msg, subject)
+        self._when_died = 0
+
+    def any_prepping_jobs(self):
+        return self._prepping_jobs
+
+    def to_dict(self):
+        builder_dict = {}
+
+        addr = self._address
+        # for some reason, splithost doesn't like the protocol
+        # method, you have to give it a string starting with "//"
+        if addr.startswith("http"):
+            idx = addr.find('//')
+            addr = addr[idx:]
+        host_port, path = urllib.splithost(addr)
+        host, port = urllib.splitport(host_port)
+        builder_dict['address'] = host
+
+        arches = []
+        for td in self._target_list:
+            for arch in td['supported_arches']:
+                if not arch in arches:
+                    arches.append(arch)
+        builder_dict['arches'] = arches
+
+        builder_dict['available'] = self._available
+        builder_dict['num_slots'] = self._num_slots
+        builder_dict['free_slots'] = self._free_slots
+        return builder_dict
+
 # HACK: This class is a hack to work around SSL hanging issues,
 # which cause the whole server to grind to a halt
 class BuildingJobsCheck(threading.Thread):
@@ -61,31 +208,21 @@
         self.free_slots = free_slots
         self.done = True
 
-
-class Builder(threading.Thread):
-    """ Tracks all jobs on a builder instance """
+class PassiveBuilder(Builder):
+    """
+    Passive builders are ones that do not initiate connections.  They
+    wait for the server to contact them, and therefore cannot be behind
+    a firewall without having holes punched through it.
+    """
 
     _BUILDER_PING_INTERVAL = 60 * 5      # In seconds
 
-    def __init__(self, manager, cfg, address, weight):
-        self._cur_jobid = None
-        self._manager = manager
-        self._jobs = {}
-        self._free_slots = 0
-        self._num_slots = 0
-        self._address = address
-        self._alive = True
-        self._suspend_reason = SUSPEND_NONE
-        self._stop = False
-        self._prepping_jobs = False
-        self._unavail_count = 0
-        self._target_list = []
+    def __init__(self, manager, cfg, address, weight, btype):
+        Builder.__init__(self, manager, cfg, address, weight, btype)
+
         self._ping_timeout = 0
         self._cur_ping_interval = self._BUILDER_PING_INTERVAL
         self._ping_now = False
-        self._when_died = 0
-        self._server_cfg = cfg
-        self._weight = weight
 
         certs = None
         if self._server_cfg.get_bool("Builders", "use_ssl"):
@@ -97,17 +234,26 @@
         self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs, timeout=20)
         self._server_lock = threading.Lock()
 
-        threading.Thread.__init__(self)
-        self.setName("Builder: %s" % address)
-
-        (self._alive, target_list) = self._ping_builder()
-        if self._alive:
+        (self._available, target_list) = self._ping_builder()
+        if self._available:
             self._init_builder(target_list)
         else:
             # Treat the builder as timed out and ping it periodically
             self._ping_timeout = time.time()
             self._suspend_reason = SUSPEND_TIMEOUT
 
+    def _ping_builder(self):
+        target_list = []
+        try:
+            target_list = self._server.supported_targets()
+            alive = True
+        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
+            alive = False
+        except xmlrpclib.Fault, e:
+            print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
+            alive = False
+        return (alive, target_list)
+
     def _init_builder(self, target_list):
         self._target_list = target_list
 
@@ -146,7 +292,7 @@
         if bjc.done:
             if not bjc.failed:
                 self._unavail_count = 0
-                self._alive = True
+                self._available = True
                 self._free_slots = bjc.free_slots
                 return bjc.jobs
             else:
@@ -157,55 +303,22 @@
 
         return {}
 
-    def _ping_builder(self):
-        target_list = []
-        try:
-            target_list = self._server.supported_targets()
-            alive = True
-        except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, OpenSSL.SSL.Error, xmlrpclib.ProtocolError):
-            alive = False
-        except xmlrpclib.Fault, e:
-            print "Builder Error (%s) in _ping_builder(): builder replied '%s'" % (self._address, e)
-            alive = False
-        return (alive, target_list)
-
-    def _match_target_dict(self, td1, td2):
-        if td1['distro'] == td2['distro']:
-            if td1['target'] == td2['target']:
-                if td1['repo'] == td2['repo']:
-                    return True
-        return False
-
-    def arches(self, target_dict):
-        for td in self._target_list:
-            if self._match_target_dict(td, target_dict):
-                arches = []
-                for arch in td['supported_arches']:
-                    if not arch in arches:
-                        arches.append(arch)
-                return arches
-        return None
-
-    def can_build_for_target(self, target_dict):
-        for td in self._target_list:
-            if self._match_target_dict(td, target_dict):
-                if target_dict['arch'] in td['supported_arches']:
-                    return True
-        return False
-
-    def address(self):
-        return self._address
+    def ping_asap(self):
+        # Reduce the ping interval to ping the builder right away
+        self._cur_ping_interval = 0
+        self._ping_now = True
 
-    def alive(self):
-        """ Is the builder responding to requests? """
-        return self._alive
+    def _handle_builder_suspend(self, reason, msg):
+        Builder._handle_builder_suspend(self, reason, msg)
+        # Reset current ping interval to default
+        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+        self._ping_timeout = time.time()
 
-    def free_slots(self):
-        return self._free_slots
+    def _handle_builder_reactivate(self, target_list=None):
+        Builder._handle_builder_reactivate(self, mail=True)
+        self._ping_timeout = 0
+        self._init_builder(target_list or self._target_list)
 
-    def weight(self):
-        return self._weight
-    
     def start_job(self, par_job, target_dict, srpm_url):
         if not self.available():
             raise RuntimeError
@@ -285,63 +398,12 @@
             pass
         return status
 
-    def stop(self):
-        self._stop = True
-
-    def ping_asap(self):
-        # Reduce the ping interval to ping the builder right away
-        self._cur_ping_interval = 0
-        self._ping_now = True
-
-    def _handle_builder_suspend(self, reason, msg):
-        for jobid in self._jobs.keys():
-            job = self._jobs[jobid]
-            job.builder_gone()
-            del self._jobs[jobid]
-        self._jobs = {}
-        self._alive = False
-        self._suspend_reason = reason
-        self._unavail_count = 0
-        self._prepping_jobs = False
-        self._when_died = time.time()
-
-        # Reset current ping interval to default
-        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
-        self._ping_timeout = time.time()
-
-        # Notify admins
-        print "Suspending builder '%s'.  Reason: %s - %s." % (self._address, reason, msg)
-        subject = "Builder Suspended: %s" % self._address
-        msg = "The builder '%s' was suspended.  Reason: %s - %s." % (self._address, reason, msg)
-        sender = self._server_cfg.get_str("Email", "email_from")
-        for addr in self._server_cfg.get_list("Email", "admin_emails"):
-            EmailUtils.email_result(sender, addr, msg, subject)
-
-    def _handle_builder_reactivate(self, target_list):
-        self._alive = True
-        self._suspend_reason = SUSPEND_NONE
-        self._ping_timeout = 0
-
-        self._init_builder(target_list)
-
-        print "Re-activating builder '%s'." % self._address
-        subject = "Builder Re-activated: %s" % self._address
-        msg = """The builder '%s' was re-activated.
-
-  Suspended at: %s
-  Re-Enabled at: %s
-""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
-        sender = self._server_cfg.get_str("Email", "email_from")
-        for addr in self._server_cfg.get_list("Email", "admin_emails"):
-            EmailUtils.email_result(sender, addr, msg, subject)
-        self._when_died = 0
-
     def run(self):
         DebugUtils.registerThreadName(self)
         while not self._stop:
             self._server_lock.acquire()
 
-            if self._alive:
+            if self._available:
                 self._update_building_jobs()
 
                 if self._unavail_count > 2:
@@ -351,12 +413,12 @@
                     # Update status of all archjobs on this builder
                     for j in self._jobs.values():
                         j.process()
-            elif not self._alive and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
+            elif not self._available and (self._suspend_reason == SUSPEND_TIMEOUT or self._ping_now):
                 # Ping the builder every so often to see if it responds again
                 if time.time() > (self._ping_timeout + self._cur_ping_interval):
                     (alive, target_list) = self._ping_builder()
                     if alive:
-                        self._handle_builder_reactivate(target_list)
+                        self._handle_builder_reactivate()
                     else:
                         # Wait and ping again
                         self._ping_timeout = time.time()
@@ -368,40 +430,131 @@
             self._server_lock.release()
 
             time.sleep(20)
-    
-    def available(self):
-        """
-        Can the builder start a new job right now?
-        """
-        if self._unavail_count > 2 or not self._alive or self.free_slots() <= 0:
-            return False
-        return True
 
-    def any_prepping_jobs(self):
-        return self._prepping_jobs
 
-    def to_dict(self):
-        builder_dict = {}
+class ActiveBuilder(Builder):
+    """
+    Active builders are ones which attempt to contact the build server
+    by themselves.  Therefore, they can be behind a firewall without
+    punching holes through it.
+    """
+
+    _REQUIRED_CONTACT_INTERVAL = 20
+
+    def __init__(self, manager, cfg, address, weight, btype):
+        Builder.__init__(self, manager, cfg, address, weight, btype)
+        self._last_contact = 0
+        self._lock = threading.Lock()
+        self._cmd_queue = []
 
-        addr = self._address
-        # for some reason, splithost doesn't like the protocol
-        # method, you have to give it a string starting with "//"
-        if addr.startswith("http"):
-            idx = addr.find('//')
-            addr = addr[idx:]
-        host_port, path = urllib.splithost(addr)
-        host, port = urllib.splitport(host_port)
-        builder_dict['address'] = host
+    def _init_builder(self, target_list):
+        self._target_list = target_list
 
-        arches = []
-        for td in self._target_list:
-            for arch in td['supported_arches']:
-                if not arch in arches:
-                    arches.append(arch)
-        builder_dict['arches'] = arches
+    def _handle_new_job_ack(self, ack):
+        """Handle a NewJobAck command by finding the original command
+        sent to the builder, removing it from the command queue, and notifying
+        the parent job that this archjob is now in progress."""
+        self._lock.acquire()
+        for old_cmd in self._cmd_queue:
+            if old_cmd.seq() == ack.acked_seq() and isinstance(old_cmd, Commands.PlgCommandNewJobReq):
+                self._cmd_queue.remove(old_cmd)
+                break
+        else:
+            old_cmd = None  # no match: don't reuse the last loop value
+        self._lock.release()
 
-        builder_dict['alive'] = self._alive
-        builder_dict['num_slots'] = self._num_slots
-        builder_dict['free_slots'] = self._free_slots
-        return builder_dict
+        if old_cmd is not None:
+            parent = old_cmd.parent_job()
+            archjob = ArchJob.ArchJob(self, parent, ack.archjob_id(), old_cmd.target_dict())
+            self._jobs[ack.archjob_id()] = archjob
+            parent.add_arch_job(archjob)
+
+    def _dispatch_command(self, cmd):
+        name = cmd.name()
+        if isinstance(cmd, Commands.PlgCommandSlots):
+            self._lock.acquire()
+            self._free_slots = cmd.free_slots()
+            self._num_slots = cmd.max_slots()
+            self._lock.release()
+        elif isinstance(cmd, Commands.PlgCommandTargets):
+            self._lock.acquire()
+            self._target_list = cmd.targets()
+            self._lock.release()
+        elif isinstance(cmd, Commands.PlgCommandNewJobAck):
+            self._handle_new_job_ack(cmd)
+        else:
+            print "Builder Error (%s): unhandled command '%s'" % (self._address, cmd.name())
+
+    def request(self, cmd_list):
+        """Process and respond to an active builder's request.  Called
+        from the BuilderManager's builder XML-RPC server thread."""
+
+        self._last_contact = time.time()
+        if not self._available:
+            self._handle_builder_reactivate(cmd_list)
+        # Process the commands the builder sent us
+        for cmd in cmd_list:
+            self._dispatch_command(cmd)
+
+        # Grab some work for the builder if any is available
+        new_cmds = []
+        if self._free_slots > 0:
+            req = self._manager.claim_arch_job(self)
+            if req:
+                next_seq = self._seq_gen.next()
+                cmd = Commands.PlgCommandNewJobReq(req['parent'], req['target_dict'], req['srpm_url'], next_seq)
+                new_cmds.append(cmd)
+
+        self._lock.acquire()
+        self._unavail_count = 0    # builder contacted us; reset the timeout count
+        # Copy command queue
+        self._cmd_queue = self._cmd_queue + new_cmds
+        cmd_list = self._cmd_queue[:]
+        self._lock.release()
+        return cmd_list
+
+    _SLEEP_INTERVAL = 10
+    def run(self):
+        """Main builder loop.  Since the builder contacts us,
+        we don't have to do much here except handle builders
+        going away."""
 
+        DebugUtils.registerThreadName(self)
+        while not self._stop:
+            if not self._available:
+                time.sleep(self._SLEEP_INTERVAL)
+                continue
+
+            self._lock.acquire()
+            if self._unavail_count > 2:
+                self._handle_builder_suspend(SUSPEND_TIMEOUT, "the builder timed out")
+            elif self._last_contact + self._REQUIRED_CONTACT_INTERVAL < time.time():
+                self._unavail_count = self._unavail_count + 1
+            self._lock.release()
+
+            time.sleep(self._SLEEP_INTERVAL)
+
+    def _handle_builder_suspend(self, reason, msg):
+        Builder._handle_builder_suspend(self, reason, msg)
+        self._last_contact = 0
+
+    def _handle_builder_reactivate(self, cmd_list):
+        # Grab an updated target list from the command stream when
+        # the builder contacts us
+        target_list = None
+        for cmd in cmd_list:
+            if isinstance(cmd, Commands.PlgCommandTargets):
+                target_list = cmd.targets()
+        if not target_list:
+            target_list = self._target_list
+
+        mail = True
+        if self._suspend_reason == SUSPEND_NONE:
+            # Don't send mail saying the builder has been reactivated if
+            # this is the first time the builder has contacted us
+            mail = False
+
+        self._lock.acquire()
+        Builder._handle_builder_reactivate(self, mail=mail)
+        self._init_builder(target_list)
+        self._lock.release()


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -r1.21 -r1.22
--- BuilderManager.py	20 Mar 2006 20:05:11 -0000	1.21
+++ BuilderManager.py	28 Apr 2006 03:17:41 -0000	1.22
@@ -24,91 +24,230 @@
 import Builder
 import EmailUtils
 import Config
+import time
+from plague import DebugUtils
+from plague import AuthedXMLRPCServer
+from plague import HTTPServer
+from plague import Commands
+
+
+class AddrCache(object):
+    def __init__(self):
+        self._cache = {}
+
+    def get(self, name):
+        # Expire the cached entry if one exists and is over an hour old
+        stamp = ip = None    # not 'time': that would shadow the time module
+        try:
+            (stamp, ip) = self._cache[name]
+            if stamp < time.time() - (60 * 60):
+                del self._cache[name]
+                stamp = ip = None
+        except KeyError:
+            pass
+
+        # Do a lookup and cache it; on failure return None (best effort)
+        if not ip:
+            try:
+                ip = socket.gethostbyname(name)
+                self._cache[name] = (time.time(), ip)
+            except socket.error:
+                pass
+
+        return ip
+
+class AuthedSSLBuilderServer(AuthedXMLRPCServer.AuthedSSLXMLRPCServer):
+    """ SSL XMLRPC server that authenticates builders based on their certificate. """
+    def __init__(self, address, certs, builder_manager):
+        AuthedXMLRPCServer.AuthedSSLXMLRPCServer.__init__(self, address, self.auth_cb, certs)
+        self.authenticator = builder_manager
+        self._addr_cache = AddrCache()
+
+    def auth_cb(self, request, con_addr_pair):
+        try:
+            peer_cert = request.get_peer_certificate()
+            cert_address = peer_cert.get_subject().commonName
+            (con_address, con_port) = con_addr_pair
+            cert_ip = self._addr_cache.get(cert_address)
+            con_ip = self._addr_cache.get(con_address)
+            builder = self.authenticator.get_builder(cert_ip, con_ip)
+            if not builder or builder.type() != Builder.TYPE_ACTIVE:
+                builder = None
+        except Exception:
+            builder = None
+        return builder
+
+class AuthedBuilderServer(AuthedXMLRPCServer.AuthedXMLRPCServer):
+    """ Authenticates builders based on their IP address. """
+    def __init__(self, address, builder_manager):
+        AuthedXMLRPCServer.AuthedXMLRPCServer.__init__(self, address, self.auth_cb)
+        self.authenticator = builder_manager
+        self._addr_cache = AddrCache()
+
+    def auth_cb(self, request, con_addr_pair):
+        try:
+            (con_address, con_port) = con_addr_pair
+            ip = self._addr_cache.get(con_address)
+            builder = self.authenticator.get_builder(ip, ip)
+            if not builder or builder.type() != Builder.TYPE_ACTIVE:
+                builder = None
+        except Exception:
+            builder = None
+        return builder
+
+
+class BuilderDispatcher(object):
+    def request(self, cmd_list):
+        # Authorize the builder, then pass the request
+        # to the correct builder object
+
+        builder = AuthedXMLRPCServer.get_authinfo()
+        if not builder:
+            cmd = Commands.PlgCommandError("Builder is not authorized")
+            return [cmd.serialize()]
+
+        cmds = Commands.deserialize_command_stream(cmd_list)
+        cmds_for_server = builder.request(cmds)
+        cmd_stream = Commands.serialize_to_command_stream(cmds_for_server)
+        return cmd_stream
 
 
-class BuilderManager:
+class BuilderServerThread(threading.Thread):
     """
-    Tracks individual builder instances.
+    Object to serve active builder requests in a separate thread.
+    Can't block the main BuilderManager object by sitting in
+    serve_forever().
     """
+    def __init__(self, cfg, bm):
+        self._cfg = cfg
+        self._bm = bm
+        self._stopped = False
+        threading.Thread.__init__(self)
+        self.setName("BuilderServerThread")
+
+        hostname = cfg.get_str("General", "hostname")
+        port = cfg.get_int("Active Builders", "xmlrpc_server_port")
+        if cfg.get_bool("Builders", "use_ssl") == True:
+            certs = {}
+            certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+            certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+            certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+            self._server = AuthedSSLBuilderServer((hostname, port), certs, self._bm)
+        else:
+            self._server = AuthedBuilderServer((hostname, port), self._bm)
+        self._dispatcher = BuilderDispatcher()
+        self._server.register_instance(self._dispatcher)
+
+    def run(self):
+        DebugUtils.registerThreadName(self)
+        self._server.serve_forever()
+        self._stopped = True
+
+    def stop(self):
+        self._server.stop()
+        # Wait up to 2 seconds for serve_forever() to return; sleep between checks
+        t = time.time()
+        while not self._stopped and time.time() <= t + 2:
+            try:
+                time.sleep(0.1)
+            except KeyboardInterrupt:
+                pass
+
+
+class BuilderManager:
+    """ Tracks individual builder instances. """
 
     def __init__(self, cfg):
         self._cfg = cfg
         self._builders_lock = threading.Lock()
 
         self._builders = []
-        self.add_new_builders()
+        any_active = self._load_builders()
+        self._print_builders()
+
+        self._queue_lock = threading.Lock()
+        self._queue = []
+
+        self._xmlrpc_server = None
+        if any_active:
+            # Builder XMLRPC server
+            # Only start it when there are active-type builders listed
+            # in the config file
+            self._xmlrpc_server = BuilderServerThread(cfg, self)
+            self._xmlrpc_server.start()
+
+        # Builder HTTP fileserver
+        hostname = cfg.get_str("General", "hostname")
+        port = cfg.get_int("Active Builders", "file_server_port")
+        http_dir = os.path.join(cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir")
+        certs = {}
+        if cfg.get_bool("Builders", "use_ssl"):
+            certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
+            certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
+            certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
+        self._srpm_server = HTTPServer.PlgHTTPServerManager((hostname, port), http_dir, certs)
+        self._srpm_server.start()
 
+    def _print_builders(self):
         # Print out builder list when starting up
-        print "\nBuilders:"
+        print "\nAuthorized Builders:"
         print "-" * 90
         for builder in self._builders:
-            string = "  " + builder.address()
-            string = string + " " * (40 - len(builder.address()))
+            (ip, addr) = builder.address()
+            string = "  " + addr
+            string = string + " " * (40 - len(addr))
             builder_dict = builder.to_dict()
             for arch in builder_dict['arches']:
                 string = string + arch + " "
             string = string + " " * (80 - len(string))
             status = "unavailable"
-            if builder_dict['alive']:
-                status = "alive"
+            if builder_dict['available']:
+                status = "available"
             string = string + status
             del builder_dict
             print string
         print ""
 
-        self._queue_lock = threading.Lock()
-        self._queue = []
-
-        self._have_work = False
-
-    def __del__(self):
+    def stop(self):
         for builder in self._builders:
             builder.stop()
-        time.sleep(2)
+        if self._xmlrpc_server:
+            self._xmlrpc_server.stop()
+        self._srpm_server.stop()
 
-    def set_build_master(self, build_master):
-        self._build_master = build_master
-
-    def add_new_builders(self):
+    def _load_builders(self):
         self._builders_lock.acquire()
-
-        tmp_list = self._cfg.builders()
-        prefix = "http://"
-        if self._cfg.get_bool("Builders", "use_ssl") == True:
-            prefix = "https://"
-
-        builder_list = {}
-        for addr in tmp_list.keys():
-            new_addr = addr
-            # Rewrite addresses to match current builder connection method
-            if addr.startswith("http://"):
-                new_addr = addr[7:]
-            elif addr.startswith("https://"):
-                new_addr = addr[8:]
-            if new_addr:
-                builder_list[prefix + new_addr] = tmp_list[addr]
-
-        for address in builder_list.keys():
+        any_active = False
+        builders = self._cfg.builders()
+        for address in builders:
+            (weight, btype) = builders[address]
+            if btype == Builder.TYPE_ACTIVE:
+                any_active = True
             # If the address is already in our _builders list, skip it
             skip = False
             for builder in self._builders:
-                if address == builder.address():
+                (ip, addr) = builder.address()
+                if address == addr:
                     skip = True
+                    break
             if skip == True:
                 continue
 
             # Add the builder to our build list
-            weight = builder_list[address]
-            builder = Builder.Builder(self, self._cfg, address, weight)
+            if btype == Builder.TYPE_ACTIVE:
+                builder = Builder.ActiveBuilder(self, self._cfg, address, weight, btype)
+            else:
+                builder = Builder.PassiveBuilder(self, self._cfg, address, weight, btype)
             builder.start()
             self._builders.append(builder)
-
         self._builders_lock.release()
+        return any_active
 
     def ping_suspended_builders(self):
         self._builders_lock.acquire()
         for builder in self._builders:
-            if not builder.alive():
+            passive = (builder.type() == Builder.TYPE_PASSIVE)
+            if passive and not builder.available():
                 builder.ping_asap()
         self._builders_lock.release()
 
@@ -118,14 +257,22 @@
             builder_list.append(builder.to_dict())
         return builder_list
 
-    def have_work(self, paused):
-        avail = False
-        for builder in self._builders:
-            if builder.available():
-                avail = True
-        if not paused and len(self._queue) > 0 and avail:
-            return True
-        return self._have_work
+    def get_builder(self, cert_ip, con_ip):
+        self._builders_lock.acquire()
+        builder = None
+
+        # Ensure builder's certificate (if SSL) and
+        # the remote address of its connection are the same
+        if cert_ip == con_ip:
+            # Find matching builder in our authorized builders list
+            for b in self._builders:
+                (ip, addr) = b.address()
+                if cert_ip == ip:
+                    builder = b
+                    break
+
+        self._builders_lock.release()
+        return builder
 
     def _builder_cmp_func(self, builder1, builder2):
         # If both builders have at least one free slot, sort on
@@ -150,62 +297,23 @@
             return -1
         return 1
 
-    def process(self, paused):
-        self._have_work = False
-
-        # Don't queue any new jobs if we are paused
-        if paused:
-            return
-
-        # Deal with new arch jobs
+    def claim_arch_job(self, builder):
+        archjob = None
         self._queue_lock.acquire()
-        new_jobs = {}
         for req in self._queue:
-            parent = req['parent']
-            stage = parent.cur_stage()
-            if stage != 'building' and stage != 'waiting':
-                self._queue.remove(req)
-                continue
-
-            # Find all free builders that could satisfy the request
-            possible_builders = []
-            for builder in self._builders:
-                if builder.available() and builder.can_build_for_target(req['target_dict']):
-                    possible_builders.append(builder)
-
-            # Sort builder list by free slots and weights
-            possible_builders.sort(self._builder_cmp_func)
-            possible_builders.reverse()
-
-            for builder in possible_builders:
-                try:
-                    job = builder.start_job(parent, req['target_dict'], req['srpm_url'])
-                except RuntimeError, e:
-                    print "Builder (%s) couldn't start job %s because: '%s'" % (builder.address(),
-                        req['target_dict'], e)
-                    continue
-
-                if not new_jobs.has_key(parent):
-                    new_jobs[parent] = []
-                new_jobs[parent].append(job)
+            if builder.can_build_for_target(req['target_dict']):
                 self._queue.remove(req)
+                archjob = req
                 break
         self._queue_lock.release()
-
-        # Notify the parent jobs of their new archjobs.  Have to do this outside _queue_lock
-        # for locking reasons
-        for parent in new_jobs.keys():
-            for job in new_jobs[parent]:
-                parent.add_arch_job(job)
-
-        if len(self._queue) > 0:
-            time.sleep(0.25)
+        return archjob
 
     def request_arch_job(self, par_job, target_dict, srpm_url, orphaned):
         req = {}
         req['parent'] = par_job
         req['target_dict'] = target_dict
         req['srpm_url'] = srpm_url
+        req['time_queued'] = time.time()
 
         self._queue_lock.acquire()
         if orphaned:


Index: Config.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Config.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -r1.16 -r1.17
--- Config.py	20 Mar 2006 18:28:37 -0000	1.16
+++ Config.py	28 Apr 2006 03:17:41 -0000	1.17
@@ -18,6 +18,7 @@
 import fnmatch
 from ConfigParser import ConfigParser
 from plague import BaseConfig
+import Builder
 
 
 def make_target_string(distro, target, repo):
@@ -97,11 +98,21 @@
             except InvalidTargetException, e:
                 print "Error: could not add target %s because: %s" % (f, e)
 
-    def _load_builders(self):
-        if not self._config.has_section("Builders"):
+    def _get_builders_of_type(self, btype):
+        if btype == Builder.TYPE_PASSIVE:
+            section = "Passive Builders"
+        elif btype == Builder.TYPE_ACTIVE:
+            section = "Active Builders"
+        else:
+            raise Exception("Unknown builder type %d" % btype)
+
+        if not self._config.has_section(section):
             return {}
-        items = self._config.items("Builders")
-        builder_list = {}
+        prefix = "http://"
+        if self.get_bool("Builders", "use_ssl") == True:
+            prefix = "https://"
+        items = self._config.items(section)
+        list = {}
         for (tag, builder) in items:
             if not tag.startswith("builder"):
                 continue
@@ -116,8 +127,26 @@
                 weight = int(weight_str)
             except ValueError:
                 weight = 0
-            builder_list[addr] = weight
-        return builder_list
+            # Rewrite addresses to match current builder connection method
+            new_addr = addr
+            if addr.startswith("http://"):
+                new_addr = addr[7:]
+            elif addr.startswith("https://"):
+                new_addr = addr[8:]
+            if new_addr:
+                list[prefix + new_addr] = (weight, btype)
+        return list
+
+    def _load_builders(self):
+        passive_builders = self._get_builders_of_type(Builder.TYPE_PASSIVE)
+        active_builders = self._get_builders_of_type(Builder.TYPE_ACTIVE)
+
+        builders = {}
+        for key in active_builders.keys():
+            builders[key] = active_builders[key]
+        for key in passive_builders.keys():
+            builders[key] = passive_builders[key]
+        return builders
 
     def save_default_config(self, filename=None):
         self.add_section("General")
@@ -139,8 +168,15 @@
 
         self.add_section("Builders")
         self.set_option("Builders", "use_ssl", "yes")
-        self.set_option("Builders", "builder1", "20 127.0.0.1:8888")
-        self.set_option("Builders", "builder2", "0  127.0.0.2:8888")
+
+        self.add_section("Active Builders")
+        self.set_option("Active Builders", "xmlrpc_server_port", "8889")
+        self.set_option("Active Builders", "file_server_port", "8890")
+        self.set_option("Active Builders", "builder1", "20 127.0.0.1")
+        self.set_option("Active Builders", "builder2", "0  127.0.0.2")
+
+        self.add_section("Passive Builders")
+        self.set_option("Passive Builders", "builder1", "20 127.0.0.1:8888")
 
         self.add_section("SSL")
         self.set_option("SSL", "server_key_and_cert", "/etc/plague/server/certs/server_key_and_cert.pem")
@@ -152,7 +188,7 @@
         self.add_section("UI")
         self.set_option("UI", "use_ssl", "yes")
         self.set_option("UI", "client_ca_cert", "/etc/plague/server/certs/ui_ca_cert.pem")
-        self.set_option("UI", "port", "8887")
+        self.set_option("UI", "port", "8888")
         self.set_option("UI", "guest_allowed", "yes")
         self.set_option("UI", "log_url", "http://www.foo.com/logs/")
 


Index: main.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/main.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -r1.20 -r1.21
--- main.py	13 Mar 2006 13:05:38 -0000	1.20
+++ main.py	28 Apr 2006 03:17:41 -0000	1.21
@@ -124,9 +124,8 @@
 
     dbm = DBManager.DBManager(cfg)
 
-    builder_manager = BuilderManager.BuilderManager(cfg)
-
     # Create the BuildMaster thread
+    builder_manager = BuilderManager.BuilderManager(cfg)
     bm = BuildMaster.BuildMaster(builder_manager, dbm, cfg)
     bm.start()
 
@@ -148,20 +147,8 @@
         if e[0] == 98:      # Address already in use
             print "Error: couldn't bind to address '%s:%s'.  Is the server already running?" % (hostname, port)
             os._exit(1)
-
     bm_server.register_instance(ui)
 
-    # SRPM fileserver
-    SRPM_SERVER_PORT = 8886
-    http_dir = os.path.join(cfg.get_str("Directories", "server_work_dir"), "srpm_http_dir")
-    srpm_server_certs = {}
-    if cfg.get_bool("Builders", "use_ssl"):
-        srpm_server_certs['key_and_cert'] = cfg.get_str("SSL", "server_key_and_cert")
-        srpm_server_certs['ca_cert'] = cfg.get_str("SSL", "ca_cert")
-        srpm_server_certs['peer_ca_cert'] = cfg.get_str("SSL", "ca_cert")
-    srpm_server = HTTPServer.PlgHTTPServerManager((hostname, SRPM_SERVER_PORT), http_dir, srpm_server_certs)
-    srpm_server.start()
-
     # Create dummy thread just to register main thread's name
     dummy = threading.Thread()
     dummy.setName("MainThread")
@@ -182,9 +169,9 @@
     # Make sure the BuildMaster thread shuts down
     print "Shutting down..."
     bm.stop()
-    srpm_server.stop()
     if use_tbs:
         tbs.stop()
+    builder_manager.stop()
 
     if opts.pidfile:
         os.unlink(opts.pidfile)




More information about the fedora-extras-commits mailing list