MongoDB のワーク・キューを使用して動画をエンコードする

複雑なジョブを容易に格納、処理するための技術

MongoDB はリッチなデータ構造で構成されるワーク・キューを扱うために使用できる画期的なデータ・ストアです。この記事では、この技術を動画のエンコードに適用する方法について説明します。

David Berube, Proprietor, Berube Consulting

David Berube はコンサルタント、講演者、そして『Practical Rails Plugins』、『Practical Reporting with Ruby and Rails』、『Practical Ruby Gems』の著者です。



2012年 3月 29日

概要

Web アーキテクチャーは複雑化の一途をたどっています。かつて、Web サイトは Web サーバー上の静的ファイルで構成されることが普通でした。しかし現在では、ユーザーとのやりとりがほとんどない、またはまったくない単純な Web サイトでさえ、動的な構成になっている (つまりデータベースを用いたコンテンツ管理システムを使用している) のが普通です。

とは言え、Web アプリケーションはあるレベルで標準化されてきており、似たようなアーキテクチャーを出発点とするようになっています。こう言うと、皆さんは 1 つの Web サーバーと 1 つのデータベース・サーバーで構成されるサイトを想像するかもしれませんが、Web サーバーとデータベース・サーバーが同じマシンで構成される場合もあるかもしれません。リクエストを受信すると、Web サーバーはデータベース・サーバーと通信します。そのリクエストに応じて、Web サーバーは通常、データベース・サーバーにクエリーを実行し、挿入、削除、等々を実行します。次に Web サーバーは適切にフォーマット設定されたレスポンスでユーザーに応答します。アプリケーションの規模が大きくなると、そのための Web サーバーとデータベース・サーバーの数も多くなります。

アーキテクチャーが複雑になっても基本的なパターンは同じままです。つまりリクエストを受信するとデータが処理され、レスポンスが返送されます。しかしやがて、それでは不十分になります。一部のリクエストは処理に時間がかかりすぎ、ユーザーはレスポンスが返されるのを待たなければなりません。これらのリクエストはリクエスト・レスポンス・サイクルから排除する必要があります。この問題を解決するためにはワーク・キューが必要です。


ワーク・キューとは何か

「ワーク・キュー」は単純なキューであり、ワーク (ジョブ) が「プロデューサー」によってキューに入れられ、「ワーカー」によってキューから取り出されます。ワーク・キューにより、タスクの発見または作成と実際のタスクの実行とを分離することができます。通常はタスクの実行の方がタスクのスケジューリングよりも多くのリソースを必要とするため、ワーク・キューは実用的です。タスクがスケジューリングされると、即座にエンド・ユーザーにスケジューリングが正常に行われたことを伝えることができます。そして実際にタスクが実行されるのはその後になります。これは Web アプリケーションにとって重要です。というのも、長時間にわたって実行されるタスクをワーク・キューによってリクエスト・レスポンス・サイクルから排除することができ、ユーザーに即座にフィードバックを返せるからです。ユーザーは通常、ユーザー・インターフェースに即座に反映されることのない処理の遅延に関しては、即座のレスポンスが要求される処理の遅延に比べてはるかに寛容です。そのため、ワーク・キューを使用するオフライン処理が非常に重要なのです。

ワーク・キューの作成方法は数多くあります。1 つの方法は、単純ながらも RDMBS (Relational Database Management System) を使用する方法です。多くのアーキテクチャーには既に MySQL などのデータベース・システムが含まれているため、RDMBS を使用する方法を実装するのは簡単です。ただし、他の方法に比べるとパフォーマンスは劣ります。RDBMS に要求される ACID 特性 (Atomicity (アトミック性)、Consistency (一貫性)、Isolation (独立性)、Durability (永続性)) への準拠はワーク・キューのシナリオには不要であり、ACID 特性に準拠しようとするとパフォーマンスに悪影響が及びます。もっと単純なシステムの方がより高いパフォーマンスを実現することができます。

