001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     * 
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     * 
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.action.oozie;
019    
020    import org.apache.oozie.client.OozieClientException;
021    import org.apache.oozie.action.ActionExecutor;
022    import org.apache.oozie.action.ActionExecutorException;
023    import org.apache.oozie.DagEngine;
024    import org.apache.oozie.LocalOozieClient;
025    import org.apache.oozie.WorkflowJobBean;
026    import org.apache.oozie.service.DagEngineService;
027    import org.apache.oozie.service.WorkflowAppService;
028    import org.apache.oozie.client.WorkflowAction;
029    import org.apache.oozie.client.OozieClient;
030    import org.apache.oozie.client.WorkflowJob;
031    import org.apache.oozie.command.CommandException;
032    import org.apache.oozie.util.JobUtils;
033    import org.apache.oozie.util.PropertiesUtils;
034    import org.apache.oozie.util.XmlUtils;
035    import org.apache.oozie.util.XConfiguration;
036    import org.apache.oozie.util.XLog;
037    import org.apache.oozie.service.Services;
038    import org.apache.hadoop.conf.Configuration;
039    import org.jdom.Element;
040    import org.jdom.Namespace;
041    
042    import java.io.StringReader;
043    import java.io.IOException;
044    import java.util.Set;
045    import java.util.HashSet;
046    
047    public class SubWorkflowActionExecutor extends ActionExecutor {
048        public static final String ACTION_TYPE = "sub-workflow";
049        public static final String LOCAL = "local";
050    
051        private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>();
052    
053        static {
054            String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES,
055                    PropertiesUtils.KB, PropertiesUtils.MB, PropertiesUtils.GB, PropertiesUtils.TB, PropertiesUtils.PB,
056                    PropertiesUtils.RECORDS, PropertiesUtils.MAP_IN, PropertiesUtils.MAP_OUT, PropertiesUtils.REDUCE_IN,
057                    PropertiesUtils.REDUCE_OUT, PropertiesUtils.GROUPS};
058    
059            String[] badDefaultProps = {PropertiesUtils.HADOOP_USER, PropertiesUtils.HADOOP_UGI,
060                    WorkflowAppService.HADOOP_JT_KERBEROS_NAME, WorkflowAppService.HADOOP_NN_KERBEROS_NAME};
061            PropertiesUtils.createPropertySet(badUserProps, DISALLOWED_DEFAULT_PROPERTIES);
062            PropertiesUtils.createPropertySet(badDefaultProps, DISALLOWED_DEFAULT_PROPERTIES);
063        }
064    
065        protected SubWorkflowActionExecutor() {
066            super(ACTION_TYPE);
067        }
068    
069        public void initActionType() {
070            super.initActionType();
071        }
072    
073        protected OozieClient getWorkflowClient(Context context, String oozieUri) {
074            OozieClient oozieClient;
075            if (oozieUri.equals(LOCAL)) {
076                WorkflowJobBean workflow = (WorkflowJobBean) context.getWorkflow();
077                String user = workflow.getUser();
078                String group = workflow.getGroup();
079                String authToken = workflow.getAuthToken();
080                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken);
081                oozieClient = new LocalOozieClient(dagEngine);
082            }
083            else {
084                // TODO we need to add authToken to the WC for the remote case
085                oozieClient = new OozieClient(oozieUri);
086            }
087            return oozieClient;
088        }
089    
090        protected void injectInline(Element eConf, Configuration subWorkflowConf) throws IOException,
091                ActionExecutorException {
092            if (eConf != null) {
093                String strConf = XmlUtils.prettyPrint(eConf).toString();
094                Configuration conf = new XConfiguration(new StringReader(strConf));
095                try {
096                    PropertiesUtils.checkDisallowedProperties(conf, DISALLOWED_DEFAULT_PROPERTIES);
097                }
098                catch (CommandException ex) {
099                    throw convertException(ex);
100                }
101                XConfiguration.copy(conf, subWorkflowConf);
102            }
103        }
104    
105        @SuppressWarnings("unchecked")
106        protected void injectCallback(Context context, Configuration conf) {
107            String callback = context.getCallbackUrl("$status");
108            if (conf.get(OozieClient.WORKFLOW_NOTIFICATION_URL) != null) {
109                XLog.getLog(getClass())
110                        .warn("Sub-Workflow configuration has a custom job end notification URI, overriding");
111            }
112            conf.set(OozieClient.WORKFLOW_NOTIFICATION_URL, callback);
113        }
114    
115        protected void injectRecovery(String externalId, Configuration conf) {
116            conf.set(OozieClient.EXTERNAL_ID, externalId);
117        }
118    
119        protected String checkIfRunning(OozieClient oozieClient, String extId) throws OozieClientException {
120            String jobId = oozieClient.getJobId(extId);
121            if (jobId.equals("")) {
122                return null;
123            }
124            return jobId;
125        }
126    
127        public void start(Context context, WorkflowAction action) throws ActionExecutorException {
128            try {
129                Element eConf = XmlUtils.parseXml(action.getConf());
130                Namespace ns = eConf.getNamespace();
131                Element e = eConf.getChild("oozie", ns);
132                String oozieUri = (e == null) ? LOCAL : e.getTextTrim();
133                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
134                String subWorkflowId = null;
135                String extId = context.getRecoveryId();
136                String runningJobId = null;
137                if (extId != null) {
138                    runningJobId = checkIfRunning(oozieClient, extId);
139                }
140                if (runningJobId == null) {
141                    String appPath = eConf.getChild("app-path", ns).getTextTrim();
142    
143                    XConfiguration subWorkflowConf = new XConfiguration();
144                    if (eConf.getChild(("propagate-configuration"), ns) != null) {
145                        Configuration parentConf = new XConfiguration(new StringReader(context.getWorkflow().getConf()));
146                        XConfiguration.copy(parentConf, subWorkflowConf);
147                    }
148    
149                    // the proto has the necessary credentials
150                    Configuration protoActionConf = context.getProtoActionConf();
151                    XConfiguration.copy(protoActionConf, subWorkflowConf);
152                    subWorkflowConf.set(OozieClient.APP_PATH, appPath);
153                    injectInline(eConf.getChild("configuration", ns), subWorkflowConf);
154                    injectCallback(context, subWorkflowConf);
155                    injectRecovery(extId, subWorkflowConf);
156    
157                    //TODO: this has to be refactored later to be done in a single place for REST calls and this
158                    JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(),
159                                              subWorkflowConf);
160    
161                    subWorkflowId = oozieClient.run(subWorkflowConf.toProperties());
162                }
163                else {
164                    subWorkflowId = runningJobId;
165                }
166                WorkflowJob workflow = oozieClient.getJobInfo(subWorkflowId);
167                String consoleUrl = workflow.getConsoleUrl();
168                context.setStartData(subWorkflowId, oozieUri, consoleUrl);
169                if (runningJobId != null) {
170                    check(context, action);
171                }
172            }
173            catch (Exception ex) {
174                throw convertException(ex);
175            }
176        }
177    
178        public void end(Context context, WorkflowAction action) throws ActionExecutorException {
179            try {
180                String externalStatus = action.getExternalStatus();
181                WorkflowAction.Status status = externalStatus.equals("SUCCEEDED") ? WorkflowAction.Status.OK
182                                               : WorkflowAction.Status.ERROR;
183                context.setEndData(status, getActionSignal(status));
184            }
185            catch (Exception ex) {
186                throw convertException(ex);
187            }
188        }
189    
190        public void check(Context context, WorkflowAction action) throws ActionExecutorException {
191            try {
192                String subWorkflowId = action.getExternalId();
193                String oozieUri = action.getTrackerUri();
194                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
195                WorkflowJob subWorkflow = oozieClient.getJobInfo(subWorkflowId);
196                WorkflowJob.Status status = subWorkflow.getStatus();
197                switch (status) {
198                    case FAILED:
199                    case KILLED:
200                    case SUCCEEDED:
201                        context.setExecutionData(status.toString(), null);
202                        break;
203                    default:
204                        context.setExternalStatus(status.toString());
205                        break;
206                }
207            }
208            catch (Exception ex) {
209                throw convertException(ex);
210            }
211        }
212    
213        public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
214            try {
215                String subWorkflowId = action.getExternalId();
216                String oozieUri = action.getTrackerUri();
217                OozieClient oozieClient = getWorkflowClient(context, oozieUri);
218                oozieClient.kill(subWorkflowId);
219                context.setEndData(WorkflowAction.Status.KILLED, getActionSignal(WorkflowAction.Status.KILLED));
220            }
221            catch (Exception ex) {
222                throw convertException(ex);
223            }
224        }
225    
226        private static Set<String> FINAL_STATUS = new HashSet<String>();
227    
228        static {
229            FINAL_STATUS.add("SUCCEEDED");
230            FINAL_STATUS.add("KILLED");
231            FINAL_STATUS.add("FAILED");
232        }
233    
234        public boolean isCompleted(String externalStatus) {
235            return FINAL_STATUS.contains(externalStatus);
236        }
237    }