使用 MongoDB 工作队列进行视频编码!

轻松存储并处理复杂作业的技术

MongoDB 是一个革命性的数据存储,您可以用它来处理由丰富的数据结构组成的工作队列。本文将讨论将这种方法应用于视频编码的技术。

David Berube, 所有者, Berube Consulting

David Berube 是一名顾问和演讲者,他还是 Practical Rails Plugins, Practical Reporting with Ruby and RailsPractical Ruby Gems 这两本书的作者。



2012 年 5 月 04 日

概述

Web 架构不断变得更加复杂。过去,网站通常由 Web 服务器上的静态文件组成。现在,即使是很少或完全没有使用用户交互的简单网站,也通常是动态的,运行在由数据库支持的内容管理系统之上。

尽管如此,Web 应用程序已经实现了一定程度的标准化:他们从类似的架构开始着手。您可以想象由一个 Web 服务器和一个数据库服务器组成的站点,它们甚至有可能在同一台计算机上。当收到请求时,Web 服务器会与数据库服务器进行通信。基于请求,Web 服务器通常会查询数据库服务器,它还可以执行插入、删除等操作。然后,Web 服务器会使用一个格式良好的回复来响应用户。随着应用程序的扩展,所涉及的 Web 服务器和数据库服务器也就更多。

随着架构的发展,基本模式仍然是:收到请求,处理数据,发送回一个响应。然而,这个流程终究还是不够好。有些请求占用太长时间,需要用户等待。您需要将这些请求移出响应请求周期。为了解决这个问题,不必需要一个工作队列。


什么是工作队列?

工作队列 是一个简单的队列:生产者将工作放进去,处理者将它拿出来。工作队列将任务的发现或创建从任务的实际执行中分离出来。这样做非常有用,因为任务执行通常比任务调度具有更高的资源密集程度。在安排好任务之后,您就可以立即向最终用户报告任务安排成功,稍后便可实际执行该任务。这对于 Web 应用程序很重要,因为可以将长时间运行的任务移出 “请求-响应”周期,从而向用户提供即时反馈。用户对于不会立即暴露在用户界面中的延迟通常更宽容一些,这就是通过工作队列进行离线处理如此之重要的原因。

创建工作队列的方法有许多种。其中一个选择(比较原始)是使用关系数据库管理系统 (RDBMS)。该实现很简单,因为许多架构都已拥有数据库系统,如 MySQL。然而,与其他方法相比,该方法的性能并非最佳。在该场景中,RDBMS 所要求的原子性、一致性、隔离性和持久性 (ACID) 的合规性并不是必需的,并且会对性能产生负面影响。简单一些的系统可能会有更好的性能。

针对该用途而获得普及的一个系统是 Redis。它是一个键值数据存储,像极受欢迎的 memcached,但它有更多特性。例如,Redis 支持以高度可扩展和高效的方式将元素推入和弹出列表。经常与 Ruby on Rails 配合使用的 Resque,是构建于 Redis 之上的一个系统(详情请参阅 参考资料)。然而,Redis 只支持简单的原语。您不能将复杂对象插入列表,对于管理这些列表中的项目,它的支持相对有限。

此外,许多系统使用诸如 Apache ActiveMQ 或 RabbitMQ 等消息代理。虽然这些系统速度快并且可扩展,但它们是为简单消息而设计的。如果您打算在工作队列上执行复杂一点的报告,或者修改队列中的项目,那么您可能会被卡住,因为消息代理很少提供这些特性。幸运的是,现在有一个强大的、可扩展的解决方案:MongoDB。

MongoDB 允许您创建包含复杂嵌套数据的队列。它的锁定语义可确保您不会遇到并发问题,而它的可扩展性则可以确保您可以运行大型系统。因为 MongoDB 是一个强大的关系型数据库,所以您也可以在队列上运行强大的报告,并根据复杂的标准设置优先级。然而,MongoDB 并不是传统的 RDBMS。例如,它不支持结构化查询语言 (SQL) 查询。

MongoDB 除了性能优异的工作队列​​之外,还有许多吸引人的特性,比如灵活的无架构方法。它支持嵌套的数据结构,这意味着您甚至可以存储子文档。因为与 Redis 相比,它是一个功能更全面的数据存储,它提供更丰富的管理功能,让您可以根据任意条件轻松地查看、查询、更新和删除作业。

