001/* 002 Licensed to the Apache Software Foundation (ASF) under one 003 or more contributor license agreements. See the NOTICE file 004 distributed with this work for additional information 005 regarding copyright ownership. The ASF licenses this file 006 to you under the Apache License, Version 2.0 (the 007 "License"); you may not use this file except in compliance 008 with the License. You may obtain a copy of the License at 009 010 http://www.apache.org/licenses/LICENSE-2.0 011 012 Unless required by applicable law or agreed to in writing, 013 software distributed under the License is distributed on an 014 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 KIND, either express or implied. See the License for the 016 specific language governing permissions and limitations 017 under the License. 018 */ 019package org.apache.wiki.workflow; 020 021import org.apache.commons.collections4.queue.CircularFifoQueue; 022import org.apache.commons.lang3.time.StopWatch; 023import org.apache.logging.log4j.LogManager; 024import org.apache.logging.log4j.Logger; 025import org.apache.wiki.api.core.Context; 026import org.apache.wiki.api.core.Engine; 027import org.apache.wiki.api.core.Session; 028import org.apache.wiki.api.exceptions.WikiException; 029import org.apache.wiki.auth.AuthorizationManager; 030import org.apache.wiki.auth.acl.UnresolvedPrincipal; 031import org.apache.wiki.event.WikiEvent; 032import org.apache.wiki.event.WikiEventEmitter; 033import org.apache.wiki.event.WorkflowEvent; 034import org.apache.wiki.util.TextUtil; 035 036import java.io.*; 037import java.nio.file.Files; 038import java.security.Principal; 039import java.util.*; 040import java.util.concurrent.ConcurrentHashMap; 041import java.util.concurrent.CopyOnWriteArrayList; 042 043 044/** 045 * <p> 046 * Monitor class that tracks running Workflows. The WorkflowManager also keeps track of the names of 047 * users or groups expected to approve particular Workflows. 048 * </p> 049 */ 050public class DefaultWorkflowManager implements WorkflowManager, Serializable { 051 052 private static final Logger LOG = LogManager.getLogger( DefaultWorkflowManager.class ); 053 static final String SERIALIZATION_FILE = "wkflmgr.ser"; 054 055 /** We use this also a generic serialization id */ 056 private static final long serialVersionUID = 6L; 057 058 DecisionQueue m_queue; 059 Set< Workflow > m_workflows; 060 final Map< String, Principal > m_approvers; 061 Queue< Workflow > m_completed; 062 private Engine m_engine; 063 private int retainCompleted; 064 065 /** 066 * Constructs a new WorkflowManager, with an empty workflow cache. 067 */ 068 public DefaultWorkflowManager() { 069 m_workflows = ConcurrentHashMap.newKeySet(); 070 m_approvers = new ConcurrentHashMap<>(); 071 m_queue = new DecisionQueue(); 072 WikiEventEmitter.attach( this ); 073 } 074 075 /** 076 * {@inheritDoc} 077 */ 078 @Override 079 public Set< Workflow > getWorkflows() { 080 final Set< Workflow > workflows = ConcurrentHashMap.newKeySet(); 081 workflows.addAll( m_workflows ); 082 return workflows; 083 } 084 085 /** 086 * {@inheritDoc} 087 */ 088 @Override 089 public Map< Integer, Workflow > getWorkflowsAsMap() { 090 final Map< Integer, Workflow > workflows = new ConcurrentHashMap<>(); 091 m_workflows.forEach( w -> workflows.put( w.getId(), w ) ); 092 return workflows; 093 } 094 095 /** 096 * {@inheritDoc} 097 */ 098 @Override 099 public List< Workflow > getCompletedWorkflows() { 100 return new CopyOnWriteArrayList< >( m_completed ); 101 } 102 103 /** 104 * {@inheritDoc} 105 * 106 * Any properties that begin with {@link #PROPERTY_APPROVER_PREFIX} will be assumed to be Decisions that require approval. For a given 107 * property key, everything after the prefix denotes the Decision's message key. The property value indicates the Principal (Role, 108 * GroupPrincipal, WikiPrincipal) that must approve the Decision. For example, if the property key/value pair is 109 * {@code jspwiki.approver.workflow.saveWikiPage=Admin}, the Decision's message key is <code>workflow.saveWikiPage</code>. The Principal 110 * <code>Admin</code> will be resolved via {@link org.apache.wiki.auth.AuthorizationManager#resolvePrincipal(String)}. 111 */ 112 @Override 113 public void initialize( final Engine engine, final Properties props ) { 114 m_engine = engine; 115 retainCompleted = TextUtil.getIntegerProperty( engine.getWikiProperties(), "jspwiki.workflow.completed.retain", 2048 ); 116 m_completed = new CircularFifoQueue<>( retainCompleted ); 117 118 // Identify the workflows requiring approvals 119 for( final Object o : props.keySet() ) { 120 final String prop = ( String )o; 121 if( prop.startsWith( PROPERTY_APPROVER_PREFIX ) ) { 122 // For the key, everything after the prefix is the workflow name 123 final String key = prop.substring( PROPERTY_APPROVER_PREFIX.length() ); 124 if(!key.isEmpty()) { 125 // Only use non-null/non-blank approvers 126 final String approver = props.getProperty( prop ); 127 if( approver != null && !approver.isEmpty() ) { 128 m_approvers.put( key, new UnresolvedPrincipal( approver ) ); 129 } 130 } 131 } 132 } 133 134 unserializeFromDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 135 } 136 137 /** 138 * Reads the serialized data from the disk back to memory. 139 * 140 * @return the date when the data was last written on disk or {@code 0} if there has been problems reading from disk. 141 */ 142 @SuppressWarnings( "unchecked" ) 143 synchronized long unserializeFromDisk( final File f ) { 144 long saved = 0L; 145 final StopWatch sw = new StopWatch(); 146 sw.start(); 147 try( final ObjectInputStream in = new ObjectInputStream( new BufferedInputStream( Files.newInputStream( f.toPath() ) ) ) ) { 148 final long ver = in.readLong(); 149 if( ver != serialVersionUID ) { 150 LOG.warn( "File format has changed; Unable to recover workflows and decision queue from disk." ); 151 } else { 152 saved = in.readLong(); 153 m_workflows = ( Set< Workflow > )in.readObject(); 154 m_queue = ( DecisionQueue )in.readObject(); 155 m_completed = new CircularFifoQueue<>( retainCompleted ); 156 m_completed.addAll( ( Collection< Workflow > )in.readObject() ); 157 LOG.debug( "Read serialized data successfully in " + sw ); 158 } 159 } catch( final IOException | ClassNotFoundException e ) { 160 LOG.warn( "unable to recover from disk workflows and decision queue: " + e.getMessage() ); 161 } 162 sw.stop(); 163 164 return saved; 165 } 166 167 /** 168 * Serializes workflows and decisionqueue to disk. The format is private, don't touch it. 169 */ 170 synchronized void serializeToDisk( final File f ) { 171 try( final ObjectOutputStream out = new ObjectOutputStream( new BufferedOutputStream( Files.newOutputStream( f.toPath() ) ) ) ) { 172 final StopWatch sw = new StopWatch(); 173 sw.start(); 174 175 out.writeLong( serialVersionUID ); 176 out.writeLong( System.currentTimeMillis() ); // Timestamp 177 out.writeObject( m_workflows ); 178 out.writeObject( m_queue ); 179 out.writeObject( m_completed ); 180 181 sw.stop(); 182 183 LOG.debug( "serialization done - took " + sw ); 184 } catch( final IOException ioe ) { 185 LOG.error( "Unable to serialize!", ioe ); 186 } 187 } 188 189 /** 190 * {@inheritDoc} 191 */ 192 @Override 193 public boolean requiresApproval( final String messageKey ) { 194 return m_approvers.containsKey( messageKey ); 195 } 196 197 /** 198 * {@inheritDoc} 199 */ 200 @Override 201 public Principal getApprover( final String messageKey ) throws WikiException { 202 Principal approver = m_approvers.get( messageKey ); 203 if ( approver == null ) { 204 throw new WikiException( "Workflow '" + messageKey + "' does not require approval." ); 205 } 206 207 // Try to resolve UnresolvedPrincipals 208 if ( approver instanceof UnresolvedPrincipal ) { 209 final String name = approver.getName(); 210 approver = m_engine.getManager( AuthorizationManager.class ).resolvePrincipal( name ); 211 212 // If still unresolved, throw exception; otherwise, freshen our cache 213 if ( approver instanceof UnresolvedPrincipal ) { 214 throw new WikiException( "Workflow approver '" + name + "' cannot not be resolved." ); 215 } 216 217 m_approvers.put( messageKey, approver ); 218 } 219 return approver; 220 } 221 222 /** 223 * Protected helper method that returns the associated Engine 224 * 225 * @return the wiki engine 226 */ 227 protected Engine getEngine() { 228 if ( m_engine == null ) { 229 throw new IllegalStateException( "Engine cannot be null; please initialize WorkflowManager first." ); 230 } 231 return m_engine; 232 } 233 234 /** 235 * Returns the DecisionQueue associated with this WorkflowManager 236 * 237 * @return the decision queue 238 */ 239 @Override 240 public DecisionQueue getDecisionQueue() { 241 return m_queue; 242 } 243 244 /** 245 * {@inheritDoc} 246 */ 247 @Override 248 public List< Workflow > getOwnerWorkflows( final Session session ) { 249 final List< Workflow > workflows = new ArrayList<>(); 250 if ( session.isAuthenticated() ) { 251 final Principal[] sessionPrincipals = session.getPrincipals(); 252 for( final Workflow w : m_workflows ) { 253 final Principal owner = w.getOwner(); 254 if (Arrays.stream(sessionPrincipals).anyMatch(sessionPrincipal -> sessionPrincipal.equals(owner))) { 255 workflows.add(w); 256 } 257 } 258 } 259 return workflows; 260 } 261 262 /** 263 * Listens for {@link WorkflowEvent} objects emitted by Workflows. In particular, this method listens for {@link WorkflowEvent#CREATED}, 264 * {@link WorkflowEvent#ABORTED}, {@link WorkflowEvent#COMPLETED} and {@link WorkflowEvent#DQ_REMOVAL} events. If a workflow is created, 265 * it is automatically added to the cache. If one is aborted or completed, it is automatically removed. If a removal from decision queue 266 * is issued, the current step from workflow, which is assumed to be a {@link Decision}, is removed from the {@link DecisionQueue}. 267 * 268 * @param event the event passed to this listener 269 */ 270 @Override 271 public void actionPerformed( final WikiEvent event ) { 272 if( event instanceof WorkflowEvent ) { 273 if( event.getSrc() instanceof Workflow ) { 274 final Workflow workflow = event.getSrc(); 275 switch( event.getType() ) { 276 // Remove from manager 277 case WorkflowEvent.ABORTED : 278 case WorkflowEvent.COMPLETED : remove( workflow ); break; 279 // Add to manager 280 case WorkflowEvent.CREATED : add( workflow ); break; 281 default: break; 282 } 283 } else if( event.getSrc() instanceof Decision ) { 284 final Decision decision = event.getSrc(); 285 switch( event.getType() ) { 286 // Add to DecisionQueue 287 case WorkflowEvent.DQ_ADDITION : addToDecisionQueue( decision ); break; 288 // Remove from DecisionQueue 289 case WorkflowEvent.DQ_REMOVAL : removeFromDecisionQueue( decision, event.getArg( 0, Context.class ) ); break; 290 default: break; 291 } 292 } 293 serializeToDisk( new File( m_engine.getWorkDir(), SERIALIZATION_FILE ) ); 294 } 295 } 296 297 /** 298 * Protected helper method that adds a newly created Workflow to the cache, and sets its {@code workflowManager} and 299 * {@code Id} properties if not set. 300 * 301 * @param workflow the workflow to add 302 */ 303 protected void add( final Workflow workflow ) { 304 m_workflows.add( workflow ); 305 } 306 307 /** 308 * Protected helper method that removes a specified Workflow from the cache, and moves it to the workflow history list. This method 309 * defensively checks to see if the workflow has not yet been removed. 310 * 311 * @param workflow the workflow to remove 312 */ 313 protected void remove( final Workflow workflow ) { 314 if( m_workflows.contains( workflow ) ) { 315 m_workflows.remove( workflow ); 316 m_completed.add( workflow ); 317 } 318 } 319 320 protected void removeFromDecisionQueue( final Decision decision, final Context context ) { 321 // If current workflow is waiting for input, restart it and remove Decision from DecisionQueue 322 final int workflowId = decision.getWorkflowId(); 323 final Optional< Workflow > optw = m_workflows.stream().filter( w -> w.getId() == workflowId ).findAny(); 324 if( optw.isPresent() ) { 325 final Workflow w = optw.get(); 326 if( w.getCurrentState() == Workflow.WAITING && decision.equals( w.getCurrentStep() ) ) { 327 getDecisionQueue().remove( decision ); 328 // Restart workflow 329 try { 330 w.restart( context ); 331 } catch( final WikiException e ) { 332 LOG.error( "restarting workflow #" + w.getId() + " caused " + e.getMessage(), e ); 333 } 334 } 335 } 336 } 337 338 protected void addToDecisionQueue( final Decision decision ) { 339 getDecisionQueue().add( decision ); 340 } 341 342}