ワーク・キューによく使われるシステムの 1 つが Redis です。Redis は非常によく使われている memcached と同じように、キー・バリュー型のデータ・ストアですが、より多くの機能を持っています。例えば、Redis は非常にスケーラブルかつ効率的な方法でリストの要素をプッシュ、ポップすることができます。Ruby on Rails でよく使用される Resque は Redis の上に構築されたシステムです (詳細については「参考文献」を参照)。しかし Redis でサポートされているのは単純なプリミティブのみです。複雑なオブジェクトをリストに挿入することはできず、リストの中の項目を管理する機能も比較的限定されています。

それらに代わる手段として、多くのシステムでは Apache ActiveMQ や RabbitMQ などのメッセージ・ブローカーを使用しています。これらのシステムは高速でスケーラブルですが、単純なメッセージを扱うように設計されているため、ワーク・キューに関する複雑なレポートを作成したい場合や、キュー内の項目を変更したい場合には、どうすることもできません。メッセージ・ブローカーのほとんどは、そうした機能を持っていないからです。幸いなことに、MongoDB という強力でスケーラブルなソリューションがあります。

MongoDB を使用すると、ネストされた複雑なデータを含むキューを作成することができます。MongoDB にはロック・メカニズムがあるため、並行処理で問題が発生することはありません。また MongoDB はスケーラブルであるため、大規模なシステムを実行可能なことに加え、強力なリレーショナル・データベースである MongoDB は、キューに関する確実なレポートを作成することや、複雑な基準で優先順位を付けることができます。ただし MongoDB は従来の RDBMS とは異なります。例えば、MongoDB は SQL (Structured Query Language) のクエリーをサポートしていません。

MongoDB は優れたパフォーマンスのワーク・キューに加え、柔軟でスキーマが不要であるなど、数多くの魅力的な特徴があります。MongoDB はネストされたデータ構造をサポートしており、サブ文書を格納することもできます。MongoDB は Redis に比べて機能が豊富に用意されたデータ・ストアであるため、ワーク・キューを操作するための機能も数多くサポートしており、あらゆる基準に基づいて容易にジョブを表示、照会、更新、削除することができます。

MongoDB のワーク・キューを使用して動画をエンコードする例

この種の手法はさまざまな状況で有効です。例えば、多数の監視カメラが設置された何ヶ所かの遠隔地を監視するとします。監視対象の場所が多く、高度なセキュリティーは過剰であることから、これらの監視カメラは 5 秒ごとに静止画を撮影します。皆さんのタスクは、これらの画像を収集して動画にエンコードし、中央の場所に格納することです。静止画を含む圧縮された (.zip) ファイルをリモート・サーバーにアップロードするためのプログラムは、皆さんの同僚が既に作成を終えています。この例では、皆さんは MongoDB のワーク・キューと FFmpeg (オープンソースのビデオ・エンコーダー) を使用して画像を収集し、エンコードします。ここでは動画を Theora (オープンソースのビデオ・コーデック) にエンコードします。

リスト 1 のコードはアップロード用のディレクトリーをモニターし、検出したすべてのファイルをキューに入れます。

リスト 1. monitor.rb ファイル
require 'lib/init'
require 'rb-inotify'

notifier = INotify::Notifier.new

watch_path = ARGV[1] || @app_config[:watch_path]

puts "watching #{watch_path}..."

# Use rb-inotify to watch the directory for changes:

notifier.watch(watch_path, :moved_to, :create) do |event|
	
		filename 	= "#{watch_path}/#{event.name}"
		file_size 	= File.size(filename) 
		file_type 	= `file -b #{Escape.shell_command(filename)}`.strip 

		new_record 	= {	:path=>filename, 
					:file_size=>file_size, 
					:file_type=>file_type, 
					:in_progress=>false, 
					:encoded=>false	}

		@queue_collection.insert(new_record) # enqueue the record

end

notifier.run

リスト 1 では、lib/init.rb ファイルを読み込んでいることに注意してください。この lib/init.rb ファイルのコードをリスト 2 に示します。

リスト 2. lib/init.rb ファイル
require 'rubygems'
require 'yaml'
require 'escape'
require 'mongo'

default_app_settings = {:watch_path=>'./incoming', 
	:encoded_path=>'./encoded',
	:frames_per_second=>0.25}

@app_config = default_app_settings

if File.exists?('config/app.yml')
	@app_config.merge!(YAML.load(File.open('config/app.yml')) || {}) 
end

