This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.coord;
019
020 import org.apache.hadoop.conf.Configuration;
021
022 import org.apache.oozie.client.CoordinatorAction;
023 import org.apache.oozie.client.OozieClient;
024 import org.apache.oozie.CoordinatorActionBean;
025 import org.apache.oozie.DagEngine;
026 import org.apache.oozie.DagEngineException;
027 import org.apache.oozie.ErrorCode;
028 import org.apache.oozie.WorkflowJobBean;
029 import org.apache.oozie.command.CommandException;
030 import org.apache.oozie.service.DagEngineService;
031 import org.apache.oozie.service.WorkflowStoreService;
032 import org.apache.oozie.store.StoreException;
033 import org.apache.oozie.store.CoordinatorStore;
034 import org.apache.oozie.store.WorkflowStore;
035 import org.apache.oozie.service.Services;
036 import org.apache.oozie.util.JobUtils;
037 import org.apache.oozie.util.ParamChecker;
038 import org.apache.oozie.util.XLog;
039 import org.apache.oozie.util.XmlUtils;
040 import org.apache.oozie.util.XConfiguration;
041 import org.apache.oozie.util.db.SLADbOperations;
042 import org.apache.oozie.client.SLAEvent.SlaAppType;
043 import org.apache.oozie.client.SLAEvent.Status;
044
045 import org.jdom.Element;
046 import org.jdom.JDOMException;
047
048 import java.io.IOException;
049 import java.io.StringReader;
050
051 public class CoordActionStartCommand extends CoordinatorCommand<Void> {
052
053 public static final String EL_ERROR = "EL_ERROR";
054 public static final String EL_EVAL_ERROR = "EL_EVAL_ERROR";
055 public static final String COULD_NOT_START = "COULD_NOT_START";
056 public static final String START_DATA_MISSING = "START_DATA_MISSING";
057 public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING";
058
059 private final XLog log = XLog.getLog(getClass());
060 private String actionId = null;
061 private String user = null;
062 private String authToken = null;
063 private CoordinatorActionBean coordAction = null;
064
065 public CoordActionStartCommand(String id, String user, String token) {
066 super("coord_action_start", "coord_action_start", 1, XLog.OPS);
067 this.actionId = ParamChecker.notEmpty(id, "id");
068 this.user = ParamChecker.notEmpty(user, "user");
069 this.authToken = ParamChecker.notEmpty(token, "token");
070 }
071
072 /**
073 * Create config to pass to WF Engine 1. Get createdConf from coord_actions table 2. Get actionXml from
074 * coord_actions table. Extract all 'property' tags and merge createdConf (overwrite duplicate keys). 3. Extract
075 * 'app-path' from actionXML. Create a new property called 'oozie.wf.application.path' and merge with createdConf
076 * (overwrite duplicate keys) 4. Read contents of config-default.xml in workflow directory. 5. Merge createdConf
077 * with config-default.xml (overwrite duplicate keys). 6. Results is runConf which is saved in coord_actions table.
078 * Merge Action createdConf with actionXml to create new runConf with replaced variables
079 *
080 * @param action CoordinatorActionBean
081 * @return Configuration
082 * @throws CommandException
083 */
084 private Configuration mergeConfig(CoordinatorActionBean action) throws CommandException {
085 String createdConf = action.getCreatedConf();
086 String actionXml = action.getActionXml();
087 Element workflowProperties = null;
088 try {
089 workflowProperties = XmlUtils.parseXml(actionXml);
090 }
091 catch (JDOMException e1) {
092 log.warn("Configuration parse error in:" + actionXml);
093 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
094 }
095 // generate the 'runConf' for this action
096 // Step 1: runConf = createdConf
097 Configuration runConf = null;
098 try {
099 runConf = new XConfiguration(new StringReader(createdConf));
100 }
101 catch (IOException e1) {
102 log.warn("Configuration parse error in:" + createdConf);
103 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
104 }
105 // Step 2: Merge local properties into runConf
106 // extract 'property' tags under 'configuration' block in the
107 // coordinator.xml (saved in actionxml column)
108 // convert Element to XConfiguration
109 Element configElement = (Element) workflowProperties.getChild("action", workflowProperties.getNamespace())
110 .getChild("workflow", workflowProperties.getNamespace()).getChild("configuration",
111 workflowProperties.getNamespace());
112 if (configElement != null) {
113 String strConfig = XmlUtils.prettyPrint(configElement).toString();
114 Configuration localConf;
115 try {
116 localConf = new XConfiguration(new StringReader(strConfig));
117 }
118 catch (IOException e1) {
119 log.warn("Configuration parse error in:" + strConfig);
120 throw new CommandException(ErrorCode.E1005, e1.getMessage(), e1);
121 }
122
123 // copy configuration properties in coordinator.xml to the runConf
124 XConfiguration.copy(localConf, runConf);
125 }
126
127 // Step 3: Extract value of 'app-path' in actionxml, and save it as a
128 // new property called 'oozie.wf.application.path'
129 // WF Engine requires the path to the workflow.xml to be saved under
130 // this property name
131 String appPath = workflowProperties.getChild("action", workflowProperties.getNamespace()).getChild("workflow",
132 workflowProperties.getNamespace()).getChild("app-path", workflowProperties.getNamespace()).getValue();
133 runConf.set("oozie.wf.application.path", appPath);
134 return runConf;
135 }
136
137 @Override
138 protected Void call(CoordinatorStore store) throws StoreException, CommandException {
139 boolean makeFail = true;
140 String errCode = "";
141 String errMsg = "";
142 ParamChecker.notEmpty(user, "user");
143 ParamChecker.notEmpty(authToken, "authToken");
144
145 // CoordinatorActionBean coordAction = store.getCoordinatorAction(id, true);
146 log.debug("actionid=" + actionId + ", status=" + coordAction.getStatus());
147 if (coordAction.getStatus() == CoordinatorAction.Status.SUBMITTED) {
148 // log.debug("getting.. job id: " + coordAction.getJobId());
149 // create merged runConf to pass to WF Engine
150 Configuration runConf = mergeConfig(coordAction);
151 coordAction.setRunConf(XmlUtils.prettyPrint(runConf).toString());
152 // log.debug("%%% merged runconf=" + XmlUtils.prettyPrint(runConf).toString());
153 DagEngine dagEngine = Services.get().get(DagEngineService.class).getDagEngine(user, authToken);
154 try {
155 boolean startJob = true;
156 Configuration conf = new XConfiguration(new StringReader(coordAction.getRunConf()));
157 SLADbOperations.writeStausEvent(coordAction.getSlaXml(), coordAction.getId(), store, Status.STARTED,
158 SlaAppType.COORDINATOR_ACTION);
159
160 // Normalize workflow appPath here;
161 JobUtils.normalizeAppPath(conf.get(OozieClient.USER_NAME), conf.get(OozieClient.GROUP_NAME), conf);
162
163 String wfId = dagEngine.submitJob(conf, startJob);
164 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
165 coordAction.setExternalId(wfId);
166 store.updateCoordinatorAction(coordAction);
167
168 log.debug("Updating WF record for WFID :" + wfId + " with parent id: " + actionId);
169 WorkflowStore wfStore = Services.get().get(WorkflowStoreService.class).create(store);
170 WorkflowJobBean wfJob = wfStore.getWorkflow(wfId, false);
171 wfJob.setParentId(actionId);
172 wfStore.updateWorkflow(wfJob);
173
174 makeFail = false;
175 }
176 catch (StoreException se) {
177 makeFail = false;
178 throw se;
179 }
180 catch (DagEngineException dee) {
181 errMsg = dee.getMessage();
182 errCode = "E1005";
183 log.warn("can not create DagEngine for submitting jobs", dee);
184 }
185 catch (CommandException ce) {
186 errMsg = ce.getMessage();
187 errCode = ce.getErrorCode().toString();
188 log.warn("command exception occured ", ce);
189 }
190 catch (java.io.IOException ioe) {
191 errMsg = ioe.getMessage();
192 errCode = "E1005";
193 log.warn("Configuration parse error. read from DB :" + coordAction.getRunConf(), ioe);
194 }
195 catch (Exception ex) {
196 errMsg = ex.getMessage();
197 errCode = "E1005";
198 log.warn("can not create DagEngine for submitting jobs", ex);
199 }
200 finally {
201 if (makeFail == true) { // No DB exception occurs
202 log.warn("Failing the action " + coordAction.getId() + ". Because " + errCode + " : " + errMsg);
203 coordAction.setStatus(CoordinatorAction.Status.FAILED);
204 if (errMsg.length() > 254) { // Because table column size is 255
205 errMsg = errMsg.substring(0, 255);
206 }
207 coordAction.setErrorMessage(errMsg);
208 coordAction.setErrorCode(errCode);
209 store.updateCoordinatorAction(coordAction);
210 queueCallable(new CoordActionReadyCommand(coordAction.getJobId()));
211 }
212 }
213 }
214 return null;
215 }
216
217 @Override
218 protected Void execute(CoordinatorStore store) throws StoreException, CommandException {
219 log.info("STARTED CoordActionStartCommand actionId=" + actionId);
220 try {
221 coordAction = store.getEntityManager().find(CoordinatorActionBean.class, actionId);
222 setLogInfo(coordAction);
223 if (lock(coordAction.getJobId())) {
224 call(store);
225 }
226 else {
227 queueCallable(new CoordActionStartCommand(actionId, user, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
228 log.warn("CoordActionStartCommand lock was not acquired - failed jobId=" + coordAction.getJobId()
229 + ", actionId=" + actionId + ". Requeing the same.");
230 }
231 }
232 catch (InterruptedException e) {
233 queueCallable(new CoordActionStartCommand(actionId, user, authToken), LOCK_FAILURE_REQUEUE_INTERVAL);
234 log.warn("CoordActionStartCommand lock acquiring failed with exception " + e.getMessage() + " for jobId="
235 + coordAction.getJobId() + ", actionId=" + actionId + " Requeing the same.");
236 }
237 finally {
238 log.info("ENDED CoordActionStartCommand actionId=" + actionId);
239 }
240 return null;
241 }
242 }