This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.client.WorkflowJob;
022 import org.apache.oozie.client.SLAEvent.SlaAppType;
023 import org.apache.oozie.client.SLAEvent.Status;
024 import org.apache.oozie.client.rest.JsonBean;
025 import org.apache.oozie.SLAEventBean;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.command.PreconditionException;
032 import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
033 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
034 import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
035 import org.apache.oozie.executor.jpa.JPAExecutorException;
036 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
037 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
038 import org.apache.oozie.service.ELService;
039 import org.apache.oozie.service.JPAService;
040 import org.apache.oozie.service.SchemaService;
041 import org.apache.oozie.service.Services;
042 import org.apache.oozie.service.UUIDService;
043 import org.apache.oozie.service.WorkflowStoreService;
044 import org.apache.oozie.workflow.WorkflowException;
045 import org.apache.oozie.workflow.WorkflowInstance;
046 import org.apache.oozie.workflow.lite.KillNodeDef;
047 import org.apache.oozie.workflow.lite.NodeDef;
048 import org.apache.oozie.util.ELEvaluator;
049 import org.apache.oozie.util.InstrumentUtils;
050 import org.apache.oozie.util.LogUtils;
051 import org.apache.oozie.util.XConfiguration;
052 import org.apache.oozie.util.ParamChecker;
053 import org.apache.oozie.util.XmlUtils;
054 import org.apache.oozie.util.db.SLADbXOperations;
055 import org.jdom.Element;
056 import org.jdom.Namespace;
057
058 import java.io.StringReader;
059 import java.util.ArrayList;
060 import java.util.Date;
061 import java.util.List;
062 import java.util.Map;
063
064 public class SignalXCommand extends WorkflowXCommand<Void> {
065
066 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
067
068 private JPAService jpaService = null;
069 private String jobId;
070 private String actionId;
071 private WorkflowJobBean wfJob;
072 private WorkflowActionBean wfAction;
073 private List<JsonBean> updateList = new ArrayList<JsonBean>();
074 private List<JsonBean> insertList = new ArrayList<JsonBean>();
075
076
077 public SignalXCommand(String name, int priority, String jobId) {
078 super(name, name, priority);
079 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
080 }
081
082 public SignalXCommand(String jobId, String actionId) {
083 this("signal", 1, jobId);
084 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
085 }
086
087 @Override
088 protected boolean isLockRequired() {
089 return true;
090 }
091
092 @Override
093 public String getEntityKey() {
094 return this.jobId;
095 }
096
097 @Override
098 protected void loadState() throws CommandException {
099 try {
100 jpaService = Services.get().get(JPAService.class);
101 if (jpaService != null) {
102 this.wfJob = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId));
103 LogUtils.setLogInfo(wfJob, logInfo);
104 if (actionId != null) {
105 this.wfAction = jpaService.execute(new WorkflowActionGetJPAExecutor(actionId));
106 LogUtils.setLogInfo(wfAction, logInfo);
107 }
108 }
109 else {
110 throw new CommandException(ErrorCode.E0610);
111 }
112 }
113 catch (XException ex) {
114 throw new CommandException(ex);
115 }
116 }
117
118 @Override
119 protected void verifyPrecondition() throws CommandException, PreconditionException {
120 if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
121 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
122 throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
123 }
124 }
125 else {
126 throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
127 }
128 }
129
130 @Override
131 protected Void execute() throws CommandException {
132 LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
133 WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
134 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
135 boolean completed = false;
136 boolean skipAction = false;
137 if (wfAction == null) {
138 if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
139 try {
140 completed = workflowInstance.start();
141 }
142 catch (WorkflowException e) {
143 throw new CommandException(e);
144 }
145 wfJob.setStatus(WorkflowJob.Status.RUNNING);
146 wfJob.setStartTime(new Date());
147 wfJob.setWorkflowInstance(workflowInstance);
148 // 1. Add SLA status event for WF-JOB with status STARTED
149 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
150 Status.STARTED, SlaAppType.WORKFLOW_JOB);
151 if(slaEvent != null) {
152 insertList.add(slaEvent);
153 }
154 // 2. Add SLA registration events for all WF_ACTIONS
155 createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(), wfJob
156 .getGroup(), wfJob.getConf());
157 queue(new NotificationXCommand(wfJob));
158 }
159 else {
160 throw new CommandException(ErrorCode.E0801, wfJob.getId());
161 }
162 }
163 else {
164 String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
165 + ReRunXCommand.TO_SKIP);
166 if (skipVar != null) {
167 skipAction = skipVar.equals("true");
168 }
169 try {
170 completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
171 }
172 catch (WorkflowException e) {
173 throw new CommandException(e);
174 }
175 wfJob.setWorkflowInstance(workflowInstance);
176 wfAction.resetPending();
177 if (!skipAction) {
178 wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
179 queue(new NotificationXCommand(wfJob, wfAction));
180 }
181 updateList.add(wfAction);
182 }
183
184 if (completed) {
185 try {
186 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
187 WorkflowActionBean actionToKill;
188
189 actionToKill = jpaService.execute(new WorkflowActionGetJPAExecutor(actionToKillId));
190
191 actionToKill.setPending();
192 actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
193 updateList.add(actionToKill);
194 queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
195 }
196
197 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
198 WorkflowActionBean actionToFail = jpaService.execute(new WorkflowActionGetJPAExecutor(
199 actionToFailId));
200 actionToFail.resetPending();
201 actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
202 queue(new NotificationXCommand(wfJob, actionToFail));
203 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
204 Status.FAILED, SlaAppType.WORKFLOW_ACTION);
205 if(slaEvent != null) {
206 insertList.add(slaEvent);
207 }
208 updateList.add(actionToFail);
209 }
210 }
211 catch (JPAExecutorException je) {
212 throw new CommandException(je);
213 }
214
215 wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
216 wfJob.setEndTime(new Date());
217 wfJob.setWorkflowInstance(workflowInstance);
218 Status slaStatus = Status.SUCCEEDED;
219 switch (wfJob.getStatus()) {
220 case SUCCEEDED:
221 slaStatus = Status.SUCCEEDED;
222 break;
223 case KILLED:
224 slaStatus = Status.KILLED;
225 break;
226 case FAILED:
227 slaStatus = Status.FAILED;
228 break;
229 default: // TODO SUSPENDED
230 break;
231 }
232 SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId,
233 slaStatus, SlaAppType.WORKFLOW_JOB);
234 if(slaEvent != null) {
235 insertList.add(slaEvent);
236 }
237 queue(new NotificationXCommand(wfJob));
238 if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
239 InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
240 }
241
242 // output message for Kill node
243 if (wfAction != null) { // wfAction could be a no-op job
244 NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
245 if (nodeDef != null && nodeDef instanceof KillNodeDef) {
246 boolean isRetry = false;
247 boolean isUserRetry = false;
248 ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
249 isUserRetry);
250 try {
251 String tmpNodeConf = nodeDef.getConf();
252 String actionConf = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
253 LOG.debug("Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], after resolve [{3}]",
254 jobId, actionId, tmpNodeConf, actionConf);
255 if (wfAction.getErrorCode() != null) {
256 wfAction.setErrorInfo(wfAction.getErrorCode(), actionConf);
257 }
258 else {
259 wfAction.setErrorInfo(ErrorCode.E0729.toString(), actionConf);
260 }
261 updateList.add(wfAction);
262 }
263 catch (Exception ex) {
264 LOG.warn("Exception in SignalXCommand ", ex.getMessage(), ex);
265 throw new CommandException(ErrorCode.E0729, wfAction.getName(), ex);
266 }
267 }
268 }
269
270 }
271 else {
272 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
273 String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
274 + ReRunXCommand.TO_SKIP);
275 boolean skipNewAction = false;
276 if (skipVar != null) {
277 skipNewAction = skipVar.equals("true");
278 }
279 try {
280 if (skipNewAction) {
281 WorkflowActionBean oldAction;
282
283 oldAction = jpaService.execute(new WorkflowActionGetJPAExecutor(newAction.getId()));
284
285 oldAction.setPending();
286 updateList.add(oldAction);
287
288 queue(new SignalXCommand(jobId, oldAction.getId()));
289 }
290 else {
291 newAction.setPending();
292 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
293 .getDefinition(), wfJob.getConf());
294 newAction.setSlaXml(actionSlaXml);
295 insertList.add(newAction);
296 LOG.debug("SignalXCommand: Name: "+ newAction.getName() + ", Id: " +newAction.getId() + ", Authcode:" + newAction.getCred());
297 queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
298 }
299 }
300 catch (JPAExecutorException je) {
301 throw new CommandException(je);
302 }
303 }
304 }
305
306 try {
307 wfJob.setLastModifiedTime(new Date());
308 updateList.add(wfJob);
309 // call JPAExecutor to do the bulk writes
310 jpaService.execute(new BulkUpdateInsertJPAExecutor(updateList, insertList));
311 }
312 catch (JPAExecutorException je) {
313 throw new CommandException(je);
314 }
315 LOG.debug(
316 "Updated the workflow status to " + wfJob.getId() + " status =" + wfJob.getStatusStr());
317 if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
318 // update coordinator action
319 new CoordActionUpdateXCommand(wfJob).call(); //Note: Called even if wf is not necessarily instantiated by coordinator
320 new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
321 }
322 LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
323 return null;
324 }
325
326 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
327 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
328 for (Map.Entry<String, String> entry : conf) {
329 eval.setVariable(entry.getKey(), entry.getValue());
330 }
331 return eval;
332 }
333
334 @SuppressWarnings("unchecked")
335 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
336 String slaXml = null;
337 try {
338 Element eWfJob = XmlUtils.parseXml(wfXml);
339 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
340 if (action.getAttributeValue("name").equals(actionName) == false) {
341 continue;
342 }
343 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
344 if (eSla != null) {
345 slaXml = XmlUtils.prettyPrint(eSla).toString();
346 break;
347 }
348 }
349 }
350 catch (Exception e) {
351 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
352 }
353 return slaXml;
354 }
355
356 private String resolveSla(Element eSla, Configuration conf) throws CommandException {
357 String slaXml = null;
358 try {
359 ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
360 slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
361 }
362 catch (Exception e) {
363 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
364 }
365 return slaXml;
366 }
367
368 @SuppressWarnings("unchecked")
369 private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
370 throws CommandException {
371 try {
372 Element eWfJob = XmlUtils.parseXml(wfXml);
373 Configuration conf = new XConfiguration(new StringReader(strConf));
374 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
375 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
376 if (eSla != null) {
377 String slaXml = resolveSla(eSla, conf);
378 eSla = XmlUtils.parseXml(slaXml);
379 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
380 action.getAttributeValue("name") + "");
381 SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
382 SlaAppType.WORKFLOW_ACTION, user, group);
383 if(slaEvent != null) {
384 insertList.add(slaEvent);
385 }
386 }
387 }
388 }
389 catch (Exception e) {
390 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
391 }
392
393 }
394
395 }