使用 MongoDB 工作队列进行视频编码的示例

这种类型的方法在多种情况下都非常有用。例如,假设您有若干个远程站点,每个站点都有一些安全摄像头。由于地点非常多,并且事实上并不需要高安全性,所以这些安全摄像头只是每隔五秒钟拍摄一张静态帧图片。您的任务是收集这些照片,并将它们编码成一个视频,然后将该视频存储在一个中央位置。您的同事已经写了一个程序,将包含静态图片的压缩 (.zip) 文件上传到一个远程服务器。对于这个示例,您可以使用 MongoDB 队列与 FFmpeg ​​来收集图片并对其进行编码,FFmpeg 是一个开源视频编码器。您需要将视频编码进 Theora,这是一个开源的视频编解码器。

清单 1 所示的代码监视上传目录,然后对它找到的所有文件进行排队。

清单 1. monitor.rb 文件
require 'lib/init'
require 'rb-inotify'

notifier = INotify::Notifier.new

watch_path = ARGV[1] || @app_config[:watch_path]

puts "watching #{watch_path}..."

# Use rb-inotify to watch the directory for changes:

notifier.watch(watch_path, :moved_to, :create) do |event|
	
		filename 	= "#{watch_path}/#{event.name}"
		file_size 	= File.size(filename) 
		file_type 	= `file -b #{Escape.shell_command(filename)}`.strip 

		new_record 	= {	:path=>filename, 
					:file_size=>file_size, 
					:file_type=>file_type, 
					:in_progress=>false, 
					:encoded=>false	}

		@queue_collection.insert(new_record) # enqueue the record

end

notifier.run

请注意,清单 1 中包含了一个对 lib/init.rb 文件的引用。该文件的代码如 清单 2 所示。

清单 2. lib/init.rb 文件
require 'rubygems'
require 'yaml'
require 'escape'
require 'mongo'

default_app_settings = {:watch_path=>'./incoming', 
	:encoded_path=>'./encoded',
	:frames_per_second=>0.25}

@app_config = default_app_settings

if File.exists?('config/app.yml')
	@app_config.merge!(YAML.load(File.open('config/app.yml')) || {}) 
end

Dir.mkdir(@app_config[:watch_path]) unless File.exists?(@app_config[:watch_path])
Dir.mkdir(@app_config[:encoded_path]) unless File.exists?(@app_config[:encoded_path])

default_mongo_settings = {:hostname=>'127.0.0.1', 
		:port=>27017, 
		:database=>'sample_db', 
		:collection=>'encode_queue'} 

mongo_config = default_mongo_settings

if File.exists?('config/mongo.yml')
	mongo_config.merge!(YAML.load(File.open('config/mongo.yml')) || {}) 

end

@conn = Mongo::Connection.new(mongo_config[:hostname], mongo_config[:port])
@db   = @conn[mongo_config[:database]]
@queue_collection = @db[mongo_config[:collection]]

在清单 1 中的代码使用 rb-inotify gem 来监视存储传入文件的目录。默认情况下,该文件被简称为 incoming。安装 rb-inotify 和其他几个必需的 gem,如 清单 3 所示。

清单 3. 安装 gem
  sudo gem install rb-inotify mongo

清单 2 中的初始化程序脚本中包含应用程序及其 MongoDB 连接的默认设置。您可以创建名为 config 的目录,并在该目录中添加两个 YAML 文件:app.yml 和 mongo.yml,用这些覆盖默认设置。清单 1 使用这些设置来监视传入的文件。每当 incoming 目录中移入或创建一个文件,rb-inotify gem 就会使用 Linux® 的 inotify 工具来运行一段代码。rb-inotify gem 不支持其他类型的事件(如删除和修改),但它们在本例中并不重要。

