Skip to content

Commit

Permalink
Merge pull request #332 from bobmcnamara/pyvicp
Browse files Browse the repository at this point in the history
Added VICP protocol
  • Loading branch information
MatthieuDartiailh authored Dec 22, 2022
2 parents 685ef22 + 3893b69 commit 5272310
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 4 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install flake8 black isort mypy pytest
pip install git+https://github.com/pyvisa/pyvisa.git#egg=pyvisa
pip install git+https://github.com/pyvisa/pyvisa.git@main
- name: Isort
run: |
isort pyvisa_py -c;
Expand Down
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ Optionally:
- linux-gpib (to interface with gpib instruments, only on linux)
- gpib-ctypes (to interface with GPIB instruments on Windows and Linux)
- psutil (to discover TCPIP devices across multiple interfaces)
- zeroconf (for HiSLIP and VICP devices discovery)
- pyvicp (to enable the Teledyne LeCroy proprietary VICP protocol)


Installation
Expand Down
8 changes: 6 additions & 2 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ To discover VXI-11 devices on all network interfaces, please install
`psutil`_. Otherwise, discovery will only occur on the default network
interface.

Discovery of HiSLIP devices relies on `mDNS`_, which is a protocol for
Discovery of both HiSLIP and VICP devices relies on `mDNS`_, which is a protocol for
service discovery in a local area network. To enable resource
discovery for HiSLIP, you should install `zeroconf`_.
discovery for HiSLIP and VICP, you should install `zeroconf`_.

The TCP/IP VICP protocol (proprietary to Teledyne LeCroy) depends on
the `pyvicp`_ package. You should install this package if you need to
use VICP.


Serial resources: ASRL INSTR
Expand Down Expand Up @@ -110,3 +113,4 @@ form GitHub_::
.. _`psutil`: https://pypi.org/project/psutil/
.. _`mDNS`: https://en.wikipedia.org/wiki/Multicast_DNS
.. _`zeroconf`: https://pypi.org/project/zeroconf/
.. _`pyvicp`: https://pypi.org/project/pyvicp/
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ serial = ["pyserial>=3.0"]
usb = ["pyusb"]
psutil = ["psutil"]
hislip-discovery = ["zeroconf"]
vicp = ["pyvicp", "zeroconf"]


[project.urls]
Expand Down
214 changes: 213 additions & 1 deletion pyvisa_py/tcpip.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@
except ImportError:
zeroconf = None # type: ignore

# Let pyvicp be optional dependency
try:
import pyvicp # type: ignore
except ImportError:
pyvicp = None # type: ignore


# Conversion between VXI11 error codes and VISA status
# TODO this is so far a best guess, in particular 6 and 29 are likely wrong
Expand Down Expand Up @@ -769,6 +775,212 @@ def _set_timeout(self, attribute: ResourceAttribute, value: int) -> StatusCode:
return StatusCode.success


@Session.register(constants.InterfaceType.vicp, "INSTR")
class TCPIPInstrVicp(Session):
"""A VICP Session that uses the network standard library to do the low
level communication.
"""

# Override parsed to take into account the fact that this class is only used
# for a specific kind of resource
parsed: rname.VICPInstr

@staticmethod
def list_resources(wait_time=1.0) -> List[str]:
resources = []
services = get_services("_lxi._tcp.local.", wait_time=wait_time)
for host, properties in services.items():
if properties["Manufacturer"].lower().startswith("lecroy"):
resources.append(f"VICP::{host}::INSTR")
return sorted(resources)

def after_parsing(self) -> None:
# TODO: board_number not handled
if pyvicp is None:
raise NotImplementedError(
"VICP requires the pyvicp package to be installed... "
"try 'pip install pyvicp'"
)

host_address = self.parsed.host_address
if "," in host_address:
host_address, port_str = host_address.split(",")
port = int(port_str)
else:
port = 1861

self.interface = pyvicp.Client(
self.parsed.host_address, port, timeout=self.timeout
)

# initialize the constant attributes
for name in ("SEND_END_EN", "TERMCHAR", "TERMCHAR_EN"):
attribute = getattr(constants, "VI_ATTR_" + name)
self.attrs[attribute] = attributes.AttributesByID[attribute].default

self.attrs[ResourceAttribute.dma_allow_enabled] = constants.VI_FALSE
self.attrs[ResourceAttribute.file_append_enabled] = constants.VI_FALSE
self.attrs[ResourceAttribute.interface_instrument_name] = "TCPIP0 (VICP)"
self.attrs[ResourceAttribute.interface_number] = 0
self.attrs[ResourceAttribute.io_prot] = constants.VI_PROT_NORMAL
self.attrs[
ResourceAttribute.read_buffer_operation_mode
] = constants.VI_FLUSH_DISABLE
self.attrs[ResourceAttribute.resource_lock_state] = constants.VI_NO_LOCK
self.attrs[ResourceAttribute.suppress_end_enabled] = constants.VI_FALSE
self.attrs[ResourceAttribute.tcpip_address] = self.parsed.host_address
self.attrs[ResourceAttribute.tcpip_hostname] = self.parsed.host_address
self.attrs[ResourceAttribute.tcpip_is_hislip] = constants.VI_FALSE
self.attrs[ResourceAttribute.tcpip_nodelay] = constants.VI_TRUE
self.attrs[ResourceAttribute.tcpip_port] = port
self.attrs[
ResourceAttribute.write_buffer_operation_mode
] = constants.VI_FLUSH_WHEN_FULL

