001/* 
002    Licensed to the Apache Software Foundation (ASF) under one
003    or more contributor license agreements.  See the NOTICE file
004    distributed with this work for additional information
005    regarding copyright ownership.  The ASF licenses this file
006    to you under the Apache License, Version 2.0 (the
007    "License"); you may not use this file except in compliance
008    with the License.  You may obtain a copy of the License at
009
010       http://www.apache.org/licenses/LICENSE-2.0
011
012    Unless required by applicable law or agreed to in writing,
013    software distributed under the License is distributed on an
014    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015    KIND, either express or implied.  See the License for the
016    specific language governing permissions and limitations
017    under the License.  
018 */
019package org.apache.wiki.workflow;
020
021import org.apache.commons.collections4.queue.CircularFifoQueue;
022import org.apache.commons.lang3.time.StopWatch;
023import org.apache.logging.log4j.LogManager;
024import org.apache.logging.log4j.Logger;
025import org.apache.wiki.api.core.Context;
026import org.apache.wiki.api.core.Engine;
027import org.apache.wiki.api.core.Session;
028import org.apache.wiki.api.exceptions.WikiException;
029import org.apache.wiki.auth.AuthorizationManager;
030import org.apache.wiki.auth.acl.UnresolvedPrincipal;
031import org.apache.wiki.event.WikiEvent;
032import org.apache.wiki.event.WikiEventEmitter;
033import org.apache.wiki.event.WorkflowEvent;
034import org.apache.wiki.util.TextUtil;
035
036import java.io.*;
037import java.nio.file.Files;
038import java.security.Principal;
039import java.util.*;
040import java.util.concurrent.ConcurrentHashMap;
041import java.util.concurrent.CopyOnWriteArrayList;
042
043
044/**
045 * <p>
046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of
047 * users or groups expected to approve particular Workflows.
048 * </p>
049 */
050public class DefaultWorkflowManager implements WorkflowManager, Serializable {
051
052    private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class );
053    static final String SERIALIZATION_FILE = "wkflmgr.ser";
054
055    /** We use this also a generic serialization id */
056    private static final long serialVersionUID = 6L;
057
058    DecisionQueue m_queue;
059    Set< Workflow > m_workflows;
060    final Map< String, Principal > m_approvers;
061    Queue< Workflow > m_completed;
062    private Engine m_engine;
063    private int retainCompleted;
064
065    /**
066     * Constructs a new WorkflowManager, with an empty workflow cache.
067     */
068    public DefaultWorkflowManager() {
069        m_workflows = ConcurrentHashMap.newKeySet();
070        m_approvers = new ConcurrentHashMap<>();
071        m_queue = new DecisionQueue();
072        WikiEventEmitter.attach( this );
073    }
074
075    /**
076     * {@inheritDoc}
077     */
078    @Override
079    public Set< Workflow > getWorkflows() {
080        final Set< Workflow > workflows = ConcurrentHashMap.newKeySet();
081        workflows.addAll( m_workflows );
082        return workflows;
083    }
084
085    /**
086     * {@inheritDoc}
087     */
088    @Override
089    public Map< Integer, Workflow > getWorkflowsAsMap() {
090        final Map< Integer, Workflow > workflows = new ConcurrentHashMap<>();
091        m_workflows.forEach( w -> workflows.put( w.getId(), w ) );
092        return workflows;
093    }
094
095    /**
096     * {@inheritDoc}
097     */
098    @Override
099    public List< Workflow > getCompletedWorkflows() {
100        return new CopyOnWriteArrayList< >( m_completed );
101    }
102
103    /**
104     * {@inheritDoc}
105     *
106     * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given
107     * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role,
108     * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is
109     * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal
110     * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}.
111     */
112    @Override
113    public void initialize( final Engine engine, final Properties props ) {
114        m_engine = engine;
115        retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 );
116        m_completed = new CircularFifoQueue<>( retainCompleted );
117
118        // Identify the workflows requiring approvals
119        for( final Object o : props.keySet() ) {
120            final String prop = ( String )o;
121            if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) {
122                // For the key, everything after the prefix is the workflow name
123                final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() );
124                if(!key.isEmpty()) {
125                    // Only use non-null/non-blank approvers
126                    final String approver = props.getProperty( prop );
127                    if( approver != null && !approver.isEmpty() ) {
128                        m_approvers.put( key, new UnresolvedPrincipal( approver ) );
129                    }
130                }
131            }
132        }
133
134        unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
135    }
136
137    /**
138     *  Reads the serialized data from the disk back to memory.
139     *
140     * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk.
141     */
142    @SuppressWarnings( "unchecked" )
143    synchronized long unserializeFromDisk( final File f ) {
144        long saved = 0L;
145        final StopWatch sw = new StopWatch();
146        sw.start();
147        try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) {
148            final long ver = in.readLong();
149            if( ver != serialVersionUID ) {
150                LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." );
151            } else {
152                saved        = in.readLong();
153                m_workflows  = ( Set< Workflow > )in.readObject();
154                m_queue      = ( DecisionQueue )in.readObject();
155                m_completed = new CircularFifoQueue<>( retainCompleted );
156                m_completed.addAll( ( Collection< Workflow > )in.readObject() );
157                LOG.debug( "Read serialized data successfully in " + sw );
158            }
159        } catch( final IOException | ClassNotFoundException e ) {
160            LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() );
161        }
162        sw.stop();
163
164        return saved;
165    }
166
167    /**
168     *  Serializes workflows and decisionqueue to disk.  The format is private, don't touch it.
169     */
170    synchronized void serializeToDisk( final File f ) {
171        try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) {
172            final StopWatch sw = new StopWatch();
173            sw.start();
174
175            out.writeLong( serialVersionUID );
176            out.writeLong( System.currentTimeMillis() ); // Timestamp
177            out.writeObject( m_workflows );
178            out.writeObject( m_queue );
179            out.writeObject( m_completed );
180
181            sw.stop();
182
183            LOG.debug( "serialization done - took " + sw );
184        } catch( final IOException ioe ) {
185            LOG.error( "Unable to serialize!", ioe );
186        }
187    }
188
189    /**
190     * {@inheritDoc}
191     */
192    @Override
193    public boolean requiresApproval( final String messageKey ) {
194        return  m_approvers.containsKey( messageKey );
195    }
196
197    /**
198     * {@inheritDoc}
199     */
200    @Override
201    public Principal getApprover( final String messageKey ) throws WikiException {
202        Principal approver = m_approvers.get( messageKey );
203        if ( approver == null ) {
204            throw new WikiException( "Workflow '" + messageKey + "' does not require approval." );
205        }
206
207        // Try to resolve UnresolvedPrincipals
208        if ( approver instanceof UnresolvedPrincipal ) {
209            final String name = approver.getName();
210            approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name );
211
212            // If still unresolved, throw exception; otherwise, freshen our cache
213            if ( approver instanceof UnresolvedPrincipal ) {
214                throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." );
215            }
216
217            m_approvers.put( messageKey, approver );
218        }
219        return approver;
220    }
221
222    /**
223     * Protected helper method that returns the associated Engine
224     *
225     * @return the wiki engine
226     */
227    protected Engine getEngine() {
228        if ( m_engine == null ) {
229            throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." );
230        }
231        return m_engine;
232    }
233
234    /**
235     * Returns the DecisionQueue associated with this WorkflowManager
236     *
237     * @return the decision queue
238     */
239    @Override
240    public DecisionQueue getDecisionQueue() {
241        return m_queue;
242    }
243
244    /**
245     * {@inheritDoc}
246     */
247    @Override
248    public List< Workflow > getOwnerWorkflows( final Session session ) {
249        final List< Workflow > workflows = new ArrayList<>();
250        if ( session.isAuthenticated() ) {
251            final Principal[] sessionPrincipals = session.getPrincipals();
252            for( final Workflow w : m_workflows ) {
253                final Principal owner = w.getOwner();
254                if (Arrays.stream(sessionPrincipals).anyMatch(sessionPrincipal -> sessionPrincipal.equals(owner))) {
255                    workflows.add(w);
256                }
257            }
258        }
259        return workflows;
260    }
261
262    /**
263     * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED},
264     * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created,
265     * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue
266     * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}.
267     * 
268     * @param event the event passed to this listener
269     */
270    @Override
271    public void actionPerformed( final WikiEvent event ) {
272        if( event instanceof WorkflowEvent ) {
273            if( event.getSrc() instanceof Workflow ) {
274                final Workflow workflow = event.getSrc();
275                switch( event.getType() ) {
276                // Remove from manager
277                case WorkflowEvent.ABORTED   :
278                case WorkflowEvent.COMPLETED : remove( workflow ); break;
279                // Add to manager
280                case WorkflowEvent.CREATED   : add( workflow ); break;
281                default: break;
282                }
283            } else if( event.getSrc() instanceof Decision ) {
284                final Decision decision = event.getSrc();
285                switch( event.getType() ) {
286                // Add to DecisionQueue
287                case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break;
288                // Remove from DecisionQueue
289                case WorkflowEvent.DQ_REMOVAL  : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break;
290                default: break;
291                }
292            }
293            serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) );
294        }
295    }
296
297    /**
298     * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and
299     * {@code Id} properties if not set.
300     *
301     * @param workflow the workflow to add
302     */
303    protected void add( final Workflow workflow ) {
304        m_workflows.add( workflow );
305    }
306
307    /**
308     * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method
309     * defensively checks to see if the workflow has not yet been removed.
310     *
311     * @param workflow the workflow to remove
312     */
313    protected void remove( final Workflow workflow ) {
314        if( m_workflows.contains( workflow ) ) {
315            m_workflows.remove( workflow );
316            m_completed.add( workflow );
317        }
318    }
319
320    protected void removeFromDecisionQueue( final Decision decision, final Context context ) {
321        // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue
322        final int workflowId = decision.getWorkflowId();
323        final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny();
324        if( optw.isPresent() ) {
325            final Workflow w = optw.get();
326            if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) {
327                getDecisionQueue().remove( decision );
328                // Restart workflow
329                try {
330                    w.restart( context );
331                } catch( final WikiException e ) {
332                    LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e );
333                }
334            }
335        }
336    }
337
338    protected void addToDecisionQueue( final Decision decision ) {
339        getDecisionQueue().add( decision );
340    }
341
342}