Dir.mkdir(@app_config[:watch_path]) unless File.exists?(@app_config[:watch_path])
Dir.mkdir(@app_config[:encoded_path]) unless File.exists?(@app_config[:encoded_path])

default_mongo_settings = {:hostname=>'127.0.0.1', 
		:port=>27017, 
		:database=>'sample_db', 
		:collection=>'encode_queue'} 

mongo_config = default_mongo_settings

if File.exists?('config/mongo.yml')
	mongo_config.merge!(YAML.load(File.open('config/mongo.yml')) || {}) 

end

@conn = Mongo::Connection.new(mongo_config[:hostname], mongo_config[:port])
@db   = @conn[mongo_config[:database]]
@queue_collection = @db[mongo_config[:collection]]

リスト 1 のコードは rb-inotify gem を使用して、受信したファイルを格納するディレクトリーをモニターします。デフォルトで、受信ファイルは単純に incoming と呼ばれます。リスト 3 のように、他に必要ないくつかの gem と共に rb-inotify をインストールします。

リスト 3. gem をインストールする
  sudo gem install rb-inotify mongo

リスト 2 の初期化スクリプトには、アプリケーションのデフォルトの設定と、アプリケーションと MongoDB との接続が含まれています。これらの設定を変更するために、config というディレクトリーを作成し、そのディレクトリーに 2 つの YAML ファイル (app.yml と mongo.yml) を追加します。リスト 1 はこれらの設定を使用して受信ファイルを監視しています。incoming ディレクトリーにファイルが追加または作成されると、必ず rb-inotify gem は Linux の inotify 機能を使用してコード・スニペットを実行します。rb-inotify gem は実際には他のタイプのイベント (削除や変更など) もサポートしていますが、この例ではそれらのイベントは重要ではありません。

モニター・スクリプトは新しいファイルを検出すると、MongoDB のコレクションにレコードを挿入します。このレコードには、そのファイルのパス、ファイル・サイズ、ファイル・タイプが含まれています。モニター・スクリプトは Linux の file コマンドを使用してファイル・タイプを取得します。この方法は問題を素早く診断する上で役立つ場合があります。データベースのレコードを調べればファイル・タイプが適切であるかどうかを判断することができるからです。ファイル・タイプが明らかに不適切な場合 (例えばファイルが LibreOffice スプレッドシートでありながら圧縮ファイルが想定されている場合など) には、問題なのはスクリプトの誤りではなく不適切な入力によるものだと即座に判断することができます。


キューを処理する

キューにジョブが入れられたので、これらのジョブを処理する必要があります。幸い、リスト 4 を見るとわかるように、ジョブの処理は簡単です。

リスト 4. queue_runner.rb ファイル
require 'lib/init'

puts "running queue with PID #{Process.pid}"

time_between_checks = 5 # in seconds 
encoder_information = {:hostname=>`hostname`, :process_id=>Process.pid}

while true
	# Search through the queue; if nothing is present, 
	# then the MongoDB API throws an exception. 
	# We trap that exception, and retry until something is found.
	
	row = @queue_collection.find_and_modify(
					:query=>{:in_progress=>false, 
						    :encoded=>false}, 
					:update=>{:$set=>{:in_progress=>true}}
					 ) rescue (sleep(time_between_checks); retry )
	if row 
		# If something is found, 
		# then we use the encode_zip_file script to encode it:
		
		# Create a filename for output video:
		timestamp = Time.now.strftime("%d_%m_%Y_%H%M%p")
		outfile = File.join(@app_config[:encoded_path], 
				    "video_" <<
				    "#{row['_id'].to_s}.ogv")

		infile = row['path']
		cmd = "ruby encode_zip_file.rb " << 
				Escape.shell_command(infile) << " " <<
				Escape.shell_command(outfile)

		output = `#{cmd} 2>&1` # Redirect STDERR to STDOUT, 
				       # so that we get all of the output

		@queue_collection.update({:_id=>row["_id"]}, 
					  {:$set=>{:encoder=>encoder_information, 
					  :encoded_video=>outfile,
					  :output=>output, 
					  :encoded=>true, 
					  :in_progress=>false}}) 

	end
end

