/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;
import javax.persistence.Transient;

import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.CoordinatorAction;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.client.rest.JsonTags;
import org.apache.oozie.client.rest.JsonUtils;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;
import org.apache.openjpa.persistence.jdbc.Strategy;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;

/**
 * JPA entity for a coordinator job, mapped to the <code>COORD_JOBS</code> table.
 * <p>
 * Besides being a persistent entity, the bean can be serialized through the Hadoop {@link Writable} contract
 * ({@link #write(DataOutput)} / {@link #readFields(DataInput)}) and rendered as JSON
 * ({@link #toJSONObject(String)}).
 * <p>
 * Status, time unit and execution order are stored as plain strings (<code>statusStr</code>,
 * <code>timeUnitStr</code>, <code>execution</code>) and converted to their enums on access; the two boolean flags
 * <code>pending</code> and <code>doneMaterialization</code> are stored as the integers 0 and 1.
 * <p>
 * NOTE: some named queries below are textually identical under different names
 * (<code>UPDATE_COORD_JOB_STATUS</code> / <code>UPDATE_COORD_JOB_STATUS_MODTIME</code> and
 * <code>GET_COORD_JOB_USER_APPNAME</code> / <code>GET_COORD_JOB_FOR_USER_APPNAME</code>). They are all kept because
 * code outside this file may look them up by either name.
 */
@Entity
@NamedQueries( {
        @NamedQuery(name = "UPDATE_COORD_JOB", query = "update CoordinatorJobBean w set w.appName = :appName, w.appPath = :appPath,w.concurrency = :concurrency, w.conf = :conf, w.externalId = :externalId, w.frequency = :frequency, w.lastActionNumber = :lastActionNumber, w.timeOut = :timeOut, w.timeZone = :timeZone, w.createdTimestamp = :createdTime, w.endTimestamp = :endTime, w.execution = :execution, w.jobXml = :jobXml, w.lastActionTimestamp = :lastAction, w.lastModifiedTimestamp = :lastModifiedTime, w.nextMaterializedTimestamp = :nextMaterializedTime, w.origJobXml = :origJobXml, w.slaXml=:slaXml, w.startTimestamp = :startTime, w.statusStr = :status, w.timeUnitStr = :timeUnit, w.appNamespace = :appNamespace, w.bundleId = :bundleId, w.matThrottling = :matThrottling where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_STATUS", query = "update CoordinatorJobBean w set w.statusStr =:status, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_PENDING", query = "update CoordinatorJobBean w set w.pending = :pending, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_BUNDLEID", query = "update CoordinatorJobBean w set w.bundleId = :bundleId where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_APPNAMESPACE", query = "update CoordinatorJobBean w set w.appNamespace = :appNamespace where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_STATUS_PENDING", query = "update CoordinatorJobBean w set w.statusStr = :status, w.pending = :pending where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_BUNDLEID_APPNAMESPACE_PAUSETIME", query = "update CoordinatorJobBean w set w.bundleId = :bundleId, w.appNamespace = :appNamespace, w.pauseTimestamp = :pauseTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_STATUS_MODTIME", query = "update CoordinatorJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_STATUS_PENDING_MODTIME", query = "update CoordinatorJobBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_LAST_MODIFIED_TIME", query = "update CoordinatorJobBean w set w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_STATUS_PENDING_TIME", query = "update CoordinatorJobBean w set w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.lastModifiedTimestamp = :lastModifiedTime, w.suspendedTimestamp = :suspendedTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_MATERIALIZE", query = "update CoordinatorJobBean w set w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.lastActionTimestamp = :lastActionTime, w.lastActionNumber = :lastActionNumber, w.nextMaterializedTimestamp = :nextMatdTime where w.id = :id"),

        @NamedQuery(name = "UPDATE_COORD_JOB_CHANGE", query = "update CoordinatorJobBean w set w.endTimestamp = :endTime, w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.concurrency = :concurrency, w.pauseTimestamp = :pauseTime, w.lastActionNumber = :lastActionNumber, w.lastActionTimestamp = :lastActionTime, w.nextMaterializedTimestamp = :nextMatdTime, w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"),

        @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOBS", query = "select OBJECT(w) from CoordinatorJobBean w"),

        @NamedQuery(name = "GET_COORD_JOB", query = "select OBJECT(w) from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_INPUT_CHECK", query = "select w.user, w.appName, w.statusStr, w.appNamespace, w.execution, w.frequency, w.timeUnitStr, w.timeZone, w.endTimestamp from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_ACTION_READY", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.execution, w.concurrency from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_ACTION_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_MATERIALIZE", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.frequency, w.matThrottling, w.timeOut, w.timeZone, w.startTimestamp, w.endTimestamp, w.pauseTimestamp, w.nextMaterializedTimestamp, w.lastActionTimestamp, w.lastActionNumber, w.doneMaterialization, w.bundleId, w.conf, w.jobXml, w.appNamespace, w.timeUnitStr, w.execution from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_SUSPEND_KILL", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.bundleId, w.appNamespace, w.doneMaterialization from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOBS_PENDING", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_CHANGED", query = "select OBJECT(w) from CoordinatorJobBean w where w.pending = 1 AND w.doneMaterialization = 1 AND w.lastModifiedTimestamp >= :lastModifiedTime"),

        @NamedQuery(name = "GET_COORD_JOBS_COUNT", query = "select count(w) from CoordinatorJobBean w"),

        @NamedQuery(name = "GET_COORD_JOBS_COLUMNS", query = "select w.id, w.appName, w.statusStr, w.user, w.group, w.startTimestamp, w.endTimestamp, w.appPath, w.concurrency, w.frequency, w.lastActionTimestamp, w.nextMaterializedTimestamp, w.createdTimestamp, w.timeUnitStr, w.timeZone, w.timeOut from CoordinatorJobBean w order by w.createdTimestamp desc"),

        //TODO need to remove.
        @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.jobId) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COMPLETED_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' or w.statusStr = 'KILLED') AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COMPLETED_COORD_JOBS_WITH_NO_PARENT_OLDER_THAN_STATUS", query = "select w.id from CoordinatorJobBean w where ( w.statusStr = 'SUCCEEDED' OR w.statusStr = 'FAILED' or w.statusStr = 'KILLED' or w.statusStr = 'DONEWITHERROR') AND w.lastModifiedTimestamp <= :lastModTime and w.bundleId is null order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_UNPAUSED", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'PREP' order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_PAUSED", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'PREPPAUSED' order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_FOR_BUNDLE", query = "select OBJECT(w) from CoordinatorJobBean w where w.bundleId = :bundleId order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "GET_COORD_JOBS_WITH_PARENT_ID", query = "select w.id from CoordinatorJobBean w where w.bundleId = :parentId"),

        @NamedQuery(name = "GET_COORD_COUNT_WITH_PARENT_ID_NOT_READY_FOR_PURGE", query = "select count(w) from CoordinatorJobBean w where w.bundleId = :parentId and (w.statusStr NOT IN ('SUCCEEDED', 'FAILED', 'KILLED', 'DONEWITHERROR') OR w.lastModifiedTimestamp >= :lastModTime)"),

        @NamedQuery(name = "GET_COORD_JOB_FOR_USER_APPNAME", query = "select w.user, w.appName from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_FOR_USER", query = "select w.user from CoordinatorJobBean w where w.id = :id"),

        @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id")

})
@Table(name = "COORD_JOBS")
public class CoordinatorJobBean implements Writable, CoordinatorJob, JsonBean {

