001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.action.oozie;
020
021import java.io.IOException;
022import java.io.StringReader;
023import java.util.HashSet;
024import java.util.Set;
025
026import org.apache.hadoop.conf.Configuration;
027import org.apache.oozie.DagEngine;
028import org.apache.oozie.LocalOozieClient;
029import org.apache.oozie.WorkflowJobBean;
030import org.apache.oozie.action.ActionExecutor;
031import org.apache.oozie.action.ActionExecutorException;
032import org.apache.oozie.action.hadoop.OozieJobInfo;
033import org.apache.oozie.client.OozieClient;
034import org.apache.oozie.client.OozieClientException;
035import org.apache.oozie.client.WorkflowAction;
036import org.apache.oozie.client.WorkflowJob;
037import org.apache.oozie.command.CommandException;
038import org.apache.oozie.service.ConfigurationService;
039import org.apache.oozie.service.DagEngineService;
040import org.apache.oozie.service.Services;
041import org.apache.oozie.util.ConfigUtils;
042import org.apache.oozie.util.JobUtils;
043import org.apache.oozie.util.PropertiesUtils;
044import org.apache.oozie.util.XConfiguration;
045import org.apache.oozie.util.XLog;
046import org.apache.oozie.util.XmlUtils;
047import org.jdom.Element;
048import org.jdom.Namespace;
049
050public class SubWorkflowActionExecutor extends ActionExecutor {
051    public static final String ACTION_TYPE = "sub-workflow";
052    public static final String LOCAL = "local";
053    public static final String PARENT_ID = "oozie.wf.parent.id";
054    public static final String SUPER_PARENT_ID = "oozie.wf.superparent.id";
055    public static final String SUBWORKFLOW_MAX_DEPTH = "oozie.action.subworkflow.max.depth";
056    public static final String SUBWORKFLOW_DEPTH = "oozie.action.subworkflow.depth";
057    public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun";
058
059    private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
060    public XLog LOG = XLog.getLog(getClass());
061
062
063    static {
064        String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
065                PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
066                PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
067                PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
068
069        String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
070        PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
071        PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
072    }
073
074    protected SubWorkflowActionExecutor() {
075        super(ACTION_TYPE);
076    }
077
078    public void initActionType() {
079        super.initActionType();
080    }
081
082    protected OozieClient getWorkflowClient(Context context, String oozieUri) {
083        OozieClient oozieClient;
084        if (oozieUri.equals(LOCAL)) {
085            WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
086            String user = workflow.getUser();
087            String group = workflow.getGroup();
088            DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
089            oozieClient = new LocalOozieClient(dagEngine);
090        }
091        else {
092            // TODO we need to add authToken to the WC for the remote case
093            oozieClient = new OozieClient(oozieUri);
094        }
095        return oozieClient;
096    }
097
098    protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
099            ActionExecutorException {
100        if (eConf != null) {
101            String strConf = XmlUtils.prettyPrint(eConf).toString();
102            Configuration conf = new XConfiguration(new StringReader(strConf));
103            try {
104                PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
105            }
106            catch (CommandException ex) {
107                throw convertException(ex);
108            }
109            XConfiguration.copy(conf, subWorkflowConf);
110        }
111    }
112
113    @SuppressWarnings("unchecked")
114    protected void injectCallback(Context context, Configuration conf) {
115        String callback = context.getCallbackUrl("$status");
116        if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
117            XLog.getLog(getClass())
118                    .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
119        }
120        conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
121    }
122
123    protected void injectRecovery(String externalId, Configuration conf) {
124        conf.set(OozieClient.EXTERNAL_ID, externalId);
125    }
126
127    protected void injectParent(String parentId, Configuration conf) {
128        conf.set(PARENT_ID, parentId);
129    }
130
131    protected void injectSuperParent(WorkflowJob parentWorkflow, Configuration parentConf, Configuration conf) {
132        String superParentId = parentConf.get(SUPER_PARENT_ID);
133        if (superParentId == null) {
134            // This is a sub-workflow at depth 1
135            superParentId = parentWorkflow.getParentId();
136
137            // If the parent workflow is not submitted through a coordinator then the parentId will be the super parent id.
138            if (superParentId == null) {
139                superParentId = parentWorkflow.getId();
140            }
141            conf.set(SUPER_PARENT_ID, superParentId);
142        } else {
143            // Sub-workflow at depth 2 or more.
144            conf.set(SUPER_PARENT_ID, superParentId);
145        }
146    }
147
148    protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, Configuration conf) throws ActionExecutorException {
149        int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0);
150        int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH);
151        if (depth >= maxDepth) {
152            throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "SUBWF001",
153                    "Depth [{0}] cannot exceed maximum subworkflow depth [{1}]", (depth + 1), maxDepth);
154        }
155        conf.setInt(SUBWORKFLOW_DEPTH, depth + 1);
156    }
157
158    protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
159        String jobId = oozieClient.getJobId(extId);
160        if (jobId.equals("")) {
161            return null;
162        }
163        return jobId;
164    }
165
166    public void start(Context context, WorkflowAction action) throws ActionExecutorException {
167        try {
168            Element eConf = XmlUtils.parseXml(action.getConf());
169            Namespace ns = eConf.getNamespace();
170            Element e = eConf.getChild("oozie", ns);
171            String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
172            OozieClient oozieClient = getWorkflowClient(context, oozieUri);
173            String subWorkflowId = null;
174            String extId = context.getRecoveryId();
175            String runningJobId = null;
176            if (extId != null) {
177                runningJobId = checkIfRunning(oozieClient, extId);
178            }
179            if (runningJobId == null) {
180                String appPath = eConf.getChild("app-path", ns).getTextTrim();
181
182                XConfiguration subWorkflowConf = new XConfiguration();
183
184                Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
185
186                if (eConf.getChild(("propagate-configuration"), ns) != null) {
187                    XConfiguration.copy(parentConf, subWorkflowConf);
188                }
189
190                // Propagate coordinator and bundle info to subworkflow
191                if (OozieJobInfo.isJobInfoEnabled()) {
192                  if (parentConf.get(OozieJobInfo.COORD_ID) != null) {
193                    subWorkflowConf.set(OozieJobInfo.COORD_ID, parentConf.get(OozieJobInfo.COORD_ID));
194                    subWorkflowConf.set(OozieJobInfo.COORD_NAME, parentConf.get(OozieJobInfo.COORD_NAME));
195                    subWorkflowConf.set(OozieJobInfo.COORD_NOMINAL_TIME, parentConf.get(OozieJobInfo.COORD_NOMINAL_TIME));
196                  }
197                  if (parentConf.get(OozieJobInfo.BUNDLE_ID) != null) {
198                    subWorkflowConf.set(OozieJobInfo.BUNDLE_ID, parentConf.get(OozieJobInfo.BUNDLE_ID));
199                    subWorkflowConf.set(OozieJobInfo.BUNDLE_NAME, parentConf.get(OozieJobInfo.BUNDLE_NAME));
200                  }
201                }
202
203                // the proto has the necessary credentials
204                Configuration protoActionConf = context.getProtoActionConf();
205                XConfiguration.copy(protoActionConf, subWorkflowConf);
206                subWorkflowConf.set(OozieClient.APP_PATH, appPath);
207                String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
208                if(group != null) {
209                    subWorkflowConf.set(OozieClient.GROUP_NAME, group);
210                }
211
212                injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
213                injectCallback(context, subWorkflowConf);
214                injectRecovery(extId, subWorkflowConf);
215                injectParent(context.getWorkflow().getId(), subWorkflowConf);
216                injectSuperParent(context.getWorkflow(), parentConf, subWorkflowConf);
217                verifyAndInjectSubworkflowDepth(parentConf, subWorkflowConf);
218
219                //TODO: this has to be refactored later to be done in a single place for REST calls and this
220                JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
221                                          subWorkflowConf);
222
223                subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action));
224
225                // if the rerun failed node option is provided during the time of rerun command, old subworkflow will
226                // rerun again.
227                if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) {
228                    subWorkflowConf.setBoolean(SUBWORKFLOW_RERUN, true);
229                    oozieClient.reRun(action.getExternalId(), subWorkflowConf.toProperties());
230                    subWorkflowId = action.getExternalId();
231                } else {
232                    subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
233                }
234            }
235            else {
236                subWorkflowId = runningJobId;
237            }
238            WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
239            String consoleUrl = workflow.getConsoleUrl();
240            context.setStartData(subWorkflowId, oozieUri, consoleUrl);
241            if (runningJobId != null) {
242                check(context, action);
243            }
244        }
245        catch (Exception ex) {
246            LOG.error(ex);
247            throw convertException(ex);
248        }
249    }
250
251    public void end(Context context, WorkflowAction action) throws ActionExecutorException {
252        try {
253            String externalStatus = action.getExternalStatus();
254            WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
255                                           : WorkflowAction.Status.ERROR;
256            context.setEndData(status, getActionSignal(status));
257        }
258        catch (Exception ex) {
259            throw convertException(ex);
260        }
261    }
262
263    public void check(Context context, WorkflowAction action) throws ActionExecutorException {
264        try {
265            String subWorkflowId = action.getExternalId();
266            String oozieUri = action.getTrackerUri();
267            OozieClient oozieClient = getWorkflowClient(context, oozieUri);
268            WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
269            WorkflowJob.Status status = subWorkflow.getStatus();
270            switch (status) {
271                case FAILED:
272                case KILLED:
273                case SUCCEEDED:
274                    context.setExecutionData(status.toString(), null);
275                    break;
276                default:
277                    context.setExternalStatus(status.toString());
278                    break;
279            }
280        }
281        catch (Exception ex) {
282            throw convertException(ex);
283        }
284    }
285
286    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
287        try {
288            String subWorkflowId = action.getExternalId();
289            String oozieUri = action.getTrackerUri();
290            if (subWorkflowId != null && oozieUri != null) {
291                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
292                oozieClient.kill(subWorkflowId);
293            }
294            context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
295        }
296        catch (Exception ex) {
297            throw convertException(ex);
298        }
299    }
300
301    private static Set<String> FINAL_STATUS = new HashSet<String>();
302
303    static {
304        FINAL_STATUS.add("SUCCEEDED");
305        FINAL_STATUS.add("KILLED");
306        FINAL_STATUS.add("FAILED");
307    }
308
309    public boolean isCompleted(String externalStatus) {
310        return FINAL_STATUS.contains(externalStatus);
311    }
312
313    public boolean supportsConfigurationJobXML() {
314        return true;
315    }
316}