Snake Wrestling

Controlling Connection Bursts in Twisted Applications

21 May 2011

I've been using Twisted's Deferred and DeferredList with much pleasure, but recently I found I needed to limit the number of tasks/connections that would run at once: my application simply spawned too many connections too quickly, and killed my little home router.

In this post I want to demonstrate how to do that using Twisted's DeferredSemaphore.

Here's a sample application which doesn't use DeferredSemaphore, so all five lookups are started at once:

from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
    ]

def showMailExchanges(results):
    for success, value in results:
        # DeferredList fires with one (success, result) pair per Deferred, e.g.:
        # (True, ([<RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>,
        #          <RR name=twitter.com type=MX class=IN ttl=164s auth=False>],
        #         [], []))
        if not success:
            continue # value is a Failure here; skip lookups that failed
        ans, auth, add = value # A successful lookup is always a 3-part tuple
        for x in ans:
            # !s turns the Name object into a string so it can be padded
            print("{0!s:15} {1}".format(x.name, x.payload.name))

deferreds = []
for domain in domains:
    d = lookupMailExchange(domain)
    deferreds.append(d)
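# consumeErrors=True stops failed lookups from also being logged as unhandled
dl = defer.DeferredList(deferreds, consumeErrors=True)
dl.addCallback(showMailExchanges)
dl.addCallback(lambda _: reactor.stop()) # Exit once the results are printed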

reactor.run()

# Example output:
# google.com      aspmx.l.google.com
# google.com      alt2.aspmx.l.google.com
# google.com      alt3.aspmx.l.google.com
# google.com      alt1.aspmx.l.google.com
# google.com      alt4.aspmx.l.google.com
# yahoo.com       b.mx.mail.yahoo.com
# yahoo.com       d.mx.mail.yahoo.com
# yahoo.com       e.mx.mail.yahoo.com
# yahoo.com       f.mx.mail.yahoo.com
# yahoo.com       g.mx.mail.yahoo.com
# yahoo.com       h.mx.mail.yahoo.com
# yahoo.com       i.mx.mail.yahoo.com
# yahoo.com       j.mx.mail.yahoo.com
# yahoo.com       k.mx.mail.yahoo.com
# yahoo.com       l.mx.mail.yahoo.com
# yahoo.com       m.mx.mail.yahoo.com
# yahoo.com       n.mx.mail.yahoo.com
# yahoo.com       a.mx.mail.yahoo.com
# microsoft.com   mail.messaging.microsoft.com
# facebook.com    smtpin.mx.facebook.com
# twitter.com     alt2.aspmx.l.google.com
# twitter.com     ASPMX2.GOOGLEMAIL.com
# twitter.com     ASPMX3.GOOGLEMAIL.com
# twitter.com     aspmx.l.google.com
# twitter.com     alt1.aspmx.l.google.com

Say we want to limit the number of lookups that are performed at once. Enter Twisted's DeferredSemaphore:

from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
    ]

def showMailExchanges(results):
    for success, value in results:
        if not success:
            continue # Skip lookups that failed
        ans, auth, add = value
        for x in ans:
            print("{0!s:15} {1}".format(x.name, x.payload.name))

deferreds = []
sem = defer.DeferredSemaphore(2)            # New
for domain in domains:
    d = sem.run(lookupMailExchange, domain) # New
    deferreds.append(d)
dl = defer.DeferredList(deferreds, consumeErrors=True)
dl.addCallback(showMailExchanges)
dl.addCallback(lambda _: reactor.stop()) # Exit once the results are printed

reactor.run()

Neat. We only had to change two lines.

What Twisted's "asynchronous semaphore" does is restrict the number of calls that can be in flight at once. sem.run acquires one of the semaphore's tokens, calls the function, and gives the token back when the Deferred returned by that function fires; calls made while no token is free wait in a queue. In this case we created the semaphore with two tokens, so our script won't try to look up the mail exchanges of more than two domains at once.

If we want to implement an application-wide semaphore, we can write a helper function that returns a global semaphore:

from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
    ]

def showMailExchanges(results):
    for success, value in results:
        if not success:
            continue # Skip lookups that failed
        ans, auth, add = value
        for x in ans:
            print("{0!s:15} {1}".format(x.name, x.payload.name))

theSemaphore = None
def getSemaphore():
    global theSemaphore
    if theSemaphore is None:
        theSemaphore = defer.DeferredSemaphore(2)
    return theSemaphore

deferreds = []
sem = getSemaphore()
for domain in domains:
    d = sem.run(lookupMailExchange, domain)
    deferreds.append(d)
dl = defer.DeferredList(deferreds, consumeErrors=True)
dl.addCallback(showMailExchanges)
dl.addCallback(lambda _: reactor.stop()) # Exit once the results are printed

reactor.run()

Now, as long as every part of the application gets its semaphore from this module's getSemaphore function, the number of calls in flight at once is restricted to two across the whole application. Awesome.

We can even write a "semaphore map" to do away with the boilerplate looping and adding:

from twisted.internet import defer
from twisted.internet import reactor
from twisted.names.client import lookupMailExchange

domains = [
    'google.com',
    'yahoo.com',
    'microsoft.com',
    'facebook.com',
    'twitter.com'
    ]

def showMailExchanges(results):
    for success, value in results:
        if not success:
            continue # Skip lookups that failed
        ans, auth, add = value
        for x in ans:
            print("{0!s:15} {1}".format(x.name, x.payload.name))

theSemaphore = None
def getSemaphore():
    global theSemaphore
    if theSemaphore is None:
        theSemaphore = defer.DeferredSemaphore(2)
    return theSemaphore

def semMap(function, things, *args, **kwargs):
    assert callable(function)
    sem = getSemaphore()
    deferreds = []
    for x in things:
        d = sem.run(function, x, *args, **kwargs)
        deferreds.append(d)
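    # consumeErrors=True stops failed calls from also being logged as unhandled
    dl = defer.DeferredList(deferreds, consumeErrors=True)
    return dl

dl = semMap(lookupMailExchange, domains)
dl.addCallback(showMailExchanges)
dl.addCallback(lambda _: reactor.stop()) # Exit once the results are printed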

reactor.run()

Pretty sweet, huh?