Flink Temporal tables

Lamaara ·
更新时间:2024-11-13
· 776 次阅读

该文章主要是对Flink官网相关内容进行翻译,原文地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.html#temporal-table-functions

Temporal table时态表表示改变的历史记录表上的(参数化)视图的概念,该表返回特定时间点的表的内容

更改表可以是跟踪更改的更改历史表(例如数据库更改日志),也可以是具体化更改的更改维表(例如数据库表)。

对于更改历史记录表,Flink可以跟踪更改,并允许在查询中的特定时间点访问表的内容。在Flink中,这种表由Temporal Table Function(时态表函数)表示。

对于变化的维表,Flink允许在查询中的处理时访问表的内容。在Flink中,这种表为Temporal Table

产生的原因 与更改的历史记录表相关

我们假设有下表RatesHistory。

SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 10:45 Euro 116 11:15 Euro 119 11:49 Pounds 108

RatesHistory代表一个增长的日元(汇率为1)仅追加(append-only)货币汇率表。例如,欧元兑日元从09:00到10:45的汇率为114;从10:45到11:15,汇率为116。

假设我们要在10:58的时间输出所有当前汇率,则需要以下SQL查询来计算结果表:

SELECT * FROM RatesHistory AS r WHERE r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = r.currency AND r2.rowtime <= TIME '10:58');

相关子查询确定相应货币的最大时间小于或等于所需时间。 外部查询列出具有最大时间戳的汇率。

下表显示了这种计算的结果。在我们的示例中,考虑了10:45时欧元的更新,但是在10:58时表的版本中未考虑11:15时欧元的更新值以及新的值。

rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Yen 1 10:45 Euro 116

Temporal Table的概念旨在简化此类查询,加快其执行速度,并减少Flink的状态使用率。Temporal Table是仅附加表的参数化视图,它将仅附加( append-only)表的行解释为表的更改日志,并在特定时间点提供该表的版本。 将仅附加表解释为更改日志(changelog)需要指定主键属性和时间戳属性。 主键确定覆盖哪些行,时间戳确定行有效的时间。
在上面的示例中,currency是RatesHistory表的主键,rowtime是timestamp属性。

在Flink中,这由Temporal Table Function表示。

与维度表变化相关

另一方面,某些用例需要连接变化的维表,该表是外部数据库表。假设LatestRates是一个以最新汇率具体化的表格(例如存储在其中)。LatestRates是物化历史RatesHistory。然后,时间10:58的LatestRates表的内容将是:

10:58> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 116

12:00时间的LatestRates表的内容为:

12:00> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 119 Pounds 108

在Flink中,这由时态表表示。

时态表函数

为了访问时态表中的数据,必须传递一个时间属性,该属性确定将要返回的表的版本。Flink使用table functions的SQL语法提供一种表达它的方法。

定义后,Temporal Table Function将使用单个时间参数timeAttribute并返回一组行。该集合包含相对于给定时间属性的所有现有主键的行的最新版本。

假设我们基于RatesHistory表定义了一个时态表函数Rates(timeAttribute),则可以通过以下方式查询该函数:

SELECT * FROM Rates('10:15'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 SELECT * FROM Rates('11:00'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 10:45 Euro 116 09:00 Yen 1

Rates(timeAttribute)的每个查询都返回给定timeAttribute的Rates的状态。

注意:目前,Flink不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能在联接join中使用。上面的示例用于提供有关Rates(timeAttribute)函数返回值的直观信息。

定义时态表函数

以下代码段说明了如何从仅追加表中创建时态表函数。

// Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) // Provide a static data set of the rates history table. val ratesHistoryData = new mutable.MutableList[(String, Long)] ratesHistoryData.+=(("US Dollar", 102L)) ratesHistoryData.+=(("Euro", 114L)) ratesHistoryData.+=(("Yen", 1L)) ratesHistoryData.+=(("Euro", 116L)) ratesHistoryData.+=(("Euro", 119L)) // Create and register an example table using above data set. // In the real setup, you should replace this with your own table. val ratesHistory = env .fromCollection(ratesHistoryData) .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime) tEnv.registerTable("RatesHistory", ratesHistory) // Create and register TemporalTableFunction. // Define "r_proctime" as the time attribute and "r_currency" as the primary key. val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1) tEnv.registerFunction("Rates", rates) // <==== (2)

第(1)行创建了一个rates时态表函数,它允许我们使用表API中的函数rates。

第(2)行在我们的表环境中以名称Rates注册此函数,这允许我们在SQL中使用Rates函数。

时态表

注意:仅blink planner支持此功能。

为了访问时态表中的数据,当前必须使用LookupableTableSource定义一个TableSource。Flink使用FOR SYSTEM_TIME AS OF的SQL语法查询时态表,这在SQL:2011中提出。

假设我们定义了一个称为LatestRates的时态表,我们可以通过以下方式查询此类表:

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15'; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00'; currency rate ======== ====== US Dollar 102 Euro 116 Yen 1

注意:目前,Flink不支持以固定时间直接查询时态表。目前,时态表只能在联接中使用。上面的示例用于提供有关时态表LatestRates返回的内容的直觉。

另请参阅有关用于joins for continuous queries的页面,以获取有关如何与时态表联接的更多信息。

定义时态表 // Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) // Create an HBaseTableSource as a temporal table which implements LookableTableSource // In the real setup, you should replace this with your own table. val rates = new HBaseTableSource(conf, "Rates") rates.setRowKey("currency", String.class) // currency as the primary key rates.addColumn("fam1", "rate", Double.class) // register the temporal table into environment, then we can query it in sql tEnv.registerTableSource("Rates", rates)
作者:Perkinl



flink

需要 登录 后方可回复, 如果你还没有账号请 注册新账号