How should client connect to a channel. #4

Open
@mrkraimer

channelConnect

This issue discusses how pvaClient (both C++ and Java) should connect to a channel,
and in particular how to manage locking.

Connection involves the following methods.

  • channelCreated - pvAccess callback
  • channelStateChange - pvAccess callback
  • issueConnect - pvaClient method
  • waitConnect - pvaClient method

issueConnect, via a channelProvider, makes a call to createChannel.
As a result of this call channelCreated is always called and channelStateChange may be called.
The channel may or may not already be connected when channelCreated is called.

Some time after issueConnect returns, waitConnect is called.
The connection may already be complete; if not, pvaClient must wait until channelCreated or channelStateChange
is called and reports either success or failure.
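
The two orderings described above (the callback arriving before waitConnect is called, or after it has started waiting) can be modelled with a toy class. This is only a sketch: ToyChannel, callback, and the immediate/delayMillis parameters are hypothetical and are not part of pvAccess or pvaClient. It uses a java.util.concurrent.CountDownLatch, which is sticky, so the outcome is seen regardless of which side runs first.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Toy model only: ToyChannel is hypothetical, not the pvAccess or pvaClient API.
class ToyChannel {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean connected = false;

    // Stands in for channelCreated/channelStateChange reporting an outcome.
    void callback(boolean ok) {
        connected = ok;
        done.countDown();   // sticky: an await() that starts later returns at once
    }

    // Stands in for issueConnect. If immediate is true the callback runs
    // before this method returns; otherwise it runs later on another thread.
    void issueConnect(boolean immediate, long delayMillis) {
        if (immediate) {
            callback(true);
            return;
        }
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                callback(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                callback(false);
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Returns true only if a successful callback arrived within timeout seconds.
    boolean waitConnect(double timeout) {
        try {
            boolean finished = done.await((long) (timeout * 1e9), TimeUnit.NANOSECONDS);
            return finished && connected;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A one-shot latch only covers the first connection attempt; a channel that can disconnect and reconnect would need state that can be reset.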

Both pvAccess and pvaClient have internal variables that must be protected by a locking mechanism.

I think the logic is similar for both the Java and C++ implementations of pvaClient, except for waitConnect.

The Java implementation is:

public Status waitConnect(double timeout)
{
    if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
    if(channel.isConnected()) return channelConnectStatus;
    lock.lock();
    try {
        long nano = (long)(timeout*1e9);
        waitForConnect.awaitNanos(nano);
    } catch(InterruptedException e) {
        Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
        return status;
    } finally {
        lock.unlock();
    }
    return channelConnectStatus;
}

NOTES

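  • channel.isConnected() is called without the lock held. Calling it with the lock held leads to a deadly embrace (deadlock).
  • channel.isConnected is a synchronized method in pvAccessJava.
  • It looks like an unnecessary wait can happen if the connection callback occurs after isConnected returns and before lock.lock is acquired: the signal is sent before anyone is waiting, so it is missed.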
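
One possible way to avoid both problems in the notes above is to have waitConnect look only at state that the callbacks record under the same lock, and to wait in a loop on that state. The following is a minimal sketch of that pattern, not the pvaClient implementation: ConnectWaiter, State, and connectionResult are hypothetical names, and it assumes every callback outcome is recorded through connectionResult. Because it never calls channel.isConnected(), it does not call into pvAccess while holding the lock.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the connection bookkeeping in PvaClientChannel.
class ConnectWaiter {
    enum State { ACTIVE, CONNECTED, NOT_CONNECTED }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition waitForConnect = lock.newCondition();
    private State state = State.ACTIVE;   // guarded by lock

    // Would be called from channelCreated/channelStateChange on the callback thread.
    void connectionResult(boolean connected) {
        lock.lock();
        try {
            state = connected ? State.CONNECTED : State.NOT_CONNECTED;
            waitForConnect.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Returns true if the connected state was recorded within timeout seconds.
    boolean waitConnect(double timeout) {
        long nanos = (long) (timeout * 1e9);
        lock.lock();
        try {
            // The state is checked while holding the lock, so a callback that
            // ran before this point is seen and no signal can be missed.
            while (state == State.ACTIVE) {
                if (nanos <= 0L) return false;             // timed out
                nanos = waitForConnect.awaitNanos(nanos);  // returns remaining time
            }
            return state == State.CONNECTED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

The loop also covers spurious wakeups, since awaitNanos returns the remaining time and the state is re-checked on every pass.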

The C++ implementation is:

Status PvaClientChannel::waitConnect(double timeout)
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(channel->isConnected()) return Status::Ok;
    }
    waitForConnect.wait(timeout);
    return channelConnectStatus;
}

NOTES:

  • channel->isConnected() is called with the lock held.
  • waitForConnect.wait(timeout) is called with no lock held.
  • It looks like an unnecessary wait can happen if the connection callback occurs after the mutex is unlocked and before waitForConnect.wait is called.

NOTE: locking in Java is NOT like locking in C++.

SO WHAT TO DO?

Matej,

Any ideas?

The following are the Java and C++ implementations of the methods discussed above.

pvaClientJava

public void channelCreated(Status status, Channel channel) {
    lock.lock();
    try {
        if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
        if(status.isOK()) {
            this.channel = channel;
            if(channel.isConnected()) {
                boolean waitingForConnect = false;
                if(connectState==ConnectState.connectActive) waitingForConnect = true;
                connectState = ConnectState.connected;
                channelConnectStatus = statusCreate.getStatusOK();
                if(waitingForConnect) waitForConnect.signal();
            }
            return;
        }
    } finally {
        lock.unlock();
    }
    System.err.println("PvaClientChannel::channelCreated status "
            + status.getMessage() + " why??");
}

public void channelStateChange(Channel channel,ConnectionState connectionState) {
    lock.lock();
    try {
        if(isDestroyed) return;
        boolean waitingForConnect = false;
        if(connectState==ConnectState.connectActive) waitingForConnect = true;
        if(connectionState!=ConnectionState.CONNECTED) {
            String message = channelName + " connection state " + connectionState.name();
            message(message,MessageType.error);
            channelConnectStatus = statusCreate.createStatus(StatusType.ERROR,message,null);
            connectState = ConnectState.notConnected;
        } else {
            connectState = ConnectState.connected;
            channelConnectStatus = statusCreate.getStatusOK();
        }   
        if(waitingForConnect) waitForConnect.signal();
    } finally {
        lock.unlock();
    }
}

public void issueConnect()
{
    lock.lock();
    try {
        if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
        if(connectState==ConnectState.connected) return;
        if(connectState!=ConnectState.connectIdle) {
            throw new RuntimeException("pvaClientChannel already connected");
        }
        channelConnectStatus = statusCreate.createStatus(
                StatusType.ERROR,
                getChannelName() + " createChannel failed",null);
        connectState = ConnectState.connectActive;  
    } finally {
        lock.unlock();
    }
    ChannelProvider provider = ChannelProviderRegistryFactory
            .getChannelProviderRegistry().getProvider(providerName);
    if(provider==null) {
        String mess = getChannelName() + " provider " + providerName + " not registered";
        throw new RuntimeException(mess);
    }
    channel = provider.createChannel(channelName, this, ChannelProvider.PRIORITY_DEFAULT);
    if(channel==null) {
        String mess = getChannelName() + " channelCreate failed ";
        throw new RuntimeException(mess);
    }
}


public Status waitConnect(double timeout)
{
    if(isDestroyed) throw new RuntimeException("pvaClientChannel was destroyed");
    if(channel.isConnected()) return channelConnectStatus;
    lock.lock();
    try {
        long nano = (long)(timeout*1e9);
        waitForConnect.awaitNanos(nano);
    } catch(InterruptedException e) {
        Status status = statusCreate.createStatus(StatusType.ERROR,e.getMessage(), e.fillInStackTrace());
        return status;
    } finally {
        lock.unlock();
    }
    return channelConnectStatus;
}

pvaClientCPP

void PvaClientChannel::channelCreated(const Status& status, Channel::shared_pointer const & channel)
{
    Lock xx(mutex);
    if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
    if(status.isOK()) {
        this->channel = channel;
        if(channel->isConnected()) {
             bool waitingForConnect = false;
             if(connectState==connectActive) waitingForConnect = true;
             connectState = connected;
             channelConnectStatus = Status::Ok;
             if(waitingForConnect) waitForConnect.signal();
        }
        return;
    }
    cout << "PvaClientChannel::channelCreated status " << status.getMessage() << " why??\n";
}

void PvaClientChannel::channelStateChange(
    Channel::shared_pointer const & channel,
    Channel::ConnectionState connectionState)
{
    Lock xx(mutex);
    if(isDestroyed) return;
    bool waitingForConnect = false;
    if(connectState==connectActive) waitingForConnect = true;
    if(connectionState!=Channel::CONNECTED) {
         string mess(channelName +
             " connection state " + Channel::ConnectionStateNames[connectionState]);
         message(mess,errorMessage);
         channelConnectStatus = Status(Status::STATUSTYPE_ERROR,mess);
         connectState = notConnected;
    } else {
         connectState = connected;
         channelConnectStatus = Status::Ok;
    }
    if(waitingForConnect) waitForConnect.signal();
}

void PvaClientChannel::issueConnect()
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(connectState!=connectIdle) {
           throw std::runtime_error("pvaClientChannel already connected");
        }

        channelConnectStatus = Status(
             Status::STATUSTYPE_ERROR,
             getChannelName() + " createChannel failed");
        connectState = connectActive;
    }
    ChannelProviderRegistry::shared_pointer reg = getChannelProviderRegistry();
    ChannelProvider::shared_pointer provider = reg->getProvider(providerName);
    if(!provider) {
        throw std::runtime_error(getChannelName() + " provider " + providerName + " not registered");
    }
    channelRequester = ChannelRequester::shared_pointer(new ChannelRequesterImpl(this));
    channel = provider->createChannel(channelName,channelRequester,ChannelProvider::PRIORITY_DEFAULT);
    if(!channel) {
         throw std::runtime_error(getChannelName() + " channelCreate failed ");
    }
}

Status PvaClientChannel::waitConnect(double timeout)
{
    {
        Lock xx(mutex);
        if(isDestroyed) throw std::runtime_error("pvaClientChannel was destroyed");
        if(channel->isConnected()) return Status::Ok;
    }
    waitForConnect.wait(timeout);
    return channelConnectStatus;
}
