extras-buildsys/server ArchJob.py, 1.8, 1.9 Builder.py, 1.9, 1.10 BuilderManager.py, 1.9, 1.10 PackageJob.py, 1.18, 1.19 UserInterface.py, 1.42, 1.43

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Sat Aug 6 02:35:09 UTC 2005


Author: dcbw

Update of /cvs/fedora/extras-buildsys/server
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv11669/server

Modified Files:
	ArchJob.py Builder.py BuilderManager.py PackageJob.py 
	UserInterface.py 
Log Message:
2005-08-05  Dan Williams <dcbw at redhat.com>

    * Rework builder tracking code to always keep Builder objects around, and
        to mark them as active/unavailable rather than dropping unavailable
        builders.  Unavailable builders are pinged every 5 minutes to see if
        they are alive or not.  Admins can still manually ping builders.

    * Remove lots of locking code in between the Builders, ArchJobs, and the
        BuildMaster since it was only relevant when pyOpenSSL still sucked.
        Communication with the builder's XMLRPC server only happens from each
        Builder object's thread now.

    * Consolidate job-killing code in both the ArchJob and the PackageJob.




Index: ArchJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/ArchJob.py,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -r1.8 -r1.9
--- ArchJob.py	25 Jul 2005 19:24:24 -0000	1.8
+++ ArchJob.py	6 Aug 2005 02:35:07 -0000	1.9
@@ -52,6 +52,8 @@
         self.downloads = {}
         self.starttime = time.time()
         self.endtime = 0
+        self._die = False
+        self._die_lock = threading.Lock()
 
     def _builder_finished(self):
         if self.builder_status == 'done' or self.builder_status == 'killed' or self.builder_status == 'failed' or self.builder_status == 'orphaned':
@@ -109,7 +111,6 @@
             del attrdict
 
     def _send_repo_unlocked(self):
-        self.builder.xmlrpc_lock_acquire()
         try:
             self._server.repo_unlocked(self.jobid)
         except socket.error, e:
@@ -118,11 +119,9 @@
                             self.par_job.package, self.arch, self.bci.address(), e)
         except xmlrpclib.ProtocolError, e:
             pass
-        self.builder.xmlrpc_lock_release()
 
     def _dl_files(self):
         files = []
-        self.builder.xmlrpc_lock_acquire()
         try:
             files = self._server.files(self.jobid)
         except socket.error, e:
@@ -131,13 +130,22 @@
                             self.par_job.package, self.arch, self.bci.address(), e)
         except xmlrpclib.ProtocolError, e:
             pass
-        self.builder.xmlrpc_lock_release()
         return files
 
     def process(self):
         if self.status == 'done':
             return
-        elif self.status == 'running':
+
+        # If we're supposed to die, tell the builder and clean up
+        self._die_lock.acquire()
+        should_die = self._die
+        self._die_lock.release()
+        if should_die:
+            self._server.die(self.jobid)
+            self._set_status('done')
+            return
+
+        if self.status == 'running':
             # Builders pause before they enter the 'prep' state (which accesses
             # the repo for this target), and wait for the server to allow them
             # to proceed when the repo is unlocked.
@@ -247,10 +255,10 @@
             self.par_job.remove_arch_job(self)
 
     def die(self):
+        # Can be called from other threads
         if self.status == 'initialize' or self.status == 'running':
-            self.builder.xmlrpc_lock_acquire()
-            self._server.die(self.jobid)
-            self.builder.xmlrpc_lock_release()
-            self._set_status('done')
+            self._die_lock.acquire()
+            self._die = True
+            self._die_lock.release()
 
 


Index: Builder.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/Builder.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- Builder.py	2 Aug 2005 00:58:16 -0000	1.9
+++ Builder.py	6 Aug 2005 02:35:07 -0000	1.10
@@ -25,6 +25,7 @@
 from plague import CommonErrors
 import OpenSSL
 import ArchJob
+import EmailUtils
 
 # Load in the config
 execfile("/etc/plague/server/CONFIG.py")
@@ -37,57 +38,58 @@
 certs['peer_ca_cert'] = config_opts['ca_cert']
 
 
