InfluxDB - Flux言語を用いた基本的なデータ取り出し

時系列DBであるInfluxDB 2.0から採用されたFlux言語を用いたクエリの書き方を調べたので、メモをここに残しておく。

 

Flux言語の公式ドキュメント

Flux built-in functions | InfluxDB OSS 2.0 Documentation

 

サンプル

from(bucket"bucketname")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"== "tablename")
  |> filter(fn: (r) => r["_field"== "fieldname")
  |> filter(fn: (r) => r["tag1"== "tag1_value")
  |> filter(fn: (r) => r["tag2"== "tag2_value")
  |> aggregateWindow(every: 5sfn: last, createEmpty: false)
  |> keep(columns: ["_time""_field""_value"])

 

 

from(bucket"bucketname")

バケット名 「bucketname」からデータを取り出す。つまり、以降のクエリのターゲットがこのバケットのデータになる。

 

  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)

バケットから取り出すデータの時間範囲を設定する。上記設定はDataExplorerの表示範囲に従って取り出すことを意味している。

例えばstart位置を最後のデータから5分前としたい場合は以下のように書く。

  |> range(start: -5m)

 

  |> filter(fn: (r) => r["_measurement"== "tablename")
  |> filter(fn: (r) => r["_field"== "fieldname")

バケットから取り出すデータのテーブル名(_measurement)を「tablename」に指定し、取り出すデータ(_field)を「fieldname」に設定している。なお、or演算子が使用できるが、複数の_fieldをorで設定しても、_fieldの値ごとに別々のtableが作成されるので、もし統合したtableを作りたい場合はjoinする必要がある。

 

  |> filter(fn: (r) => r["tag1"== "tag1_value")
  |> filter(fn: (r) => r["tag2"== "tag2_value")

タグ tag1の値が「tag1_value」、かつタグ tag2の値が「tag2_value」となっているデータだけを対象としている。

 

  |> aggregateWindow(every: 5sfn: last, createEmpty: false)

5秒毎に最終値(last)でウインドウ処理が行われる。例えば、5秒間の中に1, 3, 4, 2という順番でデータが格納されていた場合、最後のデータである2が採用される。

なお、最終値以外に、平均(mean)、中央値(median)、最大(max)、最小(min)等がある。

 

  |> keep(columns: ["_time""_field""_value"])

"_time"、"_field"、"_value"以外のデータ(例えば"_start"、"_end"等)が出力されるテーブルから除外される。