001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.oozie;
019    
020    import org.apache.oozie.client.OozieClientException;
021    import org.apache.oozie.action.ActionExecutor;
022    import org.apache.oozie.action.ActionExecutorException;
023    import org.apache.oozie.DagEngine;
024    import org.apache.oozie.LocalOozieClient;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.service.DagEngineService;
027    import org.apache.oozie.client.WorkflowAction;
028    import org.apache.oozie.client.OozieClient;
029    import org.apache.oozie.client.WorkflowJob;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.util.ConfigUtils;
032    import org.apache.oozie.util.JobUtils;
033    import org.apache.oozie.util.PropertiesUtils;
034    import org.apache.oozie.util.XmlUtils;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.service.Services;
038    import org.apache.hadoop.conf.Configuration;
039    import org.jdom.Element;
040    import org.jdom.Namespace;
041    
042    import java.io.StringReader;
043    import java.io.IOException;
044    import java.util.Set;
045    import java.util.HashSet;
046    
047    public class SubWorkflowActionExecutor extends ActionExecutor {
048        public static final String ACTION_TYPE = "sub-workflow";
049        public static final String LOCAL = "local";
050    
051        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
052    
053        static {
054            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
055                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
056                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
057                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
058    
059            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER};
060            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
061            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
062        }
063    
064        protected SubWorkflowActionExecutor() {
065            super(ACTION_TYPE);
066        }
067    
068        public void initActionType() {
069            super.initActionType();
070        }
071    
072        protected OozieClient getWorkflowClient(Context context, String oozieUri) {
073            OozieClient oozieClient;
074            if (oozieUri.equals(LOCAL)) {
075                WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
076                String user = workflow.getUser();
077                String group = workflow.getGroup();
078                String authToken = workflow.getAuthToken();
079                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken);
080                oozieClient = new LocalOozieClient(dagEngine);
081            }
082            else {
083                // TODO we need to add authToken to the WC for the remote case
084                oozieClient = new OozieClient(oozieUri);
085            }
086            return oozieClient;
087        }
088    
089        protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
090                ActionExecutorException {
091            if (eConf != null) {
092                String strConf = XmlUtils.prettyPrint(eConf).toString();
093                Configuration conf = new XConfiguration(new StringReader(strConf));
094                try {
095                    PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
096                }
097                catch (CommandException ex) {
098                    throw convertException(ex);
099                }
100                XConfiguration.copy(conf, subWorkflowConf);
101            }
102        }
103    
104        @SuppressWarnings("unchecked")
105        protected void injectCallback(Context context, Configuration conf) {
106            String callback = context.getCallbackUrl("$status");
107            if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
108                XLog.getLog(getClass())
109                        .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
110            }
111            conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
112        }
113    
114        protected void injectRecovery(String externalId, Configuration conf) {
115            conf.set(OozieClient.EXTERNAL_ID, externalId);
116        }
117    
118        protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
119            String jobId = oozieClient.getJobId(extId);
120            if (jobId.equals("")) {
121                return null;
122            }
123            return jobId;
124        }
125    
126        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
127            try {
128                Element eConf = XmlUtils.parseXml(action.getConf());
129                Namespace ns = eConf.getNamespace();
130                Element e = eConf.getChild("oozie", ns);
131                String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
132                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
133                String subWorkflowId = null;
134                String extId = context.getRecoveryId();
135                String runningJobId = null;
136                if (extId != null) {
137                    runningJobId = checkIfRunning(oozieClient, extId);
138                }
139                if (runningJobId == null) {
140                    String appPath = eConf.getChild("app-path", ns).getTextTrim();
141    
142                    XConfiguration subWorkflowConf = new XConfiguration();
143                    Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
144                    if (eConf.getChild(("propagate-configuration"), ns) != null) {
145                        XConfiguration.copy(parentConf, subWorkflowConf);
146                    }
147    
148                    // the proto has the necessary credentials
149                    Configuration protoActionConf = context.getProtoActionConf();
150                    XConfiguration.copy(protoActionConf, subWorkflowConf);
151                    subWorkflowConf.set(OozieClient.APP_PATH, appPath);
152                    String group = ConfigUtils.getWithDeprecatedCheck(parentConf, OozieClient.JOB_ACL, OozieClient.GROUP_NAME, null);
153                    if(group != null) {
154                        subWorkflowConf.set(OozieClient.GROUP_NAME, group);
155                    }
156                    injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
157                    injectCallback(context, subWorkflowConf);
158                    injectRecovery(extId, subWorkflowConf);
159    
160                    //TODO: this has to be refactored later to be done in a single place for REST calls and this
161                    JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
162                                              subWorkflowConf);
163    
164                    subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
165                }
166                else {
167                    subWorkflowId = runningJobId;
168                }
169                WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
170                String consoleUrl = workflow.getConsoleUrl();
171                context.setStartData(subWorkflowId, oozieUri, consoleUrl);
172                if (runningJobId != null) {
173                    check(context, action);
174                }
175            }
176            catch (Exception ex) {
177                throw convertException(ex);
178            }
179        }
180    
181        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
182            try {
183                String externalStatus = action.getExternalStatus();
184                WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
185                                               : WorkflowAction.Status.ERROR;
186                context.setEndData(status, getActionSignal(status));
187            }
188            catch (Exception ex) {
189                throw convertException(ex);
190            }
191        }
192    
193        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
194            try {
195                String subWorkflowId = action.getExternalId();
196                String oozieUri = action.getTrackerUri();
197                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
198                WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
199                WorkflowJob.Status status = subWorkflow.getStatus();
200                switch (status) {
201                    case FAILED:
202                    case KILLED:
203                    case SUCCEEDED:
204                        context.setExecutionData(status.toString(), null);
205                        break;
206                    default:
207                        context.setExternalStatus(status.toString());
208                        break;
209                }
210            }
211            catch (Exception ex) {
212                throw convertException(ex);
213            }
214        }
215    
216        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
217            try {
218                String subWorkflowId = action.getExternalId();
219                String oozieUri = action.getTrackerUri();
220                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
221                oozieClient.kill(subWorkflowId);
222                context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
223            }
224            catch (Exception ex) {
225                throw convertException(ex);
226            }
227        }
228    
229        private static Set<String> FINAL_STATUS = new HashSet<String>();
230    
231        static {
232            FINAL_STATUS.add("SUCCEEDED");
233            FINAL_STATUS.add("KILLED");
234            FINAL_STATUS.add("FAILED");
235        }
236    
237        public boolean isCompleted(String externalStatus) {
238            return FINAL_STATUS.contains(externalStatus);
239        }
240    }