-# Python's xmlrpclib & httplib are a bit dodgy with
-# threads, so lets lock xmlrpc operations
-builder_xmlrpc_lock = threading.Lock()
-
-
 class Builder(threading.Thread):
     """ Tracks all jobs on a builder instance """
 
+    _BUILDER_PING_INTERVAL = 60 * 5      # In seconds
+
     def __init__(self, manager, address):
         self._cur_jobid = None
         self._manager = manager
         self._jobs = {}
         self._address = address
+        self._alive = True
+        self._stop = False
+        self._prepping_jobs = False
+        self._unavail_count = 0
+        self._target_arches = {}
+        self._ping_timeout = 0
+        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+        self._when_died = 0
+
         if config_opts['ssl_builders']:
             self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, certs)
         else:
             self._server = XMLRPCServerProxy.PlgXMLRPCServerProxy(self._address, None)
         self._server_lock = threading.Lock()
-        self._unavail_count = 0
-        self._target_arches = []
-        builder_xmlrpc_lock.acquire()
-        try:
-            self._target_arches = self._server.supported_target_arches()
-            for target in self._target_arches.keys():
-                self._target_arches[target].append('noarch')
-        except socket.error:
-            builder_xmlrpc_lock.release()
-            raise RuntimeError
-        builder_xmlrpc_lock.release()
 
-        self._alive = True
-        self._stop = False
-        self._prepping_jobs = False
+        (self._alive, arches) = self._ping_builder()
+        if self._alive:
+            self._init_builder(arches)
+
+        threading.Thread.__init__(self)
+
+    def _init_builder(self, arches):
+        self._target_arches = arches
+        for target in self._target_arches.keys():
+            self._target_arches[target].append('noarch')
 
         # Kill any jobs currently running on the builder
         (jobid, status) = self._get_cur_job_and_status()
         if jobid and jobid != 0:
-            builder_xmlrpc_lock.acquire()
             try:
                 self._server.die(jobid)
             except:
                 pass
-            builder_xmlrpc_lock.release()
-
-        threading.Thread.__init__(self)
 
-    def xmlrpc_lock_acquire(self):
-        builder_xmlrpc_lock.acquire()
-
-    def xmlrpc_lock_release(self):
-        builder_xmlrpc_lock.release()
+    def _ping_builder(self):
+        target_arches = {}
+        alive = True
+        try:
+            target_arches = self._server.supported_target_arches()
+        except socket.error:
+            alive = False
+        return (alive, target_arches)
 
     def arches(self, target):
         arches = None
@@ -107,34 +109,34 @@
         return self._address
 
     def alive(self):
-        """
-        Is the builder responding to requests?
-        """
+        """ Is the builder responding to requests? """
         return self._alive
     
     def start_job(self, par_job, target, arch, srpm_url):
+        self._server_lock.acquire()
         if not self.available():
+            self._server_lock.release()
             raise RuntimeError
         if not self._target_arches.has_key(target) or len(self._target_arches[target]) == 0:
+            self._server_lock.release()
             raise RuntimeError
         if not arch in self._target_arches[target]:
+            self._server_lock.release()
             raise RuntimeError
 
-        builder_xmlrpc_lock.acquire()
-        self._server_lock.acquire()
         try:
             jobid = self._server.start(target, arch, srpm_url)
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, xmlrpclib.ProtocolError):
             jobid = 0
-        self._server_lock.release()
-        builder_xmlrpc_lock.release()
 
         if jobid == 0:
+            self._server_lock.release()
             raise RuntimeError
 
         job = ArchJob.ArchJob(self, self._server, par_job, jobid, target, arch)
         self._jobs[jobid] = job
         self._update_cur_job()
+        self._server_lock.release()
 
         return job
 
@@ -142,16 +144,12 @@
         jobid = None
         status = None
 
-        builder_xmlrpc_lock.acquire()
-        self._server_lock.acquire()
         try:
             (jobid, status) = self._server.get_cur_job()
         except (socket.error, socket.timeout, OpenSSL.SSL.SysCallError, xmlrpclib.ProtocolError):
             self._unavail_count = self._unavail_count + 1
         else:
             self._unavail_count = 0
