Flink SQL

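The example below exercises Flink's Table API and SQL on an event-time stream: it reads startup-log JSON from Kafka, assigns timestamps and watermarks, converts the stream into a Table, and then counts events per channel in 10-second tumbling windows, first with the Table API and then with the equivalent SQL query.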

```scala
import com.alibaba.fastjson.JSON
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.apache.flink.table.api.{Table, TableEnvironment, Tumble}
import org.apache.flink.table.api.scala._

object FlinkSqlDemo {

  def main(args: Array[String]): Unit = {
    // The execution environment plays the role of a SparkContext
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // Switch the time characteristic to event time
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // MyKafkaUtil and StartupLog are project-specific helpers (a sketch follows below)
    val myKafkaConsumer: FlinkKafkaConsumer011[String] = MyKafkaUtil.getConsumer("GMALL_STARTUP")
    val dstream: DataStream[String] = env.addSource(myKafkaConsumer)
    val startupLogDstream: DataStream[StartupLog] = dstream.map { jsonString =>
      JSON.parseObject(jsonString, classOf[StartupLog])
    }

    // Tell Flink how to extract the event time and generate watermarks
    val startupLogWithEventTimeDStream: DataStream[StartupLog] = startupLogDstream
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[StartupLog](Time.seconds(0L)) {
          override def extractTimestamp(element: StartupLog): Long = element.ts
        })
      .setParallelism(1)

    // The table environment plays the role of a SparkSession
    val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(env)

    // Convert the DataStream into a Table, exposing ts as the rowtime attribute
    val startupTable: Table = tableEnv.fromDataStream(startupLogWithEventTimeDStream,
      'mid, 'uid, 'appid, 'area, 'os, 'ch, 'logType, 'vs,
      'logDate, 'logHour, 'logHourMinute, 'ts.rowtime)

    // Table API version: count records per channel every 10 seconds.
    // Three ingredients: 1) groupBy  2) a window  3) event time to bound the window
    val resultTable: Table = startupTable
      .window(Tumble over 10000.millis on 'ts as 'tt)
      .groupBy('ch, 'tt)
      .select('ch, 'ch.count)

    // SQL version of the same query
    val resultSQLTable: Table = tableEnv.sqlQuery(
      "select ch, count(ch) from " + startupTable +
        " group by ch, TUMBLE(ts, interval '10' SECOND)")

    // Convert the Table back into a DataStream.
    // A grouped aggregation may update earlier results, so use toRetractStream:
    // each element is a (Boolean, row) pair, true for an insertion and
    // false for the retraction of a previously emitted row.
    // val appstoreDStream: DataStream[(String, String, Long)] = appstoreTable.toAppendStream[(String, String, Long)]
    val resultDstream: DataStream[(Boolean, (String, Long))] = resultSQLTable.toRetractStream[(String, Long)]
    resultDstream.filter(_._1).print() // keep only the insertions

    env.execute()
  }
}
```
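The program relies on two project helpers that the post does not include. Below is a minimal sketch of what they could look like, assuming the startup logs carry exactly the fields referenced in the Table definition and that Kafka is reachable at a local broker; the field types, broker address, and group id are illustrative assumptions, not the original project's values:

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Hypothetical sketch: fields mirror the columns used in fromDataStream above;
// all types are assumptions since the original class is not shown.
case class StartupLog(mid: String, uid: String, appid: String, area: String,
                      os: String, ch: String, logType: String, vs: String,
                      logDate: String, logHour: String, logHourMinute: String,
                      ts: Long)

object MyKafkaUtil {
  // Hypothetical connection settings; replace with your cluster's values.
  private val properties = new Properties()
  properties.setProperty("bootstrap.servers", "localhost:9092")
  properties.setProperty("group.id", "gmall_group")

  // Build a string-valued consumer for the given topic
  def getConsumer(topic: String): FlinkKafkaConsumer011[String] =
    new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), properties)
}
```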
Author: qq_37969476
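A small follow-up, in case the window boundaries are wanted in the SQL result as well: Flink's group-window syntax pairs TUMBLE in the GROUP BY clause with TUMBLE_START and TUMBLE_END in the select list. A sketch against the same table (resultWithWindow is an illustrative name):

```scala
val resultWithWindow: Table = tableEnv.sqlQuery(
  "select ch, count(ch), " +
    "TUMBLE_START(ts, interval '10' SECOND) as window_start, " +
    "TUMBLE_END(ts, interval '10' SECOND) as window_end " +
    "from " + startupTable +
    " group by ch, TUMBLE(ts, interval '10' SECOND)")
```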


