Contents


Linear optimization in Python, Part 2

Build a scalable architecture in the cloud

Combining Pyomo, RabbitMQ, and Tornado

Comments

Content series:

This content is part # of # in the series: Linear optimization in Python, Part 2

Stay tuned for additional content in this series.

This content is part of the series:Linear optimization in Python, Part 2

Stay tuned for additional content in this series.

Part 1 of this 3-part series introduces the basics of linear optimization in Python using the Pyomo library. Now let's talk about scaling up. Python lacks true OS threads, so how do you scale it? This article shows how to combine technologies to create a real-world scalable architecture for buiding a Pyomo web solving farm. We combine a single threaded event loop, an AMQP server, and worker processes to create a pattern that works to scale a linear optimization farm. This architecture also works for many general purpose computing problems in Python or Ruby.

Scaling

Creating scalable and parallelizable code in a scripting language like Python has different challenges than C, C++ or Java programming languages. Two key factors are execution speed and the threading model. To make things even more complex, Amdahl's Law (see Related topics) shows that the speedup of a program is not strictly proportional to the number of processors available on a computer. The proportionality of parallel portions of the code is ultimately limited by the time needed for the sequential portion of the program.

An intuitive way to think about this problem is to imagine an event loop, where each tick of the event loop takes a total of 1 second of CPU time. Inside the event loop, which you are trying to debug, you find that 75% of the time is spent on a time.sleep (.75) command and the rest is spent on doing the work. Since 75% of the time in the 1-second event loop is spent sleeping, it's impossible to make that portion of the run loop parallelizable. The only portion of the 1 second that could run faster is the 25% not spent sleeping. No matter how fast, or how many processors get thrown at the .25 second of the event loop, it will always take .75 seconds + the optimized "parallelizable" code.

Even when taking Amdahl's law into consideration, the parallel portions of Python code can't be paralyzed by adding more threads, as a general rule. While such a strategy works well for languages like C# and Java, it's slightly more complicated in Python. In Python, if a portion of code can be parallelized but it's doing CPU work, a thread will not help because of the Global Interpreter Lock (GIL).

In a final complication, when you're writing a web application that performs linear optimization using an event driven architecture like Tornado, you have to put special thought into the design. If the event loop is blocked performing CPU work or blocking network operations, then this design can be safely called an iterative server. In the book Unix Network Programming, author W. Richard Stevens describes two servers:

  • Iterative server: Can't process a pending client until it has completely serviced the current client.
  • Concurrent server: Classically forks a process for each request.

Without careful thought, an event loop server, like Ruby's Event Machine or Python's Tornado, quickly becomes an iterative server. In the real world, an iterative server is a toy, and it must be avoided at all costs.

Event loop programming

In the case of an event loop, the CPU-intensive work of performing a linear optimization algorithm will block all subsequent requests by clients until the work is finished from the first request. In many cases this is non-optimal because other CPU-intensive work can be done using one of the other cores on the server. It can be easy to get into a situation where a server can have 23 of 24 cores sitting idle, and CPU intensive requests can be start to form an exponential queue. Likewise, if a blocking network operation occurs inside an event loop, something even worse can occur — all 24 cores can be sitting idle while requests begin to queue up.

The trick to working with an event loop architecture, then, is to make sure two things occur:

  • All network operations must be done in a non-blocking fashion, in a separate thread or process, or using I/O multiplexing such as select and poll or using asynchronous I/O using the POSIX aio_ functions.
  • Minimal CPU or network work should be done in an event loop because it blocks the event loop.

Simply stated, never block the event loop if you use it as a server for a web site. Something else has to do the rest of the CPU work if it needs to be done. In Python, one strategy is to accept messages concurrently and then offload any task to a series of workers subscribing to an Advanced Message Queueing Protocol (AMQP) message bus such as RabbitMQ. This is the strategy demonstrated in this article. The ideal sweet spot for asynchronous event loop servers is to serve out many concurrent, long running socket connections. A real world example is to send data from one network connection, say a socket that gathers stock ticket data, to another such as a WebSockets connection.

Creating a Tornado web application to serve out Pyomo applications

