001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.DataInput; 021 import java.io.DataOutput; 022 import java.io.IOException; 023 import java.sql.Timestamp; 024 import java.util.Date; 025 import java.util.Properties; 026 027 import javax.persistence.Basic; 028 import javax.persistence.Column; 029 import javax.persistence.Entity; 030 import javax.persistence.Lob; 031 import javax.persistence.NamedQueries; 032 import javax.persistence.NamedQuery; 033 import javax.persistence.Transient; 034 035 import org.apache.hadoop.io.Writable; 036 import org.apache.oozie.client.WorkflowAction; 037 import org.apache.oozie.client.rest.JsonWorkflowAction; 038 import org.apache.oozie.util.DateUtils; 039 import org.apache.oozie.util.ParamChecker; 040 import org.apache.oozie.util.PropertiesUtils; 041 import org.apache.oozie.util.WritableUtils; 042 import org.apache.openjpa.persistence.jdbc.Index; 043 044 /** 045 * Bean that contains all the information to start an action for a workflow node. 046 */ 047 @Entity 048 @NamedQueries({ 049 050 @NamedQuery(name = "UPDATE_ACTION", query = "update WorkflowActionBean a set a.conf = :conf, a.consoleUrl = :consoleUrl, a.data = :data, a.stats = :stats, a.externalChildIDs = :externalChildIDs, a.errorCode = :errorCode, a.errorMessage = :errorMessage, a.externalId = :externalId, a.externalStatus = :externalStatus, a.name = :name, a.cred = :cred , a.retries = :retries, a.trackerUri = :trackerUri, a.transition = :transition, a.type = :type, a.endTimestamp = :endTime, a.executionPath = :executionPath, a.lastCheckTimestamp = :lastCheckTime, a.logToken = :logToken, a.pending = :pending, a.pendingAgeTimestamp = :pendingAge, a.signalValue = :signalValue, a.slaXml = :slaXml, a.startTimestamp = :startTime, a.status = :status, a.wfId=:wfId where a.id = :id"), 051 052 @NamedQuery(name = "DELETE_ACTION", query = "delete from WorkflowActionBean a where a.id = :id"), 053 054 @NamedQuery(name = "DELETE_ACTIONS_FOR_WORKFLOW", query = "delete from WorkflowActionBean a where a.wfId = :wfId"), 055 056 @NamedQuery(name = "GET_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a"), 057 058 @NamedQuery(name = "GET_ACTION", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"), 059 060 @NamedQuery(name = "GET_ACTION_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.id = :id"), 061 062 @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"), 063 064 @NamedQuery(name = "GET_ACTIONS_OF_WORKFLOW_FOR_UPDATE", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp"), 065 066 @NamedQuery(name = "GET_PENDING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.pendingAgeTimestamp < :pendingAge AND a.status <> 'RUNNING'"), 067 068 @NamedQuery(name = "GET_RUNNING_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.pending = 1 AND a.status = 'RUNNING' AND a.lastCheckTimestamp < :lastCheckTime"), 069 070 @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.status = 'START_RETRY' OR a.status = 'START_MANUAL' OR a.status = 'END_RETRY' OR a.status = 'END_MANUAL')") }) 071 072 public class WorkflowActionBean extends JsonWorkflowAction implements Writable { 073 074 @Basic 075 @Index 076 @Column(name = "wf_id") 077 private String wfId = null; 078 079 @Basic 080 @Index 081 @Column(name = "status") 082 private String status = WorkflowAction.Status.PREP.toString(); 083 084 @Basic 085 @Column(name = "last_check_time") 086 private java.sql.Timestamp lastCheckTimestamp; 087 088 @Basic 089 @Column(name = "end_time") 090 private java.sql.Timestamp endTimestamp = null; 091 092 @Basic 093 @Column(name = "start_time") 094 private java.sql.Timestamp startTimestamp = null; 095 096 @Basic 097 @Column(name = "execution_path", length = 1024) 098 private String executionPath = null; 099 100 @Basic 101 @Column(name = "pending") 102 private int pending = 0; 103 104 // @Temporal(TemporalType.TIME) 105 // @Column(name="pending_age",columnDefinition="timestamp default '0000-00-00 00:00:00'") 106 @Basic 107 @Index 108 @Column(name = "pending_age") 109 private java.sql.Timestamp pendingAgeTimestamp = null; 110 111 @Basic 112 @Column(name = "signal_value") 113 private String signalValue = null; 114 115 @Basic 116 @Column(name = "log_token") 117 private String logToken = null; 118 119 @Transient 120 private Date pendingAge; 121 122 @Column(name = "sla_xml") 123 @Lob 124 private String slaXml = null; 125 126 /** 127 * Default constructor. 128 */ 129 public WorkflowActionBean() { 130 } 131 132 /** 133 * Serialize the action bean to a data output. 134 * 135 * @param dataOutput data output. 136 * @throws IOException thrown if the action bean could not be serialized. 137 */ 138 139 public void write(DataOutput dataOutput) throws IOException { 140 WritableUtils.writeStr(dataOutput, getId()); 141 WritableUtils.writeStr(dataOutput, getName()); 142 WritableUtils.writeStr(dataOutput, getCred()); 143 WritableUtils.writeStr(dataOutput, getType()); 144 WritableUtils.writeStr(dataOutput, getConf()); 145 WritableUtils.writeStr(dataOutput, getStatusStr()); 146 dataOutput.writeInt(getRetries()); 147 dataOutput.writeLong((getStartTime() != null) ? getStartTime().getTime() : -1); 148 dataOutput.writeLong((getEndTime() != null) ? getEndTime().getTime() : -1); 149 dataOutput.writeLong((getLastCheckTime() != null) ? getLastCheckTime().getTime() : -1); 150 WritableUtils.writeStr(dataOutput, getTransition()); 151 WritableUtils.writeStr(dataOutput, getData()); 152 WritableUtils.writeStr(dataOutput, getStats()); 153 WritableUtils.writeStr(dataOutput, getExternalChildIDs()); 154 WritableUtils.writeStr(dataOutput, getExternalId()); 155 WritableUtils.writeStr(dataOutput, getExternalStatus()); 156 WritableUtils.writeStr(dataOutput, getTrackerUri()); 157 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 158 WritableUtils.writeStr(dataOutput, getErrorCode()); 159 WritableUtils.writeStr(dataOutput, getErrorMessage()); 160 WritableUtils.writeStr(dataOutput, wfId); 161 WritableUtils.writeStr(dataOutput, executionPath); 162 dataOutput.writeInt(pending); 163 dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1); 164 WritableUtils.writeStr(dataOutput, signalValue); 165 WritableUtils.writeStr(dataOutput, logToken); 166 dataOutput.writeInt(getUserRetryCount()); 167 dataOutput.writeInt(getUserRetryInterval()); 168 dataOutput.writeInt(getUserRetryMax()); 169 } 170 171 /** 172 * Deserialize an action bean from a data input. 173 * 174 * @param dataInput data input. 175 * @throws IOException thrown if the action bean could not be deserialized. 176 */ 177 public void readFields(DataInput dataInput) throws IOException { 178 setId(WritableUtils.readStr(dataInput)); 179 setName(WritableUtils.readStr(dataInput)); 180 setCred(WritableUtils.readStr(dataInput)); 181 setType(WritableUtils.readStr(dataInput)); 182 setConf(WritableUtils.readStr(dataInput)); 183 setStatus(WorkflowAction.Status.valueOf(WritableUtils.readStr(dataInput))); 184 setRetries(dataInput.readInt()); 185 long d = dataInput.readLong(); 186 if (d != -1) { 187 setStartTime(new Date(d)); 188 } 189 d = dataInput.readLong(); 190 if (d != -1) { 191 setEndTime(new Date(d)); 192 } 193 d = dataInput.readLong(); 194 if (d != -1) { 195 setLastCheckTime(new Date(d)); 196 } 197 setTransition(WritableUtils.readStr(dataInput)); 198 setData(WritableUtils.readStr(dataInput)); 199 setStats(WritableUtils.readStr(dataInput)); 200 setExternalChildIDs(WritableUtils.readStr(dataInput)); 201 setExternalId(WritableUtils.readStr(dataInput)); 202 setExternalStatus(WritableUtils.readStr(dataInput)); 203 setTrackerUri(WritableUtils.readStr(dataInput)); 204 setConsoleUrl(WritableUtils.readStr(dataInput)); 205 setErrorInfo(WritableUtils.readStr(dataInput), WritableUtils.readStr(dataInput)); 206 wfId = WritableUtils.readStr(dataInput); 207 executionPath = WritableUtils.readStr(dataInput); 208 pending = dataInput.readInt(); 209 d = dataInput.readLong(); 210 if (d != -1) { 211 pendingAge = new Date(d); 212 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge); 213 } 214 signalValue = WritableUtils.readStr(dataInput); 215 logToken = WritableUtils.readStr(dataInput); 216 setUserRetryCount(dataInput.readInt()); 217 setUserRetryInterval(dataInput.readInt()); 218 setUserRetryMax(dataInput.readInt()); 219 } 220 221 /** 222 * Return if the action execution is complete. 223 * 224 * @return if the action start is complete. 225 */ 226 public boolean isExecutionComplete() { 227 return getStatus() == WorkflowAction.Status.DONE; 228 } 229 230 /** 231 * Return if the action is START_RETRY or START_MANUAL or END_RETRY or 232 * END_MANUAL. 233 * 234 * @return boolean true if status is START_RETRY or START_MANUAL or END_RETRY or 235 * END_MANUAL 236 */ 237 public boolean isRetryOrManual() { 238 return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL 239 || getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL); 240 } 241 242 /** 243 * Return true if the action is USER_RETRY 244 * 245 * @return boolean true if status is USER_RETRY 246 */ 247 public boolean isUserRetry() { 248 return (getStatus() == WorkflowAction.Status.USER_RETRY); 249 } 250 251 /** 252 * Return if the action is complete. 253 * 254 * @return if the action is complete. 255 */ 256 public boolean isComplete() { 257 return getStatus() == WorkflowAction.Status.OK || getStatus() == WorkflowAction.Status.KILLED || 258 getStatus() == WorkflowAction.Status.ERROR; 259 } 260 261 /** 262 * Set the action pending flag to true. 263 */ 264 public void setPendingOnly() { 265 pending = 1; 266 } 267 268 /** 269 * Set the action as pending and the current time as pending. 270 */ 271 public void setPending() { 272 pending = 1; 273 pendingAge = new Date(); 274 pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge); 275 } 276 277 /** 278 * Set a time when the action will be pending, normally a time in the future. 279 * 280 * @param pendingAge the time when the action will be pending. 281 */ 282 public void setPendingAge(Date pendingAge) { 283 this.pendingAge = pendingAge; 284 this.pendingAgeTimestamp = DateUtils.convertDateToTimestamp(pendingAge); 285 } 286 287 /** 288 * Return the pending age of the action. 289 * 290 * @return the pending age of the action, <code>null</code> if the action is not pending. 291 */ 292 public Date getPendingAge() { 293 return DateUtils.toDate(pendingAgeTimestamp); 294 } 295 296 /** 297 * Return if the action is pending. 298 * 299 * @return if the action is pending. 300 */ 301 public boolean isPending() { 302 return pending == 1 ? true : false; 303 } 304 305 /** 306 * Removes the pending flag and pendingAge from the action. 307 */ 308 public void resetPending() { 309 pending = 0; 310 pendingAge = null; 311 pendingAgeTimestamp = null; 312 } 313 314 /** 315 * Removes the pending flag from the action. 316 */ 317 public void resetPendingOnly() { 318 pending = 0; 319 } 320 321 /** 322 * Increments the number of retries for the action. 323 */ 324 public void incRetries() { 325 setRetries(getRetries() + 1); 326 } 327 328 /** 329 * Set a tracking information for an action, and set the action status to {@link Action.Status#DONE} 330 * 331 * @param externalId external ID for the action. 332 * @param trackerUri tracker URI for the action. 333 * @param consoleUrl console URL for the action. 334 */ 335 public void setStartData(String externalId, String trackerUri, String consoleUrl) { 336 setExternalId(ParamChecker.notEmpty(externalId, "externalId")); 337 setTrackerUri(ParamChecker.notEmpty(trackerUri, "trackerUri")); 338 setConsoleUrl(ParamChecker.notEmpty(consoleUrl, "consoleUrl")); 339 Date now = new Date(); 340 if (this.startTimestamp == null) { 341 setStartTime(now); 342 } 343 setLastCheckTime(now); 344 setStatus(Status.RUNNING); 345 } 346 347 /** 348 * Set the completion information for an action start. Sets the Action status to {@link Action.Status#DONE} 349 * 350 * @param externalStatus action external end status. 351 * @param actionData action output data, <code>null</code> if there is no action output data. 352 */ 353 public void setExecutionData(String externalStatus, Properties actionData) { 354 setStatus(Status.DONE); 355 setExternalStatus(ParamChecker.notEmpty(externalStatus, "externalStatus")); 356 if (actionData != null) { 357 setData(PropertiesUtils.propertiesToString(actionData)); 358 } 359 } 360 361 /** 362 * Return the action statistics info. 363 * 364 * @return Json representation of the stats. 365 */ 366 public String getExecutionStats() { 367 return getStats(); 368 } 369 370 /** 371 * Set the action statistics info for the workflow action. 372 * 373 * @param Json representation of the stats. 374 */ 375 public void setExecutionStats(String jsonStats) { 376 setStats(jsonStats); 377 } 378 379 /** 380 * Return the external child IDs. 381 * 382 * @return externalChildIDs as a string. 383 */ 384 public String getExternalChildIDs() { 385 return super.getExternalChildIDs(); 386 } 387 388 /** 389 * Set the external child IDs for the workflow action. 390 * 391 * @param externalChildIDs as a string. 392 */ 393 public void setExternalChildIDs(String externalChildIDs) { 394 super.setExternalChildIDs(externalChildIDs); 395 } 396 397 /** 398 * Set the completion information for an action end. 399 * 400 * @param status action status, {@link Action.Status#OK} or {@link Action.Status#ERROR} or {@link 401 * Action.Status#KILLED} 402 * @param signalValue the signal value. In most cases, the value should be OK or ERROR. 403 */ 404 public void setEndData(Status status, String signalValue) { 405 if (status == null || (status != Status.OK && status != Status.ERROR && status != Status.KILLED)) { 406 throw new IllegalArgumentException("Action status must be OK, ERROR or KILLED. Received [" 407 + status.toString() + "]"); 408 } 409 if (status == Status.OK) { 410 setErrorInfo(null, null); 411 } 412 setStatus(status); 413 setSignalValue(ParamChecker.notEmpty(signalValue, "signalValue")); 414 } 415 416 417 /** 418 * Return the job Id. 419 * 420 * @return the job Id. 421 */ 422 public String getJobId() { 423 return wfId; 424 } 425 426 /** 427 * Return the job Id. 428 * 429 * @return the job Id. 430 */ 431 public String getWfId() { 432 return wfId; 433 } 434 435 /** 436 * Set the job id. 437 * 438 * @param id jobId; 439 */ 440 public void setJobId(String id) { 441 this.wfId = id; 442 } 443 444 public String getSlaXml() { 445 return slaXml; 446 } 447 448 public void setSlaXml(String slaXml) { 449 this.slaXml = slaXml; 450 } 451 452 @Override 453 public void setStatus(Status val) { 454 this.status = val.toString(); 455 super.setStatus(val); 456 } 457 458 public String getStatusStr() { 459 return status; 460 } 461 462 @Override 463 public Status getStatus() { 464 return Status.valueOf(this.status); 465 } 466 467 /** 468 * Return the node execution path. 469 * 470 * @return the node execution path. 471 */ 472 public String getExecutionPath() { 473 return executionPath; 474 } 475 476 /** 477 * Set the node execution path. 478 * 479 * @param executionPath the node execution path. 480 */ 481 public void setExecutionPath(String executionPath) { 482 this.executionPath = executionPath; 483 } 484 485 /** 486 * Return the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is 487 * OK or ERROR. 488 * 489 * @return the action signal value. 490 */ 491 public String getSignalValue() { 492 return signalValue; 493 } 494 495 /** 496 * Set the signal value for the action. <p/> For decision nodes it is the choosen transition, for actions it is OK 497 * or ERROR. 498 * 499 * @param signalValue the action signal value. 500 */ 501 public void setSignalValue(String signalValue) { 502 this.signalValue = signalValue; 503 } 504 505 /** 506 * Return the job log token. 507 * 508 * @return the job log token. 509 */ 510 public String getLogToken() { 511 return logToken; 512 } 513 514 /** 515 * Set the job log token. 516 * 517 * @param logToken the job log token. 518 */ 519 public void setLogToken(String logToken) { 520 this.logToken = logToken; 521 } 522 523 /** 524 * Return the action last check time 525 * 526 * @return the last check time 527 */ 528 public Date getLastCheckTime() { 529 return DateUtils.toDate(lastCheckTimestamp); 530 } 531 532 /** 533 * Return the action last check time 534 * 535 * @return the last check time 536 */ 537 public Timestamp getLastCheckTimestamp() { 538 return lastCheckTimestamp; 539 } 540 541 /** 542 * Return the action last check time 543 * 544 * @return the last check time 545 */ 546 public Timestamp getStartTimestamp() { 547 return startTimestamp; 548 } 549 550 /** 551 * Return the action last check time 552 * 553 * @return the last check time 554 */ 555 public Timestamp getEndTimestamp() { 556 return endTimestamp; 557 } 558 559 560 /** 561 * Return the action last check time 562 * 563 * @return the last check time 564 */ 565 public Timestamp getPendingAgeTimestamp() { 566 return pendingAgeTimestamp; 567 } 568 569 /** 570 * Sets the action last check time 571 * 572 * @param lastCheckTime the last check time to set. 573 */ 574 public void setLastCheckTime(Date lastCheckTime) { 575 this.lastCheckTimestamp = DateUtils.convertDateToTimestamp(lastCheckTime); 576 } 577 578 public boolean getPending() { 579 return this.pending == 1 ? true : false; 580 } 581 582 @Override 583 public Date getStartTime() { 584 return DateUtils.toDate(startTimestamp); 585 } 586 587 @Override 588 public void setStartTime(Date startTime) { 589 super.setStartTime(startTime); 590 this.startTimestamp = DateUtils.convertDateToTimestamp(startTime); 591 } 592 593 @Override 594 public Date getEndTime() { 595 return DateUtils.toDate(endTimestamp); 596 } 597 598 @Override 599 public void setEndTime(Date endTime) { 600 super.setEndTime(endTime); 601 this.endTimestamp = DateUtils.convertDateToTimestamp(endTime); 602 } 603 604 }