如果它检测到一个新文件,那么可以监视脚本,将一个记录插入 MongoDB 集合,该集合中包括文件的路径、文件大小和文件类型。监视脚本使用 Linux file 命令来检索文件类型,它在快速诊断问题时非常有用,因为您可以查看数据库中的记录,查看该文件的类型是否正确。如果某个文件类型是不正确的(例如,它是一个 LibreOffice 电子表格,而您预计的是一个压缩文件),那么您就可以迅速确定该错误是一个错误输入,而不是一个脚本错误。


处理队列

现在,您的队列中已经有一些作业,您需要处理这些作业。幸运的是,如 清单 4 中所示,这很容易。

清单 4. queue_runner.rb 文件
require 'lib/init'

puts "running queue with PID #{Process.pid}"

time_between_checks = 5 # in seconds 
encoder_information = {:hostname=>`hostname`, :process_id=>Process.pid}

while true
	# Search through the queue; if nothing is present, 
	# then the MongoDB API throws an exception. 
	# We trap that exception, and retry until something is found.
	
	row = @queue_collection.find_and_modify(
					:query=>{:in_progress=>false, 
						    :encoded=>false}, 
					:update=>{:$set=>{:in_progress=>true}}
					 ) rescue (sleep(time_between_checks); retry )
	if row 
		# If something is found, 
		# then we use the encode_zip_file script to encode it:
		
		# Create a filename for output video:
		timestamp = Time.now.strftime("%d_%m_%Y_%H%M%p")
		outfile = File.join(@app_config[:encoded_path], 
				    "video_" <<
				    "#{row['_id'].to_s}.ogv")

		infile = row['path']
		cmd = "ruby encode_zip_file.rb " << 
				Escape.shell_command(infile) << " " <<
				Escape.shell_command(outfile)

		output = `#{cmd} 2>&1` # Redirect STDERR to STDOUT, 
				       # so that we get all of the output

		@queue_collection.update({:_id=>row["_id"]}, 
					  {:$set=>{:encoder=>encoder_information, 
					  :encoded_video=>outfile,
					  :output=>output, 
					  :encoded=>true, 
					  :in_progress=>false}}) 

	end
end

此代码不断循环运行,检查是否有新的工作。该代码使用了 MongoDB find_and_modify 命令,此命令具有双重效果。顾名思义,该命令首先会查找一个记录,然后在第二个原子操作中更新该记录。此命令每次只影响一个记录,所以您每次只接收一个作业。find_and_modify 命令的 query 子句可以确保您不会对已经处理或正被其他进程编码的作业进行处理。find_and_modify 命令的 update 子句可以设置 in_progress 标志,使其他进程不会开始处理该作业。如果没有发现记录,MongoDB 会引发一个异常。代码挽救该异常,休眠一个轮询间隔,然后重试操作。

如果代码找到一个记录,它就会处理该记录。它根据 Mongo ID 创建一个文件名,保证该名称是惟一的。然后它会调用 encode_zip_file.rb 脚本,下一节中会讨论此脚本。随后,该代码会更新记录。该纪录的状态不再是 “处理中”,因为它已被编码。该代码还会更新记录,使其包括输出路径。所以,如果您想对视频文件再做进一步的操作,比如向用户显示它,您可以轻松做到。最后,它还包括有关当前编码进程的信息(主机名、进程 ID 等等)。

请注意,为简单起见,该脚本只访问本地存储的文件。但是,您可以轻松地扩展它,使它可以使用文件传输协议 (FTP)、超文本传输​​协议 (HTTP) 或类似的机制下载文件。


对影片进行编码

清单 5 所示,encode_zip_file.rb 脚本使用 FFmpeg 进行视频编码。

清单 5. encode_zip_file.rb 脚本
require 'lib/init'
require 'ftools'
require 'tmpdir'
require 'pathname'

(puts "usage: #{$0} INPUT_FILE OUTPUT_FILE"; exit) unless ARGV.length == 2

input_file_raw = ARGV.first
output_file = ARGV.last

input = Pathname.new(input_file_raw).realpath.to_s
puts "processing #{input}"

temporary_directory = Dir.mktmpdir
temporary_image_directory = File.join(temporary_directory, 'images')

# Create directory to store images
Dir.mkdir(temporary_image_directory)

# Unzip zip file into temporary image directory:
cmd = "cd #{temporary_image_directory}; unzip #{Escape.shell_command(input)}"

