001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie.command.coord; 019 020 import org.apache.hadoop.conf.Configuration; 021 022 import org.apache.oozie.client.CoordinatorAction; 023 import org.apache.oozie.client.OozieClient; 024 import org.apache.oozie.CoordinatorActionBean; 025 import org.apache.oozie.DagEngineException; 026 import org.apache.oozie.DagEngine; 027 import org.apache.oozie.ErrorCode; 028 import org.apache.oozie.WorkflowJobBean; 029 import org.apache.oozie.command.CommandException; 030 import org.apache.oozie.command.PreconditionException; 031 import org.apache.oozie.service.DagEngineService; 032 import org.apache.oozie.service.JPAService; 033 import org.apache.oozie.service.Services; 034 import org.apache.oozie.util.JobUtils; 035 import org.apache.oozie.util.LogUtils; 036 import org.apache.oozie.util.ParamChecker; 037 import org.apache.oozie.util.XLog; 038 import org.apache.oozie.util.XmlUtils; 039 import org.apache.oozie.util.XConfiguration; 040 import org.apache.oozie.util.db.SLADbOperations; 041 import org.apache.oozie.client.SLAEvent.SlaAppType; 042 import org.apache.oozie.client.SLAEvent.Status; 043 import org.apache.oozie.executor.jpa.JPAExecutorException; 044 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; 045 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; 046 047 import org.jdom.Element; 048 import org.jdom.JDOMException; 049 050 import java.io.IOException; 051 import java.io.StringReader; 052 053 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> { 054 055 public static final String EL_ERROR = "EL_ERROR"; 056 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR"; 057 public static final String COULD_NOT_START = "COULD_NOT_START"; 058 public static final String START_DATA_MISSING = "START_DATA_MISSING"; 059 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; 060 061 private final XLog log = getLog(); 062 private String actionId = null; 063 private String user = null; 064 private String authToken = null; 065 private CoordinatorActionBean coordAction = null; 066 private JPAService jpaService = null; 067 068 public CoordActionStartXCommand(String id, String user, String token) { 069 //super("coord_action_start", "coord_action_start", 1, XLog.OPS); 070 super("coord_action_start", "coord_action_start", 1); 071 this.actionId = ParamChecker.notEmpty(id, "id"); 072 this.user = ParamChecker.notEmpty(user, "user"); 073 this.authToken = ParamChecker.notEmpty(token, "token"); 074 } 075 076 /** 077 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from 078 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract 079 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf 080 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf 081 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table. 082 * Merge Action createdConf with actionXml to create new runConf with replaced variables 083 * 084 * @param action CoordinatorActionBean 085 * @return Configuration 086 * @throws CommandException 087 */ 088 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException { 089 String createdConf = action.getCreatedConf(); 090 String actionXml = action.getActionXml(); 091 Element workflowProperties = null; 092 try { 093 workflowProperties = XmlUtils.parseXml(actionXml); 094 } 095 catch (JDOMException e1) { 096 log.warn("Configuration parse error in:" + actionXml); 097 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 098 } 099 // generate the 'runConf' for this action 100 // Step 1: runConf = createdConf 101 Configuration runConf = null; 102 try { 103 runConf = new XConfiguration(new StringReader(createdConf)); 104 } 105 catch (IOException e1) { 106 log.warn("Configuration parse error in:" + createdConf); 107 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 108 } 109 // Step 2: Merge local properties into runConf 110 // extract 'property' tags under 'configuration' block in the 111 // coordinator.xml (saved in actionxml column) 112 // convert Element to XConfiguration 113 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace()) 114 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration", 115 workflowProperties.getNamespace()); 116 if (configElement != null) { 117 String strConfig = XmlUtils.prettyPrint(configElement).toString(); 118 Configuration localConf; 119 try { 120 localConf = new XConfiguration(new StringReader(strConfig)); 121 } 122 catch (IOException e1) { 123 log.warn("Configuration parse error in:" + strConfig); 124 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1); 125 } 126 127 // copy configuration properties in coordinator.xml to the runConf 128 XConfiguration.copy(localConf, runConf); 129 } 130 131 // Step 3: Extract value of 'app-path' in actionxml, and save it as a 132 // new property called 'oozie.wf.application.path' 133 // WF Engine requires the path to the workflow.xml to be saved under 134 // this property name 135 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow", 136 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue(); 137 runConf.set("oozie.wf.application.path", appPath); 138 return runConf; 139 } 140 141 @Override 142 protected Void execute() throws CommandException { 143 boolean makeFail = true; 144 String errCode = ""; 145 String errMsg = ""; 146 ParamChecker.notEmpty(user, "user"); 147 ParamChecker.notEmpty(authToken, "authToken"); 148 149 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus()); 150 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) { 151 // log.debug("getting.. job id: " + coordAction.getJobId()); 152 // create merged runConf to pass to WF Engine 153 Configuration runConf = mergeConfig(coordAction); 154 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString()); 155 // log.debug("%%% merged runconf=" + 156 // XmlUtils.prettyPrint(runConf).toString()); 157 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken); 158 try { 159 boolean startJob = true; 160 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf())); 161 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED, 162 SlaAppType.COORDINATOR_ACTION, log); 163 164 // Normalize workflow appPath here; 165 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf); 166 String wfId = dagEngine.submitJob(conf, startJob); 167 coordAction.setStatus(CoordinatorAction.Status.RUNNING); 168 coordAction.setExternalId(wfId); 169 coordAction.incrementAndGetPending(); 170 171 //store.updateCoordinatorAction(coordAction); 172 JPAService jpaService = Services.get().get(JPAService.class); 173 if (jpaService != null) { 174 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId); 175 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId)); 176 wfJob.setParentId(actionId); 177 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob)); 178 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 179 } 180 else { 181 log.error(ErrorCode.E0610); 182 } 183 184 makeFail = false; 185 } 186 catch (DagEngineException dee) { 187 errMsg = dee.getMessage(); 188 errCode = "E1005"; 189 log.warn("can not create DagEngine for submitting jobs", dee); 190 } 191 catch (CommandException ce) { 192 errMsg = ce.getMessage(); 193 errCode = ce.getErrorCode().toString(); 194 log.warn("command exception occured ", ce); 195 } 196 catch (java.io.IOException ioe) { 197 errMsg = ioe.getMessage(); 198 errCode = "E1005"; 199 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe); 200 } 201 catch (Exception ex) { 202 errMsg = ex.getMessage(); 203 errCode = "E1005"; 204 log.warn("can not create DagEngine for submitting jobs", ex); 205 } 206 finally { 207 if (makeFail == true) { // No DB exception occurs 208 log.warn("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg); 209 coordAction.setStatus(CoordinatorAction.Status.FAILED); 210 if (errMsg.length() > 254) { // Because table column size is 255 211 errMsg = errMsg.substring(0, 255); 212 } 213 coordAction.setErrorMessage(errMsg); 214 coordAction.setErrorCode(errCode); 215 216 JPAService jpaService = Services.get().get(JPAService.class); 217 if (jpaService != null) { 218 try { 219 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor(coordAction)); 220 } 221 catch (JPAExecutorException je) { 222 throw new CommandException(je); 223 } 224 } 225 else { 226 log.error(ErrorCode.E0610); 227 } 228 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED, 229 SlaAppType.COORDINATOR_ACTION, log); //Update SLA events 230 queue(new CoordActionReadyXCommand(coordAction.getJobId())); 231 } 232 } 233 } 234 return null; 235 } 236 237 @Override 238 protected String getEntityKey() { 239 return coordAction.getJobId(); 240 } 241 242 @Override 243 protected boolean isLockRequired() { 244 return true; 245 } 246 247 @Override 248 protected void loadState() throws CommandException { 249 250 } 251 252 @Override 253 protected void eagerLoadState() throws CommandException { 254 jpaService = Services.get().get(JPAService.class); 255 if (jpaService == null) { 256 throw new CommandException(ErrorCode.E0610); 257 } 258 259 try { 260 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor(actionId)); 261 } 262 catch (JPAExecutorException je) { 263 throw new CommandException(je); 264 } 265 LogUtils.setLogInfo(coordAction, logInfo); 266 } 267 268 @Override 269 protected void verifyPrecondition() throws PreconditionException { 270 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) { 271 throw new PreconditionException(ErrorCode.E1100); 272 } 273 } 274 }