博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
hive执行流程(1)-hive入口CliDriver类分析
阅读量:6090 次
发布时间:2019-06-20

本文共 6017 字,大约阅读时间需要 20 分钟。

在运行hive cli命令时,调用hadoop jar hive-cli-0.13.1.jar org.apache.hadoop.hive.cli.CliDriver xxxx 命令,而org.apache.hadoop.util.RunJar方法其实是封装了反射调用,最终是调用org.apache.hadoop.hive.cli.CliDriver类的main方法.

CliDriver类是hive的入口类。

  首先CliDriver类会通过OptionsProcessor类来parse输入的命令。比如解析-e,-s,-h等参数,然后把对应的值存放到对应的CliSessionState类的属性中,最后应用于CliDriver类中。

比如在executeDriver方法中,根据CliSessionState的属性对命令进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CliDriver cli = 
new 
CliDriver();
    
cli.setHiveVariables(oproc.getHiveVariables());  
// 有变量相关的设置时
  
    
// use the specified database if specified
    
cli.processSelectDatabase(ss); 
    
// Execute -i init files (always in silent mode)
    
cli.processInitFiles(ss); 
// 指定了-i和加载.hiverc文件
    
if 
(ss. execString != 
null 
) {  
// 指定了 -e时
      
int 
cmdProcessStatus = cli.processLine(ss. execString);  
      
return 
cmdProcessStatus;
    
}
    
try 
{   
// 指定了-f时
      
if 
(ss. fileName != 
null
) {
        
return 
cli.processFile(ss.fileName );
      
}
    
catch 
(FileNotFoundException e) {
      
System. err.println(
"Could not open input file for reading. (" 
+ e.getMessage() + 
")" 
);
      
return 
3
;
    
}

在CliDriver类方法的调用顺序主要有下面几种

1)add xxx/set/compile/reset等命令

1
main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd--对应processor类的run方法

2)sql命令

1
main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd---->Driver类run方法

3)shell命令

1
main-->run--->executeDriver---->processLine--->processCmd

其中CliDriver类中最重要的方法是processCmd,其定义了不同的命令不同的执行方式:

具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
public 
int 
processCmd(String cmd) {
    
CliSessionState ss = (CliSessionState) SessionState.get();
    
ss.setLastCommand(cmd);
    
// Flush the print stream, so it doesn't include output from the last command
    
ss.err.flush();
    
String cmd_trimmed = cmd.trim();
    
String[] tokens = tokenizeCmd(cmd_trimmed);
    
int 
ret = 
0
;
    
if 
(cmd_trimmed.toLowerCase().equals( 
"quit"
) || cmd_trimmed.toLowerCase().equals(
"exit" 
)) { 
//如果是quit或者是exit,则直接退出jvm
      
ss.close();
      
System.exit(
0
);
    
else 
if 
(tokens[
0
].equalsIgnoreCase(
"source" 
)) {  
// 如果是source xxx的情况,则按文件处理(调用processFile方法)
      
String cmd_1 = getFirstCmd(cmd_trimmed, tokens[
0
].length());
      
File sourceFile = 
new 
File(cmd_1);
      
if 
(! sourceFile.isFile()){
        
console.printError( 
"File: "
+ cmd_1 + 
" is not a file." 
);
        
ret = 
1
;
      
else 
{
        
try 
{
          
this
.processFile(cmd_1);
        
catch 
(IOException e) {
          
console.printError( 
"Failed processing file "
+ cmd_1 +
" " 
+ e.getLocalizedMessage(),
            
stringifyException(e));
          
ret = 
1
;
        
}
      
}
    
else 
if 
(cmd_trimmed.startsWith(
"!" 
)) {  
// 以!开头的,做为shell命令执行,最终调用Runtime.getRuntime().exec(shell_cmd)
      
String shell_cmd = cmd_trimmed.substring(
1
);
      
shell_cmd = 
new 
VariableSubstitution().substitute(ss.getConf(), shell_cmd);  
//这里也会进行变量替换
      
// shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
      
try 
{
        
Process executor = Runtime. getRuntime().exec(shell_cmd);
        
StreamPrinter outPrinter = 
new 
StreamPrinter(executor.getInputStream(), 
null
, ss.out);
        
StreamPrinter errPrinter = 
new 
StreamPrinter(executor.getErrorStream(), 
null
, ss.err);
        
outPrinter.start();
        
errPrinter.start();
        
ret = executor.waitFor();
        
if 
(ret != 
0
) {
          
console.printError( 
"Command failed with exit code = " 
+ ret);
        
}
      
catch 
(Exception e) {
        
console.printError( 
"Exception raised from Shell command " 
+ e.getLocalizedMessage(),
            
stringifyException(e));
        
ret = 
1
;
      
}
    
else 
if 
(tokens[
0
].toLowerCase().equals(
"list" 
)) { 
// list命令时,调用SessionState的list_resource方法
      
SessionState.ResourceType t;
      
if 
(tokens. length < 
2 
|| (t = SessionState.find_resource_type(tokens[
1
])) == 
null
) {
        
console.printError( 
"Usage: list ["
            
+ StringUtils.join(SessionState.ResourceType.values(), 
"|"
) + 
"] [<value> [<value>]*]"
);
        
ret = 
1
;
      
else 
{
        
List<String> filter = 
null
;
        
if 
(tokens.length >= 
3
) {
          
System. arraycopy(tokens, 
2
, tokens, 
0
, tokens.length - 
2
);
          
filter = Arrays. asList(tokens);
        
}
        
Set<String> s = ss.list_resource(t, filter);
        
if 
(s != 
null 
&& !s.isEmpty()) {
          
ss.out.println(StringUtils.join(s, 
"\n"
));
        
}
      
}
    
else 
if 
(ss.isRemoteMode()) { 
// remote mode -- connecting to remote hive server   //如果是远程模式,即hiveserver,调用HiveClient类的execute方法
      
HiveClient client = ss.getClient();
      
PrintStream out = ss.out;
      
PrintStream err = ss.err;
      
try 
{
        
client.execute(cmd_trimmed);
        
List<String> results;
        
do 
{
          
results = client.fetchN( LINES_TO_FETCH);
          
for 
(String line : results) {
            
out.println(line);
          
}
        
while 
(results.size() == LINES_TO_FETCH);
      
catch 
(HiveServerException e) {
        
ret = e.getErrorCode();
        
if 
(ret != 
0
) { 
// OK if ret == 0 -- reached the EOF
          
String errMsg = e.getMessage();
          
if 
(errMsg == 
null
) {
            
errMsg = e.toString();
          
}
          
ret = e.getErrorCode();
          
err.println( 
"[Hive Error]: " 
+ errMsg);
        
}
      
catch 
(TException e) {
        
String errMsg = e.getMessage();
        
if 
(errMsg == 
null
) {
          
errMsg = e.toString();
        
}
        
ret = -
10002
;
        
err.println( 
"[Thrift Error]: " 
+ errMsg);
      
finally 
{
        
try 
{
          
client.clean();
        
catch 
(TException e) {
          
String errMsg = e.getMessage();
          
if 
(errMsg == 
null
) {
            
errMsg = e.toString();
          
}
          
err.println( 
"[Thrift Error]: Hive server is not cleaned due to thrift exception: "
              
+ errMsg);
        
}
      
}
    
else 
// local mode   // 剩下的情况都作为local模式,比如add xxx,set xxxx,select/insert xxx/show tables/create table,databse/use xxx等命令。
      
try 
{
        
CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);  
//会先根据命令获取对应的CommandProcessor 实现类
        
ret = processLocalCmd(cmd, proc, ss);  
//并调用processLocalCmd方法
      
catch 
(SQLException e) {
        
console.printError( 
"Failed processing command " 
+ tokens[
0
] + 
" " 
+ e.getLocalizedMessage(),
          
org.apache.hadoop.util.StringUtils.stringifyException(e));
        
ret = 
1
;
      
}
    
}
    
return 
ret;
  
}

而processLocalCmd方法会将CommandProcessor的实例作为参数传入,并根据不同的CommandProcessor实现类,来调用不同的类的run方法。

1
  
int 
processLocalCmd (String cmd, CommandProcessor proc, CliSessionState ss)
本文转自菜菜光 51CTO博客,原文链接:http://blog.51cto.com/caiguangguang/1542275,如需转载请自行联系原作者
你可能感兴趣的文章
详细的文档(吐槽)
查看>>
DEVEXPRESS 随记
查看>>
Ember.js 入门指南——{{action}} 助手
查看>>
VMware下安装QT Creator
查看>>
Linux时间同步设置
查看>>
Measure Graphics Performance
查看>>
RetrunMoreRow
查看>>
Redis学习笔记(3)-Hash
查看>>
Git使用的常用命令
查看>>
微软职位内部推荐-Senior Software Engineer
查看>>
多线程开发
查看>>
成功搞定一个通用的Extjs增删改查模块
查看>>
暴力屏蔽80访问失败的用户
查看>>
营销型后台系统开发应该考虑到的
查看>>
vue-admin-template 切换回中文
查看>>
java模式之模板模式——抽象类
查看>>
[ACM] hdu 1251 统计难题 (字典树)
查看>>
调试json
查看>>
C - Surprising Strings
查看>>
hibernate里的generator中class =value介绍
查看>>