Published on

aws timestream の使い方

Authors
  • avatar
    Name
    Kikusan
    Twitter

AWS Timestreamを使ってみた。
簡単な時系列データならDynamoDBより安いしSQLが使えるのでいい感じ。

Document

公式は https://docs.aws.amazon.com/timestream/latest/developerguide/what-is-timestream.html

概要

TimestreamはAWSコンソールからデータベース、テーブルを作成して使用する。
DynamoDBみたいにパーティションとか難しいことは考えなくて良さそう。
テーブルを作る時に、インサートしたデータをアクセスが速いメモリに持つ期間、データの保持期間だけは指定がいるので注意。
Query editerの画面からクエリを発行できる。

sdk

データを操作するにはsdkを使用する。
documentにサンプルがあるので、割と簡単。

pythonではboto3で以下の例みたいにクライアントを生成して使用する。

write_client = session.client('timestream-write',
                              config=Config(read_timeout=20,
                                            max_pool_connections=5000,
                                            retries={'max_attempts': 10}))

query_client = session.client('timestream-query')

insert

参照 : https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.write.html

ざっと理解しづらかった部分をコメントしておく。

def write_records(self):
        print("Writing records")
        current_time = self._current_milli_time()

        # dimensionはメタデータのこと。Nameがカラム名、Valueがレコードの値になる
        dimensions = [
            {'Name': 'region', 'Value': 'us-east-1'},
            {'Name': 'az', 'Value': 'az1'},
            {'Name': 'hostname', 'Value': 'host1'}
        ]

        """
        レコード1 指定した値がそのまま1レコードになる。
          Dimensions: ↑のリストを指定
          MeasureName: 測定値のタイトル
          MeasureValue: 測定値
          MeasureValueType: 値の型
          Time: 測定日時
        """
        cpu_utilization = {
            'Dimensions': dimensions,
            'MeasureName': 'cpu_utilization',
            'MeasureValue': '13.5',
            'MeasureValueType': 'DOUBLE',
            'Time': current_time
        }

        # レコード2
        memory_utilization = {
            'Dimensions': dimensions,
            'MeasureName': 'memory_utilization',
            'MeasureValue': '40',
            'MeasureValueType': 'DOUBLE',
            'Time': current_time
        }

        # 挿入したいレコードをリストにまとめる
        records = [cpu_utilization, memory_utilization]

        try:
            # クライアントで挿入。DB名、テーブル名、レコードを指定する。
            result = self.client.write_records(DatabaseName=Constant.DATABASE_NAME, TableName=Constant.TABLE_NAME,
                                               Records=records, CommonAttributes={})
            print("WriteRecords Status: [%s]" % result['ResponseMetadata']['HTTPStatusCode'])
        except self.client.exceptions.RejectedRecordsException as err:
            print("RejectedRecords: ", err)
            for rr in err.response["RejectedRecords"]:
                print("Rejected Index " + str(rr["RecordIndex"]) + ": " + rr["Reason"])
            print("Other records were written successfully. ")
        except Exception as err:
            print("Error:", err)

select

参照 : https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html

こちらもざっとメモ

"""
SELECT文を指定して実行
f'''SELECT * FROM {STREAM_DBNAME}.{STREAM_TABLENAME} ORDER BY time DESC''' とか
"""
def run_query(self, query_string):
        try:
            # クエリ実行
            # paginator : https://boto3.amazonaws.com/v1/documentation/api/latest/guide/paginators.html
            #             query_client.get_paginator('query') で取得できる
            page_iterator = self.paginator.paginate(QueryString=query_string)
            for page in page_iterator:
                # ページごとに処理 ここをListとかに詰めれば全ページのデータを一気見できる
                self.__parse_query_result(page)
        except Exception as err:
            print("Exception while running query:", err)
            traceback.print_exc(file=sys.stderr)

"""ページを処理"""
def __parse_query_result(self, query_result):
        query_status = query_result["QueryStatus"]

        progress_percentage = query_status["ProgressPercentage"]
        print("Query progress so far: " + str(progress_percentage) + "%")
        
        bytes_scanned = query_status["CumulativeBytesScanned"] / self.ONE_GB_IN_BYTES
        print("Bytes Scanned so far: " + str(bytes_scanned) + " GB")
                
        bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES
        print("Bytes Metered so far: " + str(bytes_metered) + " GB")
        
        column_info = query_result['ColumnInfo']

        print("Metadata: %s" % column_info)
        print("Data: ")
        for row in query_result['Rows']:
            # 行ごとに処理 ここをListとかに詰めればページの全レコードが一気見できる
            print(self.__parse_row(column_info, row))        


    """行を処理"""
    def __parse_row(self, column_info, row):
        data = row['Data']
        row_output = []
        # 行データを詰め直す
        for j in range(len(data)):
            info = column_info[j]
            datum = data[j]
            # 各カラムを処理
            row_output.append(self.__parse_datum(info, datum))

        return "{%s}" % str(row_output)

    """1レコードのカラムを処理"""
    def __parse_datum(self, info, datum):
        if datum.get('NullValue', False):
            return ("%s=NULL" % info['Name'])

        column_type = info['Type']

        # If the column is of TimeSeries Type
        if 'TimeSeriesMeasureValueColumnInfo' in column_type:
            return self.__parse_time_series(info, datum)

        # If the column is of Array Type
        elif 'ArrayColumnInfo' in column_type:
            array_values = datum['ArrayValue']
            return ("%s=%s" % (info['Name'], self.__parse_array(info['Type']['ArrayColumnInfo'], array_values)))

        # If the column is of Row Type
        elif 'RowColumnInfo' in column_type:
            row_coulmn_info = info['Type']['RowColumnInfo']
            row_values = datum['RowValue']
            return self.__parse_row(row_coulmn_info, row_values)

        #If the column is of Scalar Type
        else:
            return self.__parse_column_name(info) + datum['ScalarValue']


    def __parse_time_series(self, info, datum):
        time_series_output = []
        for data_point in datum['TimeSeriesValue']:
            time_series_output.append("{time=%s, value=%s}"
                                      % (data_point['Time'],
                                         self.__parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
                                                            data_point['Value'])))
        return "[%s]" % str(time_series_output)

    def __parse_column_name(self, info):
        if 'Name' in info:
            return info['Name'] + "="
        else:
            return ""

    def __parse_array(self, array_column_info, array_values):
        array_output = []
        for datum in array_values:
            array_output.append(self.__parse_datum(array_column_info, datum))

        return "[%s]" % str(array_output)