怎么用flink1.11使sql客户端支持执行sql文件
这篇文章主要讲解了“怎么用flink 1.11使sql客户端支持执行sql文件”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“怎么用flink 1.11使sql客户端支持执行sql文件”吧!
成都创新互联公司主营白河网站建设的网络公司,主营网站建设方案,app软件开发公司,白河h5小程序制作搭建,白河网站营销推广欢迎白河等地区企业咨询
背景
目前flink的sql客户端提供了一种交互式的sql查询服务,用户可以使用sql客户端执行一些sql的批任务或者流任务。但是当我想执行一些sql的定时任务时,flink却没有提供一个合适的方式,所以综合考虑了一下,我决定在sql的客户端基础上给加一个 '-filename (-f)' 参数,就像类似'hive -f abc.sql' 一样,可以执行一批sql任务。
源码修改
目前我只是想通过sql客户端执行一些批任务,再加上flink sql 客户端本身的一些设计,所以目前修改后的sql client 执行sql文件的时候支持 SET,DDL,INSERT INTO SELECT ...等语句,其他比如select暂不支持。
修改后执行的方式为:
/home/flink/bin/sql-client.sh embedded -f flink.sql
CliOptionsParser.java
在这个sql 客户端参数解析类里添加一个选项,用于解析-f参数。
public static final Option OPTION_FILENAME = Option
.builder("f")
.required(false)
.longOpt("filename")
.numberOfArgs(1)
.argName("the path of the sql file")
.desc("SQL from files")
.build();
CliOptions.java
在这里添加一个变量filename
private final String filename;
SqlClient.java
在SqlClient里添加对于-filename的处理
if (options.getUpdateStatement() != null){
// execute update statement
final boolean success = cli.submitUpdate(options.getUpdateStatement());
if (!success) {
throw new SqlClientException("Could not submit given SQL update statement to cluster.");
}
} else if (options.getFilename() != null){
final boolean success = cli.executeFile(options.getFilename());
if (!success) {
throw new SqlClientException("Could not submit given SQL file to cluster.");
}
} else {
cli.open();
}
SqlClient#executeFile
添加具体的执行sql文件的方法,sql文件里的所有sql以分号切分,然后分别判断是什么类型,调用不同的方法来执行。
public boolean executeFile(String filename){
File file = new File(filename);
if (!file.exists()){
printError("the file do not exist");
return false;
} else {
String statement = null;
try {
statement = FileUtils.readFileToString(file);
} catch (IOException e){
printError("read the sql file error , " + e.getMessage());
return false;
}
String[] sqls = statement.split(";");
for (String sql : sqls){
if (sql == null || "".equals(sql.trim())){
continue;
}
final Optional parsedStatement = parseCommand(sql);
if (parsedStatement.isPresent()){
SqlCommandCall cmdCall = parsedStatement.get();
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
................
case INSERT_INTO:
case INSERT_OVERWRITE:
callInsert(cmdCall);
break;
case CREATE_TABLE:
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED);
break;
.....................
throw new SqlClientException("Unsupported command: " + cmdCall.command);
}
}
}
}
return true;
}
感谢各位的阅读,以上就是“怎么用flink 1.11使sql客户端支持执行sql文件”的内容了,经过本文的学习后,相信大家对怎么用flink 1.11使sql客户端支持执行sql文件这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
分享标题:怎么用flink1.11使sql客户端支持执行sql文件
文章起源:http://pwwzsj.com/article/peepge.html