-        self._server_lock.release()
-        builder_xmlrpc_lock.release()
 
         return (jobid, status)
 
@@ -175,24 +173,76 @@
     def stop(self):
         self._stop = True
 
+    def ping_asap(self):
+        # Reduce the ping interval to ping the builder right away
+        self._cur_ping_interval = 0
+
+    def _handle_builder_suspend(self):
+        for jobid in self._jobs.keys():
+            job = self._jobs[jobid]
+            job.builder_gone()
+            del self._jobs[jobid]
+        self._jobs = {}
+        self._alive = False
+        self._unavail_count = 0
+        self._prepping_jobs = False
+        self._ping_timeout = time.time()
+        self._when_died = time.time()
+        # Reset current ping interval to default
+        self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+
+        # Notify admins
+        print "Suspending builder '%s' because it timed out." % self._address
+        subject = "Builder Timeout: %s" % self._address
+        msg = "The builder '%s' timed out and was suspended." % self._address
+        for addr in config_opts['admin_emails']:
+            EmailUtils.email_result(addr, msg, subject)
+
+    def _handle_builder_reactivate(self, target_arches):
+        self._alive = True
+        self._ping_timeout = 0
+
+        self._init_builder(target_arches)
+
+        print "Re-activating builder '%s'." % self._address
+        subject = "Builder Re-activated: %s" % self._address
+        msg = """The builder '%s' was re-activated.
+
+  Suspended at: %s
+  Re-Enabled at: %s
+""" % (self._address, time.ctime(self._when_died), time.ctime(time.time()))
+        for addr in config_opts['admin_emails']:
+            EmailUtils.email_result(addr, msg, subject)
+        self._when_died = 0
+
     def run(self):
         while not self._stop:
-            self._update_cur_job()
+            self._server_lock.acquire()
 
-            # Kill all jobs on the client if it went away
-            if self._unavail_count > 2:
-                for jobid in self._jobs.keys():
-                    job = self._jobs[jobid]
-                    job.builder_gone()
-                    del self._jobs[jobid]
-                self._alive = False
-                self._stop = True
-                self._manager.builder_gone()
-                continue
-
-            # Update status of all jobs
-            for j in self._jobs.values():
-                j.process()
+            if self._alive:
+                self._update_cur_job()
+
+                if self._unavail_count > 2:
+                    # Kill all jobs on the client if it went away
+                    self._handle_builder_suspend()
+                else:
+                    # Update status of all archjobs on this builder
+                    for j in self._jobs.values():
+                        j.process()
+            else:
+                # Ping the builder every so often to see if it responds again
+                if time.time() > (self._ping_timeout + self._cur_ping_interval):
+                    (alive, target_arches) = self._ping_builder()
+                    if alive:
+                        self._handle_builder_reactivate(target_arches)
+                    else:
+                        # Wait and ping again
+                        self._ping_timeout = time.time()
+
+                    # Reset current ping interval to default
+                    self._cur_ping_interval = self._BUILDER_PING_INTERVAL
+
+            self._server_lock.release()
 
             time.sleep(3)
     
@@ -218,9 +268,12 @@
                 if not arch in arches:
                     arches.append(arch)
         builder_dict['arches'] = arches
-        if self._cur_jobid:
-            builder_dict['status'] = 'building'
+        if self._alive:
+            if self._cur_jobid:
+                builder_dict['status'] = 'building'
+            else:
+                builder_dict['status'] = 'idle'
         else:
-            builder_dict['status'] = 'idle'
+            builder_dict['status'] = 'unavailable'
         return builder_dict
 


Index: BuilderManager.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/BuilderManager.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -r1.9 -r1.10
--- BuilderManager.py	29 Jul 2005 06:05:37 -0000	1.9
+++ BuilderManager.py	6 Aug 2005 02:35:07 -0000	1.10
@@ -40,17 +40,22 @@
         self._builders_lock.acquire()
         self.possible_builders = config_opts['builders']
         self._builders_lock.release()