    /**
     * Value written by {@link #write(DataOutput)} in place of a null date. A real timestamp of exactly -1 ms cannot
     * be told apart from null; this is inherent to the existing wire format and deliberately left unchanged.
     */
    private static final long NULL_TIME = -1L;

    @Id
    private String id;

    @Basic
    @Column(name = "app_path")
    private String appPath = null;

    @Basic
    @Column(name = "app_name")
    private String appName = null;

    @Basic
    @Column(name = "external_id")
    private String externalId = null;

    @Basic
    @Column(name = "conf")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob conf = null;

    @Basic
    @Column(name = "frequency")
    private String frequency = "0";

    @Basic
    @Column(name = "time_zone")
    private String timeZone = null;

    @Basic
    @Column(name = "concurrency")
    private int concurrency = 0;

    @Basic
    @Column(name = "mat_throttling")
    private int matThrottling = 0;

    @Basic
    @Column(name = "time_out")
    private int timeOut = 0;

    @Basic
    @Column(name = "last_action_number")
    private int lastActionNumber;

    @Basic
    @Column(name = "user_name")
    private String user = null;

    @Basic
    @Column(name = "group_name")
    private String group = null;

    @Basic
    @Index
    @Column(name = "bundle_id")
    private String bundleId = null;

    // Not persisted: set by callers for presentation only.
    @Transient
    private String consoleUrl;

    // Not persisted: never null (see constructor and setActions).
    @Transient
    private List<CoordinatorActionBean> actions;

    // Not persisted.
    @Transient
    private int numActions = 0;

    @Basic
    @Index
    @Column(name = "status")
    private String statusStr = CoordinatorJob.Status.PREP.toString();

    @Basic
    @Column(name = "start_time")
    private Timestamp startTimestamp = null;

    @Basic
    @Column(name = "end_time")
    private Timestamp endTimestamp = null;

    @Basic
    @Column(name = "pause_time")
    private Timestamp pauseTimestamp = null;

    @Basic
    @Index
    @Column(name = "created_time")
    private Timestamp createdTimestamp = null;

