001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.hadoop.conf.Configuration; 021 022 import org.apache.oozie.client.CoordinatorAction; 023 import org.apache.oozie.client.OozieClient; 024 import org.apache.oozie.CoordinatorActionBean; 025 import org.apache.oozie.DagEngineException; 026 import org.apache.oozie.DagEngine; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.SLAEventBean; 029 import org.apache.oozie.WorkflowJobBean; 030 import org.apache.oozie.command.CommandException; 031 import org.apache.oozie.command.PreconditionException; 032 import org.apache.oozie.service.DagEngineService; 033 import org.apache.oozie.service.JPAService; 034 import org.apache.oozie.service.Services; 035 import org.apache.oozie.util.JobUtils; 036 import org.apache.oozie.util.LogUtils; 037 import org.apache.oozie.util.ParamChecker; 038 import org.apache.oozie.util.XLog; 039 import org.apache.oozie.util.XmlUtils; 040 import org.apache.oozie.util.XConfiguration; 041 import org.apache.oozie.util.db.SLADbOperations; 042 import org.apache.oozie.client.SLAEvent.SlaAppType; 043 import org.apache.oozie.client.SLAEvent.Status; 044 import org.apache.oozie.client.rest.JsonBean; 045 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor; 046 import org.apache.oozie.executor.jpa.JPAExecutorException; 047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 048 import org.jdom.Element; 049 import org.jdom.JDOMException; 050 051 import java.io.IOException; 052 import java.io.StringReader; 053 import java.util.ArrayList; 054 import java.util.Date; 055 import java.util.List; 056 057 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { 058 059 public static final String EL_ERROR = "EL_ERROR"; 060 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 061 public static final String COULD_NOT_START = "COULD_NOT_START"; 062 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 063 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 064 065 private final XLog log = getLog(); 066 private String actionId = null; 067 private String user = null; 068 private String authToken = null; 069 private CoordinatorActionBean coordAction = null; 070 private JPAService jpaService = null; 071 private String jobId = null; 072 private List<JsonBean> updateList = new ArrayList<JsonBean>(); 073 private List<JsonBean> insertList = new ArrayList<JsonBean>(); 074 075 public CoordActionStartXCommand(String id, String user, String token, String jobId) { 076 //super("coord_action_start", "coord_action_start", 1, XLog.OPS); 077 super("coord_action_start", "coord_action_start", 1); 078 this.actionId = ParamChecker.notEmpty(id, "id"); 079 this.user = ParamChecker.notEmpty(user, "user"); 080 this.authToken = ParamChecker.notEmpty(token, "token"); 081 this.jobId = jobId; 082 } 083 084 /** 085 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 086 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 087 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 088 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 089 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 090 * Merge Action createdConf with actionXml to create new runConf with replaced variables 091 * 092 * @param action CoordinatorActionBean 093 * @return Configuration 094 * @throws CommandException 095 */ 096 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 097 String createdConf = action.getCreatedConf(); 098 String actionXml = action.getActionXml(); 099 Element workflowProperties = null; 100 try { 101 workflowProperties = XmlUtils.parseXml(actionXml); 102 } 103 catch (JDOMException e1) { 104 log.warn("Configuration parse error in:" + actionXml); 105 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 106 } 107 // generate the 'runConf' for this action 108 // Step 1: runConf = createdConf 109 Configuration runConf = null; 110 try { 111 runConf = new XConfiguration(new StringReader(createdConf)); 112 } 113 catch (IOException e1) { 114 log.warn("Configuration parse error in:" + createdConf); 115 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 116 } 117 // Step 2: Merge local properties into runConf 118 // extract 'property' tags under 'configuration' block in the 119 // coordinator.xml (saved in actionxml column) 120 // convert Element to XConfiguration 121 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace()) 122 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 123 workflowProperties.getNamespace()); 124 if (configElement != null) { 125 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 126 Configuration localConf; 127 try { 128 localConf = new XConfiguration(new StringReader(strConfig)); 129 } 130 catch (IOException e1) { 131 log.warn("Configuration parse error in:" + strConfig); 132 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 133 } 134 135 // copy configuration properties in coordinator.xml to the runConf 136 XConfiguration.copy(localConf, runConf); 137 } 138 139 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 140 // new property called 'oozie.wf.application.path' 141 // WF Engine requires the path to the workflow.xml to be saved under 142 // this property name 143 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow", 144 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue(); 145 runConf.set("oozie.wf.application.path", appPath); 146 return runConf; 147 } 148 149 @Override 150 protected Void execute() throws CommandException { 151 boolean makeFail = true; 152 String errCode = ""; 153 String errMsg = ""; 154 ParamChecker.notEmpty(user, "user"); 155 ParamChecker.notEmpty(authToken, "authToken"); 156 157 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 158 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 159 // log.debug("getting.. job id: " + coordAction.getJobId()); 160 // create merged runConf to pass to WF Engine 161 Configuration runConf = mergeConfig(coordAction); 162 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 163 // log.debug("%%% merged runconf=" + 164 // XmlUtils.prettyPrint(runConf).toString()); 165 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken); 166 try { 167 boolean startJob = true; 168 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 169 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED, 170 SlaAppType.COORDINATOR_ACTION, log); 171 if(slaEvent != null) { 172 insertList.add(slaEvent); 173 } 174 175 // Normalize workflow appPath here; 176 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 177 String wfId = dagEngine.submitJob(conf, startJob); 178 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 179 coordAction.setExternalId(wfId); 180 coordAction.incrementAndGetPending(); 181 182 //store.updateCoordinatorAction(coordAction); 183 JPAService jpaService = Services.get().get(JPAService.class); 184 if (jpaService != null) { 185 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId); 186 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); 187 wfJob.setParentId(actionId); 188 wfJob.setLastModifiedTime(new Date()); 189 updateList.add(wfJob); 190 updateList.add(coordAction); 191 try { 192 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList)); 193 } 194 catch (JPAExecutorException je) { 195 throw new CommandException(je); 196 } 197 } 198 else { 199 log.error(ErrorCode.E0610); 200 } 201 202 makeFail = false; 203 } 204 catch (DagEngineException dee) { 205 errMsg = dee.getMessage(); 206 errCode = dee.getErrorCode().toString(); 207 log.warn("can not create DagEngine for submitting jobs", dee); 208 } 209 catch (CommandException ce) { 210 errMsg = ce.getMessage(); 211 errCode = ce.getErrorCode().toString(); 212 log.warn("command exception occured ", ce); 213 } 214 catch (java.io.IOException ioe) { 215 errMsg = ioe.getMessage(); 216 errCode = "E1005"; 217 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 218 } 219 catch (Exception ex) { 220 errMsg = ex.getMessage(); 221 errCode = "E1005"; 222 log.warn("can not create DagEngine for submitting jobs", ex); 223 } 224 finally { 225 if (makeFail == true) { // No DB exception occurs 226 log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 227 coordAction.setStatus(CoordinatorAction.Status.FAILED); 228 if (errMsg.length() > 254) { // Because table column size is 255 229 errMsg = errMsg.substring(0, 255); 230 } 231 coordAction.setErrorMessage(errMsg); 232 coordAction.setErrorCode(errCode); 233 234 updateList = new ArrayList<JsonBean>(); 235 updateList.add(coordAction); 236 insertList = new ArrayList<JsonBean>(); 237 238 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED, 239 SlaAppType.COORDINATOR_ACTION, log); 240 if(slaEvent != null) { 241 insertList.add(slaEvent); //Update SLA events 242 } 243 try { 244 // call JPAExecutor to do the bulk writes 245 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList)); 246 } 247 catch (JPAExecutorException je) { 248 throw new CommandException(je); 249 } 250 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 251 } 252 } 253 } 254 return null; 255 } 256 257 @Override 258 public String getEntityKey() { 259 return this.jobId; 260 } 261 262 @Override 263 protected boolean isLockRequired() { 264 return true; 265 } 266 267 @Override 268 protected void loadState() throws CommandException { 269 270 } 271 272 @Override 273 protected void eagerLoadState() throws CommandException { 274 jpaService = Services.get().get(JPAService.class); 275 if (jpaService == null) { 276 throw new CommandException(ErrorCode.E0610); 277 } 278 279 try { 280 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor(actionId)); 281 } 282 catch (JPAExecutorException je) { 283 throw new CommandException(je); 284 } 285 LogUtils.setLogInfo(coordAction, logInfo); 286 } 287 288 @Override 289 protected void verifyPrecondition() throws PreconditionException { 290 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) { 291 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status " 292 + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]"); 293 } 294 } 295 }