001 /** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 package org.apache.oozie; 019 020 import java.io.DataInput; 021 import java.io.DataOutput; 022 import java.io.IOException; 023 import java.sql.Timestamp; 024 import java.util.Date; 025 026 import javax.persistence.Basic; 027 import javax.persistence.Column; 028 import javax.persistence.ColumnResult; 029 import javax.persistence.Entity; 030 import javax.persistence.Lob; 031 import javax.persistence.NamedNativeQueries; 032 import javax.persistence.NamedNativeQuery; 033 import javax.persistence.NamedQueries; 034 import javax.persistence.NamedQuery; 035 import javax.persistence.SqlResultSetMapping; 036 037 import org.apache.hadoop.io.Writable; 038 import org.apache.oozie.client.CoordinatorAction; 039 import org.apache.oozie.client.rest.JsonCoordinatorAction; 040 import org.apache.oozie.util.DateUtils; 041 import org.apache.oozie.util.WritableUtils; 042 import org.apache.openjpa.persistence.jdbc.Index; 043 044 @SqlResultSetMapping( 045 name = "CoordActionJobIdLmt", 046 columns = {@ColumnResult(name = "job_id"), 047 @ColumnResult(name = "min_lmt")}) 048 049 @Entity 050 @NamedQueries({ 051 052 @NamedQuery(name = "UPDATE_COORD_ACTION", query = "update CoordinatorActionBean w set w.actionNumber = :actionNumber, w.actionXml = :actionXml, w.consoleUrl = :consoleUrl, w.createdConf = :createdConf, w.errorCode = :errorCode, w.errorMessage = :errorMessage, w.externalStatus = :externalStatus, w.missingDependencies = :missingDependencies, w.runConf = :runConf, w.timeOut = :timeOut, w.trackerUri = :trackerUri, w.type = :type, w.createdTimestamp = :createdTime, w.externalId = :externalId, w.jobId = :jobId, w.lastModifiedTimestamp = :lastModifiedTime, w.nominalTimestamp = :nominalTime, w.slaXml = :slaXml, w.status = :status where w.id = :id"), 053 054 @NamedQuery(name = "UPDATE_COORD_ACTION_MIN", query = "update CoordinatorActionBean w set w.actionXml = :actionXml, w.missingDependencies = :missingDependencies, w.lastModifiedTimestamp = :lastModifiedTime, w.status = :status where w.id = :id"), 055 056 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_COORDINATOR", query = "delete from CoordinatorActionBean a where a.jobId = :jobId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED')"), 057 058 @NamedQuery(name = "GET_COORD_ACTIONS", query = "select OBJECT(w) from CoordinatorActionBean w"), 059 060 @NamedQuery(name = "GET_COMPLETED_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.createdTimestamp < :createdTime and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status = 'KILLED')"), 061 062 @NamedQuery(name = "GET_COORD_ACTION", query = "select OBJECT(a) from CoordinatorActionBean a where a.id = :id"), 063 064 @NamedQuery(name = "GET_COORD_ACTION_FOR_EXTERNALID", query = "select OBJECT(a) from CoordinatorActionBean a where a.externalId = :externalId"), 065 066 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_FIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp"), 067 068 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_JOB_LIFO", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'READY' order by a.nominalTimestamp desc"), 069 070 @NamedQuery(name = "GET_COORD_RUNNING_ACTIONS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'RUNNING' OR a.status='SUBMITTED')"), 071 072 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId"), 073 074 @NamedQuery(name = "GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'WAITING'"), 075 076 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND (a.status = 'SUSPENDED' OR a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED')"), 077 078 @NamedQuery(name = "GET_COORD_ACTIONS_PENDING_FALSE_STATUS_COUNT", query = "select count(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.pending = 0 AND a.status = :status"), 079 080 @NamedQuery(name = "GET_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId"), 081 082 @NamedQuery(name = "GET_COORD_ACTION_FOR_COORD_JOB_BY_ACTION_NUMBER", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.actionNumber = :actionNumber"), 083 084 @NamedQuery(name = "GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME", query = "select OBJECT(w) from CoordinatorActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"), 085 086 @NamedQuery(name = "GET_RUNNING_ACTIONS_FOR_COORD_JOB", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.status = 'RUNNING'"), 087 088 @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.status = 'RUNNING' AND a.lastModifiedTimestamp <= :lastModifiedTime"), 089 090 @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 091 092 @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select OBJECT(a) from CoordinatorActionBean a where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), 093 094 @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.status = 'TIMEDOUT' OR a.status = 'SUCCEEDED' OR a.status = 'KILLED' OR a.status = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), 095 096 @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), 097 098 @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w")}) 099 100 @NamedNativeQueries({ 101 102 @NamedNativeQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.job_id as job_id, MIN(a.last_modified_time) as min_lmt from COORD_ACTIONS a where a.status = 'READY' GROUP BY a.job_id HAVING MIN(a.last_modified_time) < ?", resultSetMapping = "CoordActionJobIdLmt") 103 }) 104 public class CoordinatorActionBean extends JsonCoordinatorAction implements 105 Writable { 106 @Basic 107 @Index 108 @Column(name = "job_id") 109 private String jobId; 110 111 @Basic 112 @Index 113 @Column(name = "status") 114 private String status = null; 115 116 @Basic 117 @Column(name = "nominal_time") 118 private java.sql.Timestamp nominalTimestamp = null; 119 120 @Basic 121 @Index 122 @Column(name = "last_modified_time") 123 private java.sql.Timestamp lastModifiedTimestamp = null; 124 125 @Basic 126 @Index 127 @Column(name = "created_time") 128 private java.sql.Timestamp createdTimestamp = null; 129 130 @Basic 131 @Index 132 @Column(name = "rerun_time") 133 private java.sql.Timestamp rerunTimestamp = null; 134 135 @Basic 136 @Index 137 @Column(name = "external_id") 138 private String externalId; 139 140 @Column(name = "sla_xml") 141 @Lob 142 private String slaXml = null; 143 144 @Basic 145 @Column(name = "pending") 146 private int pending = 0; 147 148 public CoordinatorActionBean() { 149 } 150 151 /** 152 * Serialize the coordinator bean to a data output. 153 * 154 * @param dataOutput data output. 155 * @throws IOException thrown if the coordinator bean could not be serialized. 156 */ 157 public void write(DataOutput dataOutput) throws IOException { 158 WritableUtils.writeStr(dataOutput, getJobId()); 159 WritableUtils.writeStr(dataOutput, getType()); 160 WritableUtils.writeStr(dataOutput, getId()); 161 WritableUtils.writeStr(dataOutput, getCreatedConf()); 162 WritableUtils.writeStr(dataOutput, getStatus().toString()); 163 dataOutput.writeInt(getActionNumber()); 164 WritableUtils.writeStr(dataOutput, getRunConf()); 165 WritableUtils.writeStr(dataOutput, getExternalStatus()); 166 WritableUtils.writeStr(dataOutput, getTrackerUri()); 167 WritableUtils.writeStr(dataOutput, getConsoleUrl()); 168 WritableUtils.writeStr(dataOutput, getErrorCode()); 169 WritableUtils.writeStr(dataOutput, getErrorMessage()); 170 dataOutput.writeLong((getCreatedTime() != null) ? getCreatedTime().getTime() : -1); 171 dataOutput.writeLong((getLastModifiedTime() != null) ? getLastModifiedTime().getTime() : -1); 172 } 173 174 /** 175 * Deserialize a coordinator bean from a data input. 176 * 177 * @param dataInput data input. 178 * @throws IOException thrown if the workflow bean could not be deserialized. 179 */ 180 public void readFields(DataInput dataInput) throws IOException { 181 setJobId(WritableUtils.readStr(dataInput)); 182 setType(WritableUtils.readStr(dataInput)); 183 setId(WritableUtils.readStr(dataInput)); 184 setCreatedConf(WritableUtils.readStr(dataInput)); 185 setStatus(CoordinatorAction.Status.valueOf(WritableUtils.readStr(dataInput))); 186 setActionNumber(dataInput.readInt()); 187 setRunConf(WritableUtils.readStr(dataInput)); 188 setExternalStatus(WritableUtils.readStr(dataInput)); 189 setTrackerUri(WritableUtils.readStr(dataInput)); 190 setConsoleUrl(WritableUtils.readStr(dataInput)); 191 setErrorCode(WritableUtils.readStr(dataInput)); 192 setErrorMessage(WritableUtils.readStr(dataInput)); 193 long d = dataInput.readLong(); 194 if (d != -1) { 195 setCreatedTime(new Date(d)); 196 } 197 d = dataInput.readLong(); 198 if (d != -1) { 199 setLastModifiedTime(new Date(d)); 200 } 201 } 202 203 @Override 204 public String getJobId() { 205 return this.jobId; 206 } 207 208 @Override 209 public void setJobId(String id) { 210 super.setJobId(id); 211 this.jobId = id; 212 } 213 214 @Override 215 public Status getStatus() { 216 return Status.valueOf(status); 217 } 218 219 @Override 220 public void setStatus(Status status) { 221 super.setStatus(status); 222 this.status = status.toString(); 223 } 224 225 @Override 226 public void setCreatedTime(Date createdTime) { 227 this.createdTimestamp = DateUtils.convertDateToTimestamp(createdTime); 228 super.setCreatedTime(createdTime); 229 } 230 231 public void setRerunTime(Date rerunTime) { 232 this.rerunTimestamp = DateUtils.convertDateToTimestamp(rerunTime); 233 } 234 235 @Override 236 public void setNominalTime(Date nominalTime) { 237 this.nominalTimestamp = DateUtils.convertDateToTimestamp(nominalTime); 238 super.setNominalTime(nominalTime); 239 } 240 241 @Override 242 public void setLastModifiedTime(Date lastModifiedTime) { 243 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 244 super.setLastModifiedTime(lastModifiedTime); 245 } 246 247 @Override 248 public Date getCreatedTime() { 249 return DateUtils.toDate(createdTimestamp); 250 } 251 252 public Timestamp getCreatedTimestamp() { 253 return createdTimestamp; 254 } 255 256 public Date getRerunTime() { 257 return DateUtils.toDate(rerunTimestamp); 258 } 259 260 public Timestamp getRerunTimestamp() { 261 return rerunTimestamp; 262 } 263 264 @Override 265 public Date getLastModifiedTime() { 266 return DateUtils.toDate(lastModifiedTimestamp); 267 } 268 269 public Timestamp getLastModifiedTimestamp() { 270 return lastModifiedTimestamp; 271 } 272 273 @Override 274 public Date getNominalTime() { 275 return DateUtils.toDate(nominalTimestamp); 276 } 277 278 public Timestamp getNominalTimestamp() { 279 return nominalTimestamp; 280 } 281 282 @Override 283 public String getExternalId() { 284 return externalId; 285 } 286 287 @Override 288 public void setExternalId(String externalId) { 289 super.setExternalId(externalId); 290 this.externalId = externalId; 291 } 292 293 public String getSlaXml() { 294 return slaXml; 295 } 296 297 public void setSlaXml(String slaXml) { 298 this.slaXml = slaXml; 299 } 300 301 /** 302 * @return true if in terminal status 303 */ 304 public boolean isTerminalStatus() { 305 boolean isTerminal = true; 306 switch (getStatus()) { 307 case WAITING: 308 case READY: 309 case SUBMITTED: 310 case RUNNING: 311 case SUSPENDED: 312 isTerminal = false; 313 break; 314 default: 315 isTerminal = true; 316 break; 317 } 318 return isTerminal; 319 } 320 321 /** 322 * Set some actions are in progress for particular coordinator action. 323 * 324 * @param pending set pending to true 325 */ 326 public void setPending(int pending) { 327 this.pending = pending; 328 } 329 330 /** 331 * increment pending and return it 332 * 333 * @return pending 334 */ 335 public int incrementAndGetPending() { 336 this.pending++; 337 return pending; 338 } 339 340 /** 341 * decrement pending and return it 342 * 343 * @return pending 344 */ 345 public int decrementAndGetPending() { 346 this.pending = Math.max(this.pending-1, 0); 347 return pending; 348 } 349 350 /** 351 * Get some actions are in progress for particular bundle action. 352 * 353 * @return pending 354 */ 355 public int getPending() { 356 return this.pending; 357 } 358 359 /** 360 * Return if the action is pending. 361 * 362 * @return if the action is pending. 363 */ 364 public boolean isPending() { 365 return pending > 0 ? true : false; 366 } 367 }