This example uses a few different technologies:

  • Tornado (see Related topics) is an open source, scalable, non-blocking web server. It serves as the asynchronous messaging system.
  • Pyomo (see Related topics) services the linear optimization tasks that are run upon each request.
  • RabbitMQ serves as the message bus between the layers.

To set up your environment and run this example, install the following components on OS X.

  1. RabbitMQ using HomeBrew.
  2. Pyomo
  3. Pika, a RabbitMQ Python client.
  4. Tornado, an asynchronous web server. Use easy_install.

A walk through the code

Let's begin an examination of the Tornado part of the architecture by benchmarking the performance of a basic, blocking, yet quick message to RabbitMQ using ApacheBench.

Listing 1. Benchmarking a basic message
ab -n 2000 -c 100 http://127.0.0.1:8888/benchmark
This is ApacheBench, Version 2.3 <$Revision: 1178079 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 200 requests
Completed 400 requests
Completed 600 requests
Completed 800 requests
Completed 1000 requests
Completed 1200 requests
Completed 1400 requests
Completed 1600 requests
Completed 1800 requests
Completed 2000 requests
Finished 2000 requests


Server Software:        TornadoServer/2.3
Server Hostname:        127.0.0.1
Server Port:            8888

Document Path:          /benchmark
Document Length:        24 bytes

