Delegate.java: bounded locking and background sending of requests

What this patch does to org.jacorb.orb.Delegate:

  * replaces the bind_sync monitor by a ReentrantLock plus a Condition
    (isConnected); every former synchronized block becomes
    lock() / try / finally / unlock()
  * request() and the new doSend() acquire the lock with tryLock() and a
    timeout taken from the reply end time, raising TIMEOUT when the lock
    cannot be had in time
  * two-way requests are sent by a SendJob run through a FutureTask on a
    thread-per-task executor; the invoking thread waits for it with
    task.get(timeout)

Changes made while reviewing the patch:

  * bind() signals isConnected in its finally block, so that threads
    waiting in getParsedIOR() are woken as they were by the old
    notifyAll()
  * a RemarshalException raised by doSend() is unwrapped from the
    ExecutionException and rethrown instead of being turned into a
    RuntimeException
  * the executor's ThreadGroup is no longer a daemon group (such a group
    is destroyed when its last thread ends); the worker threads are
    marked daemon instead
  * the interrupt status is restored before an InterruptedException is
    converted into TRANSIENT

NOTE(review): the indentation and the position of blank context lines were
reconstructed and may differ from the pristine file; apply with
"patch -p1 -l". Hunks that only changed whitespace are kept, but their exact
whitespace could not be recovered.

--- JacORB-2.3.0.orig/src/org/jacorb/orb/Delegate.java	2007-02-15 13:56:06.000000000 +0100
+++ JacORB-2.3.0/src/org/jacorb/orb/Delegate.java	2007-09-10 17:37:31.000000000 +0200
@@ -49,6 +49,16 @@
 import org.omg.PortableServer.Servant;
 import org.omg.PortableServer.ServantActivator;
 
+import edu.emory.mathcs.backport.java.util.concurrent.Callable;
+import edu.emory.mathcs.backport.java.util.concurrent.ExecutionException;
+import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import edu.emory.mathcs.backport.java.util.concurrent.FutureTask;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
+import edu.emory.mathcs.backport.java.util.concurrent.TimeoutException;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Condition;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.Lock;
+import edu.emory.mathcs.backport.java.util.concurrent.locks.ReentrantLock;
+
 /**
  * JacORB implementation of CORBA object reference
  *
@@ -91,7 +101,8 @@
     private final Set pending_replies = new HashSet();
     private final Barrier pending_replies_sync = new Barrier();
 
-    private final java.lang.Object bind_sync = new java.lang.Object();
+    private final Lock bind_sync = new ReentrantLock();
+    private final Condition isConnected = bind_sync.newCondition();
 
     private boolean locate_on_bind_performed = false;
 
@@ -119,7 +130,9 @@
     private boolean useIMR;
     private boolean locateOnBind;
 
-    /**
+    ThreadPerTaskExecutor executor = null;
+
+    /**
      * 03-09-04: 1.5.2.2
      *
      * boolean threadlocal to ensure that
@@ -159,6 +172,7 @@
         super();
 
         this.orb = orb;
+        this.executor = new ThreadPerTaskExecutor();
     }
 
     public Delegate ( org.jacorb.orb.ORB orb, ParsedIOR pior )
@@ -284,7 +298,9 @@
      */
     private void bind(boolean rebind)
     {
-        synchronized (bind_sync)
+        bind_sync.lock();
+
+        try
         {
             if ( bound )
             {
@@ -351,6 +367,7 @@
 
                         case LocateStatusType_1_2._OBJECT_HERE :
                         {
+                            isConnected.signalAll();
                             break;
                         }
 
@@ -403,9 +420,13 @@
                 }
             }
 
-        
+
+        }
+        finally
+        {
             //wake up threads waiting for the pior
-            bind_sync.notifyAll();
+            isConnected.signalAll();
+            bind_sync.unlock();
         }
     }
 
@@ -425,7 +446,8 @@
 
     public void rebind(ParsedIOR ior)
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             // Do the ParsedIORs currently match.
             final ParsedIOR originalIOR = getParsedIOR();