このコードは無限ループを実行し、新しい作業がないかどうかを調べます。ループの中で、2 つの効果を持つ MongoDB の find_and_modify コマンドを使用しています。名前からもわかるように、find_and_modify コマンドは最初にレコードを検出し、次にその検出したレコードを 2 番目のアトミック処理で更新します。このコマンドは一度に 1 つのレコードにしか影響を与えないため、一度に受信するジョブは 1 つのみです。find_and_modify コマンドの query 節は、他のプロセスによって既に処理されたジョブやエンコード中のジョブを実行しないようにしています。find_and_modify コマンドの update 節は in_progress フラグをセットし、他のプロセスがそのジョブの実行を始めることがないようにします。レコードが見つからない場合には、MongoDB は例外をスローします。このコードはその例外を rescue で処理し、ポーリング周期の間スリープした後、上記の操作を再度試みます。

レコードが検出されると、そのレコードが処理されます。このコードは、一意であることが保証されている Mongo ID に基づき、ファイルに名前を付けます。次に、このコードは encode_zip_file.rb スクリプトを呼び出します。このスクリプトについては次のセクションで説明します。続いてレコードを更新します。レコードはエンコード済みなので、もはや in_progress ではありません。またこのコードは出力パスを含むようにレコードを更新します。そのため、この動画ファイルに対して (例えばユーザーに表示するなど) さらに処理を実行することができます。最後に、現在のエンコーダー・プロセスの情報 (ホスト名、プロセス ID など) をレコードに含めます。

簡単のため、このスクリプトのアクセス対象がローカルに保存されているファイルのみであることに注意してください。ただし、このスクリプトを拡張し、FTP (File Transfer Protocol) や HTTP (HyperText Transfer Protocol) などのメカニズムを使用してファイルをダウンロードできるようにするのも簡単です。


動画をエンコードする

リスト 5 を見るとわかるように、encode_zip_file.rb スクリプトは FFmpeg を使用して動画をエンコードしています。

リスト 5. encode_zip_file.rb スクリプト
require 'lib/init'
require 'ftools'
require 'tmpdir'
require 'pathname'

(puts "usage: #{$0} INPUT_FILE OUTPUT_FILE"; exit) unless ARGV.length == 2

input_file_raw = ARGV.first
output_file = ARGV.last

input = Pathname.new(input_file_raw).realpath.to_s
puts "processing #{input}"

temporary_directory = Dir.mktmpdir
temporary_image_directory = File.join(temporary_directory, 'images')

# Create directory to store images
Dir.mkdir(temporary_image_directory)

# Unzip zip file into temporary image directory:
cmd = "cd #{temporary_image_directory}; unzip #{Escape.shell_command(input)}"

`#{cmd}`

input_frames = Dir.glob("#{temporary_image_directory}/*")

index = 0

target_file_extension = File.extname(input_frames.first).downcase

# Sort input images by creation time,
# then copy them to the root of the temporary directory:

input_frames.sort_by { |f| File.ctime(f) }.each do |f|

	# ffmpeg needs a consistent file format, so we'll reformat the filenames:
	target_file = File.join(temporary_directory,
       				"frame_#{'%03i' % index}#{target_file_extension}")
	index = index + 1
	File.copy(f, target_file)
end

frames_per_second = @app_config[:frames_per_second]

# Encode the video:
cmd = 	"ffmpeg  -r #{frames_per_second} " <<
		"-i #{temporary_directory}/frame_%03d#{target_file_extension} " <<
		"-vcodec libtheora #{output_file}"

puts `#{cmd}`

このスクリプトはコマンドラインから入力される 2 つのファイルの名前 (入力ファイルの名前と出力ファイルの名前) を引数に取り、それらを一時ディレクトリーで解凍してからリネームしています。これは FFmpeg がフレームの命名規則に一貫性を必要とするためです。このスクリプトではファイルをソートしてからリネームすることで、圧縮された画像ファイルが任意の名前を持つことができる一方、それでも FFmpeg はこれらのフレームを処理することができます。

