001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.hadoop.conf.Configuration; 021 022 import org.apache.oozie.client.CoordinatorAction; 023 import org.apache.oozie.client.OozieClient; 024 import org.apache.oozie.CoordinatorActionBean; 025 import org.apache.oozie.DagEngineException; 026 import org.apache.oozie.DagEngine; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.SLAEventBean; 029 import org.apache.oozie.WorkflowJobBean; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.command.PreconditionException; 032 import org.apache.oozie.service.DagEngineService; 033 import org.apache.oozie.service.EventHandlerService; 034 import org.apache.oozie.service.JPAService; 035 import org.apache.oozie.service.Services; 036 import org.apache.oozie.util.JobUtils; 037 import org.apache.oozie.util.LogUtils; 038 import org.apache.oozie.util.ParamChecker; 039 import org.apache.oozie.util.XLog; 040 import org.apache.oozie.util.XmlUtils; 041 import org.apache.oozie.util.XConfiguration; 042 import org.apache.oozie.util.db.SLADbOperations; 043 import org.apache.oozie.client.SLAEvent.SlaAppType; 044 import org.apache.oozie.client.SLAEvent.Status; 045 import org.apache.oozie.client.rest.JsonBean; 046 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor; 047 import org.apache.oozie.executor.jpa.JPAExecutorException; 048 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 049 import org.jdom.Element; 050 import org.jdom.JDOMException; 051 052 import java.io.IOException; 053 import java.io.StringReader; 054 import java.util.ArrayList; 055 import java.util.Date; 056 import java.util.List; 057 058 @SuppressWarnings("deprecation") 059 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { 060 061 public static final String EL_ERROR = "EL_ERROR"; 062 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 063 public static final String COULD_NOT_START = "COULD_NOT_START"; 064 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 065 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 066 067 private final XLog log = getLog(); 068 private String actionId = null; 069 private String user = null; 070 private String appName = null; 071 private CoordinatorActionBean coordAction = null; 072 private JPAService jpaService = null; 073 private String jobId = null; 074 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 075 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 076 077 public CoordActionStartXCommand(String id, String user, String appName, String jobId) { 078 //super("coord_action_start", "coord_action_start", 1, XLog.OPS); 079 super("coord_action_start", "coord_action_start", 1); 080 this.actionId = ParamChecker.notEmpty(id, "id"); 081 this.user = ParamChecker.notEmpty(user, "user"); 082 this.appName = ParamChecker.notEmpty(appName, "appName"); 083 this.jobId = jobId; 084 } 085 086 /** 087 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 088 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 089 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 090 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 091 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 092 * Merge Action createdConf with actionXml to create new runConf with replaced variables 093 * 094 * @param action CoordinatorActionBean 095 * @return Configuration 096 * @throws CommandException 097 */ 098 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 099 String createdConf = action.getCreatedConf(); 100 String actionXml = action.getActionXml(); 101 Element workflowProperties = null; 102 try { 103 workflowProperties = XmlUtils.parseXml(actionXml); 104 } 105 catch (JDOMException e1) { 106 log.warn("Configuration parse error in:" + actionXml); 107 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 108 } 109 // generate the 'runConf' for this action 110 // Step 1: runConf = createdConf 111 Configuration runConf = null; 112 try { 113 runConf = new XConfiguration(new StringReader(createdConf)); 114 } 115 catch (IOException e1) { 116 log.warn("Configuration parse error in:" + createdConf); 117 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 118 } 119 // Step 2: Merge local properties into runConf 120 // extract 'property' tags under 'configuration' block in the 121 // coordinator.xml (saved in actionxml column) 122 // convert Element to XConfiguration 123 Element configElement = workflowProperties.getChild("action", workflowProperties.getNamespace()) 124 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 125 workflowProperties.getNamespace()); 126 if (configElement != null) { 127 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 128 Configuration localConf; 129 try { 130 localConf = new XConfiguration(new StringReader(strConfig)); 131 } 132 catch (IOException e1) { 133 log.warn("Configuration parse error in:" + strConfig); 134 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 135 } 136 137 // copy configuration properties in coordinator.xml to the runConf 138 XConfiguration.copy(localConf, runConf); 139 } 140 141 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 142 // new property called 'oozie.wf.application.path' 143 // WF Engine requires the path to the workflow.xml to be saved under 144 // this property name 145 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow", 146 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue(); 147 runConf.set("oozie.wf.application.path", appPath); 148 return runConf; 149 } 150 151 @Override 152 protected Void execute() throws CommandException { 153 boolean makeFail = true; 154 String errCode = ""; 155 String errMsg = ""; 156 ParamChecker.notEmpty(user, "user"); 157 158 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 159 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 160 // log.debug("getting.. job id: " + coordAction.getJobId()); 161 // create merged runConf to pass to WF Engine 162 Configuration runConf = mergeConfig(coordAction); 163 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 164 // log.debug("%%% merged runconf=" + 165 // XmlUtils.prettyPrint(runConf).toString()); 166 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 167 try { 168 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 169 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED, 170 SlaAppType.COORDINATOR_ACTION, log); 171 if(slaEvent != null) { 172 insertList.add(slaEvent); 173 } 174 175 // Normalize workflow appPath here; 176 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 177 String wfId = dagEngine.submitJobFromCoordinator(conf, actionId); 178 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 179 coordAction.setExternalId(wfId); 180 coordAction.incrementAndGetPending(); 181 182 //store.updateCoordinatorAction(coordAction); 183 JPAService jpaService = Services.get().get(JPAService.class); 184 if (jpaService != null) { 185 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId); 186 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); 187 wfJob.setParentId(actionId); 188 wfJob.setLastModifiedTime(new Date()); 189 updateList.add(wfJob); 190 updateList.add(coordAction); 191 try { 192 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList)); 193 if (EventHandlerService.isEnabled()) { 194 generateEvent(coordAction, user, appName, wfJob.getStartTime()); 195 } 196 } 197 catch (JPAExecutorException je) { 198 throw new CommandException(je); 199 } 200 } 201 else { 202 log.error(ErrorCode.E0610); 203 } 204 205 makeFail = false; 206 } 207 catch (DagEngineException dee) { 208 errMsg = dee.getMessage(); 209 errCode = dee.getErrorCode().toString(); 210 log.warn("can not create DagEngine for submitting jobs", dee); 211 } 212 catch (CommandException ce) { 213 errMsg = ce.getMessage(); 214 errCode = ce.getErrorCode().toString(); 215 log.warn("command exception occured ", ce); 216 } 217 catch (java.io.IOException ioe) { 218 errMsg = ioe.getMessage(); 219 errCode = "E1005"; 220 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 221 } 222 catch (Exception ex) { 223 errMsg = ex.getMessage(); 224 errCode = "E1005"; 225 log.warn("can not create DagEngine for submitting jobs", ex); 226 } 227 finally { 228 if (makeFail == true) { // No DB exception occurs 229 log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 230 coordAction.setStatus(CoordinatorAction.Status.FAILED); 231 if (errMsg.length() > 254) { // Because table column size is 255 232 errMsg = errMsg.substring(0, 255); 233 } 234 coordAction.setErrorMessage(errMsg); 235 coordAction.setErrorCode(errCode); 236 237 updateList = new ArrayList<JsonBean>(); 238 updateList.add(coordAction); 239 insertList = new ArrayList<JsonBean>(); 240 241 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED, 242 SlaAppType.COORDINATOR_ACTION, log); 243 if(slaEvent != null) { 244 insertList.add(slaEvent); //Update SLA events 245 } 246 try { 247 // call JPAExecutor to do the bulk writes 248 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList)); 249 if (EventHandlerService.isEnabled()) { 250 generateEvent(coordAction, user, appName, null); 251 } 252 } 253 catch (JPAExecutorException je) { 254 throw new CommandException(je); 255 } 256 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 257 } 258 } 259 } 260 return null; 261 } 262 263 @Override 264 public String getEntityKey() { 265 return this.jobId; 266 } 267 268 @Override 269 protected boolean isLockRequired() { 270 return true; 271 } 272 273 @Override 274 protected void loadState() throws CommandException { 275 eagerLoadState(); 276 } 277 278 @Override 279 protected void eagerLoadState() throws CommandException { 280 jpaService = Services.get().get(JPAService.class); 281 if (jpaService == null) { 282 throw new CommandException(ErrorCode.E0610); 283 } 284 285 try { 286 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor(actionId)); 287 } 288 catch (JPAExecutorException je) { 289 throw new CommandException(je); 290 } 291 LogUtils.setLogInfo(coordAction, logInfo); 292 } 293 294 @Override 295 protected void verifyPrecondition() throws PreconditionException { 296 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) { 297 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status " 298 + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]"); 299 } 300 } 301 302 @Override 303 public String getKey(){ 304 return getName() + "_" + actionId; 305 } 306 }