001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie.command.coord; 020 021import org.apache.hadoop.conf.Configuration; 022import org.apache.oozie.action.hadoop.OozieJobInfo; 023import org.apache.oozie.client.CoordinatorAction; 024import org.apache.oozie.client.OozieClient; 025import org.apache.oozie.CoordinatorActionBean; 026import org.apache.oozie.DagEngineException; 027import org.apache.oozie.DagEngine; 028import org.apache.oozie.ErrorCode; 029import org.apache.oozie.SLAEventBean; 030import org.apache.oozie.WorkflowJobBean; 031import org.apache.oozie.command.CommandException; 032import org.apache.oozie.command.PreconditionException; 033import org.apache.oozie.service.DagEngineService; 034import org.apache.oozie.service.EventHandlerService; 035import org.apache.oozie.service.JPAService; 036import org.apache.oozie.service.Services; 037import org.apache.oozie.util.JobUtils; 038import org.apache.oozie.util.LogUtils; 039import org.apache.oozie.util.ParamChecker; 040import org.apache.oozie.util.XLog; 041import org.apache.oozie.util.XmlUtils; 042import org.apache.oozie.util.XConfiguration; 043import org.apache.oozie.util.db.SLADbOperations; 044import org.apache.oozie.client.SLAEvent.SlaAppType; 045import org.apache.oozie.client.SLAEvent.Status; 046import org.apache.oozie.client.rest.JsonBean; 047import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 048import org.apache.oozie.executor.jpa.BatchQueryExecutor; 049import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 050import org.apache.oozie.executor.jpa.JPAExecutorException; 051import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 052import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 053import org.jdom.Element; 054import org.jdom.JDOMException; 055 056import java.io.IOException; 057import java.io.StringReader; 058import java.util.ArrayList; 059import java.util.Date; 060import java.util.List; 061 062@SuppressWarnings("deprecation") 063public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { 064 065 public static final String EL_ERROR = "EL_ERROR"; 066 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 067 public static final String COULD_NOT_START = "COULD_NOT_START"; 068 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 069 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 070 public static final String OOZIE_COORD_ACTION_NOMINAL_TIME = "oozie.coord.action.nominal_time"; 071 072 private final XLog log = getLog(); 073 private String actionId = null; 074 private String user = null; 075 private String appName = null; 076 private CoordinatorActionBean coordAction = null; 077 private JPAService jpaService = null; 078 private String jobId = null; 079 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 080 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 081 082 public CoordActionStartXCommand(String id, String user, String appName, String jobId) { 083 //super("coord_action_start", "coord_action_start", 1, XLog.OPS); 084 super("coord_action_start", "coord_action_start", 1); 085 this.actionId = ParamChecker.notEmpty(id, "id"); 086 this.user = ParamChecker.notEmpty(user, "user"); 087 this.appName = ParamChecker.notEmpty(appName, "appName"); 088 this.jobId = jobId; 089 } 090 091 @Override 092 protected void setLogInfo() { 093 LogUtils.setLogInfo(actionId); 094 } 095 096 /** 097 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 098 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 099 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 100 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 101 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 102 * Merge Action createdConf with actionXml to create new runConf with replaced variables 103 * 104 * @param action CoordinatorActionBean 105 * @return Configuration 106 * @throws CommandException 107 */ 108 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 109 String createdConf = action.getCreatedConf(); 110 String actionXml = action.getActionXml(); 111 Element workflowProperties = null; 112 try { 113 workflowProperties = XmlUtils.parseXml(actionXml); 114 } 115 catch (JDOMException e1) { 116 log.warn("Configuration parse error in:" + actionXml); 117 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 118 } 119 // generate the 'runConf' for this action 120 // Step 1: runConf = createdConf 121 Configuration runConf = null; 122 try { 123 runConf = new XConfiguration(new StringReader(createdConf)); 124 125 } 126 catch (IOException e1) { 127 log.warn("Configuration parse error in:" + createdConf); 128 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 129 } 130 // Step 2: Merge local properties into runConf 131 // extract 'property' tags under 'configuration' block in the 132 // coordinator.xml (saved in actionxml column) 133 // convert Element to XConfiguration 134 Element configElement = workflowProperties.getChild("action", workflowProperties.getNamespace()) 135 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 136 workflowProperties.getNamespace()); 137 if (configElement != null) { 138 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 139 Configuration localConf; 140 try { 141 localConf = new XConfiguration(new StringReader(strConfig)); 142 } 143 catch (IOException e1) { 144 log.warn("Configuration parse error in:" + strConfig); 145 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 146 } 147 148 // copy configuration properties in coordinator.xml to the runConf 149 XConfiguration.copy(localConf, runConf); 150 } 151 152 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 153 // new property called 'oozie.wf.application.path' 154 // WF Engine requires the path to the workflow.xml to be saved under 155 // this property name 156 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()) 157 .getChild("workflow", workflowProperties.getNamespace()).getChild("app-path", 158 workflowProperties.getNamespace()).getValue(); 159 160 // Copying application path in runconf. 161 runConf.set("oozie.wf.application.path", appPath); 162 163 // Step 4: Extract the runconf and copy the rerun config to runconf. 164 if (runConf.get(CoordRerunXCommand.RERUN_CONF) != null) { 165 Configuration rerunConf = null; 166 try { 167 rerunConf = new XConfiguration(new StringReader(runConf.get(CoordRerunXCommand.RERUN_CONF))); 168 XConfiguration.copy(rerunConf, runConf); 169 } catch (IOException e) { 170 log.warn("Configuration parse error in:" + rerunConf); 171 throw new CommandException(ErrorCode.E1005, e.getMessage(), e); 172 } 173 runConf.unset(CoordRerunXCommand.RERUN_CONF); 174 } 175 return runConf; 176 } 177 178 @Override 179 protected Void execute() throws CommandException { 180 boolean makeFail = true; 181 String errCode = ""; 182 String errMsg = ""; 183 ParamChecker.notEmpty(user, "user"); 184 185 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 186 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 187 // log.debug("getting.. job id: " + coordAction.getJobId()); 188 // create merged runConf to pass to WF Engine 189 Configuration runConf = mergeConfig(coordAction); 190 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 191 // log.debug("%%% merged runconf=" + 192 // XmlUtils.prettyPrint(runConf).toString()); 193 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 194 try { 195 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 196 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED, 197 SlaAppType.COORDINATOR_ACTION, log); 198 if(slaEvent != null) { 199 insertList.add(slaEvent); 200 } 201 if (OozieJobInfo.isJobInfoEnabled()) { 202 conf.set(OozieJobInfo.COORD_ID, actionId); 203 conf.set(OozieJobInfo.COORD_NAME, appName); 204 conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString()); 205 } 206 // Normalize workflow appPath here; 207 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 208 if (coordAction.getExternalId() != null) { 209 conf.setBoolean(OozieClient.RERUN_FAIL_NODES, true); 210 dagEngine.reRun(coordAction.getExternalId(), conf); 211 } else { 212 // Pushing the nominal time in conf to use for launcher tag search 213 conf.set(OOZIE_COORD_ACTION_NOMINAL_TIME,String.valueOf(coordAction.getNominalTime().getTime())); 214 String wfId = dagEngine.submitJobFromCoordinator(conf, actionId); 215 coordAction.setExternalId(wfId); 216 } 217 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 218 coordAction.incrementAndGetPending(); 219 220 //store.updateCoordinatorAction(coordAction); 221 JPAService jpaService = Services.get().get(JPAService.class); 222 if (jpaService != null) { 223 log.debug("Updating WF record for WFID :" + coordAction.getExternalId() + " with parent id: " + actionId); 224 WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STARTTIME, coordAction.getExternalId()); 225 wfJob.setParentId(actionId); 226 wfJob.setLastModifiedTime(new Date()); 227 BatchQueryExecutor executor = BatchQueryExecutor.getInstance(); 228 updateList.add(new UpdateEntry<WorkflowJobQuery>( 229 WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob)); 230 updateList.add(new UpdateEntry<CoordActionQuery>( 231 CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction)); 232 try { 233 executor.executeBatchInsertUpdateDelete(insertList, updateList, null); 234 queue(new CoordActionNotificationXCommand(coordAction), 100); 235 if (EventHandlerService.isEnabled()) { 236 generateEvent(coordAction, user, appName, wfJob.getStartTime()); 237 } 238 } 239 catch (JPAExecutorException je) { 240 throw new CommandException(je); 241 } 242 } 243 else { 244 log.error(ErrorCode.E0610); 245 } 246 247 makeFail = false; 248 } 249 catch (DagEngineException dee) { 250 errMsg = dee.getMessage(); 251 errCode = dee.getErrorCode().toString(); 252 log.warn("can not create DagEngine for submitting jobs", dee); 253 } 254 catch (CommandException ce) { 255 errMsg = ce.getMessage(); 256 errCode = ce.getErrorCode().toString(); 257 log.warn("command exception occured ", ce); 258 } 259 catch (java.io.IOException ioe) { 260 errMsg = ioe.getMessage(); 261 errCode = "E1005"; 262 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 263 } 264 catch (Exception ex) { 265 errMsg = ex.getMessage(); 266 errCode = "E1005"; 267 log.warn("can not create DagEngine for submitting jobs", ex); 268 } 269 finally { 270 if (makeFail == true) { // No DB exception occurs 271 log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 272 coordAction.setStatus(CoordinatorAction.Status.FAILED); 273 if (errMsg.length() > 254) { // Because table column size is 255 274 errMsg = errMsg.substring(0, 255); 275 } 276 coordAction.setErrorMessage(errMsg); 277 coordAction.setErrorCode(errCode); 278 279 updateList = new ArrayList<UpdateEntry>(); 280 updateList.add(new UpdateEntry<CoordActionQuery>( 281 CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction)); 282 insertList = new ArrayList<JsonBean>(); 283 284 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED, 285 SlaAppType.COORDINATOR_ACTION, log); 286 if(slaEvent != null) { 287 insertList.add(slaEvent); //Update SLA events 288 } 289 try { 290 // call JPAExecutor to do the bulk writes 291 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 292 if (EventHandlerService.isEnabled()) { 293 generateEvent(coordAction, user, appName, null); 294 } 295 } 296 catch (JPAExecutorException je) { 297 throw new CommandException(je); 298 } 299 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 300 } 301 } 302 } 303 return null; 304 } 305 306 @Override 307 public String getEntityKey() { 308 return this.jobId; 309 } 310 311 @Override 312 protected boolean isLockRequired() { 313 return true; 314 } 315 316 @Override 317 protected void loadState() throws CommandException { 318 jpaService = Services.get().get(JPAService.class); 319 try { 320 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor( 321 actionId)); 322 } 323 catch (JPAExecutorException je) { 324 throw new CommandException(je); 325 } 326 LogUtils.setLogInfo(coordAction); 327 } 328 329 @Override 330 protected void verifyPrecondition() throws PreconditionException { 331 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) { 332 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status " 333 + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]"); 334 } 335 } 336 337 @Override 338 public String getKey(){ 339 return getName() + "_" + actionId; 340 } 341}