-        self.running_builders = []
-        builder_list = self.update_builders()
+
+        self._builders = []
+        self.add_new_builders()
 
         # Print out builder list when starting up
         print "\nBuilders:"
-        print "-" * 60
-        for builder in builder_list:
-            string = "  " + builder['address']
-            string = string + " " * (40 - len(builder['address']))
-            for arch in builder['arches']:
+        print "-" * 90
+        for builder in self._builders:
+            string = "  " + builder.address()
+            string = string + " " * (40 - len(builder.address()))
+            builder_dict = builder.to_dict()
+            for arch in builder_dict['arches']:
                 string = string + arch + " "
+            string = string + " " * (80 - len(string))
+            string = string + builder_dict['status']
+            del builder_dict
             print string
         print ""
 
@@ -60,23 +65,20 @@
         self._have_work = False
 
     def __del__(self):
-        for builder in self.running_builders:
+        for builder in self._builders:
             builder.stop()
         time.sleep(2)
-        for builder in self.running_builders:
-            del builder
 
     def set_build_master(self, build_master):
         self._build_master = build_master
 
-    def update_builders(self):
+    def add_new_builders(self):
         self._builders_lock.acquire()
 
         # Load in any new builders from the config file
         execfile("/etc/plague/server/CONFIG.py")
         self.possible_builders = config_opts['builders']
 
-        builder_list = []
         for address in self.possible_builders:
             # If the address is "https" but we aren't set up for SSL, exit
             if address.startswith('https') and not config_opts['ssl_builders']:
@@ -86,79 +88,46 @@
                 print "Builder address (%s) starts with 'http', but the 'ssl_builders' option is set to True." % address
                 os._exit(1)
 
-            # If the address is already in our running_builders list, skip it
+            # If the address is already in our _builders list, skip it
             skip = False
-            for builder in self.running_builders:
+            for builder in self._builders:
                 if address == builder.address():
                     skip = True
             if skip == True:
                 continue
 
-            # Try to connect to builder and add it to our builder
-            # list if we can
-            try:
-                builder = Builder.Builder(self, address)
-            except RuntimeError:
-                pass
-            else:
-                builder_list.append(builder.to_dict())
-                builder.start()
-                self.running_builders.append(builder)
+            # Add the builder to our build list
+            builder = Builder.Builder(self, address)
+            builder.start()
+            self._builders.append(builder)
 
         self._builders_lock.release()
-        return builder_list
+
+    def ping_suspended_builders(self):
+        self._builders_lock.acquire()
+        for builder in self._builders:
+            if not builder.alive():
+                builder.ping_asap()
+        self._builders_lock.release()
 
     def list_builders(self):
         builder_list = []
-        for builder in self.running_builders:
+        for builder in self._builders:
             builder_list.append(builder.to_dict())
-
-        # Add unavailable builders
-        for builder in self.possible_builders:
-            found = False
-            for tmp in builder_list:
-                if builder == tmp['address']:
-                    found = True
-            if found:
-                continue
-            builder_dict = {}
-            builder_dict['address'] = builder
-            builder_dict['arches'] = []
-            builder_dict['status'] = "unavailable"
-            builder_list.append(builder_dict)
-
         return builder_list
 
     def have_work(self, paused):
         avail = False
-        for builder in self.running_builders:
+        for builder in self._builders:
             if builder.available():
                 avail = True
         if not paused and len(self._queue) > 0 and avail:
             return True
         return self._have_work
 
-    def builder_gone(self):
-        self._have_work = True
-
     def process(self, paused):
         self._have_work = False
 
-        # Deal with dead/unreachable builders
-        for builder in self.running_builders:
-            if not builder.alive():
-                print "Removing builder '%s' because it timed out." % builder.address()
-
-                # Notify admins
-                subject = "Builder Timeout: %s" % builder.address()
-                msg = "The builder '%s' timed out and was removed from the active builder list." % builder.address()
-                for addr in config_opts['admin_emails']:
-                    EmailUtils.email_result(addr, msg, subject)
-
-                # Forget about the builder
-                builder.stop()
-                self.running_builders.remove(builder)
-
         # Don't queue any new jobs if we are paused
         if paused:
             return
