Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed shdmem bugs and handled fd streaming #85

Merged
merged 3 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ add_example(xlink_server2 xlink_server2.cpp)
add_example(device_connect_reset device_connect_reset.cpp)

# Local shared memory example
if (UNIX)
if (UNIX AND !APPLE)
add_example(xlink_server_local xlink_server_local.cpp)
add_example(xlink_client_local xlink_client_local.cpp)
endif (UNIX)
endif (UNIX AND !APPLE)

37 changes: 19 additions & 18 deletions examples/xlink_client_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,25 @@ int main(int argc, const char** argv){

streamPacketDesc_t *packet;

auto s = XLinkOpenStream(0, "test", 1024);
auto s = XLinkOpenStream(0, "test", 1024 * 1024);
assert(s != INVALID_STREAM_ID);

// Read the data packet containing the FD
auto r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

void *sharedMemAddr;
long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
sharedMemAddr = packet->data;
} else {
// Map the shared memory
sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}
}

// Read and print the message from shared memory
Expand All @@ -69,20 +69,20 @@ int main(int argc, const char** argv){
assert(w == X_LINK_SUCCESS);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
long shmFd = memfd_create(shmName, 0);
if (shmFd < 0) {
perror("shm_open");
return 1;
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
Expand All @@ -94,12 +94,13 @@ int main(int argc, const char** argv){
assert(w == X_LINK_SUCCESS);

r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);
assert(r == X_LINK_SUCCESS);

printf("Message from Process A: %s\n", (char *)(packet->data));


munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
if (receivedFd >= 0) {
munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
}

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
Expand Down
36 changes: 19 additions & 17 deletions examples/xlink_server_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ int main(int argc, const char** argv){
return 1;
}

auto s = XLinkOpenStream(0, "test", 1024);
auto s = XLinkOpenStream(0, "test", 1024 * 1024);
assert(s != INVALID_STREAM_ID);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
long shmFd = memfd_create(shmName, 0);
if (shmFd < 0) {
perror("shm_open");
return 1;
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
Expand All @@ -67,26 +67,26 @@ int main(int argc, const char** argv){

streamPacketDesc_t *packet;
auto r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);
assert(r == X_LINK_SUCCESS);

printf("Message from Process B: %s\n", (char *)(packet->data));

// Read the data packet containing the FD
r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

void *sharedMemAddr;
long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
sharedMemAddr = packet->data;
} else {
// Map the shared memory
sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}
}

// Read and print the message from shared memory
Expand All @@ -96,7 +96,9 @@ int main(int argc, const char** argv){
w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
if (receivedFd >= 0) {
munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
}

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
Expand Down
45 changes: 0 additions & 45 deletions src/pc/PlatformData.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,51 +117,6 @@ int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void
#if defined(__unix__)
case X_LINK_LOCAL_SHDMEM:
return shdmemPlatformWriteFd(deviceHandle->xLinkFD, fd, data2, size2);

case X_LINK_USB_VSC:
case X_LINK_USB_CDC:
case X_LINK_PCIE:
case X_LINK_TCP_IP:
{
if (fd <= 0) {
return X_LINK_ERROR;
}

// Determine file size through fstat
struct stat fileStats;
fstat(fd, &fileStats);
int size = fileStats.st_size;

// mmap the fine in memory
void *addr = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
mvLog(MVLOG_ERROR, "Failed to mmap file to stream it over\n");
return X_LINK_ERROR;
}

// Use the respective write function to copy and send the message
int result = X_LINK_ERROR;
switch(deviceHandle->protocol) {
case X_LINK_USB_VSC:
case X_LINK_USB_CDC:
result = usbPlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
case X_LINK_PCIE:
result = pciePlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
case X_LINK_TCP_IP:
result = tcpipPlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
default:
result = X_LINK_PLATFORM_INVALID_PARAMETERS;
break;
}

// Unmap file
munmap(addr, size);

return result;
}
#endif
case X_LINK_TCP_IP_OR_LOCAL_SHDMEM:
mvLog(MVLOG_ERROR, "Failed to write FD with TCP_IP_OR_LOCAL_SHDMEM\n");
Expand Down
32 changes: 19 additions & 13 deletions src/pc/protocols/local_memshd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sys/un.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

int shdmem_initialize() {
mvLog(MVLOG_DEBUG, "Shared memory initialized\n");
Expand Down Expand Up @@ -116,18 +117,20 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) {

struct msghdr msg = {};
struct iovec iov;
iov.iov_base = data;
iov.iov_len = size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (data != NULL && size > 0) {
iov.iov_base = data;
iov.iov_len = size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
}

char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
msg.msg_control = ancillaryElementBuffer;
msg.msg_controllen = sizeof(ancillaryElementBuffer);

int bytes;
if(bytes = recvmsg(socketFd, &msg, MSG_WAITALL) < 0) {
mvLog(MVLOG_ERROR, "Failed to recieve message");
mvLog(MVLOG_ERROR, "Failed to recieve message: %s", strerror(errno));
return X_LINK_ERROR;
}

Expand All @@ -141,7 +144,7 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) {
*fd = recvFd;
}

return bytes;
return 0;
}

int shdmemPlatformWrite(void *desc, void *data, int size) {
Expand All @@ -160,11 +163,11 @@ int shdmemPlatformWrite(void *desc, void *data, int size) {

int bytes;
if(bytes = sendmsg(socketFd, &msg, 0) < 0) {
mvLog(MVLOG_ERROR, "Failed to send message\n");
mvLog(MVLOG_ERROR, "Failed to send message: %s\n", strerror(errno));
return X_LINK_ERROR;
}

return bytes;
return 0;
}

int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {
Expand All @@ -176,19 +179,22 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {

struct msghdr msg = {};
struct iovec iov;
char buf[1] = {0}; // Buffer for single byte of data to send
long buf[1] = {0}; // Buffer for single byte of data to send
if (data2 != NULL && size2 > 0) {
iov.iov_base = data2;
iov.iov_len = size2;
} else {
} else if (fd >= 0) {
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
} else {
return 0;
}

msg.msg_iov = &iov;
msg.msg_iovlen = 1;

char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
if (fd >= 0) {
char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
msg.msg_control = ancillaryElementBuffer;
msg.msg_controllen = sizeof(ancillaryElementBuffer);

Expand All @@ -202,11 +208,11 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {

int bytes;
if(bytes = sendmsg(socketFd, &msg, 0) < 0) {
mvLog(MVLOG_ERROR, "Failed to send message");
mvLog(MVLOG_ERROR, "Failed to send message: %s", strerror(errno));
return X_LINK_ERROR;
}

return bytes;
return 0;
}

int shdmemSetProtocol(XLinkProtocol_t *protocol, const char* devPathRead, const char* devPathWrite) {
Expand Down
2 changes: 0 additions & 2 deletions src/pc/protocols/tcpip_memshd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,10 @@ int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devP

int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) {
if(shdmemPlatformConnect(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, fd) == X_LINK_SUCCESS) {
mvLog(MVLOG_ERROR, "Failed to connect with SHDMEM");
return shdmemSetProtocol(protocol, devPathRead, devPathWrite);
}

if (tcpipPlatformConnect(devPathRead, devPathWrite, fd) == X_LINK_SUCCESS) {
mvLog(MVLOG_ERROR, "Failed to connect with TCP/IP");
*protocol = X_LINK_TCP_IP;
return X_LINK_SUCCESS;
}
Expand Down
Loading
Loading