001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *      http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.oozie.executor.jpa;
019
020import java.sql.Timestamp;
021import java.util.ArrayList;
022import java.util.List;
023
024import javax.persistence.EntityManager;
025import javax.persistence.Query;
026
027import org.apache.oozie.ErrorCode;
028import org.apache.oozie.StringBlob;
029import org.apache.oozie.WorkflowActionBean;
030import org.apache.oozie.service.JPAService;
031import org.apache.oozie.service.Services;
032import org.apache.oozie.util.DateUtils;
033
034/**
035 * Query Executor that provides API to run query for Workflow Action
036 */
037public class WorkflowActionQueryExecutor extends
038        QueryExecutor<WorkflowActionBean, WorkflowActionQueryExecutor.WorkflowActionQuery> {
039
040    public enum WorkflowActionQuery {
041        UPDATE_ACTION,
042        UPDATE_ACTION_FOR_LAST_CHECKED_TIME,
043        UPDATE_ACTION_START,
044        UPDATE_ACTION_CHECK,
045        UPDATE_ACTION_END,
046        UPDATE_ACTION_PENDING,
047        UPDATE_ACTION_STATUS_PENDING,
048        UPDATE_ACTION_PENDING_TRANS,
049        UPDATE_ACTION_PENDING_TRANS_ERROR,
050        GET_ACTION,
051        GET_ACTION_ID_TYPE_LASTCHECK,
052        GET_ACTION_FAIL,
053        GET_ACTION_SIGNAL,
054        GET_ACTION_CHECK,
055        GET_ACTION_END,
056        GET_ACTION_COMPLETED,
057        GET_RUNNING_ACTIONS,
058        GET_PENDING_ACTIONS,
059        GET_ACTIONS_FOR_WORKFLOW_RERUN
060    };
061
062    private static WorkflowActionQueryExecutor instance = new WorkflowActionQueryExecutor();
063
064    private WorkflowActionQueryExecutor() {
065    }
066
067    public static QueryExecutor<WorkflowActionBean, WorkflowActionQuery> getInstance() {
068        return WorkflowActionQueryExecutor.instance;
069    }
070
071    @Override
072    public Query getUpdateQuery(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean, EntityManager em)
073            throws JPAExecutorException {
074        Query query = em.createNamedQuery(namedQuery.name());
075        switch (namedQuery) {
076            case UPDATE_ACTION:
077                query.setParameter("conf", actionBean.getConfBlob());
078                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
079                query.setParameter("data", actionBean.getDataBlob());
080                query.setParameter("stats", actionBean.getStatsBlob());
081                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
082                query.setParameter("errorCode", actionBean.getErrorCode());
083                query.setParameter("errorMessage", actionBean.getErrorMessage());
084                query.setParameter("externalId", actionBean.getExternalId());
085                query.setParameter("externalStatus", actionBean.getExternalStatus());
086                query.setParameter("name", actionBean.getName());
087                query.setParameter("cred", actionBean.getCred());
088                query.setParameter("retries", actionBean.getRetries());
089                query.setParameter("trackerUri", actionBean.getTrackerUri());
090                query.setParameter("transition", actionBean.getTransition());
091                query.setParameter("type", actionBean.getType());
092                query.setParameter("endTime", actionBean.getEndTimestamp());
093                query.setParameter("executionPath", actionBean.getExecutionPath());
094                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
095                query.setParameter("logToken", actionBean.getLogToken());
096                query.setParameter("pending", actionBean.getPending());
097                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
098                query.setParameter("signalValue", actionBean.getSignalValue());
099                query.setParameter("slaXml", actionBean.getSlaXmlBlob());
100                query.setParameter("startTime", actionBean.getStartTimestamp());
101                query.setParameter("status", actionBean.getStatusStr());
102                query.setParameter("wfId", actionBean.getWfId());
103                query.setParameter("id", actionBean.getId());
104                break;
105            case UPDATE_ACTION_FOR_LAST_CHECKED_TIME:
106                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
107                query.setParameter("id", actionBean.getId());
108                break;
109            case UPDATE_ACTION_PENDING:
110                query.setParameter("pending", actionBean.getPending());
111                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
112                query.setParameter("id", actionBean.getId());
113                break;
114            case UPDATE_ACTION_STATUS_PENDING:
115                query.setParameter("status", actionBean.getStatus().toString());
116                query.setParameter("pending", actionBean.getPending());
117                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
118                query.setParameter("id", actionBean.getId());
119                break;
120            case UPDATE_ACTION_PENDING_TRANS:
121                query.setParameter("transition", actionBean.getTransition());
122                query.setParameter("pending", actionBean.getPending());
123                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
124                query.setParameter("id", actionBean.getId());
125                break;
126            case UPDATE_ACTION_PENDING_TRANS_ERROR:
127                query.setParameter("transition", actionBean.getTransition());
128                query.setParameter("pending", actionBean.getPending());
129                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
130                query.setParameter("errorCode", actionBean.getErrorCode());
131                query.setParameter("errorMessage", actionBean.getErrorMessage());
132                query.setParameter("id", actionBean.getId());
133                break;
134            case UPDATE_ACTION_START:
135                query.setParameter("startTime", actionBean.getStartTimestamp());
136                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
137                query.setParameter("conf", actionBean.getConfBlob());
138                query.setParameter("errorCode", actionBean.getErrorCode());
139                query.setParameter("errorMessage", actionBean.getErrorMessage());
140                query.setParameter("externalId", actionBean.getExternalId());
141                query.setParameter("trackerUri", actionBean.getTrackerUri());
142                query.setParameter("consoleUrl", actionBean.getConsoleUrl());
143                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
144                query.setParameter("status", actionBean.getStatus().toString());
145                query.setParameter("externalStatus", actionBean.getExternalStatus());
146                query.setParameter("data", actionBean.getDataBlob());
147                query.setParameter("retries", actionBean.getRetries());
148                query.setParameter("pending", actionBean.getPending());
149                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
150                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
151                query.setParameter("id", actionBean.getId());
152                break;
153            case UPDATE_ACTION_CHECK:
154                query.setParameter("externalChildIDs", actionBean.getExternalChildIDsBlob());
155                query.setParameter("externalStatus", actionBean.getExternalStatus());
156                query.setParameter("status", actionBean.getStatus().toString());
157                query.setParameter("data", actionBean.getDataBlob());
158                query.setParameter("pending", actionBean.getPending());
159                query.setParameter("errorCode", actionBean.getErrorCode());
160                query.setParameter("errorMessage", actionBean.getErrorMessage());
161                query.setParameter("lastCheckTime", actionBean.getLastCheckTimestamp());
162                query.setParameter("retries", actionBean.getRetries());
163                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
164                query.setParameter("startTime", actionBean.getStartTimestamp());
165                query.setParameter("stats", actionBean.getStatsBlob());
166                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
167                query.setParameter("id", actionBean.getId());
168                break;
169            case UPDATE_ACTION_END:
170                query.setParameter("errorCode", actionBean.getErrorCode());
171                query.setParameter("errorMessage", actionBean.getErrorMessage());
172                query.setParameter("retries", actionBean.getRetries());
173                query.setParameter("status", actionBean.getStatus().toString());
174                query.setParameter("endTime", actionBean.getEndTimestamp());
175                query.setParameter("pending", actionBean.getPending());
176                query.setParameter("pendingAge", actionBean.getPendingAgeTimestamp());
177                query.setParameter("signalValue", actionBean.getSignalValue());
178                query.setParameter("userRetryCount", actionBean.getUserRetryCount());
179                query.setParameter("externalStatus", actionBean.getExternalStatus());
180                query.setParameter("stats", actionBean.getStatsBlob());
181                query.setParameter("id", actionBean.getId());
182                break;
183            default:
184                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
185                        + namedQuery.name());
186        }
187        return query;
188    }
189
190    @Override
191    public Query getSelectQuery(WorkflowActionQuery namedQuery, EntityManager em, Object... parameters)
192            throws JPAExecutorException {
193        Query query = em.createNamedQuery(namedQuery.name());
194        switch (namedQuery) {
195            case GET_ACTION:
196            case GET_ACTION_ID_TYPE_LASTCHECK:
197            case GET_ACTION_FAIL:
198            case GET_ACTION_SIGNAL:
199            case GET_ACTION_CHECK:
200            case GET_ACTION_END:
201            case GET_ACTION_COMPLETED:
202                query.setParameter("id", parameters[0]);
203                break;
204            case GET_RUNNING_ACTIONS:
205                Timestamp ts = new Timestamp(System.currentTimeMillis() - (Integer) parameters[0] * 1000);
206                query.setParameter("lastCheckTime", ts);
207                break;
208            case GET_PENDING_ACTIONS:
209                Long minimumPendingAgeSecs = (Long) parameters[0];
210                Timestamp pts = new Timestamp(System.currentTimeMillis() - minimumPendingAgeSecs * 1000);
211                query.setParameter("pendingAge", pts);
212                break;
213            case GET_ACTIONS_FOR_WORKFLOW_RERUN:
214                query.setParameter("wfId", parameters[0]);
215                break;
216            default:
217                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for "
218                        + namedQuery.name());
219        }
220        return query;
221    }
222
223    @Override
224    public int executeUpdate(WorkflowActionQuery namedQuery, WorkflowActionBean actionBean) throws JPAExecutorException {
225        JPAService jpaService = Services.get().get(JPAService.class);
226        EntityManager em = jpaService.getEntityManager();
227        Query query = getUpdateQuery(namedQuery, actionBean, em);
228        int ret = jpaService.executeUpdate(namedQuery.name(), query, em);
229        return ret;
230    }
231
232    private WorkflowActionBean constructBean(WorkflowActionQuery namedQuery, Object ret) throws JPAExecutorException {
233        WorkflowActionBean bean;
234        Object[] arr;
235        switch (namedQuery) {
236            case GET_ACTION:
237                bean = (WorkflowActionBean) ret;
238                break;
239            case GET_ACTION_ID_TYPE_LASTCHECK:
240                bean = new WorkflowActionBean();
241                arr = (Object[]) ret;
242                bean.setId((String) arr[0]);
243                bean.setType((String) arr[1]);
244                bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[2]));
245                break;
246            case GET_ACTION_FAIL:
247                bean = new WorkflowActionBean();
248                arr = (Object[]) ret;
249                bean.setId((String) arr[0]);
250                bean.setJobId((String) arr[1]);
251                bean.setName((String) arr[2]);
252                bean.setStatusStr((String) arr[3]);
253                bean.setPending((Integer) arr[4]);
254                bean.setType((String) arr[5]);
255                bean.setLogToken((String) arr[6]);
256                bean.setTransition((String) arr[7]);
257                bean.setErrorInfo((String) arr[8], (String) arr[9]);
258                break;
259            case GET_ACTION_SIGNAL:
260                bean = new WorkflowActionBean();
261                arr = (Object[]) ret;
262                bean.setId((String) arr[0]);
263                bean.setJobId((String) arr[1]);
264                bean.setName((String) arr[2]);
265                bean.setStatusStr((String) arr[3]);
266                bean.setPending((Integer) arr[4]);
267                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
268                bean.setType((String) arr[6]);
269                bean.setLogToken((String) arr[7]);
270                bean.setTransition((String) arr[8]);
271                bean.setErrorInfo((String) arr[9], (String) arr[10]);
272                bean.setExecutionPath((String) arr[11]);
273                bean.setSignalValue((String) arr[12]);
274                bean.setSlaXmlBlob((StringBlob) arr[13]);
275                break;
276            case GET_ACTION_CHECK:
277                bean = new WorkflowActionBean();
278                arr = (Object[]) ret;
279                bean.setId((String) arr[0]);
280                bean.setJobId((String) arr[1]);
281                bean.setName((String) arr[2]);
282                bean.setStatusStr((String) arr[3]);
283                bean.setPending((Integer) arr[4]);
284                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
285                bean.setType((String) arr[6]);
286                bean.setLogToken((String) arr[7]);
287                bean.setTransition((String) arr[8]);
288                bean.setRetries((Integer) arr[9]);
289                bean.setUserRetryCount((Integer) arr[10]);
290                bean.setUserRetryMax((Integer) arr[11]);
291                bean.setUserRetryInterval((Integer) arr[12]);
292                bean.setTrackerUri((String) arr[13]);
293                bean.setStartTime(DateUtils.toDate((Timestamp) arr[14]));
294                bean.setEndTime(DateUtils.toDate((Timestamp) arr[15]));
295                bean.setLastCheckTime(DateUtils.toDate((Timestamp) arr[16]));
296                bean.setErrorInfo((String) arr[17], (String) arr[18]);
297                bean.setExternalId((String) arr[19]);
298                bean.setExternalStatus((String) arr[20]);
299                bean.setExternalChildIDsBlob((StringBlob) arr[21]);
300                bean.setConfBlob((StringBlob) arr[22]);
301                break;
302            case GET_ACTION_END:
303                bean = new WorkflowActionBean();
304                arr = (Object[]) ret;
305                bean.setId((String) arr[0]);
306                bean.setJobId((String) arr[1]);
307                bean.setName((String) arr[2]);
308                bean.setStatusStr((String) arr[3]);
309                bean.setPending((Integer) arr[4]);
310                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[5]));
311                bean.setType((String) arr[6]);
312                bean.setLogToken((String) arr[7]);
313                bean.setTransition((String) arr[8]);
314                bean.setRetries((Integer) arr[9]);
315                bean.setTrackerUri((String) arr[10]);
316                bean.setUserRetryCount((Integer) arr[11]);
317                bean.setUserRetryMax((Integer) arr[12]);
318                bean.setUserRetryInterval((Integer) arr[13]);
319                bean.setStartTime(DateUtils.toDate((Timestamp) arr[14]));
320                bean.setEndTime(DateUtils.toDate((Timestamp) arr[15]));
321                bean.setErrorInfo((String) arr[16], (String) arr[17]);
322                bean.setExternalId((String) arr[18]);
323                bean.setExternalStatus((String) arr[19]);
324                bean.setExternalChildIDsBlob((StringBlob) arr[20]);
325                bean.setConfBlob((StringBlob) arr[21]);
326                bean.setDataBlob((StringBlob) arr[22]);
327                bean.setStatsBlob((StringBlob) arr[23]);
328                break;
329            case GET_ACTION_COMPLETED:
330                bean = new WorkflowActionBean();
331                arr = (Object[]) ret;
332                bean.setId((String) arr[0]);
333                bean.setJobId((String) arr[1]);
334                bean.setStatusStr((String) arr[2]);
335                bean.setType((String) arr[3]);
336                bean.setLogToken((String) arr[4]);
337                break;
338            case GET_RUNNING_ACTIONS:
339                bean = new WorkflowActionBean();
340                bean.setId((String)ret);
341                break;
342            case GET_PENDING_ACTIONS:
343                bean = new WorkflowActionBean();
344                arr = (Object[]) ret;
345                bean.setId((String) arr[0]);
346                bean.setJobId((String) arr[1]);
347                bean.setStatusStr((String) arr[2]);
348                bean.setType((String) arr[3]);
349                bean.setPendingAge(DateUtils.toDate((Timestamp) arr[4]));
350                break;
351            case GET_ACTIONS_FOR_WORKFLOW_RERUN:
352                bean = new WorkflowActionBean();
353                arr = (Object[]) ret;
354                bean.setId((String) arr[0]);
355                bean.setName((String) arr[1]);
356                bean.setStatusStr((String) arr[2]);
357                bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
358                break;
359            default:
360                throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for "
361                        + namedQuery.name());
362        }
363        return bean;
364    }
365
366    @Override
367    public WorkflowActionBean get(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
368        JPAService jpaService = Services.get().get(JPAService.class);
369        EntityManager em = jpaService.getEntityManager();
370        Query query = getSelectQuery(namedQuery, em, parameters);
371        Object ret = jpaService.executeGet(namedQuery.name(), query, em);
372        if (ret == null) {
373            throw new JPAExecutorException(ErrorCode.E0605, query.toString());
374        }
375        WorkflowActionBean bean = constructBean(namedQuery, ret);
376        return bean;
377    }
378
379    @Override
380    public List<WorkflowActionBean> getList(WorkflowActionQuery namedQuery, Object... parameters)
381            throws JPAExecutorException {
382        JPAService jpaService = Services.get().get(JPAService.class);
383        EntityManager em = jpaService.getEntityManager();
384        Query query = getSelectQuery(namedQuery, em, parameters);
385        List<?> retList = (List<?>) jpaService.executeGetList(namedQuery.name(), query, em);
386        List<WorkflowActionBean> beanList = new ArrayList<WorkflowActionBean>();
387        if (retList != null) {
388            for (Object ret : retList) {
389                beanList.add(constructBean(namedQuery, ret));
390            }
391        }
392        return beanList;
393    }
394
395    @Override
396    public Object getSingleValue(WorkflowActionQuery namedQuery, Object... parameters) throws JPAExecutorException {
397        throw new UnsupportedOperationException();
398    }
399}