001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.action.oozie; 020 021import java.io.IOException; 022import java.io.StringReader; 023import java.util.HashSet; 024import java.util.Set; 025 026import org.apache.hadoop.conf.Configuration; 027import org.apache.oozie.DagEngine; 028import org.apache.oozie.LocalOozieClient; 029import org.apache.oozie.WorkflowJobBean; 030import org.apache.oozie.action.ActionExecutor; 031import org.apache.oozie.action.ActionExecutorException; 032import org.apache.oozie.action.hadoop.OozieJobInfo; 033import org.apache.oozie.client.OozieClient; 034import org.apache.oozie.client.OozieClientException; 035import org.apache.oozie.client.WorkflowAction; 036import org.apache.oozie.client.WorkflowJob; 037import org.apache.oozie.command.CommandException; 038import org.apache.oozie.service.ConfigurationService; 039import org.apache.oozie.service.DagEngineService; 040import org.apache.oozie.service.Services; 041import org.apache.oozie.util.ConfigUtils; 042import org.apache.oozie.util.JobUtils; 043import org.apache.oozie.util.PropertiesUtils; 044import org.apache.oozie.util.XConfiguration; 045import org.apache.oozie.util.XLog; 046import org.apache.oozie.util.XmlUtils; 047import org.jdom.Element; 048import org.jdom.Namespace; 049 050public class SubWorkflowActionExecutor extends ActionExecutor { 051 public static final String ACTION_TYPE = "sub-workflow"; 052 public static final String LOCAL = "local"; 053 public static final String PARENT_ID = "oozie.wf.parent.id"; 054 public static final String SUPER_PARENT_ID = "oozie.wf.superparent.id"; 055 public static final String SUBWORKFLOW_MAX_DEPTH = "oozie.action.subworkflow.max.depth"; 056 public static final String SUBWORKFLOW_DEPTH = "oozie.action.subworkflow.depth"; 057 public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun"; 058 059 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 060 public XLog LOG = XLog.getLog(getClass()); 061 062 063 static { 064 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 065 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 066 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 067 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 068 069 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; 070 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 071 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 072 } 073 074 protected SubWorkflowActionExecutor() { 075 super(ACTION_TYPE); 076 } 077 078 public void initActionType() { 079 super.initActionType(); 080 } 081 082 protected OozieClient getWorkflowClient(Context context, String oozieUri) { 083 OozieClient oozieClient; 084 if (oozieUri.equals(LOCAL)) { 085 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 086 String user = workflow.getUser(); 087 String group = workflow.getGroup(); 088 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 089 oozieClient = new LocalOozieClient(dagEngine); 090 } 091 else { 092 // TODO we need to add authToken to the WC for the remote case 093 oozieClient = new OozieClient(oozieUri); 094 } 095 return oozieClient; 096 } 097 098 protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException, 099 ActionExecutorException { 100 if (eConf != null) { 101 String strConf = XmlUtils.prettyPrint(eConf).toString(); 102 Configuration conf = new XConfiguration(new StringReader(strConf)); 103 try { 104 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES); 105 } 106 catch (CommandException ex) { 107 throw convertException(ex); 108 } 109 XConfiguration.copy(conf, subWorkflowConf); 110 } 111 } 112 113 @SuppressWarnings("unchecked") 114 protected void injectCallback(Context context, Configuration conf) { 115 String callback = context.getCallbackUrl("$status"); 116 if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) { 117 XLog.getLog(getClass()) 118 .warn("Sub-Workflow configuration has a custom job end notification URI, overriding"); 119 } 120 conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback); 121 } 122 123 protected void injectRecovery(String externalId, Configuration conf) { 124 conf.set(OozieClient.EXTERNAL_ID, externalId); 125 } 126 127 protected void injectParent(String parentId, Configuration conf) { 128 conf.set(PARENT_ID, parentId); 129 } 130 131 protected void injectSuperParent(WorkflowJob parentWorkflow, Configuration parentConf, Configuration conf) { 132 String superParentId = parentConf.get(SUPER_PARENT_ID); 133 if (superParentId == null) { 134 // This is a sub-workflow at depth 1 135 superParentId = parentWorkflow.getParentId(); 136 137 // If the parent workflow is not submitted through a coordinator then the parentId will be the super parent id. 138 if (superParentId == null) { 139 superParentId = parentWorkflow.getId(); 140 } 141 conf.set(SUPER_PARENT_ID, superParentId); 142 } else { 143 // Sub-workflow at depth 2 or more. 144 conf.set(SUPER_PARENT_ID, superParentId); 145 } 146 } 147 148 protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, Configuration conf) throws ActionExecutorException { 149 int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0); 150 int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH); 151 if (depth >= maxDepth) { 152 throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "SUBWF001", 153 "Depth [{0}] cannot exceed maximum subworkflow depth [{1}]", (depth + 1), maxDepth); 154 } 155 conf.setInt(SUBWORKFLOW_DEPTH, depth + 1); 156 } 157 158 protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException { 159 String jobId = oozieClient.getJobId(extId); 160 if (jobId.equals("")) { 161 return null; 162 } 163 return jobId; 164 } 165 166 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 167 try { 168 Element eConf = XmlUtils.parseXml(action.getConf()); 169 Namespace ns = eConf.getNamespace(); 170 Element e = eConf.getChild("oozie", ns); 171 String oozieUri = (e == null) ? LOCAL : e.getTextTrim(); 172 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 173 String subWorkflowId = null; 174 String extId = context.getRecoveryId(); 175 String runningJobId = null; 176 if (extId != null) { 177 runningJobId = checkIfRunning(oozieClient, extId); 178 } 179 if (runningJobId == null) { 180 String appPath = eConf.getChild("app-path", ns).getTextTrim(); 181 182 XConfiguration subWorkflowConf = new XConfiguration(); 183 184 Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 185 186 if (eConf.getChild(("propagate-configuration"), ns) != null) { 187 XConfiguration.copy(parentConf, subWorkflowConf); 188 } 189 190 // Propagate coordinator and bundle info to subworkflow 191 if (OozieJobInfo.isJobInfoEnabled()) { 192 if (parentConf.get(OozieJobInfo.COORD_ID) != null) { 193 subWorkflowConf.set(OozieJobInfo.COORD_ID, parentConf.get(OozieJobInfo.COORD_ID)); 194 subWorkflowConf.set(OozieJobInfo.COORD_NAME, parentConf.get(OozieJobInfo.COORD_NAME)); 195 subWorkflowConf.set(OozieJobInfo.COORD_NOMINAL_TIME, parentConf.get(OozieJobInfo.COORD_NOMINAL_TIME)); 196 } 197 if (parentConf.get(OozieJobInfo.BUNDLE_ID) != null) { 198 subWorkflowConf.set(OozieJobInfo.BUNDLE_ID, parentConf.get(OozieJobInfo.BUNDLE_ID)); 199 subWorkflowConf.set(OozieJobInfo.BUNDLE_NAME, parentConf.get(OozieJobInfo.BUNDLE_NAME)); 200 } 201 } 202 203 // the proto has the necessary credentials 204 Configuration protoActionConf = context.getProtoActionConf(); 205 XConfiguration.copy(protoActionConf, subWorkflowConf); 206 subWorkflowConf.set(OozieClient.APP_PATH, appPath); 207 String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 208 if(group != null) { 209 subWorkflowConf.set(OozieClient.GROUP_NAME, group); 210 } 211 212 injectInline(eConf.getChild("configuration", ns), subWorkflowConf); 213 injectCallback(context, subWorkflowConf); 214 injectRecovery(extId, subWorkflowConf); 215 injectParent(context.getWorkflow().getId(), subWorkflowConf); 216 injectSuperParent(context.getWorkflow(), parentConf, subWorkflowConf); 217 verifyAndInjectSubworkflowDepth(parentConf, subWorkflowConf); 218 219 //TODO: this has to be refactored later to be done in a single place for REST calls and this 220 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(), 221 subWorkflowConf); 222 223 subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action)); 224 225 // if the rerun failed node option is provided during the time of rerun command, old subworkflow will 226 // rerun again. 227 if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) { 228 subWorkflowConf.setBoolean(SUBWORKFLOW_RERUN, true); 229 oozieClient.reRun(action.getExternalId(), subWorkflowConf.toProperties()); 230 subWorkflowId = action.getExternalId(); 231 } else { 232 subWorkflowId = oozieClient.run(subWorkflowConf.toProperties()); 233 } 234 } 235 else { 236 subWorkflowId = runningJobId; 237 } 238 WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId); 239 String consoleUrl = workflow.getConsoleUrl(); 240 context.setStartData(subWorkflowId, oozieUri, consoleUrl); 241 if (runningJobId != null) { 242 check(context, action); 243 } 244 } 245 catch (Exception ex) { 246 LOG.error(ex); 247 throw convertException(ex); 248 } 249 } 250 251 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 252 try { 253 String externalStatus = action.getExternalStatus(); 254 WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK 255 : WorkflowAction.Status.ERROR; 256 context.setEndData(status, getActionSignal(status)); 257 } 258 catch (Exception ex) { 259 throw convertException(ex); 260 } 261 } 262 263 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 264 try { 265 String subWorkflowId = action.getExternalId(); 266 String oozieUri = action.getTrackerUri(); 267 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 268 WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId); 269 WorkflowJob.Status status = subWorkflow.getStatus(); 270 switch (status) { 271 case FAILED: 272 case KILLED: 273 case SUCCEEDED: 274 context.setExecutionData(status.toString(), null); 275 break; 276 default: 277 context.setExternalStatus(status.toString()); 278 break; 279 } 280 } 281 catch (Exception ex) { 282 throw convertException(ex); 283 } 284 } 285 286 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 287 try { 288 String subWorkflowId = action.getExternalId(); 289 String oozieUri = action.getTrackerUri(); 290 if (subWorkflowId != null && oozieUri != null) { 291 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 292 oozieClient.kill(subWorkflowId); 293 } 294 context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED)); 295 } 296 catch (Exception ex) { 297 throw convertException(ex); 298 } 299 } 300 301 private static Set<String> FINAL_STATUS = new HashSet<String>(); 302 303 static { 304 FINAL_STATUS.add("SUCCEEDED"); 305 FINAL_STATUS.add("KILLED"); 306 FINAL_STATUS.add("FAILED"); 307 } 308 309 public boolean isCompleted(String externalStatus) { 310 return FINAL_STATUS.contains(externalStatus); 311 } 312 313 public boolean supportsConfigurationJobXML() { 314 return true; 315 } 316}