001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.hadoop; 019 020 import org.apache.hadoop.conf.Configuration; 021 import org.apache.hadoop.fs.FileSystem; 022 import org.apache.hadoop.fs.FSDataOutputStream; 023 import org.apache.hadoop.fs.Path; 024 import org.apache.oozie.action.ActionExecutorException; 025 import org.apache.oozie.client.XOozieClient; 026 import org.apache.oozie.client.WorkflowAction; 027 import org.apache.oozie.util.XLog; 028 import org.apache.oozie.util.XmlUtils; 029 import org.jdom.Element; 030 import org.jdom.Namespace; 031 import org.jdom.JDOMException; 032 import org.mortbay.log.Log; 033 034 import java.io.IOException; 035 import java.util.List; 036 037 public class PigActionExecutor extends JavaActionExecutor { 038 039 public PigActionExecutor() { 040 super("pig"); 041 } 042 043 protected List<Class> getLauncherClasses() { 044 List<Class> classes = super.getLauncherClasses(); 045 classes.add(LauncherMain.class); 046 classes.add(MapReduceMain.class); 047 classes.add(PigMain.class); 048 return classes; 049 } 050 051 protected String getLauncherMain(Configuration launcherConf, Element actionXml) { 052 return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, PigMain.class.getName()); 053 } 054 055 void injectActionCallback(Context context, Configuration launcherConf) { 056 } 057 058 @Override 059 protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) 060 throws ActionExecutorException { 061 super.setupLauncherConf(conf, actionXml, appPath, context); 062 Namespace ns = actionXml.getNamespace(); 063 String script = actionXml.getChild("script", ns).getTextTrim(); 064 String pigName = new Path(script).getName(); 065 String pigScriptContent = context.getProtoActionConf().get(XOozieClient.PIG_SCRIPT); 066 067 Path pigScriptFile = null; 068 if (pigScriptContent != null) { // Create pig script on hdfs if this is 069 // an http submission pig job; 070 FSDataOutputStream dos = null; 071 try { 072 Path actionPath = context.getActionDir(); 073 pigScriptFile = new Path(actionPath, script); 074 FileSystem fs = context.getAppFileSystem(); 075 dos = fs.create(pigScriptFile); 076 dos.writeBytes(pigScriptContent); 077 078 addToCache(conf, actionPath, script + "#" + pigName, false); 079 } 080 catch (Exception ex) { 081 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "FAILED_OPERATION", XLog 082 .format("Not able to write pig script file {0} on hdfs", pigScriptFile), ex); 083 } 084 finally { 085 try { 086 if (dos != null) { 087 dos.close(); 088 } 089 } 090 catch (IOException ex) { 091 XLog.getLog(getClass()).error("Error: " + ex.getMessage()); 092 } 093 } 094 } 095 else { 096 addToCache(conf, appPath, script + "#" + pigName, false); 097 } 098 099 return conf; 100 } 101 102 @SuppressWarnings("unchecked") 103 Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) 104 throws ActionExecutorException { 105 super.setupActionConf(actionConf, context, actionXml, appPath); 106 Namespace ns = actionXml.getNamespace(); 107 108 String script = actionXml.getChild("script", ns).getTextTrim(); 109 String pigName = new Path(script).getName(); 110 111 List<Element> params = (List<Element>) actionXml.getChildren("param", ns); 112 String[] strParams = new String[params.size()]; 113 for (int i = 0; i < params.size(); i++) { 114 strParams[i] = params.get(i).getTextTrim(); 115 } 116 String[] strArgs = null; 117 List<Element> eArgs = actionXml.getChildren("argument", ns); 118 if (eArgs != null && eArgs.size() > 0) { 119 strArgs = new String[eArgs.size()]; 120 for (int i = 0; i < eArgs.size(); i++) { 121 strArgs[i] = eArgs.get(i).getTextTrim(); 122 } 123 } 124 PigMain.setPigScript(actionConf, pigName, strParams, strArgs); 125 return actionConf; 126 } 127 128 protected boolean getCaptureOutput(WorkflowAction action) throws JDOMException { 129 return true; 130 } 131 132 }