Description
channelConnect
This issue discusses how pvaClient (both C++ and Java) should connect to a channel,
and in particular how locking should be managed while connecting.
Connection involves the following methods.
- channelCreated - pvAccess callback
- channelStateChange - pvAccess callback
- issueConnect - pvaClient method
- waitConnect - pvaClient method
issueConnect, via a channelProvider, makes a call to createChannel.
As a result of this call channelCreated is always called and channelStateChange may be called.
The channel may or may not already be connected when channelCreated is called.
Sometime after issueConnect returns waitConnect is called.
The connection may already be complete; if it is not, pvaClient must wait until channelCreated or channelStateChange
is called and reports either success or failure.
Both pvAccess and pvaClient have internal variables that must be protected by a locking mechanism.
I think the logic is similar for both the Java and C++ implementations of pvaClient except for waitConnect.
The Java implementation is:
/**
 * Wait for a connect request to complete.
 *
 * @param timeout maximum time to wait, in seconds (multiplied by 1e9 to get nanoseconds).
 * @return channelConnectStatus, or a newly created ERROR status if the wait is interrupted.
 * @throws RuntimeException if isDestroyed is set.
 */
public Status waitConnect(double timeout)
{
// Both checks below run before lock.lock(), i.e. without holding lock.
if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
if(channel.isConnected()) return channelConnectStatus;
// NOTE(review): a callback that signals waitForConnect after the isConnected()
// check above but before the await below would not be seen by this thread, so
// the await would appear to run for the whole timeout - confirm.
lock.lock();
try {
long nano = (long)(timeout*1e9);
// The value returned by awaitNanos is discarded and no condition is re-tested
// afterwards, so a timeout and a signal are handled identically here.
waitForConnect.awaitNanos(nano);
} catch(InterruptedException e) {
// Interruption is reported through the returned Status rather than rethrown.
Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
return status;
} finally {
lock.unlock();
}
// Read after lock has been released by the finally clause.
return channelConnectStatus;
}
NOTES
- channel.isConnected() is called without holding the lock. If it were called with the lock held, a deadly embrace (deadlock) could result.
- channel.isConnected() is a synchronized method in pvAccessJava.
- It looks like an extra wait can happen if the connection callback occurs after the isConnected() check but before lock.lock().
The C++ implementation is:
/**
 * Wait for a connect request to complete.
 *
 * @param timeout passed unchanged to waitForConnect.wait.
 * @return Status::Ok if channel->isConnected() is true on entry; otherwise
 *         channelConnectStatus as it stands once the wait has returned.
 * @throws std::runtime_error if isDestroyed is set.
 */
Status PvaClientChannel::waitConnect(double timeout)
{
// Inner scope: xx is destroyed at its closing brace, before the wait below.
// NOTE(review): presumably Lock is a scoped guard that releases mutex there - confirm.
{
Lock xx(mutex);
if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
// channel->isConnected() is called while xx is still alive.
if(channel->isConnected()) return Status::Ok;
}
// Any result from wait is ignored; timeout and signal are not distinguished here.
waitForConnect.wait(timeout);
// Read after xx has gone out of scope.
return channelConnectStatus;
}
NOTES:
- channel->isConnected() is called with the lock held.
- waitForConnect.wait(timeout) is called with no lock held.
- It looks like an extra wait can happen if the connection callback occurs after the mutex is unlocked but before waitForConnect.wait(timeout) is called.
NOTE: locking in Java is NOT like locking in C++.
SO WHAT TO DO?
Matej,
Any ideas?
The following are the Java and C++ implementations of the methods discussed above.
pvaClientJava
/**
 * pvAccess callback delivering the result of a createChannel request.
 *
 * @param status result of the create request.
 * @param channel the created channel; stored in this.channel when status is OK.
 * @throws RuntimeException if isDestroyed is set.
 */
