001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.command.coord;
019    
020    import org.apache.hadoop.conf.Configuration;
021    
022    import org.apache.oozie.client.CoordinatorAction;
023    import org.apache.oozie.client.OozieClient;
024    import org.apache.oozie.CoordinatorActionBean;
025    import org.apache.oozie.DagEngineException;
026    import org.apache.oozie.DagEngine;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.SLAEventBean;
029    import org.apache.oozie.WorkflowJobBean;
030    import org.apache.oozie.command.CommandException;
031    import org.apache.oozie.command.PreconditionException;
032    import org.apache.oozie.service.DagEngineService;
033    import org.apache.oozie.service.EventHandlerService;
034    import org.apache.oozie.service.JPAService;
035    import org.apache.oozie.service.Services;
036    import org.apache.oozie.util.JobUtils;
037    import org.apache.oozie.util.LogUtils;
038    import org.apache.oozie.util.ParamChecker;
039    import org.apache.oozie.util.XLog;
040    import org.apache.oozie.util.XmlUtils;
041    import org.apache.oozie.util.XConfiguration;
042    import org.apache.oozie.util.db.SLADbOperations;
043    import org.apache.oozie.client.SLAEvent.SlaAppType;
044    import org.apache.oozie.client.SLAEvent.Status;
045    import org.apache.oozie.client.rest.JsonBean;
046    import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
047    import org.apache.oozie.executor.jpa.JPAExecutorException;
048    import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
049    import org.jdom.Element;
050    import org.jdom.JDOMException;
051    
052    import java.io.IOException;
053    import java.io.StringReader;
054    import java.util.ArrayList;
055    import java.util.Date;
056    import java.util.List;
057    
058    @SuppressWarnings("deprecation")
059    public class CoordActionStartXCommand extends CoordinatorXCommand<Void> {
060    
061        public static final String EL_ERROR = "EL_ERROR";
062        public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
063        public static final String COULD_NOT_START = "COULD_NOT_START";
064        public static final String START_DATA_MISSING = "START_DATA_MISSING";
065        public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
066    
067        private final XLog log = getLog();
068        private String actionId = null;
069        private String user = null;
070        private String appName = null;
071        private CoordinatorActionBean coordAction = null;
072        private JPAService jpaService = null;
073        private String jobId = null;
074        private List<JsonBean> updateList = new ArrayList<JsonBean>();
075        private List<JsonBean> insertList = new ArrayList<JsonBean>();
076    
077        public CoordActionStartXCommand(String id, String user, String appName, String jobId) {
078            //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
079            super("coord_action_start", "coord_action_start", 1);
080            this.actionId = ParamChecker.notEmpty(id, "id");
081            this.user = ParamChecker.notEmpty(user, "user");
082            this.appName = ParamChecker.notEmpty(appName, "appName");
083            this.jobId = jobId;
084        }
085    
086        /**
087         * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from
088         * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract
089         * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf
090         * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf
091         * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table.
092         * Merge Action createdConf with actionXml to create new runConf with replaced variables
093         *
094         * @param action CoordinatorActionBean
095         * @return Configuration
096         * @throws CommandException
097         */
098        private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException {
099            String createdConf = action.getCreatedConf();
100            String actionXml = action.getActionXml();
101            Element workflowProperties = null;
102            try {
103                workflowProperties = XmlUtils.parseXml(actionXml);
104            }
105            catch (JDOMException e1) {
106                log.warn("Configuration parse error in:" + actionXml);
107                throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
108            }
109            // generate the 'runConf' for this action
110            // Step 1: runConf = createdConf
111            Configuration runConf = null;
112            try {
113                runConf = new XConfiguration(new StringReader(createdConf));
114            }
115            catch (IOException e1) {
116                log.warn("Configuration parse error in:" + createdConf);
117                throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
118            }
119            // Step 2: Merge local properties into runConf
120            // extract 'property' tags under 'configuration' block in the
121            // coordinator.xml (saved in actionxml column)
122            // convert Element to XConfiguration
123            Element configElement = workflowProperties.getChild("action", workflowProperties.getNamespace())
124                    .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration",
125                                                                                      workflowProperties.getNamespace());
126            if (configElement != null) {
127                String strConfig = XmlUtils.prettyPrint(configElement).toString();
128                Configuration localConf;
129                try {
130                    localConf = new XConfiguration(new StringReader(strConfig));
131                }
132                catch (IOException e1) {
133                    log.warn("Configuration parse error in:" + strConfig);
134                    throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
135                }
136    
137                // copy configuration properties in coordinator.xml to the runConf
138                XConfiguration.copy(localConf, runConf);
139            }
140    
141            // Step 3: Extract value of 'app-path' in actionxml, and save it as a
142            // new property called 'oozie.wf.application.path'
143            // WF Engine requires the path to the workflow.xml to be saved under
144            // this property name
145            String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow",
146                                                                                                               workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue();
147            runConf.set("oozie.wf.application.path", appPath);
148            return runConf;
149        }
150    
151        @Override
152        protected Void execute() throws CommandException {
153            boolean makeFail = true;
154            String errCode = "";
155            String errMsg = "";
156            ParamChecker.notEmpty(user, "user");
157    
158            log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
159            if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
160                // log.debug("getting.. job id: " + coordAction.getJobId());
161                // create merged runConf to pass to WF Engine
162                Configuration runConf = mergeConfig(coordAction);
163                coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
164                // log.debug("%%% merged runconf=" +
165                // XmlUtils.prettyPrint(runConf).toString());
166                DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user);
167                try {
168                    Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
169                    SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
170                            SlaAppType.COORDINATOR_ACTION, log);
171                    if(slaEvent != null) {
172                        insertList.add(slaEvent);
173                    }
174    
175                    // Normalize workflow appPath here;
176                    JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
177                    String wfId = dagEngine.submitJobFromCoordinator(conf, actionId);
178                    coordAction.setStatus(CoordinatorAction.Status.RUNNING);
179                    coordAction.setExternalId(wfId);
180                    coordAction.incrementAndGetPending();
181    
182                    //store.updateCoordinatorAction(coordAction);
183                    JPAService jpaService = Services.get().get(JPAService.class);
184                    if (jpaService != null) {
185                        log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
186                        WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
187                        wfJob.setParentId(actionId);
188                        wfJob.setLastModifiedTime(new Date());
189                        updateList.add(wfJob);
190                        updateList.add(coordAction);
191                        try {
192                            jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
193                            if (EventHandlerService.isEnabled()) {
194                                generateEvent(coordAction, user, appName, wfJob.getStartTime());
195                            }
196                        }
197                        catch (JPAExecutorException je) {
198                            throw new CommandException(je);
199                        }
200                    }
201                    else {
202                        log.error(ErrorCode.E0610);
203                    }
204    
205                    makeFail = false;
206                }
207                catch (DagEngineException dee) {
208                    errMsg = dee.getMessage();
209                    errCode = dee.getErrorCode().toString();
210                    log.warn("can not create DagEngine for submitting jobs", dee);
211                }
212                catch (CommandException ce) {
213                    errMsg = ce.getMessage();
214                    errCode = ce.getErrorCode().toString();
215                    log.warn("command exception occured ", ce);
216                }
217                catch (java.io.IOException ioe) {
218                    errMsg = ioe.getMessage();
219                    errCode = "E1005";
220                    log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
221                }
222                catch (Exception ex) {
223                    errMsg = ex.getMessage();
224                    errCode = "E1005";
225                    log.warn("can not create DagEngine for submitting jobs", ex);
226                }
227                finally {
228                    if (makeFail == true) { // No DB exception occurs
229                        log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
230                        coordAction.setStatus(CoordinatorAction.Status.FAILED);
231                        if (errMsg.length() > 254) { // Because table column size is 255
232                            errMsg = errMsg.substring(0, 255);
233                        }
234                        coordAction.setErrorMessage(errMsg);
235                        coordAction.setErrorCode(errCode);
236    
237                        updateList = new ArrayList<JsonBean>();
238                        updateList.add(coordAction);
239                        insertList = new ArrayList<JsonBean>();
240    
241                        SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
242                                SlaAppType.COORDINATOR_ACTION, log);
243                        if(slaEvent != null) {
244                            insertList.add(slaEvent); //Update SLA events
245                        }
246                        try {
247                            // call JPAExecutor to do the bulk writes
248                            jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
249                            if (EventHandlerService.isEnabled()) {
250                                generateEvent(coordAction, user, appName, null);
251                            }
252                        }
253                        catch (JPAExecutorException je) {
254                            throw new CommandException(je);
255                        }
256                        queue(new CoordActionReadyXCommand(coordAction.getJobId()));
257                    }
258                }
259            }
260            return null;
261        }
262    
263        @Override
264        public String getEntityKey() {
265            return this.jobId;
266        }
267    
268        @Override
269        protected boolean isLockRequired() {
270            return true;
271        }
272    
273        @Override
274        protected void loadState() throws CommandException {
275            eagerLoadState();
276        }
277    
278        @Override
279        protected void eagerLoadState() throws CommandException {
280            jpaService = Services.get().get(JPAService.class);
281            if (jpaService == null) {
282                throw new CommandException(ErrorCode.E0610);
283            }
284    
285            try {
286                coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor(actionId));
287            }
288            catch (JPAExecutorException je) {
289                throw new CommandException(je);
290            }
291            LogUtils.setLogInfo(coordAction, logInfo);
292        }
293    
294        @Override
295        protected void verifyPrecondition() throws PreconditionException {
296            if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) {
297                throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status "
298                        + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]");
299            }
300        }
301    
302        @Override
303        public String getKey(){
304            return getName() + "_" + actionId;
305        }
306    }