Concurrency Level:      100
Time taken for tests:   6.231 seconds
Complete requests:      2000
Failed requests:        0
Write errors:           0
Total transferred:      360000 bytes
HTML transferred:       48000 bytes
Requests per second:    320.96 [#/sec] (mean)
Time per request:       311.570 [ms] (mean)
Time per request:       3.116 [ms] (mean, across all concurrent requests)
Transfer rate:          56.42 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.5      0       4
Processing:    11  304  42.9    308     605
Waiting:       11  304  42.9    308     605
Total:         15  304  42.7    309     608

Percentage of the requests served within a certain time (ms)
  50%    309
  66%    311
  75%    312
  80%    313
  90%    315
  95%    324
  98%    336
  99%    338
 100%    608 (longest request)

There's a special web request handler called /benchmark. Inside that handler a simple message to RabbitMQ is made via the pika.BlockingConnection method. Look at at the requests per second output from ApacheBench. There's a mean of about 320. This isn't horrible, but it isn't great either. The official benchmarks for Tornado put it at about 3353 requests per second. If this were an actual bottleneck in a real world application, it would be easy to take the next step and convert the blocking RabbitMQ messaging to a fully asynchronous method. The pika library has an asynchronous adapter called SelectConnection which should convert the benchmark into something much closer to the 3353 number.

RabbitMQ itself can accept tens of thousands of messages per second. The published benchmarks show it producing and consuming in parallel 64,315 messages per second. It would take an order of magnitude more of these Tornado workers to overload one RabbitMQ instance, even in the asynchronous model. A basic RabbitMQ installation is assumed, and there is an exchange declared, pyomo-job-exchange. The Tornado web server publishes using the routing_key pyomo-job-exchange. Additionally, there are worker processes that consume the pyomo-job-exchange.

In this pika worker layer, there are workers that have subscribed to the pyomo-job-exchange. Each worker is "dumb" in the sense that they blindly accept parameter data that they use to perform a linear optimization using Pyomo. If you like, you can send the final result from the pika worker to WebSockets directly subscribed to RabbitMQ via the STOMP plugin (see Related topics for a link to an article that tells how). The only additional part to implement is to tell the pika worker process to push the final result back to the RabbitMQ queue to which the WebSocket worker is subscribed.

Figure 1. The Tornado-Pyomo-RabbitMQ architecture
The Tornado-Pyomo-RabbitMQ architecture
The Tornado-Pyomo-RabbitMQ architecture

Listing 2 shows the Tornado web server code. To start the server, execute the command python server.py.

Listing 2. Tornado server
import tornado.ioloop
import tornado.web
import tornado.websocket
import pika
import time

def publish_to_rabbitmq(msg):
    "blocking message sent to RabbitMQ"
    
    credentials = pika.PlainCredentials("guest", "guest")
    conn_params = pika.ConnectionParameters("localhost",
                                            credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params) 
    channel = conn_broker.channel() 
    channel.exchange_declare(exchange="pyomo-job-exchange", 
                             type="direct",
                             passive=False,
                             durable=False,
                             auto_delete=False)
    
    msg_props = pika.BasicProperties()
    msg_props.content_type = "text/plain"
    channel.basic_publish(body=msg,
                          exchange="pyomo-job-exchange",
                          properties=msg_props,
                          routing_key="pyomo-job-dispatch")

def consume_from_rabbitmq():
    pass

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        msg = "%s" % time.asctime()
        publish_to_rabbitmq(msg)
        self.write(msg)

class PyomoTask(tornado.web.RequestHandler):
    def get(self):
        self.write('<html><body><form action="/pyomo" method="post">'
                   '<input type="text" name="ProfitRateWindows">'
                   '<input type="submit" value="ProfitRateWindows">'
                   '</form></body></html>')

    def post(self):
        self.set_header("Content-Type", "text/plain")
        result = self.get_argument("ProfitRateWindows")
        publish_to_rabbitmq(result)
        self.write("Submitted to RabbitMQ/Pyomo Worker: " + result)
        
class PyomoWebSocketResult(tornado.websocket.WebSocketHandler):

    def open(self):
        """Called when a websocket opens"""
    pass        

application = tornado.web.Application([
    (r"/benchmark", MainHandler),
    (r"/pyomo", PyomoTask),
    (r"/websocket", PyomoWebSocketResult),
    
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

The Tornado web server code has three main handlers: /benchmark, /pyomo, and /websockets (I leave it to you to implement). The /pyomo handler is a naively implemented form that accepts a value and then sends this value to RabbitMQ as extracted from the larger example above:

result = self.get_argument("ProfitRateWindows")
publish_to_rabbitmq(result)

Let's look now at the Pyomo worker code. It's shown in Listing 3. To execute the worker, run these commands:

source coopr/bin/activate
python worker.py

Visit http://localhost:8888/pyomo and submit a numerical value for the Profit Rate Windows.

Listing 3. Pyomo worker
import pika
from coopr.pyomo import (ConcreteModel, Objective, Var, NonNegativeReals,
                              maximize, Constraint)
from coopr.opt import SolverFactory
import time

def do_pyomo_work(profit_rate_windows):
    
    Products = ['Doors', 'Windows']
    ProfitRate = {'Doors':300, 'Windows':profit_rate_windows}
    Plants = ['Door Fab', 'Window Fab', 'Assembly']
    HoursAvailable = {'Door Fab':4, 'Window Fab':12, 'Assembly':18}
    HoursPerUnit = {('Doors','Door Fab'):1, ('Windows', 'Window Fab'):2,
                    ('Doors','Assembly'):3, ('Windows', 'Assembly'):2,
                    ('Windows', 'Door Fab'):0, ('Doors', 'Window Fab'):0}
    
    #Concrete Model
    model = ConcreteModel()
    #Decision Variables
    model.WeeklyProd = Var(Products, within=NonNegativeReals)
    
    #Objective
    model.obj = Objective(expr=
                sum(ProfitRate[i] * model.WeeklyProd[i] for i in Products),
                sense = maximize)
    
    def CapacityRule(model, p):
        """User Defined Capacity Rule
        
        Accepts a pyomo Concrete Model as the first positional argument,
        and a list of Plants as a second positional argument
        """
        
        return sum(HoursPerUnit[i,p] * model.WeeklyProd[i] for i in Products) 
                                  <= HoursAvailable[p]

    model.Capacity = Constraint(Plants, rule = CapacityRule)
    opt = SolverFactory("glpk")
    instance = model.create()
    results = opt.solve(instance)
    #results.write()
    return results.Solution()

def create_channel():
    credentials = pika.PlainCredentials("guest", "guest")
    conn_params = pika.ConnectionParameters("localhost",
                                        credentials = credentials)
    conn_broker = pika.BlockingConnection(conn_params) 
    channel = conn_broker.channel() 
    channel.exchange_declare(exchange="pyomo-job-exchange", 
                         type="direct",
                         passive=False,
                         durable=False,
                         auto_delete=False)
    channel.queue_declare(queue="pyomo-queue") 
    channel.queue_bind(queue="pyomo-queue",     
                   exchange="pyomo-job-exchange",
                   routing_key="pyomo-job-dispatch")
    return channel

def consume_run_loop():
    channel = create_channel()
    def msg_consumer(channel, method, header, body):
        channel.basic_ack(delivery_tag=method.delivery_tag) 
        print body
        now = time.time()
        res = do_pyomo_work(int(body))
        print res
        print "Pyomo Job Completed in: %s seconds" % round(time.time() - now, 2)
        return
    channel.basic_consume( msg_consumer, 
                           queue="pyomo-queue",
                           consumer_tag="pyomo-consumer")
    channel.start_consuming()

consume_run_loop()

The worker is a composite of two actions. First the existing Wyndor example has been modified such that the web form is substituting the ProfitRate = {'Doors':300, 'Windows':profit_rate_windows} value. This is completely arbitrary and unrealistic in a real world application, but it makes it easier to demonstrate for this article. In a real world application, it's more likely that many portions of the linear optimization are assigned dynamically.

Next, the worker is to essentially "block" forever waiting for requests to come from RabbitMQ. It then takes each result and passes it to the callback msg_consumer which, in turn, runs the linear optimization and prints the result and the time it took to run the code to stdout. Note, that "N" worker processes can be spawned and the first worker process grabs the first job. This allows for a relatively easy scaling model because of how "dumb" each worker is, plus there is no shared state to reference, just a message to process.

In looking at a couple of different submissions of the web form which are in turn processed by the Pyomo workers, you can see that each one is receiving the request and spending about 2/100 of a seconds in CPU work.

Listing 4. Worker output
python worker.py
100

Gap: 0.0
Status: feasible
Objective: 
  obj: 
    Id: 0
    Value: 1500.0
Variable: 
  WeeklyProd(Doors):
    Id: 0
    Value: 4
  WeeklyProd(Windows):
    Id: 1
    Value: 3
Constraint: 
  c_u_Capacity(Assembly)_: 
    Id: 0
    Value: 18.0
  c_u_Capacity(Door_Fab)_: 
    Id: 1
    Value: 4.0
  c_u_Capacity(Window_Fab)_: 
    Id: 2
    Value: 6.0

Pyomo Job Completed in: 0.02 seconds
500

Gap: 0.0
Status: feasible
Objective: 
  obj: 
    Id: 0
    Value: 3600.0
Variable: 
  WeeklyProd(Doors):
    Id: 0
    Value: 2
  WeeklyProd(Windows):
    Id: 1
    Value: 6
Constraint: 
  c_u_Capacity(Assembly)_: 
    Id: 0
    Value: 18.0
  c_u_Capacity(Door_Fab)_: 
    Id: 1
    Value: 2.0
  c_u_Capacity(Window_Fab)_: 
    Id: 2
    Value: 12.0

Pyomo Job Completed in: 0.02 seconds

Conclusion

Here's an additional benefit of this architecture: RabbitMQ is language agnostic, so you can swap out any component for a new component in another language. Also, RabbitMQ is built on the Erlang programming language, which can spawn millions of lightweight processes, each in the matter of few microseconds (1 millionth of a second); thus, RabbitMQ can serve as a very reliable message bus that can effectively scale horizontally. A very reasonable way to scale Ruby and Python is to combine them with technologies like RabbitMQ, which can scale horizontally under tremendous load. Then you're using Python and Ruby for what they do best: Quick iteration and prototyping.

A final note: It's important that you write small programs and benchmark them with network programming. It's often easy to look at an API and follow along with the code example in your head. However, with network programming, you often can't do this. With network programming, the only way to really understand how something works is to write yourself a small program, benchmark it, then build something on top of that small program and benchmark it, and so forth. If you skip these steps, you might discover you've built something that doesn't scale the way your intuition told you it would.

Watch for the final articles in this series, where I'll show hands-on examples of investment analysis and statistical analysis using IPython and Pandas.


Downloadable resources


Related topics


Comments

Sign in or register to add and subscribe to comments.

static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=1
Zone=Cloud computing, Open source
ArticleID=858654
ArticleTitle=Linear optimization in Python, Part 2: Build a scalable architecture in the cloud
publish-date=02202013