getConsumerContext causing "stream not found" error for some streams #31
I know that in one production environment (there may be more than one) they have some very strict security rules that disallow client applications from making consumer info calls; this could be that kind of niche permission issue. `getConsumerContext` makes a consumer info call when it is called (so that it has the consumer info cached already).

There are no partitions in NATS JetStream, unlike most other streaming systems, and Spark expects them. From the NATS point of view there are no partitions (i.e. just one), so here the connector takes the number of pending messages and divides it by the size of the batch for the pull requests, artificially creating a number of partitions that way. It's artificial anyway, because `getPartitions` is only called once (at the start, I would assume) and more messages could be added to the stream after that. I'm not sure what the advantages/disadvantages would be from the Spark point of view, but from the NATS point of view `getPartitions` could simply return 1 partition. Or that number could just be passed as a configuration parameter rather than calculated. If access to consumer info is not possible and you still want to calculate it (rather than set a number in the config), you could estimate it using stream info: take the number of messages in the stream and divide it by the pull fetch size.

In any case, yes, you can do a PR to use `subscribe` in place of `getConsumerContext`, but maybe also add to `getPartitions` the ability to return a number set by config (if set) as well as one calculated from the number pending on the consumer? (Let me know if you agree this would be a good addition.) A sketch of that config-or-calculated logic follows.
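For illustration, a minimal sketch of that config-or-calculated logic. The names (`configuredPartitions`, `numPending`, `fetchBatchSize`) are hypothetical, not the connector's actual API:

```scala
// Hypothetical sketch only: parameter names are illustrative, not the
// connector's real API surface.
def partitionCount(configuredPartitions: Option[Int],
                   numPending: Long,
                   fetchBatchSize: Int): Int =
  configuredPartitions.getOrElse {
    // JetStream has no native partitions, so an artificial count is derived
    // from the consumer's pending messages at planning time.
    math.max(1, math.ceil(numPending.toDouble / fetchBatchSize).toInt)
  }
```

For example, `partitionCount(None, numPending = 10000, fetchBatchSize = 500)` yields 20, while a configured `Some(1)` short-circuits the calculation entirely.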
Ah @jnmoyne, I see the benefit of partitions from Spark's perspective. If the job fails between batches, the way the Spark library is written here, it won't ack those messages. You will inevitably restart your Spark job for myriad reasons, so you're guaranteed to have some double reads at some point.
If we had a way to get the same partition and ack that partition afterwards en bloc, then we could do away with what is written now, where it caches the whole DataFrame between reads. A rough sketch of that deferred-ack idea follows.
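A rough sketch of what that could look like, assuming `sub` is a pull `JetStreamSubscription` already bound to the existing durable consumer; all other names are illustrative:

```scala
// Hypothetical sketch of deferring acks until the batch commits.
import java.time.Duration
import scala.jdk.CollectionConverters._
import io.nats.client.{JetStreamSubscription, Message}

def readAndCommit(sub: JetStreamSubscription, batchSize: Int)
                 (commitBatch: Seq[Message] => Unit): Unit = {
  val msgs = sub.fetch(batchSize, Duration.ofSeconds(5)).asScala.toSeq
  commitBatch(msgs)     // hand the batch to Spark and wait for the commit
  msgs.foreach(_.ack()) // ack en bloc only after the commit succeeds, so a
                        // failed job redelivers at most one uncommitted batch
}
```

The design point: moving the acks after the commit bounds double reads to a single uncommitted batch, without caching the whole DataFrame between reads.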
What version were you using?
nats.java io.nats:jnats:2.19.1
What environment was the server running in?
Copied code into Databricks notebook running on DBR 13.3
Is this defect reproducible?
Yes, but only in a private stream within a VPN. I believe this is a niche permissions issue.
Given the capability you are leveraging, describe your expectation?
The library should find streams consistently.
Given the expectation, what is the defect you are observing?
The stream is not found. It works for one stream/durable pair consistently, but not for another. This makes me believe it's a niche configuration/permissions issue.
I have found the following workaround:
Changing
`.getConsumerContext(natsSourceParams.streamName, natsSourceParams.consumerName)`
to
`.subscribe(None.orNull, pullSubscriptionConf)`
in `getPartitions` and `getOffset` fixes the issue.
I'm assuming it's because `subscribe` expects the consumer to already exist, whereas `getConsumerContext` will attempt to create one. `.subscribe` seems like the more stable way to access `getConsumerInfo`. Let me know if you think this change is reasonable. If so, I'm happy to open a PR to make this change. A minimal sketch of the change is below.
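For concreteness, a minimal sketch of the workaround against jnats 2.19.x. The URL and the stream/consumer names are placeholders, and `PullSubscribeOptions.bind` is used on the assumption that `pullSubscriptionConf` binds to the pre-existing durable:

```scala
import io.nats.client.{Nats, PullSubscribeOptions}

val nc = Nats.connect("nats://localhost:4222") // illustrative URL
val streamName   = "my-stream"                 // stands in for natsSourceParams.streamName
val consumerName = "my-durable"                // stands in for natsSourceParams.consumerName

// Before: getConsumerContext issues a consumer info request, which
// restrictive account permissions can block.
// val ctx = nc.getConsumerContext(streamName, consumerName)

// After: bind directly to the pre-existing stream/durable pair; nothing is
// created, and the subscription attaches to the existing consumer.
val pullSubscriptionConf = PullSubscribeOptions.bind(streamName, consumerName)
val sub = nc.jetStream().subscribe(None.orNull, pullSubscriptionConf)
```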