/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.Properties;

import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Transient;

import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.rest.JsonWorkflowAction;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.PropertiesUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;

/**
 * Bean that contains all the information to start an action for a workflow node.
 * <p>
 * The bean is a JPA entity and can also be serialized through {@link #write(DataOutput)} /
 * {@link #readFields(DataInput)}. Dates are stored in {@link Timestamp} columns and exposed
 * as {@link Date} through the getters.
 */
@Entity
@NamedQueries({

    @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"),

    @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"),

    @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"),

    @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "GET_ACTION_FOR_SLA", query = "select a.id, a.status, a.startTimestamp, a.endTimestamp from WorkflowActionBean a where a.id = :id"),

    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),

    @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"),

    @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"),

    @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"),

    @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") })

public class WorkflowActionBean extends JsonWorkflowAction implements Writable {

    @Basic
    @Index
    @Column(name = "wf_id")
    private String wfId = null;

    @Basic
    @Index
    @Column(name = "status")
    private String status = WorkflowAction.Status.PREP.toString();

    @Basic
    @Column(name = "last_check_time")
    private java.sql.Timestamp lastCheckTimestamp;

    @Basic
    @Column(name = "end_time")
    private java.sql.Timestamp endTimestamp = null;

    @Basic
    @Column(name = "start_time")
    private java.sql.Timestamp startTimestamp = null;

    @Basic
    @Column(name = "execution_path", length = 1024)
    private String executionPath = null;

    // Pending flag stored as an int: 1 means pending, 0 means not pending.
    @Basic
    @Column(name = "pending")
    private int pending = 0;

    // @Temporal(TemporalType.TIME)
    // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'")
    @Basic
    @Index
    @Column(name = "pending_age")
    private java.sql.Timestamp pendingAgeTimestamp = null;

    @Basic
    @Column(name = "signal_value")
    private String signalValue = null;

    @Basic
    @Column(name = "log_token")
    private String logToken = null;

    // Non-persistent copy of the pending age. It is only assigned by the setters and readFields(),
    // so readers must go through getPendingAge(), which is backed by pendingAgeTimestamp.
    @Transient
    private Date pendingAge;

    @Column(name = "sla_xml")
    @Lob
    private String slaXml = null;

    /**
     * Default constructor.
     */
    public WorkflowActionBean() {
    }

    /**
     * Encode a possibly-null date for serialization.
     *
     * @param date the date to encode, may be <code>null</code>.
     * @return the epoch milliseconds of the date, or <code>-1</code> if the date is <code>null</code>.
     */
    private static long toEpochMillisOrMinusOne(Date date) {
        return (date != null) ? date.getTime() : -1;
    }

