読者です 読者をやめる 読者になる 読者になる

Simple Queue Serviceに触ってみた

Python Amazon

Amazon SQS (Simple Queue Service)はAmazonが提供しているメッセージング・ウェブサービス

自分の周りでは実際に使っているっていう人を聞いたことがないし、ぐぐっても日本語の情報はそんなに出てこない。たぶん、

  • メッセージング・サーバーが北米だかヨーロッパにあるんだか分からんが、わざわざそんな遠方にある(かもしれない)サーバーを使ってメッセージングする必要なくね?
  • そもそもEC2を使うならば、EC2のインスタンスに自前でSTOMPやAMQP等のメッセージング・サーバーを立てればよくね?

みたいな、誰もが思う理由で日本では使われないんだろうけど。

それでも前々から「一度は触ってみなくてはいかんな」と思い、ようやく触ってみた。その感想を先に書いておくと、

  • APIは本当に"Simple"で使いやすい
  • メッセージ送信者・受信者がEC2内のインスタンスならば、これは「あり」ではないか
  • ただしSQSの制限(メッセージの最大容量8K/メッセージの順番は保障されない/メッセージが二重に受信される場合もある等)を許容できる場合に限る

と思った。ローカルPCからメッセージの送受信を行ったので、「もっさりしているなぁ(特に送信)」という印象は否めなかったが、EC2インスタンスからの送受信は試していない。

SQSのサインアップ

SQSトップページの"Sign Up For Amazon SQS"からサインアップ。Amazon web services(AWS)のアカウントをもっていないならば、アカウント作成・クレジットカードの登録が必要。(AWSのアカウントをもっていないのが許されるのは小学生まで!と思うが、自分に対してマジレスすると小中学生がクレジットカードを持っている例はまれであろう・・・)

サインアップが終わったら、Your Account > Security Credentialsから"Access Key ID", "Secret Access Key"を取得する。

AWSって最初の設定が結構面倒そうな先入観をもっていたが、実際は、AWSアカウントを持っていれば、ここまで1分もかからない。素晴らしい。

botoをインストールする

SQSへのアクセスには、今回はIBM developerWorksでも紹介されているPythonライブラリbotoを使う。他の言語のことは調べていない。(他の言語っていっても、僕はScalaとかScalaとかScalaがほんの少し書けるくらいだけど)

botoのインストールは例によって、

$ easy_install install boto

か、

$ pip install boto

developerWorksの記事ではbotoのバージョンは1.4cだったが、本稿執筆時点でインストールされたbotoのバージョンは1.8dだった。

botoを使ってのSQSへのアクセスの仕方

上記のdWの記事は15分で試せる素晴らしいSQS/botoチュートリアルなのでここにわざわざ書く必要も少ないかと思うが、僕が記事を読んで疑問に思ったこと、実際に試していった順番をメモしていく形で、基本的な使い方を示す。

以下の記事を書いてから見つけたけど、botoのプロジェクトページからリンクされている、An Introduction to boto's SQS interfacceも良いチュートリアルだと思う。

c.f.

キューの作成/取得/削除

まず、AWSのAcess Key/Secret Access Keyを使ってSQSへの接続を確立する。

>>> import boto
>>> conn = boto.connect_sqs(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)

dWの記事ではAcess Key/Secret Access Keyを環境変数に設定しているが、こういう使い方は「ちょっと試してみる」といった場合を除いてまずしないと思う。

接続を確立できたら、名前を指定してキューを作成する。

>>> queue = conn.create_queue('queue_name')

既に作成ずみのキューを取得するには、

>>> queue = conn.get_queue('queue_name')

どちらの場合も、成功時にはキューをあらわすオブジェクトを取得できる。もし指定した名前のキューがない場合は、

>>> q = conn.get_queue('unexisiting-queue')
>>> assert q is None

のように、Noneが返る。

すべてのキューを取得するには、

>>> conn.get_all_queues()
[<boto.sqs.queue.Queue instance at 0xb5b878>]

のように、get_all_queues()メソッドを使う。返り値はboto.sqs.queue.Queueオブジェクトのリスト。

キューを削除したい場合には、

>>> queue = get_queue('queue_name')
>>> queue.clear()
>>> conn.delete(queue)

