Skip to content

Commit

Permalink
Merge pull request #85 from luxonis/shared-memory-fixes
Browse files Browse the repository at this point in the history
Fixed shdmem bugs and handled fd streaming
  • Loading branch information
TheMutta authored Jul 19, 2024
2 parents 585a38f + 188bce9 commit 6615e07
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 109 deletions.
4 changes: 2 additions & 2 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ add_example(xlink_server2 xlink_server2.cpp)
add_example(device_connect_reset device_connect_reset.cpp)

# Local shared memory example
if (UNIX)
if (UNIX AND !APPLE)
add_example(xlink_server_local xlink_server_local.cpp)
add_example(xlink_client_local xlink_client_local.cpp)
endif (UNIX)
endif (UNIX AND !APPLE)

37 changes: 19 additions & 18 deletions examples/xlink_client_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,25 @@ int main(int argc, const char** argv){

streamPacketDesc_t *packet;

auto s = XLinkOpenStream(0, "test", 1024);
auto s = XLinkOpenStream(0, "test", 1024 * 1024);
assert(s != INVALID_STREAM_ID);

// Read the data packet containing the FD
auto r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

void *sharedMemAddr;
long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
sharedMemAddr = packet->data;
} else {
// Map the shared memory
sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}
}

// Read and print the message from shared memory
Expand All @@ -69,20 +69,20 @@ int main(int argc, const char** argv){
assert(w == X_LINK_SUCCESS);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
long shmFd = memfd_create(shmName, 0);
if (shmFd < 0) {
perror("shm_open");
return 1;
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
Expand All @@ -94,12 +94,13 @@ int main(int argc, const char** argv){
assert(w == X_LINK_SUCCESS);

r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);
assert(r == X_LINK_SUCCESS);

printf("Message from Process A: %s\n", (char *)(packet->data));


munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
if (receivedFd >= 0) {
munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
}

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
Expand Down
36 changes: 19 additions & 17 deletions examples/xlink_server_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,24 @@ int main(int argc, const char** argv){
return 1;
}

auto s = XLinkOpenStream(0, "test", 1024);
auto s = XLinkOpenStream(0, "test", 1024 * 1024);
assert(s != INVALID_STREAM_ID);

const char *shmName = SHARED_MEMORY_NAME;
long shmFd = shm_open(shmName, O_CREAT | O_RDWR, 0666);
long shmFd = memfd_create(shmName, 0);
if (shmFd < 0) {
perror("shm_open");
return 1;
perror("shm_open");
return 1;
}

ftruncate(shmFd, MAXIMUM_SHM_SIZE);

void *addr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmFd, 0);
if (addr == MAP_FAILED) {
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
perror("mmap");
close(shmFd);
shm_unlink(shmName);
return 1;
}

// Write a message to the shared memory
Expand All @@ -67,26 +67,26 @@ int main(int argc, const char** argv){

streamPacketDesc_t *packet;
auto r = XLinkReadData(s, &packet);
assert(w == X_LINK_SUCCESS);
assert(r == X_LINK_SUCCESS);

printf("Message from Process B: %s\n", (char *)(packet->data));

// Read the data packet containing the FD
r = XLinkReadData(s, &packet);
assert(r == X_LINK_SUCCESS);

void *sharedMemAddr;
long receivedFd = packet->fd;
if (receivedFd < 0) {
printf("Not a valid FD, data streamed through message\n");
return 1;
}

// Map the shared memory
void *sharedMemAddr =
mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
sharedMemAddr = packet->data;
} else {
// Map the shared memory
sharedMemAddr = mmap(NULL, MAXIMUM_SHM_SIZE, PROT_READ, MAP_SHARED, receivedFd, 0);
if (sharedMemAddr == MAP_FAILED) {
perror("mmap");
return 1;
}
}

// Read and print the message from shared memory
Expand All @@ -96,7 +96,9 @@ int main(int argc, const char** argv){
w = XLinkWriteData(s, (uint8_t*)normalMessage, strlen(normalMessage) + 1);
assert(w == X_LINK_SUCCESS);

munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
if (receivedFd >= 0) {
munmap(sharedMemAddr, MAXIMUM_SHM_SIZE);
}

munmap(addr, MAXIMUM_SHM_SIZE);
close(shmFd);
Expand Down
45 changes: 0 additions & 45 deletions src/pc/PlatformData.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,51 +117,6 @@ int XLinkPlatformWriteFd(xLinkDeviceHandle_t *deviceHandle, const long fd, void
#if defined(__unix__)
case X_LINK_LOCAL_SHDMEM:
return shdmemPlatformWriteFd(deviceHandle->xLinkFD, fd, data2, size2);

case X_LINK_USB_VSC:
case X_LINK_USB_CDC:
case X_LINK_PCIE:
case X_LINK_TCP_IP:
{
if (fd <= 0) {
return X_LINK_ERROR;
}

// Determine file size through fstat
struct stat fileStats;
fstat(fd, &fileStats);
int size = fileStats.st_size;

// mmap the fine in memory
void *addr = mmap(NULL, 4096, PROT_READ, MAP_SHARED, fd, 0);
if (addr == MAP_FAILED) {
mvLog(MVLOG_ERROR, "Failed to mmap file to stream it over\n");
return X_LINK_ERROR;
}

// Use the respective write function to copy and send the message
int result = X_LINK_ERROR;
switch(deviceHandle->protocol) {
case X_LINK_USB_VSC:
case X_LINK_USB_CDC:
result = usbPlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
case X_LINK_PCIE:
result = pciePlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
case X_LINK_TCP_IP:
result = tcpipPlatformWrite(deviceHandle->xLinkFD, addr, size);
break;
default:
result = X_LINK_PLATFORM_INVALID_PARAMETERS;
break;
}

// Unmap file
munmap(addr, size);

return result;
}
#endif
case X_LINK_TCP_IP_OR_LOCAL_SHDMEM:
mvLog(MVLOG_ERROR, "Failed to write FD with TCP_IP_OR_LOCAL_SHDMEM\n");
Expand Down
32 changes: 19 additions & 13 deletions src/pc/protocols/local_memshd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <sys/un.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>

int shdmem_initialize() {
mvLog(MVLOG_DEBUG, "Shared memory initialized\n");
Expand Down Expand Up @@ -116,18 +117,20 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) {

struct msghdr msg = {};
struct iovec iov;
iov.iov_base = data;
iov.iov_len = size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
if (data != NULL && size > 0) {
iov.iov_base = data;
iov.iov_len = size;
msg.msg_iov = &iov;
msg.msg_iovlen = 1;
}

char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
msg.msg_control = ancillaryElementBuffer;
msg.msg_controllen = sizeof(ancillaryElementBuffer);

int bytes;
if(bytes = recvmsg(socketFd, &msg, MSG_WAITALL) < 0) {
mvLog(MVLOG_ERROR, "Failed to recieve message");
mvLog(MVLOG_ERROR, "Failed to recieve message: %s", strerror(errno));
return X_LINK_ERROR;
}

Expand All @@ -141,7 +144,7 @@ int shdmemPlatformRead(void *desc, void *data, int size, long *fd) {
*fd = recvFd;
}

return bytes;
return 0;
}

int shdmemPlatformWrite(void *desc, void *data, int size) {
Expand All @@ -160,11 +163,11 @@ int shdmemPlatformWrite(void *desc, void *data, int size) {

int bytes;
if(bytes = sendmsg(socketFd, &msg, 0) < 0) {
mvLog(MVLOG_ERROR, "Failed to send message\n");
mvLog(MVLOG_ERROR, "Failed to send message: %s\n", strerror(errno));
return X_LINK_ERROR;
}

return bytes;
return 0;
}

int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {
Expand All @@ -176,19 +179,22 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {

struct msghdr msg = {};
struct iovec iov;
char buf[1] = {0}; // Buffer for single byte of data to send
long buf[1] = {0}; // Buffer for single byte of data to send
if (data2 != NULL && size2 > 0) {
iov.iov_base = data2;
iov.iov_len = size2;
} else {
} else if (fd >= 0) {
iov.iov_base = buf;
iov.iov_len = sizeof(buf);
} else {
return 0;
}

msg.msg_iov = &iov;
msg.msg_iovlen = 1;

char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
if (fd >= 0) {
char ancillaryElementBuffer[CMSG_SPACE(sizeof(long))];
msg.msg_control = ancillaryElementBuffer;
msg.msg_controllen = sizeof(ancillaryElementBuffer);

Expand All @@ -202,11 +208,11 @@ int shdmemPlatformWriteFd(void *desc, const long fd, void *data2, int size2) {

int bytes;
if(bytes = sendmsg(socketFd, &msg, 0) < 0) {
mvLog(MVLOG_ERROR, "Failed to send message");
mvLog(MVLOG_ERROR, "Failed to send message: %s", strerror(errno));
return X_LINK_ERROR;
}

return bytes;
return 0;
}

int shdmemSetProtocol(XLinkProtocol_t *protocol, const char* devPathRead, const char* devPathWrite) {
Expand Down
2 changes: 0 additions & 2 deletions src/pc/protocols/tcpip_memshd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,10 @@ int tcpipOrLocalShdmemPlatformServer(XLinkProtocol_t *protocol, const char *devP

int tcpipOrLocalShdmemPlatformConnect(XLinkProtocol_t *protocol, const char *devPathRead, const char *devPathWrite, void **fd) {
if(shdmemPlatformConnect(SHDMEM_DEFAULT_SOCKET, SHDMEM_DEFAULT_SOCKET, fd) == X_LINK_SUCCESS) {
mvLog(MVLOG_ERROR, "Failed to connect with SHDMEM");
return shdmemSetProtocol(protocol, devPathRead, devPathWrite);
}

if (tcpipPlatformConnect(devPathRead, devPathWrite, fd) == X_LINK_SUCCESS) {
mvLog(MVLOG_ERROR, "Failed to connect with TCP/IP");
*protocol = X_LINK_TCP_IP;
return X_LINK_SUCCESS;
}
Expand Down
Loading

0 comments on commit 6615e07

Please sign in to comment.