From VisualWorks®, 7.6 of March 3, 2008 on May 12, 2008 at 2:58:30 pm
Opentalk ServerTimeout Patch
package
postLoadBlock
'[:package | Opentalk.StreamTransport removeSelector: #doStop ]'

ServerTimeoutTransport Opentalk Opentalk.TCPTransport false none timeoutTime Opentalk ServerTimeout Patch
Opentalk.ServerTimeoutTransport
Instance Variables:
	timeoutTime <Integer> when is the connection about to time out

ServerTimeoutAdaptorConfiguration Opentalk Opentalk.ConnectionAdaptorConfiguration false none timeoutTolerance Opentalk ServerTimeout Patch
Opentalk.ServerTimeoutAdaptorConfiguration
Instance Variables:
	timeoutTolerance <Integer> allowed tolerance to expected connection timeout (ms)

OtETimeoutToleranceExceeded Opentalk Opentalk.OtECommunicationFailure false none Opentalk-Core Opentalk ServerTimeout Patch

ServerTimeoutAdaptor Opentalk Opentalk.ConnectionAdaptor false none Opentalk ServerTimeout Patch
Opentalk.ServerTimeoutAdaptor
Instance Variables:

ServerTimeoutTransportConfiguration Opentalk Opentalk.TCPTransportConfiguration false none Opentalk ServerTimeout Patch

OtEConnectionStatusConflict Opentalk Opentalk.OtECommunicationFailure false none Opentalk-Core Opentalk ServerTimeout Patch

ServerTimeoutPingToken Opentalk false false New Share #OT_UNIQUE_PING_VALUE_DO_NOT_ECHO_THIS Opentalk ServerTimeout Patch

Opentalk.ServerTimeoutTransport accessing

timeoutTime
	"Answer the stored timeoutTime value. It is nil until either
	 #timeoutTime: or #receivingPackage: has assigned it."

	^timeoutTime

timeoutTime: anObject
	"Store anObject as the timeoutTime of this transport."

	timeoutTime := anObject

Opentalk.ServerTimeoutTransport events-transport activity

receivingPackage: aTransportPackage
	"Run the inherited handling for aTransportPackage, then move timeoutTime
	 forward to this transport's timeout added to the current millisecond
	 clock value. The new value is reported to DebuggingCollector, but only
	 when that binding is defined in the image."

	| idleLimit |
	super receivingPackage: aTransportPackage.
	idleLimit := self timeout.
	timeoutTime := idleLimit + Time millisecondClockValue.
	#{DebuggingCollector} ifDefinedDo: [:collector |
		collector updatingTimeoutTime: timeoutTime in: self]

Opentalk.ServerTimeoutAdaptor initialize-release

initialize
	"Nothing adaptor-specific to set up here; defer to the inherited
	 initialization and answer the receiver."

	super initialize.
	^self
Opentalk.ServerTimeoutAdaptor accessing

timeoutTolerance
	"Answer the timeout tolerance held by this adaptor's configuration."

	^configuration timeoutTolerance

Opentalk.ServerTimeoutAdaptor private

transportFor: request
	"Answer the client transport to use for request, or nil if the available
	 connection is about to timeout.

	 A request that is the ping itself (selector #echo: whose first argument
	 is ServerTimeoutPingToken) is registered without any timeout check.
	 For every other request the transport's timeoutTime is compared with the
	 millisecond clock; when the distance is within #timeoutTolerance the
	 request is NOT registered and nil is answered."

	| destination chosen expiring |
	destination := request destination.
	expiring := false.
	chosen := self clientConnections
		at: destination
		ifAbsentPut: [self newClientConnectionTo: destination]
		do: [:candidate |
			| isPing usable |
			isPing := request message selector == #echo:
				and: [request message arguments first == ServerTimeoutPingToken].
			usable := isPing or: [
				| distance |
				"A transport that has no timeoutTime yet gets one computed from now."
				candidate timeoutTime ifNil: [
					candidate timeoutTime: candidate timeout + Time millisecondClockValue].
				"NOTE(review): because of #abs, a timeoutTime that lies further in the
				 past than the tolerance counts as usable - confirm this is intended."
				distance := (candidate timeoutTime - Time millisecondClockValue) abs.
				expiring := distance <= self timeoutTolerance.
				#{DebuggingCollector} ifDefinedDo: [:collector |
					collector aboutToTimeout: expiring in: candidate].
				expiring not].
			usable ifTrue: [
				"We need to register the request with the transport atomically, so that
				 the higher priority server process doesn't time-out the transport in
				 the meantime. See AR#50767"
				candidate registerRequest: request]].
	^expiring ifTrue: [nil] ifFalse: [chosen]

Opentalk.ServerTimeoutAdaptor RMI-API

sendRequest: aRequest
	"Send aRequest over a client transport and answer the result of
	 #sendAndWaitForReply:.

	 Raises OtECommunicationFailure when the adaptor is not running. If
	 #transportFor: answers nil, the remote broker is pinged first with
	 ServerTimeoutPingToken (retried once on OtEServerError or
	 OtEConnectionStatusConflict) and the transport is looked up again.
	 A second nil raises OtECommunicationFailure."

	| transport |
	self isRunning ifFalse: [
		OtECommunicationFailure new
			messageText: 'Must be running to send remote messages!';
			parameter: aRequest;
			raise].
	aRequest interceptorDispatcher: self processingPolicy provideInterceptorDispatcher.
	transport := self transportFor: aRequest.
	transport ifNil: [
		| peerBroker |
		peerBroker := self orb activeBrokerAt: aRequest target accessPoint.
		[peerBroker echo: ServerTimeoutPingToken]
			on: OtEServerError, OtEConnectionStatusConflict
			do: [:pingFailure |
				#{DebuggingCollector} ifDefinedDo: [:collector |
					collector sendingSecondPing: self].
				"One retry only; a failure of this second ping propagates to the caller."
				peerBroker echo: ServerTimeoutPingToken].
		transport := self transportFor: aRequest].
	transport ifNil: [
		OtECommunicationFailure raiseSignal: 'This really should not happen.'].
	^aRequest sendAndWaitForReply: transport

