您现在的位置是:首页 >其他 >Flink sql网站首页其他
Flink sql
简介Flink sql
1.创建表的执行环境
第一种
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event event, long l) {
return event.timestamp;
}
}));
//创建表的执行环境
StreamTableEnvironment streamTableEnvironment = StreamTableEnvironment.create(env);
//转化为table
Table table = streamTableEnvironment.fromDataStream(streamOperator);
第二种
EnvironmentSettings settings =EnvironmentSettings.newInstance()
.inStreamingMode()
.useBlinkPlanner()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
2.注册表用于把数据载入输入
String inputDDL = " CREATE TABLE clicktable ( "+
"url STRING ,"+
"user_name STRING," +
"timestamp BIGINT "+
" ) with ("+
" 'connector' = 'filesystem' ,"+
" 'path' = 'file/click.txt', "+
" 'format' = 'csv' )";
TableResult tableResult = tableEnv.executeSql(inputDDL);
3.通过sql查询语句得到一张结果表result
Table result= tableEnv.sqlQuery("select url,count(1) as cnt from clicktable group by url");
4.表输出
String createDDL = " CREATE TABLE output ( "+
"url STRING ,"+
"cnt BIGINT ) with ("+
" 'connector' = 'filesystem' ,"+
" 'path' = 'output', "+
" 'format' = 'csv' )";
tableEnv.executeSql(createDDL);
result.executeInsert("output");

流表转换:
1.表转流
StreamTableEnvironment
todataStream
toChangelogStream
2.流转表

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。





U8W/U8W-Mini使用与常见问题解决
QT多线程的5种用法,通过使用线程解决UI主界面的耗时操作代码,防止界面卡死。...
stm32使用HAL库配置串口中断收发数据(保姆级教程)
分享几个国内免费的ChatGPT镜像网址(亲测有效)
Allegro16.6差分等长设置及走线总结