001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import javax.persistence.EntityManager;
027import javax.persistence.Query;
028
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.StringBlob;
032import org.apache.oozie.service.JPAService;
033import org.apache.oozie.service.Services;
034
035/**
036 * Query Executor that provides API to run query for Coordinator Action
037 */
038public class CoordActionQueryExecutor extends
039        QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> {
040
041    public enum CoordActionQuery {
042        UPDATE_COORD_ACTION,
043        UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
044        UPDATE_COORD_ACTION_FOR_INPUTCHECK,
045        UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
046        UPDATE_COORD_ACTION_DEPENDENCIES,
047        UPDATE_COORD_ACTION_FOR_START,
048        UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
049        UPDATE_COORD_ACTION_RERUN,
050        GET_COORD_ACTION,
051        GET_COORD_ACTION_STATUS,
052        GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
053        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
054        GET_COORD_ACTIONS_STATUS_UNIGNORED,
055        GET_COORD_ACTIONS_PENDING_COUNT,
056        GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE,
057        GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
058        GET_TERMINATED_ACTIONS_FOR_DATES,
059        GET_TERMINATED_ACTION_IDS_FOR_DATES,
060        GET_ACTIVE_ACTIONS_FOR_DATES,
061        GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
062        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
063        GET_COORD_ACTION_FOR_SLA
064    };
065
066    private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
067
068    private CoordActionQueryExecutor() {
069    }
070
071    public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() {
072        return CoordActionQueryExecutor.instance;
073    }
074
075    @Override
076    public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em)
077            throws JPAExecutorException {
078
079        Query query = em.createNamedQuery(namedQuery.name());
080        switch (namedQuery) {
081            case UPDATE_COORD_ACTION:
082                query.setParameter("actionNumber", actionBean.getActionNumber());
083                query.setParameter("actionXml", actionBean.getActionXmlBlob());
084                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
085                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
086                query.setParameter("errorCode", actionBean.getErrorCode());
087                query.setParameter("errorMessage", actionBean.getErrorMessage());
088                query.setParameter("externalStatus", actionBean.getExternalStatus());
089                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
090                query.setParameter("runConf", actionBean.getRunConfBlob());
091                query.setParameter("timeOut", actionBean.getTimeOut());
092                query.setParameter("trackerUri", actionBean.getTrackerUri());
093                query.setParameter("type", actionBean.getType());
094                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
095                query.setParameter("externalId", actionBean.getExternalId());
096                query.setParameter("jobId", actionBean.getJobId());
097                query.setParameter("lastModifiedTime", new Date());
098                query.setParameter("nominalTime", actionBean.getNominalTimestamp());
099                query.setParameter("slaXml", actionBean.getSlaXmlBlob());
100                query.setParameter("status", actionBean.getStatus().toString());
101                query.setParameter("id", actionBean.getId());
102                break;
103
104            case UPDATE_COORD_ACTION_STATUS_PENDING_TIME:
105                query.setParameter("status", actionBean.getStatus().toString());
106                query.setParameter("pending", actionBean.getPending());
107                query.setParameter("lastModifiedTime", new Date());
108                query.setParameter("id", actionBean.getId());
109                break;
110
111            case UPDATE_COORD_ACTION_FOR_INPUTCHECK:
112                query.setParameter("status", actionBean.getStatus().toString());
113                query.setParameter("lastModifiedTime", new Date());
114                query.setParameter("actionXml", actionBean.getActionXmlBlob());
115                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
116                query.setParameter("id", actionBean.getId());
117                break;
118
119            case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK:
120                query.setParameter("status", actionBean.getStatus().toString());
121                query.setParameter("lastModifiedTime", new Date());
122                query.setParameter("actionXml", actionBean.getActionXmlBlob());
123                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
124                query.setParameter("id", actionBean.getId());
125                break;
126
127            case UPDATE_COORD_ACTION_DEPENDENCIES:
128                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
129                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
130                query.setParameter("id", actionBean.getId());
131                break;
132
133            case UPDATE_COORD_ACTION_FOR_START:
134                query.setParameter("status", actionBean.getStatus().toString());
135                query.setParameter("lastModifiedTime", new Date());
136                query.setParameter("runConf", actionBean.getRunConfBlob());
137                query.setParameter("externalId", actionBean.getExternalId());
138                query.setParameter("pending", actionBean.getPending());
139                query.setParameter("errorCode", actionBean.getErrorCode());
140                query.setParameter("errorMessage", actionBean.getErrorMessage());
141                query.setParameter("id", actionBean.getId());
142                break;
143
144            case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE:
145                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
146                query.setParameter("id", actionBean.getId());
147                break;
148
149            case UPDATE_COORD_ACTION_RERUN:
150                query.setParameter("actionXml", actionBean.getActionXmlBlob());
151                query.setParameter("status", actionBean.getStatusStr());
152                query.setParameter("externalId", actionBean.getExternalId());
153                query.setParameter("externalStatus", actionBean.getExternalStatus());
154                query.setParameter("rerunTime", actionBean.getRerunTimestamp());
155                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
156                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
157                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
158                query.setParameter("runConf", actionBean.getRunConfBlob());
159                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
160                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
161                query.setParameter("errorCode", actionBean.getErrorCode());
162                query.setParameter("errorMessage", actionBean.getErrorMessage());
163                query.setParameter("id", actionBean.getId());
164                break;
165
166            default:
167                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
168                        + namedQuery.name());
169        }
170        return query;
171    }
172
173    @Override
174    public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters)
175            throws JPAExecutorException {
176        Query query = em.createNamedQuery(namedQuery.name());
177        CoordActionQuery caQuery = (CoordActionQuery) namedQuery;
178        switch (caQuery) {
179            case GET_COORD_ACTION:
180            case GET_COORD_ACTION_STATUS:
181            case GET_COORD_ACTION_FOR_SLA:
182                query.setParameter("id", parameters[0]);
183                break;
184            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
185                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
186                break;
187            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
188                query.setParameter("jobId", parameters[0]);
189                break;
190            case GET_COORD_ACTIONS_PENDING_COUNT:
191                query.setParameter("jobId", parameters[0]);
192                break;
193            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
194            query.setParameter("ids", parameters[0]);
195            break;
196            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
197            query.setParameter("jobId", parameters[0]);
198            break;
199            case GET_TERMINATED_ACTIONS_FOR_DATES:
200            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
201            case GET_ACTIVE_ACTIONS_FOR_DATES:
202                query.setParameter("jobId", parameters[0]);
203                query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
204                query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
205                break;
206            case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
207                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
208                break;
209            case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
210                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
211                query.setParameter("currentTime", new Timestamp(new Date().getTime()));
212                break;
213
214            default:
215                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
216                        + caQuery.name());
217        }
218        return query;
219    }
220
221    @Override
222    public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException {
223        JPAService jpaService = Services.get().get(JPAService.class);
224        EntityManager em = jpaService.getEntityManager();
225        Query query = getUpdateQuery(namedQuery, jobBean, em);
226        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
227        return ret;
228    }
229
230    @Override
231    public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
232        CoordinatorActionBean bean = getIfExist(namedQuery, parameters);
233        if (bean == null) {
234            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
235                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
236        }
237        return bean;
238    }
239
240    @Override
241    public CoordinatorActionBean getIfExist(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
242        JPAService jpaService = Services.get().get(JPAService.class);
243        EntityManager em = jpaService.getEntityManager();
244        Query query = getSelectQuery(namedQuery, em, parameters);
245        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
246        if (ret == null) {
247            return null;
248        }
249        CoordinatorActionBean bean = constructBean(namedQuery, ret);
250        return bean;
251    }
252
253    @Override
254    public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters)
255            throws JPAExecutorException {
256        JPAService jpaService = Services.get().get(JPAService.class);
257        EntityManager em = jpaService.getEntityManager();
258        Query query = getSelectQuery(namedQuery, em, parameters);
259        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
260        List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>();
261        if (retList != null) {
262            for (Object ret : retList) {
263                beanList.add(constructBean(namedQuery, ret));
264            }
265        }
266        return beanList;
267    }
268
269    private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
270        CoordinatorActionBean bean;
271        Object[] arr;
272        switch (namedQuery) {
273            case GET_COORD_ACTION:
274                bean = (CoordinatorActionBean) ret;
275                break;
276            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
277                bean = new CoordinatorActionBean();
278                bean.setJobId((String) ret);
279                break;
280            case GET_COORD_ACTION_STATUS:
281                bean = new CoordinatorActionBean();
282                bean.setStatusStr((String)ret);
283                break;
284            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
285                arr = (Object[]) ret;
286                bean = new CoordinatorActionBean();
287                bean.setStatusStr((String)arr[0]);
288                bean.setPending((Integer)arr[1]);
289                break;
290            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
291            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
292                arr = (Object[]) ret;
293                bean = new CoordinatorActionBean();
294                bean.setId((String)arr[0]);
295                bean.setNominalTime((Timestamp)arr[1]);
296                bean.setCreatedTime((Timestamp)arr[2]);
297                bean.setActionXmlBlob((StringBlob)arr[3]);
298                break;
299            case GET_TERMINATED_ACTIONS_FOR_DATES:
300                bean = (CoordinatorActionBean) ret;
301                break;
302            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
303                bean = new CoordinatorActionBean();
304                bean.setId((String) ret);
305                break;
306            case GET_ACTIVE_ACTIONS_FOR_DATES:
307                arr = (Object[]) ret;
308                bean = new CoordinatorActionBean();
309                bean.setId((String)arr[0]);
310                bean.setJobId((String)arr[1]);
311                bean.setStatusStr((String) arr[2]);
312                bean.setExternalId((String) arr[3]);
313                bean.setPending((Integer) arr[4]);
314                bean.setNominalTime((Timestamp) arr[5]);
315                bean.setCreatedTime((Timestamp) arr[6]);
316                break;
317            case  GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
318                arr = (Object[]) ret;
319                bean = new CoordinatorActionBean();
320                bean.setId((String)arr[0]);
321                bean.setJobId((String)arr[1]);
322                bean.setStatusStr((String) arr[2]);
323                bean.setExternalId((String) arr[3]);
324                bean.setPending((Integer) arr[4]);
325                break;
326            case    GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
327                arr = (Object[]) ret;
328                bean = new CoordinatorActionBean();
329                bean.setId((String)arr[0]);
330                bean.setJobId((String)arr[1]);
331                bean.setStatusStr((String) arr[2]);
332                bean.setExternalId((String) arr[3]);
333                bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
334                break;
335            case GET_COORD_ACTION_FOR_SLA:
336                arr = (Object[]) ret;
337                bean = new CoordinatorActionBean();
338                bean.setId((String) arr[0]);
339                bean.setJobId((String) arr[1]);
340                bean.setStatusStr((String) arr[2]);
341                bean.setExternalId((String) arr[3]);
342                bean.setLastModifiedTime((Timestamp) arr[4]);
343                break;
344
345            default:
346                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
347                        + namedQuery.name());
348        }
349        return bean;
350    }
351
352    @Override
353    public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
354        JPAService jpaService = Services.get().get(JPAService.class);
355        EntityManager em = jpaService.getEntityManager();
356        Query query = getSelectQuery(namedQuery, em, parameters);
357        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
358        if (ret == null) {
359            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
360        }
361        return ret;
362    }
363}