Skip to content

Commit 0b1cf50

Browse files
committed
Changes done for v2.2.7.
1 parent 360e721 commit 0b1cf50

File tree

14 files changed

+449
-45
lines changed

14 files changed

+449
-45
lines changed

com.ibm.streamsx.sttgateway/CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changes
22

3+
## v2.2.7
4+
* Feb/02/2021
5+
* Modified the IBMVoiceGatewaySource operator to handle the missing VGW start session message and/or missing speech data packet.
6+
* Added an optional job submission time parameter numberOfEocsNeededForVoiceCallCompletion for users to address the condition mentioned in the previous bullet.
7+
* Added log messages to notify when the condition mentioned in the previous bullet occurs.
8+
39
## v2.2.6
410
* Jan/16/2021
511
* Made the End Of Call Signal (EOCS) sending by the IBMVoiceGatewaySource operator to be more reliable and consistent.

com.ibm.streamsx.sttgateway/com.ibm.streamsx.sttgateway.watson/IBMVoiceGatewaySource/IBMVoiceGatewaySource_cpp.cgt

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/*
22
==============================================
33
# Licensed Materials - Property of IBM
4-
# Copyright IBM Corp. 2019, 2020
4+
# Copyright IBM Corp. 2019, 2021
55
==============================================
66
*/
77

