001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018
019package org.apache.oozie.executor.jpa;
020
021import java.sql.Timestamp;
022import java.util.ArrayList;
023import java.util.Date;
024import java.util.List;
025
026import javax.persistence.EntityManager;
027import javax.persistence.Query;
028
029import org.apache.oozie.CoordinatorActionBean;
030import org.apache.oozie.ErrorCode;
031import org.apache.oozie.StringBlob;
032import org.apache.oozie.client.CoordinatorAction;
033import org.apache.oozie.service.JPAService;
034import org.apache.oozie.service.Services;
035import org.apache.oozie.util.DateUtils;
036
037/**
038 * Query Executor that provides API to run query for Coordinator Action
039 */
040public class CoordActionQueryExecutor extends
041        QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> {
042
043    public enum CoordActionQuery {
044        UPDATE_COORD_ACTION,
045        UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
046        UPDATE_COORD_ACTION_FOR_INPUTCHECK,
047        UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK,
048        UPDATE_COORD_ACTION_DEPENDENCIES,
049        UPDATE_COORD_ACTION_FOR_START,
050        UPDATE_COORD_ACTION_FOR_MODIFIED_DATE,
051        UPDATE_COORD_ACTION_RERUN,
052        GET_COORD_ACTION,
053        GET_COORD_ACTION_STATUS,
054        GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID,
055        GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME,
056        GET_COORD_ACTIONS_STATUS_UNIGNORED,
057        GET_COORD_ACTIONS_PENDING_COUNT,
058        GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE,
059        GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE,
060        GET_TERMINATED_ACTIONS_FOR_DATES,
061        GET_TERMINATED_ACTION_IDS_FOR_DATES,
062        GET_ACTIVE_ACTIONS_FOR_DATES,
063        GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN,
064        GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN,
065        GET_COORD_ACTION_FOR_SLA,
066        GET_COORD_ACTION_FOR_INPUTCHECK
067    };
068
069    private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor();
070
071    private CoordActionQueryExecutor() {
072    }
073
074    public static QueryExecutor<CoordinatorActionBean, CoordActionQueryExecutor.CoordActionQuery> getInstance() {
075        return CoordActionQueryExecutor.instance;
076    }
077
078    @Override
079    public Query getUpdateQuery(CoordActionQuery namedQuery, CoordinatorActionBean actionBean, EntityManager em)
080            throws JPAExecutorException {
081
082        Query query = em.createNamedQuery(namedQuery.name());
083        switch (namedQuery) {
084            case UPDATE_COORD_ACTION:
085                query.setParameter("actionNumber", actionBean.getActionNumber());
086                query.setParameter("actionXml", actionBean.getActionXmlBlob());
087                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
088                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
089                query.setParameter("errorCode", actionBean.getErrorCode());
090                query.setParameter("errorMessage", actionBean.getErrorMessage());
091                query.setParameter("externalStatus", actionBean.getExternalStatus());
092                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
093                query.setParameter("runConf", actionBean.getRunConfBlob());
094                query.setParameter("timeOut", actionBean.getTimeOut());
095                query.setParameter("trackerUri", actionBean.getTrackerUri());
096                query.setParameter("type", actionBean.getType());
097                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
098                query.setParameter("externalId", actionBean.getExternalId());
099                query.setParameter("jobId", actionBean.getJobId());
100                query.setParameter("lastModifiedTime", new Date());
101                query.setParameter("nominalTime", actionBean.getNominalTimestamp());
102                query.setParameter("slaXml", actionBean.getSlaXmlBlob());
103                query.setParameter("status", actionBean.getStatus().toString());
104                query.setParameter("id", actionBean.getId());
105                break;
106
107            case UPDATE_COORD_ACTION_STATUS_PENDING_TIME:
108                query.setParameter("status", actionBean.getStatus().toString());
109                query.setParameter("pending", actionBean.getPending());
110                query.setParameter("lastModifiedTime", new Date());
111                query.setParameter("id", actionBean.getId());
112                break;
113
114            case UPDATE_COORD_ACTION_FOR_INPUTCHECK:
115                query.setParameter("status", actionBean.getStatus().toString());
116                query.setParameter("lastModifiedTime", new Date());
117                query.setParameter("actionXml", actionBean.getActionXmlBlob());
118                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
119                query.setParameter("id", actionBean.getId());
120                break;
121
122            case UPDATE_COORD_ACTION_FOR_PUSH_INPUTCHECK:
123                query.setParameter("status", actionBean.getStatus().toString());
124                query.setParameter("lastModifiedTime", new Date());
125                query.setParameter("actionXml", actionBean.getActionXmlBlob());
126                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
127                query.setParameter("id", actionBean.getId());
128                break;
129
130            case UPDATE_COORD_ACTION_DEPENDENCIES:
131                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
132                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
133                query.setParameter("id", actionBean.getId());
134                break;
135
136            case UPDATE_COORD_ACTION_FOR_START:
137                query.setParameter("status", actionBean.getStatus().toString());
138                query.setParameter("lastModifiedTime", new Date());
139                query.setParameter("runConf", actionBean.getRunConfBlob());
140                query.setParameter("externalId", actionBean.getExternalId());
141                query.setParameter("pending", actionBean.getPending());
142                query.setParameter("errorCode", actionBean.getErrorCode());
143                query.setParameter("errorMessage", actionBean.getErrorMessage());
144                query.setParameter("id", actionBean.getId());
145                break;
146
147            case UPDATE_COORD_ACTION_FOR_MODIFIED_DATE:
148                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
149                query.setParameter("id", actionBean.getId());
150                break;
151
152            case UPDATE_COORD_ACTION_RERUN:
153                query.setParameter("actionXml", actionBean.getActionXmlBlob());
154                query.setParameter("status", actionBean.getStatusStr());
155                query.setParameter("externalId", actionBean.getExternalId());
156                query.setParameter("externalStatus", actionBean.getExternalStatus());
157                query.setParameter("rerunTime", actionBean.getRerunTimestamp());
158                query.setParameter("lastModifiedTime", actionBean.getLastModifiedTimestamp());
159                query.setParameter("createdTime", actionBean.getCreatedTimestamp());
160                query.setParameter("createdConf", actionBean.getCreatedConfBlob());
161                query.setParameter("runConf", actionBean.getRunConfBlob());
162                query.setParameter("missingDependencies", actionBean.getMissingDependenciesBlob());
163                query.setParameter("pushMissingDependencies", actionBean.getPushMissingDependenciesBlob());
164                query.setParameter("errorCode", actionBean.getErrorCode());
165                query.setParameter("errorMessage", actionBean.getErrorMessage());
166                query.setParameter("id", actionBean.getId());
167                break;
168
169            default:
170                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
171                        + namedQuery.name());
172        }
173        return query;
174    }
175
176    @Override
177    public Query getSelectQuery(CoordActionQuery namedQuery, EntityManager em, Object... parameters)
178            throws JPAExecutorException {
179        Query query = em.createNamedQuery(namedQuery.name());
180        CoordActionQuery caQuery = (CoordActionQuery) namedQuery;
181        switch (caQuery) {
182            case GET_COORD_ACTION:
183            case GET_COORD_ACTION_STATUS:
184            case GET_COORD_ACTION_FOR_SLA:
185            case GET_COORD_ACTION_FOR_INPUTCHECK:
186                query.setParameter("id", parameters[0]);
187                break;
188            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
189                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
190                break;
191            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
192                query.setParameter("jobId", parameters[0]);
193                break;
194            case GET_COORD_ACTIONS_PENDING_COUNT:
195                query.setParameter("jobId", parameters[0]);
196                break;
197            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
198            query.setParameter("ids", parameters[0]);
199            break;
200            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
201            query.setParameter("jobId", parameters[0]);
202            break;
203            case GET_TERMINATED_ACTIONS_FOR_DATES:
204            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
205            case GET_ACTIVE_ACTIONS_FOR_DATES:
206                query.setParameter("jobId", parameters[0]);
207                query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime()));
208                query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime()));
209                break;
210            case GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
211                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
212                break;
213            case GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
214                query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime()));
215                query.setParameter("currentTime", new Timestamp(new Date().getTime()));
216                break;
217
218            default:
219                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
220                        + caQuery.name());
221        }
222        return query;
223    }
224
225    @Override
226    public int executeUpdate(CoordActionQuery namedQuery, CoordinatorActionBean jobBean) throws JPAExecutorException {
227        JPAService jpaService = Services.get().get(JPAService.class);
228        EntityManager em = jpaService.getEntityManager();
229        Query query = getUpdateQuery(namedQuery, jobBean, em);
230        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
231        return ret;
232    }
233
234    @Override
235    public CoordinatorActionBean get(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
236        CoordinatorActionBean bean = getIfExist(namedQuery, parameters);
237        if (bean == null) {
238            throw new JPAExecutorException(ErrorCode.E0605, getSelectQuery(namedQuery,
239                    Services.get().get(JPAService.class).getEntityManager(), parameters).toString());
240        }
241        return bean;
242    }
243
244    @Override
245    public CoordinatorActionBean getIfExist(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
246        JPAService jpaService = Services.get().get(JPAService.class);
247        EntityManager em = jpaService.getEntityManager();
248        Query query = getSelectQuery(namedQuery, em, parameters);
249        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
250        if (ret == null) {
251            return null;
252        }
253        CoordinatorActionBean bean = constructBean(namedQuery, ret);
254        return bean;
255    }
256
257    @Override
258    public List<CoordinatorActionBean> getList(CoordActionQuery namedQuery, Object... parameters)
259            throws JPAExecutorException {
260        JPAService jpaService = Services.get().get(JPAService.class);
261        EntityManager em = jpaService.getEntityManager();
262        Query query = getSelectQuery(namedQuery, em, parameters);
263        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
264        List<CoordinatorActionBean> beanList = new ArrayList<CoordinatorActionBean>();
265        if (retList != null) {
266            for (Object ret : retList) {
267                beanList.add(constructBean(namedQuery, ret));
268            }
269        }
270        return beanList;
271    }
272
273    private CoordinatorActionBean constructBean(CoordActionQuery namedQuery, Object ret) throws JPAExecutorException {
274        CoordinatorActionBean bean;
275        Object[] arr;
276        switch (namedQuery) {
277            case GET_COORD_ACTION:
278                bean = (CoordinatorActionBean) ret;
279                break;
280            case GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME:
281                bean = new CoordinatorActionBean();
282                bean.setJobId((String) ret);
283                break;
284            case GET_COORD_ACTION_STATUS:
285                bean = new CoordinatorActionBean();
286                bean.setStatusStr((String)ret);
287                break;
288            case GET_COORD_ACTIONS_STATUS_UNIGNORED:
289                arr = (Object[]) ret;
290                bean = new CoordinatorActionBean();
291                bean.setStatusStr((String)arr[0]);
292                bean.setPending((Integer)arr[1]);
293                break;
294            case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE:
295            case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE:
296                arr = (Object[]) ret;
297                bean = new CoordinatorActionBean();
298                bean.setId((String)arr[0]);
299                bean.setNominalTime((Timestamp)arr[1]);
300                bean.setCreatedTime((Timestamp)arr[2]);
301                bean.setActionXmlBlob((StringBlob)arr[3]);
302                break;
303            case GET_TERMINATED_ACTIONS_FOR_DATES:
304                bean = (CoordinatorActionBean) ret;
305                break;
306            case GET_TERMINATED_ACTION_IDS_FOR_DATES:
307                bean = new CoordinatorActionBean();
308                bean.setId((String) ret);
309                break;
310            case GET_ACTIVE_ACTIONS_FOR_DATES:
311                arr = (Object[]) ret;
312                bean = new CoordinatorActionBean();
313                bean.setId((String)arr[0]);
314                bean.setJobId((String)arr[1]);
315                bean.setStatusStr((String) arr[2]);
316                bean.setExternalId((String) arr[3]);
317                bean.setPending((Integer) arr[4]);
318                bean.setNominalTime((Timestamp) arr[5]);
319                bean.setCreatedTime((Timestamp) arr[6]);
320                break;
321            case  GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN:
322                arr = (Object[]) ret;
323                bean = new CoordinatorActionBean();
324                bean.setId((String)arr[0]);
325                bean.setJobId((String)arr[1]);
326                bean.setStatusStr((String) arr[2]);
327                bean.setExternalId((String) arr[3]);
328                bean.setPending((Integer) arr[4]);
329                break;
330            case    GET_COORD_ACTIONS_WAITING_READY_SUBMITTED_OLDER_THAN:
331                arr = (Object[]) ret;
332                bean = new CoordinatorActionBean();
333                bean.setId((String)arr[0]);
334                bean.setJobId((String)arr[1]);
335                bean.setStatusStr((String) arr[2]);
336                bean.setExternalId((String) arr[3]);
337                bean.setPushMissingDependenciesBlob((StringBlob) arr[4]);
338                break;
339            case GET_COORD_ACTION_FOR_SLA:
340                arr = (Object[]) ret;
341                bean = new CoordinatorActionBean();
342                bean.setId((String) arr[0]);
343                bean.setJobId((String) arr[1]);
344                bean.setStatusStr((String) arr[2]);
345                bean.setExternalId((String) arr[3]);
346                bean.setLastModifiedTime((Timestamp) arr[4]);
347                break;
348            case GET_COORD_ACTION_FOR_INPUTCHECK:
349                arr = (Object[]) ret;
350                bean = new CoordinatorActionBean();
351                bean.setId((String) arr[0]);
352                bean.setActionNumber((Integer) arr[1]);
353                bean.setJobId((String) arr[2]);
354                bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[3]));
355                bean.setRunConfBlob((StringBlob) arr[4]);
356                bean.setNominalTime(DateUtils.toDate((Timestamp) arr[5]));
357                bean.setCreatedTime(DateUtils.toDate((Timestamp) arr[6]));
358                bean.setActionXmlBlob((StringBlob) arr[7]);
359                bean.setMissingDependenciesBlob((StringBlob) arr[8]);
360                bean.setPushMissingDependenciesBlob((StringBlob) arr[9]);
361                bean.setTimeOut((Integer) arr[10]);
362                bean.setExternalId((String) arr[11]);
363                break;
364
365            default:
366                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
367                        + namedQuery.name());
368        }
369        return bean;
370    }
371
372    @Override
373    public Object getSingleValue(CoordActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
374        JPAService jpaService = Services.get().get(JPAService.class);
375        EntityManager em = jpaService.getEntityManager();
376        Query query = getSelectQuery(namedQuery, em, parameters);
377        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
378        if (ret == null) {
379            throw new JPAExecutorException(ErrorCode.E0604, query.toString());
380        }
381        return ret;
382    }
383}