001/* 002 Licensed to the Apache Software Foundation (ASF) under one 003 or more contributor license agreements. See the NOTICE file 004 distributed with this work for additional information 005 regarding copyright ownership. The ASF licenses this file 006 to you under the Apache License, Version 2.0 (the 007 "License"); you may not use this file except in compliance 008 with the License. You may obtain a copy of the License at 009 010 http://www.apache.org/licenses/LICENSE-2.0 011 012 Unless required by applicable law or agreed to in writing, 013 software distributed under the License is distributed on an 014 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 KIND, either express or implied. See the License for the 016 specific language governing permissions and limitations 017 under the License. 018 */ 019package org.apache.wiki.workflow; 020 021import org.apache.commons.collections4.queue.CircularFifoQueue; 022import org.apache.commons.lang3.time.StopWatch; 023import org.apache.logging.log4j.LogManager; 024import org.apache.logging.log4j.Logger; 025import org.apache.wiki.api.core.Context; 026import org.apache.wiki.api.core.Engine; 027import org.apache.wiki.api.core.Session; 028import org.apache.wiki.api.exceptions.WikiException; 029import org.apache.wiki.auth.AuthorizationManager; 030import org.apache.wiki.auth.acl.UnresolvedPrincipal; 031import org.apache.wiki.event.WikiEvent; 032import org.apache.wiki.event.WikiEventEmitter; 033import org.apache.wiki.event.WorkflowEvent; 034import org.apache.wiki.util.TextUtil; 035 036import java.io.*; 037import java.nio.file.Files; 038import java.security.Principal; 039import java.util.*; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.CopyOnWriteArrayList; 042 043 044/** 045 * <p> 046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of 047 * users or groups expected to approve particular Workflows. 048 * </p> 049 */ 050public class DefaultWorkflowManager implements WorkflowManager { 051 052 private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class ); 053 static final String SERIALIZATION_FILE = "wkflmgr.ser"; 054 055 /** We use this also a generic serialization id */ 056 private static final long serialVersionUID = 6L; 057 058 DecisionQueue m_queue; 059 Set< Workflow > m_workflows; 060 final Map< String, Principal > m_approvers; 061 Queue< Workflow > m_completed; 062 private Engine m_engine; 063 private int retainCompleted; 064 065 /** 066 * Constructs a new WorkflowManager, with an empty workflow cache. 067 */ 068 public DefaultWorkflowManager() { 069 m_workflows = ConcurrentHashMap.newKeySet(); 070 m_approvers = new ConcurrentHashMap<>(); 071 m_queue = new DecisionQueue(); 072 WikiEventEmitter.attach( this ); 073 } 074 075 /** 076 * {@inheritDoc} 077 */ 078 @Override 079 public Set< Workflow > getWorkflows() { 080 final Set< Workflow > workflows = ConcurrentHashMap.newKeySet(); 081 workflows.addAll( m_workflows ); 082 return workflows; 083 } 084 085 /** 086 * {@inheritDoc} 087 */ 088 @Override 089 public List< Workflow > getCompletedWorkflows() { 090 return new CopyOnWriteArrayList< >( m_completed ); 091 } 092 093 /** 094 * {@inheritDoc} 095 * 096 * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given 097 * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role, 098 * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is 099 * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal 100 * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}. 101 */ 102 @Override 103 public void initialize( final Engine engine, final Properties props ) { 104 m_engine = engine; 105 retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 ); 106 m_completed = new CircularFifoQueue<>( retainCompleted ); 107 108 // Identify the workflows requiring approvals 109 for( final Object o : props.keySet() ) { 110 final String prop = ( String )o; 111 if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) { 112 // For the key, everything after the prefix is the workflow name 113 final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() ); 114 if( key.length() > 0 ) { 115 // Only use non-null/non-blank approvers 116 final String approver = props.getProperty( prop ); 117 if( approver != null && !approver.isEmpty() ) { 118 m_approvers.put( key, new UnresolvedPrincipal( approver ) ); 119 } 120 } 121 } 122 } 123 124 unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 125 } 126 127 /** 128 * Reads the serialized data from the disk back to memory. 129 * 130 * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk. 131 */ 132 @SuppressWarnings( "unchecked" ) 133 synchronized long unserializeFromDisk( final File f ) { 134 long saved = 0L; 135 final StopWatch sw = new StopWatch(); 136 sw.start(); 137 try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) { 138 final long ver = in.readLong(); 139 if( ver != serialVersionUID ) { 140 LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." ); 141 } else { 142 saved = in.readLong(); 143 m_workflows = ( Set< Workflow > )in.readObject(); 144 m_queue = ( DecisionQueue )in.readObject(); 145 m_completed = new CircularFifoQueue<>( retainCompleted ); 146 m_completed.addAll( ( Collection< Workflow > )in.readObject() ); 147 LOG.debug( "Read serialized data successfully in " + sw ); 148 } 149 } catch( final IOException | ClassNotFoundException e ) { 150 LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() ); 151 } 152 sw.stop(); 153 154 return saved; 155 } 156 157 /** 158 * Serializes workflows and decisionqueue to disk. The format is private, don't touch it. 159 */ 160 synchronized void serializeToDisk( final File f ) { 161 try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) { 162 final StopWatch sw = new StopWatch(); 163 sw.start(); 164 165 out.writeLong( serialVersionUID ); 166 out.writeLong( System.currentTimeMillis() ); // Timestamp 167 out.writeObject( m_workflows ); 168 out.writeObject( m_queue ); 169 out.writeObject( m_completed ); 170 171 sw.stop(); 172 173 LOG.debug( "serialization done - took " + sw ); 174 } catch( final IOException ioe ) { 175 LOG.error( "Unable to serialize!", ioe ); 176 } 177 } 178 179 /** 180 * {@inheritDoc} 181 */ 182 @Override 183 public boolean requiresApproval( final String messageKey ) { 184 return m_approvers.containsKey( messageKey ); 185 } 186 187 /** 188 * {@inheritDoc} 189 */ 190 @Override 191 public Principal getApprover( final String messageKey ) throws WikiException { 192 Principal approver = m_approvers.get( messageKey ); 193 if ( approver == null ) { 194 throw new WikiException( "Workflow '" + messageKey + "' does not require approval." ); 195 } 196 197 // Try to resolve UnresolvedPrincipals 198 if ( approver instanceof UnresolvedPrincipal ) { 199 final String name = approver.getName(); 200 approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name ); 201 202 // If still unresolved, throw exception; otherwise, freshen our cache 203 if ( approver instanceof UnresolvedPrincipal ) { 204 throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." ); 205 } 206 207 m_approvers.put( messageKey, approver ); 208 } 209 return approver; 210 } 211 212 /** 213 * Protected helper method that returns the associated Engine 214 * 215 * @return the wiki engine 216 */ 217 protected Engine getEngine() { 218 if ( m_engine == null ) { 219 throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." ); 220 } 221 return m_engine; 222 } 223 224 /** 225 * Returns the DecisionQueue associated with this WorkflowManager 226 * 227 * @return the decision queue 228 */ 229 @Override 230 public DecisionQueue getDecisionQueue() { 231 return m_queue; 232 } 233 234 /** 235 * {@inheritDoc} 236 */ 237 @Override 238 public List< Workflow > getOwnerWorkflows( final Session session ) { 239 final List< Workflow > workflows = new ArrayList<>(); 240 if ( session.isAuthenticated() ) { 241 final Principal[] sessionPrincipals = session.getPrincipals(); 242 for( final Workflow w : m_workflows ) { 243 final Principal owner = w.getOwner(); 244 for ( final Principal sessionPrincipal : sessionPrincipals ) { 245 if ( sessionPrincipal.equals( owner ) ) { 246 workflows.add( w ); 247 break; 248 } 249 } 250 } 251 } 252 return workflows; 253 } 254 255 /** 256 * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED}, 257 * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created, 258 * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue 259 * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}. 260 * 261 * @param event the event passed to this listener 262 */ 263 @Override 264 public void actionPerformed( final WikiEvent event ) { 265 if( event instanceof WorkflowEvent ) { 266 if( event.getSrc() instanceof Workflow ) { 267 final Workflow workflow = event.getSrc(); 268 switch( event.getType() ) { 269 // Remove from manager 270 case WorkflowEvent.ABORTED : 271 case WorkflowEvent.COMPLETED : remove( workflow ); break; 272 // Add to manager 273 case WorkflowEvent.CREATED : add( workflow ); break; 274 default: break; 275 } 276 } else if( event.getSrc() instanceof Decision ) { 277 final Decision decision = event.getSrc(); 278 switch( event.getType() ) { 279 // Add to DecisionQueue 280 case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break; 281 // Remove from DecisionQueue 282 case WorkflowEvent.DQ_REMOVAL : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break; 283 default: break; 284 } 285 } 286 serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 287 } 288 } 289 290 /** 291 * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and 292 * {@code Id} properties if not set. 293 * 294 * @param workflow the workflow to add 295 */ 296 protected void add( final Workflow workflow ) { 297 m_workflows.add( workflow ); 298 } 299 300 /** 301 * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method 302 * defensively checks to see if the workflow has not yet been removed. 303 * 304 * @param workflow the workflow to remove 305 */ 306 protected void remove( final Workflow workflow ) { 307 if( m_workflows.contains( workflow ) ) { 308 m_workflows.remove( workflow ); 309 m_completed.add( workflow ); 310 } 311 } 312 313 protected void removeFromDecisionQueue( final Decision decision, final Context context ) { 314 // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue 315 final int workflowId = decision.getWorkflowId(); 316 final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny(); 317 if( optw.isPresent() ) { 318 final Workflow w = optw.get(); 319 if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) { 320 getDecisionQueue().remove( decision ); 321 // Restart workflow 322 try { 323 w.restart( context ); 324 } catch( final WikiException e ) { 325 LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e ); 326 } 327 } 328 } 329 } 330 331 protected void addToDecisionQueue( final Decision decision ) { 332 getDecisionQueue().add( decision ); 333 } 334 335}