    /**
     * Serialize the action bean to a data output.
     * <p>
     * Dates are written as epoch milliseconds, with <code>-1</code> standing for <code>null</code>.
     * The SLA XML is not part of the serialized form.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the action bean could not be serialized.
     */
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getName());
        WritableUtils.writeStr(dataOutput, getCred());
        WritableUtils.writeStr(dataOutput, getType());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeInt(getRetries());
        dataOutput.writeLong(toEpochMillisOrMinusOne(getStartTime()));
        dataOutput.writeLong(toEpochMillisOrMinusOne(getEndTime()));
        dataOutput.writeLong(toEpochMillisOrMinusOne(getLastCheckTime()));
        WritableUtils.writeStr(dataOutput, getTransition());
        WritableUtils.writeStr(dataOutput, getData());
        WritableUtils.writeStr(dataOutput, getStats());
        WritableUtils.writeStr(dataOutput, getExternalChildIDs());
        WritableUtils.writeStr(dataOutput, getExternalId());
        WritableUtils.writeStr(dataOutput, getExternalStatus());
        WritableUtils.writeStr(dataOutput, getTrackerUri());
        WritableUtils.writeStr(dataOutput, getConsoleUrl());
        WritableUtils.writeStr(dataOutput, getErrorCode());
        WritableUtils.writeStr(dataOutput, getErrorMessage());
        WritableUtils.writeStr(dataOutput, wfId);
        WritableUtils.writeStr(dataOutput, executionPath);
        dataOutput.writeInt(pending);
        // Use the getter (backed by the persistent timestamp) rather than the transient field,
        // which is null whenever only pendingAgeTimestamp has been populated.
        dataOutput.writeLong(toEpochMillisOrMinusOne(getPendingAge()));
        WritableUtils.writeStr(dataOutput, signalValue);
        WritableUtils.writeStr(dataOutput, logToken);
        dataOutput.writeInt(getUserRetryCount());
        dataOutput.writeInt(getUserRetryInterval());
        dataOutput.writeInt(getUserRetryMax());
    }

    /**
     * Deserialize an action bean from a data input.
     * <p>
     * Fields are read in the same order {@link #write(DataOutput)} emits them. A date encoded as
     * <code>-1</code> leaves the corresponding field untouched.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the action bean could not be deserialized.
     */
    public void readFields(DataInput dataInput) throws IOException {
        setId(WritableUtils.readStr(dataInput));
        setName(WritableUtils.readStr(dataInput));
        setCred(WritableUtils.readStr(dataInput));
        setType(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput)));
        setRetries(dataInput.readInt());
        long millis = dataInput.readLong();
        if (millis != -1) {
            setStartTime(new Date(millis));
        }
        millis = dataInput.readLong();
        if (millis != -1) {
            setEndTime(new Date(millis));
        }
        millis = dataInput.readLong();
        if (millis != -1) {
            setLastCheckTime(new Date(millis));
        }
        setTransition(WritableUtils.readStr(dataInput));
        setData(WritableUtils.readStr(dataInput));
        setStats(WritableUtils.readStr(dataInput));
        setExternalChildIDs(WritableUtils.readStr(dataInput));
        setExternalId(WritableUtils.readStr(dataInput));
        setExternalStatus(WritableUtils.readStr(dataInput));
        setTrackerUri(WritableUtils.readStr(dataInput));
        setConsoleUrl(WritableUtils.readStr(dataInput));
        // Error code is serialized before the error message.
        String errorCode = WritableUtils.readStr(dataInput);
        String errorMessage = WritableUtils.readStr(dataInput);
        setErrorInfo(errorCode, errorMessage);
        wfId = WritableUtils.readStr(dataInput);
        executionPath = WritableUtils.readStr(dataInput);
        pending = dataInput.readInt();
        millis = dataInput.readLong();
        if (millis != -1) {
            pendingAge = new Date(millis);
            pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
        }
        signalValue = WritableUtils.readStr(dataInput);
        logToken = WritableUtils.readStr(dataInput);
        setUserRetryCount(dataInput.readInt());
        setUserRetryInterval(dataInput.readInt());
        setUserRetryMax(dataInput.readInt());
    }

    /**
     * Return whether the workflow action is in a terminal state or not.
     *
     * @return <code>true</code> if the status is ERROR, FAILED, KILLED or OK, <code>false</code> otherwise.
     */
    public boolean inTerminalState() {
        boolean isTerminalState = false;
        switch (WorkflowAction.Status.valueOf(status)) {
            case ERROR:
            case FAILED:
            case KILLED:
            case OK:
                isTerminalState = true;
                break;
            default:
                break;
        }
        return isTerminalState;
    }

    /**
     * Return if the action execution is complete.
     *
     * @return <code>true</code> if the status is DONE.
     */
    public boolean isExecutionComplete() {
        return getStatus() == WorkflowAction.Status.DONE;
    }

    /**
     * Return if the action is START_RETRY or START_MANUAL or END_RETRY or
     * END_MANUAL.
     *
     * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or
     *         END_MANUAL
     */
    public boolean isRetryOrManual() {
        return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
                || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
    }

    /**
     * Return true if the action is USER_RETRY
     *
     * @return boolean true if status is USER_RETRY
     */
    public boolean isUserRetry() {
        return (getStatus() == WorkflowAction.Status.USER_RETRY);
    }

    /**
     * Return if the action is complete.
     *
     * @return <code>true</code> if the status is OK, KILLED or ERROR.
     */
    public boolean isComplete() {
        return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED ||
                getStatus() == WorkflowAction.Status.ERROR;
    }

    /**
     * Return if the action is complete with failure.
     *
     * @return <code>true</code> if the status is FAILED, KILLED or ERROR.
     */
    public boolean isTerminalWithFailure() {
        boolean result = false;
        switch (getStatus()) {
            case FAILED:
            case KILLED:
            case ERROR:
                result = true;
                break;
            default:
                break;
        }
        return result;
    }

    /**
     * Set the action pending flag to true, leaving the pending age untouched.
     */
    public void setPendingOnly() {
        pending = 1;
    }

    /**
     * Set the action as pending and the current time as pending age.
     */
    public void setPending() {
        pending = 1;
        pendingAge = new Date();
        pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Set a time when the action will be pending, normally a time in the future.
     * The pending flag itself is not modified.
     *
     * @param pendingAge the time when the action will be pending.
     */
    public void setPendingAge(Date pendingAge) {
        this.pendingAge = pendingAge;
        this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge);
    }

    /**
     * Return the pending age of the action.
     *
     * @return the pending age of the action, <code>null</code> if the action is not pending.
     */
    public Date getPendingAge() {
        return DateUtils.toDate(pendingAgeTimestamp);
    }

    /**
     * Return if the action is pending.
     *
     * @return if the action is pending.
     */
    public boolean isPending() {
        return pending == 1;
    }

    /**
     * Removes the pending flag and pendingAge from the action.
     */
    public void resetPending() {
        pending = 0;
        pendingAge = null;
        pendingAgeTimestamp = null;
    }

    /**
     * Removes the pending flag from the action, leaving the pending age untouched.
     */
    public void resetPendingOnly() {
        pending = 0;
    }

    /**
     * Increments the number of retries for the action.
     */
    public void incRetries() {
        setRetries(getRetries() + 1);
    }

    /**
     * Set the tracking information for an action, and set the action status to
     * {@link WorkflowAction.Status#RUNNING}.
     * <p>
     * The start time is set to the current time only if no start time has been recorded yet;
     * the last check time is always set to the current time. Each argument is passed through
     * {@link ParamChecker#notEmpty}.
     *
     * @param externalId external ID for the action.
     * @param trackerUri tracker URI for the action.
     * @param consoleUrl console URL for the action.
     */
    public void setStartData(String externalId, String trackerUri, String consoleUrl) {
        setExternalId(ParamChecker.notEmpty(externalId, "externalId"));
        setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri"));
        setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl"));
        Date now = new Date();
        if (this.startTimestamp == null) {
            setStartTime(now);
        }
        setLastCheckTime(now);
        setStatus(Status.RUNNING);
    }

    /**
     * Set the completion information for an action start. Sets the Action status to
     * {@link WorkflowAction.Status#DONE}.
     *
     * @param externalStatus action external end status.
     * @param actionData action output data, <code>null</code> if there is no action output data.
     */
    public void setExecutionData(String externalStatus, Properties actionData) {
        setStatus(Status.DONE);
        setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus"));
        if (actionData != null) {
            setData(PropertiesUtils.propertiesToString(actionData));
        }
    }

    /**
     * Return the action statistics info.
     *
     * @return Json representation of the stats.
     */
    public String getExecutionStats() {
        return getStats();
    }

    /**
     * Set the action statistics info for the workflow action.
     *
     * @param jsonStats Json representation of the stats.
     */
    public void setExecutionStats(String jsonStats) {
        setStats(jsonStats);
    }

    /**
     * Return the external child IDs.
     *
     * @return externalChildIDs as a string.
     */
    public String getExternalChildIDs() {
        return super.getExternalChildIDs();
    }

    /**
     * Set the external child IDs for the workflow action.
     *
     * @param externalChildIDs as a string.
     */
    public void setExternalChildIDs(String externalChildIDs) {
        super.setExternalChildIDs(externalChildIDs);
    }

    /**
     * Set the completion information for an action end. When the status is OK the error
     * information is cleared.
     *
     * @param status action status, {@link WorkflowAction.Status#OK} or {@link WorkflowAction.Status#ERROR} or
     *        {@link WorkflowAction.Status#KILLED}
     * @param signalValue the signal value. In most cases, the value should be OK or ERROR.
     * @throws IllegalArgumentException if the status is <code>null</code> or not one of OK, ERROR or KILLED.
     */
    public void setEndData(Status status, String signalValue) {
        // A null status fails all three comparisons, so it is rejected here too. The message is built
        // by concatenation (not status.toString()) so that null yields the intended
        // IllegalArgumentException rather than a NullPointerException.
        if (status != Status.OK && status != Status.ERROR && status != Status.KILLED) {
            throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received ["
                    + status + "]");
        }
        if (status == Status.OK) {
            setErrorInfo(null, null);
        }
        setStatus(status);
        setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue"));
    }


    /**
     * Return the job Id.
     *
     * @return the job Id.
     */
    public String getJobId() {
        return wfId;
    }

    /**
     * Return the job Id. Same value as {@link #getJobId()}.
     *
     * @return the job Id.
     */
    public String getWfId() {
        return wfId;
    }

    /**
     * Set the job id.
     *
     * @param id jobId;
     */
    public void setJobId(String id) {
        this.wfId = id;
    }

    /**
     * Return the SLA XML of the action.
     *
     * @return the SLA XML, may be <code>null</code>.
     */
    public String getSlaXml() {
        return slaXml;
    }

    /**
     * Set the SLA XML of the action.
     *
     * @param slaXml the SLA XML.
     */
    public void setSlaXml(String slaXml) {
        this.slaXml = slaXml;
    }

    /**
     * Set the action status, updating both the persisted string form and the parent's status.
     *
     * @param val the new status.
     */
    @Override
    public void setStatus(Status val) {
        this.status = val.toString();
        super.setStatus(val);
    }

    /**
     * Return the action status in its persisted string form.
     *
     * @return the status string.
     */
    public String getStatusStr() {
        return status;
    }

    /**
     * Return the action status, parsed from the persisted string form.
     *
     * @return the action status.
     */
    @Override
    public Status getStatus() {
        return Status.valueOf(this.status);
    }

    /**
     * Return the node execution path.
     *
     * @return the node execution path.
     */
    public String getExecutionPath() {
        return executionPath;
    }

    /**
     * Set the node execution path.
     *
     * @param executionPath the node execution path.
     */
    public void setExecutionPath(String executionPath) {
        this.executionPath = executionPath;
    }

    /**
     * Return the signal value for the action. <p> For decision nodes it is the chosen transition, for actions it is
     * OK or ERROR.
     *
     * @return the action signal value.
     */
    public String getSignalValue() {
        return signalValue;
    }

    /**
     * Set the signal value for the action. <p> For decision nodes it is the chosen transition, for actions it is OK
     * or ERROR.
     *
     * @param signalValue the action signal value.
     */
    public void setSignalValue(String signalValue) {
        this.signalValue = signalValue;
    }

    /**
     * Return the job log token.
     *
     * @return the job log token.
     */
    public String getLogToken() {
        return logToken;
    }

    /**
     * Set the job log token.
     *
     * @param logToken the job log token.
     */
    public void setLogToken(String logToken) {
        this.logToken = logToken;
    }

    /**
     * Return the action last check time
     *
     * @return the last check time
     */
    public Date getLastCheckTime() {
        return DateUtils.toDate(lastCheckTimestamp);
    }

    /**
     * Return the action last check time as a timestamp
     *
     * @return the last check timestamp
     */
    public Timestamp getLastCheckTimestamp() {
        return lastCheckTimestamp;
    }

    /**
     * Return the action start time as a timestamp
     *
     * @return the start timestamp
     */
    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    /**
     * Return the action end time as a timestamp
     *
     * @return the end timestamp
     */
    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }


    /**
     * Return the action pending age as a timestamp
     *
     * @return the pending age timestamp
     */
    public Timestamp getPendingAgeTimestamp() {
        return pendingAgeTimestamp;
    }

    /**
     * Sets the action last check time
     *
     * @param lastCheckTime the last check time to set.
     */
    public void setLastCheckTime(Date lastCheckTime) {
        this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime);
    }

    /**
     * Return if the action is pending. Same value as {@link #isPending()}.
     *
     * @return if the action is pending.
     */
    public boolean getPending() {
        return this.pending == 1;
    }

    /**
     * Return the action start time, derived from the persisted start timestamp.
     *
     * @return the start time.
     */
    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    /**
     * Set the action start time on both the parent and the persisted timestamp.
     *
     * @param startTime the start time.
     */
    @Override
    public void setStartTime(Date startTime) {
        super.setStartTime(startTime);
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    /**
     * Return the action end time, derived from the persisted end timestamp.
     *
     * @return the end time.
     */
    @Override
    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    /**
     * Set the action end time on both the parent and the persisted timestamp.
     *
     * @param endTime the end time.
     */
    @Override
    public void setEndTime(Date endTime) {
        super.setEndTime(endTime);
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

}