1
2
3
4
5
6
7
8
9 package bexee.core;
10
11 import java.util.Collection;
12 import java.util.Iterator;
13 import java.util.List;
14 import java.util.Map;
15
16 import javax.wsdl.Binding;
17 import javax.wsdl.Definition;
18 import javax.wsdl.Message;
19 import javax.wsdl.Operation;
20 import javax.wsdl.Part;
21 import javax.wsdl.Port;
22 import javax.wsdl.PortType;
23 import javax.wsdl.extensions.ExtensibilityElement;
24 import javax.wsdl.extensions.soap.SOAPAddress;
25 import javax.xml.namespace.QName;
26 import javax.xml.rpc.ParameterMode;
27
28 import org.apache.axis.Constants;
29 import org.apache.axis.client.Call;
30 import org.apache.axis.client.Service;
31 import org.apache.commons.logging.Log;
32 import org.apache.commons.logging.LogFactory;
33
34 import bexee.event.Event;
35 import bexee.model.activity.Activity;
36 import bexee.model.activity.Assign;
37 import bexee.model.activity.Compensate;
38 import bexee.model.activity.Empty;
39 import bexee.model.activity.Flow;
40 import bexee.model.activity.Invoke;
41 import bexee.model.activity.Receive;
42 import bexee.model.activity.Reply;
43 import bexee.model.activity.Sequence;
44 import bexee.model.activity.Switch;
45 import bexee.model.elements.BpelCase;
46 import bexee.model.elements.Copy;
47 import bexee.model.elements.Correlation;
48 import bexee.model.elements.CorrelationPattern;
49 import bexee.model.elements.From;
50 import bexee.model.elements.Link;
51 import bexee.model.elements.PartnerLink;
52 import bexee.model.elements.PartnerLinks;
53 import bexee.model.elements.To;
54 import bexee.model.elements.Variable;
55 import bexee.model.elements.Variables;
56 import bexee.model.process.BPELProcess;
57 import bexee.model.process.Process;
58
59 /***
 * The default implementation of the <code>ProcessController</code>.
61 *
62 * @version $Revision: 1.1 $, $Date: 2004/12/15 14:18:10 $
63 * @author Patric Fornasier
64 * @author Pawel Kowalski
65 */
66 public class ProcessControllerImpl implements ProcessController {
67
68 private static Log log = LogFactory.getLog(ProcessControllerImpl.class);
69
70 //***************************************************/
71
72 //***************************************************/
73
74 public void processMessage(ProcessInstance instance, BexeeMessage message) {
75
76 log.info("Processing a ProcessInstance");
77
78
79
80 ProcessContext ctx = instance.getContext();
81 BPELProcess process = instance.getProcess();
82
83 ctx.setMessage(message);
84
85
86 try {
87 process.getProcess().accept(this, instance);
88 } catch (AwaitMessageException e) {
89 e.printStackTrace();
90 } catch (Exception e) {
91 e.printStackTrace();
92 }
93 }
94
95 //***************************************************/
96
97 //***************************************************/
98
99 public void process(Process process, ProcessInstance instance)
100 throws Exception {
101
102 log.info("Processing a Process");
103
104
105 Variables variables = process.getVariables();
106 if (variables != null) {
107 variables.accept(this, instance);
108 }
109
110 PartnerLinks partnerLinks = process.getPartnerLinks();
111 if (partnerLinks != null) {
112 partnerLinks.accept(this, instance);
113 }
114
115 Activity activity = process.getActivity();
116
117 if (activity != null) {
118 activity.accept(this, instance);
119 } else {
120 throw new MissingActivityException();
121 }
122 }
123
124 //***************************************************/
125
126 //***************************************************/
127
128 public void process(Sequence sequence, ProcessInstance instance)
129 throws Exception {
130
131 log(sequence);
132
133
134 List activities = sequence.getActivities();
135 for (Iterator iter = activities.iterator(); iter.hasNext();) {
136 Activity activity = (Activity) iter.next();
137 activity.accept(this, instance);
138 }
139 }
140
141 /***
142 * This is the implementation of the Flow activity which is used for
143 * parallel execution. In this primitive implementation we don't use any
144 * thread pooling mechanisms. In a more sophisticated version of the
145 * ProcessController, we might make use of the
146 * http://jakarta.apache.org/commons/pool/ library for thread pooling.
147 */
148 public void process(Flow flow, ProcessInstance instance) throws Exception {
149
150 log(flow);
151
152 List activities = flow.getActivities();
153 int activitiesCount = activities.size();
154 Thread[] flowThreads = new Thread[activitiesCount];
155
156
157
158
159 for (int i = 0; i < activitiesCount; i++) {
160 Activity activity = (Activity) activities.get(i);
161 flowThreads[i] = new FlowThread(this, instance, activity);
162 flowThreads[i].start();
163 }
164
165
166 for (int i = 0; i < flowThreads.length; i++) {
167 flowThreads[i].join();
168 }
169
170 /***
171 * TODO because the run() Thread method is not allowed to throw
172 * Exceptions, it is necessary to check the status of each executed
173 * Thread in order to know wheter it finished its taks correctly or
174 * failed.
175 */
176 }
177
178 /***
179 * A default implementation of the
180 * <code>process(Switch, ProcessInstance)</code> method.
181 *
182 */
183 public void process(Switch bpelSwitch, ProcessInstance instance)
184 throws Exception {
185
186 List bpelCases = bpelSwitch.getCases();
187 for (Iterator iter = bpelCases.iterator(); iter.hasNext();) {
188 BpelCase bpelCase = (BpelCase) iter.next();
189 if (bpelCase.getBooleanExpression().evaluate()) {
190 bpelCase.getCaseActivity().accept(this, instance);
191 return;
192 }
193 }
194
195 Activity otherwise = bpelSwitch.getOtherwise();
196 if (otherwise != null) {
197 otherwise.accept(this, instance);
198 }
199 }
200
201 //***************************************************/
202
203 //***************************************************/
204
205 public void process(Receive receive, ProcessInstance instance)
206 throws AwaitMessageException {
207
208 log(receive);
209
210
211 ProcessContext ctx = instance.getContext();
212 BPELProcess process = instance.getProcess();
213
214 Event event = ctx.getEvent(receive);
215 if (event == null) {
216
217 BexeeMessage message = ctx.removeMessage();
218
219 if (message == null) {
220 throw new AwaitMessageException(receive);
221 } else {
222 event = new Event(receive);
223
224
225 Variable var = receive.getVariable();
226 ctx.setVariable(var, message.getParts());
227
228 ctx.addEvent(event);
229 }
230 }
231 }
232
233 public void process(Assign assign, ProcessInstance instance)
234 throws Exception {
235
236 log(assign);
237
238 BPELProcess process = instance.getProcess();
239 ProcessContext ctx = instance.getContext();
240
241 Event event = ctx.getEvent(assign);
242 if (event == null) {
243
244
245 event = new Event(assign);
246
247
248 Copy copy = assign.getCopy();
249
250
251 From from = copy.getFrom();
252 Variable fromVar = from.getVariable();
253 String fromVarPart = from.getPart();
254
255
256 To to = copy.getTo();
257 Variable toVar = to.getVariable();
258 String toVarPart = to.getPart();
259
260
261 Object variablePart = ctx.getVariablePart(fromVar, fromVarPart);
262
263
264
265 String responseString = "";
266
267 if (assign.getName() != null
268 && assign.getName().indexOf("Response") != -1) {
269 Collection varIds = ctx.getVariablesIdentifiers();
270 for (Iterator iter = varIds.iterator(); iter.hasNext();) {
271 Variable variable = (Variable) iter.next();
272 Collection parts = ctx.getVariable(variable).values();
273 for (Iterator iterator = parts.iterator(); iterator
274 .hasNext();) {
275 Object element = (Object) iterator.next();
276 if (element != null) {
277 responseString += "} {" + element.toString();
278 }
279 }
280 }
281 variablePart = responseString;
282 }
283
284 ctx.setVariablePart(toVar, toVarPart, variablePart);
285 ctx.addEvent(event);
286 }
287 }
288
289 public void process(Invoke invoke, ProcessInstance instance)
290 throws Exception {
291
292 log(invoke);
293
294
295
296 ProcessContext ctx = instance.getContext();
297 BPELProcess process = instance.getProcess();
298
299 Event event = ctx.getEvent(invoke);
300 if (event == null) {
301
302
303 event = new Event(invoke);
304
305
306 Variable inVar = invoke.getInputVariable();
307 Map inVarParts = ctx.getVariable(inVar);
308 Object[] variablePartsAsArray = inVarParts.values().toArray();
309
310
311 Variable outVar = invoke.getOutputVariable();
312 Map outVarParts = ctx.getVariable(outVar);
313
314
315 String operationName = invoke.getOperation();
316 QName portTypeName = invoke.getPortType();
317
318
319 Definition definition = findWSDL(portTypeName, process);
320 PortType portType = definition.getPortType(portTypeName);
321 Operation operationType = portType.getOperation(operationName,
322 null, null);
323 Message inMessage = operationType.getInput().getMessage();
324 Object[] messageParts = inMessage.getParts().values().toArray();
325
326 Binding binding = findBinding(definition, portTypeName);
327 Port port = findPort(definition, binding);
328 SOAPAddress soapAddress = findSOAPAddress(port);
329
330
331 Service service = new Service();
332 Call call = (Call) service.createCall();
333 call.setTargetEndpointAddress(soapAddress.getLocationURI());
334 call.setPortName(invoke.getPortType());
335 call.setOperationName(invoke.getOperation());
336
337
338 for (int i = 0; i < messageParts.length; i++) {
339 Part messagePart = (Part) messageParts[i];
340 call.addParameter(messagePart.getName(), messagePart
341 .getTypeName(), ParameterMode.IN);
342 }
343 call.setReturnType(Constants.XSD_STRING);
344
345
346 Object result = null;
347
348
349 if (invoke.isSynchronous()) {
350 result = call.invoke(variablePartsAsArray);
351 }
352
353 else {
354
355 }
356
357
358 Variable variable = invoke.getOutputVariable();
359 Message outMessage = operationType.getOutput().getMessage();
360 Object[] outparts = outMessage.getParts().values().toArray();
361 for (int i = 0; i < outparts.length; i++) {
362 Part messagePart = (Part) messageParts[i];
363 ctx.setVariablePart(variable, messagePart.getName(), result);
364 }
365
366 ctx.addEvent(event);
367 }
368 }
369
370 private Definition findWSDL(QName portTypeName, BPELProcess process) {
371
372 Definition definition = null;
373 Binding binding = null;
374
375
376
377 Collection wsdlCollection = process.getPartnerWSDL();
378 mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
379 .hasNext();) {
380 definition = (Definition) iter.next();
381 Collection bindings = definition.getBindings().values();
382 for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
383 binding = (Binding) iterator.next();
384 if (binding.getPortType().getQName().equals(portTypeName)) {
385 break mainloop;
386 }
387 }
388 }
389 return definition;
390 }
391
392 private Binding findBinding(Definition definition, QName portTypeName) {
393
394 Binding binding = null;
395
396 Collection bindings = definition.getBindings().values();
397 for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
398 binding = (Binding) iterator.next();
399 if (binding.getPortType().getQName().equals(portTypeName)) {
400 break;
401 }
402 }
403 return binding;
404 }
405
406 private Port findPort(Definition definition, Binding binding) {
407
408 Port port = null;
409
410 Collection services = definition.getServices().values();
411 for (Iterator iterator = services.iterator(); iterator.hasNext();) {
412 javax.wsdl.Service service = (javax.wsdl.Service) iterator.next();
413 Collection portTypes = service.getPorts().values();
414 for (Iterator iter2 = portTypes.iterator(); iter2.hasNext();) {
415 port = (Port) iter2.next();
416 if (port.getBinding().equals(binding)) {
417 break;
418 }
419 }
420 }
421 return port;
422 }
423
424 private SOAPAddress findSOAPAddress(Port port) {
425
426 SOAPAddress soapAddress = null;
427
428
429
430 Collection extElems = port.getExtensibilityElements();
431 for (Iterator iter = extElems.iterator(); iter.hasNext();) {
432 ExtensibilityElement element = (ExtensibilityElement) iter.next();
433 if (element instanceof SOAPAddress) {
434 soapAddress = (SOAPAddress) element;
435 break;
436 }
437 }
438 return soapAddress;
439 }
440
441 private String findPortLocation(QName portTypeName, BPELProcess process) {
442
443 Binding binding = null;
444 SOAPAddress soapAddress = null;
445 Port port = null;
446
447
448
449 Collection wsdlCollection = process.getPartnerWSDL();
450 mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
451 .hasNext();) {
452 Definition defn = (Definition) iter.next();
453 Collection bindings = defn.getBindings().values();
454 for (Iterator iterator = bindings.iterator(); iterator.hasNext();) {
455 binding = (Binding) iterator.next();
456 if (binding.getPortType().getQName().equals(portTypeName)) {
457 break mainloop;
458 }
459 }
460 }
461
462
463
464 mainloop: for (Iterator iter = wsdlCollection.iterator(); iter
465 .hasNext();) {
466 Definition defn = (Definition) iter.next();
467 Collection services = defn.getServices().values();
468 for (Iterator iterator = services.iterator(); iterator.hasNext();) {
469 javax.wsdl.Service service = (javax.wsdl.Service) iterator
470 .next();
471 Collection portTypes = service.getPorts().values();
472 for (Iterator iter2 = portTypes.iterator(); iter2.hasNext();) {
473 port = (Port) iter2.next();
474 if (port.getBinding().equals(binding)) {
475 break mainloop;
476 }
477 }
478 }
479 }
480
481
482
483 if (port != null) {
484 Collection extElems = port.getExtensibilityElements();
485 for (Iterator iter = extElems.iterator(); iter.hasNext();) {
486 ExtensibilityElement element = (ExtensibilityElement) iter
487 .next();
488 if (element instanceof SOAPAddress) {
489 soapAddress = (SOAPAddress) element;
490 break;
491 }
492 }
493 return soapAddress.getLocationURI();
494 }
495
496
497
498 return null;
499 }
500
501 public void process(Reply reply, ProcessInstance instance) throws Exception {
502
503 log(reply);
504
505 BPELProcess process = instance.getProcess();
506 ProcessContext ctx = instance.getContext();
507
508 Event event = ctx.getEvent(reply);
509 if (event == null) {
510
511 event = new Event(reply);
512
513
514 Variable variable = reply.getVariable();
515 Map parts = ctx.getVariable(variable);
516
517
518 Object result = parts.values().iterator().next();
519 ctx.setResult(result);
520
521 ctx.addEvent(event);
522 }
523 }
524
525 public void process(Empty empty, ProcessInstance instance) {
526
527 log(empty);
528
529 BPELProcess process = instance.getProcess();
530 ProcessContext ctx = instance.getContext();
531
532 Event event = ctx.getEvent(empty);
533 if (event == null) {
534 event = new Event(empty);
535
536
537
538
539
540 ctx.addEvent(event);
541 }
542 }
543
544 /***
545 * Process method for all unimplemented activities.
546 */
547 public void process(Activity activity, ProcessInstance instance) {
548 log(activity);
549 BPELProcess process = instance.getProcess();
550 ProcessContext ctx = instance.getContext();
551 Event event = ctx.getEvent(activity);
552 if (event == null) {
553 event = new Event(activity);
554 ctx.addEvent(event);
555 }
556 }
557
558 //***************************************************/
559
560 //***************************************************/
561
562 public void process(Variables variables, ProcessInstance instance)
563 throws Exception {
564
565 log.info("Processing Variables");
566
567
568 List list = variables.getVariables();
569 for (Iterator iter = list.iterator(); iter.hasNext();) {
570 Variable variable = (Variable) iter.next();
571 variable.accept(this, instance);
572 }
573 }
574
575 public void process(Variable variable, ProcessInstance instance) {
576 log.info("Processing a Variable: " + variable.getName());
577 }
578
579 public void process(PartnerLinks partnerLinks, ProcessInstance instance)
580 throws Exception {
581
582 log.info("Processing PartnerLinks");
583
584
585 List list = partnerLinks.getPartnerLinks();
586 for (Iterator iter = list.iterator(); iter.hasNext();) {
587 PartnerLink partnerLink = (PartnerLink) iter.next();
588 partnerLink.accept(this, instance);
589 }
590 }
591
592 public void process(PartnerLink partnerLink, ProcessInstance instance) {
593 log.info("Processing a PartnerLink: " + partnerLink.getName());
594 }
595
596 //***************************************************/
597
598 //***************************************************/
599
600 public void process(Link link, ProcessInstance instance) {
601 }
602
603 public void process(Compensate impl, ProcessInstance instance) {
604 }
605
606 public void process(Correlation correlation, ProcessInstance instance) {
607 }
608
609 public void process(CorrelationPattern correlationPattern,
610 ProcessInstance instance) {
611 }
612
613 //***************************************************/
614
615 //***************************************************/
616
617 private Receive matchMessage(BexeeMessage message, List receives) {
618
619 return (Receive) receives.get(0);
620 }
621
622 private void log(Activity activity) {
623 if (activity != null) {
624 log.info("Processing " + activity.getClass().getName() + " name: "
625 + activity.getName());
626 } else {
627 log.info("Processing null activity");
628 }
629 }
630
631 public void process(Copy copy, ProcessInstance instance) throws Exception {
632 }
633
634 }