@@ -173,7 +142,7 @@
                 self._queue.remove(req)
                 continue
             # Find a free builder for this request
-            for builder in self.running_builders:
+            for builder in self._builders:
                 if builder.available() and builder.can_build_arch_on_target(req['arch'], req['target']):
                     try:
                         job = builder.start_job(parent, req['target'], req['arch'], req['srpm_url'])
@@ -213,7 +182,7 @@
 
     def any_prepping_builders(self):
         # query each build builder for any jobs that are in the 'prepping' state
-        for builder in self.running_builders:
+        for builder in self._builders:
             if builder.alive() and builder.any_prepping_jobs():
                 return True
         return False


Index: PackageJob.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/PackageJob.py,v
retrieving revision 1.18
retrieving revision 1.19
diff -u -r1.18 -r1.19
--- PackageJob.py	4 Aug 2005 20:27:12 -0000	1.18
+++ PackageJob.py	6 Aug 2005 02:35:07 -0000	1.19
@@ -137,6 +137,8 @@
         self.archjobs = {}
         self._archjobs_lock = threading.Lock()
         self._event = threading.Event()
+        self._killer = None
+        self._die = False
 
         first_stage = 'initialize'
         if self.no_cvs == True:
@@ -449,13 +451,27 @@
         return False
 
     def die(self, username):
-        # Kill any building jobs
-        resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self.target, username)
+        self._killer = username
+        self._die = True
+        self.wake()
 
+    def _handle_death(self):
+        resultstring = "%s (%s): Build on target %s was killed by %s." % (self.uid, self.name, self.target, self._killer)
         self.result = 'killed'
         self._set_cur_stage('finished', resultstring)
         self.email_result(self.username, resultstring)
 
+        # Kill any building jobs
+        self._kill_all_archjobs()
+
+        # Wake us up if the Controller thread is still running
+        if not self._event.isSet():
+            self._event.set()
+
+        self.endtime = time.time()
+        self.bm.notify_job_done(self)
+
+    def _kill_all_archjobs(self):
         self._archjobs_lock.acquire()
         for job in self.archjobs.values():
             if job:
@@ -463,9 +479,6 @@
         self.archjobs = {}
         self._archjobs_lock.release()
 
-        self.endtime = time.time()
-        self.bm.notify_job_done(self)
-
     def wake(self):
         self._event.set()
 
@@ -473,6 +486,10 @@
         if self.is_done():
             return
 
+        if self._die:
+            self._handle_death()
+            return
+
         try:
             func = getattr(self, "_stage_%s" % self.curstage)
             if func():
@@ -494,11 +511,7 @@
             msg = "%s\n-------------------------------------------------\n\n%s\n" % (msg, logtail)
             self.email_result(self.username, resultstring=msg, subject=subj)
             # Kill remaining jobs on other arches
-            self._archjobs_lock.acquire()
-            for job in self.archjobs.values():
-                if job:
-                    job.die()
-            self._archjobs_lock.release()
+            self._kill_all_archjobs()
             self._stage_failed(e.msg)
 
     def _stage_building(self):


Index: UserInterface.py
===================================================================
RCS file: /cvs/fedora/extras-buildsys/server/UserInterface.py,v
retrieving revision 1.42
retrieving revision 1.43
diff -u -r1.42 -r1.43
--- UserInterface.py	5 Aug 2005 16:24:13 -0000	1.42
+++ UserInterface.py	6 Aug 2005 02:35:07 -0000	1.43
@@ -452,8 +452,10 @@
 
 
     def update_builders(self):
-        execfile("/etc/plague/server/CONFIG.py")
-        builder_list = self._builder_manager.update_builders()
+        self._builder_manager.add_new_builders()
+        self._builder_manager.ping_suspended_builders()
+        time.sleep(2)
+        builder_list = self._builder_manager.list_builders()
         return (0, "Success.", builder_list)
 
 




More information about the fedora-extras-commits mailing list