`#{cmd}`

input_frames = Dir.glob("#{temporary_image_directory}/*")

index = 0

target_file_extension = File.extname(input_frames.first).downcase

# Sort input images by creation time,
# then copy them to the root of the temporary directory:

input_frames.sort_by { |f| File.ctime(f) }.each do |f|

	# ffmpeg needs a consistent file format, so we'll reformat the filenames:
	target_file = File.join(temporary_directory,
       				"frame_#{'%03i' % index}#{target_file_extension}")
	index = index + 1
	File.copy(f, target_file)
end

frames_per_second = @app_config[:frames_per_second]

# Encode the video:
cmd = 	"ffmpeg  -r #{frames_per_second} " <<
		"-i #{temporary_directory}/frame_%03d#{target_file_extension} " <<
		"-vcodec libtheora #{output_file}"

puts `#{cmd}`

该脚本通过命令行获取一个输入文件和输出文件,将它们解压缩到一个临时目录中,然后重命名它们,因为 FFmpeg 要求其帧有一致的命名约定。通过对文件进行排序,然后重命名它们,该脚本允许压缩文件的图像使用任意名称,同时使 FFmpeg 仍然能够处理这些帧。

FFmpeg 有一个直观的选项集。-r 选项用于设置每秒的帧数。-i 选项用于设置输入文件,例如,如果您想执行转码,输入文件可以是另一个视频文件。虽然在本例中,您要处理的是一组输入文件。frame_%03d 部分意味着 FFmpeg 希望将文件命名为 frame_000frame_001 等。该脚本使用了压缩文件档案中第一个文件的扩展名,假设所有文件都有类似的文件格式。最后,-vcodec libtheora 选项告诉 FFmpeg 使用 Theora 编解码器。然后,该脚本只需运行命令并输出结果即可。

现在,您已拥有一个进行视频编码的工作系统。您可以通过启动 MongoDB 并在两个不同窗口中使用清单 清单 6 中所示的命令来验证该系统。

清单 6. 启动系统
	ruby monitor.rb
	ruby queue_runner.rb

您现在能看到由初始化程序脚本创建的两个目录:incoming 和 encoded。只需创建包含若干个联合图像专家组 (Joint Photographic Experts Group, JPEG) 图像的一个压缩文件,并将其复制到 incoming 目录中,然后您应该在 encoded 目录中看到一个 .ogv 文件,您可以在 Firefox 浏览器中打开该文件并观看您的图像视频。


结束语

如您所见,使用 MongoDB 既直观又简单。您可以扩展本文中提到的方法,将它用于各种各样的系统。MongoDB 有一组丰富的特性,可以用它们对嵌套数据进行建模,您可以使用这些特性轻松地处理更复杂的作业数据。简而言之,MongoDB 驱动的工作队列是一个强大、灵活的方法,可扩展到大型队列。您不会后悔在应用程序中使用它们。

参考资料

学习

获得产品和技术

讨论

条评论

developerWorks: 登录

标有星(*)号的字段是必填字段。


需要一个 IBM ID?
忘记 IBM ID?


忘记密码?
更改您的密码

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件

 


在您首次登录 developerWorks 时,会为您创建一份个人概要。您的个人概要中的信息(您的姓名、国家/地区,以及公司名称)是公开显示的,而且会随着您发布的任何内容一起显示,除非您选择隐藏您的公司名称。您可以随时更新您的 IBM 帐户。

所有提交的信息确保安全。

选择您的昵称



当您初次登录到 developerWorks 时,将会为您创建一份概要信息,您需要指定一个昵称。您的昵称将和您在 developerWorks 发布的内容显示在一起。

昵称长度在 3 至 31 个字符之间。 您的昵称在 developerWorks 社区中必须是唯一的,并且出于隐私保护的原因,不能是您的电子邮件地址。

标有星(*)号的字段是必填字段。

(昵称长度在 3 至 31 个字符之间)

单击提交则表示您同意developerWorks 的条款和条件。 查看条款和条件.

 


所有提交的信息确保安全。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=10
Zone=Open source
ArticleID=812987
ArticleTitle=使用 MongoDB 工作队列进行视频编码!
publish-date=05042012