public void channelCreated(Status status, Channel channel) {
lock.lock();
try {
if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
if(status.isOK()) {
this.channel = channel;
// channel.isConnected() is called here while lock is held.
if(channel.isConnected()) {
// Sample the old state before it is overwritten so that a signal is
// sent only when a connect request was active.
boolean waitingForConnect = false;
if(connectState==ConnectState.connectActive) waitingForConnect = true;
connectState = ConnectState.connected;
channelConnectStatus = statusCreate.getStatusOK();
if(waitingForConnect) waitForConnect.signal();
}
// Status OK but not yet connected: nothing is signalled and connectState
// is left unchanged by this callback.
return;
}
} finally {
lock.unlock();
}
// Reached only when status is not OK; runs after the finally clause has
// released lock. No state is changed and waitForConnect is not signalled.
System.err.println("PvaClientChannel::channelCreated status "
+ status.getMessage() + "why??");
}
public void (Channel channel,ConnectionState connectionState) {
lock.lock();
try {
if(isDestroyed) return;
boolean waitingForConnect = false;
if(connectState==ConnectState.connectActive) waitingForConnect = true;
if(connectionState!=ConnectionState.CONNECTED) {
String message = channelName + " connection state " + connectionState.name();
message(message,MessageType.error);
channelConnectStatus = statusCreate.createStatus(StatusType.ERROR,message,null);
connectState = ConnectState.notConnected;
} else {
connectState = ConnectState.connected;
channelConnectStatus = statusCreate.getStatusOK();
}
if(waitingForConnect) waitForConnect.signal();
} finally {
lock.unlock();
}
}
/**
 * Start a connect request: mark the request active and ask the channel
 * provider to create the channel. Returns at once if connectState is
 * already connected.
 *
 * @throws RuntimeException if isDestroyed is set, if connectState is neither
 *         connected nor connectIdle, if no provider is registered under
 *         providerName, or if createChannel returns null.
 */
public void issueConnect()
{
lock.lock();
try {
if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
if(connectState==ConnectState.connected) return;
// Thrown for every remaining state other than connectIdle, not only for
// an existing connection.
if(connectState!=ConnectState.connectIdle) {
throw new RuntimeException("pvaClientChannel already connected");
}
// Preset to a failure status; channelCreated and channelStateChange
// replace it when they record an outcome.
channelConnectStatus = statusCreate.createStatus(
StatusType.ERROR,
getChannelName() + " createChannel failed",null);
connectState = ConnectState.connectActive;
} finally {
lock.unlock();
}
// Everything below runs after lock has been released.
ChannelProvider provider = ChannelProviderRegistryFactory
.getChannelProviderRegistry().getProvider(providerName);
if(provider==null) {
String mess = getChannelName() + " provider " + providerName + " not registered";
throw new RuntimeException(mess);
}
// this is passed as the requester; channel is assigned here without holding lock.
channel = provider.createChannel(channelName, this, ChannelProvider.PRIORITY_DEFAULT);
if(channel==null) {
String mess = getChannelName() + " channelCreate failed ";
throw new RuntimeException(mess);
}
}
/**
 * Wait for a connect request to complete.
 *
 * @param timeout maximum time to wait, in seconds (multiplied by 1e9 to get nanoseconds).
 * @return channelConnectStatus, or a newly created ERROR status if the wait is interrupted.
 * @throws RuntimeException if isDestroyed is set.
 */
public Status waitConnect(double timeout)
{
// Both checks below run before lock.lock(), i.e. without holding lock.
if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
if(channel.isConnected()) return channelConnectStatus;
// NOTE(review): a callback that signals waitForConnect after the isConnected()
// check above but before the await below would not be seen by this thread, so
// the await would appear to run for the whole timeout - confirm.
lock.lock();
try {
long nano = (long)(timeout*1e9);
// The value returned by awaitNanos is discarded and no condition is re-tested
// afterwards, so a timeout and a signal are handled identically here.
waitForConnect.awaitNanos(nano);
} catch(InterruptedException e) {
// Interruption is reported through the returned Status rather than rethrown.
Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
return status;
} finally {
lock.unlock();
}
// Read after lock has been released by the finally clause.
return channelConnectStatus;
}
pvaClientCPP
/**
 * pvAccess callback delivering the result of a createChannel request.
 *
 * @param status result of the create request.
 * @param channel the created channel; stored in this->channel when status is OK.
 * @throws std::runtime_error if isDestroyed is set.
 */