Opentalk.ServerTimeoutTransportConfiguration accessing

componentClass
	"Answer the class of component this configuration stands for."

	^ServerTimeoutTransport

Opentalk.ServerTimeoutAdaptorConfiguration class constants

defaultTimeoutTolerance
	"Answer the tolerance used by #timeoutTolerance when none has been set."

	^50

Opentalk.ServerTimeoutAdaptorConfiguration accessing

timeoutTolerance: anObject
	"Store anObject as the timeout tolerance."

	timeoutTolerance := anObject

componentClass
	"Answer the class of component this configuration stands for."

	^ServerTimeoutAdaptor

timeoutTolerance
	"Answer the timeout tolerance. When it is still nil, set it to the class
	 default first and answer that."

	^timeoutTolerance ifNil: [timeoutTolerance := self class defaultTimeoutTolerance]

Opentalk.Transport protocol-API

stop
	"Stop of the protocol FSM.
	 Does nothing when the receiver is already stopped. Otherwise runs
	 #preStop, then #doStop inside sendLock, then #postStop."

	self isStopped ifFalse: [
		self preStop.
		sendLock critical: [self doStop].
		self postStop]

Opentalk.StreamTransport events

preStop
	"Tell the manager that this connection stopped, then run the inherited
	 preStop."

	self manager connectionStopped: self.
	super preStop

Opentalk.TCPTransport private

sendTransportPackage: aTransportPackage
	"Write the contents of aTransportPackage (buffer positions 1 up to its
	 position) to the socket after the inherited handling.

	 Raises OtECommunicationFailure when fewer bytes than wanted were written.
	 An OsError during the write stops the transport via #doStop and is
	 resignaled as OtECommunicationFailure."

	super sendTransportPackage: aTransportPackage.
	[
		| wanted written |
		wanted := aTransportPackage position.
		written := 0.
		self testForValidSocket.
		"Keep writing while bytes remain and #writeWaitWithTimeoutMs: answers false."
		[written < wanted and: [(socket writeWaitWithTimeoutMs: timeout) not]]
			whileTrue: [
				written := written
					+ (socket
						writeFrom: aTransportPackage buffer
						startingAt: 1 + written
						for: wanted - written)].
		written = wanted ifFalse: [
			OtECommunicationFailure raiseSignal:
				((#x1sSocketWriteFailed2s3s << #opentalk >> '<1s> Socket write failed. Wanted <2s> and got <3s>')
					expandMacrosWith: self printString
					with: wanted printString
					with: written printString)]
	]
		on: OsError
		do: [:osFailure |
			"This is likely a connection failure so let's skip connection closing
			 handshake, especially because the handshake itself needs this part
			 working smoothly as well (See AR 42265 for more details)"
			self doStop.
			osFailure resignalAs:
				(OtECommunicationFailure new
					messageText:
						((#x1sFailed2s << #opentalk >> '<1s> failed - <2s>')
							expandMacrosWith: self printString
							with: osFailure printString);
					yourself)]

Opentalk.TCPTransport private testing

testForValidSocket
	"Raise OtEConnectionStatusConflict unless the receiver is running. The
	 conflict is reported to DebuggingCollector first, when that binding is
	 defined in the image."

	self isRunning ifTrue: [^self].
	#{DebuggingCollector} ifDefinedDo: [:collector |
		collector OtEConnectionStatusConflict: self].
	OtEConnectionStatusConflict raise

Opentalk.BasicRequestBroker class instance creation

newSRBStstTcpAt: anIPSocketAddress
	"Answer a broker created at anIPSocketAddress from a standard broker
	 configuration whose adaptor is a ServerTimeoutAdaptorConfiguration with
	 the standard request dispatcher, a ServerTimeoutTransportConfiguration
	 and an STSTMarshalerConfiguration."

	| brokerConfig adaptorConfig transportConfig |
	brokerConfig := StandardBrokerConfiguration new.
	adaptorConfig := ServerTimeoutAdaptorConfiguration new.
	adaptorConfig requestDispatcher: RequestDispatcherConfiguration standard.
	transportConfig := ServerTimeoutTransportConfiguration new
		marshaler: STSTMarshalerConfiguration new.
	^(brokerConfig adaptor: (adaptorConfig transport: transportConfig))
		newAt: anIPSocketAddress

newSRBStstTcpAtPort: aNumber
	"Same configuration as #newSRBStstTcpAt:, but the broker is created
	 with #newAtPort: using aNumber."

	| brokerConfig adaptorConfig transportConfig |
	brokerConfig := StandardBrokerConfiguration new.
	adaptorConfig := ServerTimeoutAdaptorConfiguration new.
	adaptorConfig requestDispatcher: RequestDispatcherConfiguration standard.
	transportConfig := ServerTimeoutTransportConfiguration new
		marshaler: STSTMarshalerConfiguration new.
	^(brokerConfig adaptor: (adaptorConfig transport: transportConfig))
		newAtPort: aNumber

Opentalk.ServerTimeoutPingToken