From f273fd5c22f9f67641d2561a91fcc1ca1d4d57f9 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 09:51:10 -0400 Subject: [PATCH 01/24] Refactor: libcrmcommon: Use GByteArray for the crm_ipc_s buffer. This has the built-in advantage of various library functions that can be used to manipulate it instead of using pointer arithmetic. For the moment, this isn't important but it will become important later when we're doing split-up IPC messages. On the other hand, there are a couple of other changes here that are not advantages. First, instead of reading directly into the GByteArray, we now have to allocate a buffer to read into, and then append that to the GByteArray. This also means we need to be careful about freeing that buffer. Second, because there's no way to clear out a GByteArray and insert new bytes directly at the beginning, we just need to allocate and free it every time we do a read. This also means we need to be more careful about checking if it's NULL. This will be less of a problem later when we are reading into the same GByteArray multiple times for a split-up message. 
--- lib/common/ipc_client.c | 88 +++++++++++++++++++++++++++-------------- 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index 1f65ace114e..c458c19aed8 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -822,7 +822,7 @@ struct crm_ipc_s { unsigned int buf_size; // size of allocated buffer int msg_size; int need_reply; - char *buffer; + GByteArray *buffer; char *server_name; // server IPC name being connected to qb_ipcc_connection_t *ipc; }; @@ -860,16 +860,9 @@ crm_ipc_new(const char *name, size_t max_size) free(client); return NULL; } - client->buf_size = crm_ipc_default_buffer_size(); - client->buffer = malloc(client->buf_size); - if (client->buffer == NULL) { - crm_err("Could not create %s IPC connection: %s", - name, strerror(errno)); - free(client->server_name); - free(client); - return NULL; - } + client->buf_size = crm_ipc_default_buffer_size(); + client->buffer = NULL; client->pfd.fd = -1; client->pfd.events = POLLIN; client->pfd.revents = 0; @@ -969,7 +962,11 @@ crm_ipc_destroy(crm_ipc_t * client) crm_trace("Destroying inactive %s IPC connection", client->server_name); } - free(client->buffer); + + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + } + free(client->server_name); free(client); } @@ -1063,30 +1060,33 @@ long crm_ipc_read(crm_ipc_t * client) { pcmk__ipc_header_t *header = NULL; + char *buffer = NULL; + long rc = -ENOMSG; - pcmk__assert((client != NULL) && (client->ipc != NULL) - && (client->buffer != NULL)); + pcmk__assert((client != NULL) && (client->ipc != NULL)); - client->buffer[0] = 0; - client->msg_size = qb_ipcc_event_recv(client->ipc, client->buffer, + buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), sizeof(char)); + client->msg_size = qb_ipcc_event_recv(client->ipc, buffer, client->buf_size, 0); + if (client->msg_size >= 0) { - header = (pcmk__ipc_header_t *)(void*)client->buffer; + header = (pcmk__ipc_header_t 
*)(void*) buffer; if (!pcmk__valid_ipc_header(header)) { - return -EBADMSG; + rc = -EBADMSG; + goto done; } crm_trace("Received %s IPC event %d size=%u rc=%d text='%.100s'", client->server_name, header->qb.id, header->qb.size, - client->msg_size, - client->buffer + sizeof(pcmk__ipc_header_t)); + client->msg_size, buffer + sizeof(pcmk__ipc_header_t)); } else { crm_trace("No message received from %s IPC: %s", client->server_name, pcmk_strerror(client->msg_size)); if (client->msg_size == -EAGAIN) { - return -EAGAIN; + rc = -EAGAIN; + goto done; } } @@ -1095,17 +1095,24 @@ crm_ipc_read(crm_ipc_t * client) } if (header) { - /* Data excluding the header */ - return header->size; + client->buffer = g_byte_array_sized_new(client->buf_size); + g_byte_array_append(client->buffer, (const guint8 *) buffer, + client->msg_size); + + /* Data length excluding the header */ + rc = header->size; } - return -ENOMSG; + +done: + free(buffer); + return rc; } const char * crm_ipc_buffer(crm_ipc_t * client) { pcmk__assert(client != NULL); - return client->buffer + sizeof(pcmk__ipc_header_t); + return (const char *) (client->buffer->data + sizeof(pcmk__ipc_header_t)); } uint32_t @@ -1118,7 +1125,7 @@ crm_ipc_buffer_flags(crm_ipc_t * client) return 0; } - header = (pcmk__ipc_header_t *)(void*)client->buffer; + header = (pcmk__ipc_header_t *)(void*) client->buffer->data; return header->flags; } @@ -1149,12 +1156,16 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, request_id); do { + char *buffer = pcmk__assert_alloc(client->buf_size, sizeof(char)); + const char *data = NULL; xmlNode *xml = NULL; - *bytes = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, + *bytes = qb_ipcc_recv(client->ipc, buffer, client->buf_size, qb_timeout); if (*bytes <= 0) { + free(buffer); + if (!crm_ipc_connected(client)) { crm_err("%s IPC provider disconnected while waiting for message %d", client->server_name, request_id); @@ -1164,14 +1175,22 @@ 
internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, continue; } - hdr = (pcmk__ipc_header_t *)(void*) client->buffer; + hdr = (pcmk__ipc_header_t *) (void *) buffer; if (hdr->qb.id == request_id) { /* Got the reply we were expecting. */ + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + } + + client->buffer = g_byte_array_sized_new(client->buf_size); + g_byte_array_append(client->buffer, (const guint8 *) buffer, *bytes); + free(buffer); break; } - xml = pcmk__xml_parse(crm_ipc_buffer(client)); + data = buffer + sizeof(pcmk__ipc_header_t); + xml = pcmk__xml_parse(data); if (hdr->qb.id < request_id) { crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); @@ -1181,6 +1200,8 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, crm_log_xml_notice(xml, "ImpossibleReply"); pcmk__assert(hdr->qb.id <= request_id); } + + free(buffer); } while (time(NULL) < timeout || (timeout == 0 && *bytes == -EAGAIN)); if (*bytes > 0) { @@ -1246,8 +1267,17 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, ms_timeout = 5000; } + /* This loop exists only to clear out any old replies that we haven't + * yet read. We don't care about their contents since it's too late to + * do anything with them, so we just read and throw them away. + */ if (client->need_reply) { - qb_rc = qb_ipcc_recv(client->ipc, client->buffer, client->buf_size, ms_timeout); + char *buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), + sizeof(char)); + + qb_rc = qb_ipcc_recv(client->ipc, buffer, client->buf_size, ms_timeout); + free(buffer); + if (qb_rc < 0) { crm_warn("Sending %s IPC disabled until pending reply received", client->server_name); From b14f2ee67f6f08be69ff086ef9eb3f4f035ebde8 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Wed, 19 Feb 2025 11:34:58 -0500 Subject: [PATCH 02/24] Refactor: libcrmcommon: pcmk__ipc_prepare_iov takes a string... ...instead of an xmlNode. 
The idea here is that to deal with split up messages, this function can be called repeatedly from a loop. Each call will prepare the next chunk of the message to be sent. The first step in doing that is simply changing the type of the parameter. This does result in a little code duplication in the callers, but there are only a couple. --- daemons/based/based_notify.c | 10 ++++++++-- include/crm/common/ipc_internal.h | 2 +- lib/common/ipc_client.c | 9 ++++++++- lib/common/ipc_server.c | 26 +++++++++++++------------- 4 files changed, 30 insertions(+), 17 deletions(-) diff --git a/daemons/based/based_notify.c b/daemons/based/based_notify.c index 243c1f7db71..b388c2301e7 100644 --- a/daemons/based/based_notify.c +++ b/daemons/based/based_notify.c @@ -106,9 +106,13 @@ cib_notify_send(const xmlNode *xml) { struct iovec *iov; struct cib_notification_s update; - + GString *iov_buffer = NULL; ssize_t bytes = 0; - int rc = pcmk__ipc_prepare_iov(0, xml, &iov, &bytes); + int rc = pcmk_rc_ok; + + iov_buffer = g_string_sized_new(1024); + pcmk__xml_string(xml, 0, iov_buffer, 0); + rc = pcmk__ipc_prepare_iov(0, iov_buffer, &iov, &bytes); if (rc == pcmk_rc_ok) { update.msg = xml; @@ -121,6 +125,8 @@ cib_notify_send(const xmlNode *xml) crm_notice("Could not notify clients: %s " QB_XS " rc=%d", pcmk_rc_str(rc), rc); } + + g_string_free(iov_buffer, TRUE); } void diff --git a/include/crm/common/ipc_internal.h b/include/crm/common/ipc_internal.h index 72b6f7f189a..b634a2f877c 100644 --- a/include/crm/common/ipc_internal.h +++ b/include/crm/common/ipc_internal.h @@ -239,7 +239,7 @@ int pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c, #define pcmk__ipc_send_ack(c, req, flags, tag, ver, st) \ pcmk__ipc_send_ack_as(__func__, __LINE__, (c), (req), (flags), (tag), (ver), (st)) -int pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, +int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, struct iovec **result, ssize_t *bytes); int 
pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags); diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index c458c19aed8..05babce4153 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1250,6 +1250,7 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, struct iovec *iov; static uint32_t id = 0; pcmk__ipc_header_t *header; + GString *iov_buffer = NULL; if (client == NULL) { crm_notice("Can't send IPC request without connection (bug?): %.100s", @@ -1292,10 +1293,15 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, id++; CRM_LOG_ASSERT(id != 0); /* Crude wrap-around detection */ - rc = pcmk__ipc_prepare_iov(id, message, &iov, &bytes); + + iov_buffer = g_string_sized_new(1024); + pcmk__xml_string(message, 0, iov_buffer, 0); + rc = pcmk__ipc_prepare_iov(id, iov_buffer, &iov, &bytes); + if (rc != pcmk_rc_ok) { crm_warn("Couldn't prepare %s IPC request: %s " QB_XS " rc=%d", client->server_name, pcmk_rc_str(rc), rc); + g_string_free(iov_buffer, TRUE); return pcmk_rc2legacy(rc); } @@ -1369,6 +1375,7 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, ((rc == 0)? 
"No bytes sent" : pcmk_strerror(rc)), rc); } + g_string_free(iov_buffer, TRUE); pcmk_free_ipc_event(iov); return rc; } diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index f6bb0bddcc4..eeb493529de 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -551,7 +551,7 @@ crm_ipcs_flush_events(pcmk__client_t *c) * \brief Create an I/O vector for sending an IPC XML message * * \param[in] request Identifier for libqb response header - * \param[in] message XML message to send + * \param[in] message Message to send * \param[out] result Where to store prepared I/O vector - NULL * on error * \param[out] bytes Size of prepared data in bytes @@ -559,13 +559,12 @@ crm_ipcs_flush_events(pcmk__client_t *c) * \return Standard Pacemaker return code */ int -pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, +pcmk__ipc_prepare_iov(uint32_t request, const GString *message, struct iovec **result, ssize_t *bytes) { struct iovec *iov; unsigned int total = 0; unsigned int max_send_size = crm_ipc_default_buffer_size(); - GString *buffer = NULL; pcmk__ipc_header_t *header = NULL; int rc = pcmk_rc_ok; @@ -580,20 +579,17 @@ pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, goto done; } - buffer = g_string_sized_new(1024); - pcmk__xml_string(message, 0, buffer, 0); - *result = NULL; iov = pcmk__new_ipc_event(); iov[0].iov_len = sizeof(pcmk__ipc_header_t); iov[0].iov_base = header; header->version = PCMK__IPC_VERSION; - header->size = buffer->len + 1; + header->size = message->len + 1; total = iov[0].iov_len + header->size; if (total >= max_send_size) { - crm_log_xml_trace(message, "EMSGSIZE"); + crm_trace("%s", message->str); crm_err("Could not transmit message; message size %" PRIu32" bytes is " "larger than the maximum of %" PRIu32, header->size, max_send_size); @@ -602,7 +598,7 @@ pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, goto done; } - iov[1].iov_base = pcmk__str_copy(buffer->str); + iov[1].iov_base = 
pcmk__str_copy(message->str); iov[1].iov_len = header->size; header->qb.size = iov[0].iov_len + iov[1].iov_len; @@ -615,9 +611,6 @@ pcmk__ipc_prepare_iov(uint32_t request, const xmlNode *message, } done: - if (buffer != NULL) { - g_string_free(buffer, TRUE); - } return rc; } @@ -706,11 +699,16 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, { struct iovec *iov = NULL; int rc = pcmk_rc_ok; + GString *iov_buffer = NULL; if (c == NULL) { return EINVAL; } - rc = pcmk__ipc_prepare_iov(request, message, &iov, NULL); + + iov_buffer = g_string_sized_new(1024); + pcmk__xml_string(message, 0, iov_buffer, 0); + rc = pcmk__ipc_prepare_iov(request, iov_buffer, &iov, NULL); + if (rc == pcmk_rc_ok) { pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); rc = pcmk__ipc_send_iov(c, iov, flags); @@ -718,6 +716,8 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d", c->pid, pcmk_rc_str(rc), rc); } + + g_string_free(iov_buffer, TRUE); return rc; } From ef30a9bce471132ea4f9e7ee3e698165a55131e4 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Mon, 28 Apr 2025 14:42:36 -0400 Subject: [PATCH 03/24] API: libcrmcommon: Introduce the pcmk_rc_ipc_more error code. Functions should return this error code if a single IPC message is too large to fit into a single buffer, and so it has to be split up into multiple. It can also be used when one chunk of a split up message has been read, indicating that the caller needs to continue reading. 
--- cts/cli/regression.error_codes.exp | 4 ++++ include/crm/common/results.h | 1 + lib/common/results.c | 4 ++++ 3 files changed, 9 insertions(+) diff --git a/cts/cli/regression.error_codes.exp b/cts/cli/regression.error_codes.exp index 0b9eba43b73..0e0244df9f7 100644 --- a/cts/cli/regression.error_codes.exp +++ b/cts/cli/regression.error_codes.exp @@ -145,6 +145,7 @@ pcmk_rc_node_unknown - Node not found =#=#=#= End test: Get negative Pacemaker return code (with name) (XML) - OK (0) =#=#=#= * Passed: crm_error - Get negative Pacemaker return code (with name) (XML) =#=#=#= Begin test: List Pacemaker return codes (non-positive) =#=#=#= +-1041: More IPC message fragments to send -1040: DC is not yet elected -1039: Compression/decompression error -1038: Nameserver resolution error @@ -190,6 +191,7 @@ pcmk_rc_node_unknown - Node not found * Passed: crm_error - List Pacemaker return codes (non-positive) =#=#=#= Begin test: List Pacemaker return codes (non-positive) (XML) =#=#=#= + @@ -235,6 +237,7 @@ pcmk_rc_node_unknown - Node not found =#=#=#= End test: List Pacemaker return codes (non-positive) (XML) - OK (0) =#=#=#= * Passed: crm_error - List Pacemaker return codes (non-positive) (XML) =#=#=#= Begin test: List Pacemaker return codes (non-positive) (with names) =#=#=#= +-1041: pcmk_rc_ipc_more More IPC message fragments to send -1040: pcmk_rc_no_dc DC is not yet elected -1039: pcmk_rc_compression Compression/decompression error -1038: pcmk_rc_ns_resolution Nameserver resolution error @@ -280,6 +283,7 @@ pcmk_rc_node_unknown - Node not found * Passed: crm_error - List Pacemaker return codes (non-positive) (with names) =#=#=#= Begin test: List Pacemaker return codes (non-positive) (with names) (XML) =#=#=#= + diff --git a/include/crm/common/results.h b/include/crm/common/results.h index 7525282c3f2..9222325ac55 100644 --- a/include/crm/common/results.h +++ b/include/crm/common/results.h @@ -110,6 +110,7 @@ enum pcmk_rc_e { /* When adding new values, use consecutively 
lower numbers, update the array * in lib/common/results.c, and test with crm_error. */ + pcmk_rc_ipc_more = -1041, pcmk_rc_no_dc = -1040, pcmk_rc_compression = -1039, pcmk_rc_ns_resolution = -1038, diff --git a/lib/common/results.c b/lib/common/results.c index 568b8d448fb..4001abf1ae5 100644 --- a/lib/common/results.c +++ b/lib/common/results.c @@ -430,6 +430,10 @@ static const struct pcmk__rc_info { "DC is not yet elected", -pcmk_err_generic, }, + { "pcmk_rc_ipc_more", + "More IPC message fragments to send", + -pcmk_err_generic, + }, }; /*! From ad98cde9ca4b7cf057d80a9d146881d607c25f71 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Wed, 19 Feb 2025 12:33:38 -0500 Subject: [PATCH 04/24] Refactor: libcrmcommon: pcmk__ipc_prepare_iov should take an offset. The function now only prepares as much for one I/O vector as will fit in a single buffer. The idea is that you can call the function repeatedly in a loop, preparing and transmitting individual chunks of a single XML message. As long as pcmk__ipc_prepare_iov returns pcmk_rc_ipc_more, you know there's more that needs to be prepared. Keep a running count of the bytes it prepared and pass that in for offset. Note that clients are not doing this yet, so very large IPC messages are still going to fail. 
--- daemons/based/based_notify.c | 2 +- include/crm/common/ipc_internal.h | 2 +- lib/common/crmcommon_private.h | 1 + lib/common/ipc_client.c | 2 +- lib/common/ipc_server.c | 85 +++++++++++++++++++++++-------- 5 files changed, 69 insertions(+), 23 deletions(-) diff --git a/daemons/based/based_notify.c b/daemons/based/based_notify.c index b388c2301e7..c4107d233aa 100644 --- a/daemons/based/based_notify.c +++ b/daemons/based/based_notify.c @@ -112,7 +112,7 @@ cib_notify_send(const xmlNode *xml) iov_buffer = g_string_sized_new(1024); pcmk__xml_string(xml, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(0, iov_buffer, &iov, &bytes); + rc = pcmk__ipc_prepare_iov(0, iov_buffer, 0, &iov, &bytes); if (rc == pcmk_rc_ok) { update.msg = xml; diff --git a/include/crm/common/ipc_internal.h b/include/crm/common/ipc_internal.h index b634a2f877c..517c94c8340 100644 --- a/include/crm/common/ipc_internal.h +++ b/include/crm/common/ipc_internal.h @@ -240,7 +240,7 @@ int pcmk__ipc_send_ack_as(const char *function, int line, pcmk__client_t *c, pcmk__ipc_send_ack_as(__func__, __LINE__, (c), (req), (flags), (tag), (ver), (st)) int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, - struct iovec **result, ssize_t *bytes); + uint16_t index, struct iovec **result, ssize_t *bytes); int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags); int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags); diff --git a/lib/common/crmcommon_private.h b/lib/common/crmcommon_private.h index 2a45e41c419..a2c04f9cfc7 100644 --- a/lib/common/crmcommon_private.h +++ b/lib/common/crmcommon_private.h @@ -309,6 +309,7 @@ typedef struct pcmk__ipc_header_s { uint32_t size; uint32_t flags; uint8_t version; + uint16_t part_id; // If this is a multipart message, which part is this? 
} pcmk__ipc_header_t; G_GNUC_INTERNAL diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index 05babce4153..e3d707c276f 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1296,7 +1296,7 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, iov_buffer = g_string_sized_new(1024); pcmk__xml_string(message, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(id, iov_buffer, &iov, &bytes); + rc = pcmk__ipc_prepare_iov(id, iov_buffer, 0, &iov, &bytes); if (rc != pcmk_rc_ok) { crm_warn("Couldn't prepare %s IPC request: %s " QB_XS " rc=%d", diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index eeb493529de..a99762c5588 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -550,21 +550,30 @@ crm_ipcs_flush_events(pcmk__client_t *c) * \internal * \brief Create an I/O vector for sending an IPC XML message * - * \param[in] request Identifier for libqb response header - * \param[in] message Message to send - * \param[out] result Where to store prepared I/O vector - NULL - * on error - * \param[out] bytes Size of prepared data in bytes + * If the message is too large to fit into a single buffer, this function will + * prepare an I/O vector that only holds as much as fits. The remainder can + * be prepared in a separate call by keeping a running count of + * \c result[1].iov_len and passing that in for \p offset. 
+ * + * \param[in] request Identifier for libqb response header + * \param[in] message Message to send + * \param[in] offset How many bytes into \p buffer to start when + * building the message + * \param[out] result Where to store prepared I/O vector - NULL + * on error + * \param[out] bytes Size of prepared data in bytes (includes header) * * \return Standard Pacemaker return code */ int -pcmk__ipc_prepare_iov(uint32_t request, const GString *message, +pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index, struct iovec **result, ssize_t *bytes) { - struct iovec *iov; + struct iovec *iov = NULL; + unsigned int payload_size = 0; unsigned int total = 0; unsigned int max_send_size = crm_ipc_default_buffer_size(); + size_t offset = 0; pcmk__ipc_header_t *header = NULL; int rc = pcmk_rc_ok; @@ -585,22 +594,58 @@ pcmk__ipc_prepare_iov(uint32_t request, const GString *message, iov[0].iov_base = header; header->version = PCMK__IPC_VERSION; - header->size = message->len + 1; - total = iov[0].iov_len + header->size; + + /* We are passed an index, which is basically how many times this function + * has been called. This is how we support multi-part IPC messages. We + * need to convert that into an offset into the buffer that we want to start + * reading from. + * + * Each call to this function can send max_send_size, but this also includes + * the header and a null terminator character for the end of the payload. + * We need to subtract those out here. + */ + offset = index * (max_send_size - iov[0].iov_len - 1); + + /* How much of message is left to send? This does not include the null + * terminator character. + */ + payload_size = message->len - offset; + + /* How much would be transmitted, including the header size and null + * terminator character for the buffer? 
+ */ + total = iov[0].iov_len + payload_size + 1; if (total >= max_send_size) { - crm_trace("%s", message->str); - crm_err("Could not transmit message; message size %" PRIu32" bytes is " - "larger than the maximum of %" PRIu32, header->size, - max_send_size); - rc = EMSGSIZE; - pcmk_free_ipc_event(iov); - goto done; - } + /* The entire packet is too big to fit in a single buffer. Calculate + * how much of it we can send - buffer size, minus header size, minus + * one for the null terminator. + */ + payload_size = max_send_size - iov[0].iov_len - 1; + + header->size = payload_size + 1; + + iov[1].iov_base = strndup(message->str + offset, payload_size); + if (iov[1].iov_base == NULL) { + rc = ENOMEM; + pcmk_free_ipc_event(iov); + goto done; + } - iov[1].iov_base = pcmk__str_copy(message->str); - iov[1].iov_len = header->size; + iov[1].iov_len = header->size; + rc = pcmk_rc_ipc_more; + + } else { + /* The entire packet fits in a single buffer. We can copy the entirety + * of it into the payload. + */ + header->size = payload_size + 1; + + iov[1].iov_base = pcmk__str_copy(message->str + offset); + iov[1].iov_len = header->size; + } + header->part_id = index; header->qb.size = iov[0].iov_len + iov[1].iov_len; header->qb.id = (int32_t)request; /* Replying to a specific request */ @@ -707,7 +752,7 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, iov_buffer = g_string_sized_new(1024); pcmk__xml_string(message, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(request, iov_buffer, &iov, NULL); + rc = pcmk__ipc_prepare_iov(request, iov_buffer, 0, &iov, NULL); if (rc == pcmk_rc_ok) { pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); From b3879d5d27d2d5f7c0439fa5b550fde3aeb562f4 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Tue, 8 Apr 2025 15:20:44 -0400 Subject: [PATCH 05/24] Feature: libcrmcommon: Add flags for multipart messages. 
This does technically add flags to a public enum, but on the other hand they are added after the "these are for pacemaker's internal use only" comment, and we are hoping to eventually make this entire enum private. --- include/crm/common/ipc.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/crm/common/ipc.h b/include/crm/common/ipc.h index 3890919d81e..df84d15013f 100644 --- a/include/crm/common/ipc.h +++ b/include/crm/common/ipc.h @@ -150,6 +150,10 @@ enum crm_ipc_flags //! All replies to proxied connections are sent as events. This flag //! preserves whether the events should be treated as an Event or a Response crm_ipc_proxied_relay_response = (UINT32_C(1) << 18), + //! This is a multi-part IPC message + crm_ipc_multipart = (UINT32_C(1) << 19), + //! This is the end of a multi-part IPC message + crm_ipc_multipart_end = (UINT32_C(1) << 20), }; typedef struct crm_ipc_s crm_ipc_t; From 45b1d3b48515e733f02118c0801b9a57e44af5cf Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 12:40:36 -0500 Subject: [PATCH 06/24] Refactor: libcrmcommon: Add functions for inspecting multipart messages. 
--- include/crm/common/ipc_internal.h | 3 +++ lib/common/ipc_common.c | 27 +++++++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/include/crm/common/ipc_internal.h b/include/crm/common/ipc_internal.h index 517c94c8340..63a4085d870 100644 --- a/include/crm/common/ipc_internal.h +++ b/include/crm/common/ipc_internal.h @@ -244,6 +244,9 @@ int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags); int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags); +bool pcmk__ipc_msg_is_multipart(void *data); +bool pcmk__ipc_msg_is_multipart_end(void *data); +uint16_t pcmk__ipc_multipart_id(void *data); xmlNode *pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id, uint32_t *flags); diff --git a/lib/common/ipc_common.c b/lib/common/ipc_common.c index 4996ec6d280..8aa5ec03e79 100644 --- a/lib/common/ipc_common.c +++ b/lib/common/ipc_common.c @@ -70,3 +70,30 @@ pcmk__client_type_str(uint64_t client_type) return "unknown"; } } + +bool +pcmk__ipc_msg_is_multipart(void *data) +{ + pcmk__ipc_header_t *header = data; + + CRM_LOG_ASSERT(data != NULL); + return pcmk_is_set(header->flags, crm_ipc_multipart); +} + +bool +pcmk__ipc_msg_is_multipart_end(void *data) +{ + pcmk__ipc_header_t *header = data; + + CRM_LOG_ASSERT(data != NULL); + return pcmk_is_set(header->flags, crm_ipc_multipart_end); +} + +uint16_t +pcmk__ipc_multipart_id(void *data) +{ + pcmk__ipc_header_t *header = data; + + CRM_LOG_ASSERT(data != NULL); + return header->part_id; +} From 8230d9f05831d5a749c7ba728e7369c36f97dbeb Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 11 Apr 2025 14:03:44 -0400 Subject: [PATCH 07/24] Refactor: libcrmcommon: Set multipart flags in pcmk__ipc_prepare_iov. We could do this in the callers, but this function has everything it needs to decide which flags should be set. 
--- lib/common/ipc_server.c | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index a99762c5588..816d6e7efb8 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -649,6 +649,14 @@ pcmk__ipc_prepare_iov(uint32_t request, const GString *message, uint16_t index, header->qb.size = iov[0].iov_len + iov[1].iov_len; header->qb.id = (int32_t)request; /* Replying to a specific request */ + if ((rc == pcmk_rc_ok) && (index != 0)) { + pcmk__set_ipc_flags(header->flags, "multipart ipc", + crm_ipc_multipart | crm_ipc_multipart_end); + } else if (rc == pcmk_rc_ipc_more) { + pcmk__set_ipc_flags(header->flags, "multipart ipc", + crm_ipc_multipart); + } + *result = iov; pcmk__assert(header->qb.size > 0); if (bytes != NULL) { From 487ac757c9fb9122efbb5613652f7b566f301e5a Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 11 Apr 2025 16:25:09 -0400 Subject: [PATCH 08/24] Feature: libcrmcommon: pcmk__ipc_send_xml can send split up IPC messages. If the IPC message is too large to fit in a single buffer, this function now knows how to loop over the message and send it one chunk at a time. This handles the sending side for servers that use the newer style IPC code. This also takes care of sending IPC events from a server. 
--- lib/common/ipc_server.c | 42 +++++++++++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index 816d6e7efb8..d9ca9041ddf 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -753,6 +753,7 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, struct iovec *iov = NULL; int rc = pcmk_rc_ok; GString *iov_buffer = NULL; + uint16_t index = 0; if (c == NULL) { return EINVAL; @@ -760,16 +761,41 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, iov_buffer = g_string_sized_new(1024); pcmk__xml_string(message, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(request, iov_buffer, 0, &iov, NULL); + do { + rc = pcmk__ipc_prepare_iov(request, iov_buffer, index, &iov, NULL); + + switch (rc) { + case pcmk_rc_ok: { + /* No more message to prepare after we send this chunk */ + pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); + rc = pcmk__ipc_send_iov(c, iov, flags); + goto done; + } - if (rc == pcmk_rc_ok) { - pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); - rc = pcmk__ipc_send_iov(c, iov, flags); - } else { - crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d", - c->pid, pcmk_rc_str(rc), rc); - } + case pcmk_rc_ipc_more: + /* Preparing succeeded, but there are more chunks to go after + * this one is sent. + */ + pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); + rc = pcmk__ipc_send_iov(c, iov, flags); + + /* Did an error occur during transmission? 
*/ + if (rc != pcmk_rc_ok) { + goto done; + } + + index++; + break; + + default: + /* An error occurred during preparation */ + crm_notice("IPC message to pid %d failed: %s " QB_XS " rc=%d", + c->pid, pcmk_rc_str(rc), rc); + goto done; + } + } while (true); +done: g_string_free(iov_buffer, TRUE); return rc; } From f18e97731f877296f16ff46b1ce648dcc4101543 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Mon, 24 Feb 2025 14:23:03 -0500 Subject: [PATCH 09/24] Feature: libcrmcommon: crm_ipc_send can send large IPC messages. --- lib/common/ipc_client.c | 86 +++++++++++++++++++++++++++-------------- 1 file changed, 58 insertions(+), 28 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index e3d707c276f..e25184548a7 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1244,13 +1244,13 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, enum crm_ipc_flags flags, int32_t ms_timeout, xmlNode **reply) { int rc = 0; - time_t timeout = 0; ssize_t qb_rc = 0; ssize_t bytes = 0; - struct iovec *iov; + struct iovec *iov = NULL; static uint32_t id = 0; pcmk__ipc_header_t *header; GString *iov_buffer = NULL; + uint16_t index = 0; if (client == NULL) { crm_notice("Can't send IPC request without connection (bug?): %.100s", @@ -1296,39 +1296,69 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, iov_buffer = g_string_sized_new(1024); pcmk__xml_string(message, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(id, iov_buffer, 0, &iov, &bytes); - if (rc != pcmk_rc_ok) { - crm_warn("Couldn't prepare %s IPC request: %s " QB_XS " rc=%d", - client->server_name, pcmk_rc_str(rc), rc); - g_string_free(iov_buffer, TRUE); - return pcmk_rc2legacy(rc); - } + do { + time_t timeout = 0; - header = iov[0].iov_base; - pcmk__set_ipc_flags(header->flags, client->server_name, flags); + rc = pcmk__ipc_prepare_iov(id, iov_buffer, index, &iov, &bytes); - if (pcmk_is_set(flags, crm_ipc_proxied)) { - /* Don't look for a synchronous response */ - 
pcmk__clear_ipc_flags(flags, "client", crm_ipc_client_response); - } + if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) { + crm_warn("Couldn't prepare %s IPC request: %s " QB_XS " rc=%d", + client->server_name, pcmk_rc_str(rc), rc); + g_string_free(iov_buffer, TRUE); + return pcmk_rc2legacy(rc); + } - crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout", - client->server_name, header->qb.id, header->qb.size, ms_timeout); + header = iov[0].iov_base; + pcmk__set_ipc_flags(header->flags, client->server_name, flags); - /* Send the IPC request, respecting any timeout we were passed */ - if (ms_timeout > 0) { - timeout = time(NULL) + 1 + pcmk__timeout_ms2s(ms_timeout); - } + if (pcmk_is_set(flags, crm_ipc_proxied)) { + /* Don't look for a synchronous response */ + pcmk__clear_ipc_flags(flags, "client", crm_ipc_client_response); + } - do { - qb_rc = qb_ipcc_sendv(client->ipc, iov, 2); - } while ((qb_rc == -EAGAIN) && ((timeout == 0) || (time(NULL) < timeout))); + if (pcmk__ipc_msg_is_multipart(header)) { + bool is_end = pcmk__ipc_msg_is_multipart_end(header); + crm_trace("Sending %s IPC request %d (%spart %d) of %u bytes using %dms timeout", + client->server_name, header->qb.id, is_end ? "final " : "", + index, header->qb.size, ms_timeout); + } else { + crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout", + client->server_name, header->qb.id, header->qb.size, ms_timeout); + } - rc = (int) qb_rc; // Negative of system errno, or bytes sent - if (qb_rc <= 0) { - goto send_cleanup; - } + /* Send the IPC request, respecting any timeout we were passed */ + if (ms_timeout > 0) { + timeout = time(NULL) + 1 + pcmk__timeout_ms2s(ms_timeout); + } + + do { + qb_rc = qb_ipcc_sendv(client->ipc, iov, 2); + } while ((qb_rc == -EAGAIN) && ((timeout == 0) || (time(NULL) < timeout))); + + /* An error occurred when sending. */ + if (qb_rc <= 0) { + rc = (int) qb_rc; // Negative of system errno + goto send_cleanup; + } + + /* Sending succeeded. 
The next action depends on whether this was a + * multipart IPC message or not. + */ + if (rc == pcmk_rc_ok) { + /* This was either a standalone IPC message or the last part of + * a multipart message. Set the return value and break out of + * this processing loop. + */ + rc = (int) qb_rc; // Bytes sent + break; + } else if (rc == pcmk_rc_ipc_more) { + /* This was a multipart message, loop to process the next chunk. */ + index++; + } + + pcmk_free_ipc_event(iov); + } while (true); /* If we should not wait for a response, bail now */ if (!pcmk_is_set(flags, crm_ipc_client_response)) { From 7a38a6439e5143e3470a02b085e61dcf67b5904c Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 15:11:02 -0400 Subject: [PATCH 10/24] Refactor: libcrmcommon: Remove buf_size from crm_ipc_s. The libqb read buffer is now a fixed size, so we can just use crm_ipc_default_buffer_size() anywhere we need that. And then anywhere we need to know the size of buffer (nowhere at the moment, but possibly in the future) we can just use GByteArray->len to figure that out. So there's no need to keep an extra struct member around anymore. 
--- lib/common/ipc_client.c | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index e25184548a7..2857e78fa8f 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -819,7 +819,6 @@ pcmk_ipc_purge_node(pcmk_ipc_api_t *api, const char *node_name, uint32_t nodeid) struct crm_ipc_s { struct pollfd pfd; - unsigned int buf_size; // size of allocated buffer int msg_size; int need_reply; GByteArray *buffer; @@ -861,7 +860,6 @@ crm_ipc_new(const char *name, size_t max_size) return NULL; } - client->buf_size = crm_ipc_default_buffer_size(); client->buffer = NULL; client->pfd.fd = -1; client->pfd.events = POLLIN; @@ -893,7 +891,7 @@ pcmk__connect_generic_ipc(crm_ipc_t *ipc) } ipc->need_reply = FALSE; - ipc->ipc = qb_ipcc_connect(ipc->server_name, ipc->buf_size); + ipc->ipc = qb_ipcc_connect(ipc->server_name, crm_ipc_default_buffer_size()); if (ipc->ipc == NULL) { return errno; } @@ -1067,7 +1065,7 @@ crm_ipc_read(crm_ipc_t * client) buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), sizeof(char)); client->msg_size = qb_ipcc_event_recv(client->ipc, buffer, - client->buf_size, 0); + crm_ipc_default_buffer_size(), 0); if (client->msg_size >= 0) { header = (pcmk__ipc_header_t *)(void*) buffer; @@ -1095,7 +1093,7 @@ crm_ipc_read(crm_ipc_t * client) } if (header) { - client->buffer = g_byte_array_sized_new(client->buf_size); + client->buffer = g_byte_array_sized_new(crm_ipc_default_buffer_size()); g_byte_array_append(client->buffer, (const guint8 *) buffer, client->msg_size); @@ -1156,12 +1154,13 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, request_id); do { - char *buffer = pcmk__assert_alloc(client->buf_size, sizeof(char)); + char *buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), + sizeof(char)); const char *data = NULL; xmlNode *xml = NULL; - *bytes = qb_ipcc_recv(client->ipc, buffer, client->buf_size, - qb_timeout); + *bytes = 
qb_ipcc_recv(client->ipc, buffer, + crm_ipc_default_buffer_size(), qb_timeout); if (*bytes <= 0) { free(buffer); @@ -1183,7 +1182,7 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, g_byte_array_free(client->buffer, TRUE); } - client->buffer = g_byte_array_sized_new(client->buf_size); + client->buffer = g_byte_array_sized_new(crm_ipc_default_buffer_size()); g_byte_array_append(client->buffer, (const guint8 *) buffer, *bytes); free(buffer); break; @@ -1276,7 +1275,8 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, char *buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), sizeof(char)); - qb_rc = qb_ipcc_recv(client->ipc, buffer, client->buf_size, ms_timeout); + qb_rc = qb_ipcc_recv(client->ipc, buffer, crm_ipc_default_buffer_size(), + ms_timeout); free(buffer); if (qb_rc < 0) { From 69f1463ac042342aacbc620456c8b23942c603d4 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 15:20:46 -0400 Subject: [PATCH 11/24] Feature: libcrmcommon: Add pcmk__ipc_msg_append. This function can be called in a loop on the receive side of new-style IPC code to build up a complete IPC message from individual parts. 
--- include/crm/common/ipc_internal.h | 1 + lib/common/ipc_common.c | 88 +++++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+) diff --git a/include/crm/common/ipc_internal.h b/include/crm/common/ipc_internal.h index 63a4085d870..ed185817268 100644 --- a/include/crm/common/ipc_internal.h +++ b/include/crm/common/ipc_internal.h @@ -244,6 +244,7 @@ int pcmk__ipc_prepare_iov(uint32_t request, const GString *message, int pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, uint32_t flags); int pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags); +int pcmk__ipc_msg_append(GByteArray **buffer, void *data); bool pcmk__ipc_msg_is_multipart(void *data); bool pcmk__ipc_msg_is_multipart_end(void *data); uint16_t pcmk__ipc_multipart_id(void *data); diff --git a/lib/common/ipc_common.c b/lib/common/ipc_common.c index 8aa5ec03e79..3c7be04054a 100644 --- a/lib/common/ipc_common.c +++ b/lib/common/ipc_common.c @@ -97,3 +97,91 @@ pcmk__ipc_multipart_id(void *data) CRM_LOG_ASSERT(data != NULL); return header->part_id; } + +/*! + * \internal + * \brief Add more data to a partial IPC message + * + * This function can be called repeatedly to build up a complete IPC message + * from smaller parts. It does this by inspecting flags on the message. + * Most of the time, IPC messages will be small enough where this function + * won't get called more than once, but more complex clusters can end up with + * very large IPC messages that don't fit in a single buffer. + * + * Important return values: + * + * - EBADMSG - Something was wrong with the data. + * - pcmk_rc_ipc_more - \p data was a chunk of a partial message and there is + * more to come. The caller should not process the message + * yet and should continue reading from the IPC connection. + * - pcmk_rc_ok - We have the complete message. The caller should process + * it and free the buffer to prepare for the next message. 
+ * + * \param[in,out] buffer The buffer to add this data to + * \param[in] data The received IPC message or message portion. The + * caller is responsible for freeing this. + * + * \return Standard Pacemaker return code + */ +int +pcmk__ipc_msg_append(GByteArray **buffer, void *data) +{ + pcmk__ipc_header_t *header = (pcmk__ipc_header_t *) data; + const guint8 *payload = (guint8 *) data + sizeof(pcmk__ipc_header_t); + int rc = pcmk_rc_ok; + + if (!pcmk__valid_ipc_header(header)) { + return EBADMSG; + } + + if (pcmk__ipc_msg_is_multipart_end(data)) { + /* This is the end of a multipart IPC message. Add the payload of the + * received data (so, don't include the header) to the partial buffer. + * Remember that this needs to include the NULL terminating character. + */ + g_byte_array_append(*buffer, payload, header->size); + + } else if (pcmk__ipc_msg_is_multipart(data)) { + if (pcmk__ipc_multipart_id(data) == 0) { + /* This is the first part of a multipart IPC message. Initialize + * the buffer with the entire message, including its header. Do + * not include the NULL terminating character. + */ + *buffer = g_byte_array_new(); + + /* Clear any multipart flags from the header of the incoming part + * so they'll be clear in the fully reassembled message. This + * message is passed to pcmk__client_data2xml, which will extract + * the header flags and return them. Those flags can then be used + * when constructing a reply, including ACKs. We don't want these + * specific incoming flags to influence the reply. + */ + pcmk__clear_ipc_flags(header->flags, "server", + crm_ipc_multipart | crm_ipc_multipart_end); + + g_byte_array_append(*buffer, data, + sizeof(pcmk__ipc_header_t) + header->size - 1); + + } else { + /* This is some intermediate part of a multipart message. Add + * the payload of the received data (so, don't include the header) + * to the partial buffer and return. Do not include the NULL + * terminating character.
+ */ + g_byte_array_append(*buffer, payload, header->size - 1); + } + + rc = pcmk_rc_ipc_more; + + } else { + /* This is a standalone IPC message. For simplicity in the caller, + * copy the entire message over into a byte array so it can be handled + * the same as a multipart message. + */ + *buffer = g_byte_array_new(); + g_byte_array_append(*buffer, data, + sizeof(pcmk__ipc_header_t) + header->size); + } + + return rc; +} From 9594f111a32bd47ea205f9164797ac4a4c1bce52 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 15:40:33 -0400 Subject: [PATCH 12/24] Feature: libcrmcommon: crm_ipc_send can receive large IPC messages. We create a new fixed-size temporary buffer inside crm_ipc_send and receive from libqb into that in order to keep changes to a minimum. Then, we add the contents of that temporary buffer to the client's IPC buffer which is allowed to grow as more of the message is received. Also note that crm_ipc_send has an extra receive block at the top for reading and discarding replies that previously timed out. This block also has to be modified to handle multipart messages, but since we are just throwing those away, there's not much to do. 
--- lib/common/ipc_client.c | 94 ++++++++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 35 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index 2857e78fa8f..fbcae8cf589 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1137,12 +1137,14 @@ crm_ipc_name(crm_ipc_t * client) // \return Standard Pacemaker return code static int internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, - ssize_t *bytes, xmlNode **reply) + xmlNode **reply) { pcmk__ipc_header_t *hdr = NULL; time_t timeout = 0; int32_t qb_timeout = -1; int rc = pcmk_rc_ok; + ssize_t bytes = 0; + int reply_id = 0; if (ms_timeout > 0) { timeout = time(NULL) + 1 + pcmk__timeout_ms2s(ms_timeout); @@ -1159,10 +1161,10 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, const char *data = NULL; xmlNode *xml = NULL; - *bytes = qb_ipcc_recv(client->ipc, buffer, - crm_ipc_default_buffer_size(), qb_timeout); + bytes = qb_ipcc_recv(client->ipc, buffer, + crm_ipc_default_buffer_size(), qb_timeout); - if (*bytes <= 0) { + if (bytes <= 0) { free(buffer); if (!crm_ipc_connected(client)) { @@ -1175,44 +1177,47 @@ internal_ipc_get_reply(crm_ipc_t *client, int request_id, int ms_timeout, } hdr = (pcmk__ipc_header_t *) (void *) buffer; + reply_id = hdr->qb.id; - if (hdr->qb.id == request_id) { + if (reply_id == request_id) { /* Got the reply we were expecting. 
*/ - if (client->buffer != NULL) { - g_byte_array_free(client->buffer, TRUE); + rc = pcmk__ipc_msg_append(&client->buffer, buffer); + + if (rc == pcmk_rc_ipc_more) { + continue; + } else if (rc != pcmk_rc_ok) { + free(buffer); + return rc; } - client->buffer = g_byte_array_sized_new(crm_ipc_default_buffer_size()); - g_byte_array_append(client->buffer, (const guint8 *) buffer, *bytes); - free(buffer); break; } data = buffer + sizeof(pcmk__ipc_header_t); xml = pcmk__xml_parse(data); - if (hdr->qb.id < request_id) { - crm_err("Discarding old reply %d (need %d)", hdr->qb.id, request_id); + if (reply_id < request_id) { + crm_err("Discarding old reply %d (need %d)", reply_id, request_id); crm_log_xml_notice(xml, "OldIpcReply"); - } else if (hdr->qb.id > request_id) { - crm_err("Discarding newer reply %d (need %d)", hdr->qb.id, request_id); + } else if (reply_id > request_id) { + crm_err("Discarding newer reply %d (need %d)", reply_id, request_id); crm_log_xml_notice(xml, "ImpossibleReply"); pcmk__assert(hdr->qb.id <= request_id); } free(buffer); - } while (time(NULL) < timeout || (timeout == 0 && *bytes == -EAGAIN)); + } while (time(NULL) < timeout || (timeout == 0 && bytes == -EAGAIN)); - if (*bytes > 0) { - crm_trace("Received %zd-byte reply %" PRId32 " to %s IPC %d: %.100s", - *bytes, hdr->qb.id, client->server_name, request_id, - crm_ipc_buffer(client)); + if (client->buffer->len > 0) { + crm_trace("Received %u-byte reply %" PRId32 " to %s IPC %d: %.100s", + client->buffer->len, reply_id, client->server_name, + request_id, crm_ipc_buffer(client)); if (reply != NULL) { *reply = pcmk__xml_parse(crm_ipc_buffer(client)); } - } else if (*bytes < 0) { - rc = (int) -*bytes; // System errno + } else if (bytes < 0) { + rc = (int) -bytes; // System errno crm_trace("No reply to %s IPC %d: %s " QB_XS " rc=%d", client->server_name, request_id, pcmk_rc_str(rc), rc); } @@ -1275,20 +1280,39 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, char *buffer = 
pcmk__assert_alloc(crm_ipc_default_buffer_size(), sizeof(char)); - qb_rc = qb_ipcc_recv(client->ipc, buffer, crm_ipc_default_buffer_size(), - ms_timeout); - free(buffer); + do { + qb_rc = qb_ipcc_recv(client->ipc, buffer, + crm_ipc_default_buffer_size(), ms_timeout); - if (qb_rc < 0) { - crm_warn("Sending %s IPC disabled until pending reply received", - client->server_name); - return -EALREADY; + header = (void *) buffer; - } else { - crm_notice("Sending %s IPC re-enabled after pending reply received", - client->server_name); - client->need_reply = FALSE; - } + /* We expected a reply but failed to read it, so we can't continue. */ + if (qb_rc < 0) { + crm_warn("Sending %s IPC disabled until pending reply received", + client->server_name); + free(buffer); + return -EALREADY; + + } else if (!pcmk__valid_ipc_header(header)) { + free(buffer); + return -EBADMSG; + + /* We expected a reply and got either a standalone one or the last + * part of a multipart reply. We're done reading the expected reply + * and can continue. + */ + } else if (!pcmk__ipc_msg_is_multipart(header) || + pcmk__ipc_msg_is_multipart_end(header)) { + crm_notice("Sending %s IPC re-enabled after pending reply received", + client->server_name); + client->need_reply = FALSE; + break; + } + + /* Otherwise, just keep looping until we've read the whole thing. */ + } while (true); + + free(buffer); } id++; @@ -1367,9 +1391,9 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, goto send_cleanup; } - rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, &bytes, reply); + rc = internal_ipc_get_reply(client, header->qb.id, ms_timeout, reply); if (rc == pcmk_rc_ok) { - rc = (int) bytes; // Size of reply received + rc = client->buffer->len; // Size of reply received } else { /* rc is either a positive system errno or a negative standard Pacemaker * return code. 
If it's an errno, we need to convert it back to a From 59a57ca78e1d59fe4f02e64cc7f3ab50370bb9e6 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 15:53:46 -0400 Subject: [PATCH 13/24] Feature: libcrmcommon: crm_ipc_read can receive large IPC messages. This is similar to internal_ipc_get_reply, except it deals with IPC events and doesn't use timeouts. --- lib/common/ipc_client.c | 61 +++++++++++++++++++++++------------------ 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index fbcae8cf589..ce6d92c155c 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1057,48 +1057,55 @@ crm_ipc_ready(crm_ipc_t *client) long crm_ipc_read(crm_ipc_t * client) { - pcmk__ipc_header_t *header = NULL; char *buffer = NULL; long rc = -ENOMSG; pcmk__assert((client != NULL) && (client->ipc != NULL)); buffer = pcmk__assert_alloc(crm_ipc_default_buffer_size(), sizeof(char)); - client->msg_size = qb_ipcc_event_recv(client->ipc, buffer, - crm_ipc_default_buffer_size(), 0); - if (client->msg_size >= 0) { - header = (pcmk__ipc_header_t *)(void*) buffer; - if (!pcmk__valid_ipc_header(header)) { - rc = -EBADMSG; - goto done; + do { + ssize_t bytes = qb_ipcc_event_recv(client->ipc, buffer, + crm_ipc_default_buffer_size(), 0); + pcmk__ipc_header_t *header = NULL; + + if (bytes <= 0) { + crm_trace("No message received from %s IPC: %s", + client->server_name, pcmk_strerror(bytes)); + + if (!crm_ipc_connected(client) || bytes == -ENOTCONN) { + crm_err("Connection to %s IPC failed", client->server_name); + rc = -ENOTCONN; + goto done; + } else if (bytes == -EAGAIN) { + rc = -EAGAIN; + goto done; + } + + break; } - crm_trace("Received %s IPC event %d size=%u rc=%d text='%.100s'", - client->server_name, header->qb.id, header->qb.size, - client->msg_size, buffer + sizeof(pcmk__ipc_header_t)); + header = (pcmk__ipc_header_t *)(void *) buffer; - } else { - crm_trace("No message received from %s IPC: %s", - 
client->server_name, pcmk_strerror(client->msg_size)); + crm_trace("Received %s IPC event %d size=%u rc=%zd text='%.100s'", + client->server_name, header->qb.id, header->qb.size, bytes, + buffer + sizeof(pcmk__ipc_header_t)); - if (client->msg_size == -EAGAIN) { - rc = -EAGAIN; + rc = pcmk__ipc_msg_append(&client->buffer, buffer); + + if (rc == pcmk_rc_ok) { + break; + } else if (rc == pcmk_rc_ipc_more) { + continue; + } else { + rc = -rc; goto done; } - } - - if (!crm_ipc_connected(client) || client->msg_size == -ENOTCONN) { - crm_err("Connection to %s IPC failed", client->server_name); - } - - if (header) { - client->buffer = g_byte_array_sized_new(crm_ipc_default_buffer_size()); - g_byte_array_append(client->buffer, (const guint8 *) buffer, - client->msg_size); + } while (true); + if (client->buffer->len > 0) { /* Data length excluding the header */ - rc = header->size; + rc = client->buffer->len - sizeof(pcmk__ipc_header_t); } done: From fa2f9c1f8d8a0372e1d753ffca2f43b28a65f066 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Tue, 4 Mar 2025 14:43:46 -0500 Subject: [PATCH 14/24] Refactor: libcrmcommon: Standardize IPC logging messages. 
--- lib/common/ipc_client.c | 9 ++------- lib/common/ipc_common.c | 18 ++++++++++++++++++ lib/common/ipc_server.c | 25 +++++++++++++++++++++---- 3 files changed, 41 insertions(+), 11 deletions(-) diff --git a/lib/common/ipc_client.c b/lib/common/ipc_client.c index ce6d92c155c..df58a047a78 100644 --- a/lib/common/ipc_client.c +++ b/lib/common/ipc_client.c @@ -1067,7 +1067,6 @@ crm_ipc_read(crm_ipc_t * client) do { ssize_t bytes = qb_ipcc_event_recv(client->ipc, buffer, crm_ipc_default_buffer_size(), 0); - pcmk__ipc_header_t *header = NULL; if (bytes <= 0) { crm_trace("No message received from %s IPC: %s", @@ -1085,12 +1084,6 @@ crm_ipc_read(crm_ipc_t * client) break; } - header = (pcmk__ipc_header_t *)(void *) buffer; - - crm_trace("Received %s IPC event %d size=%u rc=%zd text='%.100s'", - client->server_name, header->qb.id, header->qb.size, bytes, - buffer + sizeof(pcmk__ipc_header_t)); - rc = pcmk__ipc_msg_append(&client->buffer, buffer); if (rc == pcmk_rc_ok) { @@ -1353,9 +1346,11 @@ crm_ipc_send(crm_ipc_t *client, const xmlNode *message, crm_trace("Sending %s IPC request %d (%spart %d) of %u bytes using %dms timeout", client->server_name, header->qb.id, is_end ? 
"final " : "", index, header->qb.size, ms_timeout); + crm_trace("Text = '%s'", (char *) iov[1].iov_base); } else { crm_trace("Sending %s IPC request %d of %u bytes using %dms timeout", client->server_name, header->qb.id, header->qb.size, ms_timeout); + crm_trace("Text = '%s'", (char *) iov[1].iov_base); } /* Send the IPC request, respecting any timeout we were passed */ diff --git a/lib/common/ipc_common.c b/lib/common/ipc_common.c index 3c7be04054a..8341787db2e 100644 --- a/lib/common/ipc_common.c +++ b/lib/common/ipc_common.c @@ -141,7 +141,14 @@ pcmk__ipc_msg_append(GByteArray **buffer, void *data) */ g_byte_array_append(*buffer, payload, header->size); + crm_trace("Received IPC request %d (final part %d) of %u bytes", + header->qb.id, header->part_id, header->qb.size); + crm_trace("Text = '%s'", payload); + crm_trace("Buffer = '%s'", (*buffer)->data + sizeof(pcmk__ipc_header_t)); + } else if (pcmk__ipc_msg_is_multipart(data)) { + const char *initial_str = ""; + if (pcmk__ipc_multipart_id(data) == 0) { /* This is the first part of a multipart IPC message. Initialize * the buffer with the entire message, including its header. Do @@ -161,6 +168,7 @@ pcmk__ipc_msg_append(GByteArray **buffer, void *data) g_byte_array_append(*buffer, data, sizeof(pcmk__ipc_header_t) + header->size - 1); + initial_str = "initial "; } else { /* This is some intermediate part of a multipart message. Add @@ -173,6 +181,11 @@ pcmk__ipc_msg_append(GByteArray **buffer, void *data) rc = pcmk_rc_ipc_more; + crm_trace("Received IPC request %d (%spart %d) of %u bytes", + header->qb.id, initial_str, header->part_id, header->qb.size); + crm_trace("Text = '%s'", payload); + crm_trace("Buffer = '%s'", (*buffer)->data + sizeof(pcmk__ipc_header_t)); + } else { /* This is a standalone IPC message. 
For simplicity in the caller, * copy the entire message over into a byte array so it can be handled @@ -181,6 +194,11 @@ pcmk__ipc_msg_append(GByteArray **buffer, void *data) *buffer = g_byte_array_new(); g_byte_array_append(*buffer, data, sizeof(pcmk__ipc_header_t) + header->size); + + crm_trace("Received IPC request %d of %u bytes", header->qb.id, + header->qb.size); + crm_trace("Text = '%s'", payload); + crm_trace("Buffer = '%s'", (*buffer)->data + sizeof(pcmk__ipc_header_t)); } return rc; diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index d9ca9041ddf..431092766fc 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -711,24 +711,41 @@ pcmk__ipc_send_iov(pcmk__client_t *c, struct iovec *iov, uint32_t flags) } else { ssize_t qb_rc; + char *part_text = NULL; CRM_LOG_ASSERT(header->qb.id != 0); /* Replying to a specific request */ + if (pcmk__ipc_msg_is_multipart_end(header)) { + part_text = crm_strdup_printf(" (final part %d) ", header->part_id); + } else if (pcmk__ipc_msg_is_multipart(header)) { + if (pcmk__ipc_multipart_id(header) == 0) { + part_text = crm_strdup_printf(" (initial part %d) ", header->part_id); + } else { + part_text = crm_strdup_printf(" (part %d) ", header->part_id); + } + } else { + part_text = crm_strdup_printf(" "); + } + qb_rc = qb_ipcs_response_sendv(c->ipcs, iov, 2); if (qb_rc < header->qb.size) { if (qb_rc < 0) { rc = (int) -qb_rc; } - crm_notice("Response %" PRId32 " to pid %u failed: %s " + crm_notice("Response %" PRId32 "%sto pid %u failed: %s " QB_XS " bytes=%" PRId32 " rc=%zd ipcs=%p", - header->qb.id, c->pid, pcmk_rc_str(rc), + header->qb.id, part_text, c->pid, pcmk_rc_str(rc), header->qb.size, qb_rc, c->ipcs); + crm_trace("Text = '%s'", (char *) iov[1].iov_base); } else { - crm_trace("Response %" PRId32 " sent, %zd bytes to %p[%u]", - header->qb.id, qb_rc, c->ipcs, c->pid); + crm_trace("Response %" PRId32 "%ssent, %zd bytes to %p[%u]", + header->qb.id, part_text, qb_rc, c->ipcs, c->pid); + 
crm_trace("Text = '%s'", (char *) iov[1].iov_base); } + free(part_text); + if (flags & crm_ipc_server_free) { pcmk_free_ipc_event(iov); } From ca9c2171cefa35a7f4cd6195a551fca04776fada Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Thu, 10 Apr 2025 16:01:26 -0400 Subject: [PATCH 15/24] Refactor: libcrmcommon: Add a GByteArray buffer to pcmk__client_s. We need this for the same reason that there's a similar member in crm_ipc_s - we need some place to store a potentially split up IPC message while we are reading it piece-by-piece. --- include/crm/common/ipc_internal.h | 10 ++++++++++ lib/common/ipc_server.c | 6 ++++++ 2 files changed, 16 insertions(+) diff --git a/include/crm/common/ipc_internal.h b/include/crm/common/ipc_internal.h index ed185817268..2227d885ca1 100644 --- a/include/crm/common/ipc_internal.h +++ b/include/crm/common/ipc_internal.h @@ -172,6 +172,16 @@ struct pcmk__client_s { int event_timer; GQueue *event_queue; + /* Buffer used to store a multipart IPC message when we are building it + * up over multiple reads. + * + * NOTE: The use of a GByteArray here restricts the maximum size of an + * IPC message. A GByteArray can hold G_MAXUINT bytes, which is the same + * as UINT_MAX. So, an IPC message can be about 4 GB in size, minus the + * header. + */ + GByteArray *buffer; + /* Depending on the client type, only some of the following will be * populated/valid. @TODO Maybe convert to a union. 
*/ diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index 431092766fc..dd3eed83112 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -312,6 +312,12 @@ pcmk__free_client(pcmk__client_t *c) free(c->id); free(c->name); free(c->user); + + if (c->buffer != NULL) { + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + } + if (c->remote) { if (c->remote->auth_timeout) { g_source_remove(c->remote->auth_timeout); From 74e56f6e80026a5b3213b100f64cdd8a1c062ab7 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Tue, 29 Apr 2025 10:55:30 -0400 Subject: [PATCH 16/24] Refactor: libcrmcommon: EAGAIN isn't an error for pcmk__ipc_send_iov... ...at least, when it's sending a server event. In this case, the iov will be added to the send queue and EAGAIN will be returned. We will attempt to send the event again the next time pcmk__ipc_send_iov is called, or when the timer pops to flush the event queue. Thus, we are already handling the EAGAIN return code properly elsewhere and just need to proceed as if everything will be fine. --- daemons/based/based_notify.c | 9 ++++++++- lib/common/ipc_server.c | 20 ++++++++++++++++++-- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/daemons/based/based_notify.c b/daemons/based/based_notify.c index c4107d233aa..e4000e72374 100644 --- a/daemons/based/based_notify.c +++ b/daemons/based/based_notify.c @@ -81,7 +81,14 @@ cib_notify_send_one(gpointer key, gpointer value, gpointer user_data) case pcmk__client_ipc: rc = pcmk__ipc_send_iov(client, update->iov, crm_ipc_server_event); - if (rc != pcmk_rc_ok) { + + /* EAGAIN isn't strictly an error for a server event. The iov + * was added to the send queue, but sending did fail with EAGAIN. + * However, we will attempt to send the event the next time + * pcmk__ipc_send_iov is called, or when crm_ipcs_flush_events_cb + * happens. 
+ */ + if ((rc != EAGAIN) && (rc != pcmk_rc_ok)) { crm_warn("Could not notify client %s: %s " QB_XS " id=%s", pcmk__client_name(client), pcmk_rc_str(rc), client->id); diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index dd3eed83112..4a76f12e83b 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -802,8 +802,24 @@ pcmk__ipc_send_xml(pcmk__client_t *c, uint32_t request, const xmlNode *message, pcmk__set_ipc_flags(flags, "send data", crm_ipc_server_free); rc = pcmk__ipc_send_iov(c, iov, flags); - /* Did an error occur during transmission? */ - if (rc != pcmk_rc_ok) { + if (rc == EAGAIN && pcmk_is_set(flags, crm_ipc_server_event)) { + /* In this case, pcmk__ipc_send_iov was able to add the iov + * for this chunk to the send queue, but sending failed with + * EAGAIN. The chunk is still in the queue and we will + * attempt sending again the next time pcmk__ipc_send_iov is + * called, or when crm_ipcs_flush_events_cb happens. + * + * We don't want to interpret this as an error. If we do, + * we'll return from this function somewhere in the middle of + * transmitting the whole IPC message which will make a mess + * of things. Instead, continue with attempting to send the + * next chunk. + */ + index++; + break; + + } else if (rc != pcmk_rc_ok) { + /* Some other error occurred during transmission. */ goto done; } From 21d95e0684e13deb76a222daae132b71fde3b48f Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 11 Apr 2025 11:39:28 -0400 Subject: [PATCH 17/24] Low: libcrmcommon: Don't assert on checking for a NULL terminator. header->size doesn't mean the same thing anymore now that we have split up IPC messages. If pcmk__client_data2xml is given an IPC message that did not have to be split up, then the size member will be the same as the size of the entire message. However, if it was given an IPC message that had to be split up for transmit and then reassembled, the size member only refers to the size of the last chunk received. 
Thus, checking for a NULL character in the position given by size will have us checking somewhere in the middle of the reassembled string which is obviously wrong. --- lib/common/ipc_server.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/common/ipc_server.c b/lib/common/ipc_server.c index 4a76f12e83b..ebeabc1b393 100644 --- a/lib/common/ipc_server.c +++ b/lib/common/ipc_server.c @@ -425,8 +425,6 @@ pcmk__client_data2xml(pcmk__client_t *c, void *data, uint32_t *id, pcmk__set_client_flags(c, pcmk__client_proxied); } - pcmk__assert(text[header->size - 1] == 0); - xml = pcmk__xml_parse(text); crm_log_xml_trace(xml, "[IPC received]"); return xml; From be5609993c3c77de02f53f19350d70dffc48fa1e Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 12:44:11 -0500 Subject: [PATCH 18/24] Feature: daemons: Convert based to support multipart IPC messages. This is basically how every daemon is going to work. The daemon needs to delay calling pcmk__client_data2xml until it has the complete message. The hard work of reassembling the message and determining whether it's a multipart message in the first place is already done elsewhere. 
--- daemons/based/based_callbacks.c | 41 +++++++++++++++++++++++++++++---- daemons/based/based_notify.c | 22 +++++++++++++----- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/daemons/based/based_callbacks.c b/daemons/based/based_callbacks.c index 2b5be57f7d9..0d2383eec4f 100644 --- a/daemons/based/based_callbacks.c +++ b/daemons/based/based_callbacks.c @@ -302,11 +302,46 @@ cib_common_callback_worker(uint32_t id, uint32_t flags, xmlNode * op_request, int32_t cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean privileged) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; uint32_t call_options = cib_none; pcmk__client_t *cib_client = pcmk__find_client(c); - xmlNode *op_request = pcmk__client_data2xml(cib_client, data, &id, &flags); + xmlNode *op_request = NULL; + + if (cib_client == NULL) { + crm_trace("Invalid client %p", c); + return 0; + } + + rc = pcmk__ipc_msg_append(&cib_client->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + op_request = pcmk__client_data2xml(cib_client, cib_client->buffer->data, + &id, &flags); + g_byte_array_free(cib_client->buffer, TRUE); + cib_client->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (cib_client->buffer != NULL) { + g_byte_array_free(cib_client->buffer, TRUE); + cib_client->buffer = NULL; + } + + return 0; + } if (op_request) { int rc = pcmk_rc_ok; @@ -324,10 +359,6 @@ cib_common_callback(qb_ipcs_connection_t * c, void *data, size_t size, gboolean pcmk__ipc_send_ack(cib_client, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; - - } else if(cib_client == NULL) { - crm_trace("Invalid client %p", c); - return 0; } if (pcmk_is_set(call_options, cib_sync_call)) { diff --git a/daemons/based/based_notify.c b/daemons/based/based_notify.c index e4000e72374..620cb637b6f 100644 --- a/daemons/based/based_notify.c +++ b/daemons/based/based_notify.c @@ -116,22 +116,32 @@ cib_notify_send(const xmlNode *xml) GString *iov_buffer = NULL; ssize_t bytes = 0; int rc = pcmk_rc_ok; + uint16_t index = 0; iov_buffer = g_string_sized_new(1024); pcmk__xml_string(xml, 0, iov_buffer, 0); - rc = pcmk__ipc_prepare_iov(0, iov_buffer, 0, &iov, &bytes); - if (rc == pcmk_rc_ok) { + do { + rc = pcmk__ipc_prepare_iov(0, iov_buffer, index, &iov, &bytes); + + if ((rc != pcmk_rc_ok) && (rc != pcmk_rc_ipc_more)) { + crm_notice("Could not notify clients: %s " QB_XS " rc=%d", + pcmk_rc_str(rc), rc); + break; + } + update.msg = xml; update.iov = iov; update.iov_size = bytes; pcmk__foreach_ipc_client(cib_notify_send_one, &update); pcmk_free_ipc_event(iov); - } else { - crm_notice("Could not notify clients: %s " QB_XS " rc=%d", - pcmk_rc_str(rc), rc); - } + if (rc == pcmk_rc_ok) { + break; + } + + index++; + } while (true); g_string_free(iov_buffer, TRUE); } From 008cec43967b82715e2dc0136b360a88581a9a64 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Wed, 26 Feb 2025 10:34:35 -0500 Subject: [PATCH 19/24] Feature: daemons: Convert controld to support multipart IPC messages. 
--- daemons/controld/controld_control.c | 30 ++++++++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/daemons/controld/controld_control.c b/daemons/controld/controld_control.c index 4b4baac3af7..e2bf3d1dc18 100644 --- a/daemons/controld/controld_control.c +++ b/daemons/controld/controld_control.c @@ -378,11 +378,39 @@ accept_controller_client(qb_ipcs_connection_t *c, uid_t uid, gid_t gid) static int32_t dispatch_controller_ipc(qb_ipcs_connection_t * c, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); + xmlNode *msg = NULL; + + rc = pcmk__ipc_msg_append(&client->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + msg = pcmk__client_data2xml(client, client->buffer->data, &id, &flags); + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; - xmlNode *msg = pcmk__client_data2xml(client, data, &id, &flags); + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. + */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + } + + return 0; + } if (msg == NULL) { pcmk__ipc_send_ack(client, id, flags, PCMK__XE_ACK, NULL, From 2d7770fceaeeaf1606b1dc9c0ad0ae8aa69578f4 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 13:08:51 -0500 Subject: [PATCH 20/24] Feature: daemons: Convert fenced to support multipart IPC messages. 
--- daemons/fenced/pacemaker-fenced.c | 30 ++++++++++++++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/daemons/fenced/pacemaker-fenced.c b/daemons/fenced/pacemaker-fenced.c index e1b6e1f6cd8..4a3a7001368 100644 --- a/daemons/fenced/pacemaker-fenced.c +++ b/daemons/fenced/pacemaker-fenced.c @@ -100,13 +100,39 @@ st_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) return 0; } - request = pcmk__client_data2xml(c, data, &id, &flags); + rc = pcmk__ipc_msg_append(&c->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + request = pcmk__client_data2xml(c, c->buffer->data, &id, &flags); + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. + */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (c->buffer != NULL) { + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + } + + return 0; + } + if (request == NULL) { pcmk__ipc_send_ack(c, id, flags, PCMK__XE_NACK, NULL, CRM_EX_PROTOCOL); return 0; } - op = crm_element_value(request, PCMK__XA_CRM_TASK); if(pcmk__str_eq(op, CRM_OP_RM_NODE_CACHE, pcmk__str_casei)) { crm_xml_add(request, PCMK__XA_T, PCMK__VALUE_STONITH_NG); From 1c1f4d2d0059d92b8c3452dcf0ebbb0b8cc2f3ee Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 14:30:19 -0500 Subject: [PATCH 21/24] Feature: daemons: Convert execd to support multipart IPC messages. 
--- daemons/execd/pacemaker-execd.c | 31 ++++++++++++++++++++++++++++++- daemons/execd/remoted_proxy.c | 31 +++++++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/daemons/execd/pacemaker-execd.c b/daemons/execd/pacemaker-execd.c index 68daf77bf39..b51890ce017 100644 --- a/daemons/execd/pacemaker-execd.c +++ b/daemons/execd/pacemaker-execd.c @@ -123,16 +123,45 @@ lrmd_ipc_created(qb_ipcs_connection_t * c) static int32_t lrmd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); - xmlNode *request = pcmk__client_data2xml(client, data, &id, &flags); + xmlNode *request = NULL; CRM_CHECK(client != NULL, crm_err("Invalid client"); return FALSE); CRM_CHECK(client->id != NULL, crm_err("Invalid client: %p", client); return FALSE); + rc = pcmk__ipc_msg_append(&client->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + request = pcmk__client_data2xml(client, client->buffer->data, &id, &flags); + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + } + + return 0; + } + CRM_CHECK(flags & crm_ipc_client_response, crm_err("Invalid client request: %p", client); return FALSE); diff --git a/daemons/execd/remoted_proxy.c b/daemons/execd/remoted_proxy.c index 9083a9d9cc1..cdae7ef1bd8 100644 --- a/daemons/execd/remoted_proxy.c +++ b/daemons/execd/remoted_proxy.c @@ -1,5 +1,5 @@ /* - * Copyright 2012-2024 the Pacemaker project contributors + * Copyright 2012-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * @@ -227,6 +227,7 @@ ipc_proxy_forward_client(pcmk__client_t *ipc_proxy, xmlNode *xml) static int32_t ipc_proxy_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); @@ -253,7 +254,33 @@ ipc_proxy_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) * This function is receiving a request from connection * 1 and forwarding it to connection 2. */ - request = pcmk__client_data2xml(client, data, &id, &flags); + rc = pcmk__ipc_msg_append(&client->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + request = pcmk__client_data2xml(client, client->buffer->data, &id, &flags); + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + } + + return 0; + } if (!request) { return 0; From 36423b3df0bde73076ce49a474b7f482465366df Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 16:08:25 -0500 Subject: [PATCH 22/24] Feature: daemons: Convert pacemakerd to support multipart IPC messages. --- daemons/pacemakerd/pcmkd_messages.c | 32 +++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/daemons/pacemakerd/pcmkd_messages.c b/daemons/pacemakerd/pcmkd_messages.c index 3069aab3c74..0dd9a773c62 100644 --- a/daemons/pacemakerd/pcmkd_messages.c +++ b/daemons/pacemakerd/pcmkd_messages.c @@ -1,5 +1,5 @@ /* - * Copyright 2010-2024 the Pacemaker project contributors + * Copyright 2010-2025 the Pacemaker project contributors * * The version control history for this file may have further details. * @@ -207,6 +207,7 @@ pcmk_ipc_destroy(qb_ipcs_connection_t * c) static int32_t pcmk_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; xmlNode *msg = NULL; @@ -218,7 +219,34 @@ pcmk_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) pcmkd_register_handlers(); } - msg = pcmk__client_data2xml(c, data, &id, &flags); + rc = pcmk__ipc_msg_append(&c->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + msg = pcmk__client_data2xml(c, c->buffer->data, &id, &flags); + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (c->buffer != NULL) { + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + } + + return 0; + } + if (msg == NULL) { pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); return 0; From 93f7f3d0c583471f04af431f914c88b15b4f3862 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Fri, 28 Feb 2025 14:40:56 -0500 Subject: [PATCH 23/24] Feature: daemons: Convert attrd to support multipart IPC messages. --- daemons/attrd/attrd_ipc.c | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/daemons/attrd/attrd_ipc.c b/daemons/attrd/attrd_ipc.c index 07fe84399ad..7b5aa30d2ca 100644 --- a/daemons/attrd/attrd_ipc.c +++ b/daemons/attrd/attrd_ipc.c @@ -553,6 +553,7 @@ attrd_ipc_destroy(qb_ipcs_connection_t *c) static int32_t attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; pcmk__client_t *client = pcmk__find_client(c); @@ -565,7 +566,33 @@ attrd_ipc_dispatch(qb_ipcs_connection_t * c, void *data, size_t size) return 0; } - xml = pcmk__client_data2xml(client, data, &id, &flags); + rc = pcmk__ipc_msg_append(&client->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + xml = pcmk__client_data2xml(client, client->buffer->data, &id, &flags); + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (client->buffer != NULL) { + g_byte_array_free(client->buffer, TRUE); + client->buffer = NULL; + } + + return 0; + } if (xml == NULL) { crm_debug("Unrecognizable IPC data from PID %d", pcmk__client_pid(c)); From aa4eba1210c29dd1fd454a4a87953b49c0f81840 Mon Sep 17 00:00:00 2001 From: Chris Lumens Date: Tue, 4 Mar 2025 16:08:00 -0500 Subject: [PATCH 24/24] Feature: daemons: Convert schedulerd to support multipart IPC messages. Fixes T903 --- daemons/schedulerd/schedulerd_messages.c | 30 +++++++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/daemons/schedulerd/schedulerd_messages.c b/daemons/schedulerd/schedulerd_messages.c index 719473d4829..fd8a9221ac8 100644 --- a/daemons/schedulerd/schedulerd_messages.c +++ b/daemons/schedulerd/schedulerd_messages.c @@ -223,6 +223,7 @@ pe_ipc_accept(qb_ipcs_connection_t * c, uid_t uid, gid_t gid) static int32_t pe_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) { + int rc = pcmk_rc_ok; uint32_t id = 0; uint32_t flags = 0; xmlNode *msg = NULL; @@ -235,7 +236,34 @@ pe_ipc_dispatch(qb_ipcs_connection_t * qbc, void *data, size_t size) schedulerd_register_handlers(); } - msg = pcmk__client_data2xml(c, data, &id, &flags); + rc = pcmk__ipc_msg_append(&c->buffer, data); + + if (rc == pcmk_rc_ipc_more) { + /* We haven't read the complete message yet, so just return. */ + return 0; + + } else if (rc == pcmk_rc_ok) { + /* We've read the complete message and there's already a header on + * the front. Pass it off for processing. + */ + msg = pcmk__client_data2xml(c, c->buffer->data, &id, &flags); + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + + } else { + /* Some sort of error occurred reassembling the message. All we can + * do is clean up, log an error and return. 
+ */ + crm_err("Error when reading IPC message: %s", pcmk_rc_str(rc)); + + if (c->buffer != NULL) { + g_byte_array_free(c->buffer, TRUE); + c->buffer = NULL; + } + + return 0; + } + if (msg == NULL) { pcmk__ipc_send_ack(c, id, flags, PCMK__XE_ACK, NULL, CRM_EX_PROTOCOL); return 0;