001/* 
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.  
018 */
019package org.apache.wiki.workflow;
020
021import org.apache.commons.collections4.queue.CircularFifoQueue;
022import org.apache.commons.lang3.time.StopWatch;
023import org.apache.log4j.Logger;
024import org.apache.wiki.api.core.Context;
025import org.apache.wiki.api.core.Engine;
026import org.apache.wiki.api.core.Session;
027import org.apache.wiki.api.exceptions.WikiException;
028import org.apache.wiki.auth.AuthorizationManager;
029import org.apache.wiki.auth.acl.UnresolvedPrincipal;
030import org.apache.wiki.event.WikiEvent;
031import org.apache.wiki.event.WikiEventEmitter;
032import org.apache.wiki.event.WorkflowEvent;
033import org.apache.wiki.util.TextUtil;
034
035import java.io.*;
036import java.security.Principal;
037import java.util.*;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.CopyOnWriteArrayList;
040
041
042/**
043 * <p>
044 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
045 * users or groups expected to approve particular Workflows.
046 * </p>
047 */
048public class DefaultWorkflowManager implements WorkflowManager {
049
050    private static final Logger LOG = Logger.getLogger( DefaultWorkflowManager.class );
051    static final String SERIALIZATION_FILE = "wkflmgr.ser";
052
053    /** We use this also a generic serialization id */
054    private static final long serialVersionUID = 6L;
055
056    DecisionQueue m_queue;
057    Set< Workflow > m_workflows;
058    final Map< String, Principal > m_approvers;
059    Queue< Workflow > m_completed;
060    private Engine m_engine = null;
061    private int retainCompleted;
062
063    /**
064     * Constructs a new WorkflowManager, with an empty workflow cache.
065     */
066    public DefaultWorkflowManager() {
067        m_workflows = ConcurrentHashMap.newKeySet();
068        m_approvers = new ConcurrentHashMap<>();
069        m_queue = new DecisionQueue();
070        WikiEventEmitter.attach( this );
071    }
072
073    /**
074     * {@inheritDoc}
075     */
076    @Override
077    public Set< Workflow > getWorkflows() {
078        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
079        workflows.addAll( m_workflows );
080        return workflows;
081    }
082
083    /**
084     * {@inheritDoc}
085     */
086    @Override
087    public List< Workflow > getCompletedWorkflows() {
088        return new CopyOnWriteArrayList< >( m_completed );
089    }
090
091    /**
092     * {@inheritDoc}
093     *
094     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
095     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
096     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
097     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
098     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
099     */
100    @Override
101    public void initialize( final Engine engine, final Properties props ) {
102        m_engine = engine;
103        retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 );
104        m_completed = new CircularFifoQueue<>( retainCompleted );
105
106        // Identify the workflows requiring approvals
107        for( final Object o : props.keySet() ) {
108            final String prop = ( String )o;
109            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
110                // For the key, everything after the prefix is the workflow name
111                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
112                if( key.length() > 0 ) {
113                    // Only use non-null/non-blank approvers
114                    final String approver = props.getProperty( prop );
115                    if( approver != null && approver.length() > 0 ) {
116                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
117                    }
118                }
119            }
120        }
121
122        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
123    }
124
125    /**
126     *  Reads the serialized data from the disk back to memory.
127     *
128     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
129     */
130    @SuppressWarnings( "unchecked" )
131    synchronized long unserializeFromDisk( final File f ) {
132        long saved = 0L;
133        final StopWatch sw = new StopWatch();
134        sw.start();
135        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( new FileInputStream( f ) ) ) ) {
136            final long ver = in.readLong();
137            if( ver != serialVersionUID ) {
138                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
139            } else {
140                saved        = in.readLong();
141                m_workflows  = ( Set< Workflow > )in.readObject();
142                m_queue      = ( DecisionQueue )in.readObject();
143                m_completed = new CircularFifoQueue<>( retainCompleted );
144                m_completed.addAll( ( Collection< Workflow > )in.readObject() );
145                LOG.debug( "Read serialized data successfully in " + sw );
146            }
147        } catch( final IOException | ClassNotFoundException e ) {
148            LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() );
149        }
150        sw.stop();
151
152        return saved;
153    }
154
155    /**
156     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
157     */
158    synchronized void serializeToDisk( final File f ) {
159        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( new FileOutputStream( f ) ) ) ) {
160            final StopWatch sw = new StopWatch();
161            sw.start();
162
163            out.writeLong( serialVersionUID );
164            out.writeLong( System.currentTimeMillis() ); // Timestamp
165            out.writeObject( m_workflows );
166            out.writeObject( m_queue );
167            out.writeObject( m_completed );
168
169            sw.stop();
170
171            LOG.debug( "serialization done - took " + sw );
172        } catch( final IOException ioe ) {
173            LOG.error( "Unable to serialize!", ioe );
174        }
175    }
176
177    /**
178     * {@inheritDoc}
179     */
180    @Override
181    public boolean requiresApproval( final String messageKey ) {
182        return  m_approvers.containsKey( messageKey );
183    }
184
185    /**
186     * {@inheritDoc}
187     */
188    @Override
189    public Principal getApprover( final String messageKey ) throws WikiException {
190        Principal approver = m_approvers.get( messageKey );
191        if ( approver == null ) {
192            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
193        }
194
195        // Try to resolve UnresolvedPrincipals
196        if ( approver instanceof UnresolvedPrincipal ) {
197            final String name = approver.getName();
198            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
199
200            // If still unresolved, throw exception; otherwise, freshen our cache
201            if ( approver instanceof UnresolvedPrincipal ) {
202                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
203            }
204
205            m_approvers.put( messageKey, approver );
206        }
207        return approver;
208    }
209
210    /**
211     * Protected helper method that returns the associated Engine
212     *
213     * @return the wiki engine
214     */
215    protected Engine getEngine() {
216        if ( m_engine == null ) {
217            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
218        }
219        return m_engine;
220    }
221
222    /**
223     * Returns the DecisionQueue associated with this WorkflowManager
224     *
225     * @return the decision queue
226     */
227    @Override
228    public DecisionQueue getDecisionQueue() {
229        return m_queue;
230    }
231
232    /**
233     * {@inheritDoc}
234     */
235    @Override
236    public List< Workflow > getOwnerWorkflows( final Session session ) {
237        final List< Workflow > workflows = new ArrayList<>();
238        if ( session.isAuthenticated() ) {
239            final Principal[] sessionPrincipals = session.getPrincipals();
240            for( final Workflow w : m_workflows ) {
241                final Principal owner = w.getOwner();
242                for ( final Principal sessionPrincipal : sessionPrincipals ) {
243                    if ( sessionPrincipal.equals( owner ) ) {
244                        workflows.add( w );
245                        break;
246                    }
247                }
248            }
249        }
250        return workflows;
251    }
252
253    /**
254     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
255     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
256     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
257     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
258     * 
259     * @param event the event passed to this listener
260     */
261    @Override
262    public void actionPerformed( final WikiEvent event ) {
263        if( event instanceof WorkflowEvent ) {
264            if( event.getSrc() instanceof Workflow ) {
265                final Workflow workflow = event.getSrc();
266                switch( event.getType() ) {
267                // Remove from manager
268                case WorkflowEvent.ABORTED   :
269                case WorkflowEvent.COMPLETED : remove( workflow ); break;
270                // Add to manager
271                case WorkflowEvent.CREATED   : add( workflow ); break;
272                default: break;
273                }
274            } else if( event.getSrc() instanceof Decision ) {
275                final Decision decision = event.getSrc();
276                switch( event.getType() ) {
277                // Add to DecisionQueue
278                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
279                // Remove from DecisionQueue
280                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break;
281                default: break;
282                }
283            }
284            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
285        }
286    }
287
288    /**
289     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
290     * {@code Id} properties if not set.
291     *
292     * @param workflow the workflow to add
293     */
294    protected void add( final Workflow workflow ) {
295        m_workflows.add( workflow );
296    }
297
298    /**
299     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
300     * defensively checks to see if the workflow has not yet been removed.
301     *
302     * @param workflow the workflow to remove
303     */
304    protected void remove( final Workflow workflow ) {
305        if( m_workflows.contains( workflow ) ) {
306            m_workflows.remove( workflow );
307            m_completed.add( workflow );
308        }
309    }
310
311    protected void removeFromDecisionQueue( final Decision decision, final Context context ) {
312        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
313        final int workflowId = decision.getWorkflowId();
314        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
315        if( optw.isPresent() ) {
316            final Workflow w = optw.get();
317            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
318                getDecisionQueue().remove( decision );
319                // Restart workflow
320                try {
321                    w.restart( context );
322                } catch( final WikiException e ) {
323                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
324                }
325            }
326        }
327    }
328
329    protected void addToDecisionQueue( final Decision decision ) {
330        getDecisionQueue().add( decision );
331    }
332
333}