88
/*
99
============================================================
1010
First created on: Sep/20/2019
11-
Last modified on: Jan/15/2021
11+
Last modified on: Feb/02/2021
1212

1313
Please refer to the sttgateway-tech-brief.txt file in the
1414
top-level directory of this toolkit to read about
@@ -684,15 +684,8 @@ Tree query(Tree& pt, typename Tree::path_type path) {
684684
template <typename EndpointType>
685685
void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
686686
typename EndpointType::message_ptr msg) {
687-
bool vgwSessionLoggingDone = false;
688687

689688
if (vgwSessionLoggingNeeded == true) {
690-
SPLAPPTRC(L_ERROR, "on_message called with hdl: " << hdl.lock().get()
691-
<< " with a message size of: " << msg->get_payload().size() << " bytes.", "on_message");
692-
vgwSessionLoggingDone = true;
693-
}
694-
695-
if (vgwSessionLoggingDone == false) {
696689
SPLAPPTRC(L_INFO, "on_message called with hdl: " << hdl.lock().get()
697690
<< " with a message size of: " << msg->get_payload().size() << " bytes.", "on_message");
698691
}
@@ -1295,6 +1288,15 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
12951288
// have started arriving for this particular VGW session id.
12961289
con_metadata.vgwVoiceChannelNumber = 2;
12971290
}
1291+
1292+
if (vgwSessionLoggingNeeded == true) {
1293+
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
1294+
"-->Channel " << boost::to_string(udpChannelNumber) <<
1295+
"-->X2 Received the very first speech data packet. vgwSessionId=" <<
1296+
con_metadata.vgwSessionId << ", vgwVoiceChannelNumber=" <<
1297+
con_metadata.vgwVoiceChannelNumber <<
1298+
", vgwIsCaller=" << con_metadata.vgwIsCaller, "on_message");
1299+
}
12981300
} // End of if (con_metadata.speechPacketsReceivedCnt == 1)
12991301

13001302
// Update it in the client connections map.
@@ -1365,7 +1367,7 @@ void MY_OPERATOR::on_message(EndpointType* s, websocketpp::connection_hdl hdl,
13651367
submit(oTuple, 0);
13661368

13671369
if (vgwSessionLoggingNeeded == true) {
1368-
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
1370+
SPLAPPTRC(L_INFO, "Operator " << operatorPhysicalName <<
13691371
"-->Channel " << boost::to_string(udpChannelNumber) <<
13701372
"-->X2 Received speech data from the vgwSessionId " <<
13711373
con_metadata.vgwSessionId <<
@@ -1484,7 +1486,15 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
14841486
bool vgwSessionIdFoundInMap =
14851487
vgw_session_id_map.find(con_metadata.vgwSessionId) != vgw_session_id_map.end();
14861488

1487-
if (vgwSessionIdFoundInMap == true) {
1489+
// For the correct call clean-up operation, this operator must have received
1490+
// call start session messages for both the voice channels in a call and it
1491+
// must have also received speech data bytes from both the voice channels
1492+
// (either silence fillers or real speech data). In that case,
1493+
// vgwVoiceChannelNumber must correctly be set to either 1 or 2.
1494+
// If not, that is going to cause trouble for the downstream operator logic
1495+
// in properly releasing the speech engines assigned for a given call.
1496+
// Please see the log message that will get written in the else block below.
1497+
if (vgwSessionIdFoundInMap == true && con_metadata.vgwVoiceChannelNumber > 0) {
14881498
// Send the "End of Voice Call" signal now for this
14891499
// vgwSessionId_vgwVoiceChannelNumber combo.
14901500
OPort1Type oTuple;
@@ -1521,6 +1531,25 @@ void MY_OPERATOR::on_close(websocketpp::connection_hdl hdl) {
15211531
nOutputTuplesSentMetric->setValueNoLock(nOutputTuplesSent);
15221532
}
15231533
} // End of if (vgw_session_id_map[con_metadata.vgwSessionId] <= 0)
1534+
} else {
1535+
SPLAPPTRC(L_ERROR, "Operator " << operatorPhysicalName <<
1536+
"-->Channel " << boost::to_string(udpChannelNumber) <<
1537+
"-->Possible critical error: Received on_close for connection handle " <<
1538+
hdl.lock().get() <<
1539+
" with its VGW session id not found in the vgw_session_id_map. " <<
1540+
"Reason for this could be either the VGW never sent a start session " <<
1541+
"message for one of the voice channels or it sent a start session message "
1542+
"followed by no binary speech data for that voice channel. This will " <<
1543+
"cause less than the required number of EOCS tuples to be sent to the " <<
1544+
"downstream operator. Unless the downstream operator is configured to " <<
1545+
"handle the reduced number of EOCS tuples for a given VGW session id, " <<
1546+
"it may eventually end up with all the speech processors and the " <<
1547+
"speech engines to be in unreleased i.e. unavailable state for handling " <<
1548+
"any new voice calls. vgwVoiceChannelNumber must be either 1 or 2. " <<
1549+
"If it is not, then it indicates the situation described above. " <<
1550+
"vgwSessionId=" << con_metadata.vgwSessionId <<
1551+
", vgwVoiceChannelNumber=" <<
1552+
con_metadata.vgwVoiceChannelNumber << ".", "on_close");
15241553
} // End of if (vgwSessionIdFoundInMap == true)
15251554

15261555
// This entire if block is a carry-over from the "stop" message processing section in the

com.ibm.streamsx.sttgateway/info.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515
**Note:** This toolkit requires c++11 support.
1616
</description>
17-
<version>2.2.6</version>
17+
<version>2.2.7</version>
1818
<requiredProductVersion>4.2.1.6</requiredProductVersion>
1919
</identity>
2020
<dependencies>

samples/VgwDataRouter/com.ibm.streamsx.sttgateway.sample/VgwDataRouter.spl

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/*
22
==============================================
33
# Licensed Materials - Property of IBM
4-
# Copyright IBM Corp. 2018, 2020
4+
# Copyright IBM Corp. 2018, 2021
55
==============================================
66
*/
77

