001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.hadoop.conf.Configuration; 021 022 import org.apache.oozie.client.CoordinatorAction; 023 import org.apache.oozie.client.OozieClient; 024 import org.apache.oozie.CoordinatorActionBean; 025 import org.apache.oozie.DagEngine; 026 import org.apache.oozie.DagEngineException; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.service.DagEngineService; 031 import org.apache.oozie.service.WorkflowStoreService; 032 import org.apache.oozie.store.StoreException; 033 import org.apache.oozie.store.CoordinatorStore; 034 import org.apache.oozie.store.WorkflowStore; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.util.JobUtils; 037 import org.apache.oozie.util.ParamChecker; 038 import org.apache.oozie.util.XLog; 039 import org.apache.oozie.util.XmlUtils; 040 import org.apache.oozie.util.XConfiguration; 041 import org.apache.oozie.util.db.SLADbOperations; 042 import org.apache.oozie.client.SLAEvent.SlaAppType; 043 import org.apache.oozie.client.SLAEvent.Status; 044 045 import org.jdom.Element; 046 import org.jdom.JDOMException; 047 048 import java.io.IOException; 049 import java.io.StringReader; 050 051 public class CoordActionStartCommand extends CoordinatorCommand<Void> { 052 053 public static final String EL_ERROR = "EL_ERROR"; 054 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 055 public static final String COULD_NOT_START = "COULD_NOT_START"; 056 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 057 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 058 059 private final XLog log = XLog.getLog(getClass()); 060 private String actionId = null; 061 private String user = null; 062 private String authToken = null; 063 private CoordinatorActionBean coordAction = null; 064 065 public CoordActionStartCommand(String id, String user, String token) { 066 super("coord_action_start", "coord_action_start", 1, XLog.OPS); 067 this.actionId = ParamChecker.notEmpty(id, "id"); 068 this.user = ParamChecker.notEmpty(user, "user"); 069 this.authToken = ParamChecker.notEmpty(token, "token"); 070 } 071 072 /** 073 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 074 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 075 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 076 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 077 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 078 * Merge Action createdConf with actionXml to create new runConf with replaced variables 079 * 080 * @param action CoordinatorActionBean 081 * @return Configuration 082 * @throws CommandException 083 */ 084 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 085 String createdConf = action.getCreatedConf(); 086 String actionXml = action.getActionXml(); 087 Element workflowProperties = null; 088 try { 089 workflowProperties = XmlUtils.parseXml(actionXml); 090 } 091 catch (JDOMException e1) { 092 log.warn("Configuration parse error in:" + actionXml); 093 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 094 } 095 // generate the 'runConf' for this action 096 // Step 1: runConf = createdConf 097 Configuration runConf = null; 098 try { 099 runConf = new XConfiguration(new StringReader(createdConf)); 100 } 101 catch (IOException e1) { 102 log.warn("Configuration parse error in:" + createdConf); 103 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 104 } 105 // Step 2: Merge local properties into runConf 106 // extract 'property' tags under 'configuration' block in the 107 // coordinator.xml (saved in actionxml column) 108 // convert Element to XConfiguration 109 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace()) 110 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 111 workflowProperties.getNamespace()); 112 if (configElement != null) { 113 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 114 Configuration localConf; 115 try { 116 localConf = new XConfiguration(new StringReader(strConfig)); 117 } 118 catch (IOException e1) { 119 log.warn("Configuration parse error in:" + strConfig); 120 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 121 } 122 123 // copy configuration properties in coordinator.xml to the runConf 124 XConfiguration.copy(localConf, runConf); 125 } 126 127 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 128 // new property called 'oozie.wf.application.path' 129 // WF Engine requires the path to the workflow.xml to be saved under 130 // this property name 131 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow", 132 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue(); 133 runConf.set("oozie.wf.application.path", appPath); 134 return runConf; 135 } 136 137 @Override 138 protected Void call(CoordinatorStore store) throws StoreException, CommandException { 139 boolean makeFail = true; 140 String errCode = ""; 141 String errMsg = ""; 142 ParamChecker.notEmpty(user, "user"); 143 ParamChecker.notEmpty(authToken, "authToken"); 144 145 // CoordinatorActionBean coordAction = store.getCoordinatorAction(id, true); 146 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 147 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 148 // log.debug("getting.. job id: " + coordAction.getJobId()); 149 // create merged runConf to pass to WF Engine 150 Configuration runConf = mergeConfig(coordAction); 151 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 152 // log.debug("%%% merged runconf=" + XmlUtils.prettyPrint(runConf).toString()); 153 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken); 154 try { 155 boolean startJob = true; 156 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 157 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), store, Status.STARTED, 158 SlaAppType.COORDINATOR_ACTION); 159 160 // Normalize workflow appPath here; 161 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 162 163 String wfId = dagEngine.submitJob(conf, startJob); 164 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 165 coordAction.setExternalId(wfId); 166 store.updateCoordinatorAction(coordAction); 167 168 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId); 169 WorkflowStore wfStore = Services.get().get(WorkflowStoreService.class).create(store); 170 WorkflowJobBean wfJob = wfStore.getWorkflow(wfId, false); 171 wfJob.setParentId(actionId); 172 wfStore.updateWorkflow(wfJob); 173 174 makeFail = false; 175 } 176 catch (StoreException se) { 177 makeFail = false; 178 throw se; 179 } 180 catch (DagEngineException dee) { 181 errMsg = dee.getMessage(); 182 errCode = "E1005"; 183 log.warn("can not create DagEngine for submitting jobs", dee); 184 } 185 catch (CommandException ce) { 186 errMsg = ce.getMessage(); 187 errCode = ce.getErrorCode().toString(); 188 log.warn("command exception occured ", ce); 189 } 190 catch (java.io.IOException ioe) { 191 errMsg = ioe.getMessage(); 192 errCode = "E1005"; 193 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 194 } 195 catch (Exception ex) { 196 errMsg = ex.getMessage(); 197 errCode = "E1005"; 198 log.warn("can not create DagEngine for submitting jobs", ex); 199 } 200 finally { 201 if (makeFail == true) { // No DB exception occurs 202 log.warn("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 203 coordAction.setStatus(CoordinatorAction.Status.FAILED); 204 if (errMsg.length() > 254) { // Because table column size is 255 205 errMsg = errMsg.substring(0, 255); 206 } 207 coordAction.setErrorMessage(errMsg); 208 coordAction.setErrorCode(errCode); 209 store.updateCoordinatorAction(coordAction); 210 queueCallable(new CoordActionReadyCommand(coordAction.getJobId())); 211 } 212 } 213 } 214 return null; 215 } 216 217 @Override 218 protected Void execute(CoordinatorStore store) throws StoreException, CommandException { 219 log.info("STARTED CoordActionStartCommand actionId=" + actionId); 220 try { 221 coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId); 222 setLogInfo(coordAction); 223 if (lock(coordAction.getJobId())) { 224 call(store); 225 } 226 else { 227 queueCallable(new CoordActionStartCommand(actionId, user, authToken), LOCK_FAILURE_REQUEUE_INTERVAL); 228 log.warn("CoordActionStartCommand lock was not acquired - failed jobId=" + coordAction.getJobId() 229 + ", actionId=" + actionId + ". Requeing the same."); 230 } 231 } 232 catch (InterruptedException e) { 233 queueCallable(new CoordActionStartCommand(actionId, user, authToken), LOCK_FAILURE_REQUEUE_INTERVAL); 234 log.warn("CoordActionStartCommand lock acquiring failed with exception " + e.getMessage() + " for jobId=" 235 + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same."); 236 } 237 finally { 238 log.info("ENDED CoordActionStartCommand actionId=" + actionId); 239 } 240 return null; 241 } 242 }