001/* 
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.  
018 */
019package org.apache.wiki.workflow;
020
021import org.apache.commons.collections4.queue.CircularFifoQueue;
022import org.apache.commons.lang3.time.StopWatch;
023import org.apache.logging.log4j.LogManager;
024import org.apache.logging.log4j.Logger;
025import org.apache.wiki.api.core.Context;
026import org.apache.wiki.api.core.Engine;
027import org.apache.wiki.api.core.Session;
028import org.apache.wiki.api.exceptions.WikiException;
029import org.apache.wiki.auth.AuthorizationManager;
030import org.apache.wiki.auth.acl.UnresolvedPrincipal;
031import org.apache.wiki.event.WikiEvent;
032import org.apache.wiki.event.WikiEventEmitter;
033import org.apache.wiki.event.WorkflowEvent;
034import org.apache.wiki.util.TextUtil;
035
036import java.io.*;
037import java.nio.file.Files;
038import java.security.Principal;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.CopyOnWriteArrayList;
042
043
044/**
045 * <p>
046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
047 * users or groups expected to approve particular Workflows.
048 * </p>
049 */
050public class DefaultWorkflowManager implements WorkflowManager {
051
052    private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class );
053    static final String SERIALIZATION_FILE = "wkflmgr.ser";
054
055    /** We use this also a generic serialization id */
056    private static final long serialVersionUID = 6L;
057
058    DecisionQueue m_queue;
059    Set< Workflow > m_workflows;
060    final Map< String, Principal > m_approvers;
061    Queue< Workflow > m_completed;
062    private Engine m_engine;
063    private int retainCompleted;
064
065    /**
066     * Constructs a new WorkflowManager, with an empty workflow cache.
067     */
068    public DefaultWorkflowManager() {
069        m_workflows = ConcurrentHashMap.newKeySet();
070        m_approvers = new ConcurrentHashMap<>();
071        m_queue = new DecisionQueue();
072        WikiEventEmitter.attach( this );
073    }
074
075    /**
076     * {@inheritDoc}
077     */
078    @Override
079    public Set< Workflow > getWorkflows() {
080        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
081        workflows.addAll( m_workflows );
082        return workflows;
083    }
084
085    /**
086     * {@inheritDoc}
087     */
088    @Override
089    public List< Workflow > getCompletedWorkflows() {
090        return new CopyOnWriteArrayList< >( m_completed );
091    }
092
093    /**
094     * {@inheritDoc}
095     *
096     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
097     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
098     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
099     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
100     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
101     */
102    @Override
103    public void initialize( final Engine engine, final Properties props ) {
104        m_engine = engine;
105        retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 );
106        m_completed = new CircularFifoQueue<>( retainCompleted );
107
108        // Identify the workflows requiring approvals
109        for( final Object o : props.keySet() ) {
110            final String prop = ( String )o;
111            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
112                // For the key, everything after the prefix is the workflow name
113                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
114                if( key.length() > 0 ) {
115                    // Only use non-null/non-blank approvers
116                    final String approver = props.getProperty( prop );
117                    if( approver != null && !approver.isEmpty() ) {
118                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
119                    }
120                }
121            }
122        }
123
124        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
125    }
126
127    /**
128     *  Reads the serialized data from the disk back to memory.
129     *
130     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
131     */
132    @SuppressWarnings( "unchecked" )
133    synchronized long unserializeFromDisk( final File f ) {
134        long saved = 0L;
135        final StopWatch sw = new StopWatch();
136        sw.start();
137        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) {
138            final long ver = in.readLong();
139            if( ver != serialVersionUID ) {
140                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
141            } else {
142                saved        = in.readLong();
143                m_workflows  = ( Set< Workflow > )in.readObject();
144                m_queue      = ( DecisionQueue )in.readObject();
145                m_completed = new CircularFifoQueue<>( retainCompleted );
146                m_completed.addAll( ( Collection< Workflow > )in.readObject() );
147                LOG.debug( "Read serialized data successfully in " + sw );
148            }
149        } catch( final IOException | ClassNotFoundException e ) {
150            LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() );
151        }
152        sw.stop();
153
154        return saved;
155    }
156
157    /**
158     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
159     */
160    synchronized void serializeToDisk( final File f ) {
161        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) {
162            final StopWatch sw = new StopWatch();
163            sw.start();
164
165            out.writeLong( serialVersionUID );
166            out.writeLong( System.currentTimeMillis() ); // Timestamp
167            out.writeObject( m_workflows );
168            out.writeObject( m_queue );
169            out.writeObject( m_completed );
170
171            sw.stop();
172
173            LOG.debug( "serialization done - took " + sw );
174        } catch( final IOException ioe ) {
175            LOG.error( "Unable to serialize!", ioe );
176        }
177    }
178
179    /**
180     * {@inheritDoc}
181     */
182    @Override
183    public boolean requiresApproval( final String messageKey ) {
184        return  m_approvers.containsKey( messageKey );
185    }
186
187    /**
188     * {@inheritDoc}
189     */
190    @Override
191    public Principal getApprover( final String messageKey ) throws WikiException {
192        Principal approver = m_approvers.get( messageKey );
193        if ( approver == null ) {
194            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
195        }
196
197        // Try to resolve UnresolvedPrincipals
198        if ( approver instanceof UnresolvedPrincipal ) {
199            final String name = approver.getName();
200            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
201
202            // If still unresolved, throw exception; otherwise, freshen our cache
203            if ( approver instanceof UnresolvedPrincipal ) {
204                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
205            }
206
207            m_approvers.put( messageKey, approver );
208        }
209        return approver;
210    }
211
212    /**
213     * Protected helper method that returns the associated Engine
214     *
215     * @return the wiki engine
216     */
217    protected Engine getEngine() {
218        if ( m_engine == null ) {
219            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
220        }
221        return m_engine;
222    }
223
224    /**
225     * Returns the DecisionQueue associated with this WorkflowManager
226     *
227     * @return the decision queue
228     */
229    @Override
230    public DecisionQueue getDecisionQueue() {
231        return m_queue;
232    }
233
234    /**
235     * {@inheritDoc}
236     */
237    @Override
238    public List< Workflow > getOwnerWorkflows( final Session session ) {
239        final List< Workflow > workflows = new ArrayList<>();
240        if ( session.isAuthenticated() ) {
241            final Principal[] sessionPrincipals = session.getPrincipals();
242            for( final Workflow w : m_workflows ) {
243                final Principal owner = w.getOwner();
244                for ( final Principal sessionPrincipal : sessionPrincipals ) {
245                    if ( sessionPrincipal.equals( owner ) ) {
246                        workflows.add( w );
247                        break;
248                    }
249                }
250            }
251        }
252        return workflows;
253    }
254
255    /**
256     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
257     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
258     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
259     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
260     * 
261     * @param event the event passed to this listener
262     */
263    @Override
264    public void actionPerformed( final WikiEvent event ) {
265        if( event instanceof WorkflowEvent ) {
266            if( event.getSrc() instanceof Workflow ) {
267                final Workflow workflow = event.getSrc();
268                switch( event.getType() ) {
269                // Remove from manager
270                case WorkflowEvent.ABORTED   :
271                case WorkflowEvent.COMPLETED : remove( workflow ); break;
272                // Add to manager
273                case WorkflowEvent.CREATED   : add( workflow ); break;
274                default: break;
275                }
276            } else if( event.getSrc() instanceof Decision ) {
277                final Decision decision = event.getSrc();
278                switch( event.getType() ) {
279                // Add to DecisionQueue
280                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
281                // Remove from DecisionQueue
282                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break;
283                default: break;
284                }
285            }
286            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
287        }
288    }
289
290    /**
291     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
292     * {@code Id} properties if not set.
293     *
294     * @param workflow the workflow to add
295     */
296    protected void add( final Workflow workflow ) {
297        m_workflows.add( workflow );
298    }
299
300    /**
301     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
302     * defensively checks to see if the workflow has not yet been removed.
303     *
304     * @param workflow the workflow to remove
305     */
306    protected void remove( final Workflow workflow ) {
307        if( m_workflows.contains( workflow ) ) {
308            m_workflows.remove( workflow );
309            m_completed.add( workflow );
310        }
311    }
312
313    protected void removeFromDecisionQueue( final Decision decision, final Context context ) {
314        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
315        final int workflowId = decision.getWorkflowId();
316        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
317        if( optw.isPresent() ) {
318            final Workflow w = optw.get();
319            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
320                getDecisionQueue().remove( decision );
321                // Restart workflow
322                try {
323                    w.restart( context );
324                } catch( final WikiException e ) {
325                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
326                }
327            }
328        }
329    }
330
331    protected void addToDecisionQueue( final Decision decision ) {
332        getDecisionQueue().add( decision );
333    }
334
335}