88
/*
99
==============================================
1010
First created on: Nov/24/2020
11-
Last modified on: Nov/27/2020
11+
Last modified on: Feb/02/2021
1212

1313
A) What does this example application do?
1414
--------------------------------------
@@ -331,6 +331,20 @@ public composite VgwDataRouter {
331331
// Is IBM Voice Gateway message exchange logging needed for debugging?
332332
expression<boolean> $vgwSessionLoggingNeeded :
333333
(boolean)getSubmissionTimeValue("vgwSessionLoggingNeeded", "false");
334+
// Under some cicumstances, if the IBMVoiceGatewaySource operator sends
335+
// only one EOCS (End Of Call Sigmal) tuple instead of two as required for
336+
// the two voice channels, that may eventually cause the application logic
337+
// below not be able to release the speech processor jobs properly at the end of
338+
// a voice call for a given VGW session id. We have seen it in certain
339+
// customer environments. To avoid that condition, such customers can
340+
// configure this application to treat the very first EOCS tuple as
341+
// sufficient to treat a voice call as a "completed call". In that case,
342+
// it will simply ignore if and when a second EOCS tuple arrives.
343+
// This feature can be activated to compensate for the situation described
344+
// above if it happens in some customer environments.
345+
// (Senthil added this on Feb/01/2021).
346+
expression<int32> $numberOfEocsNeededForVoiceCallCompletion :
347+
(int32)getSubmissionTimeValue("numberOfEocsNeededForVoiceCallCompletion", "2");
334348
//
335349
// IBM Watson STT related submission time values are defined below.
336350
expression<int32> $totalNumberOfSpeechProcessorJobs :(int32)
@@ -674,7 +688,8 @@ public composite VgwDataRouter {
674688
boolean key1Exists = has(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
675689
boolean key2Exists = has(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);
676690

677-
if (key1Exists == true && key2Exists == true) {
691+
if ($numberOfEocsNeededForVoiceCallCompletion == 2 &&
692+
(key1Exists == true && key2Exists == true)) {
678693
// Since the voice call for this VGW session id has ended completely,
679694
// we can also release the speech processor id assigned for this call so that
680695
// it can be repurposed for handling any new future calls.
@@ -690,16 +705,54 @@ public composite VgwDataRouter {
690705
removeM(_vgwSessionIdToSpeechProcessorMap, EOCS.vgwSessionId);
691706
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
692707
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);
693-
appTrc(Trace.error, "A call with vgwSessionId=" + EOCS.vgwSessionId +
708+
appTrc(Trace.error, "i) A call with vgwSessionId=" + EOCS.vgwSessionId +
709+
" ended and its speech processor id " + (rstring)speechProcessorId +
710+
" got released.");
711+
} else if ($numberOfEocsNeededForVoiceCallCompletion == 1 &&
712+
(key1Exists == true || key2Exists == true)) {
713+
// If the user configured this application to handle
714+
// a single EOCS as sufficient to consider a voice call
715+
// completed for a given VGW session id, we will use this
716+
// block of code. Please refer to the constant i.e. expression
717+
// declaration section above to read the commentary about this idea.
718+
//
719+
// Since the voice call for this VGW session id has ended completely,
720+
// we can also release the speech processor id assigned for this call so that
721+
// it can be repurposed for handling any new future calls.
722+
// We can go ahead and release the speech processor id by adding it back to
723+
// the speech processor status list.
724+
// Let us decrement the given speech processor's current call handling count.
725+
// It is a zero based indexed array. Hence, we have to subtract by 1 to get the
726+
// current index in that array.
727+
_speechProcessorStatusList[speechProcessorId-1] =
728+
_speechProcessorStatusList[speechProcessorId-1] - 1;
729+
730+
// We can now do the clean-up in our state variables.
731+
removeM(_vgwSessionIdToSpeechProcessorMap, EOCS.vgwSessionId);
732+
733+
if(key1Exists == true) {
734+
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key1);
735+
}
736+
737+
if(key2Exists == true) {
738+
removeM(_vgwSessionVgwVoiceChannelNumberCompletedMap, key2);
739+
}
740+
741+
appTrc(Trace.error, "ii) A call with vgwSessionId=" + EOCS.vgwSessionId +
694742
" ended and its speech processor id " + (rstring)speechProcessorId +
695743
" got released.");
696744
}
697745
} else {
698-
appTrc(Trace.error,
699-
"_YYYYY No speech processor id is available at this time for the " +
700-
"vgwSessionId_vgwVoiceChannelNumber: " + _key +
701-
" We are not going to process the currently received EOCS " +
702-
" of this speaker in this voice call. This is serious error.");
746+
// Flag an error only when the user configured for two
747+
// EOCS tuples to be received for considering a voice call
748+
// as complted.
749+
if ($numberOfEocsNeededForVoiceCallCompletion == 2) {
750+
appTrc(Trace.error,
751+
"_YYYYY No speech processor id is available at this time for the " +
752+
"vgwSessionId_vgwVoiceChannelNumber: " + _key +
753+
" We are not going to process the currently received EOCS " +
754+
" of this speaker in this voice call. This is a serious error.");
755+
}
703756
}
704757
} // End of onTuple EOCS.
705758

samples/VgwDataRouter/info.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<info:identity>
55
<info:name>VgwDataRouter</info:name>
66
<info:description>Example that shows how to route VGW speech data to different Speech processor jobs</info:description>
7-
<info:version>1.0.0</info:version>
7+
<info:version>1.0.1</info:version>
88
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
99
</info:identity>
1010
<info:dependencies>

samples/VgwDataRouterToWatsonS2T/com.ibm.streamsx.sttgateway.sample.watsons2t/VgwDataRouterToWatsonS2T.spl

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
/*
22
==============================================
33
# Licensed Materials - Property of IBM
4-
# Copyright IBM Corp. 2018, 2020
4+
# Copyright IBM Corp. 2018, 2021
55
==============================================
66
*/
77

