This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.action.oozie;
019
020 import org.apache.oozie.client.OozieClientException;
021 import org.apache.oozie.action.ActionExecutor;
022 import org.apache.oozie.action.ActionExecutorException;
023 import org.apache.oozie.DagEngine;
024 import org.apache.oozie.LocalOozieClient;
025 import org.apache.oozie.WorkflowJobBean;
026 import org.apache.oozie.service.DagEngineService;
027 import org.apache.oozie.client.WorkflowAction;
028 import org.apache.oozie.client.OozieClient;
029 import org.apache.oozie.client.WorkflowJob;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.util.ConfigUtils;
032 import org.apache.oozie.util.JobUtils;
033 import org.apache.oozie.util.PropertiesUtils;
034 import org.apache.oozie.util.XmlUtils;
035 import org.apache.oozie.util.XConfiguration;
036 import org.apache.oozie.util.XLog;
037 import org.apache.oozie.service.Services;
038 import org.apache.hadoop.conf.Configuration;
039 import org.jdom.Element;
040 import org.jdom.Namespace;
041
042 import java.io.StringReader;
043 import java.io.IOException;
044 import java.util.Set;
045 import java.util.HashSet;
046
047 public class SubWorkflowActionExecutor extends ActionExecutor {
048 public static final String ACTION_TYPE = "sub-workflow";
049 public static final String LOCAL = "local";
050 public static final String PARENT_ID = "oozie.wf.parent.id";
051
052 private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
053
054 static {
055 String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
056 PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
057 PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
058 PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
059
060 String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
061 PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
062 PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
063 }
064
065 protected SubWorkflowActionExecutor() {
066 super(ACTION_TYPE);
067 }
068
069 public void initActionType() {
070 super.initActionType();
071 }
072
073 protected OozieClient getWorkflowClient(Context context, String oozieUri) {
074 OozieClient oozieClient;
075 if (oozieUri.equals(LOCAL)) {
076 WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
077 String user = workflow.getUser();
078 String group = workflow.getGroup();
079 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
080 oozieClient = new LocalOozieClient(dagEngine);
081 }
082 else {
083 // TODO we need to add authToken to the WC for the remote case
084 oozieClient = new OozieClient(oozieUri);
085 }
086 return oozieClient;
087 }
088
089 protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
090 ActionExecutorException {
091 if (eConf != null) {
092 String strConf = XmlUtils.prettyPrint(eConf).toString();
093 Configuration conf = new XConfiguration(new StringReader(strConf));
094 try {
095 PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
096 }
097 catch (CommandException ex) {
098 throw convertException(ex);
099 }
100 XConfiguration.copy(conf, subWorkflowConf);
101 }
102 }
103
104 @SuppressWarnings("unchecked")
105 protected void injectCallback(Context context, Configuration conf) {
106 String callback = context.getCallbackUrl("$status");
107 if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
108 XLog.getLog(getClass())
109 .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
110 }
111 conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
112 }
113
114 protected void injectRecovery(String externalId, Configuration conf) {
115 conf.set(OozieClient.EXTERNAL_ID, externalId);
116 }
117
118 protected void injectParent(String parentId, Configuration conf) {
119 conf.set(PARENT_ID, parentId);
120 }
121
122 protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
123 String jobId = oozieClient.getJobId(extId);
124 if (jobId.equals("")) {
125 return null;
126 }
127 return jobId;
128 }
129
130 public void start(Context context, WorkflowAction action) throws ActionExecutorException {
131 try {
132 Element eConf = XmlUtils.parseXml(action.getConf());
133 Namespace ns = eConf.getNamespace();
134 Element e = eConf.getChild("oozie", ns);
135 String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
136 OozieClient oozieClient = getWorkflowClient(context, oozieUri);
137 String subWorkflowId = null;
138 String extId = context.getRecoveryId();
139 String runningJobId = null;
140 if (extId != null) {
141 runningJobId = checkIfRunning(oozieClient, extId);
142 }
143 if (runningJobId == null) {
144 String appPath = eConf.getChild("app-path", ns).getTextTrim();
145
146 XConfiguration subWorkflowConf = new XConfiguration();
147 Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
148 if (eConf.getChild(("propagate-configuration"), ns) != null) {
149 XConfiguration.copy(parentConf, subWorkflowConf);
150 }
151
152 // the proto has the necessary credentials
153 Configuration protoActionConf = context.getProtoActionConf();
154 XConfiguration.copy(protoActionConf, subWorkflowConf);
155 subWorkflowConf.set(OozieClient.APP_PATH, appPath);
156 String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
157 if(group != null) {
158 subWorkflowConf.set(OozieClient.GROUP_NAME, group);
159 }
160 injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
161 injectCallback(context, subWorkflowConf);
162 injectRecovery(extId, subWorkflowConf);
163 injectParent(context.getWorkflow().getId(), subWorkflowConf);
164
165 //TODO: this has to be refactored later to be done in a single place for REST calls and this
166 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
167 subWorkflowConf);
168
169 subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
170 }
171 else {
172 subWorkflowId = runningJobId;
173 }
174 WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
175 String consoleUrl = workflow.getConsoleUrl();
176 context.setStartData(subWorkflowId, oozieUri, consoleUrl);
177 if (runningJobId != null) {
178 check(context, action);
179 }
180 }
181 catch (Exception ex) {
182 throw convertException(ex);
183 }
184 }
185
186 public void end(Context context, WorkflowAction action) throws ActionExecutorException {
187 try {
188 String externalStatus = action.getExternalStatus();
189 WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
190 : WorkflowAction.Status.ERROR;
191 context.setEndData(status, getActionSignal(status));
192 }
193 catch (Exception ex) {
194 throw convertException(ex);
195 }
196 }
197
198 public void check(Context context, WorkflowAction action) throws ActionExecutorException {
199 try {
200 String subWorkflowId = action.getExternalId();
201 String oozieUri = action.getTrackerUri();
202 OozieClient oozieClient = getWorkflowClient(context, oozieUri);
203 WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
204 WorkflowJob.Status status = subWorkflow.getStatus();
205 switch (status) {
206 case FAILED:
207 case KILLED:
208 case SUCCEEDED:
209 context.setExecutionData(status.toString(), null);
210 break;
211 default:
212 context.setExternalStatus(status.toString());
213 break;
214 }
215 }
216 catch (Exception ex) {
217 throw convertException(ex);
218 }
219 }
220
221 public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
222 try {
223 String subWorkflowId = action.getExternalId();
224 String oozieUri = action.getTrackerUri();
225 OozieClient oozieClient = getWorkflowClient(context, oozieUri);
226 oozieClient.kill(subWorkflowId);
227 context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
228 }
229 catch (Exception ex) {
230 throw convertException(ex);
231 }
232 }
233
234 private static Set<String> FINAL_STATUS = new HashSet<String>();
235
236 static {
237 FINAL_STATUS.add("SUCCEEDED");
238 FINAL_STATUS.add("KILLED");
239 FINAL_STATUS.add("FAILED");
240 }
241
242 public boolean isCompleted(String externalStatus) {
243 return FINAL_STATUS.contains(externalStatus);
244 }
245 }