This project has retired. For details, please refer to its
Apache Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.client.WorkflowJob;
022 import org.apache.oozie.client.SLAEvent.SlaAppType;
023 import org.apache.oozie.client.SLAEvent.Status;
024 import org.apache.oozie.WorkflowActionBean;
025 import org.apache.oozie.WorkflowJobBean;
026 import org.apache.oozie.ErrorCode;
027 import org.apache.oozie.XException;
028 import org.apache.oozie.command.CommandException;
029 import org.apache.oozie.command.PreconditionException;
030 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
031 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
032 import org.apache.oozie.executor.jpa.JPAExecutorException;
033 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
034 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
035 import org.apache.oozie.executor.jpa.WorkflowActionUpdateJPAExecutor;
036 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor;
038 import org.apache.oozie.service.ELService;
039 import org.apache.oozie.service.JPAService;
040 import org.apache.oozie.service.SchemaService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.service.UUIDService;
043 import org.apache.oozie.service.WorkflowStoreService;
044 import org.apache.oozie.workflow.WorkflowException;
045 import org.apache.oozie.workflow.WorkflowInstance;
046 import org.apache.oozie.workflow.lite.KillNodeDef;
047 import org.apache.oozie.workflow.lite.NodeDef;
048 import org.apache.oozie.util.ELEvaluator;
049 import org.apache.oozie.util.InstrumentUtils;
050 import org.apache.oozie.util.LogUtils;
051 import org.apache.oozie.util.XConfiguration;
052 import org.apache.oozie.util.ParamChecker;
053 import org.apache.oozie.util.XmlUtils;
054 import org.apache.oozie.util.db.SLADbXOperations;
055 import org.jdom.Element;
056 import org.jdom.Namespace;
057
058 import java.io.StringReader;
059 import java.util.Date;
060 import java.util.List;
061 import java.util.Map;
062
063 public class SignalXCommand extends WorkflowXCommand<Void> {
064
065 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
066
067 private JPAService jpaService = null;
068 private String jobId;
069 private String actionId;
070 private WorkflowJobBean wfJob;
071 private WorkflowActionBean wfAction;
072
073 public SignalXCommand(String name, int priority, String jobId) {
074 super(name, name, priority);
075 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
076 }
077
078 public SignalXCommand(String jobId, String actionId) {
079 this("signal", 1, jobId);
080 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
081 }
082
083 @Override
084 protected boolean isLockRequired() {
085 return true;
086 }
087
088 @Override
089 public String getEntityKey() {
090 return this.jobId;
091 }
092
093 @Override
094 protected void loadState() throws CommandException {
095 try {
096 jpaService = Services.get().get(JPAService.class);
097 if (jpaService != null) {
098 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
099 LogUtils.setLogInfo(wfJob, logInfo);
100 if (actionId != null) {
101 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
102 LogUtils.setLogInfo(wfAction, logInfo);
103 }
104 }
105 else {
106 throw new CommandException(ErrorCode.E0610);
107 }
108 }
109 catch (XException ex) {
110 throw new CommandException(ex);
111 }
112 }
113
114 @Override
115 protected void verifyPrecondition() throws CommandException, PreconditionException {
116 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
117 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
118 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
119 }
120 }
121 else {
122 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
123 }
124 }
125
126 @Override
127 protected Void execute() throws CommandException {
128 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
129 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
130 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
131 boolean completed = false;
132 boolean skipAction = false;
133 if (wfAction == null) {
134 if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
135 try {
136 completed = workflowInstance.start();
137 }
138 catch (WorkflowException e) {
139 throw new CommandException(e);
140 }
141 wfJob.setStatus(WorkflowJob.Status.RUNNING);
142 wfJob.setStartTime(new Date());
143 wfJob.setWorkflowInstance(workflowInstance);
144 // 1. Add SLA status event for WF-JOB with status STARTED
145 // 2. Add SLA registration events for all WF_ACTIONS
146 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, Status.STARTED, SlaAppType.WORKFLOW_JOB);
147 writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
148 .getGroup(), wfJob.getConf());
149 queue(new NotificationXCommand(wfJob));
150 }
151 else {
152 throw new CommandException(ErrorCode.E0801, wfJob.getId());
153 }
154 }
155 else {
156 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
157 + ReRunXCommand.TO_SKIP);
158 if (skipVar != null) {
159 skipAction = skipVar.equals("true");
160 }
161 try {
162 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
163 }
164 catch (WorkflowException e) {
165 throw new CommandException(e);
166 }
167 wfJob.setWorkflowInstance(workflowInstance);
168 wfAction.resetPending();
169 if (!skipAction) {
170 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
171 queue(new NotificationXCommand(wfJob, wfAction));
172 }
173 try {
174 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
175 }
176 catch (JPAExecutorException je) {
177 throw new CommandException(je);
178 }
179 }
180
181 if (completed) {
182 try {
183 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
184 WorkflowActionBean actionToKill;
185
186 actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
187
188 actionToKill.setPending();
189 actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
190 jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToKill));
191 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
192 }
193
194 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
195 WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
196 actionToFailId));
197 actionToFail.resetPending();
198 actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
199 queue(new NotificationXCommand(wfJob, actionToFail));
200 SLADbXOperations.writeStausEvent(wfAction.getSlaXml(), wfAction.getId(), Status.FAILED,
201 SlaAppType.WORKFLOW_ACTION);
202 jpaService.execute(new WorkflowActionUpdateJPAExecutor(actionToFail));
203 }
204 }
205 catch (JPAExecutorException je) {
206 throw new CommandException(je);
207 }
208
209 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
210 wfJob.setEndTime(new Date());
211 wfJob.setWorkflowInstance(workflowInstance);
212 Status slaStatus = Status.SUCCEEDED;
213 switch (wfJob.getStatus()) {
214 case SUCCEEDED:
215 slaStatus = Status.SUCCEEDED;
216 break;
217 case KILLED:
218 slaStatus = Status.KILLED;
219 break;
220 case FAILED:
221 slaStatus = Status.FAILED;
222 break;
223 default: // TODO SUSPENDED
224 break;
225 }
226 SLADbXOperations.writeStausEvent(wfJob.getSlaXml(), jobId, slaStatus, SlaAppType.WORKFLOW_JOB);
227 queue(new NotificationXCommand(wfJob));
228 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
229 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
230 }
231
232 // output message for Kill node
233 if (wfAction != null) { // wfAction could be a no-op job
234 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
235 if (nodeDef != null && nodeDef instanceof KillNodeDef) {
236 boolean isRetry = false;
237 boolean isUserRetry = false;
238 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
239 isUserRetry);
240 try {
241 String tmpNodeConf = nodeDef.getConf();
242 String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
243 LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
244 jobId, actionId, tmpNodeConf, actionConf);
245 if (wfAction.getErrorCode() != null) {
246 wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
247 }
248 else {
249 wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
250 }
251 jpaService.execute(new WorkflowActionUpdateJPAExecutor(wfAction));
252 }
253 catch (JPAExecutorException je) {
254 throw new CommandException(je);
255 }
256 catch (Exception ex) {
257 LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
258 throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
259 }
260 }
261 }
262
263 }
264 else {
265 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
266 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
267 + ReRunXCommand.TO_SKIP);
268 boolean skipNewAction = false;
269 if (skipVar != null) {
270 skipNewAction = skipVar.equals("true");
271 }
272 try {
273 if (skipNewAction) {
274 WorkflowActionBean oldAction;
275
276 oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
277
278 oldAction.setPending();
279 jpaService.execute(new WorkflowActionUpdateJPAExecutor(oldAction));
280
281 queue(new SignalXCommand(jobId, oldAction.getId()));
282 }
283 else {
284 newAction.setPending();
285 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
286 .getDefinition(), wfJob.getConf());
287 newAction.setSlaXml(actionSlaXml);
288 jpaService.execute(new WorkflowActionInsertJPAExecutor(newAction));
289 LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
290 queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
291 }
292 }
293 catch (JPAExecutorException je) {
294 throw new CommandException(je);
295 }
296 }
297 }
298
299 try {
300 jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfJob));
301 }
302 catch (JPAExecutorException je) {
303 throw new CommandException(je);
304 }
305 LOG.debug(
306 "Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr());
307 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
308 // update coordinator action
309 new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator
310 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
311 }
312 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
313 return null;
314 }
315
316 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
317 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
318 for (Map.Entry<String, String> entry : conf) {
319 eval.setVariable(entry.getKey(), entry.getValue());
320 }
321 return eval;
322 }
323
324 @SuppressWarnings("unchecked")
325 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
326 String slaXml = null;
327 try {
328 Element eWfJob = XmlUtils.parseXml(wfXml);
329 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
330 if (action.getAttributeValue("name").equals(actionName) == false) {
331 continue;
332 }
333 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
334 if (eSla != null) {
335 slaXml = XmlUtils.prettyPrint(eSla).toString();
336 break;
337 }
338 }
339 }
340 catch (Exception e) {
341 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
342 }
343 return slaXml;
344 }
345
346 private String resolveSla(Element eSla, Configuration conf) throws CommandException {
347 String slaXml = null;
348 try {
349 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
350 slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
351 }
352 catch (Exception e) {
353 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
354 }
355 return slaXml;
356 }
357
358 @SuppressWarnings("unchecked")
359 private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
360 throws CommandException {
361 try {
362 Element eWfJob = XmlUtils.parseXml(wfXml);
363 Configuration conf = new XConfiguration(new StringReader(strConf));
364 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
365 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
366 if (eSla != null) {
367 String slaXml = resolveSla(eSla, conf);
368 eSla = XmlUtils.parseXml(slaXml);
369 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
370 action.getAttributeValue("name") + "");
371 SLADbXOperations.writeSlaRegistrationEvent(eSla, actionId, SlaAppType.WORKFLOW_ACTION, user, group);
372 }
373 }
374 }
375 catch (Exception e) {
376 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
377 }
378
379 }
380
381 }