001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.oozie.command.coord; 019 020import org.apache.hadoop.conf.Configuration; 021import org.apache.oozie.action.hadoop.OozieJobInfo; 022import org.apache.oozie.client.CoordinatorAction; 023import org.apache.oozie.client.OozieClient; 024import org.apache.oozie.CoordinatorActionBean; 025import org.apache.oozie.DagEngineException; 026import org.apache.oozie.DagEngine; 027import org.apache.oozie.ErrorCode; 028import org.apache.oozie.SLAEventBean; 029import org.apache.oozie.WorkflowJobBean; 030import org.apache.oozie.command.CommandException; 031import org.apache.oozie.command.PreconditionException; 032import org.apache.oozie.service.DagEngineService; 033import org.apache.oozie.service.EventHandlerService; 034import org.apache.oozie.service.JPAService; 035import org.apache.oozie.service.Services; 036import org.apache.oozie.util.JobUtils; 037import org.apache.oozie.util.LogUtils; 038import org.apache.oozie.util.ParamChecker; 039import org.apache.oozie.util.XLog; 040import org.apache.oozie.util.XmlUtils; 041import org.apache.oozie.util.XConfiguration; 042import org.apache.oozie.util.db.SLADbOperations; 043import org.apache.oozie.client.SLAEvent.SlaAppType; 044import org.apache.oozie.client.SLAEvent.Status; 045import org.apache.oozie.client.rest.JsonBean; 046import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; 047import org.apache.oozie.executor.jpa.BatchQueryExecutor; 048import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; 049import org.apache.oozie.executor.jpa.JPAExecutorException; 050import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; 051import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; 052import org.jdom.Element; 053import org.jdom.JDOMException; 054 055import java.io.IOException; 056import java.io.StringReader; 057import java.util.ArrayList; 058import java.util.Date; 059import java.util.HashMap; 060import java.util.List; 061import java.util.Map; 062 063@SuppressWarnings("deprecation") 064public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { 065 066 public static final String EL_ERROR = "EL_ERROR"; 067 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 068 public static final String COULD_NOT_START = "COULD_NOT_START"; 069 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 070 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 071 072 private final XLog log = getLog(); 073 private String actionId = null; 074 private String user = null; 075 private String appName = null; 076 private CoordinatorActionBean coordAction = null; 077 private JPAService jpaService = null; 078 private String jobId = null; 079 private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); 080 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 081 082 public CoordActionStartXCommand(String id, String user, String appName, String jobId) { 083 //super("coord_action_start", "coord_action_start", 1, XLog.OPS); 084 super("coord_action_start", "coord_action_start", 1); 085 this.actionId = ParamChecker.notEmpty(id, "id"); 086 this.user = ParamChecker.notEmpty(user, "user"); 087 this.appName = ParamChecker.notEmpty(appName, "appName"); 088 this.jobId = jobId; 089 } 090 091 /** 092 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 093 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 094 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 095 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 096 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 097 * Merge Action createdConf with actionXml to create new runConf with replaced variables 098 * 099 * @param action CoordinatorActionBean 100 * @return Configuration 101 * @throws CommandException 102 */ 103 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 104 String createdConf = action.getCreatedConf(); 105 String actionXml = action.getActionXml(); 106 Element workflowProperties = null; 107 try { 108 workflowProperties = XmlUtils.parseXml(actionXml); 109 } 110 catch (JDOMException e1) { 111 log.warn("Configuration parse error in:" + actionXml); 112 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 113 } 114 // generate the 'runConf' for this action 115 // Step 1: runConf = createdConf 116 Configuration runConf = null; 117 try { 118 runConf = new XConfiguration(new StringReader(createdConf)); 119 } 120 catch (IOException e1) { 121 log.warn("Configuration parse error in:" + createdConf); 122 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 123 } 124 // Step 2: Merge local properties into runConf 125 // extract 'property' tags under 'configuration' block in the 126 // coordinator.xml (saved in actionxml column) 127 // convert Element to XConfiguration 128 Element configElement = workflowProperties.getChild("action", workflowProperties.getNamespace()) 129 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 130 workflowProperties.getNamespace()); 131 if (configElement != null) { 132 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 133 Configuration localConf; 134 try { 135 localConf = new XConfiguration(new StringReader(strConfig)); 136 } 137 catch (IOException e1) { 138 log.warn("Configuration parse error in:" + strConfig); 139 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 140 } 141 142 // copy configuration properties in coordinator.xml to the runConf 143 XConfiguration.copy(localConf, runConf); 144 } 145 146 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 147 // new property called 'oozie.wf.application.path' 148 // WF Engine requires the path to the workflow.xml to be saved under 149 // this property name 150 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow", 151 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue(); 152 runConf.set("oozie.wf.application.path", appPath); 153 return runConf; 154 } 155 156 @Override 157 protected Void execute() throws CommandException { 158 boolean makeFail = true; 159 String errCode = ""; 160 String errMsg = ""; 161 ParamChecker.notEmpty(user, "user"); 162 163 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 164 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 165 // log.debug("getting.. job id: " + coordAction.getJobId()); 166 // create merged runConf to pass to WF Engine 167 Configuration runConf = mergeConfig(coordAction); 168 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 169 // log.debug("%%% merged runconf=" + 170 // XmlUtils.prettyPrint(runConf).toString()); 171 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user); 172 try { 173 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 174 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED, 175 SlaAppType.COORDINATOR_ACTION, log); 176 if(slaEvent != null) { 177 insertList.add(slaEvent); 178 } 179 if (OozieJobInfo.isJobInfoEnabled()) { 180 conf.set(OozieJobInfo.COORD_NAME, appName); 181 conf.set(OozieJobInfo.COORD_NOMINAL_TIME, coordAction.getNominalTimestamp().toString()); 182 } 183 // Normalize workflow appPath here; 184 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 185 String wfId = dagEngine.submitJobFromCoordinator(conf, actionId); 186 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 187 coordAction.setExternalId(wfId); 188 coordAction.incrementAndGetPending(); 189 190 //store.updateCoordinatorAction(coordAction); 191 JPAService jpaService = Services.get().get(JPAService.class); 192 if (jpaService != null) { 193 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId); 194 WorkflowJobBean wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STARTTIME, wfId); 195 wfJob.setParentId(actionId); 196 wfJob.setLastModifiedTime(new Date()); 197 BatchQueryExecutor executor = BatchQueryExecutor.getInstance(); 198 updateList.add(new UpdateEntry<WorkflowJobQuery>( 199 WorkflowJobQuery.UPDATE_WORKFLOW_PARENT_MODIFIED, wfJob)); 200 updateList.add(new UpdateEntry<CoordActionQuery>( 201 CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction)); 202 try { 203 executor.executeBatchInsertUpdateDelete(insertList, updateList, null); 204 if (EventHandlerService.isEnabled()) { 205 generateEvent(coordAction, user, appName, wfJob.getStartTime()); 206 } 207 } 208 catch (JPAExecutorException je) { 209 throw new CommandException(je); 210 } 211 } 212 else { 213 log.error(ErrorCode.E0610); 214 } 215 216 makeFail = false; 217 } 218 catch (DagEngineException dee) { 219 errMsg = dee.getMessage(); 220 errCode = dee.getErrorCode().toString(); 221 log.warn("can not create DagEngine for submitting jobs", dee); 222 } 223 catch (CommandException ce) { 224 errMsg = ce.getMessage(); 225 errCode = ce.getErrorCode().toString(); 226 log.warn("command exception occured ", ce); 227 } 228 catch (java.io.IOException ioe) { 229 errMsg = ioe.getMessage(); 230 errCode = "E1005"; 231 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 232 } 233 catch (Exception ex) { 234 errMsg = ex.getMessage(); 235 errCode = "E1005"; 236 log.warn("can not create DagEngine for submitting jobs", ex); 237 } 238 finally { 239 if (makeFail == true) { // No DB exception occurs 240 log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 241 coordAction.setStatus(CoordinatorAction.Status.FAILED); 242 if (errMsg.length() > 254) { // Because table column size is 255 243 errMsg = errMsg.substring(0, 255); 244 } 245 coordAction.setErrorMessage(errMsg); 246 coordAction.setErrorCode(errCode); 247 248 updateList = new ArrayList<UpdateEntry>(); 249 updateList.add(new UpdateEntry<CoordActionQuery>( 250 CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction)); 251 insertList = new ArrayList<JsonBean>(); 252 253 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED, 254 SlaAppType.COORDINATOR_ACTION, log); 255 if(slaEvent != null) { 256 insertList.add(slaEvent); //Update SLA events 257 } 258 try { 259 // call JPAExecutor to do the bulk writes 260 BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); 261 if (EventHandlerService.isEnabled()) { 262 generateEvent(coordAction, user, appName, null); 263 } 264 } 265 catch (JPAExecutorException je) { 266 throw new CommandException(je); 267 } 268 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 269 } 270 } 271 } 272 return null; 273 } 274 275 @Override 276 public String getEntityKey() { 277 return this.jobId; 278 } 279 280 @Override 281 protected boolean isLockRequired() { 282 return true; 283 } 284 285 @Override 286 protected void loadState() throws CommandException { 287 jpaService = Services.get().get(JPAService.class); 288 try { 289 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor( 290 actionId)); 291 } 292 catch (JPAExecutorException je) { 293 throw new CommandException(je); 294 } 295 LogUtils.setLogInfo(coordAction, logInfo); 296 } 297 298 @Override 299 protected void verifyPrecondition() throws PreconditionException { 300 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) { 301 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status " 302 + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]"); 303 } 304 } 305 306 @Override 307 public String getKey(){ 308 return getName() + "_" + actionId; 309 } 310}