001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018 019package org.apache.oozie; 020 021import java.io.DataInput; 022import java.io.DataOutput; 023import java.io.IOException; 024import java.sql.Timestamp; 025import java.util.Date; 026 027import javax.persistence.Basic; 028import javax.persistence.Column; 029import javax.persistence.Entity; 030import javax.persistence.Id; 031import javax.persistence.NamedQueries; 032import javax.persistence.NamedQuery; 033import javax.persistence.Table; 034 035import org.apache.hadoop.io.Writable; 036import org.apache.oozie.client.Job.Status; 037import org.apache.oozie.client.rest.JsonBean; 038import org.apache.oozie.util.DateUtils; 039import org.apache.oozie.util.WritableUtils; 040import org.apache.openjpa.persistence.jdbc.Index; 041import org.json.simple.JSONObject; 042 043@Entity 044@Table(name = "BUNDLE_ACTIONS") 045@NamedQueries( { 046 @NamedQuery(name = "DELETE_BUNDLE_ACTION", query = "delete from BundleActionBean w where w.bundleActionId = :bundleActionId"), 047 048 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_PENDING_MODTIME", query = "update BundleActionBean w set w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 049 050 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending where w.bundleActionId = :bundleActionId"), 051 052 @NamedQuery(name = "UPDATE_BUNDLE_ACTION_STATUS_PENDING_MODTIME_COORDID", query = "update BundleActionBean w set w.statusStr = :status, w.lastModifiedTimestamp = :lastModifiedTime, w.pending = :pending, w.coordId = :coordId where w.bundleActionId = :bundleActionId"), 053 054 @NamedQuery(name = "GET_BUNDLE_ACTIONS_STATUS_UNIGNORED_FOR_BUNDLE", query = "select OBJECT(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 055 056 @NamedQuery(name = "GET_BUNDLE_UNIGNORED_ACTION_STATUS_PENDING_FOR_BUNDLE", query = "select w.coordId, w.statusStr, w.pending from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> 'IGNORED'"), 057 058 @NamedQuery(name = "GET_BUNDLE_ACTIONS", query = "select OBJECT(w) from BundleActionBean w"), 059 060 @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN", query = "select w.bundleActionId, w.bundleId, w.statusStr, w.coordId, w.coordName from BundleActionBean w where w.pending > 0 AND w.lastModifiedTimestamp <= :lastModifiedTime"), 061 062 @NamedQuery(name = "GET_BUNDLE_ACTION", query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"), 063 064 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT", query = "select count(w) from BundleActionBean w"), 065 066 @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT_BY_JOB", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId"), 067 068 @NamedQuery(name = "GET_BUNDLE_ACTIONS_PENDING_TRUE_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.pending > 0"), 069 070 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_EQUAL_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr <> :status"), 071 072 @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_TERMINATE_STATUS_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' OR w.statusStr = 'RUNNINGWITHERROR' OR w.statusStr = 'SUSPENDED' OR w.statusStr = 'SUSPENDEDWITHERROR' OR w.statusStr = 'PREPSUSPENDED' OR w.statusStr = 'PAUSED' OR w.statusStr = 'PAUSEDWITHERROR' OR w.statusStr = 'PREPPAUSED')"), 073 074 @NamedQuery(name = "GET_BUNDLE_ACTIONS_FAILED_NULL_COORD_COUNT", query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.statusStr = 'FAILED' AND w.coordId IS NULL"), 075 076 @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN", query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"), 077 078 @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.statusStr = 'SUCCEEDED' OR a.statusStr = 'FAILED' OR a.statusStr= 'KILLED' OR a.statusStr = 'DONEWITHERROR')"), 079 080 @NamedQuery(name = "DELETE_ACTIONS_FOR_BUNDLE", query = "delete from BundleActionBean a where a.bundleId IN (:bundleId)")}) 081public class BundleActionBean implements Writable, JsonBean { 082 083 @Id 084 @Column(name = "bundle_action_id") 085 private String bundleActionId = null; 086 087 @Index 088 @Column(name = "bundle_id") 089 private String bundleId = null; 090 091 @Column(name = "coord_name") 092 private String coordName = null; 093 094 @Basic 095 @Column(name = "coord_id") 096 private String coordId = null; 097 098 @Basic 099 @Column(name = "status") 100 private String statusStr = null; 101 102 @Basic 103 @Column(name = "critical") 104 private int critical = 0; 105 106 @Basic 107 @Column(name = "pending") 108 private int pending = 0; 109 110 @Basic 111 @Column(name = "last_modified_time") 112 private java.sql.Timestamp lastModifiedTimestamp = null; 113 114 /** 115 * bundleActionId to set 116 * 117 * @param bundleActionId the bundleActionId to set 118 */ 119 public void setBundleActionId(String bundleActionId) { 120 this.bundleActionId = bundleActionId; 121 } 122 123 /** 124 * Get the Bundle Action Id. 125 * 126 * @return the bundleActionId 127 */ 128 public String getBundleActionId() { 129 return bundleActionId; 130 } 131 132 /** 133 * Get the BundleId 134 * 135 * @return bundleId 136 */ 137 public String getBundleId() { 138 return bundleId; 139 } 140 141 /** 142 * Set the Bundle Id. 143 * 144 * @param bundleId the bundle Id 145 */ 146 public void setBundleId(String bundleId) { 147 this.bundleId = bundleId; 148 } 149 150 /** 151 * Get the Coordinator name. 152 * 153 * @return coordName 154 */ 155 public String getCoordName() { 156 return coordName; 157 } 158 159 /** 160 * Set the Coordinator name. 161 * 162 * @param coordName the Coordinator name 163 */ 164 public void setCoordName(String coordName) { 165 this.coordName = coordName; 166 } 167 168 /** 169 * Get the coordinator Id. 170 * 171 * @return the coordId 172 */ 173 public String getCoordId() { 174 return coordId; 175 } 176 177 /** 178 * Set the coordinator Id. 179 * 180 * @param coordId the coordinator id 181 */ 182 public void setCoordId(String coordId) { 183 this.coordId = coordId; 184 } 185 186 /** 187 * Get the Status of the Bundle Action 188 * 189 * @return status object 190 */ 191 public Status getStatus() { 192 return Status.valueOf(this.statusStr); 193 } 194 195 /** 196 * Get the Status of the Bundle Action 197 * 198 * @return status string 199 */ 200 public String getStatusStr() { 201 return statusStr; 202 } 203 204 /** 205 * Set the Status of the Bundle Action 206 * 207 * @param statusStr status string 208 */ 209 public void setStatusStr(String statusStr) { 210 this.statusStr = statusStr; 211 } 212 213 /** 214 * Set the Status of the Bundle Action 215 * 216 * @param val the status to set 217 */ 218 public void setStatus(Status val) { 219 this.statusStr = val.toString(); 220 } 221 222 /** 223 * Set that this bundle action is critical. 224 */ 225 public void setCritical() { 226 this.critical = 1; 227 } 228 229 /** 230 * Reseset that this bundle action is not critical. 231 */ 232 public void resetCritical() { 233 this.critical = 0; 234 } 235 236 /** 237 * Return if the action is critical. 238 * 239 * @return if the action is critical. 240 */ 241 public boolean isCritical() { 242 return critical == 1 ? true : false; 243 } 244 245 /** 246 * Set some actions are in progress for particular bundle action. 247 * 248 * @param pending set pending to true 249 */ 250 public void setPending(int pending) { 251 this.pending = pending; 252 } 253 254 /** 255 * increment pending and return it 256 * 257 * @return pending 258 */ 259 public int incrementAndGetPending() { 260 this.pending++; 261 return pending; 262 } 263 264 /** 265 * decrement pending and return it 266 * 267 * @return pending 268 */ 269 public int decrementAndGetPending() { 270 this.pending = Math.max(this.pending-1, 0); 271 return pending; 272 } 273 274 /** 275 * Get some actions are in progress for particular bundle action. 276 * 277 * @return pending 278 */ 279 public int getPending() { 280 return this.pending; 281 } 282 283 /** 284 * Return if the action is pending. 285 * 286 * @return if the action is pending. 287 */ 288 public boolean isPending() { 289 return pending > 0 ? true : false; 290 } 291 292 /** 293 * @return true if in terminal status 294 */ 295 public boolean isTerminalStatus() { 296 boolean isTerminal = false; 297 switch (getStatus()) { 298 case SUCCEEDED: 299 case FAILED: 300 case KILLED: 301 case DONEWITHERROR: 302 isTerminal = true; 303 break; 304 default: 305 isTerminal = false; 306 break; 307 } 308 return isTerminal; 309 } 310 311 /** 312 * Set Last modified time. 313 * 314 * @param lastModifiedTimestamp the lastModifiedTimestamp to set 315 */ 316 public void setLastModifiedTimestamp(java.sql.Timestamp lastModifiedTimestamp) { 317 this.lastModifiedTimestamp = lastModifiedTimestamp; 318 } 319 320 /** 321 * Set Last modified time. 322 * 323 * @param lastModifiedTime the lastModifiedTime to set 324 */ 325 public void setLastModifiedTime(Date lastModifiedTime) { 326 this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime); 327 } 328 329 /** 330 * Get Last modified time. 331 * 332 * @return lastModifiedTime 333 */ 334 public Date getLastModifiedTime() { 335 return DateUtils.toDate(lastModifiedTimestamp); 336 } 337 338 /** 339 * Get Last modified time. 340 * 341 * @return lastModifiedTimestamp 342 */ 343 public Timestamp getLastModifiedTimestamp() { 344 return lastModifiedTimestamp; 345 } 346 347 @Override 348 public void write(DataOutput dataOutput) throws IOException { 349 WritableUtils.writeStr(dataOutput, getBundleActionId()); 350 WritableUtils.writeStr(dataOutput, getBundleId()); 351 WritableUtils.writeStr(dataOutput, getCoordName()); 352 WritableUtils.writeStr(dataOutput, getCoordId()); 353 WritableUtils.writeStr(dataOutput, getStatusStr()); 354 dataOutput.writeInt(critical); 355 dataOutput.writeInt(pending); 356 dataOutput.writeLong((getLastModifiedTimestamp() != null) ? getLastModifiedTimestamp().getTime() : -1); 357 } 358 359 @Override 360 public void readFields(DataInput dataInput) throws IOException { 361 setBundleActionId(WritableUtils.readStr(dataInput)); 362 setBundleId(WritableUtils.readStr(dataInput)); 363 setCoordName(WritableUtils.readStr(dataInput)); 364 setCoordId(WritableUtils.readStr(dataInput)); 365 setStatus(Status.valueOf(WritableUtils.readStr(dataInput))); 366 critical = dataInput.readInt(); 367 pending = dataInput.readInt(); 368 long d = dataInput.readLong(); 369 if (d != -1) { 370 setLastModifiedTime(new Date(d)); 371 } 372 } 373 374 @Override 375 public JSONObject toJSONObject() { 376 return null; 377 } 378 379 @Override 380 public JSONObject toJSONObject(String timeZoneId) { 381 return null; 382 } 383}