/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Date;

import javax.persistence.Basic;
import javax.persistence.Column;
import javax.persistence.DiscriminatorColumn;
import javax.persistence.DiscriminatorType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.NamedQueries;
import javax.persistence.NamedQuery;
import javax.persistence.Table;

import org.apache.hadoop.io.Writable;
import org.apache.oozie.client.Job.Status;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.WritableUtils;
import org.apache.openjpa.persistence.jdbc.Index;

/**
 * JPA entity mapped to the <code>BUNDLE_ACTIONS</code> table. Each instance
 * carries the bundle action id, the owning bundle id, the coordinator name and
 * id, a status string, the critical and pending counters and the last modified
 * timestamp. The bean is also a Hadoop {@link Writable}, serializing those
 * fields in that order.
 */
@Entity
@Table(name = "BUNDLE_ACTIONS")
@DiscriminatorColumn(name = "bean_type", discriminatorType = DiscriminatorType.STRING)
@NamedQueries( {
        @NamedQuery(name = "DELETE_BUNDLE_ACTION",
                query = "delete from BundleActionBean w where w.bundleActionId = :bundleActionId"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_FOR_BUNDLE",
                query = "select OBJECT(w) from BundleActionBean w where w.bundleId = :bundleId"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS",
                query = "select OBJECT(w) from BundleActionBean w"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_BY_LAST_MODIFIED_TIME",
                query = "select OBJECT(w) from BundleActionBean w where w.lastModifiedTimestamp >= :lastModifiedTime"),

        @NamedQuery(name = "GET_BUNDLE_WAITING_ACTIONS_OLDER_THAN",
                query = "select OBJECT(a) from BundleActionBean a where a.pending > 0 AND a.lastModifiedTimestamp <= :lastModifiedTime"),

        @NamedQuery(name = "GET_BUNDLE_ACTION",
                query = "select OBJECT(w) from BundleActionBean w where w.bundleActionId = :bundleActionId"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT",
                query = "select count(w) from BundleActionBean w"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_COUNT_BY_JOB",
                query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_PENDING_TRUE_COUNT",
                query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.pending > 0"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_EQUAL_STATUS_COUNT",
                query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.status <> :status"),

        // The first literal used to read 'PERP'; statuses are persisted via Status.toString(),
        // so the misspelling meant PREP actions were never counted as non-terminated.
        @NamedQuery(name = "GET_BUNDLE_ACTIONS_NOT_TERMINATE_STATUS_COUNT",
                query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND (w.status = 'PREP' OR w.status = 'RUNNING' OR w.status = 'SUSPENDED' OR w.status = 'PREPSUSPENDED' OR w.status = 'PAUSED' OR w.status = 'PREPPAUSED')"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_FAILED_NULL_COORD_COUNT",
                query = "select count(w) from BundleActionBean w where w.bundleId = :bundleId AND w.status = 'FAILED' AND w.coordId IS NULL"),

        @NamedQuery(name = "GET_BUNDLE_ACTIONS_OLDER_THAN",
                query = "select OBJECT(w) from BundleActionBean w order by w.lastModifiedTimestamp"),

        @NamedQuery(name = "DELETE_COMPLETED_ACTIONS_FOR_BUNDLE",
                query = "delete from BundleActionBean a where a.bundleId = :bundleId and (a.status = 'SUCCEEDED' OR a.status = 'FAILED' OR a.status= 'KILLED' OR a.status = 'DONEWITHERROR')")})
public class BundleActionBean implements Writable {

    @Id
    @Column(name = "bundle_action_id")
    private String bundleActionId = null;

    @Column(name = "bundle_id")
    private String bundleId = null;

    @Column(name = "coord_name")
    private String coordName = null;

    @Basic
    @Column(name = "coord_id")
    private String coordId = null;

    // Stored as the string form of a Status value (see setStatus).
    @Basic
    @Column(name = "status")
    private String status = null;

    // 1 when the action is critical, 0 otherwise (see setCritical/resetCritical).
    @Basic
    @Column(name = "critical")
    private int critical = 0;

    // Counter; the action is considered pending while it is greater than zero.
    @Basic
    @Column(name = "pending")
    private int pending = 0;

    @Basic
    @Column(name = "last_modified_time")
    private java.sql.Timestamp lastModifiedTimestamp = null;

    /**
     * Set the Bundle Action Id.
     *
     * @param bundleActionId the bundleActionId to set
     */
    public void setBundleActionId(String bundleActionId) {
        this.bundleActionId = bundleActionId;
    }

    /**
     * Get the Bundle Action Id.
     *
     * @return the bundleActionId
     */
    public String getBundleActionId() {
        return bundleActionId;
    }

    /**
     * Get the Bundle Id.
     *
     * @return bundleId
     */
    public String getBundleId() {
        return bundleId;
    }

    /**
     * Set the Bundle Id.
     *
     * @param bundleId the bundleId to set
     */
    public void setBundleId(String bundleId) {
        this.bundleId = bundleId;
    }

    /**
     * Get the Coordinator name.
     *
     * @return coordName
     */
    public String getCoordName() {
        return coordName;
    }

    /**
     * Set the Coordinator name.
     *
     * @param coordName the coordinator name to set
     */
    public void setCoordName(String coordName) {
        this.coordName = coordName;
    }

    /**
     * Get the coordinator Id.
     *
     * @return the coordId
     */
    public String getCoordId() {
        return coordId;
    }

    /**
     * Set the coordinator Id.
     *
     * @param coordId the coordinator id to set
     */
    public void setCoordId(String coordId) {
        this.coordId = coordId;
    }

    /**
     * Get the Status of the Bundle Action, converted from the stored string
     * with <code>Status.valueOf</code>.
     * <p>
     * NOTE(review): when no status has been set the stored string is null and
     * the conversion presumably throws; use {@link #getStatusStr()} if the
     * status may be unset - confirm against callers.
     *
     * @return status object
     */
    public Status getStatus() {
        return Status.valueOf(this.status);
    }

    /**
     * Get the Status of the Bundle Action as the raw stored string.
     *
     * @return status string, or null if no status has been set
     */
    public String getStatusStr() {
        return status;
    }

    /**
     * Set the Status of the Bundle Action. The value is stored as its string
     * form.
     *
     * @param val the status to set; must not be null
     */
    public void setStatus(Status val) {
        this.status = val.toString();
    }

    /**
     * Mark this bundle action as critical.
     */
    public void setCritical() {
        this.critical = 1;
    }

    /**
     * Reset the critical flag, marking this bundle action as not critical.
     */
    public void resetCritical() {
        this.critical = 0;
    }

    /**
     * Return if the action is critical.
     *
     * @return true if the critical flag is 1, false otherwise
     */
    public boolean isCritical() {
        return critical == 1;
    }

    /**
     * Set the pending counter of this bundle action.
     *
     * @param pending the pending count to set; the action is pending while it
     *        is greater than zero
     */
    public void setPending(int pending) {
        this.pending = pending;
    }

    /**
     * Increment pending and return it.
     *
     * @return pending after the increment
     */
    public int incrementAndGetPending() {
        this.pending++;
        return pending;
    }

    /**
     * Decrement pending and return it. The counter never drops below zero.
     *
     * @return pending after the decrement
     */
    public int decrementAndGetPending() {
        this.pending = Math.max(this.pending - 1, 0);
        return pending;
    }

    /**
     * Get the pending counter of this bundle action.
     *
     * @return pending
     */
    public int getPending() {
        return this.pending;
    }

    /**
     * Return if the action is pending.
     *
     * @return true if the pending counter is greater than zero
     */
    public boolean isPending() {
        return pending > 0;
    }

    /**
     * Set Last modified time.
     *
     * @param lastModifiedTimestamp the lastModifiedTimestamp to set
     */
    public void setLastModifiedTimestamp(java.sql.Timestamp lastModifiedTimestamp) {
        this.lastModifiedTimestamp = lastModifiedTimestamp;
    }

    /**
     * Set Last modified time from a {@link Date}, converted through
     * <code>DateUtils.convertDateToTimestamp</code>.
     *
     * @param lastModifiedTime the lastModifiedTime to set
     */
    public void setLastModifiedTime(Date lastModifiedTime) {
        this.lastModifiedTimestamp = DateUtils.convertDateToTimestamp(lastModifiedTime);
    }

    /**
     * Get Last modified time as a {@link Date}, converted through
     * <code>DateUtils.toDate</code>.
     *
     * @return lastModifiedTime
     */
    public Date getLastModifiedTime() {
        return DateUtils.toDate(lastModifiedTimestamp);
    }

    /**
     * Get Last modified time.
     *
     * @return lastModifiedTimestamp, or null if it has not been set
     */
    public Timestamp getLastModifiedTimestamp() {
        return lastModifiedTimestamp;
    }

    /**
     * Serialize the bean: the five string fields, then the critical and
     * pending counters, then the last modified time in milliseconds. A missing
     * timestamp is written as -1.
     *
     * @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
     */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        WritableUtils.writeStr(dataOutput, getBundleActionId());
        WritableUtils.writeStr(dataOutput, getBundleId());
        WritableUtils.writeStr(dataOutput, getCoordName());
        WritableUtils.writeStr(dataOutput, getCoordId());
        WritableUtils.writeStr(dataOutput, getStatusStr());
        dataOutput.writeInt(critical);
        dataOutput.writeInt(pending);
        Timestamp modified = getLastModifiedTimestamp();
        dataOutput.writeLong((modified != null) ? modified.getTime() : -1);
    }

    /**
     * Populate the bean from the format produced by
     * {@link #write(DataOutput)}. Every field is overwritten, so an instance
     * can be reused for successive records without leaking state from the
     * previous one.
     *
     * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
     */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        setBundleActionId(WritableUtils.readStr(dataInput));
        setBundleId(WritableUtils.readStr(dataInput));
        setCoordName(WritableUtils.readStr(dataInput));
        setCoordId(WritableUtils.readStr(dataInput));

        // write() serializes an unset (null) status without complaint, so a null
        // coming back must not be pushed through Status.valueOf/setStatus, which
        // would throw. Non-null values are still validated through Status.valueOf.
        String serializedStatus = WritableUtils.readStr(dataInput);
        if (serializedStatus != null) {
            setStatus(Status.valueOf(serializedStatus));
        }
        else {
            this.status = null;
        }

        critical = dataInput.readInt();
        pending = dataInput.readInt();

        long d = dataInput.readLong();
        if (d != -1) {
            setLastModifiedTime(new Date(d));
        }
        else {
            // -1 is the "no timestamp" marker written by write(); clear any value
            // left over from a previous use of this instance.
            this.lastModifiedTimestamp = null;
        }
    }
}