/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.command.wf;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
import org.jdom.Namespace;
import org.apache.oozie.action.hadoop.MapReduceMain;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.command.CommandException;

import java.util.ArrayList;
import java.util.List;

/**
 * Submission command that renders a one-action workflow definition whose single action is a
 * <code>pig</code> element derived from the supplied configuration.
 */
public class SubmitPigXCommand extends SubmitHttpXCommand {

    /**
     * Creates the command, registering it with the parent under the name and type "submitPig".
     *
     * @param conf configuration the workflow definition is generated from
     * @param authToken token handed through to the parent command
     */
    public SubmitPigXCommand(Configuration conf, String authToken) {
        super("submitPig", "submitPig", conf, authToken);
    }

    /**
     * Builds the <code>pig</code> element of the action.
     * <p>
     * Children are appended in this order: job-tracker, name-node, an optional configuration
     * element (only when at least one option starts with "-D"), a script element fixed to
     * "dummy.pig", one argument element per remaining option, and finally whatever
     * <code>addFileSection</code> and <code>addArchiveSection</code> contribute.
     *
     * @param conf configuration providing the job tracker, name node and Pig options
     * @param ns namespace applied to every generated element
     * @return the populated <code>pig</code> element
     */
    private Element generatePigSection(Configuration conf, Namespace ns) {
        Element pigElement = new Element("pig", ns);
        pigElement.addContent(new Element("job-tracker", ns).addContent(conf.get(XOozieClient.JT)));
        pigElement.addContent(new Element("name-node", ns).addContent(conf.get(XOozieClient.NN)));

        // Split the Pig options: "-D..." entries become configuration properties,
        // everything else is passed through as a plain argument.
        List<String> definitions = new ArrayList<String>();
        List<String> plainArguments = new ArrayList<String>();
        for (String option : MapReduceMain.getStrings(conf, XOozieClient.PIG_OPTIONS)) {
            List<String> bucket = option.startsWith("-D") ? definitions : plainArguments;
            bucket.add(option);
        }

        // The configuration element is emitted only when there is something to put in it.
        if (!definitions.isEmpty()) {
            pigElement.addContent(generateConfigurationSection(definitions, ns));
        }

        pigElement.addContent(new Element("script", ns).addContent("dummy.pig"));

        for (String option : plainArguments) {
            pigElement.addContent(new Element("argument", ns).addContent(option));
        }

        addFileSection(pigElement, conf, ns);
        addArchiveSection(pigElement, conf, ns);

        return pigElement;
    }

    /**
     * Converts "-D" style options into a <code>configuration</code> element holding one
     * <code>property</code> (name/value pair) per option.
     * <p>
     * The first two characters of each option are dropped. Text up to the first '=' is the
     * property name and text after it is the value; without an '=' the whole remainder is the
     * name and the value is the empty string.
     *
     * @param definitions options, each expected to begin with "-D"
     * @param ns namespace applied to every generated element
     * @return the populated <code>configuration</code> element
     */
    private Element generateConfigurationSection(List<String> definitions, Namespace ns) {
        Element configElement = new Element("configuration", ns);

        for (String definition : definitions) {
            int equalsAt = definition.indexOf('=');
            boolean hasValue = equalsAt != -1;

            // "-D<name>=<value>" when an '=' is present, otherwise "-D<name>" or a bare "-D".
            String propertyName = hasValue ? definition.substring(2, equalsAt) : definition.substring(2);
            String propertyValue = hasValue ? definition.substring(equalsAt + 1) : "";

            Element property = new Element("property", ns);
            property.addContent(new Element("name", ns).addContent(propertyName));
            property.addContent(new Element("value", ns).addContent(propertyValue));
            configElement.addContent(property);
        }

        return configElement;
    }

    /**
     * Renders the workflow definition for the submission: a <code>workflow-app</code> named
     * "oozie-pig" that starts at action "pig1", transitions to "end" on success and to the
     * kill node "fail" on error.
     *
     * @param conf configuration the definition is generated from
     * @return the pretty-printed workflow XML
     * @throws RuntimeException if any key in <code>MANDATORY_OOZIE_CONFS</code> has no value in
     *         <code>conf</code>
     */
    @Override
    protected String getWorkflowXml(Configuration conf) {
        // Fail fast on the first mandatory setting that is missing.
        for (String key : MANDATORY_OOZIE_CONFS) {
            if (conf.get(key) == null) {
                throw new RuntimeException(key + " is not specified");
            }
        }

        Namespace ns = Namespace.getNamespace("uri:oozie:workflow:0.2");

        Element workflowApp = new Element("workflow-app", ns).setAttribute("name", "oozie-pig");
        workflowApp.addContent(new Element("start", ns).setAttribute("to", "pig1"));

        Element pigAction = new Element("action", ns).setAttribute("name", "pig1");
        pigAction.addContent(generatePigSection(conf, ns));
        pigAction.addContent(new Element("ok", ns).setAttribute("to", "end"));
        pigAction.addContent(new Element("error", ns).setAttribute("to", "fail"));
        workflowApp.addContent(pigAction);

        Element killNode = new Element("kill", ns).setAttribute("name", "fail");
        killNode.addContent(new Element("message", ns)
                .addContent("Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]"));
        workflowApp.addContent(killNode);

        workflowApp.addContent(new Element("end", ns).setAttribute("name", "end"));

        return XmlUtils.prettyPrint(workflowApp).toString();
    }

    /**
     * @return always <code>null</code>; this command supplies no entity key
     */
    @Override
    public String getEntityKey() {
        return null;
    }

    /**
     * @return always <code>false</code>
     */
    @Override
    protected boolean isLockRequired() {
        return false;
    }

    /**
     * Intentionally empty: this command loads no state here.
     */
    @Override
    protected void loadState() {
        // nothing to load
    }

    /**
     * Intentionally empty: this command checks no preconditions here.
     *
     * @throws CommandException declared by the overridden method; never thrown by this implementation
     */
    @Override
    protected void verifyPrecondition() throws CommandException {
        // nothing to verify
    }
}