@@ -474,6 +496,10 @@
 
             bind();
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public org.omg.CORBA.Request create_request( org.omg.CORBA.Object self,
@@ -792,16 +818,23 @@
 
     ClientConnection getConnection()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             bind();
             return connection;
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public org.omg.IOP.IOR getIOR()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+
+        try
         {
             if ( piorOriginal != null )
             {
@@ -809,37 +842,52 @@
             }
             return getParsedIOR().getIOR();
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public byte[] getObjectId()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             bind();
 
             return POAUtil.extractOID( getParsedIOR().get_object_key() );
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public byte[] getObjectKey()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             bind();
 
             return getParsedIOR().get_object_key();
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public ParsedIOR getParsedIOR()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             while ( _pior == null )
             {
                 try
                 {
-                    bind_sync.wait();
+                    isConnected.await();
                 }
                 catch ( InterruptedException ie )
                 {
@@ -849,6 +897,10 @@
 
             return _pior;
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public void resolvePOA (org.omg.CORBA.Object self)
@@ -993,7 +1045,6 @@
             interceptors.handle_send_request();
         }
 
-
         try
         {
             if ( !ros.response_expected() )  // oneway op
@@ -1015,24 +1066,44 @@
                 pending_replies.add(receiver);
             }
 
-            synchronized (bind_sync)
-            {
-                if (ros.getConnection() == connection)
-                {
-                    // RequestOutputStream has been created for
-                    // exactly this connection
-                    connection.sendRequest(ros, receiver, ros.requestId(), true); // response
-                                                                                  // expected
-                }
-                else
-                {
-                    logger.debug("invoke: RemarshalException");
-
-                    // RequestOutputStream has been created for
-                    // another connection, so try again
-                    throw new RemarshalException();
-                }
-            }
+            // send the request by a background thread
+            //
+
+            SendJob sendJob = new SendJob(ros, receiver);
+            FutureTask task = new FutureTask(sendJob);
+
+            executor.execute(task);
+
+            try
+            {
+                task.get(getTimeLeft(ros), TimeUnit.MILLISECONDS);
+            }
+            catch (InterruptedException e)
+            {
+                // keep the interrupt visible to the code further up
+                Thread.currentThread().interrupt();
+                throw new TRANSIENT("interrupted while sending the request");
+            }
+            catch (ExecutionException e)
+            {
+                if(e.getCause() instanceof SystemException)
+                {
+                    throw (SystemException)e.getCause();
+                }
+                else if(e.getCause() instanceof RemarshalException)
+                {
+                    // raised by doSend(): pass it on unwrapped, so try again
+                    throw (RemarshalException)e.getCause();
+                }
+                else
+                {
+                    throw new RuntimeException(e);
+                }
+            }
+            catch (TimeoutException e)
+            {
+                throw new TIMEOUT(e.getMessage());
+            }
         }
         catch ( org.omg.CORBA.SystemException cfe )
         {
@@ -1050,7 +1121,7 @@
             }
 
             interceptors.handle_receive_exception ( cfe );
-            
+
             // The exception is a TRANSIENT, so try rebinding.
             if ( cfe instanceof org.omg.CORBA.TRANSIENT && try_rebind() )
             {
@@ -1058,8 +1129,8 @@
             }
 
             throw cfe;
-        }
-        finally
+        }
+        finally
         {
             if (orb.hasRequestInterceptors())
             {
@@ -1081,6 +1152,51 @@
         return null;
     }
 
+    private long getTimeLeft(RequestOutputStream ros2)
+    {
+        UtcT timo = ros2.getReplyEndTime();
+        long timoMillies = java.lang.Long.MAX_VALUE;
+
+        if(timo != null)
+        {
+            timoMillies = org.jacorb.util.Time.millisTo(timo);
+        }
+        return(timoMillies);
+    }
+
+    public void doSend(RequestOutputStream ros, ReplyReceiver receiver) throws RemarshalException, InterruptedException
+    {
+        long timo = getTimeLeft(ros);
+
+        if( bind_sync.tryLock(timo, TimeUnit.MILLISECONDS) == false)
+        {
+            throw new TIMEOUT("TIMO while doSend ");
+        }
+
+        try
+        {
+            if (ros.getConnection() == connection)
+            {
+                // RequestOutputStream has been created for
+                // exactly this connection
+                connection.sendRequest(ros, receiver, ros.requestId(), true); // response
+                                                                              // expected
+            }
+            else
+            {
+                logger.debug("invoke: RemarshalException");
+
+                // RequestOutputStream has been created for
+                // another connection, so try again
+                throw new RemarshalException();
+            }
+        }
+        finally
+        {
+            bind_sync.unlock();
+        }
+    }
+
     private void invoke_oneway (RequestOutputStream ros,
                                 ClientInterceptorHandler interceptors)
         throws RemarshalException, ApplicationException
@@ -1132,7 +1248,8 @@
 
     private boolean try_rebind()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             if( logger.isDebugEnabled())
             {
@@ -1227,6 +1344,10 @@
                 return false;
             }
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public void invokeInterceptors( ClientRequestInfoImpl info, short op )
@@ -1601,7 +1722,8 @@
      */
     public void release( org.omg.CORBA.Object self )
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             if (!bound)
             {
@@ -1625,6 +1747,10 @@
                 logger.debug("Delegate released!");
             }
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     /**
@@ -1652,7 +1778,8 @@
     {
         orb.perform_work();
 
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             bind();
             return new org.jacorb.orb.dii.Request( self,
@@ -1661,6 +1788,10 @@
                                                    getParsedIOR().get_object_key(),
                                                    operation );
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     /**
@@ -1694,19 +1825,39 @@
 
         UtcT replyEndTime = getReplyEndTime();
         long roundtripTimeout = getRelativeRoundtripTimeout();
+        long timo = Long.MAX_VALUE;
 
         if ((roundtripTimeout != 0) || (replyEndTime != null))
         {
             replyEndTime = Time.earliest(Time.corbaFuture (roundtripTimeout),
                                          replyEndTime);
+            if(replyEndTime != null)
+            {
+                timo = Time.millisTo(replyEndTime);
+            }
+
             if (Time.hasPassed(replyEndTime))
             {
                 throw new TIMEOUT("Reply End Time exceeded prior to invocation",
                                   0, CompletionStatus.COMPLETED_NO);
             }
         }
-
-        synchronized ( bind_sync )
+
+        try
+        {
+            if( bind_sync.tryLock(timo, TimeUnit.MILLISECONDS) == false)
+            {
+                throw new TIMEOUT("Reply End Time exceeded prior to invocation",
+                                  0, CompletionStatus.COMPLETED_NO);
+            }
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new TRANSIENT("interrupted while request generation");
+        }
+
+        try
         {
             bind();
 
@@ -1744,6 +1895,10 @@
 
             return out;
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     /**
@@ -1943,7 +2098,8 @@
 
     public String toString()
     {
-        synchronized ( bind_sync )
+        bind_sync.lock();
+        try
         {
             if ( piorOriginal != null )
             {
@@ -1951,6 +2107,10 @@
             }
             return getParsedIOR().getIORString();
         }
+        finally
+        {
+            bind_sync.unlock();
+        }
     }
 
     public String toString( org.omg.CORBA.Object self )
@@ -2057,4 +2217,45 @@
             this.notifyAll();
         }
     }
+
+    private class ThreadPerTaskExecutor implements Executor
+    {
+        ThreadGroup group;
+
+        ThreadPerTaskExecutor()
+        {
+            // the group itself must not be a daemon group: such a group is
+            // destroyed as soon as its last thread has ended and then
+            // refuses every further thread
+            group = new ThreadGroup("Job Runner");
+        }
+
+        public void execute(Runnable r)
+        {
+            Thread worker = new Thread(group, r);
+            worker.setDaemon(true);
+            worker.start();
+        }
+    }
+
+    // a SendJob is a helper class used to send requests in background
+    //
+    private class SendJob implements Callable
+    {
+        private RequestOutputStream ros;
+        private ReplyReceiver receiver;
+
+        SendJob(RequestOutputStream ros, ReplyReceiver receiver)
+        {
+            this.ros = ros;
+            this.receiver = receiver;
+        }
+
+        public java.lang.Object call() throws Exception
+        {
+            doSend(ros, receiver);
+
+            return(null);
+        }
+    }
 }