    @Basic
    @Column(name = "time_unit")
    private String timeUnitStr = CoordinatorJob.Timeunit.NONE.toString();

    @Basic
    @Column(name = "execution")
    private String execution = CoordinatorJob.Execution.FIFO.toString();

    @Basic
    @Column(name = "last_action")
    private Timestamp lastActionTimestamp = null;

    @Basic
    @Index
    @Column(name = "next_matd_time")
    private Timestamp nextMaterializedTimestamp = null;

    @Basic
    @Index
    @Column(name = "last_modified_time")
    private Timestamp lastModifiedTimestamp = null;

    @Basic
    @Index
    @Column(name = "suspended_time")
    private Timestamp suspendedTimestamp = null;

    @Basic
    @Column(name = "job_xml")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob jobXml = null;

    @Basic
    @Column(name = "orig_job_xml")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob origJobXml = null;

    @Basic
    @Column(name = "sla_xml")
    @Lob
    @Strategy("org.apache.oozie.executor.jpa.StringBlobValueHandler")
    private StringBlob slaXml = null;

    // Boolean flag stored as 0/1; see setPending(), resetPending() and isPending().
    @Basic
    @Column(name = "pending")
    private int pending = 0;

    // Boolean flag stored as 0/1; see setDoneMaterialization(), resetDoneMaterialization().
    @Basic
    @Column(name = "done_materialization")
    private int doneMaterialization = 0;

    @Basic
    @Column(name = "app_namespace")
    private String appNamespace = null;

    /**
     * Get start timestamp
     *
     * @return start timestamp
     */
    public Timestamp getStartTimestamp() {
        return startTimestamp;
    }