88
/*
99
==============================================
1010
First created on: Nov/24/2020
11-
Last modified on: Nov/28/2020
11+
Last modified on: Feb/02/2021
1212

1313
IMPORTANT NOTE
1414
--------------
@@ -372,7 +372,21 @@ public composite VgwDataRouterToWatsonS2T {
372372
(boolean)getSubmissionTimeValue("wsConnectionLoggingNeeded", "false");
373373
// Is client message exchange logging needed for debugging?
374374
expression<boolean> $wsClientSessionLoggingNeeded :
375-
(boolean)getSubmissionTimeValue("wsClientSessionLoggingNeeded", "false");
375+
(boolean)getSubmissionTimeValue("wsClientSessionLoggingNeeded", "false");
376+
// Under some cicumstances, if the IBMVoiceGatewaySource operator sends
377+
// only one EOCS (End Of Call Sigmal) tuple instead of two as required for
378+
// the two voice channels, that may eventually cause the application logic
379+
// below not be able to release the speech engines properly at the end of
380+
// a voice call for a given VGW session id. We have seen it in certain
381+
// customer environments. To avoid that condition, such customers can
382+
// configure this application to treat the very first EOCS tuple as
383+
// sufficient to treat a voice call as a "completed call". In that case,
384+
// it will simply ignore if and when a second EOCS tuple arrives.
385+
// This feature can be activated to compensate for the situation described
386+
// above if it happens in some customer environments.
387+
// (Senthil added this on Feb/01/2021).
388+
expression<int32> $numberOfEocsNeededForVoiceCallCompletion :
389+
(int32)getSubmissionTimeValue("numberOfEocsNeededForVoiceCallCompletion", "2");
376390
//
377391
// IBM Watson S2T related submission time values are defined below.
378392
expression<int32> $numberOfS2TEngines :(int32)
@@ -858,6 +872,8 @@ public composite VgwDataRouterToWatsonS2T {
858872
" started at " + ctime(getTimestamp()) + ".", fileHandle, err);
859873
fclose(fileHandle, err);
860874
}
875+
876+
appTrc(Trace.error, "A new voice call has started. vgwSessionId=" + BSD.vgwSessionId);
861877
}
862878

863879
// Insert into the state map for future reference.
@@ -935,6 +951,53 @@ public composite VgwDataRouterToWatsonS2T {
935951
insertM(_vgwSessionToCompletedUdpChannelMap, _key, _oTuple.speechEngineId);
936952
}
937953

954+
// Senthil added this if block on Feb/01/2020.
955+
if($numberOfEocsNeededForVoiceCallCompletion == 1) {
956+
// If the user configured this application to handle
957+
// only one EOCS to treat a voice call as completed, then we
958+
// will try to clean-up the other voice channel if it exists.
959+
mutable int32 otherVgwVoiceChannelNumber = 1;
960+
961+
if(EOCS.vgwVoiceChannelNumber == 1) {
962+
otherVgwVoiceChannelNumber = 2;
963+
}
964+
965+
// Get the sessionId + channelNumber combo string.
966+
_key = EOCS.vgwSessionId + "_" + (rstring)otherVgwVoiceChannelNumber;
967+
968+
if (has(_vgwSessionIdToUdpChannelMap, _key) == true) {
969+
// Let us send an empty blob to the WatsonS2T operator to indicate that
970+
// this speaker of a given voice call is done.
971+
_oTuple = (BinarySpeech_t){};
972+
// Copy the three input tuple attributes that must
973+
// match with that of the outgoing tuple.
974+
assignFrom(_oTuple, EOCS);
975+
// Override the following two attributes to reflect the other voice channel.
976+
// Flip this attribute value.
977+
if(_oTuple.isCustomerSpeechData == true) {
978+
_oTuple.isCustomerSpeechData = false;
979+
} else {
980+
_oTuple.isCustomerSpeechData = true;
981+
}
982+
983+
_oTuple.vgwVoiceChannelNumber = otherVgwVoiceChannelNumber;
984+
985+
// Assign the S2T engine id where this voice channel was
986+
// getting processed until now.
987+
_oTuple.speechEngineId = _vgwSessionIdToUdpChannelMap[_key];
988+
// We have to send this tuple to the result processor as well for
989+
// the call recording logic to work correctly.
990+
_oTuple.speechResultProcessorId =
991+
_vgwSessionToResultProcessorChannelMap[EOCS.vgwSessionId];
992+
submit(_oTuple, BSDF);
993+
// We are now done with this vgwSessionId_vgwVoiceChannelNumber combo.
994+
removeM(_vgwSessionIdToUdpChannelMap, _key);
995+
// Add the S2T engine id to this call completed map to be released later in the
996+
// following if block only after receiving EOCS for both the voice channels of this call.
997+
insertM(_vgwSessionToCompletedUdpChannelMap, _key, _oTuple.speechEngineId);
998+
}
999+
}
1000+
9381001
// Since this voice call is ending, let us release the S2T result processor
9391002
// instance that was allocated above for this voice call.
9401003
if (has(_vgwSessionToResultProcessorChannelMap,
@@ -951,8 +1014,22 @@ public composite VgwDataRouterToWatsonS2T {
9511014
// Remove the result processor id only if the EOCS signal
9521015
// was sent for both of the voice channels. That must first
9531016
// happen before we can release the result processor id.
954-
if (has(_vgwSessionIdToUdpChannelMap, key1) == false &&
955-
has(_vgwSessionIdToUdpChannelMap, key2) == false) {
1017+
//
1018+
// This if condition was changed by Senthil on
1019+
// Feb/01/2021 for the following reason.
1020+
// If the user configured this application to handle
1021+
// a single EOCS as sufficient to consider a voice call
1022+
// completed for a given VGW session id, we will use the
1023+
// second || i.e. OR condition. Please refer to the
1024+
// constant i.e. expression declaration section above to
1025+
// read the commentary about this idea.
1026+
//
1027+
if (($numberOfEocsNeededForVoiceCallCompletion == 2 &&
1028+
(has(_vgwSessionIdToUdpChannelMap, key1) == false &&
1029+
has(_vgwSessionIdToUdpChannelMap, key2) == false)) ||
1030+
($numberOfEocsNeededForVoiceCallCompletion == 1 &&
1031+
(has(_vgwSessionIdToUdpChannelMap, key1) == false ||
1032+
has(_vgwSessionIdToUdpChannelMap, key2) == false))) {
9561033
removeM(_vgwSessionToResultProcessorChannelMap, EOCS.vgwSessionId);
9571034

9581035
// Since the voice call for this VGW session id has ended completely,
@@ -985,6 +1062,8 @@ public composite VgwDataRouterToWatsonS2T {
9851062
" ended at " + ctime(getTimestamp()) + ".", fileHandle, err);
9861063
fclose(fileHandle, err);
9871064
}
1065+
1066+
appTrc(Trace.error, "An ongoing voice call has completed. vgwSessionId=" + EOCS.vgwSessionId);
9881067
}
9891068
}
9901069
}

samples/VgwDataRouterToWatsonS2T/info.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<info:identity>
55
<info:name>VgwDataRouterToWatsonS2T</info:name>
66
<info:description>Example that showcases embedded S2T in IBM Streams</info:description>
7-
<info:version>1.0.0</info:version>
7+
<info:version>1.0.1</info:version>
88
<info:requiredProductVersion>4.2.1.6</info:requiredProductVersion>
99
</info:identity>
1010
<info:dependencies>

0 commit comments

Comments
 (0)