This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.hadoop.conf.Configuration;
021
022 import org.apache.oozie.client.CoordinatorAction;
023 import org.apache.oozie.client.OozieClient;
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.DagEngineException;
026 import org.apache.oozie.DagEngine;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.SLAEventBean;
029 import org.apache.oozie.WorkflowJobBean;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.service.DagEngineService;
033 import org.apache.oozie.service.JPAService;
034 import org.apache.oozie.service.Services;
035 import org.apache.oozie.util.JobUtils;
036 import org.apache.oozie.util.LogUtils;
037 import org.apache.oozie.util.ParamChecker;
038 import org.apache.oozie.util.XLog;
039 import org.apache.oozie.util.XmlUtils;
040 import org.apache.oozie.util.XConfiguration;
041 import org.apache.oozie.util.db.SLADbOperations;
042 import org.apache.oozie.client.SLAEvent.SlaAppType;
043 import org.apache.oozie.client.SLAEvent.Status;
044 import org.apache.oozie.client.rest.JsonBean;
045 import org.apache.oozie.executor.jpa.BulkUpdateInsertForCoordActionStartJPAExecutor;
046 import org.apache.oozie.executor.jpa.JPAExecutorException;
047 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
048 import org.jdom.Element;
049 import org.jdom.JDOMException;
050
051 import java.io.IOException;
052 import java.io.StringReader;
053 import java.util.ArrayList;
054 import java.util.Date;
055 import java.util.List;
056
057 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> {
058
059 public static final String EL_ERROR = "EL_ERROR";
060 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
061 public static final String COULD_NOT_START = "COULD_NOT_START";
062 public static final String START_DATA_MISSING = "START_DATA_MISSING";
063 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
064
065 private final XLog log = getLog();
066 private String actionId = null;
067 private String user = null;
068 private String authToken = null;
069 private CoordinatorActionBean coordAction = null;
070 private JPAService jpaService = null;
071 private String jobId = null;
072 private List<JsonBean> updateList = new ArrayList<JsonBean>();
073 private List<JsonBean> insertList = new ArrayList<JsonBean>();
074
075 public CoordActionStartXCommand(String id, String user, String token, String jobId) {
076 //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
077 super("coord_action_start", "coord_action_start", 1);
078 this.actionId = ParamChecker.notEmpty(id, "id");
079 this.user = ParamChecker.notEmpty(user, "user");
080 this.authToken = ParamChecker.notEmpty(token, "token");
081 this.jobId = jobId;
082 }
083
084 /**
085 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from
086 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract
087 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf
088 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf
089 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table.
090 * Merge Action createdConf with actionXml to create new runConf with replaced variables
091 *
092 * @param action CoordinatorActionBean
093 * @return Configuration
094 * @throws CommandException
095 */
096 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException {
097 String createdConf = action.getCreatedConf();
098 String actionXml = action.getActionXml();
099 Element workflowProperties = null;
100 try {
101 workflowProperties = XmlUtils.parseXml(actionXml);
102 }
103 catch (JDOMException e1) {
104 log.warn("Configuration parse error in:" + actionXml);
105 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
106 }
107 // generate the 'runConf' for this action
108 // Step 1: runConf = createdConf
109 Configuration runConf = null;
110 try {
111 runConf = new XConfiguration(new StringReader(createdConf));
112 }
113 catch (IOException e1) {
114 log.warn("Configuration parse error in:" + createdConf);
115 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
116 }
117 // Step 2: Merge local properties into runConf
118 // extract 'property' tags under 'configuration' block in the
119 // coordinator.xml (saved in actionxml column)
120 // convert Element to XConfiguration
121 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace())
122 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration",
123 workflowProperties.getNamespace());
124 if (configElement != null) {
125 String strConfig = XmlUtils.prettyPrint(configElement).toString();
126 Configuration localConf;
127 try {
128 localConf = new XConfiguration(new StringReader(strConfig));
129 }
130 catch (IOException e1) {
131 log.warn("Configuration parse error in:" + strConfig);
132 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
133 }
134
135 // copy configuration properties in coordinator.xml to the runConf
136 XConfiguration.copy(localConf, runConf);
137 }
138
139 // Step 3: Extract value of 'app-path' in actionxml, and save it as a
140 // new property called 'oozie.wf.application.path'
141 // WF Engine requires the path to the workflow.xml to be saved under
142 // this property name
143 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow",
144 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue();
145 runConf.set("oozie.wf.application.path", appPath);
146 return runConf;
147 }
148
149 @Override
150 protected Void execute() throws CommandException {
151 boolean makeFail = true;
152 String errCode = "";
153 String errMsg = "";
154 ParamChecker.notEmpty(user, "user");
155 ParamChecker.notEmpty(authToken, "authToken");
156
157 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
158 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
159 // log.debug("getting.. job id: " + coordAction.getJobId());
160 // create merged runConf to pass to WF Engine
161 Configuration runConf = mergeConfig(coordAction);
162 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
163 // log.debug("%%% merged runconf=" +
164 // XmlUtils.prettyPrint(runConf).toString());
165 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken);
166 try {
167 boolean startJob = true;
168 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
169 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
170 SlaAppType.COORDINATOR_ACTION, log);
171 if(slaEvent != null) {
172 insertList.add(slaEvent);
173 }
174
175 // Normalize workflow appPath here;
176 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
177 String wfId = dagEngine.submitJob(conf, startJob);
178 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
179 coordAction.setExternalId(wfId);
180 coordAction.incrementAndGetPending();
181
182 //store.updateCoordinatorAction(coordAction);
183 JPAService jpaService = Services.get().get(JPAService.class);
184 if (jpaService != null) {
185 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
186 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
187 wfJob.setParentId(actionId);
188 wfJob.setLastModifiedTime(new Date());
189 updateList.add(wfJob);
190 updateList.add(coordAction);
191 try {
192 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
193 }
194 catch (JPAExecutorException je) {
195 throw new CommandException(je);
196 }
197 }
198 else {
199 log.error(ErrorCode.E0610);
200 }
201
202 makeFail = false;
203 }
204 catch (DagEngineException dee) {
205 errMsg = dee.getMessage();
206 errCode = dee.getErrorCode().toString();
207 log.warn("can not create DagEngine for submitting jobs", dee);
208 }
209 catch (CommandException ce) {
210 errMsg = ce.getMessage();
211 errCode = ce.getErrorCode().toString();
212 log.warn("command exception occured ", ce);
213 }
214 catch (java.io.IOException ioe) {
215 errMsg = ioe.getMessage();
216 errCode = "E1005";
217 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
218 }
219 catch (Exception ex) {
220 errMsg = ex.getMessage();
221 errCode = "E1005";
222 log.warn("can not create DagEngine for submitting jobs", ex);
223 }
224 finally {
225 if (makeFail == true) { // No DB exception occurs
226 log.error("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
227 coordAction.setStatus(CoordinatorAction.Status.FAILED);
228 if (errMsg.length() > 254) { // Because table column size is 255
229 errMsg = errMsg.substring(0, 255);
230 }
231 coordAction.setErrorMessage(errMsg);
232 coordAction.setErrorCode(errCode);
233
234 updateList = new ArrayList<JsonBean>();
235 updateList.add(coordAction);
236 insertList = new ArrayList<JsonBean>();
237
238 SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
239 SlaAppType.COORDINATOR_ACTION, log);
240 if(slaEvent != null) {
241 insertList.add(slaEvent); //Update SLA events
242 }
243 try {
244 // call JPAExecutor to do the bulk writes
245 jpaService.execute(new BulkUpdateInsertForCoordActionStartJPAExecutor(updateList, insertList));
246 }
247 catch (JPAExecutorException je) {
248 throw new CommandException(je);
249 }
250 queue(new CoordActionReadyXCommand(coordAction.getJobId()));
251 }
252 }
253 }
254 return null;
255 }
256
257 @Override
258 public String getEntityKey() {
259 return this.jobId;
260 }
261
262 @Override
263 protected boolean isLockRequired() {
264 return true;
265 }
266
267 @Override
268 protected void loadState() throws CommandException {
269
270 }
271
272 @Override
273 protected void eagerLoadState() throws CommandException {
274 jpaService = Services.get().get(JPAService.class);
275 if (jpaService == null) {
276 throw new CommandException(ErrorCode.E0610);
277 }
278
279 try {
280 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor(actionId));
281 }
282 catch (JPAExecutorException je) {
283 throw new CommandException(je);
284 }
285 LogUtils.setLogInfo(coordAction, logInfo);
286 }
287
288 @Override
289 protected void verifyPrecondition() throws PreconditionException {
290 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) {
291 throw new PreconditionException(ErrorCode.E1100, "The coord action [" + actionId + "] must have status "
292 + CoordinatorAction.Status.SUBMITTED.name() + " but has status [" + coordAction.getStatus().name() + "]");
293 }
294 }
295 }