# configure the variable attributes
self.attrs[ResourceAttribute.tcpip_keepalive] = (
self.get_keepalive,
self.set_keepalive,
)

# TODO: additional attributes (someday)
# self.attrs[ResourceAttribute.manufacturer_id] = 16711
# self.attrs[ResourceAttribute.max_queue_length] = 50
# self.attrs[ResourceAttribute.read_buffer_size] = 4096
# self.attrs[ResourceAttribute.resource_impl_version] = 0x0050_0c01
# self.attrs[ResourceAttribute.resource_manufacturer_id] = 4015
# self.attrs[ResourceAttribute.resource_manufacturer_name] = 'Rohde & Schwarz GmbH'
# self.attrs[ResourceAttribute.resource_spec_version] = 0x0050_0800
# self.attrs[ResourceAttribute.user_data] = 0
# self.attrs[ResourceAttribute.write_buffer_size] = 4096

def get_keepalive(self, attribute: ResourceAttribute) -> Tuple[bool, StatusCode]:
"""Is TCP keepalive enabled for the resource."""
return self.interface.keepalive, StatusCode.success

def set_keepalive(
self, attribute: ResourceAttribute, keepalive: bool
) -> StatusCode:
"""Turns TCP keepalive on/off for this connection."""
self.interface.keepalive = keepalive
return StatusCode.success

def close(self) -> StatusCode:
self.interface.close()
self.interface = None
return StatusCode.success

def read(self, count: int) -> Tuple[bytes, StatusCode]:
"""Reads data from device or interface synchronously.
Corresponds to viRead function of the VISA library.
Parameters
-----------
count : int
Number of bytes to be read.
Returns
-------
bytes
Data read from the device
StatusCode
Return value of the library call.
"""
try:
data = self.interface.receive(count)
except socket.timeout:
return b"", StatusCode.error_timeout

if len(data) >= count:
return data, StatusCode.success_max_count_read
else:
return data, StatusCode.success_termination_character_read

def write(self, data: bytes) -> Tuple[int, StatusCode]:
"""Writes data to device or interface synchronously.
Corresponds to viWrite function of the VISA library.
Parameters
----------
data : bytes
Data to be written.
Returns
-------
int
Number of bytes actually transferred
StatusCode
Return value of the library call.
"""
self.interface.send(data)

return len(data), StatusCode.success

def clear(self) -> StatusCode:
"""Clears a device.
Corresponds to viClear function of the VISA library.
"""
self.interface.device_clear()

return StatusCode.success

def _set_timeout(self, attribute: ResourceAttribute, value: int) -> StatusCode:

status = super()._set_timeout(attribute, value)
if hasattr(self.interface, "timeout"):
self.interface.timeout = 1e-3 * value

return status

def _get_attribute(self, attribute: ResourceAttribute) -> Tuple[Any, StatusCode]:
"""Get the value for a given VISA attribute for this session.
Use to implement custom logic for attributes.
Parameters
----------
attribute : ResourceAttribute
Attribute for which the state query is made
Returns
-------
Any
State of the queried attribute for a specified resource
StatusCode
Return value of the library call.
"""
raise UnknownAttribute(attribute)

def _set_attribute(
self, attribute: ResourceAttribute, attribute_state: Any
) -> StatusCode:
"""Sets the state of an attribute.
Corresponds to viSetAttribute function of the VISA library.
Parameters
----------
attribute : constants.ResourceAttribute
Attribute for which the state is to be modified. (Attributes.*)
attribute_state : Any
The state of the attribute to be set for the specified object.
Returns
-------
StatusCode
Return value of the library call.
"""
raise UnknownAttribute(attribute)


@Session.register(constants.InterfaceType.tcpip, "SOCKET")
class TCPIPSocketSession(Session):
"""A TCPIP Session that uses the network standard library to do the low
Expand Down Expand Up @@ -1112,7 +1324,7 @@ def _set_attribute(
def get_services(service_type, wait_time=0.1):
if zeroconf is None:
raise NotImplementedError(
"HiSLIP instrument discovery requires the zeroconf package to be installed... "
"Service discovery requires the zeroconf package to be installed... "
"try 'pip install zeroconf'"
)

Expand Down

0 comments on commit 5272310

Please sign in to comment.