This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.hadoop.conf.Configuration;
021
022 import org.apache.oozie.client.CoordinatorAction;
023 import org.apache.oozie.client.OozieClient;
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.DagEngineException;
026 import org.apache.oozie.DagEngine;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.command.PreconditionException;
031 import org.apache.oozie.service.DagEngineService;
032 import org.apache.oozie.service.JPAService;
033 import org.apache.oozie.service.Services;
034 import org.apache.oozie.util.JobUtils;
035 import org.apache.oozie.util.LogUtils;
036 import org.apache.oozie.util.ParamChecker;
037 import org.apache.oozie.util.XLog;
038 import org.apache.oozie.util.XmlUtils;
039 import org.apache.oozie.util.XConfiguration;
040 import org.apache.oozie.util.db.SLADbOperations;
041 import org.apache.oozie.client.SLAEvent.SlaAppType;
042 import org.apache.oozie.client.SLAEvent.Status;
043 import org.apache.oozie.executor.jpa.JPAExecutorException;
044 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
045 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
046
047 import org.jdom.Element;
048 import org.jdom.JDOMException;
049
050 import java.io.IOException;
051 import java.io.StringReader;
052
053 public class CoordActionStartXCommand extends CoordinatorXCommand<Void> {
054
055 public static final String EL_ERROR = "EL_ERROR";
056 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
057 public static final String COULD_NOT_START = "COULD_NOT_START";
058 public static final String START_DATA_MISSING = "START_DATA_MISSING";
059 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
060
061 private final XLog log = getLog();
062 private String actionId = null;
063 private String user = null;
064 private String authToken = null;
065 private CoordinatorActionBean coordAction = null;
066 private JPAService jpaService = null;
067 private String jobId = null;
068
069 public CoordActionStartXCommand(String id, String user, String token, String jobId) {
070 //super("coord_action_start", "coord_action_start", 1, XLog.OPS);
071 super("coord_action_start", "coord_action_start", 1);
072 this.actionId = ParamChecker.notEmpty(id, "id");
073 this.user = ParamChecker.notEmpty(user, "user");
074 this.authToken = ParamChecker.notEmpty(token, "token");
075 this.jobId = jobId;
076 }
077
078 /**
079 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from
080 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract
081 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf
082 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf
083 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table.
084 * Merge Action createdConf with actionXml to create new runConf with replaced variables
085 *
086 * @param action CoordinatorActionBean
087 * @return Configuration
088 * @throws CommandException
089 */
090 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException {
091 String createdConf = action.getCreatedConf();
092 String actionXml = action.getActionXml();
093 Element workflowProperties = null;
094 try {
095 workflowProperties = XmlUtils.parseXml(actionXml);
096 }
097 catch (JDOMException e1) {
098 log.warn("Configuration parse error in:" + actionXml);
099 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
100 }
101 // generate the 'runConf' for this action
102 // Step 1: runConf = createdConf
103 Configuration runConf = null;
104 try {
105 runConf = new XConfiguration(new StringReader(createdConf));
106 }
107 catch (IOException e1) {
108 log.warn("Configuration parse error in:" + createdConf);
109 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
110 }
111 // Step 2: Merge local properties into runConf
112 // extract 'property' tags under 'configuration' block in the
113 // coordinator.xml (saved in actionxml column)
114 // convert Element to XConfiguration
115 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace())
116 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration",
117 workflowProperties.getNamespace());
118 if (configElement != null) {
119 String strConfig = XmlUtils.prettyPrint(configElement).toString();
120 Configuration localConf;
121 try {
122 localConf = new XConfiguration(new StringReader(strConfig));
123 }
124 catch (IOException e1) {
125 log.warn("Configuration parse error in:" + strConfig);
126 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
127 }
128
129 // copy configuration properties in coordinator.xml to the runConf
130 XConfiguration.copy(localConf, runConf);
131 }
132
133 // Step 3: Extract value of 'app-path' in actionxml, and save it as a
134 // new property called 'oozie.wf.application.path'
135 // WF Engine requires the path to the workflow.xml to be saved under
136 // this property name
137 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow",
138 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue();
139 runConf.set("oozie.wf.application.path", appPath);
140 return runConf;
141 }
142
143 @Override
144 protected Void execute() throws CommandException {
145 boolean makeFail = true;
146 String errCode = "";
147 String errMsg = "";
148 ParamChecker.notEmpty(user, "user");
149 ParamChecker.notEmpty(authToken, "authToken");
150
151 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
152 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
153 // log.debug("getting.. job id: " + coordAction.getJobId());
154 // create merged runConf to pass to WF Engine
155 Configuration runConf = mergeConfig(coordAction);
156 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
157 // log.debug("%%% merged runconf=" +
158 // XmlUtils.prettyPrint(runConf).toString());
159 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken);
160 try {
161 boolean startJob = true;
162 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
163 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.STARTED,
164 SlaAppType.COORDINATOR_ACTION, log);
165
166 // Normalize workflow appPath here;
167 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
168 String wfId = dagEngine.submitJob(conf, startJob);
169 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
170 coordAction.setExternalId(wfId);
171 coordAction.incrementAndGetPending();
172
173 //store.updateCoordinatorAction(coordAction);
174 JPAService jpaService = Services.get().get(JPAService.class);
175 if (jpaService != null) {
176 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
177 WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(wfId));
178 wfJob.setParentId(actionId);
179 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
180 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
181 }
182 else {
183 log.error(ErrorCode.E0610);
184 }
185
186 makeFail = false;
187 }
188 catch (DagEngineException dee) {
189 errMsg = dee.getMessage();
190 errCode = "E1005";
191 log.warn("can not create DagEngine for submitting jobs", dee);
192 }
193 catch (CommandException ce) {
194 errMsg = ce.getMessage();
195 errCode = ce.getErrorCode().toString();
196 log.warn("command exception occured ", ce);
197 }
198 catch (java.io.IOException ioe) {
199 errMsg = ioe.getMessage();
200 errCode = "E1005";
201 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
202 }
203 catch (Exception ex) {
204 errMsg = ex.getMessage();
205 errCode = "E1005";
206 log.warn("can not create DagEngine for submitting jobs", ex);
207 }
208 finally {
209 if (makeFail == true) { // No DB exception occurs
210 log.warn("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
211 coordAction.setStatus(CoordinatorAction.Status.FAILED);
212 if (errMsg.length() > 254) { // Because table column size is 255
213 errMsg = errMsg.substring(0, 255);
214 }
215 coordAction.setErrorMessage(errMsg);
216 coordAction.setErrorCode(errCode);
217
218 JPAService jpaService = Services.get().get(JPAService.class);
219 if (jpaService != null) {
220 try {
221 jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionUpdateForStartJPAExecutor(coordAction));
222 }
223 catch (JPAExecutorException je) {
224 throw new CommandException(je);
225 }
226 }
227 else {
228 log.error(ErrorCode.E0610);
229 }
230 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), Status.FAILED,
231 SlaAppType.COORDINATOR_ACTION, log); //Update SLA events
232 queue(new CoordActionReadyXCommand(coordAction.getJobId()));
233 }
234 }
235 }
236 return null;
237 }
238
239 @Override
240 public String getEntityKey() {
241 return this.jobId;
242 }
243
244 @Override
245 protected boolean isLockRequired() {
246 return true;
247 }
248
249 @Override
250 protected void loadState() throws CommandException {
251
252 }
253
254 @Override
255 protected void eagerLoadState() throws CommandException {
256 jpaService = Services.get().get(JPAService.class);
257 if (jpaService == null) {
258 throw new CommandException(ErrorCode.E0610);
259 }
260
261 try {
262 coordAction = jpaService.execute(new org.apache.oozie.executor.jpa.CoordActionGetForStartJPAExecutor(actionId));
263 }
264 catch (JPAExecutorException je) {
265 throw new CommandException(je);
266 }
267 LogUtils.setLogInfo(coordAction, logInfo);
268 }
269
270 @Override
271 protected void verifyPrecondition() throws PreconditionException {
272 if (coordAction.getStatus() != CoordinatorAction.Status.SUBMITTED) {
273 throw new PreconditionException(ErrorCode.E1100);
274 }
275 }
276 }