- Published on
aws timestream の使い方
- Authors
- Name
- Kikusan
AWS Timestreamを使ってみた。
簡単な時系列データならDynamoDBより安いしSQLが使えるのでいい感じ。
Document
公式は https://docs.aws.amazon.com/timestream/latest/developerguide/what-is-timestream.html
概要
TimestreamはAWSコンソールからデータベース、テーブルを作成して使用する。
DynamoDBみたいにパーティションとか難しいことは考えなくて良さそう。
テーブルを作る時に、インサートしたデータをアクセスが速いメモリに持つ期間、データの保持期間だけは指定がいるので注意。
Query editerの画面からクエリを発行できる。
sdk
データを操作するにはsdkを使用する。
documentにサンプルがあるので、割と簡単。
pythonではboto3で以下の例みたいにクライアントを生成して使用する。
write_client = session.client('timestream-write',
config=Config(read_timeout=20,
max_pool_connections=5000,
retries={'max_attempts': 10}))
query_client = session.client('timestream-query')
insert
参照 : https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write.html
ざっと理解しづらかった部分をコメントしておく。
def write_records(self):
print("Writing records")
current_time = self._current_milli_time()
# dimensionはメタデータのこと。Nameがカラム名、Valueがレコードの値になる
dimensions = [
{'Name': 'region', 'Value': 'us-east-1'},
{'Name': 'az', 'Value': 'az1'},
{'Name': 'hostname', 'Value': 'host1'}
]
"""
レコード1 指定した値がそのまま1レコードになる。
Dimensions: ↑のリストを指定
MeasureName: 測定値のタイトル
MeasureValue: 測定値
MeasureValueType: 値の型
Time: 測定日時
"""
cpu_utilization = {
'Dimensions': dimensions,
'MeasureName': 'cpu_utilization',
'MeasureValue': '13.5',
'MeasureValueType': 'DOUBLE',
'Time': current_time
}
# レコード2
memory_utilization = {
'Dimensions': dimensions,
'MeasureName': 'memory_utilization',
'MeasureValue': '40',
'MeasureValueType': 'DOUBLE',
'Time': current_time
}
# 挿入したいレコードをリストにまとめる
records = [cpu_utilization, memory_utilization]
try:
# クライアントで挿入。DB名、テーブル名、レコードを指定する。
result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME,
Records=records, CommonAttributes={})
print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
except self.client.exceptions.RejectedRecordsException as err:
print("RejectedRecords: ", err)
for rr in err.response["RejectedRecords"]:
print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
print("Other records were written successfully. ")
except Exception as err:
print("Error:", err)
select
参照 : https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html
こちらもざっとメモ
"""
SELECT文を指定して実行
f'''SELECT * FROM {STREAM_DBNAME}.{STREAM_TABLENAME} ORDER BY time DESC''' とか
"""
def run_query(self, query_string):
try:
# クエリ実行
# paginator : https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html
# query_client.get_paginator('query') で取得できる
page_iterator = self.paginator.paginate(QueryString=query_string)
for page in page_iterator:
# ページごとに処理 ここをListとかに詰めれば全ページのデータを一気見できる
self.__parse_query_result(page)
except Exception as err:
print("Exception while running query:", err)
traceback.print_exc(file=sys.stderr)
"""ページを処理"""
def __parse_query_result(self, query_result):
query_status = query_result["QueryStatus"]
progress_percentage = query_status["ProgressPercentage"]
print("Query progress so far: " + str(progress_percentage) + "%")
bytes_scanned = query_status["CumulativeBytesScanned"] / self.ONE_GB_IN_BYTES
print("Bytes Scanned so far: " + str(bytes_scanned) + " GB")
bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES
print("Bytes Metered so far: " + str(bytes_metered) + " GB")
column_info = query_result['ColumnInfo']
print("Metadata: %s" % column_info)
print("Data: ")
for row in query_result['Rows']:
# 行ごとに処理 ここをListとかに詰めればページの全レコードが一気見できる
print(self.__parse_row(column_info, row))
"""行を処理"""
def __parse_row(self, column_info, row):
data = row['Data']
row_output = []
# 行データを詰め直す
for j in range(len(data)):
info = column_info[j]
datum = data[j]
# 各カラムを処理
row_output.append(self.__parse_datum(info, datum))
return "{%s}" % str(row_output)
"""1レコードのカラムを処理"""
def __parse_datum(self, info, datum):
if datum.get('NullValue', False):
return ("%s=NULL" % info['Name'])
column_type = info['Type']
# If the column is of TimeSeries Type
if 'TimeSeriesMeasureValueColumnInfo' in column_type:
return self.__parse_time_series(info, datum)
# If the column is of Array Type
elif 'ArrayColumnInfo' in column_type:
array_values = datum['ArrayValue']
return ("%s=%s" % (info['Name'], self.__parse_array(info['Type']['ArrayColumnInfo'], array_values)))
# If the column is of Row Type
elif 'RowColumnInfo' in column_type:
row_coulmn_info = info['Type']['RowColumnInfo']
row_values = datum['RowValue']
return self.__parse_row(row_coulmn_info, row_values)
#If the column is of Scalar Type
else:
return self.__parse_column_name(info) + datum['ScalarValue']
def __parse_time_series(self, info, datum):
time_series_output = []
for data_point in datum['TimeSeriesValue']:
time_series_output.append("{time=%s, value=%s}"
% (data_point['Time'],
self.__parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
data_point['Value'])))
return "[%s]" % str(time_series_output)
def __parse_column_name(self, info):
if 'Name' in info:
return info['Name'] + "="
else:
return ""
def __parse_array(self, array_column_info, array_values):
array_output = []
for datum in array_values:
array_output.append(self.__parse_datum(array_column_info, datum))
return "[%s]" % str(array_output)