    /**
     * Set start timestamp
     *
     * @param startTimestamp start timestamp
     */
    public void setStartTimestamp(Timestamp startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    /**
     * Get end timestamp
     *
     * @return end timestamp
     */
    public Timestamp getEndTimestamp() {
        return endTimestamp;
    }

    /**
     * Set end timestamp
     *
     * @param endTimestamp end timestamp
     */
    public void setEndTimestamp(Timestamp endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    /**
     * Get next materialized timestamp
     *
     * @return next materialized timestamp
     */
    public Timestamp getNextMaterializedTimestamp() {
        return nextMaterializedTimestamp;
    }

    /**
     * Set next materialized timestamp
     *
     * @param nextMaterializedTimestamp next materialized timestamp
     */
    public void setNextMaterializedTimestamp(Timestamp nextMaterializedTimestamp) {
        this.nextMaterializedTimestamp = nextMaterializedTimestamp;
    }

    /**
     * Get last modified timestamp
     *
     * @return last modified timestamp
     */
    public Timestamp getLastModifiedTimestamp() {
        return lastModifiedTimestamp;
    }

    /**
     * Set last modified timestamp
     *
     * @param lastModifiedTimestamp last modified timestamp
     */
    public void setLastModifiedTimestamp(Timestamp lastModifiedTimestamp) {
        this.lastModifiedTimestamp = lastModifiedTimestamp;
    }

    /**
     * Get suspended timestamp
     *
     * @return suspended timestamp
     */
    public Timestamp getSuspendedTimestamp() {
        return suspendedTimestamp;
    }

    /**
     * Set suspended timestamp
     *
     * @param suspendedTimestamp suspended timestamp
     */
    public void setSuspendedTimestamp(Timestamp suspendedTimestamp) {
        this.suspendedTimestamp = suspendedTimestamp;
    }

    /**
     * Get job xml
     *
     * @return job xml, or null if no job xml blob has been set
     */
    public String getJobXml() {
        return jobXml == null ? null : jobXml.getString();
    }

    /**
     * Set job xml. Creates the backing blob on first use, otherwise updates the existing blob in place.
     *
     * @param jobXml job xml
     */
    public void setJobXml(String jobXml) {
        if (this.jobXml == null) {
            this.jobXml = new StringBlob(jobXml);
        }
        else {
            this.jobXml.setString(jobXml);
        }
    }

    /**
     * Replace the blob backing the job xml
     *
     * @param jobXmlBlob job xml blob
     */
    public void setJobXmlBlob(StringBlob jobXmlBlob) {
        this.jobXml = jobXmlBlob;
    }

    /**
     * Get the blob backing the job xml
     *
     * @return job xml blob, may be null
     */
    public StringBlob getJobXmlBlob() {
        return jobXml;
    }

    /**
     * Get original job xml
     *
     * @return original job xml, or null if no original job xml blob has been set
     */
    public String getOrigJobXml() {
        return origJobXml == null ? null : origJobXml.getString();
    }

    /**
     * Set original job xml. Creates the backing blob on first use, otherwise updates the existing blob in place.
     *
     * @param origJobXml original job xml
     */
    public void setOrigJobXml(String origJobXml) {
        if (this.origJobXml == null) {
            this.origJobXml = new StringBlob(origJobXml);
        }
        else {
            this.origJobXml.setString(origJobXml);
        }
    }

    /**
     * Replace the blob backing the original job xml
     *
     * @param origJobXml original job xml blob
     */
    public void setOrigJobXmlBlob(StringBlob origJobXml) {
        this.origJobXml = origJobXml;
    }

    /**
     * Get the blob backing the original job xml
     *
     * @return original job xml blob, may be null
     */
    public StringBlob getOrigJobXmlBlob() {
        return origJobXml;
    }

    /**
     * Get sla xml
     *
     * @return sla xml, or null if no sla xml blob has been set
     */
    public String getSlaXml() {
        return slaXml == null ? null : slaXml.getString();
    }

    /**
     * Set sla xml. Creates the backing blob on first use, otherwise updates the existing blob in place.
     *
     * @param slaXml sla xml
     */
    public void setSlaXml(String slaXml) {
        if (this.slaXml == null) {
            this.slaXml = new StringBlob(slaXml);
        }
        else {
            this.slaXml.setString(slaXml);
        }
    }

    /**
     * Replace the blob backing the sla xml
     *
     * @param slaXml sla xml blob
     */
    public void setSlaXmlBlob(StringBlob slaXml) {
        this.slaXml = slaXml;
    }

    /**
     * Get the blob backing the sla xml
     *
     * @return sla xml blob, may be null
     */
    public StringBlob getSlaXmlBlob() {
        return slaXml;
    }

    /**
     * Set last action timestamp
     *
     * @param lastActionTimestamp last action timestamp
     */
    public void setLastActionTimestamp(Timestamp lastActionTimestamp) {
        this.lastActionTimestamp = lastActionTimestamp;
    }

    /**
     * Return if the job is pending.
     *
     * @return true if the pending flag is 1
     */
    public boolean isPending() {
        return pending == 1;
    }

    /**
     * Set doneMaterialization to true
     */
    public void setDoneMaterialization() {
        this.doneMaterialization = 1;
    }

    /**
     * Set doneMaterialization to the given raw value
     *
     * @param i raw flag value; only 1 is reported as true by {@link #isDoneMaterialization()}
     */
    public void setDoneMaterialization(int i) {
        this.doneMaterialization = i;
    }

    /**
     * Set doneMaterialization to false
     */
    public void resetDoneMaterialization() {
        this.doneMaterialization = 0;
    }

    /**
     * Return if the job is done with materialization
     *
     * @return true if the doneMaterialization flag is 1
     */
    public boolean isDoneMaterialization() {
        return doneMaterialization == 1;
    }

    /**
     * Get app namespace
     *
     * @return app namespace
     */
    public String getAppNamespace() {
        return appNamespace;
    }

    /**
     * Set app namespace
     *
     * @param appNamespace the app namespace to set
     */
    public void setAppNamespace(String appNamespace) {
        this.appNamespace = appNamespace;
    }

    /**
     * Create a bean with an empty action list and the field defaults declared above.
     */
    public CoordinatorJobBean() {
        actions = new ArrayList<CoordinatorActionBean>();
    }

    /**
     * Serialize the coordinator bean to a data output.
     * <p>
     * Only the fields written below are serialized, in this exact order; {@link #readFields(DataInput)} must read
     * them back in the same order. Null dates are written as -1.
     *
     * @param dataOutput data output.
     * @throws IOException thrown if the coordinator bean could not be serialized.
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getAppPath());
        WritableUtils.writeStr(dataOutput, getAppName());
        WritableUtils.writeStr(dataOutput, getId());
        WritableUtils.writeStr(dataOutput, getConf());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        WritableUtils.writeStr(dataOutput, getFrequency());
        WritableUtils.writeStr(dataOutput, getTimeUnit().toString());
        WritableUtils.writeStr(dataOutput, getTimeZone());
        dataOutput.writeInt(getConcurrency());
        WritableUtils.writeStr(dataOutput, getExecutionOrder().toString());
        writeTime(dataOutput, getLastActionTime());
        writeTime(dataOutput, getNextMaterializedTime());
        writeTime(dataOutput, getStartTime());
        writeTime(dataOutput, getEndTime());
        WritableUtils.writeStr(dataOutput, getUser());
        WritableUtils.writeStr(dataOutput, getGroup());
        WritableUtils.writeStr(dataOutput, getExternalId());
        dataOutput.writeInt(getTimeout());
        dataOutput.writeInt(getMatThrottling());
        dataOutput.writeInt(isPending() ? 1 : 0);
        dataOutput.writeInt(isDoneMaterialization() ? 1 : 0);
        WritableUtils.writeStr(dataOutput, getAppNamespace());
    }

    /**
     * Deserialize a coordinator bean from a data input.
     * <p>
     * Every field written by {@link #write(DataOutput)} is overwritten, including with null / false when the stream
     * carries the "absent" marker. This matters when the same bean instance is reused for several records:
     * previously a -1 date or a 0 flag left the value of the prior record in place.
     *
     * @param dataInput data input.
     * @throws IOException thrown if the coordinator bean could not be deserialized.
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        setAppPath(WritableUtils.readStr(dataInput));
        setAppName(WritableUtils.readStr(dataInput));
        setId(WritableUtils.readStr(dataInput));
        setConf(WritableUtils.readStr(dataInput));
        setStatus(CoordinatorJob.Status.valueOf(WritableUtils.readStr(dataInput)));
        setFrequency(WritableUtils.readStr(dataInput));
        setTimeUnit(CoordinatorJob.Timeunit.valueOf(WritableUtils.readStr(dataInput)));
        setTimeZone(WritableUtils.readStr(dataInput));
        setConcurrency(dataInput.readInt());
        setExecutionOrder(Execution.valueOf(WritableUtils.readStr(dataInput)));

        // Assign the fields directly so that an absent date clears any previous value.
        this.lastActionTimestamp = readTimestamp(dataInput);
        this.nextMaterializedTimestamp = readTimestamp(dataInput);
        this.startTimestamp = readTimestamp(dataInput);
        this.endTimestamp = readTimestamp(dataInput);

        setUser(WritableUtils.readStr(dataInput));
        setGroup(WritableUtils.readStr(dataInput));
        setExternalId(WritableUtils.readStr(dataInput));
        setTimeout(dataInput.readInt());
        setMatThrottling(dataInput.readInt());

        // Flags: 1 means true, anything else is treated as false (and now explicitly reset).
        if (dataInput.readInt() == 1) {
            setPending();
        }
        else {
            resetPending();
        }

        if (dataInput.readInt() == 1) {
            setDoneMaterialization();
        }
        else {
            resetDoneMaterialization();
        }

        setAppNamespace(WritableUtils.readStr(dataInput));
    }

    /**
     * Write a date as epoch milliseconds, or -1 when the date is null.
     */
    private static void writeTime(DataOutput dataOutput, Date time) throws IOException {
        dataOutput.writeLong((time != null) ? time.getTime() : NULL_TIME);
    }

    /**
     * Read a date written by {@link #writeTime(DataOutput, Date)}; returns null for the -1 marker. Uses the same
     * {@link DateUtils#convertDateToTimestamp(Date)} conversion as the Date-based setters of this class.
     */
    private static Timestamp readTimestamp(DataInput dataInput) throws IOException {
        long millis = dataInput.readLong();
        return (millis == NULL_TIME) ? null : DateUtils.convertDateToTimestamp(new Date(millis));
    }

    /**
     * @return true if in terminal status (SUCCEEDED, FAILED, KILLED, DONEWITHERROR or IGNORED)
     */
    public boolean isTerminalStatus() {
        switch (getStatus()) {
            case SUCCEEDED:
            case FAILED:
            case KILLED:
            case DONEWITHERROR:
            case IGNORED:
                return true;
            default:
                return false;
        }
    }

    /**
     * Get status as an enum, parsed from the stored status string.
     *
     * @return status
     */
    @Override
    public Status getStatus() {
        return Status.valueOf(this.statusStr);
    }

    /**
     * Get status string
     *
     * @return status string as stored
     */
    public String getStatusStr() {
        return statusStr;
    }

    /**
     * Set status string. The value is stored as given; it is not validated against {@link Status} here.
     *
     * @param status status string
     */
    public void setStatusStr(String status) {
        this.statusStr = status;
    }

    /**
     * Set status
     *
     * @param val status
     */
    @Override
    public void setStatus(Status val) {
        this.statusStr = val.toString();
    }

    /**
     * Get time unit string
     *
     * @return time unit string as stored
     */
    public String getTimeUnitStr() {
        return timeUnitStr;
    }

    /**
     * Set time unit string. The value is stored as given; it is not validated against {@link Timeunit} here.
     *
     * @param timeunit time unit string
     */
    public void setTimeUnitStr(String timeunit) {
        this.timeUnitStr = timeunit;
    }

    /**
     * Set time unit
     *
     * @param timeUnit time unit
     */
    public void setTimeUnit(Timeunit timeUnit) {
        this.timeUnitStr = timeUnit.toString();
    }

    /**
     * Get time unit as an enum, parsed from the stored time unit string.
     *
     * @return time unit
     */
    @Override
    public Timeunit getTimeUnit() {
        return Timeunit.valueOf(this.timeUnitStr);
    }

    /**
     * Set execution order
     *
     * @param order execution order
     */
    public void setExecutionOrder(Execution order) {
        this.execution = order.toString();
    }

    /**
     * Get execution order as an enum, parsed from the stored execution string.
     *
     * @return execution order
     */
    @Override
    public Execution getExecutionOrder() {
        return Execution.valueOf(this.execution);
    }

    /**
     * Set execution string. The value is stored as given; it is not validated against {@link Execution} here.
     *
     * @param order execution string
     */
    public void setExecution(String order) {
        this.execution = order;
    }

    /**
     * Get execution string
     *
     * @return execution string as stored
     */
    public String getExecution() {
        return execution;
    }

    /**
     * Set last action time
     *
     * @param lastAction last action time
     */
    public void setLastActionTime(Date lastAction) {
        this.lastActionTimestamp = DateUtils.convertDateToTimestamp(lastAction);
    }

    /**
     * Get last action time
     *
     * @return last action time
     */
    @Override
    public Date getLastActionTime() {
        return DateUtils.toDate(lastActionTimestamp);
    }

    /**
     * Get last action timestamp
     *
     * @return last action timestamp
     */
    public Timestamp getLastActionTimestamp() {
        return lastActionTimestamp;
    }

    /**
     * Set next materialized time
     *
     * @param nextMaterializedTime next materialized time
     */
    public void setNextMaterializedTime(Date nextMaterializedTime) {
        this.nextMaterializedTimestamp = DateUtils.convertDateToTimestamp(nextMaterializedTime);
    }

    /**
     * Get next materialized time
     *
     * @return next materialized time
     */
    @Override
    public Date getNextMaterializedTime() {
        return DateUtils.toDate(nextMaterializedTimestamp);
    }

    /**
     * Set last modified time
     *
     * @param lastModifiedTime last modified time
     */
    public void setLastModifiedTime(Date lastModifiedTime) {
        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
    }

    /**
     * Get last modified time
     *
     * @return last modified time
     */
    public Date getLastModifiedTime() {
        return DateUtils.toDate(lastModifiedTimestamp);
    }

    /**
     * Set suspended time
     *
     * @param suspendedTime suspended time
     */
    public void setSuspendedTime(Date suspendedTime) {
        this.suspendedTimestamp = DateUtils.convertDateToTimestamp(suspendedTime);
    }

    /**
     * Get suspended time
     *
     * @return suspended time
     */
    public Date getSuspendedTime() {
        return DateUtils.toDate(suspendedTimestamp);
    }

    /**
     * Set start time
     *
     * @param startTime start time
     */
    public void setStartTime(Date startTime) {
        this.startTimestamp = DateUtils.convertDateToTimestamp(startTime);
    }

    /**
     * Get start time
     *
     * @return start time
     */
    @Override
    public Date getStartTime() {
        return DateUtils.toDate(startTimestamp);
    }

    /**
     * Set end time
     *
     * @param endTime end time
     */
    public void setEndTime(Date endTime) {
        this.endTimestamp = DateUtils.convertDateToTimestamp(endTime);
    }

    /**
     * Set pause time
     *
     * @param pauseTime pause time
     */
    public void setPauseTime(Date pauseTime) {
        this.pauseTimestamp = DateUtils.convertDateToTimestamp(pauseTime);
    }

    /**
     * Get end time
     *
     * @return end time
     */
    @Override
    public Date getEndTime() {
        return DateUtils.toDate(endTimestamp);
    }

    /**
     * Get pause time
     *
     * @return pause time
     */
    @Override
    public Date getPauseTime() {
        return DateUtils.toDate(pauseTimestamp);
    }

    /**
     * Get pause timestamp
     *
     * @return pause timestamp
     */
    public Timestamp getPauseTimestamp() {
        return pauseTimestamp;
    }

    /**
     * Set created time
     *
     * @param createTime created time
     */
    public void setCreatedTime(Date createTime) {
        this.createdTimestamp = DateUtils.convertDateToTimestamp(createTime);
    }

    /**
     * Get created time
     *
     * @return created time
     */
    public Date getCreatedTime() {
        return DateUtils.toDate(createdTimestamp);
    }

    /**
     * Get created timestamp
     *
     * @return created timestamp
     */
    public Timestamp getCreatedTimestamp() {
        return createdTimestamp;
    }

    /**
     * Get application path
     *
     * @return application path
     */
    public String getAppPath() {
        return appPath;
    }

    /**
     * Set application path
     *
     * @param appPath application path
     */
    public void setAppPath(String appPath) {
        this.appPath = appPath;
    }

    /**
     * Get application name
     *
     * @return application name
     */
    public String getAppName() {
        return appName;
    }

    /**
     * Set application name
     *
     * @param appName application name
     */
    public void setAppName(String appName) {
        this.appName = appName;
    }

    /**
     * Get job id
     *
     * @return job id
     */
    public String getId() {
        return id;
    }

    /**
     * Set job id
     *
     * @param id job id
     */
    public void setId(String id) {
        this.id = id;
    }

    /**
     * Set external id
     *
     * @param externalId external id
     */
    public void setExternalId(String externalId) {
        this.externalId = externalId;
    }

    /**
     * Get external id
     *
     * @return external id
     */
    public String getExternalId() {
        return externalId;
    }

    /**
     * Get configuration
     *
     * @return configuration, or null if no configuration blob has been set
     */
    public String getConf() {
        return conf == null ? null : conf.getString();
    }

    /**
     * Set configuration. Creates the backing blob on first use, otherwise updates the existing blob in place.
     *
     * @param conf configuration
     */
    public void setConf(String conf) {
        if (this.conf == null) {
            this.conf = new StringBlob(conf);
        }
        else {
            this.conf.setString(conf);
        }
    }

    /**
     * Replace the blob backing the configuration
     *
     * @param conf configuration blob
     */
    public void setConfBlob(StringBlob conf) {
        this.conf = conf;
    }

    /**
     * Get the blob backing the configuration
     *
     * @return configuration blob, may be null
     */
    public StringBlob getConfBlob() {
        return conf;
    }

    /**
     * Set frequency
     *
     * @param frequency frequency
     */
    public void setFrequency(String frequency) {
        this.frequency = frequency;
    }

    /**
     * Get frequency
     *
     * @return frequency
     */
    public String getFrequency() {
        return frequency;
    }

    /**
     * Set time zone
     *
     * @param timeZone time zone
     */
    public void setTimeZone(String timeZone) {
        this.timeZone = timeZone;
    }

    /**
     * Get time zone
     *
     * @return time zone
     */
    public String getTimeZone() {
        return timeZone;
    }

    /**
     * Set concurrency
     *
     * @param concurrency concurrency
     */
    public void setConcurrency(int concurrency) {
        this.concurrency = concurrency;
    }

    /**
     * Get concurrency
     *
     * @return concurrency
     */
    public int getConcurrency() {
        return concurrency;
    }

    /**
     * Get materialization throttling
     *
     * @return materialization throttling
     */
    public int getMatThrottling() {
        return matThrottling;
    }

    /**
     * Set materialization throttling
     *
     * @param matThrottling materialization throttling
     */
    public void setMatThrottling(int matThrottling) {
        this.matThrottling = matThrottling;
    }

    /**
     * Set timeout
     *
     * @param timeOut timeout
     */
    public void setTimeout(int timeOut) {
        this.timeOut = timeOut;
    }

    /**
     * Get timeout
     *
     * @return timeout
     */
    public int getTimeout() {
        return timeOut;
    }

    /**
     * Get user
     *
     * @return user
     */
    public String getUser() {
        return user;
    }

    /**
     * Set user
     *
     * @param user user
     */
    public void setUser(String user) {
        this.user = user;
    }

    /**
     * Get group
     *
     * @return group
     */
    public String getGroup() {
        return group;
    }

    /**
     * Get ACL. This bean has no separate ACL field; the group is returned.
     *
     * @return the group
     */
    @Override
    public String getAcl() {
        return getGroup();
    }

    /**
     * Set group
     *
     * @param group group
     */
    public void setGroup(String group) {
        this.group = group;
    }

    /**
     * Get bundle id
     *
     * @return bundle id
     */
    public String getBundleId() {
        return bundleId;
    }

    /**
     * Set bundle id
     *
     * @param bundleId bundle id
     */
    public void setBundleId(String bundleId) {
        this.bundleId = bundleId;
    }

    /**
     * Return the coordinator application console URL.
     *
     * @return the coordinator application console URL.
     */
    public String getConsoleUrl() {
        return consoleUrl;
    }

    /**
     * Set the coordinator application console URL.
     *
     * @param consoleUrl the coordinator application console URL.
     */
    public void setConsoleUrl(String consoleUrl) {
        this.consoleUrl = consoleUrl;
    }

    /**
     * Short description containing the job id and status.
     */
    @Override
    public String toString() {
        return MessageFormat.format("Coordinator application id[{0}] status[{1}]", getId(), getStatus());
    }

    /**
     * Set the actions of this job. A null argument is replaced by a new empty list, so the internal list is never
     * null.
     *
     * @param nodes actions, may be null
     */
    public void setActions(List<CoordinatorActionBean> nodes) {
        this.actions = (nodes != null) ? nodes : new ArrayList<CoordinatorActionBean>();
    }

    /**
     * Get the actions of this job. The internal list itself is returned (no copy), viewed as a list of
     * {@link CoordinatorAction}.
     *
     * @return actions
     */
    @SuppressWarnings("unchecked")
    public List<CoordinatorAction> getActions() {
        return (List) actions;
    }

    /**
     * Convert a coordinator application list into a JSONArray.
     *
     * @param applications list; a null list yields an empty array.
     * @param timeZoneId time zone to use for dates in the JSON array.
     * @return the corresponding JSON array.
     */
    @SuppressWarnings("unchecked")
    public static JSONArray toJSONArray(List<CoordinatorJobBean> applications, String timeZoneId) {
        JSONArray array = new JSONArray();
        if (applications != null) {
            for (CoordinatorJobBean application : applications) {
                array.add(application.toJSONObject(timeZoneId));
            }
        }
        return array;
    }

    /**
     * Get last action number
     *
     * @return last action number
     */
    public int getLastActionNumber() {
        return lastActionNumber;
    }

    /**
     * Set last action number
     *
     * @param lastActionNumber last action number
     */
    public void setLastActionNumber(int lastActionNumber) {
        this.lastActionNumber = lastActionNumber;
    }

    /**
     * Set pending to true
     */
    public void setPending() {
        this.pending = 1;
    }

    /**
     * Set pending to false
     */
    public void resetPending() {
        this.pending = 0;
    }

    /**
     * Get number of actions
     *
     * @return number of actions
     */
    public int getNumActions() {
        return numActions;
    }

    /**
     * Set number of actions
     *
     * @param numAction number of actions
     */
    public void setNumActions(int numAction) {
        this.numActions = numAction;
    }

    /**
     * Convert this bean into a JSON object, formatting dates in the "GMT" time zone.
     *
     * @return the corresponding JSON object.
     */
    public JSONObject toJSONObject() {
        return toJSONObject("GMT");
    }

    /**
     * Convert this bean into a JSON object.
     *
     * @param timeZoneId time zone to use for dates in the JSON object.
     * @return the corresponding JSON object.
     */
    @SuppressWarnings("unchecked")
    public JSONObject toJSONObject(String timeZoneId) {
        JSONObject json = new JSONObject();
        json.put(JsonTags.COORDINATOR_JOB_PATH, getAppPath());
        json.put(JsonTags.COORDINATOR_JOB_NAME, getAppName());
        json.put(JsonTags.COORDINATOR_JOB_ID, getId());
        json.put(JsonTags.COORDINATOR_JOB_EXTERNAL_ID, getExternalId());
        json.put(JsonTags.COORDINATOR_JOB_BUNDLE_ID, getBundleId());
        json.put(JsonTags.COORDINATOR_JOB_CONF, getConf());
        json.put(JsonTags.COORDINATOR_JOB_STATUS, getStatus().toString());
        json.put(JsonTags.COORDINATOR_JOB_EXECUTIONPOLICY, getExecutionOrder().toString());
        json.put(JsonTags.COORDINATOR_JOB_FREQUENCY, getFrequency());
        json.put(JsonTags.COORDINATOR_JOB_TIMEUNIT, getTimeUnit().toString());
        json.put(JsonTags.COORDINATOR_JOB_TIMEZONE, getTimeZone());
        json.put(JsonTags.COORDINATOR_JOB_CONCURRENCY, getConcurrency());
        json.put(JsonTags.COORDINATOR_JOB_TIMEOUT, getTimeout());
        json.put(JsonTags.COORDINATOR_JOB_LAST_ACTION_TIME, JsonUtils.formatDateRfc822(getLastActionTime(), timeZoneId));
        json.put(JsonTags.COORDINATOR_JOB_NEXT_MATERIALIZED_TIME,
                JsonUtils.formatDateRfc822(getNextMaterializedTime(), timeZoneId));
        json.put(JsonTags.COORDINATOR_JOB_START_TIME, JsonUtils.formatDateRfc822(getStartTime(), timeZoneId));
        json.put(JsonTags.COORDINATOR_JOB_END_TIME, JsonUtils.formatDateRfc822(getEndTime(), timeZoneId));
        json.put(JsonTags.COORDINATOR_JOB_PAUSE_TIME, JsonUtils.formatDateRfc822(getPauseTime(), timeZoneId));
        json.put(JsonTags.COORDINATOR_JOB_USER, getUser());
        json.put(JsonTags.COORDINATOR_JOB_GROUP, getGroup());
        json.put(JsonTags.COORDINATOR_JOB_ACL, getAcl());
        json.put(JsonTags.COORDINATOR_JOB_CONSOLE_URL, getConsoleUrl());
        json.put(JsonTags.COORDINATOR_JOB_MAT_THROTTLING, getMatThrottling());
        json.put(JsonTags.COORDINATOR_ACTIONS, CoordinatorActionBean.toJSONArray(actions, timeZoneId));
        json.put(JsonTags.TO_STRING, toString());
        json.put(JsonTags.COORDINATOR_JOB_NUM_ACTION, numActions);

        return json;
    }

}