のように、queue.clear()でメッセージを消去して、その後にdelete()メソッドで削除するようだが、実際に削除して、同じ名前でキュー作成して・・・ということを何度か試していたら、下記のようなエラーに遭遇した。

You must wait 60 seconds after deleting a queue before you can create another with the same name.

つまり、キューを削除した後は、60秒間待たないと同名のキューを作成することができないらしい。実際に運用することになったらこのような状況になることは、まずないと思うが、作って消してのテストスクリプトを何度も実行しているとしばしば遭遇する。

メッセージの送信/受信

キューを作成/取得したらならば、次にメッセージを送信する。

>>> from boto.sqs.message import Message
>>> msg = Message()
>>> msg.set_body("Hello SQS")
>>> queue.write(msg)
True

set_body()メソッドをわざわざ呼ぶのが冗長だと思うならば、次のようにbody引数に指定してもよい。

>>> from boto.sqs.message import Message
>>> msg = Message(body="Hello SQS")
>>> queue.write(msg)
True

ユニコード文字列をメッセージのBodyに指定する場合でも、内部的にはStringIOを呼んでメッセージをエンコードして・・・のような処理をしているだけなので、基本はPython 2.x系のString/Unicodeの話なので特に突っ込まない。

>>> from boto.sqs.message import Message
>>> msg = Message(body=u"こんにちはSQS")
>>> queue.write(msg)
True

メッセージを受信する側は、

msgs = queue.get_messages()

でメッセージを待ち受ける。get_messages()はメッセージオブジェクトのリスト。リストなので、

for msg in msgs:
  # do something
  print msg.get_body()

のようにして、一つ一つのメッセージを逐次処理する必要がある。

受信側の注意点として、get_messages()でメッセージを受け取っても自動的にキューから削除されるわけではない(、ただし、メッセージ保存期間が過ぎたら消える)ので、

queue.delete_message(msg)

のようにして、明示的にメッセージを削除する必要がある。このあたりは、少なくともAMQPとかQ4Mとかの振る舞いとは違うと思う。

メッセージ送受信のサンプル

以上の点をふまえて、テスト用に次のようなコードを書いてみた。

# -*- coding: utf-8 -*-
import sys
import boto
from boto.exception import SQSError
from boto.sqs.message import Message

AWS_ACCESS_KEY_ID='your_access_key'
AWS_SECRET_ACCESS_KEY='your_secret_key'

QUEUE_NAME = 'test-sqs-1'

def consumer(queue):
    while 1:
        msgs = queue.get_messages()
        for msg in msgs:
            # do something
            print msg.get_body()

            # remove message
            queue.delete_message(msg)

def producer(queue):
    msg = Message(body=u"こんにちはSQS(%s)" % x)
    queue.write(msg)

if __name__ == '__main__':
    try:
        func = { 'consumer': consumer,
                 'producer': producer,
                 }[sys.argv[1]]
    except (KeyError, IndexError):
        print "Usage: %s (producer|consumer)" % sys.argv[0]
        sys.exit(1)
    else:
        try:
            conn = boto.connect_sqs(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
            queue = conn.get_queue(QUEUE_NAME) or conn.create_queue(QUEUE_NAME)
            func(queue)
        except SQSError, e:
            print e.status, e.reason, e.body

ターミナルを二つ立ち上げて、

$ python test_sqs.py consumer

のようにメッセージ受信者のプロセスを起動し、もう一方のターミナルで、

$ python test_sqs.py producer

のようにしてメッセージを送信する。

ドキュメントもコードを完全に読み込んだわけではないので自身がないが、SQS関連の例外を補足するには、boto.exception.SQSError(とそのサブクラスの例外)をキャッチする必要があるようだ。上記のコードのように、e.status, e.reasonで一応は「処理がなんだか上手くいかなかった」ことは分かるのだが、(e.status, e.reason)は(400, "BAD REQUEST")のようにHTTPのステータスコードをそのまま返すだけにすぎないので、開発時にはe.bodyといった値を参照して問題を追跡する必要が出てくると思う。

また、botoライブラリはPythonの標準のloggingモジュールを使ってログを出力しているので、適切にログ・ハンドラーを設定すれば問題追跡には役に立つだろう。