FFmpeg には単純な一連のオプションがあります。-r オプションは毎秒のフレーム数を設定します。-i オプションは入力ファイルを設定します。例えばトランスコーディングをしたい場合には、入力ファイルは別の動画ファイルにすることができます。ただしこの記事の場合には一連の入力ファイルを対象にしています。frame_%03d の部分は、FFmpeg ではファイル名が frame_000frame_001、・・・のようになっていることを想定しています。このスクリプトでは、圧縮ファイル・アーカイブ内のすべてのファイルが同様のファイル・フォーマットであるという前提の下、圧縮ファイル・アーカイブ内の先頭にあるファイルの拡張子を (その種類によらず) 使用します。最後に、-vcodec libtheora オプションによって、Theora コーデックを使用するように FFmpeg に指示します。そして、このスクリプトは単純にコマンドを実行し、その結果を出力します。

これで、動画をエンコードするための実際に動作するシステムが出来上がりました。このシステムの動作を確認するためには、まず MongoDB を起動し、リスト 6 のコマンドを 2 つの異なるウィンドウで使用します。

リスト 6. システムを起動する
	ruby monitor.rb
	ruby queue_runner.rb

すると、初期化スクリプトによって incoming と encoded という 2 つのディレクトリーが作成されることがわかります。単純に、いくつかの JPEG (Joint Photographic Experts Group) 画像を含む圧縮ファイルを作成して incoming ディレクトリーにコピーします。これにより、encoded ディレクトリーに .ogv ファイルが現れ、そのファイルを Firefox で開くと、静止画からエンコードされた動画が表示されるはずです。


まとめ

この記事からわかるように、MongoDB を使用するのは単純であり、簡単です。この記事で紹介した手法を拡張すると、多種多様なシステムに対応することができます。MongoDB はネストされたデータをモデル化するための豊富な機能を備えており、それらの機能を使用することで、さらに複雑なジョブ・データを容易に処理することができます。簡単に言えば、MongoDB によるワーク・キューは大規模なキューにまでスケーリングすることができる強力かつ柔軟な手法です。皆さんのアプリケーションに MongoDB を使用して後悔することはないはずです。

参考文献

学ぶために

製品や技術を入手するために

  • MongoDB は高速でオープンソース、そして文書指向のデータベースです。
  • Apache ActiveMQ はオープンソースのメッセージング・システムです。メッセージング・システムもキューを使用する作業のための適切な手段です。
  • Resque は Redis を使用してジョブをエンキューするためのシステムです。
  • データベース・アプリケーションに関心がある皆さんは、developerWorks のソフトウェア評価版のダウンロード・ページから入手できる無料の DB2 Express-C にも関心があるのではないでしょうか。

議論するために

  • developerWorks コミュニティーに参加してください。ここでは他の developerWorks ユーザーとのつながりを持てる他、開発者によるブログ、フォーラム、グループ、ウィキを調べることができます。

コメント

developerWorks: サイン・イン

必須フィールドは(*)で示されます。


IBM ID が必要ですか?
IBM IDをお忘れですか?


パスワードをお忘れですか?
パスワードの変更

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


お客様が developerWorks に初めてサインインすると、お客様のプロフィールが作成されます。会社名を非表示とする選択を行わない限り、プロフィール内の情報(名前、国/地域や会社名)は公開され、投稿するコンテンツと一緒に表示されますが、いつでもこれらの情報を更新できます。

送信されたすべての情報は安全です。

ディスプレイ・ネームを選択してください



developerWorks に初めてサインインするとプロフィールが作成されますので、その際にディスプレイ・ネームを選択する必要があります。ディスプレイ・ネームは、お客様が developerWorks に投稿するコンテンツと一緒に表示されます。

ディスプレイ・ネームは、3文字から31文字の範囲で指定し、かつ developerWorks コミュニティーでユニークである必要があります。また、プライバシー上の理由でお客様の電子メール・アドレスは使用しないでください。

必須フィールドは(*)で示されます。

3文字から31文字の範囲で指定し

「送信する」をクリックすることにより、お客様は developerWorks のご使用条件に同意したことになります。 ご使用条件を読む

 


送信されたすべての情報は安全です。


static.content.url=http://www.ibm.com/developerworks/js/artrating/
SITE_ID=60
Zone=Open source
ArticleID=806649
ArticleTitle=MongoDB のワーク・キューを使用して動画をエンコードする
publish-date=03292012