001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.command.wf;
020
021import java.io.IOException;
022
023import org.apache.hadoop.conf.Configuration;
024import org.apache.oozie.action.ActionExecutor;
025import org.apache.oozie.action.control.ForkActionExecutor;
026import org.apache.oozie.action.control.StartActionExecutor;
027import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
028import org.apache.oozie.client.Job;
029import org.apache.oozie.client.WorkflowAction;
030import org.apache.oozie.client.WorkflowJob;
031import org.apache.oozie.client.SLAEvent.SlaAppType;
032import org.apache.oozie.client.SLAEvent.Status;
033import org.apache.oozie.client.rest.JsonBean;
034import org.apache.oozie.SLAEventBean;
035import org.apache.oozie.WorkflowActionBean;
036import org.apache.oozie.WorkflowJobBean;
037import org.apache.oozie.ErrorCode;
038import org.apache.oozie.XException;
039import org.apache.oozie.command.CommandException;
040import org.apache.oozie.command.PreconditionException;
041import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
042import org.apache.oozie.command.wf.ActionXCommand.ForkedActionExecutorContext;
043import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
044import org.apache.oozie.executor.jpa.BatchQueryExecutor;
045import org.apache.oozie.executor.jpa.JPAExecutorException;
046import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
047import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
048import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
049import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
050import org.apache.oozie.service.ActionService;
051import org.apache.oozie.service.CallableQueueService;
052import org.apache.oozie.service.CallableQueueService.CallableWrapper;
053import org.apache.oozie.service.ConfigurationService;
054import org.apache.oozie.service.ELService;
055import org.apache.oozie.service.EventHandlerService;
056import org.apache.oozie.service.JPAService;
057import org.apache.oozie.service.Services;
058import org.apache.oozie.service.UUIDService;
059import org.apache.oozie.service.WorkflowStoreService;
060import org.apache.oozie.workflow.WorkflowException;
061import org.apache.oozie.workflow.WorkflowInstance;
062import org.apache.oozie.workflow.lite.KillNodeDef;
063import org.apache.oozie.workflow.lite.NodeDef;
064import org.apache.oozie.util.ELEvaluator;
065import org.apache.oozie.util.InstrumentUtils;
066import org.apache.oozie.util.LogUtils;
067import org.apache.oozie.util.XConfiguration;
068import org.apache.oozie.util.ParamChecker;
069import org.apache.oozie.util.XmlUtils;
070import org.apache.oozie.util.db.SLADbXOperations;
071import org.jdom.Element;
072
073import java.io.StringReader;
074import java.util.ArrayList;
075import java.util.Date;
076import java.util.List;
077import java.util.Map;
078import java.util.concurrent.Future;
079
080import org.apache.oozie.client.OozieClient;
081
082@SuppressWarnings("deprecation")
083public class SignalXCommand extends WorkflowXCommand<Void> {
084
085    private JPAService jpaService = null;
086    private String jobId;
087    private String actionId;
088    private WorkflowJobBean wfJob;
089    private WorkflowActionBean wfAction;
090    private List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
091    private List<JsonBean> insertList = new ArrayList<JsonBean>();
092    private boolean generateEvent = false;
093    private String wfJobErrorCode;
094    private String wfJobErrorMsg;
095    public final static String FORK_PARALLEL_JOBSUBMISSION = "oozie.workflow.parallel.fork.action.start";
096
097    public SignalXCommand(String name, int priority, String jobId) {
098        super(name, name, priority);
099        this.jobId = ParamChecker.notEmpty(jobId, "jobId");
100    }
101
102    public SignalXCommand(String jobId, String actionId) {
103        this("signal", 1, jobId);
104        this.actionId = ParamChecker.notEmpty(actionId, "actionId");
105    }
106
107    @Override
108    protected void setLogInfo() {
109        if (jobId != null) {
110            LogUtils.setLogInfo(jobId);
111        }
112        else if (actionId !=null) {
113            LogUtils.setLogInfo(actionId);
114        }
115    }
116
117    @Override
118    protected boolean isLockRequired() {
119        return true;
120    }
121
122    @Override
123    public String getEntityKey() {
124        return this.jobId;
125    }
126
127    @Override
128    public String getKey() {
129        return getName() + "_" + jobId + "_" + actionId;
130    }
131
132    @Override
133    protected void loadState() throws CommandException {
134        try {
135            jpaService = Services.get().get(JPAService.class);
136            if (jpaService != null) {
137                this.wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId);
138                LogUtils.setLogInfo(wfJob);
139                if (actionId != null) {
140                    this.wfAction = WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_SIGNAL, actionId);
141                    LogUtils.setLogInfo(wfAction);
142                }
143            }
144            else {
145                throw new CommandException(ErrorCode.E0610);
146            }
147        }
148        catch (XException ex) {
149            throw new CommandException(ex);
150        }
151    }
152
153    @Override
154    protected void verifyPrecondition() throws CommandException, PreconditionException {
155        if ((wfAction == null) || (wfAction.isComplete() && wfAction.isPending())) {
156            if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.PREP) {
157                throw new PreconditionException(ErrorCode.E0813, wfJob.getStatusStr());
158            }
159        }
160        else {
161            throw new PreconditionException(ErrorCode.E0814, actionId, wfAction.getStatusStr(), wfAction.isPending());
162        }
163    }
164
165    @Override
166    protected Void execute() throws CommandException {
167
168        LOG.debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
169        WorkflowInstance workflowInstance = wfJob.getWorkflowInstance();
170        workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, wfJob);
171        WorkflowJob.Status prevStatus = wfJob.getStatus();
172        boolean completed = false, skipAction = false;
173        WorkflowActionBean syncAction = null;
174        List<WorkflowActionBean> workflowActionBeanListForForked = new ArrayList<WorkflowActionBean>();
175
176
177        if (wfAction == null) {
178            if (wfJob.getStatus() == WorkflowJob.Status.PREP) {
179                try {
180                    completed = workflowInstance.start();
181                }
182                catch (WorkflowException e) {
183                    throw new CommandException(e);
184                }
185                wfJob.setStatus(WorkflowJob.Status.RUNNING);
186                wfJob.setStartTime(new Date());
187                wfJob.setWorkflowInstance(workflowInstance);
188                generateEvent = true;
189                // 1. Add SLA status event for WF-JOB with status STARTED
190                SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, Status.STARTED,
191                        SlaAppType.WORKFLOW_JOB);
192                if (slaEvent != null) {
193                    insertList.add(slaEvent);
194                }
195                // 2. Add SLA registration events for all WF_ACTIONS
196                createSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), wfJob.getUser(),
197                        wfJob.getGroup(), wfJob.getConf());
198                queue(new WorkflowNotificationXCommand(wfJob));
199            }
200            else {
201                throw new CommandException(ErrorCode.E0801, wfJob.getId());
202            }
203        }
204        else {
205            WorkflowInstance.Status initialStatus = workflowInstance.getStatus();
206            String skipVar = workflowInstance.getVar(wfAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
207                    + ReRunXCommand.TO_SKIP);
208            if (skipVar != null) {
209                skipAction = skipVar.equals("true");
210            }
211            try {
212                completed = workflowInstance.signal(wfAction.getExecutionPath(), wfAction.getSignalValue());
213            }
214            catch (WorkflowException e) {
215               LOG.error("Workflow action failed : " + e.getMessage(), e);
216                wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
217                completed = true;
218            }
219            wfJob.setWorkflowInstance(workflowInstance);
220            wfAction.resetPending();
221            if (!skipAction) {
222                wfAction.setTransition(workflowInstance.getTransition(wfAction.getName()));
223                queue(new WorkflowNotificationXCommand(wfJob, wfAction));
224            }
225            updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS,
226                    wfAction));
227            WorkflowInstance.Status endStatus = workflowInstance.getStatus();
228            if (endStatus != initialStatus) {
229                generateEvent = true;
230            }
231        }
232
233        if (completed) {
234            try {
235                for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
236                    WorkflowActionBean actionToKill;
237
238                    actionToKill = WorkflowActionQueryExecutor.getInstance().get(
239                            WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, actionToKillId);
240
241                    actionToKill.setPending();
242                    actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
243                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
244                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToKill));
245                    queue(new ActionKillXCommand(actionToKill.getId(), actionToKill.getType()));
246                }
247
248                for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
249                    WorkflowActionBean actionToFail = WorkflowActionQueryExecutor.getInstance().get(
250                            WorkflowActionQuery.GET_ACTION_FAIL, actionToFailId);
251                    actionToFail.resetPending();
252                    actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
253                    if (wfJobErrorCode != null) {
254                        wfJobErrorCode = actionToFail.getErrorCode();
255                        wfJobErrorMsg = actionToFail.getErrorMessage();
256                    }
257                    queue(new WorkflowNotificationXCommand(wfJob, actionToFail));
258                    SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
259                            Status.FAILED, SlaAppType.WORKFLOW_ACTION);
260                    if (slaEvent != null) {
261                        insertList.add(slaEvent);
262                    }
263                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
264                            WorkflowActionQuery.UPDATE_ACTION_STATUS_PENDING, actionToFail));
265                }
266            }
267            catch (JPAExecutorException je) {
268                throw new CommandException(je);
269            }
270
271            wfJob.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
272            wfJob.setEndTime(new Date());
273            wfJob.setWorkflowInstance(workflowInstance);
274            Status slaStatus = Status.SUCCEEDED;
275            switch (wfJob.getStatus()) {
276                case SUCCEEDED:
277                    slaStatus = Status.SUCCEEDED;
278                    break;
279                case KILLED:
280                    slaStatus = Status.KILLED;
281                    break;
282                case FAILED:
283                    slaStatus = Status.FAILED;
284                    break;
285                default: // TODO SUSPENDED
286                    break;
287            }
288            SLAEventBean slaEvent = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), jobId, slaStatus,
289                    SlaAppType.WORKFLOW_JOB);
290            if (slaEvent != null) {
291                insertList.add(slaEvent);
292            }
293            queue(new WorkflowNotificationXCommand(wfJob));
294            if (wfJob.getStatus() == WorkflowJob.Status.SUCCEEDED) {
295                InstrumentUtils.incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1, getInstrumentation());
296            }
297
298            // output message for Kill node
299            if (wfAction != null) { // wfAction could be a no-op job
300                NodeDef nodeDef = workflowInstance.getNodeDef(wfAction.getExecutionPath());
301                if (nodeDef != null && nodeDef instanceof KillNodeDef) {
302                    boolean isRetry = false;
303                    boolean isUserRetry = false;
304                    ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry,
305                            isUserRetry);
306                    InstrumentUtils.incrJobCounter(INSTR_KILLED_JOBS_COUNTER_NAME, 1, getInstrumentation());
307                    try {
308                        String tmpNodeConf = nodeDef.getConf();
309                        String message = context.getELEvaluator().evaluate(tmpNodeConf, String.class);
310                        LOG.debug(
311                                "Try to resolve KillNode message for jobid [{0}], actionId [{1}], before resolve [{2}], "
312                                        + "after resolve [{3}]", jobId, actionId, tmpNodeConf, message);
313                        if (wfAction.getErrorCode() != null) {
314                            wfAction.setErrorInfo(wfAction.getErrorCode(), message);
315                        }
316                        else {
317                            wfAction.setErrorInfo(ErrorCode.E0729.toString(), message);
318                        }
319                    }
320                    catch (Exception ex) {
321                        LOG.warn("Exception in SignalXCommand when processing Kill node message: {0}", ex.getMessage(), ex);
322                        wfAction.setErrorInfo(ErrorCode.E0756.toString(), ErrorCode.E0756.format(ex.getMessage()));
323                        wfAction.setStatus(WorkflowAction.Status.ERROR);
324                    }
325                    updateList.add(new UpdateEntry<WorkflowActionQuery>(
326                            WorkflowActionQuery.UPDATE_ACTION_PENDING_TRANS_ERROR, wfAction));
327                }
328            }
329
330        }
331        else {
332            for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) {
333                boolean isOldWFAction = false;
334
335                // In case of subworkflow rerun when failed option have been provided, rerun command do not delete
336                // old action. To avoid twice entry for same action, Checking in Db if the workflow action already exist.
337                if(SubWorkflowActionExecutor.ACTION_TYPE.equals(newAction.getType())) {
338                    try {
339                        WorkflowActionBean oldAction = WorkflowActionQueryExecutor.getInstance()
340                                .get(WorkflowActionQuery.GET_ACTION_CHECK,
341                                        newAction.getId());
342                        newAction.setExternalId(oldAction.getExternalId());
343                        newAction.setCreatedTime(oldAction.getCreatedTime());
344                        isOldWFAction = true;
345                    } catch (JPAExecutorException e) {
346                        if(e.getErrorCode() != ErrorCode.E0605) {
347                            throw new CommandException(e);
348                        }
349                    }
350                }
351
352                String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
353                        + ReRunXCommand.TO_SKIP);
354                boolean skipNewAction = false, suspendNewAction = false;
355                if (skipVar != null) {
356                    skipNewAction = skipVar.equals("true");
357                }
358
359                if (skipNewAction) {
360                    WorkflowActionBean oldAction = new WorkflowActionBean();
361                    oldAction.setId(newAction.getId());
362                    oldAction.setPending();
363                    oldAction.setExecutionPath(newAction.getExecutionPath());
364                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_PENDING,
365                            oldAction));
366                    queue(new SignalXCommand(jobId, oldAction.getId()));
367                }
368                else {
369                    if(!skipAction) {
370                        try {
371                            // Make sure that transition node for a forked action
372                            // is inserted only once
373                            WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
374                                    newAction.getId());
375
376                            continue;
377                        } catch (JPAExecutorException jee) {
378                        }
379                    }
380                    suspendNewAction = checkForSuspendNode(newAction);
381                    newAction.setPending();
382                    String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
383                            .getDefinition(), wfJob.getConf());
384                    newAction.setSlaXml(actionSlaXml);
385                    if(!isOldWFAction) {
386                        newAction.setCreatedTime(new Date());
387                        insertList.add(newAction);
388                    } else {
389                        updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
390                                newAction));
391                    }
392                    LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId()
393                            + ", Authcode:" + newAction.getCred());
394                    if (wfAction != null) { // null during wf job submit
395                        ActionService as = Services.get().get(ActionService.class);
396                        ActionExecutor current = as.getExecutor(wfAction.getType());
397                        LOG.trace("Current Action Type:" + current.getClass());
398                        if (!suspendNewAction) {
399                            if (current instanceof StartActionExecutor) {
400                                // Excluding :start: here from executing first action synchronously since it
401                                // blocks the consumer thread till the action is submitted to Hadoop,
402                                // in turn reducing the number of new submissions the threads can accept.
403                                // Would also be susceptible to longer delays in case Hadoop cluster is busy.
404                                queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
405                            }
406                            else if (current instanceof ForkActionExecutor) {
407                                if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
408                                    workflowActionBeanListForForked.add(newAction);
409                                }
410                                else {
411                                    queue(new ActionStartXCommand(newAction.getId(), newAction.getType()));
412
413                                }
414                            }
415                            else {
416                                syncAction = newAction;
417                            }
418                        }
419                        else {
420                            // suspend check will happen later... where if one of action is suspended all forked action
421                            // will be ignored.
422                            if (ConfigurationService.getBoolean(SignalXCommand.FORK_PARALLEL_JOBSUBMISSION)) {
423                                workflowActionBeanListForForked.add(newAction);
424                            }
425                        }
426                    }
427                    else {
428                        syncAction = newAction; // first action after wf submit should always be sync
429                    }
430                }
431            }
432        }
433
434        try {
435            wfJob.setLastModifiedTime(new Date());
436            updateList.add(new UpdateEntry<WorkflowJobQuery>(
437                    WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob));
438            // call JPAExecutor to do the bulk writes
439            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
440            if (prevStatus != wfJob.getStatus()) {
441                LOG.debug("Updated the workflow status to " + wfJob.getId() + "  status =" + wfJob.getStatusStr());
442            }
443            if (generateEvent && EventHandlerService.isEnabled()) {
444                generateEvent(wfJob, wfJobErrorCode, wfJobErrorMsg);
445            }
446        }
447        catch (JPAExecutorException je) {
448            throw new CommandException(je);
449        }
450        // Changing to synchronous call from asynchronous queuing to prevent
451        // undue delay from between end of previous and start of next action
452        if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
453            // only for asynchronous actions, parent coord action's external id will
454            // persisted and following update will succeed.
455            updateParentIfNecessary(wfJob);
456            new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
457        }
458        else if (syncAction != null) {
459            new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call();
460        }
461        else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)) {
462            startForkedActions(workflowActionBeanListForForked);
463        }
464        LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
465        return null;
466    }
467
468    public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
469
470        List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>();
471        List<UpdateEntry> updateList = new ArrayList<UpdateEntry>();
472        List<JsonBean> insertList = new ArrayList<JsonBean>();
473
474        boolean endWorkflow = false;
475        boolean submitJobByQueuing = false;
476        for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
477            LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId());
478            tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>(
479                    new ForkedActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType()), 0));
480        }
481
482        try {
483            List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class)
484                    .invokeAll(tasks);
485            for (Future<ActionExecutorContext> result : futures) {
486                if (result == null) {
487                    submitJobByQueuing = true;
488                    continue;
489                }
490                ActionExecutorContext context = result.get();
491                Map<String, String> contextVariableMap = ((ForkedActionExecutorContext) context).getContextMap();
492                LOG.debug("contextVariableMap size of action " + context.getAction().getId() + " is " + contextVariableMap.size());
493                for (String key : contextVariableMap.keySet()) {
494                    context.setVarToWorkflow(key, contextVariableMap.get(key));
495                }
496                if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) {
497                    LOG.warn("Action has failed, failing job" + context.getAction().getId());
498                    new ActionStartXCommand(context.getAction().getId(), null).failJob(context);
499                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
500                            (WorkflowActionBean) context.getAction()));
501                    if (context.isShouldEndWF()) {
502                        endWorkflow = true;
503                    }
504                }
505                if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) {
506                    LOG.warn("Action has failed, failing job" + context.getAction().getId());
507                    new ActionStartXCommand(context.getAction().getId(), null).handleNonTransient(context, null,
508                            WorkflowAction.Status.START_MANUAL);
509                    updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
510                            (WorkflowActionBean) context.getAction()));
511                    if (context.isShouldEndWF()) {
512                        endWorkflow = true;
513                    }
514                }
515            }
516            if (endWorkflow) {
517                endWF(insertList);
518            }
519
520        }
521        catch (Exception e) {
522            LOG.error("Error running forked jobs parallely", e);
523            startForkedActionsByQueuing(workflowActionBeanListForForked);
524            submitJobByQueuing = false;
525        }
526        if (submitJobByQueuing && !endWorkflow) {
527            LOG.error("There is error in running forked jobs parallely");
528            startForkedActionsByQueuing(workflowActionBeanListForForked);
529        }
530        wfJob.setLastModifiedTime(new Date());
531        updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED,
532                wfJob));
533        try {
534            BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null);
535        }
536        catch (JPAExecutorException e) {
537            throw new CommandException(e);
538        }
539
540        LOG.debug("forked actions submitted parallely");
541    }
542
543    public void startForkedActionsByQueuing(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException {
544        //queuing all jobs, submitted job will fail in precondition
545        for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) {
546            LOG.debug("Queuing fork action " + workflowActionBean.getId());
547            queue(new ActionStartXCommand(workflowActionBean.getId(), workflowActionBean.getType()));
548        }
549    }
550
551    private void endWF(List<JsonBean> insertList) throws CommandException {
552        updateParentIfNecessary(wfJob, 3);
553        new WfEndXCommand(wfJob).call(); // To delete the WF temp dir
554        SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED,
555                SlaAppType.WORKFLOW_JOB);
556        if (slaEvent2 != null) {
557            insertList.add(slaEvent2);
558        }
559    }
560
561    public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
562        ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
563        for (Map.Entry<String, String> entry : conf) {
564            eval.setVariable(entry.getKey(), entry.getValue());
565        }
566        return eval;
567    }
568
569    @SuppressWarnings("unchecked")
570    private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
571        String slaXml = null;
572        try {
573            Element eWfJob = XmlUtils.parseXml(wfXml);
574            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
575                if (action.getAttributeValue("name").equals(actionName) == false) {
576                    continue;
577                }
578                Element eSla = XmlUtils.getSLAElement(action);
579                if (eSla != null) {
580                    slaXml = XmlUtils.prettyPrint(eSla).toString();
581                    break;
582                }
583            }
584        }
585        catch (Exception e) {
586            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
587        }
588        return slaXml;
589    }
590
591    private String resolveSla(Element eSla, Configuration conf) throws CommandException {
592        String slaXml = null;
593        try {
594            ELEvaluator evalSla = SubmitXCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
595            slaXml = SubmitXCommand.resolveSla(eSla, evalSla);
596        }
597        catch (Exception e) {
598            throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
599        }
600        return slaXml;
601    }
602
603    @SuppressWarnings("unchecked")
604    private void createSLARegistrationForAllActions(String wfXml, String user, String group, String strConf)
605            throws CommandException {
606        try {
607            Element eWfJob = XmlUtils.parseXml(wfXml);
608            Configuration conf = new XConfiguration(new StringReader(strConf));
609            for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
610                Element eSla = XmlUtils.getSLAElement(action);
611                if (eSla != null) {
612                    String slaXml = resolveSla(eSla, conf);
613                    eSla = XmlUtils.parseXml(slaXml);
614                    String actionId = Services.get().get(UUIDService.class)
615                            .generateChildId(jobId, action.getAttributeValue("name") + "");
616                    SLAEventBean slaEvent = SLADbXOperations.createSlaRegistrationEvent(eSla, actionId,
617                            SlaAppType.WORKFLOW_ACTION, user, group);
618                    if (slaEvent != null) {
619                        insertList.add(slaEvent);
620                    }
621                }
622            }
623        }
624        catch (Exception e) {
625            throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e.getMessage(), e);
626        }
627
628    }
629
630    private boolean checkForSuspendNode(WorkflowActionBean newAction) {
631        boolean suspendNewAction = false;
632        try {
633            XConfiguration wfjobConf = new XConfiguration(new StringReader(wfJob.getConf()));
634            String[] values = wfjobConf.getTrimmedStrings(OozieClient.OOZIE_SUSPEND_ON_NODES);
635            if (values != null) {
636                if (values.length == 1 && values[0].equals("*")) {
637                    LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
638                            wfJob.getId());
639                    queue(new SuspendXCommand(jobId));
640                    suspendNewAction = true;
641                }
642                else {
643                    for (String suspendPoint : values) {
644                        if (suspendPoint.equals(newAction.getName())) {
645                            LOG.info("Reached suspend node at [{0}], suspending workflow [{1}]", newAction.getName(),
646                                    wfJob.getId());
647                            queue(new SuspendXCommand(jobId));
648                            suspendNewAction = true;
649                            break;
650                        }
651                    }
652                }
653            }
654        }
655        catch (IOException ex) {
656            LOG.warn("Error reading " + OozieClient.OOZIE_SUSPEND_ON_NODES + ", ignoring [{0}]", ex.getMessage());
657        }
658        return suspendNewAction;
659    }
660
661
662
663private boolean checkForSuspendNode(List<WorkflowActionBean> workflowActionBeanListForForked) {
664    for(WorkflowActionBean bean :workflowActionBeanListForForked)
665        if(checkForSuspendNode(bean)){
666            return true;
667        }
668    return false;
669}
670
671
672}