View Javadoc

1   /*
2    * $Id: ProcessControllerImpl.java,v 1.1 2004/12/15 14:18:10 patforna Exp $
3    *
4    * Copyright (c) 2004 Patric Fornasier, Pawel Kowalski
5    * Berne University of Applied Sciences
6    * School of Engineering and Information Technology
7    * All rights reserved.
8    */
9   package bexee.core;
10  
11  import java.util.Collection;
12  import java.util.Iterator;
13  import java.util.List;
14  import java.util.Map;
15  
16  import javax.wsdl.Binding;
17  import javax.wsdl.Definition;
18  import javax.wsdl.Message;
19  import javax.wsdl.Operation;
20  import javax.wsdl.Part;
21  import javax.wsdl.Port;
22  import javax.wsdl.PortType;
23  import javax.wsdl.extensions.ExtensibilityElement;
24  import javax.wsdl.extensions.soap.SOAPAddress;
25  import javax.xml.namespace.QName;
26  import javax.xml.rpc.ParameterMode;
27  
28  import org.apache.axis.Constants;
29  import org.apache.axis.client.Call;
30  import org.apache.axis.client.Service;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  
34  import bexee.event.Event;
35  import bexee.model.activity.Activity;
36  import bexee.model.activity.Assign;
37  import bexee.model.activity.Compensate;
38  import bexee.model.activity.Empty;
39  import bexee.model.activity.Flow;
40  import bexee.model.activity.Invoke;
41  import bexee.model.activity.Receive;
42  import bexee.model.activity.Reply;
43  import bexee.model.activity.Sequence;
44  import bexee.model.activity.Switch;
45  import bexee.model.elements.BpelCase;
46  import bexee.model.elements.Copy;
47  import bexee.model.elements.Correlation;
48  import bexee.model.elements.CorrelationPattern;
49  import bexee.model.elements.From;
50  import bexee.model.elements.Link;
51  import bexee.model.elements.PartnerLink;
52  import bexee.model.elements.PartnerLinks;
53  import bexee.model.elements.To;
54  import bexee.model.elements.Variable;
55  import bexee.model.elements.Variables;
56  import bexee.model.process.BPELProcess;
57  import bexee.model.process.Process;
58  
59  /***
60   * The default imlementation of the <code>ProcessController</code>.
61   * 
62   * @version $Revision: 1.1 $, $Date: 2004/12/15 14:18:10 $
63   * @author Patric Fornasier
64   * @author Pawel Kowalski
65   */
66  public class ProcessControllerImpl implements ProcessController {
67  
68      private static Log log = LogFactory.getLog(ProcessControllerImpl.class);
69  
70      //***************************************************/
71      // processor entry method
72      //***************************************************/
73  
74      public void processMessage(ProcessInstance instance, BexeeMessage message) {
75  
76          log.info("Processing a ProcessInstance");
77  
78          // initialize objects
79          //
80          ProcessContext ctx = instance.getContext();
81          BPELProcess process = instance.getProcess();
82  
83          ctx.setMessage(message);
84  
85          // start processing process elements
86          try {
87              process.getProcess().accept(this, instance);
88          } catch (AwaitMessageException e) {
89              e.printStackTrace();
90          } catch (Exception e) {
91              e.printStackTrace();
92          }
93      }
94  
95      //***************************************************/
96      // process the Process
97      //***************************************************/
98  
99      public void process(Process process, ProcessInstance instance)
100             throws Exception {
101 
102         log.info("Processing a Process");
103 
104         // process all child elements
105         Variables variables = process.getVariables();
106         if (variables != null) {
107             variables.accept(this, instance);
108         }
109 
110         PartnerLinks partnerLinks = process.getPartnerLinks();
111         if (partnerLinks != null) {
112             partnerLinks.accept(this, instance);
113         }
114 
115         Activity activity = process.getActivity();
116         // there must be one root activity of this process
117         if (activity != null) {
118             activity.accept(this, instance);
119         } else {
120             throw new MissingActivityException();
121         }
122     }
123 
124     //***************************************************/
125     // process structured activities
126     //***************************************************/
127 
128     public void process(Sequence sequence, ProcessInstance instance)
129             throws Exception {
130 
131         log(sequence);
132 
133         // process all activities in sequence
134         List activities = sequence.getActivities();
135         for (Iterator iter = activities.iterator(); iter.hasNext();) {
136             Activity activity = (Activity) iter.next();
137             activity.accept(this, instance);
138         }
139     }
140 
141     /***
142      * This is the implementation of the Flow activity which is used for
143      * parallel execution. In this primitive implementation we don't use any
144      * thread pooling mechanisms. In a more sophisticated version of the
145      * ProcessController, we might make use of the
146      * http://jakarta.apache.org/commons/pool/ library for thread pooling.
147      */
148     public void process(Flow flow, ProcessInstance instance) throws Exception {
149 
150         log(flow);
151 
152         List activities = flow.getActivities();
153         int activitiesCount = activities.size();
154         Thread[] flowThreads = new Thread[activitiesCount];
155 
156         // execute activities in separate threads
157         // for parallelity
158         //
159         for (int i = 0; i < activitiesCount; i++) {
160             Activity activity = (Activity) activities.get(i);
161             flowThreads[i] = new FlowThread(this, instance, activity);
162             flowThreads[i].start();
163         }
164 
165         // wait for termination of all started FlowThreads
166         for (int i = 0; i < flowThreads.length; i++) {
167             flowThreads[i].join();
168         }
169 
170         /***
171          * TODO because the run() Thread method is not allowed to throw
172          * Exceptions, it is necessary to check the status of each executed
173          * Thread in order to know wheter it finished its taks correctly or
174          * failed.
175          */
176     }
177 
178     /***
179      * A default implementation of the
180      * <code>process(Switch, ProcessInstance)</code> method.
181      *  
182      */
183     public void process(Switch bpelSwitch, ProcessInstance instance)
184             throws Exception {
185 
186         List bpelCases = bpelSwitch.getCases();
187         for (Iterator iter = bpelCases.iterator(); iter.hasNext();) {
188             BpelCase bpelCase = (BpelCase) iter.next();
189             if (bpelCase.getBooleanExpression().evaluate()) {
190                 bpelCase.getCaseActivity().accept(this, instance);
191                 return;
192             }
193         }
194 
195         Activity otherwise = bpelSwitch.getOtherwise();
196         if (otherwise != null) {
197             otherwise.accept(this, instance);
198         }
199     }
200 
201     //***************************************************/
202     // process atomic activities
203     //***************************************************/
204 
205     public void process(Receive receive, ProcessInstance instance)
206             throws AwaitMessageException {
207 
208         log(receive);
209 
210         // get process context
211         ProcessContext ctx = instance.getContext();
212         BPELProcess process = instance.getProcess();
213 
214         Event event = ctx.getEvent(receive);
215         if (event == null) {
216 
217             BexeeMessage message = ctx.removeMessage();
218 
219             if (message == null) {
220                 throw new AwaitMessageException(receive);
221             } else {
222                 event = new Event(receive);
223 
224                 // copy received value into context
225                 Variable var = receive.getVariable();
226                 ctx.setVariable(var, message.getParts());
227 
228                 ctx.addEvent(event);
229             }
230         }
231     }
232 
233     public void process(Assign assign, ProcessInstance instance)
234             throws Exception {
235 
236         log(assign);
237 
238         BPELProcess process = instance.getProcess();
239         ProcessContext ctx = instance.getContext();
240 
241         Event event = ctx.getEvent(assign);
242         if (event == null) {
243 
244             // event creation
245             event = new Event(assign);
246 
247             // get the copy of this assign
248             Copy copy = assign.getCopy();
249 
250             // get from information
251             From from = copy.getFrom();
252             Variable fromVar = from.getVariable();
253             String fromVarPart = from.getPart();
254 
255             // get to information
256             To to = copy.getTo();
257             Variable toVar = to.getVariable();
258             String toVarPart = to.getPart();
259 
260             // get the from variable part and assign it
261             Object variablePart = ctx.getVariablePart(fromVar, fromVarPart);
262 
263             // this is some magic just for demonstration purposes
264             //
265             String responseString = "";
266 
267             if (assign.getName() != null
268                     && assign.getName().indexOf("Response") != -1) {
269                 Collection varIds = ctx.getVariablesIdentifiers();
270                 for (Iterator iter = varIds.iterator(); iter.hasNext();) {
271                     Variable variable = (Variable) iter.next();
272                     Collection parts = ctx.getVariable(variable).values();
273                     for (Iterator iterator = parts.iterator(); iterator
274                             .hasNext();) {
275                         Object element = (Object) iterator.next();
276                         if (element != null) {
277                             responseString += "} {" + element.toString();
278                         }
279                     }
280                 }
281                 variablePart = responseString;
282             }
283 
284             ctx.setVariablePart(toVar, toVarPart, variablePart);
285             ctx.addEvent(event);
286         }
287     }
288 
289     public void process(Invoke invoke, ProcessInstance instance)
290             throws Exception {
291 
292         log(invoke);
293 
294         // initialize objects
295         //
296         ProcessContext ctx = instance.getContext();
297         BPELProcess process = instance.getProcess();
298 
299         Event event = ctx.getEvent(invoke);
300         if (event == null) {
301 
302             // event creation
303             event = new Event(invoke);
304 
305             // get in variable value
306             Variable inVar = invoke.getInputVariable();
307             Map inVarParts = ctx.getVariable(inVar);
308             Object[] variablePartsAsArray = inVarParts.values().toArray();
309 
310             // get out variable value
311             Variable outVar = invoke.getOutputVariable();
312             Map outVarParts = ctx.getVariable(outVar);
313 
314             // get infos about the web service to be invoked
315             String operationName = invoke.getOperation();
316             QName portTypeName = invoke.getPortType();
317 
318             // traverse definition in order to find information about the call
319             Definition definition = findWSDL(portTypeName, process);
320             PortType portType = definition.getPortType(portTypeName);
321             Operation operationType = portType.getOperation(operationName,
322                     null, null);
323             Message inMessage = operationType.getInput().getMessage();
324             Object[] messageParts = inMessage.getParts().values().toArray();
325 
326             Binding binding = findBinding(definition, portTypeName);
327             Port port = findPort(definition, binding);
328             SOAPAddress soapAddress = findSOAPAddress(port);
329 
330             // build service call
331             Service service = new Service();
332             Call call = (Call) service.createCall();
333             call.setTargetEndpointAddress(soapAddress.getLocationURI());
334             call.setPortName(invoke.getPortType());
335             call.setOperationName(invoke.getOperation());
336 
337             // add parameters to the call
338             for (int i = 0; i < messageParts.length; i++) {
339                 Part messagePart = (Part) messageParts[i];
340                 call.addParameter(messagePart.getName(), messagePart
341                         .getTypeName(), ParameterMode.IN);
342             }
343             call.setReturnType(Constants.XSD_STRING);
344 
345             // this will be the result of the invoke
346             Object result = null;
347 
348             // call synchronously
349             if (invoke.isSynchronous()) {
350                 result = call.invoke(variablePartsAsArray);
351             }
352             // call asynchronously
353             else {
354                 // TODO asynchronous call in new thread
355             }
356 
357             // assign result to the variable
358             Variable variable = invoke.getOutputVariable();
359             Message outMessage = operationType.getOutput().getMessage();
360             Object[] outparts = outMessage.getParts().values().toArray();
361             for (int i = 0; i < outparts.length; i++) {
362                 Part messagePart = (Part) messageParts[i];
363                 ctx.setVariablePart(variable, messagePart.getName(), result);
364             }
365 
366             ctx.addEvent(event);
367         }
368     }
369 
370     private Definition findWSDL(QName portTypeName, BPELProcess process) {
371 
372         Definition definition = null;
373         Binding binding = null;
374 
375         // find the right binding for the portType
376         //
377         Collection wsdlCollection = process.getPartnerWSDL();
378         mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
379                 .hasNext();) {
380             definition = (Definition) iter.next();
381             Collection bindings = definition.getBindings().values();
382             for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
383                 binding = (Binding) iterator.next();
384                 if (binding.getPortType().getQName().equals(portTypeName)) {
385                     break mainloop;
386                 }
387             }
388         }
389         return definition;
390     }
391 
392     private Binding findBinding(Definition definition, QName portTypeName) {
393 
394         Binding binding = null;
395 
396         Collection bindings = definition.getBindings().values();
397         for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
398             binding = (Binding) iterator.next();
399             if (binding.getPortType().getQName().equals(portTypeName)) {
400                 break;
401             }
402         }
403         return binding;
404     }
405 
406     private Port findPort(Definition definition, Binding binding) {
407 
408         Port port = null;
409 
410         Collection services = definition.getServices().values();
411         for (Iterator iterator = services.iterator(); iterator.hasNext();) {
412             javax.wsdl.Service service = (javax.wsdl.Service) iterator.next();
413             Collection portTypes = service.getPorts().values();
414             for (Iterator iter2 = portTypes.iterator(); iter2.hasNext();) {
415                 port = (Port) iter2.next();
416                 if (port.getBinding().equals(binding)) {
417                     break;
418                 }
419             }
420         }
421         return port;
422     }
423 
424     private SOAPAddress findSOAPAddress(Port port) {
425 
426         SOAPAddress soapAddress = null;
427 
428         // find and return the location
429         //
430         Collection extElems = port.getExtensibilityElements();
431         for (Iterator iter = extElems.iterator(); iter.hasNext();) {
432             ExtensibilityElement element = (ExtensibilityElement) iter.next();
433             if (element instanceof SOAPAddress) {
434                 soapAddress = (SOAPAddress) element;
435                 break;
436             }
437         }
438         return soapAddress;
439     }
440 
441     private String findPortLocation(QName portTypeName, BPELProcess process) {
442 
443         Binding binding = null;
444         SOAPAddress soapAddress = null;
445         Port port = null;
446 
447         // find the right binding for the portType
448         //
449         Collection wsdlCollection = process.getPartnerWSDL();
450         mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
451                 .hasNext();) {
452             Definition defn = (Definition) iter.next();
453             Collection bindings = defn.getBindings().values();
454             for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
455                 binding = (Binding) iterator.next();
456                 if (binding.getPortType().getQName().equals(portTypeName)) {
457                     break mainloop;
458                 }
459             }
460         }
461 
462         // find the right port within the service for the portType
463         //
464         mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
465                 .hasNext();) {
466             Definition defn = (Definition) iter.next();
467             Collection services = defn.getServices().values();
468             for (Iterator iterator = services.iterator(); iterator.hasNext();) {
469                 javax.wsdl.Service service = (javax.wsdl.Service) iterator
470                         .next();
471                 Collection portTypes = service.getPorts().values();
472                 for (Iterator iter2 = portTypes.iterator(); iter2.hasNext();) {
473                     port = (Port) iter2.next();
474                     if (port.getBinding().equals(binding)) {
475                         break mainloop;
476                     }
477                 }
478             }
479         }
480 
481         // find and return the location
482         //
483         if (port != null) {
484             Collection extElems = port.getExtensibilityElements();
485             for (Iterator iter = extElems.iterator(); iter.hasNext();) {
486                 ExtensibilityElement element = (ExtensibilityElement) iter
487                         .next();
488                 if (element instanceof SOAPAddress) {
489                     soapAddress = (SOAPAddress) element;
490                     break;
491                 }
492             }
493             return soapAddress.getLocationURI();
494         }
495 
496         // no location found
497         //
498         return null;
499     }
500 
501     public void process(Reply reply, ProcessInstance instance) throws Exception {
502 
503         log(reply);
504 
505         BPELProcess process = instance.getProcess();
506         ProcessContext ctx = instance.getContext();
507 
508         Event event = ctx.getEvent(reply);
509         if (event == null) {
510 
511             event = new Event(reply);
512 
513             // copy variable into output
514             Variable variable = reply.getVariable();
515             Map parts = ctx.getVariable(variable);
516 
517             // we only return the first found part as a return
518             Object result = parts.values().iterator().next();
519             ctx.setResult(result);
520 
521             ctx.addEvent(event);
522         }
523     }
524 
525     public void process(Empty empty, ProcessInstance instance) {
526 
527         log(empty);
528 
529         BPELProcess process = instance.getProcess();
530         ProcessContext ctx = instance.getContext();
531 
532         Event event = ctx.getEvent(empty);
533         if (event == null) {
534             event = new Event(empty);
535 
536             //
537             // this is empty, do nothing
538             //
539 
540             ctx.addEvent(event);
541         }
542     }
543 
544     /***
545      * Process method for all unimplemented activities.
546      */
547     public void process(Activity activity, ProcessInstance instance) {
548         log(activity);
549         BPELProcess process = instance.getProcess();
550         ProcessContext ctx = instance.getContext();
551         Event event = ctx.getEvent(activity);
552         if (event == null) {
553             event = new Event(activity);
554             ctx.addEvent(event);
555         }
556     }
557 
558     //***************************************************/
559     // process Process elements
560     //***************************************************/
561 
562     public void process(Variables variables, ProcessInstance instance)
563             throws Exception {
564 
565         log.info("Processing Variables");
566 
567         // process all child elements
568         List list = variables.getVariables();
569         for (Iterator iter = list.iterator(); iter.hasNext();) {
570             Variable variable = (Variable) iter.next();
571             variable.accept(this, instance);
572         }
573     }
574 
575     public void process(Variable variable, ProcessInstance instance) {
576         log.info("Processing a Variable: " + variable.getName());
577     }
578 
579     public void process(PartnerLinks partnerLinks, ProcessInstance instance)
580             throws Exception {
581 
582         log.info("Processing PartnerLinks");
583 
584         // process all child elements
585         List list = partnerLinks.getPartnerLinks();
586         for (Iterator iter = list.iterator(); iter.hasNext();) {
587             PartnerLink partnerLink = (PartnerLink) iter.next();
588             partnerLink.accept(this, instance);
589         }
590     }
591 
592     public void process(PartnerLink partnerLink, ProcessInstance instance) {
593         log.info("Processing a PartnerLink: " + partnerLink.getName());
594     }
595 
596     //***************************************************/
597     // will not be implemented in the scope of the diploma project
598     //***************************************************/
599 
600     public void process(Link link, ProcessInstance instance) {
601     }
602 
603     public void process(Compensate impl, ProcessInstance instance) {
604     }
605 
606     public void process(Correlation correlation, ProcessInstance instance) {
607     }
608 
609     public void process(CorrelationPattern correlationPattern,
610             ProcessInstance instance) {
611     }
612 
613     //***************************************************/
614     // helper methods
615     //***************************************************/
616 
617     private Receive matchMessage(BexeeMessage message, List receives) {
618         // FIXME For now just return first encountered receive
619         return (Receive) receives.get(0);
620     }
621 
622     private void log(Activity activity) {
623         if (activity != null) {
624             log.info("Processing " + activity.getClass().getName() + " name: "
625                     + activity.getName());
626         } else {
627             log.info("Processing null activity");
628         }
629     }
630 
631     public void process(Copy copy, ProcessInstance instance) throws Exception {
632     }
633 
634 }