[[SoftwareEngineering]]

*Amazon Web Services [#md611f84]
*Amazon Kinesis Data Streams [#n8898ab2]
+レコードを入力する
アクティブなストリームができたら、データを入力できます。このチュートリアルでは、最もシンプルなコマンド put-record を使用して、"testdata" というテキストを含む単一のデータレコードをストリームに入力します。
 aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata
このコマンドが成功すると、出力は次の例のようになります。
 {
     "ShardId": "shardId-000000000000",
     "SequenceNumber": "49546986683135544286507457936321625675700192471156785154"
 }
これで、ストリームにデータを追加できました。次にストリームからデータを取得する方法を説明します。
&br;
+レコードを取得する
ストリームからデータを取得するには、対象となるシャードのシャードイテレーターを取得する必要があります。シャードイテレーターは、コンシューマー(ここでは get-record コマンド)が読み取るストリームとシャードの位置を表します。次のように get-shard-iterator コマンドを使用します。
 aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo
aws kinesis のコマンドにはその背後に Kinesis Data Streams API があります。示されているパラメータに関心がある場合は、GetShardIterator API のリファレンスのトピックを参照してください。実行に成功すると、出力は次の例のようになります。
 {
     "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg="
 }
ランダムに見える長い文字列がシャードイテレーターです(お客様のシャードイテレーターはこれとは異なります)。このシャードイテレーターをコピーして、次に示す get コマンドに貼り付ける必要があります。シャードイテレーターの有効期間は 300 秒です。これは、シャードイテレーターをコピーして次のコマンドに貼り付けるのに十分な時間です。次のコマンドに貼り付ける前に、シャードイテレーターから改行を削除する必要があることに注意してください。シャードイテレーターが有効ではないことを示すエラーメッセージが表示された場合は、もう一度 get-shard-iterator コマンドを実行します。
&br;
get-records コマンドは、ストリームからデータを取得し、Kinesis Data Streams API の GetRecords 呼び出しとして解決されます。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャード内の位置を指定します。イテレーターが指定するシャードの位置にレコードがない場合、GetRecords は空のリストを返します。シャード内のレコードが含まれる位置に到達するために、複数の呼び出しが必要になる場合があることに注意してください。
&br;
get-records コマンドの例を次に示します。
 aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=
bash など Unix タイプのコマンドプロセッサからこのチュートリアルを実行する場合は、次のように入れ子にしたコマンドを使用して、シャードイテレーターの取得を自動化できます。
 SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator')
 
 aws kinesis get-records --shard-iterator $SHARD_ITERATOR
PowerShell をサポートするシステムからこのチュートリアルを実行する場合、次のようなコマンドを使用してシャードのイテレータの取得を自動化できます。
 aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])
get-records コマンドが正常に終了すると、次の例のように、シャードイテレーターを取得するときに指定したシャードに対応するストリーム内のレコードがリクエストされます。
 {
   "Records":[ {
     "Data":"dGVzdGRhdGE=",
     "PartitionKey":"123”,
     "ApproximateArrivalTimestamp": 1.441215410867E9,
     "SequenceNumber":"49544985256907370027570885864065577703022652638596431874"
   } ],
   "MillisBehindLatest":24000,
   "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is="
 }
上記で get-records を "リクエスト" と説明しましたが、これは、ストリーム内にレコードが存在する場合もゼロ件以上のレコードが返される可能性があり、返されたどのレコードも現在ストリーム内にあるすべてのレコードを表していない可能性があることを意味します。これは完全に正常で、本稼働用のコードではストリームに対し、適切な間隔でレコードに対するポーリングを行います(このポーリング速度は、個々のアプリケーションの設計要件によって異なります)。 
&br;
チュートリアルのこのパートで、レコードについて最初に気付くのは、データが文字化けのように見える点でしょう。送信した testdata のクリアテキストではありません。これは、バイナリデータを送信できるように、put-record では Base64 エンコーディングを使用しているためです。ただし、AWS CLI での Kinesis Data Streams のサポートでは、Base64 デコーディングを提供していません。これは、Base64 デコーディングされた raw バイナリコンテンツを stdout に出力すると、特定のプラットフォームやターミナルで、意図しない動作やセキュリティ上の問題が発生する可能性があるためです。Base64 デコーダ(https://www.base64decode.org/ など)を使用して手動で dGVzdGRhdGE= をデコードすると、これが実際に testdata であることを確認できます。このチュートリアルではこれで問題ありません。なぜなら、実際には、AWS CLI を使用してデータを利用することはまれであり、通常は前に示したように(describe-stream および list-streams)、ストリームの状態をモニタリングしたり、情報を取得したりするために使用されるからです。後のチュートリアルでは、Kinesis クライアントライブラリ (KCL) を使用して、本稼働品質のコンシューマーアプリケーションを構築する方法を示し、Base64 の処理も検討します。KCL の詳細については、「Kinesis Client Library を使用した Amazon Kinesis Data Streams コンシューマーの開発」を参照してください。
&br;
get-records によって、常にストリーム/シャード内のすべてのレコードが返されるわけではありません。このような場合は、最後の結果から NextShardIterator を使用して、次のレコードのセットを取得します。したがって、大量のデータがストリームに入力されていた場合(本稼働アプリケーションでの通常の状況)、毎回 get-records を使用してデータのポーリングを継続できます。ただし、300 秒のシャードイテレーターの有効期間内に次のシャードイテレーターを使用して get-records を呼び出した場合、エラーメッセージが表示され、get-shard-iterator コマンドを使用して最新のシャードイテレーターを取得する必要があります。
&br;
この出力には、MillisBehindLatest も含まれています。これは、GetRecords オペレーションのストリームの末尾からの応答をミリ秒で表した数値であり、コンシューマーの時間の現在の時刻からの遅れを示します。値ゼロはレコード処理が追いついて、現在処理する新しいレコードは存在しないことを示します。このチュートリアルの場合は、作業を進めるのに時間をかけていると、この数値がかなり大きくなる可能性があります。これは問題ではなく、データレコードはストリームに 24 時間留まり、取得されるのを待ちます。この期間は保持期間と呼ばれ、168 時間(7 日)まで設定可能です。
&br;
get-records が成功したときの結果は、現在ストリームにこれ以上レコードが見つからない場合でも常に NextShardIterator です。これは、プロデューサーがどの時点でもストリームにレコードを入力している可能性があることを前提としたポーリングモデルです。独自のポーリングルーチンを記述することもできますが、開発中のコンシューマーアプリケーションで、前に説明した KCL を使用している場合、このポーリングによって処理が実行されます。
&br;
レコードをプルする対象のストリームまたはシャードにそれ以上レコードがなくなるまで get-records を呼び出すと、次の例のような空のレコードの出力が表示されます(出力全体を表示するには、水平にスクロールします)。 
 {
     "Records": [],
     "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk="
 }


トップ   差分 履歴 リロード   一覧 検索 最終更新   ヘルプ   最終更新のRSS