/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;

import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Transient;

import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.rest.JsonWorkflowAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;

/**
 * Bean that contains all the information to start an action for a workflow node.
 * <p>
 * The bean is both a JPA entity (see the named queries below) and a Hadoop {@link Writable}. Dates are
 * stored in {@link Timestamp} columns; the {@link Date} accessors convert to and from those columns through
 * {@link DateUtils}.
 */
@Entity
@NamedQueries({

    @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),

    @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),

    @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),

    @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),

    @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),

    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),

    @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),

    @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })

public class WorkflowActionBean extends JsonWorkflowAction implements Writable {

    /** Marker written in place of a date's epoch milliseconds when the date is absent. */
    private static final long NO_DATE = -1;

    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String status = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "last_check_time")
    private java.sql.Timestamp lastCheckTimestamp;

    @Basic
    @Column(name = "end_time")
    private java.sql.Timestamp endTimestamp = null;

    @Basic
    @Column(name = "start_time")
    private java.sql.Timestamp startTimestamp = null;

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    // Pending flag kept as an int (0 = not pending, 1 = pending); the named queries compare it with 1.
    @Basic
    @Column(name = "pending")
    private int pending = 0;

    // @Temporal(TemporalType.TIME)
    // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
    @Basic
    @Index
    @Column(name = "pending_age")
    private java.sql.Timestamp pendingAgeTimestamp = null;

    @Basic
    @Column(name = "signal_value")
    private String signalValue = null;

    @Basic
    @Column(name = "log_token")
    private String logToken = null;

    // Not persisted. Mirrors pendingAgeTimestamp when set through this class; readers must use
    // getPendingAge(), which is derived from the persisted timestamp.
    @Transient
    private Date pendingAge;

    @Column(name = "sla_xml")
    @Lob
    private String slaXml = null;

    /**
     * Default constructor.
     */
    public WorkflowActionBean() {
    }

    /**
     * Serialize the action bean to a data output.
     * <p>
     * Absent dates are written as <code>-1</code>. The field order must stay in sync with
     * {@link #readFields(DataInput)}.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the action bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getName());
        WritableUtils.writeStr(dataOutput, getCred());
        WritableUtils.writeStr(dataOutput, getType());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeInt(getRetries());
        writeDate(dataOutput, getStartTime());
        writeDate(dataOutput, getEndTime());
        writeDate(dataOutput, getLastCheckTime());
        WritableUtils.writeStr(dataOutput, getTransition());
        WritableUtils.writeStr(dataOutput, getData());
        WritableUtils.writeStr(dataOutput, getExternalId());
        WritableUtils.writeStr(dataOutput, getExternalStatus());
        WritableUtils.writeStr(dataOutput, getTrackerUri());
        WritableUtils.writeStr(dataOutput, getConsoleUrl());
        WritableUtils.writeStr(dataOutput, getErrorCode());
        WritableUtils.writeStr(dataOutput, getErrorMessage());
        WritableUtils.writeStr(dataOutput, wfId);
        WritableUtils.writeStr(dataOutput, executionPath);
        dataOutput.writeInt(pending);
        // Use the accessor backed by the persisted timestamp: the transient pendingAge field is only
        // populated by the setters and readFields(), so reading it directly could drop the pending age.
        writeDate(dataOutput, getPendingAge());
        WritableUtils.writeStr(dataOutput, signalValue);
        WritableUtils.writeStr(dataOutput, logToken);
        dataOutput.writeInt(getUserRetryCount());
        dataOutput.writeInt(getUserRetryInterval());
        dataOutput.writeInt(getUserRetryMax());
    }

    /**
     * Deserialize an action bean from a data input.
     * <p>
     * Fields are read in the order {@link #write(DataOutput)} emits them. A date serialized as
     * <code>-1</code> leaves the corresponding field of this bean untouched.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the action bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        setId(WritableUtils.readStr(dataInput));
        setName(WritableUtils.readStr(dataInput));
        setCred(WritableUtils.readStr(dataInput));
        setType(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
        setRetries(dataInput.readInt());

        // NOTE(review): absent dates do not clear values already held by this instance, so reusing one
        // bean for several readFields() calls can carry dates over - confirm whether callers reuse beans.
        Date start = readDate(dataInput);
        if (start != null) {
            setStartTime(start);
        }
        Date end = readDate(dataInput);
        if (end != null) {
            setEndTime(end);
        }
        Date lastCheck = readDate(dataInput);
        if (lastCheck != null) {
            setLastCheckTime(lastCheck);
        }

        setTransition(WritableUtils.readStr(dataInput));
        setData(WritableUtils.readStr(dataInput));
        setExternalId(WritableUtils.readStr(dataInput));
        setExternalStatus(WritableUtils.readStr(dataInput));
        setTrackerUri(WritableUtils.readStr(dataInput));
        setConsoleUrl(WritableUtils.readStr(dataInput));

        // Read into locals so the stream order (code first, then message) is explicit.
        String errorCode = WritableUtils.readStr(dataInput);
        String errorMessage = WritableUtils.readStr(dataInput);
        setErrorInfo(errorCode, errorMessage);

        wfId = WritableUtils.readStr(dataInput);
        executionPath = WritableUtils.readStr(dataInput);
        pending = dataInput.readInt();

        Date age = readDate(dataInput);
        if (age != null) {
            pendingAge = age;
            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(age);
        }

        signalValue = WritableUtils.readStr(dataInput);
        logToken = WritableUtils.readStr(dataInput);
        setUserRetryCount(dataInput.readInt());
        setUserRetryInterval(dataInput.readInt());
        setUserRetryMax(dataInput.readInt());
    }

    /**
     * Write a date as epoch milliseconds, or <code>-1</code> if the date is <code>null</code>.
     */
    private static void writeDate(DataOutput dataOutput, Date date) throws IOException {
        dataOutput.writeLong((date != null) ? date.getTime() : NO_DATE);
    }

    /**
     * Read a date written by {@link #writeDate(DataOutput, Date)}; returns <code>null</code> for
     * <code>-1</code>.
     */
    private static Date readDate(DataInput dataInput) throws IOException {
        long millis = dataInput.readLong();
        return (millis != NO_DATE) ? new Date(millis) : null;
    }

    /**
     * Return if the action execution is complete.
     *
     * @return <code>true</code> if the action status is {@link WorkflowAction.Status#DONE}.
     */
    public boolean isExecutionComplete() {
        return getStatus() == WorkflowAction.Status.DONE;
    }

    /**
     * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
     * END_MANUAL.
     *
     * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
     *         END_MANUAL
     */
    public boolean isRetryOrManual() {
        WorkflowAction.Status current = getStatus();
        return current == WorkflowAction.Status.START_RETRY || current == WorkflowAction.Status.START_MANUAL
                || current == WorkflowAction.Status.END_RETRY || current == WorkflowAction.Status.END_MANUAL;
    }

    /**
     * Return true if the action is USER_RETRY
     *
     * @return boolean true if status is USER_RETRY
     */
    public boolean isUserRetry() {
        return getStatus() == WorkflowAction.Status.USER_RETRY;
    }

    /**
     * Return if the action is complete.
     *
     * @return <code>true</code> if the action status is OK, KILLED or ERROR.
     */
    public boolean isComplete() {
        WorkflowAction.Status current = getStatus();
        return current == WorkflowAction.Status.OK || current == WorkflowAction.Status.KILLED
                || current == WorkflowAction.Status.ERROR;
    }

    /**
     * Set the action pending flag to true, leaving the pending age unchanged.
     */
    public void setPendingOnly() {
        pending = 1;
    }

    /**
     * Set the action as pending and the current time as pending age.
     */
    public void setPending() {
        pending = 1;
        pendingAge = new Date();
        pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Set a time when the action will be pending, normally a time in the future.
     * <p>
     * This does not change the pending flag.
     *
     * @param pendingAge the time when the action will be pending.
     */
    public void setPendingAge(Date pendingAge) {
        this.pendingAge = pendingAge;
        this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Return the pending age of the action.
     *
     * @return the pending age of the action, <code>null</code> if the action is not pending.
     */
    public Date getPendingAge() {
        return DateUtils.toDate(pendingAgeTimestamp);
    }

    /**
     * Return if the action is pending.
     *
     * @return if the action is pending.
     */
    public boolean isPending() {
        return pending == 1;
    }

    /**
     * Removes the pending flag and pendingAge from the action.
     */
    public void resetPending() {
        pending = 0;
        pendingAge = null;
        pendingAgeTimestamp = null;
    }

    /**
     * Removes the pending flag from the action, leaving the pending age unchanged.
     */
    public void resetPendingOnly() {
        pending = 0;
    }

    /**
     * Increments the number of retries for the action.
     */
    public void incRetries() {
        setRetries(getRetries() + 1);
    }

    /**
     * Set the tracking information for an action, set its start time and last check time to the current
     * time, and set the action status to {@link WorkflowAction.Status#RUNNING}.
     *
     * @param externalId external ID for the action.
     * @param trackerUri tracker URI for the action.
     * @param consoleUrl console URL for the action.
     */
    public void setStartData(String externalId, String trackerUri, String consoleUrl) {
        setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
        setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
        setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
        Date now = new Date();
        setStartTime(now);
        setLastCheckTime(now);
        setStatus(Status.RUNNING);
    }

    /**
     * Set the completion information for an action start. Sets the action status to
     * {@link WorkflowAction.Status#DONE}.
     *
     * @param externalStatus action external end status.
     * @param actionData action output data, <code>null</code> if there is no action output data.
     */
    public void setExecutionData(String externalStatus, Properties actionData) {
        setStatus(Status.DONE);
        setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
        if (actionData != null) {
            setData(PropertiesUtils.propertiesToString(actionData));
        }
    }

    /**
     * Set the completion information for an action end. When the status is OK, any previously recorded
     * error code and message are cleared.
     *
     * @param status action status, {@link WorkflowAction.Status#OK} or {@link WorkflowAction.Status#ERROR}
     *        or {@link WorkflowAction.Status#KILLED}
     * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
     * @throws IllegalArgumentException if the status is <code>null</code> or not one of OK, ERROR, KILLED.
     */
    public void setEndData(Status status, String signalValue) {
        if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) {
            // String concatenation is null-safe; calling status.toString() here would turn a null
            // status into a NullPointerException instead of the intended IllegalArgumentException.
            throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
                    + status + "]");
        }
        if (status == Status.OK) {
            setErrorInfo(null, null);
        }
        setStatus(status);
        setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
    }

    /**
     * Return the job Id.
     *
     * @return the job Id.
     */
    public String getJobId() {
        return wfId;
    }

    /**
     * Return the workflow job Id; same value as {@link #getJobId()}.
     *
     * @return the job Id.
     */
    public String getWfId() {
        return wfId;
    }

    /**
     * Set the job id.
     *
     * @param id jobId;
     */
    public void setJobId(String id) {
        this.wfId = id;
    }

    /**
     * Return the SLA XML stored for the action.
     *
     * @return the SLA XML, may be <code>null</code>.
     */
    public String getSlaXml() {
        return slaXml;
    }

    /**
     * Set the SLA XML for the action.
     *
     * @param slaXml the SLA XML.
     */
    public void setSlaXml(String slaXml) {
        this.slaXml = slaXml;
    }

    /**
     * Set the action status, keeping the persisted string column and the parent bean in sync.
     *
     * @param val the new status, must not be <code>null</code>.
     */
    @Override
    public void setStatus(Status val) {
        this.status = val.toString();
        super.setStatus(val);
    }

    /**
     * Return the action status as the string stored in the status column.
     *
     * @return the status string.
     */
    public String getStatusStr() {
        return status;
    }

    /**
     * Return the action status, parsed from the persisted status string.
     *
     * @return the action status.
     */
    @Override
    public Status getStatus() {
        return Status.valueOf(this.status);
    }

    /**
     * Return the node execution path.
     *
     * @return the node execution path.
     */
    public String getExecutionPath() {
        return executionPath;
    }

    /**
     * Set the node execution path.
     *
     * @param executionPath the node execution path.
     */
    public void setExecutionPath(String executionPath) {
        this.executionPath = executionPath;
    }

    /**
     * Return the signal value for the action. <p> For decision nodes it is the chosen transition, for
     * actions it is OK or ERROR.
     *
     * @return the action signal value.
     */
    public String getSignalValue() {
        return signalValue;
    }

    /**
     * Set the signal value for the action. <p> For decision nodes it is the chosen transition, for actions
     * it is OK or ERROR.
     *
     * @param signalValue the action signal value.
     */
    public void setSignalValue(String signalValue) {
        this.signalValue = signalValue;
    }

    /**
     * Return the job log token.
     *
     * @return the job log token.
     */
    public String getLogToken() {
        return logToken;
    }

    /**
     * Set the job log token.
     *
     * @param logToken the job log token.
     */
    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    /**
     * Return the action last check time
     *
     * @return the last check time
     */
    public Date getLastCheckTime() {
        return DateUtils.toDate(lastCheckTimestamp);
    }

    /**
     * Return the action last check time as the persisted timestamp
     *
     * @return the last check timestamp
     */
    public Timestamp getLastCheckTimestamp() {
        return lastCheckTimestamp;
    }

    /**
     * Return the action start time as the persisted timestamp
     *
     * @return the start timestamp
     */
    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    /**
     * Return the action end time as the persisted timestamp
     *
     * @return the end timestamp
     */
    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    /**
     * Return the action pending age as the persisted timestamp
     *
     * @return the pending age timestamp
     */
    public Timestamp getPendingAgeTimestamp() {
        return pendingAgeTimestamp;
    }

    /**
     * Sets the action last check time
     *
     * @param lastCheckTime the last check time to set.
     */
    public void setLastCheckTime(Date lastCheckTime) {
        this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
    }

    /**
     * Return if the action is pending; same value as {@link #isPending()}.
     *
     * @return if the action is pending.
     */
    public boolean getPending() {
        return this.pending == 1;
    }

    /**
     * Return the action start time, derived from the persisted start timestamp.
     *
     * @return the start time.
     */
    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    /**
     * Set the action start time on both the parent bean and the persisted timestamp.
     *
     * @param startTime the start time.
     */
    @Override
    public void setStartTime(Date startTime) {
        super.setStartTime(startTime);
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    /**
     * Return the action end time, derived from the persisted end timestamp.
     *
     * @return the end time.
     */
    @Override
    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    /**
     * Set the action end time on both the parent bean and the persisted timestamp.
     *
     * @param endTime the end time.
     */
    @Override
    public void setEndTime(Date endTime) {
        super.setEndTime(endTime);
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

}