This project has retired. For details please refer to its
Attic page.
001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements. See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership. The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License. You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018 package org.apache.oozie.command.wf;
019
020 import org.apache.hadoop.conf.Configuration;
021 import org.apache.oozie.client.CoordinatorAction;
022 import org.apache.oozie.client.WorkflowJob;
023 import org.apache.oozie.client.SLAEvent.SlaAppType;
024 import org.apache.oozie.client.SLAEvent.Status;
025 import org.apache.oozie.CoordinatorActionBean;
026 import org.apache.oozie.WorkflowActionBean;
027 import org.apache.oozie.WorkflowJobBean;
028 import org.apache.oozie.ErrorCode;
029 import org.apache.oozie.XException;
030 import org.apache.oozie.command.CommandException;
031 import org.apache.oozie.command.coord.CoordActionReadyCommand;
032 import org.apache.oozie.command.coord.CoordActionUpdateCommand;
033 import org.apache.oozie.coord.CoordELFunctions;
034 import org.apache.oozie.coord.CoordinatorJobException;
035 import org.apache.oozie.service.ELService;
036 import org.apache.oozie.service.SchemaService;
037 import org.apache.oozie.service.Services;
038 import org.apache.oozie.service.StoreService;
039 import org.apache.oozie.service.UUIDService;
040 import org.apache.oozie.service.WorkflowStoreService;
041 import org.apache.oozie.store.CoordinatorStore;
042 import org.apache.oozie.store.StoreException;
043 import org.apache.oozie.store.WorkflowStore;
044 import org.apache.oozie.workflow.WorkflowException;
045 import org.apache.oozie.workflow.WorkflowInstance;
046 import org.apache.oozie.util.ELEvaluator;
047 import org.apache.oozie.util.XConfiguration;
048 import org.apache.oozie.util.XLog;
049 import org.apache.oozie.util.ParamChecker;
050 import org.apache.oozie.util.XmlUtils;
051 import org.apache.oozie.util.db.SLADbOperations;
052 import org.apache.openjpa.lib.log.Log;
053 import org.jdom.Element;
054 import org.jdom.JDOMException;
055 import org.jdom.Namespace;
056
057 import java.io.StringReader;
058 import java.util.Date;
059 import java.util.List;
060 import java.util.Map;
061
062 public class SignalCommand extends WorkflowCommand<Void> {
063
064 protected static final String INSTR_SUCCEEDED_JOBS_COUNTER_NAME = "succeeded";
065
066 private String jobId;
067 private String actionId;
068
069 protected SignalCommand(String name, int priority, String jobId) {
070 super(name, name, priority, XLog.STD);
071 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
072 }
073
074 public SignalCommand(String jobId, String actionId) {
075 super("signal", "signal", 1, XLog.STD);
076 this.jobId = ParamChecker.notEmpty(jobId, "jobId");
077 this.actionId = ParamChecker.notEmpty(actionId, "actionId");
078 }
079
080 @Override
081 protected Void call(WorkflowStore store) throws CommandException, StoreException {
082
083 WorkflowJobBean workflow = store.getWorkflow(jobId, false);
084 setLogInfo(workflow);
085 WorkflowActionBean action = null;
086 boolean skipAction = false;
087 if (actionId != null) {
088 action = store.getAction(actionId, false);
089 setLogInfo(action);
090 }
091 if ((action == null) || (action.isComplete() && action.isPending())) {
092 try {
093 if (workflow.getStatus() == WorkflowJob.Status.RUNNING
094 || workflow.getStatus() == WorkflowJob.Status.PREP) {
095 WorkflowInstance workflowInstance = workflow.getWorkflowInstance();
096 workflowInstance.setTransientVar(WorkflowStoreService.WORKFLOW_BEAN, workflow);
097 boolean completed;
098 if (action == null) {
099 if (workflow.getStatus() == WorkflowJob.Status.PREP) {
100 completed = workflowInstance.start();
101 workflow.setStatus(WorkflowJob.Status.RUNNING);
102 workflow.setStartTime(new Date());
103 workflow.setWorkflowInstance(workflowInstance);
104 // 1. Add SLA status event for WF-JOB with status
105 // STARTED
106 // 2. Add SLA registration events for all WF_ACTIONS
107 SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, Status.STARTED,
108 SlaAppType.WORKFLOW_JOB);
109 writeSLARegistrationForAllActions(workflowInstance.getApp().getDefinition(), workflow
110 .getUser(), workflow.getGroup(), workflow.getConf(), store);
111 queueCallable(new NotificationCommand(workflow));
112 }
113 else {
114 throw new CommandException(ErrorCode.E0801, workflow.getId());
115 }
116 }
117 else {
118 String skipVar = workflowInstance.getVar(action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR
119 + ReRunCommand.TO_SKIP);
120 if (skipVar != null) {
121 skipAction = skipVar.equals("true");
122 }
123 completed = workflowInstance.signal(action.getExecutionPath(), action.getSignalValue());
124 workflow.setWorkflowInstance(workflowInstance);
125 action.resetPending();
126 if (!skipAction) {
127 action.setTransition(workflowInstance.getTransition(action.getName()));
128 }
129 store.updateAction(action);
130 }
131
132 if (completed) {
133 for (String actionToKillId : WorkflowStoreService.getActionsToKill(workflowInstance)) {
134 WorkflowActionBean actionToKill = store.getAction(actionToKillId, false);
135 actionToKill.setPending();
136 actionToKill.setStatus(WorkflowActionBean.Status.KILLED);
137 store.updateAction(actionToKill);
138 queueCallable(new ActionKillCommand(actionToKill.getId(), actionToKill.getType()));
139 }
140
141 for (String actionToFailId : WorkflowStoreService.getActionsToFail(workflowInstance)) {
142 WorkflowActionBean actionToFail = store.getAction(actionToFailId, false);
143 actionToFail.resetPending();
144 actionToFail.setStatus(WorkflowActionBean.Status.FAILED);
145 SLADbOperations.writeStausEvent(action.getSlaXml(), action.getId(), store, Status.FAILED,
146 SlaAppType.WORKFLOW_ACTION);
147 store.updateAction(actionToFail);
148 }
149
150 workflow.setStatus(WorkflowJob.Status.valueOf(workflowInstance.getStatus().toString()));
151 workflow.setEndTime(new Date());
152 workflow.setWorkflowInstance(workflowInstance);
153 Status slaStatus = Status.SUCCEEDED;
154 switch (workflow.getStatus()) {
155 case SUCCEEDED:
156 slaStatus = Status.SUCCEEDED;
157 break;
158 case KILLED:
159 slaStatus = Status.KILLED;
160 break;
161 case FAILED:
162 slaStatus = Status.FAILED;
163 break;
164 default: // TODO about SUSPENDED
165
166 }
167 SLADbOperations.writeStausEvent(workflow.getSlaXml(), jobId, store, slaStatus,
168 SlaAppType.WORKFLOW_JOB);
169 queueCallable(new NotificationCommand(workflow));
170 if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) {
171 incrJobCounter(INSTR_SUCCEEDED_JOBS_COUNTER_NAME, 1);
172 }
173 }
174 else {
175 for (WorkflowActionBean newAction : WorkflowStoreService.getStartedActions(workflowInstance)) {
176 String skipVar = workflowInstance.getVar(newAction.getName()
177 + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunCommand.TO_SKIP);
178 boolean skipNewAction = false;
179 if (skipVar != null) {
180 skipNewAction = skipVar.equals("true");
181 }
182 if (skipNewAction) {
183 WorkflowActionBean oldAction = store.getAction(newAction.getId(), false);
184 oldAction.setPending();
185 store.updateAction(oldAction);
186 queueCallable(new SignalCommand(jobId, oldAction.getId()));
187 }
188 else {
189 newAction.setPending();
190 String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp()
191 .getDefinition(), workflow.getConf());
192 // System.out.println("111111 actionXml " +
193 // actionSlaXml);
194 // newAction.setSlaXml(workflow.getSlaXml());
195 newAction.setSlaXml(actionSlaXml);
196 XLog.getLog(getClass()).debug("SignalCOmmand: Name: "+ newAction.getName() +"Id: " +newAction.getId()+ " Authcode:" + newAction.getCred());
197 store.insertAction(newAction);
198 queueCallable(new ActionStartCommand(newAction.getId(), newAction.getType()));
199 }
200 }
201 }
202
203 store.updateWorkflow(workflow);
204 XLog.getLog(getClass()).debug(
205 "Updated the workflow status to " + workflow.getId() + " status ="
206 + workflow.getStatusStr());
207 if (workflow.getStatus() != WorkflowJob.Status.RUNNING
208 && workflow.getStatus() != WorkflowJob.Status.SUSPENDED) {
209 queueCallable(new CoordActionUpdateCommand(workflow));
210 }
211 }
212 else {
213 XLog.getLog(getClass()).warn("Workflow not RUNNING, current status [{0}]", workflow.getStatus());
214 }
215 }
216 catch (WorkflowException ex) {
217 throw new CommandException(ex);
218 }
219 }
220 else {
221 XLog.getLog(getClass()).warn(
222 "SignalCommand for action id :" + actionId + " is already processed. status=" + action.getStatus()
223 + ", Pending=" + action.isPending());
224 }
225 return null;
226 }
227
228 public static ELEvaluator createELEvaluatorForGroup(Configuration conf, String group) {
229 ELEvaluator eval = Services.get().get(ELService.class).createEvaluator(group);
230 for (Map.Entry<String, String> entry : conf) {
231 eval.setVariable(entry.getKey(), entry.getValue());
232 }
233 return eval;
234 }
235
236 private String getActionSLAXml(String actionName, String wfXml, String wfConf) throws CommandException {
237 String slaXml = null;
238 // TODO need to fill-out the code
239 // Get the appropriate action:slaXml and resolve that.
240 try {
241 // Configuration conf = new XConfiguration(new
242 // StringReader(wfConf));
243 Element eWfJob = XmlUtils.parseXml(wfXml);
244 // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
245 // SchemaService.SLA_NAME_SPACE_URI);
246 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
247 if (action.getAttributeValue("name").equals(actionName) == false) {
248 continue;
249 }
250 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
251 if (eSla != null) {
252 // resolveSla(eSla, conf);
253 slaXml = XmlUtils.prettyPrint(eSla).toString();// Could use
254 // any
255 // non-null
256 // string
257 break;
258 }
259 }
260 }
261 catch (Exception e) {
262 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
263 }
264 return slaXml;
265 }
266
267 private String resolveSla(Element eSla, Configuration conf) throws CommandException {
268 String slaXml = null;
269 try {
270 ELEvaluator evalSla = SubmitCommand.createELEvaluatorForGroup(conf, "wf-sla-submit");
271 slaXml = SubmitCommand.resolveSla(eSla, evalSla);
272 }
273 catch (Exception e) {
274 throw new CommandException(ErrorCode.E1004, e.getMessage(), e);
275 }
276 return slaXml;
277 }
278
279 private void writeSLARegistrationForAllActions(String wfXml, String user, String group, String strConf,
280 WorkflowStore store) throws CommandException {
281 try {
282 Element eWfJob = XmlUtils.parseXml(wfXml);
283 // String prefix = XmlUtils.getNamespacePrefix(eWfJob,
284 // SchemaService.SLA_NAME_SPACE_URI);
285 Configuration conf = new XConfiguration(new StringReader(strConf));
286 for (Element action : (List<Element>) eWfJob.getChildren("action", eWfJob.getNamespace())) {
287 Element eSla = action.getChild("info", Namespace.getNamespace(SchemaService.SLA_NAME_SPACE_URI));
288 if (eSla != null) {
289 String slaXml = resolveSla(eSla, conf);
290 eSla = XmlUtils.parseXml(slaXml);
291 String actionId = Services.get().get(UUIDService.class).generateChildId(jobId,
292 action.getAttributeValue("name") + "");
293 SLADbOperations.writeSlaRegistrationEvent(eSla, store, actionId, SlaAppType.WORKFLOW_ACTION, user,
294 group);
295 }
296 }
297 }
298 catch (Exception e) {
299 throw new CommandException(ErrorCode.E1007, "workflow:Actions " + jobId, e);
300 }
301
302 }
303
304 @Override
305 protected Void execute(WorkflowStore store) throws CommandException, StoreException {
306 XLog.getLog(getClass()).debug("STARTED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
307 try {
308 if (lock(jobId)) {
309 call(store);
310 }
311 else {
312 queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
313 XLog.getLog(getClass()).warn("SignalCommand lock was not acquired - failed {0}", jobId);
314 }
315 }
316 catch (InterruptedException e) {
317 queueCallable(new SignalCommand(jobId, actionId), LOCK_FAILURE_REQUEUE_INTERVAL);
318 XLog.getLog(getClass()).warn("SignalCommand lock not acquired - interrupted exception failed {0}", jobId);
319 }
320 XLog.getLog(getClass()).debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId);
321 return null;
322 }
323 }