001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.action.oozie; 019 020 import org.apache.oozie.client.OozieClientException; 021 import org.apache.oozie.action.ActionExecutor; 022 import org.apache.oozie.action.ActionExecutorException; 023 import org.apache.oozie.DagEngine; 024 import org.apache.oozie.LocalOozieClient; 025 import org.apache.oozie.WorkflowJobBean; 026 import org.apache.oozie.service.DagEngineService; 027 import org.apache.oozie.client.WorkflowAction; 028 import org.apache.oozie.client.OozieClient; 029 import org.apache.oozie.client.WorkflowJob; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.util.ConfigUtils; 032 import org.apache.oozie.util.JobUtils; 033 import org.apache.oozie.util.PropertiesUtils; 034 import org.apache.oozie.util.XmlUtils; 035 import org.apache.oozie.util.XConfiguration; 036 import org.apache.oozie.util.XLog; 037 import org.apache.oozie.service.Services; 038 import org.apache.hadoop.conf.Configuration; 039 import org.jdom.Element; 040 import org.jdom.Namespace; 041 042 import java.io.StringReader; 043 import java.io.IOException; 044 import java.util.Set; 045 import java.util.HashSet; 046 047 public class SubWorkflowActionExecutor extends ActionExecutor { 048 public static final String ACTION_TYPE = "sub-workflow"; 049 public static final String LOCAL = "local"; 050 public static final String PARENT_ID = "oozie.wf.parent.id"; 051 052 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); 053 054 static { 055 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, 056 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB, 057 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN, 058 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS}; 059 060 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER}; 061 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES); 062 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES); 063 } 064 065 protected SubWorkflowActionExecutor() { 066 super(ACTION_TYPE); 067 } 068 069 public void initActionType() { 070 super.initActionType(); 071 } 072 073 protected OozieClient getWorkflowClient(Context context, String oozieUri) { 074 OozieClient oozieClient; 075 if (oozieUri.equals(LOCAL)) { 076 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow(); 077 String user = workflow.getUser(); 078 String group = workflow.getGroup(); 079 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 080 oozieClient = new LocalOozieClient(dagEngine); 081 } 082 else { 083 // TODO we need to add authToken to the WC for the remote case 084 oozieClient = new OozieClient(oozieUri); 085 } 086 return oozieClient; 087 } 088 089 protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException, 090 ActionExecutorException { 091 if (eConf != null) { 092 String strConf = XmlUtils.prettyPrint(eConf).toString(); 093 Configuration conf = new XConfiguration(new StringReader(strConf)); 094 try { 095 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES); 096 } 097 catch (CommandException ex) { 098 throw convertException(ex); 099 } 100 XConfiguration.copy(conf, subWorkflowConf); 101 } 102 } 103 104 @SuppressWarnings("unchecked") 105 protected void injectCallback(Context context, Configuration conf) { 106 String callback = context.getCallbackUrl("$status"); 107 if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) { 108 XLog.getLog(getClass()) 109 .warn("Sub-Workflow configuration has a custom job end notification URI, overriding"); 110 } 111 conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback); 112 } 113 114 protected void injectRecovery(String externalId, Configuration conf) { 115 conf.set(OozieClient.EXTERNAL_ID, externalId); 116 } 117 118 protected void injectParent(String parentId, Configuration conf) { 119 conf.set(PARENT_ID, parentId); 120 } 121 122 protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException { 123 String jobId = oozieClient.getJobId(extId); 124 if (jobId.equals("")) { 125 return null; 126 } 127 return jobId; 128 } 129 130 public void start(Context context, WorkflowAction action) throws ActionExecutorException { 131 try { 132 Element eConf = XmlUtils.parseXml(action.getConf()); 133 Namespace ns = eConf.getNamespace(); 134 Element e = eConf.getChild("oozie", ns); 135 String oozieUri = (e == null) ? LOCAL : e.getTextTrim(); 136 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 137 String subWorkflowId = null; 138 String extId = context.getRecoveryId(); 139 String runningJobId = null; 140 if (extId != null) { 141 runningJobId = checkIfRunning(oozieClient, extId); 142 } 143 if (runningJobId == null) { 144 String appPath = eConf.getChild("app-path", ns).getTextTrim(); 145 146 XConfiguration subWorkflowConf = new XConfiguration(); 147 Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); 148 if (eConf.getChild(("propagate-configuration"), ns) != null) { 149 XConfiguration.copy(parentConf, subWorkflowConf); 150 } 151 152 // the proto has the necessary credentials 153 Configuration protoActionConf = context.getProtoActionConf(); 154 XConfiguration.copy(protoActionConf, subWorkflowConf); 155 subWorkflowConf.set(OozieClient.APP_PATH, appPath); 156 String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null); 157 if(group != null) { 158 subWorkflowConf.set(OozieClient.GROUP_NAME, group); 159 } 160 injectInline(eConf.getChild("configuration", ns), subWorkflowConf); 161 injectCallback(context, subWorkflowConf); 162 injectRecovery(extId, subWorkflowConf); 163 injectParent(context.getWorkflow().getId(), subWorkflowConf); 164 165 //TODO: this has to be refactored later to be done in a single place for REST calls and this 166 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(), 167 subWorkflowConf); 168 169 subWorkflowId = oozieClient.run(subWorkflowConf.toProperties()); 170 } 171 else { 172 subWorkflowId = runningJobId; 173 } 174 WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId); 175 String consoleUrl = workflow.getConsoleUrl(); 176 context.setStartData(subWorkflowId, oozieUri, consoleUrl); 177 if (runningJobId != null) { 178 check(context, action); 179 } 180 } 181 catch (Exception ex) { 182 throw convertException(ex); 183 } 184 } 185 186 public void end(Context context, WorkflowAction action) throws ActionExecutorException { 187 try { 188 String externalStatus = action.getExternalStatus(); 189 WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK 190 : WorkflowAction.Status.ERROR; 191 context.setEndData(status, getActionSignal(status)); 192 } 193 catch (Exception ex) { 194 throw convertException(ex); 195 } 196 } 197 198 public void check(Context context, WorkflowAction action) throws ActionExecutorException { 199 try { 200 String subWorkflowId = action.getExternalId(); 201 String oozieUri = action.getTrackerUri(); 202 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 203 WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId); 204 WorkflowJob.Status status = subWorkflow.getStatus(); 205 switch (status) { 206 case FAILED: 207 case KILLED: 208 case SUCCEEDED: 209 context.setExecutionData(status.toString(), null); 210 break; 211 default: 212 context.setExternalStatus(status.toString()); 213 break; 214 } 215 } 216 catch (Exception ex) { 217 throw convertException(ex); 218 } 219 } 220 221 public void kill(Context context, WorkflowAction action) throws ActionExecutorException { 222 try { 223 String subWorkflowId = action.getExternalId(); 224 String oozieUri = action.getTrackerUri(); 225 OozieClient oozieClient = getWorkflowClient(context, oozieUri); 226 oozieClient.kill(subWorkflowId); 227 context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED)); 228 } 229 catch (Exception ex) { 230 throw convertException(ex); 231 } 232 } 233 234 private static Set<String> FINAL_STATUS = new HashSet<String>(); 235 236 static { 237 FINAL_STATUS.add("SUCCEEDED"); 238 FINAL_STATUS.add("KILLED"); 239 FINAL_STATUS.add("FAILED"); 240 } 241 242 public boolean isCompleted(String externalStatus) { 243 return FINAL_STATUS.contains(externalStatus); 244 } 245 }