When the hive CLI is launched, the wrapper script runs hadoop jar hive-cli-0.13.1.jar org.apache.hadoop.hive.cli.CliDriver xxxx. The org.apache.hadoop.util.RunJar class is just a wrapper around a reflective call, so what ultimately executes is the main method of org.apache.hadoop.hive.cli.CliDriver.
CliDriver is the entry class of the Hive CLI.
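For reference, the following is a minimal sketch of the reflective dispatch that RunJar performs; the class below is only illustrative (the real RunJar also unpacks the jar and sets up a classloader before invoking main):

import java.lang.reflect.Method;

public class ReflectiveMainLauncher {
  // Load the target main class and invoke its static main(String[]) via reflection,
  // which is conceptually what hadoop's RunJar does after unpacking the jar.
  public static void main(String[] args) throws Exception {
    // For the hive CLI this resolves to org.apache.hadoop.hive.cli.CliDriver.
    Class<?> mainClass = Class.forName("org.apache.hadoop.hive.cli.CliDriver");
    Method mainMethod = mainClass.getMethod("main", String[].class);
    mainMethod.invoke(null, new Object[] { args }); // pass the remaining CLI arguments through
  }
}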
CliDriver first parses the command line through the OptionsProcessor class: options such as -e, -s and -h are extracted, their values are stored in the corresponding fields of CliSessionState, and CliDriver then reads those fields later on.
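OptionsProcessor is built on Apache Commons CLI; the snippet below is a simplified sketch of that style of parsing (the option set and the printed fields are illustrative, not the actual OptionsProcessor code):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;

public class CliOptionsSketch {
  public static void main(String[] args) throws Exception {
    Options options = new Options();
    options.addOption("e", true, "SQL string to execute");   // hive -e "select ..."
    options.addOption("f", true, "SQL file to execute");     // hive -f script.sql
    options.addOption("S", false, "silent mode");            // hive -S

    CommandLine cmd = new GnuParser().parse(options, args);

    // In the real code these values end up on CliSessionState (ss.execString, ss.fileName, ...).
    String execString = cmd.getOptionValue("e");
    String fileName = cmd.getOptionValue("f");
    boolean silent = cmd.hasOption("S");
    System.out.println("execString=" + execString + ", fileName=" + fileName + ", silent=" + silent);
  }
}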
For example, in the executeDriver method the command is handled according to the CliSessionState fields:
CliDriver cli = new CliDriver();
cli.setHiveVariables(oproc.getHiveVariables()); // when hive variables were supplied

// use the specified database if specified
cli.processSelectDatabase(ss);

// Execute -i init files (always in silent mode)
cli.processInitFiles(ss); // handles -i files and loads .hiverc

if (ss.execString != null) { // -e was specified
  int cmdProcessStatus = cli.processLine(ss.execString);
  return cmdProcessStatus;
}

try {
  if (ss.fileName != null) { // -f was specified
    return cli.processFile(ss.fileName);
  }
} catch (FileNotFoundException e) {
  System.err.println("Could not open input file for reading. (" + e.getMessage() + ")");
  return 3;
}
The call sequences through CliDriver fall into three main cases; a small usage example follows the list.

1) add xxx / set / compile / reset and similar commands:
main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> run method of the matching CommandProcessor

2) SQL statements:
main --> run --> executeDriver --> processLine --> processCmd --> processLocalCmd --> Driver.run

3) shell commands (prefixed with !):
main --> run --> executeDriver --> processLine --> processCmd
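The sketch below illustrates which inputs take which of the three paths. It assumes the Hive 0.13 jars and a working Hive configuration on the classpath, and the example statements are arbitrary:

import org.apache.hadoop.hive.cli.CliDriver;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;

public class CallPathDemo {
  public static void main(String[] args) throws Exception {
    // Start a CLI session, roughly as CliDriver.run does before processing input.
    CliSessionState ss = new CliSessionState(new HiveConf(SessionState.class));
    SessionState.start(ss);
    CliDriver cli = new CliDriver();

    cli.processLine("set hive.cli.print.header=true;"); // path 1: SetProcessor.run
    cli.processLine("show tables;");                     // path 2: Driver.run
    cli.processLine("!echo hello;");                     // path 3: shell command handled inside processCmd
  }
}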
The most important method in CliDriver is processCmd, which decides how each kind of command is executed.
Its implementation:
public int processCmd(String cmd) {
    CliSessionState ss = (CliSessionState) SessionState.get();
    ss.setLastCommand(cmd);
    // Flush the print stream, so it doesn't include output from the last command
    ss.err.flush();
    String cmd_trimmed = cmd.trim();
    String[] tokens = tokenizeCmd(cmd_trimmed);
    int ret = 0;

    if (cmd_trimmed.toLowerCase().equals("quit") || cmd_trimmed.toLowerCase().equals("exit")) {
      // quit or exit: close the session and exit the JVM directly
      ss.close();
      System.exit(0);

    } else if (tokens[0].equalsIgnoreCase("source")) {
      // source xxx: treat the argument as a script file and delegate to processFile
      String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
      File sourceFile = new File(cmd_1);
      if (!sourceFile.isFile()) {
        console.printError("File: " + cmd_1 + " is not a file.");
        ret = 1;
      } else {
        try {
          this.processFile(cmd_1);
        } catch (IOException e) {
          console.printError("Failed processing file " + cmd_1 + " " + e.getLocalizedMessage(),
              stringifyException(e));
          ret = 1;
        }
      }

    } else if (cmd_trimmed.startsWith("!")) {
      // commands starting with "!" are executed as shell commands, ultimately via Runtime.getRuntime().exec(shell_cmd)
      String shell_cmd = cmd_trimmed.substring(1);
      shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd); // variable substitution happens here as well

      // shell_cmd = "/bin/bash -c \'" + shell_cmd + "\'";
      try {
        Process executor = Runtime.getRuntime().exec(shell_cmd);
        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);

        outPrinter.start();
        errPrinter.start();

        ret = executor.waitFor();
        if (ret != 0) {
          console.printError("Command failed with exit code = " + ret);
        }
      } catch (Exception e) {
        console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }

    } else if (tokens[0].toLowerCase().equals("list")) {
      // list command: delegates to SessionState.list_resource
      SessionState.ResourceType t;
      if (tokens.length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
        console.printError("Usage: list ["
            + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
        ret = 1;
      } else {
        List<String> filter = null;
        if (tokens.length >= 3) {
          System.arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
          filter = Arrays.asList(tokens);
        }
        Set<String> s = ss.list_resource(t, filter);
        if (s != null && !s.isEmpty()) {
          ss.out.println(StringUtils.join(s, "\n"));
        }
      }

    } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server
      // in remote mode (HiveServer) the command is sent to HiveClient.execute
      HiveClient client = ss.getClient();
      PrintStream out = ss.out;
      PrintStream err = ss.err;

      try {
        client.execute(cmd_trimmed);
        List<String> results;
        do {
          results = client.fetchN(LINES_TO_FETCH);
          for (String line : results) {
            out.println(line);
          }
        } while (results.size() == LINES_TO_FETCH);
      } catch (HiveServerException e) {
        ret = e.getErrorCode();
        if (ret != 0) { // OK if ret == 0 -- reached the EOF
          String errMsg = e.getMessage();
          if (errMsg == null) {
            errMsg = e.toString();
          }
          ret = e.getErrorCode();
          err.println("[Hive Error]: " + errMsg);
        }
      } catch (TException e) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        ret = -10002;
        err.println("[Thrift Error]: " + errMsg);
      } finally {
        try {
          client.clean();
        } catch (TException e) {
          String errMsg = e.getMessage();
          if (errMsg == null) {
            errMsg = e.toString();
          }
          err.println("[Thrift Error]: Hive server is not cleaned due to thrift exception: " + errMsg);
        }
      }

    } else { // local mode
      // everything else runs in local mode: add xxx, set xxx, select/insert, show tables, create table/database, use xxx, etc.
      try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf); // look up the CommandProcessor implementation for this command
        ret = processLocalCmd(cmd, proc, ss); // then hand the command to processLocalCmd
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
            org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }

    return ret;
  }
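In the local-mode branch, CommandProcessorFactory.get is essentially a dispatch table keyed on the first token of the command. The sketch below approximates that dispatch; the real factory also performs additional validation and reuses Driver-backed processors, so treat this as an illustration rather than the actual factory code:

import java.sql.SQLException;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.*;

public class ProcessorDispatchSketch {
  // Approximate version of CommandProcessorFactory.get: map the first token of the
  // command to a CommandProcessor implementation, falling back to the SQL Driver.
  public static CommandProcessor dispatch(String[] tokens, HiveConf conf) throws SQLException {
    switch (tokens[0].toLowerCase()) {
      case "set":     return new SetProcessor();
      case "reset":   return new ResetProcessor();
      case "dfs":     return new DfsProcessor(conf);
      case "add":     return new AddResourceProcessor();
      case "delete":  return new DeleteResourceProcessor();
      case "compile": return new CompileProcessor();
      default:        return new Driver(conf); // SQL/DDL statements go through the Driver
    }
  }
}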
The processLocalCmd method receives that CommandProcessor instance as a parameter and calls the run method of whichever implementation it was given.
int processLocalCmd(String cmd, CommandProcessor proc, CliSessionState ss)
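Below is a condensed paraphrase of what processLocalCmd does with that processor. Error handling, timing output and the retry loop are omitted, and it assumes the usual imports (java.util.ArrayList, org.apache.hadoop.hive.ql.Driver, org.apache.hadoop.hive.ql.processors.CommandProcessor); it is a sketch of the 0.13 logic, not the full method:

// If the processor is the SQL Driver, run the statement and page through the results;
// otherwise just call run() on the processor (set/add/dfs/... handle the line themselves).
int processLocalCmdSketch(String cmd, CommandProcessor proc, CliSessionState ss) throws Exception {
  int ret;
  if (proc instanceof Driver) {
    Driver qp = (Driver) proc;
    ret = qp.run(cmd).getResponseCode();   // compile and execute the statement
    if (ret != 0) {
      qp.close();
      return ret;
    }
    ArrayList<String> res = new ArrayList<String>();
    while (qp.getResults(res)) {           // fetch result batches until exhausted
      for (String row : res) {
        ss.out.println(row);
      }
      res.clear();
    }
    qp.close();
  } else {
    ret = proc.run(cmd).getResponseCode();
  }
  return ret;
}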