001    /**
002     * Licensed to the Apache Software Foundation (ASF) under one
003     * or more contributor license agreements.  See the NOTICE file
004     * distributed with this work for additional information
005     * regarding copyright ownership.  The ASF licenses this file
006     * to you under the Apache License, Version 2.0 (the
007     * "License"); you may not use this file except in compliance
008     * with the License.  You may obtain a copy of the License at
009     *
010     *      http://www.apache.org/licenses/LICENSE-2.0
011     *
012     * Unless required by applicable law or agreed to in writing, software
013     * distributed under the License is distributed on an "AS IS" BASIS,
014     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015     * See the License for the specific language governing permissions and
016     * limitations under the License.
017     */
018    package org.apache.oozie.executor.jpa;
019    
020    import java.util.Collection;
021    import java.util.Date;
022    
023    import javax.persistence.EntityManager;
024    import javax.persistence.Query;
025    
026    import org.apache.oozie.CoordinatorActionBean;
027    import org.apache.oozie.ErrorCode;
028    import org.apache.oozie.FaultInjection;
029    import org.apache.oozie.client.rest.JsonBean;
030    import org.apache.oozie.util.ParamChecker;
031    
032    /**
033     * Class for inserting and updating beans in bulk
034     *
035     */
036    public class BulkUpdateInsertForCoordActionStatusJPAExecutor implements JPAExecutor<Void> {
037    
038        private Collection<JsonBean> updateList;
039        private Collection<JsonBean> insertList;
040    
041        /**
042         * Initialize the JPAExecutor using the update and insert list of JSON beans
043         *
044         * @param updateList
045         */
046        public BulkUpdateInsertForCoordActionStatusJPAExecutor(Collection<JsonBean> updateList,
047                Collection<JsonBean> insertList) {
048            this.updateList = updateList;
049            this.insertList = insertList;
050        }
051    
052        public BulkUpdateInsertForCoordActionStatusJPAExecutor() {
053        }
054    
055        /**
056         * Sets the update list for JSON bean
057         *
058         * @param updateList
059         */
060        public void setUpdateList(Collection<JsonBean> updateList) {
061            this.updateList = updateList;
062        }
063    
064        /**
065         * Sets the insert list for JSON bean
066         *
067         * @param insertList
068         */
069        public void setInsertList(Collection<JsonBean> insertList) {
070            this.insertList = insertList;
071        }
072    
073        /*
074         * (non-Javadoc)
075         *
076         * @see org.apache.oozie.executor.jpa.JPAExecutor#getName()
077         */
078        @Override
079        public String getName() {
080            return "BulkUpdateInsertForCoordActionStatusJPAExecutor";
081        }
082    
083        /*
084         * (non-Javadoc)
085         *
086         * @see org.apache.oozie.executor.jpa.JPAExecutor#execute(javax.persistence.
087         * EntityManager)
088         */
089        @Override
090        public Void execute(EntityManager em) throws JPAExecutorException {
091            try {
092                if (insertList != null) {
093                    for (JsonBean entity : insertList) {
094                        ParamChecker.notNull(entity, "JsonBean");
095                        em.persist(entity);
096                    }
097                }
098                // Only used by test cases to check for rollback of transaction
099                FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection");
100                if (updateList != null) {
101                    for (JsonBean entity : updateList) {
102                        ParamChecker.notNull(entity, "JsonBean");
103                        if (entity instanceof CoordinatorActionBean) {
104                            CoordinatorActionBean action = (CoordinatorActionBean) entity;
105                            Query q = em.createNamedQuery("UPDATE_COORD_ACTION_STATUS_PENDING_TIME");
106                            q.setParameter("id", action.getId());
107                            q.setParameter("status", action.getStatus().toString());
108                            q.setParameter("pending", action.getPending());
109                            q.setParameter("lastModifiedTime", new Date());
110                            q.executeUpdate();
111                        }
112                        else {
113                            em.merge(entity);
114                        }
115                    }
116                }
117                // Since the return type is Void, we have to return null
118                return null;
119            }
120            catch (Exception e) {
121                throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e);
122            }
123        }
124    }