extras-buildsys/common ssl-client-test.py, NONE, 1.1 ssl-server-test.py, NONE, 1.1

Daniel Williams (dcbw) fedora-extras-commits at redhat.com
Tue Oct 31 18:10:51 UTC 2006


Author: dcbw

Update of /cvs/fedora/extras-buildsys/common
In directory cvs-int.fedora.redhat.com:/tmp/cvs-serv9665/common

Added Files:
	ssl-client-test.py ssl-server-test.py 
Log Message:
2006-10-31  Dan Williams  <dcbw at redhat.com>

    * common/ssl-client-test.py
      common/ssl-server-test.py
        - Two utilities to test the functionality of SSLConnection class
        and the SSL server/client classes




--- NEW FILE ssl-client-test.py ---
import threading
import time
import random
import OpenSSL
import socket
import SSLCommon
import SSLConnection
import sys, os
import urllib
import base64
import types


client_start = False

threadlist_lock = threading.Lock()
threadlist = {}
timed_out = 0

class TestClient(threading.Thread):
    def __init__(self, certs, num):
        self._ctx = SSLCommon.CreateSSLContext(certs)
        self._timeout = 20
        self._host = "localhost"
        self._port = 8888
        self.num = i
        threading.Thread.__init__(self)

    def get_host_info(self, host):
        x509 = {}
        if isinstance(host, types.TupleType):
            host, x509 = host

        import urllib
        auth, host = urllib.splituser(host)

        if auth:
            import base64
            auth = base64.encodestring(urllib.unquote(auth))
            auth = string.join(string.split(auth), "") # get rid of whitespace
            extra_headers = [
                ("Authorization", "Basic " + auth)
                ]
        else:
            extra_headers = None

        return host, extra_headers, x509

    def _make_connection(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        con = OpenSSL.SSL.Connection(self._ctx, sock)
        sock = SSLConnection.SSLConnection(con)
        sock.connect((self._host, self._port))
        return sock

    def run(self):
        request = "ping" * 40

        while not client_start:
            time.sleep(0.05)
        i = 0
        while i < 5:
            reply = None
            try:
                sock = self._make_connection()

                # Send request
                sock.sendall(request)

                # Wait for response
                print "Waiting for response"
                reply = sock.recv(1024)
                sock.close()
            except OpenSSL.SSL.Error, e:
                reply = "OpenSSL Error (%s)" % e
            except socket.error, e:
                reply = "Socket error (%s)" % e
            except socket.timeout, e:
                reply = "Socket timeout (%s)" % e
                threadlist_lock.acquire()
                timed_out = timed_out + 1
                threadlist_lock.release()
            print "TRY(%d / %d): %s" % (self.num, i, reply)
            time.sleep(0.05)
            i = i + 1
        threadlist_lock.acquire()
        del threadlist[self]
        threadlist_lock.release()


if __name__ == '__main__':
    if len(sys.argv) < 4:
        print "Usage: python %s key_and_cert ca_cert peer_ca_cert" % sys.argv[0]
        sys.exit(1)

    certs = {}
    certs['key_and_cert'] = sys.argv[1]
    certs['ca_cert'] = sys.argv[2]
    certs['peer_ca_cert'] = sys.argv[3]

    i = 100
    while i > 0:
        t = TestClient(certs, i)
        threadlist[t] = None
        print "Created thread %d." % i
        t.start()
        i = i - 1

    time.sleep(3)
    print "Unleashing threads."
    client_start = True
    while True:
        try:
            time.sleep(0.25)
            threadlist_lock.acquire()
            if len(threadlist) == 0:
                break
            threadlist_lock.release()
        except KeyboardInterrupt:
            os._exit(0)
    print "All done. (%d timed out)" % timed_out



--- NEW FILE ssl-server-test.py ---
import os, sys
import socket
import time
import SSLCommon
import SocketServer

response = "pong" * 20

class ReqHandler(SocketServer.StreamRequestHandler):
    def handle(self):
        print "Starting handler"
        # Read the request
        string = self.rfile.read(100)
        if len(string) > 0:
            print "request was: '%s' (%d)" % (string, len(string))
        else:
            print "request was zero-length"
        # Write the response
        self.wfile.write(response)

if __name__ == '__main__':
    if len(sys.argv) < 4:
        print "Usage: python %s key_and_cert ca_cert peer_ca_cert" % sys.argv[0]
        sys.exit(1)

    certs = {}
    certs['key_and_cert'] = sys.argv[1]
    certs['ca_cert'] = sys.argv[2]
    certs['peer_ca_cert'] = sys.argv[3]

    print "Starting the server."
    server = SSLCommon.PlgBaseSSLServer(('localhost', 8888), ReqHandler, certs)
    server.serve_forever()





More information about the fedora-extras-commits mailing list