Encode video with MongoDB work queues

Techniques to easily store and process complex jobs

MongoDB is a revolutionary data store that you can use to handle work queues composed of rich data structures. This article discusses the techniques for applying this methodology to encoding video.

David Berube, Proprietor, Berube Consulting

David Berube is a consultant, speaker, and the author of Practical Rails Plugins, Practical Reporting with Ruby and Rails, and Practical Ruby Gems. You can reach him at info@berubeconsulting.com.



28 February 2012


Overview

Web architecture is constantly growing more complex. At one point, websites typically consisted of static files on a web server. Now, even simple websites with little or no user interactivity are typically dynamic: they run on a content management system backed by a database.

Still, web applications have reached a certain level of standardization: they start with a similar architecture. You might imagine a site with a single web server and a single database server, which may even be the same machine. When a request comes in, the web server communicates with the database server. Based on the request, the web server typically queries the database, and it may also insert, update, or delete records. The web server then responds to the user with a nicely formatted response. As the application scales, more web servers and database servers become involved.

As the architecture grows, the basic pattern remains: a request comes in, data is processed, and a response is sent back. Eventually, however, this isn't good enough. Some requests take too long and force the user to wait. You need to move that work out of the request-response cycle. To solve this problem, you need a work queue.


What is a work queue?

A work queue is a simple queue—work is put in by producers and taken out by workers. Work queues separate the discovery or creation of a task from the actual execution of the task. This is useful because task execution is usually more resource intensive than task scheduling. When the task is scheduled, you can immediately report success to the end user, and then the actual execution of your task occurs later. This is important for web applications because it takes long-running tasks out of the request-response cycle, giving users immediate feedback. Users are typically much more tolerant of latency that isn't immediately exposed in the user interface, which is why offline processing through work queues is so important.
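To make the producer and worker roles concrete, here is a minimal in-process sketch that uses Ruby's built-in thread-safe Queue class. It is only an illustration of the pattern, not the MongoDB-backed queue built later in this article; the method name run_work_queue and the :done marker are assumptions made for the example.

```ruby
require 'thread' # Queue is built in on modern Ruby; the require is harmless

# Run `jobs` through an in-process queue using `worker_count` worker threads.
# Returns the results in the order the workers finished them.
def run_work_queue(jobs, worker_count)
  queue   = Queue.new
  results = Queue.new

  # Producer side: scheduling a job is cheap, so the caller is not kept waiting.
  jobs.each { |job| queue << job }

  # One :done marker per worker tells it to stop once the real jobs are gone.
  worker_count.times { queue << :done }

  workers = Array.new(worker_count) do
    Thread.new do
      while (job = queue.pop) != :done
        # The expensive work would happen here, outside the request-response cycle.
        results << "processed #{job}"
      end
    end
  end

  workers.each(&:join)
  Array.new(results.size) { results.pop }
end

puts run_work_queue(%w[a.zip b.zip c.zip], 2).sort
```

An in-process queue like this disappears when the process exits, which is one reason to keep the queue in an external data store instead.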

There are many approaches to creating work queues. One option, though naive, is to use a relational database management system (RDBMS). This is simple to implement because many architectures already have a database system such as MySQL. However, it typically performs worse than other approaches: the atomicity, consistency, isolation, and durability (ACID) guarantees that an RDBMS provides are not necessary for this scenario, and they cost performance. A simpler system can perform better.

One system that has gained in popularity for this use is Redis. It's a key-value data store, like the highly popular memcached, but with more features. For example, Redis has support for pushing and popping elements off lists in a highly scalable and efficient way. Resque, often used with Ruby on Rails, is a system built on top of Redis. However, Redis supports only simple primitives. List elements are plain strings, so you can't insert complex objects into the lists without serializing them yourself, and Redis has relatively limited support for managing items in those lists.

Alternatively, many systems use a message broker such as Apache ActiveMQ or RabbitMQ. Although these systems are fast and scalable, they're designed for simple messages. If you want to perform nontrivial reporting on your work queues or modify items in the queues, you are stuck because message brokers rarely offer those features. Fortunately, a powerful, scalable solution is available: MongoDB.

MongoDB allows you to create queues that contain complex nested data. Its locking semantics help you avoid problems with concurrency, and its scalability lets you run large systems. Because MongoDB is a full-featured document database with a rich query language, you can also run robust reporting on your queue and prioritize by complex criteria. However, MongoDB is not a traditional RDBMS. For instance, it does not support Structured Query Language (SQL) queries.

MongoDB has many appealing features in addition to excellent performance for work queues, such as a flexible, schemaless approach. It supports nested data structures, meaning you can even store subdocuments. Because it is a more full-featured data store than Redis, it provides a richer set of management functions so you can easily view, query, update, and delete jobs on any arbitrary criteria.
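As a rough illustration of what nested job data and selection on arbitrary criteria can look like, the sketch below models a few job documents as Ruby hashes. Only the path, in_progress, and encoded fields come from this article; the site subdocument, the priority field, and the method name pending_by_priority are hypothetical. The selection is done in plain Ruby over an array as a stand-in for a database query.

```ruby
# Hypothetical job documents. Only :path, :in_progress, and :encoded are
# fields used in this article; :site and :priority are illustrative.
SAMPLE_JOBS = [
  { :path => 'incoming/site_a.zip', :in_progress => false, :encoded => false,
    :site => { :name => 'Site A', :cameras => 4 }, :priority => 1 },
  { :path => 'incoming/site_b.zip', :in_progress => false, :encoded => false,
    :site => { :name => 'Site B', :cameras => 12 }, :priority => 5 },
  { :path => 'incoming/site_c.zip', :in_progress => false, :encoded => true,
    :site => { :name => 'Site C', :cameras => 2 }, :priority => 9 }
]

# Stand-in for a query: jobs that still need work, highest priority first.
def pending_by_priority(jobs)
  jobs.reject { |job| job[:in_progress] || job[:encoded] }
      .sort_by { |job| -job[:priority] }
end

pending_by_priority(SAMPLE_JOBS).each do |job|
  puts "#{job[:site][:name]} (#{job[:site][:cameras]} cameras): #{job[:path]}"
end
```

Because each job is a whole document, the nested site information travels with the job instead of living in a separate table.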

An example of encoding video with MongoDB work queues

This type of approach is useful in a wide variety of situations. For example, suppose you have several remote sites, each with a number of security cameras. Because of the large number of locations and the fact that high security would be overkill, these security cameras take a still picture every five seconds. You are tasked with collecting these pictures and encoding them into videos that are stored in a central location. Your coworker has already written a program to upload compressed (.zip) files containing still pictures to a remote server. For this example, you collect and encode the pictures using a MongoDB work queue coupled with FFmpeg, which is an open source video encoder. You encode the video into Theora, an open source video codec.

Listing 1 shows some code that monitors a directory for uploads and then enqueues all the files it finds.

Listing 1. The monitor.rb file
require 'lib/init'
require 'rb-inotify'

notifier = INotify::Notifier.new

watch_path = ARGV[0] || @app_config[:watch_path] # optional first argument overrides the configured path

puts "watching #{watch_path}..."

# Use rb-inotify to watch the directory for changes:

notifier.watch(watch_path, :moved_to, :create) do |event|
	
		filename 	= "#{watch_path}/#{event.name}"
		file_size 	= File.size(filename) 
		file_type 	= `file -b #{Escape.shell_command(filename)}`.strip 

		new_record 	= {	:path=>filename, 
					:file_size=>file_size, 
					:file_type=>file_type, 
					:in_progress=>false, 
					:encoded=>false	}

		@queue_collection.insert(new_record) # enqueue the record

end

notifier.run

Notice that Listing 1 includes a reference to the lib/init.rb file. The code for that file is shown in Listing 2.

Listing 2. The lib/init.rb file
require 'rubygems'
require 'yaml'
require 'escape'
require 'mongo'

default_app_settings = {:watch_path=>'./incoming', 
	:encoded_path=>'./encoded',
	:frames_per_second=>0.25}

@app_config = default_app_settings

if File.exists?('config/app.yml')
	@app_config.merge!(YAML.load(File.open('config/app.yml')) || {}) 
end

Dir.mkdir(@app_config[:watch_path]) unless File.exists?(@app_config[:watch_path])
Dir.mkdir(@app_config[:encoded_path]) unless File.exists?(@app_config[:encoded_path])

default_mongo_settings = {:hostname=>'127.0.0.1', 
		:port=>27017, 
		:database=>'sample_db', 
		:collection=>'encode_queue'} 

mongo_config = default_mongo_settings

if File.exists?('config/mongo.yml')
	mongo_config.merge!(YAML.load(File.open('config/mongo.yml')) || {}) 

end

@conn = Mongo::Connection.new(mongo_config[:hostname], mongo_config[:port])
@db   = @conn[mongo_config[:database]]
@queue_collection = @db[mongo_config[:collection]]

The code in Listing 1 uses the rb-inotify gem to monitor the directory where incoming files are stored. By default, that directory is simply called incoming. Install rb-inotify, along with the other necessary gems, as shown in Listing 3.

Listing 3. Installing gems
  sudo gem install rb-inotify escape mongo

The initializer script in Listing 2 contains default settings for the application and its connection to MongoDB. You can override these settings by creating a directory called config and adding two YAML files, app.yml and mongo.yml, into that directory. Listing 1 uses these settings to monitor the incoming files. The rb-inotify gem uses the inotify facility of Linux® to run a piece of code whenever a file is moved into or created in the incoming directory. The rb-inotify gem does support other types of events, such as deletions and modifications, but they aren't important for this example.
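One detail worth illustrating is that the defaults in Listing 2 use symbol keys, so an override file only takes effect if its keys are also symbols. The sketch below shows what a config/app.yml might contain and how it merges with the defaults. The paths and values are made up, the method name merged_app_config is hypothetical, and the use of YAML.safe_load with the permitted_classes keyword assumes a reasonably recent Ruby (Listing 2 itself uses YAML.load).

```ruby
require 'yaml'

DEFAULT_APP_SETTINGS = { :watch_path => './incoming',
                         :encoded_path => './encoded',
                         :frames_per_second => 0.25 }

# Example contents for config/app.yml (values are made up). The leading
# colons make the keys symbols, matching the keys in the defaults.
EXAMPLE_APP_YML = <<YAML_TEXT
:watch_path: /srv/uploads/incoming
:frames_per_second: 2
YAML_TEXT

# Merge overrides from YAML text over the defaults, as Listing 2 does.
def merged_app_config(yaml_text)
  # safe_load refuses to create symbols unless Symbol is explicitly permitted.
  overrides = YAML.safe_load(yaml_text, permitted_classes: [Symbol]) || {}
  DEFAULT_APP_SETTINGS.merge(overrides)
end

config = merged_app_config(EXAMPLE_APP_YML)
puts config[:watch_path]
puts config[:encoded_path]
```

A file written with plain string keys (for example, watch_path: /tmp/x) would load without error but would not replace the symbol-keyed default.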

When it detects a new file, the monitor script then inserts a record into the MongoDB collection that includes the path, file size, and file type of the file. The monitor script uses the Linux file command to retrieve the file type, which can be useful for quickly diagnosing problems because you can look at database records to see if the file type appears to be correct. If a file type is obviously incorrect (for example, it's a LibreOffice spreadsheet and you are expecting a compressed file), then you can quickly determine that the fault is a bad input rather than a script error.
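The diagnostic idea above can be sketched as a small filter over queue records. This is plain Ruby over an array of hashes rather than a database query, and it rests on an assumption you should verify on your own system: that the file command describes zip archives with text beginning "Zip archive data". The method name suspicious_records is hypothetical.

```ruby
# Assumption: `file -b` output for a zip archive starts with this text.
EXPECTED_TYPE_PREFIX = 'Zip archive data'

# Return the records whose stored file type does not look like a zip archive.
def suspicious_records(records)
  records.reject do |record|
    record['file_type'].to_s.start_with?(EXPECTED_TYPE_PREFIX)
  end
end

sample = [
  { 'path' => 'incoming/cam1.zip',
    'file_type' => 'Zip archive data, at least v2.0 to extract' },
  { 'path' => 'incoming/budget.ods',
    'file_type' => 'OpenDocument Spreadsheet' }
]

suspicious_records(sample).each do |record|
  puts "check input: #{record['path']} (#{record['file_type']})"
end
```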


Processing the queue

Now that you have jobs present in the queue, you need to process those jobs. Fortunately, as you can see in Listing 4, this is easy.

Listing 4. The queue_runner.rb file
require 'lib/init'

puts "running queue with PID #{Process.pid}"

time_between_checks = 5 # in seconds 
encoder_information = {:hostname=>`hostname`.strip, :process_id=>Process.pid} # strip the trailing newline

while true
	# Search through the queue; if nothing is present, 
	# then the MongoDB API throws an exception. 
	# We trap that exception, and retry until something is found.
	
	row = @queue_collection.find_and_modify(
					:query=>{:in_progress=>false, 
						    :encoded=>false}, 
					:update=>{:$set=>{:in_progress=>true}}
					 ) rescue (sleep(time_between_checks); retry )
	if row 
		# If something is found, 
		# then we use the encode_zip_file script to encode it:
		
		# Create a filename for output video:
		timestamp = Time.now.strftime("%d_%m_%Y_%H%M%p")
		outfile = File.join(@app_config[:encoded_path], 
				    "video_" <<
				    "#{row['_id'].to_s}.ogv")

		infile = row['path']
		cmd = "ruby encode_zip_file.rb " << 
				Escape.shell_command(infile) << " " <<
				Escape.shell_command(outfile)

		output = `#{cmd} 2>&1` # Redirect STDERR to STDOUT, 
				       # so that we get all of the output

		@queue_collection.update({:_id=>row["_id"]}, 
					  {:$set=>{:encoder=>encoder_information, 
					  :encoded_video=>outfile,
					  :output=>output, 
					  :encoded=>true, 
					  :in_progress=>false}}) 

	end
end

This code runs in a constant loop checking for new work. It uses the MongoDB find_and_modify command, which has a twofold effect. As the name implies, the command finds a record and updates that record in a single atomic operation. This command only affects one record at a time, so you only receive one job at a time. The query clause of the find_and_modify command ensures that you don't work on jobs that are already processed or are being encoded by other processes. The update clause of the find_and_modify command sets the in_progress flag so that other processes don't start working on the job. When no record is found, the MongoDB driver used here raises an exception. The code rescues that exception, sleeps for a polling interval, and then retries the operation.
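To see why the find-and-claim step has to be atomic, consider the following in-memory simulation. It does not talk to MongoDB: a Mutex stands in for the atomicity that find_and_modify provides on the server, and the class name InMemoryQueue and the method names claim and claim_everything are invented for the example. Several worker threads compete for jobs, and each job ends up claimed exactly once.

```ruby
require 'thread'

# In-memory stand-in for the queue collection (not a MongoDB client).
class InMemoryQueue
  def initialize(jobs)
    @jobs = jobs
    @lock = Mutex.new
  end

  # Find one pending job and mark it in progress as a single step.
  # Returns the job, or nil when nothing is pending.
  def claim
    @lock.synchronize do
      job = @jobs.find { |j| !j[:in_progress] && !j[:encoded] }
      job[:in_progress] = true if job
      job
    end
  end
end

# Let `worker_count` threads compete for `job_count` jobs and
# return the ids of every job that was claimed.
def claim_everything(job_count, worker_count)
  jobs = Array.new(job_count) do |i|
    { :id => i, :in_progress => false, :encoded => false }
  end
  queue   = InMemoryQueue.new(jobs)
  claimed = Queue.new

  workers = Array.new(worker_count) do
    Thread.new do
      while (job = queue.claim)
        claimed << job[:id]
      end
    end
  end

  workers.each(&:join)
  Array.new(claimed.size) { claimed.pop }
end

ids = claim_everything(100, 4)
puts "claimed #{ids.size} jobs, #{ids.uniq.size} unique"
```

If the find and the update were two separate steps, two workers could both find the same pending job before either one marked it, and the job would be encoded twice.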

When the code finds a record, it processes that record. It builds the output filename from the record's MongoDB _id, which is unique within the collection. It then calls the encode_zip_file.rb script, which the next section discusses. Afterward, the code updates the record: it clears the in_progress flag and sets encoded to true, stores the output path so that you can do further work with the video file (such as displaying it to the user), saves the encoder's output for troubleshooting, and records the hostname and process ID of the current encoder process.

Note that, for simplicity, this script only accesses files stored locally. However, you can easily extend it so that it downloads files using File Transfer Protocol (FTP), Hypertext Transfer Protocol (HTTP), or a similar mechanism.


Encoding the movie

As you can see in Listing 5, the encode_zip_file.rb script uses FFmpeg to encode the video.

Listing 5. The encode_zip_file.rb script
require 'lib/init'
require 'ftools'
require 'tmpdir'
require 'pathname'

(puts "usage: #{$0} INPUT_FILE OUTPUT_FILE"; exit) unless ARGV.length == 2

input_file_raw = ARGV.first
output_file = ARGV.last

input = Pathname.new(input_file_raw).realpath.to_s
puts "processing #{input}"

temporary_directory = Dir.mktmpdir
temporary_image_directory = File.join(temporary_directory, 'images')

# Create directory to store images
Dir.mkdir(temporary_image_directory)

# Unzip zip file into temporary image directory:
cmd = "cd #{temporary_image_directory}; unzip #{Escape.shell_command(input)}"

`#{cmd}`

input_frames = Dir.glob("#{temporary_image_directory}/*")

index = 0

target_file_extension = File.extname(input_frames.first).downcase

# Sort input images by creation time,
# then copy them to the root of the temporary directory:

input_frames.sort_by { |f| File.ctime(f) }.each do |f|

	# ffmpeg needs a consistent file format, so we'll reformat the filenames:
	target_file = File.join(temporary_directory,
       				"frame_#{'%03i' % index}#{target_file_extension}")
	index = index + 1
	File.copy(f, target_file)
end

frames_per_second = @app_config[:frames_per_second]

# Encode the video:
cmd = 	"ffmpeg  -r #{frames_per_second} " <<
		"-i #{temporary_directory}/frame_%03d#{target_file_extension} " <<
		"-vcodec libtheora #{output_file}"

puts `#{cmd}`

This script takes an input file and an output file from the command line, decompresses the input file into a temporary directory, and then copies the extracted images to sequentially numbered filenames because FFmpeg expects a consistent naming convention for its frames. By sorting the files and then renaming them, the script allows the images in the compressed file to have arbitrary names while still allowing FFmpeg to process the frames in order.
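The renaming step can be sketched on its own, without touching the file system. The method below takes paths that are already in playback order and pairs each one with its sequential target name, using the same format string as Listing 5. The method name sequential_frame_names and the sample paths are assumptions for illustration.

```ruby
# Pair each source path (already sorted into playback order) with a
# sequentially numbered target path inside `directory`.
def sequential_frame_names(sorted_paths, directory)
  return [] if sorted_paths.empty?

  # As in Listing 5, the extension of the first file is used for all frames.
  extension = File.extname(sorted_paths.first).downcase

  sorted_paths.each_with_index.map do |path, index|
    [path, File.join(directory, "frame_#{'%03i' % index}#{extension}")]
  end
end

sequential_frame_names(['cam/IMG_0930.JPG', 'cam/IMG_0935.JPG'], '/tmp/frames').each do |source, target|
  puts "#{source} -> #{target}"
end
```

The empty-input guard is an addition; an archive with no files would otherwise fail when the first extension is read.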

FFmpeg has a straightforward set of options. The -r option sets the number of frames per second. The -i option sets the input file, which can be another video file if, for example, you want to do transcoding. In this case, though, you are working from a set of input files. The frame_%03d part means that FFmpeg expects files to be named frame_000, frame_001, and so forth. The script uses whatever extension the first file in the compressed file archive has, assuming all of the files have a similar file format. Finally, the -vcodec libtheora option tells FFmpeg to use the Theora codec. The script then simply runs the command and outputs the results.
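As a small variation on Listing 5, the same FFmpeg options can be assembled as an argument list instead of one interpolated string. This sketch uses only the options described above (-r, -i, and -vcodec libtheora); the method name ffmpeg_arguments and the sample paths are assumptions, and the code only builds and prints the arguments rather than running FFmpeg.

```ruby
# Build the FFmpeg invocation as a list of separate arguments.
def ffmpeg_arguments(frames_per_second, frame_directory, extension, output_file)
  ['ffmpeg',
   '-r', frames_per_second.to_s,                               # frames per second
   '-i', File.join(frame_directory, "frame_%03d#{extension}"), # numbered input frames
   '-vcodec', 'libtheora',                                     # encode with Theora
   output_file]
end

args = ffmpeg_arguments(0.25, '/tmp/frames', '.jpg', 'encoded/video_1.ogv')
puts args.inspect

# To run it, pass the list to Kernel#system: system(*args)
```

When system receives several arguments, Ruby runs the program directly without a shell, so paths that contain spaces or other special characters need no escaping.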

You now have a working system to encode videos. You can verify this by starting MongoDB and then using the commands shown in Listing 6 in two different windows.

Listing 6. Starting the system
	ruby monitor.rb
	ruby queue_runner.rb

You can now see two directories created by the initializer script: incoming and encoded. Simply create a compressed file that contains several Joint Photographic Experts Group (JPEG) images and copy it into the incoming directory. You should see an .ogv file appear in the encoded directory that you can open in Firefox to view a video of your images.


Conclusion

As you can see, building a work queue on MongoDB is straightforward. You can extend the approaches mentioned in this article to work with a large variety of systems. MongoDB has a rich set of features to model nested data, and you can use them to deal with more complex job data. In short, MongoDB-driven work queues are a powerful, flexible approach that can scale to large queues.
