001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.oozie;
019    
020    import org.apache.oozie.client.OozieClientException;
021    import org.apache.oozie.action.ActionExecutor;
022    import org.apache.oozie.action.ActionExecutorException;
023    import org.apache.oozie.DagEngine;
024    import org.apache.oozie.LocalOozieClient;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.service.DagEngineService;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.client.WorkflowJob;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.util.ConfigUtils;
032    import org.apache.oozie.util.JobUtils;
033    import org.apache.oozie.util.PropertiesUtils;
034    import org.apache.oozie.util.XmlUtils;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.service.Services;
038    import org.apache.hadoop.conf.Configuration;
039    import org.jdom.Element;
040    import org.jdom.Namespace;
041    
042    import java.io.StringReader;
043    import java.io.IOException;
044    import java.util.Set;
045    import java.util.HashSet;
046    
047    public class SubWorkflowActionExecutor extends ActionExecutor {
048        public static final String ACTION_TYPE = "sub-workflow";
049        public static final String LOCAL = "local";
050        public static final String PARENT_ID = "oozie.wf.parent.id";
051    
052        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
053    
054        static {
055            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
056                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
057                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
058                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
059    
060            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
061            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
062            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
063        }
064    
065        protected SubWorkflowActionExecutor() {
066            super(ACTION_TYPE);
067        }
068    
069        public void initActionType() {
070            super.initActionType();
071        }
072    
073        protected OozieClient getWorkflowClient(Context context, String oozieUri) {
074            OozieClient oozieClient;
075            if (oozieUri.equals(LOCAL)) {
076                WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
077                String user = workflow.getUser();
078                String group = workflow.getGroup();
079                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
080                oozieClient = new LocalOozieClient(dagEngine);
081            }
082            else {
083                // TODO we need to add authToken to the WC for the remote case
084                oozieClient = new OozieClient(oozieUri);
085            }
086            return oozieClient;
087        }
088    
089        protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
090                ActionExecutorException {
091            if (eConf != null) {
092                String strConf = XmlUtils.prettyPrint(eConf).toString();
093                Configuration conf = new XConfiguration(new StringReader(strConf));
094                try {
095                    PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
096                }
097                catch (CommandException ex) {
098                    throw convertException(ex);
099                }
100                XConfiguration.copy(conf, subWorkflowConf);
101            }
102        }
103    
104        @SuppressWarnings("unchecked")
105        protected void injectCallback(Context context, Configuration conf) {
106            String callback = context.getCallbackUrl("$status");
107            if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
108                XLog.getLog(getClass())
109                        .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
110            }
111            conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
112        }
113    
114        protected void injectRecovery(String externalId, Configuration conf) {
115            conf.set(OozieClient.EXTERNAL_ID, externalId);
116        }
117    
118        protected void injectParent(String parentId, Configuration conf) {
119            conf.set(PARENT_ID, parentId);
120        }
121    
122        protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
123            String jobId = oozieClient.getJobId(extId);
124            if (jobId.equals("")) {
125                return null;
126            }
127            return jobId;
128        }
129    
130        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
131            try {
132                Element eConf = XmlUtils.parseXml(action.getConf());
133                Namespace ns = eConf.getNamespace();
134                Element e = eConf.getChild("oozie", ns);
135                String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
136                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
137                String subWorkflowId = null;
138                String extId = context.getRecoveryId();
139                String runningJobId = null;
140                if (extId != null) {
141                    runningJobId = checkIfRunning(oozieClient, extId);
142                }
143                if (runningJobId == null) {
144                    String appPath = eConf.getChild("app-path", ns).getTextTrim();
145    
146                    XConfiguration subWorkflowConf = new XConfiguration();
147                    Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
148                    if (eConf.getChild(("propagate-configuration"), ns) != null) {
149                        XConfiguration.copy(parentConf, subWorkflowConf);
150                    }
151    
152                    // the proto has the necessary credentials
153                    Configuration protoActionConf = context.getProtoActionConf();
154                    XConfiguration.copy(protoActionConf, subWorkflowConf);
155                    subWorkflowConf.set(OozieClient.APP_PATH, appPath);
156                    String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
157                    if(group != null) {
158                        subWorkflowConf.set(OozieClient.GROUP_NAME, group);
159                    }
160                    injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
161                    injectCallback(context, subWorkflowConf);
162                    injectRecovery(extId, subWorkflowConf);
163                    injectParent(context.getWorkflow().getId(), subWorkflowConf);
164    
165                    //TODO: this has to be refactored later to be done in a single place for REST calls and this
166                    JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
167                                              subWorkflowConf);
168    
169                    subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
170                }
171                else {
172                    subWorkflowId = runningJobId;
173                }
174                WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
175                String consoleUrl = workflow.getConsoleUrl();
176                context.setStartData(subWorkflowId, oozieUri, consoleUrl);
177                if (runningJobId != null) {
178                    check(context, action);
179                }
180            }
181            catch (Exception ex) {
182                throw convertException(ex);
183            }
184        }
185    
186        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
187            try {
188                String externalStatus = action.getExternalStatus();
189                WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
190                                               : WorkflowAction.Status.ERROR;
191                context.setEndData(status, getActionSignal(status));
192            }
193            catch (Exception ex) {
194                throw convertException(ex);
195            }
196        }
197    
198        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
199            try {
200                String subWorkflowId = action.getExternalId();
201                String oozieUri = action.getTrackerUri();
202                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
203                WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
204                WorkflowJob.Status status = subWorkflow.getStatus();
205                switch (status) {
206                    case FAILED:
207                    case KILLED:
208                    case SUCCEEDED:
209                        context.setExecutionData(status.toString(), null);
210                        break;
211                    default:
212                        context.setExternalStatus(status.toString());
213                        break;
214                }
215            }
216            catch (Exception ex) {
217                throw convertException(ex);
218            }
219        }
220    
221        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
222            try {
223                String subWorkflowId = action.getExternalId();
224                String oozieUri = action.getTrackerUri();
225                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
226                oozieClient.kill(subWorkflowId);
227                context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
228            }
229            catch (Exception ex) {
230                throw convertException(ex);
231            }
232        }
233    
234        private static Set<String> FINAL_STATUS = new HashSet<String>();
235    
236        static {
237            FINAL_STATUS.add("SUCCEEDED");
238            FINAL_STATUS.add("KILLED");
239            FINAL_STATUS.add("FAILED");
240        }
241    
242        public boolean isCompleted(String externalStatus) {
243            return FINAL_STATUS.contains(externalStatus);
244        }
245    }