void PvaClientChannel::channelCreated(const Status& status, Channel::shared_pointer const & channel)
{
// xx lives until the function returns, so everything below, including the
// cout line, runs with it in scope.
Lock xx(mutex);
if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
if(status.isOK()) {
this->channel = channel;
if(channel->isConnected()) {
// Sample the old state before it is overwritten so that a signal is
// sent only when a connect request was active.
bool waitingForConnect = false;
if(connectState==connectActive) waitingForConnect = true;
connectState = connected;
channelConnectStatus = Status::Ok;
if(waitingForConnect) waitForConnect.signal();
}
// Status OK but not yet connected: nothing is signalled and connectState
// is left unchanged by this callback.
return;
}
// Reached only when status is not OK; no state is changed and waitForConnect
// is not signalled.
cout << "PvaClientChannel::channelCreated status " << status.getMessage() << " why??\n";
}
/**
 * pvAccess callback reporting a change in the channel's connection state.
 *
 * Records the new connectState and the matching channelConnectStatus, then
 * signals waitForConnect when a connect request was active on entry.
 * Does nothing once isDestroyed is set.
 *
 * @param channel the channel whose state changed; not used by this method.
 * @param connectionState the new connection state.
 */
void PvaClientChannel::channelStateChange(
Channel::shared_pointer const & channel,
Channel::ConnectionState connectionState)
{
// xx stays in scope for the whole callback.
Lock xx(mutex);
if(isDestroyed) return;
// Sample the old state before it is overwritten below.
bool waitingForConnect = false;
if(connectState==connectActive) waitingForConnect = true;
if(connectionState!=Channel::CONNECTED) {
// Any state other than CONNECTED is recorded as an error.
string mess(channelName +
" connection state " + Channel::ConnectionStateNames[connectionState]);
message(mess,errorMessage);
channelConnectStatus = Status(Status::STATUSTYPE_ERROR,mess);
connectState = notConnected;
} else {
connectState = connected;
channelConnectStatus = Status::Ok;
}
// Signalled for both outcomes, after the status has been updated.
if(waitingForConnect) waitForConnect.signal();
}
/**
 * Start a connect request: mark the request active and ask the channel
 * provider to create the channel.
 *
 * @throws std::runtime_error if isDestroyed is set, if connectState is not
 *         connectIdle, if no provider is registered under providerName, or if
 *         createChannel returns a null channel.
 */
void PvaClientChannel::issueConnect()
{
// Inner scope: xx is destroyed at its closing brace, before the provider is used.
// NOTE(review): presumably Lock is a scoped guard that releases mutex there - confirm.
{
Lock xx(mutex);
if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
// Thrown for every state other than connectIdle, including connected.
if(connectState!=connectIdle) {
throw std::runtime_error("pvaClientChannel already connected");
}
// Preset to a failure status; channelCreated and channelStateChange
// replace it when they record an outcome.
channelConnectStatus = Status(
Status::STATUSTYPE_ERROR,
getChannelName() + " createChannel failed");
connectState = connectActive;
}
ChannelProviderRegistry::shared_pointer reg = getChannelProviderRegistry();
ChannelProvider::shared_pointer provider = reg->getProvider(providerName);
if(!provider) {
throw std::runtime_error(getChannelName() + " provider " + providerName + " not registered");
}
// The requester is constructed with a raw this pointer.
channelRequester = ChannelRequester::shared_pointer(new ChannelRequesterImpl(this));
// channel is assigned here after xx has gone out of scope.
channel = provider->createChannel(channelName,channelRequester,ChannelProvider::PRIORITY_DEFAULT);
if(!channel) {
throw std::runtime_error(getChannelName() + " channelCreate failed ");
}
}
/**
 * Wait for a connect request to complete.
 *
 * @param timeout passed unchanged to waitForConnect.wait.
 * @return Status::Ok if channel->isConnected() is true on entry; otherwise
 *         channelConnectStatus as it stands once the wait has returned.
 * @throws std::runtime_error if isDestroyed is set.
 */
Status PvaClientChannel::waitConnect(double timeout)
{
// Inner scope: xx is destroyed at its closing brace, before the wait below.
// NOTE(review): presumably Lock is a scoped guard that releases mutex there - confirm.
{
Lock xx(mutex);
if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
// channel->isConnected() is called while xx is still alive.
if(channel->isConnected()) return Status::Ok;
}
// Any result from wait is ignored; timeout and signal are not distinguished here.
waitForConnect.wait(timeout);
// Read after xx has gone out of scope.
return channelConnectStatus;
}