|
7 | 7 | "fmt"
|
8 | 8 | "github.com/apache/arrow/go/v8/arrow/flight"
|
9 | 9 | configpb2 "github.com/deephaven/deephaven-core/go/internal/proto/config"
|
| 10 | + sessionpb2 "github.com/deephaven/deephaven-core/go/internal/proto/session" |
10 | 11 | "google.golang.org/grpc/metadata"
|
| 12 | + "google.golang.org/protobuf/proto" |
11 | 13 | "log"
|
12 | 14 | "strconv"
|
13 | 15 | "sync"
|
@@ -36,8 +38,19 @@ func withAuthToken(ctx context.Context, token []byte) context.Context {
|
36 | 38 | }
|
37 | 39 |
|
38 | 40 | // requestToken requests a new token from flight.
|
39 |
| -func requestToken(handshakeClient flight.FlightService_HandshakeClient, handshakeReq *flight.HandshakeRequest) ([]byte, error) { |
40 |
| - err := handshakeClient.Send(handshakeReq) |
| 41 | +func requestToken(handshakeClient flight.FlightService_HandshakeClient, authType string, authToken []byte) ([]byte, error) { |
| 42 | + |
| 43 | + war := sessionpb2.WrappedAuthenticationRequest{ |
| 44 | + Type: authType, |
| 45 | + Payload: authToken, |
| 46 | + } |
| 47 | + payload, err := proto.Marshal(&war) |
| 48 | + if err != nil { |
| 49 | + return nil, err |
| 50 | + } |
| 51 | + handshakeReq := flight.HandshakeRequest{Payload: []byte(payload)} |
| 52 | + |
| 53 | + err = handshakeClient.Send(&handshakeReq) |
41 | 54 |
|
42 | 55 | if err != nil {
|
43 | 56 | return nil, err
|
@@ -122,15 +135,13 @@ func (tr *tokenManager) Close() error {
|
122 | 135 | // "user:password"; when auth_type is DefaultAuth, it will be ignored; when auth_type is a custom-built
|
123 | 136 | // authenticator, it must conform to the specific requirement of the authenticator.
|
124 | 137 | func newTokenManager(ctx context.Context, fs *flightStub, cfg configpb2.ConfigServiceClient, authType string, authToken string) (*tokenManager, error) {
|
125 |
| - authString := makeAuthString(authType, authToken) |
126 |
| - |
127 |
| - handshakeClient, err := fs.handshake(withAuth(ctx, authString)) |
| 138 | + handshakeClient, err := fs.handshake(ctx) |
128 | 139 |
|
129 | 140 | if err != nil {
|
130 | 141 | return nil, err
|
131 | 142 | }
|
132 | 143 |
|
133 |
| - tkn, err := requestToken(handshakeClient, &flight.HandshakeRequest{Payload: []byte(authString)}) |
| 144 | + tkn, err := requestToken(handshakeClient, authType, []byte(authToken)) |
134 | 145 |
|
135 | 146 | if err != nil {
|
136 | 147 | return nil, err
|
@@ -174,10 +185,10 @@ func newTokenManager(ctx context.Context, fs *flightStub, cfg configpb2.ConfigSe
|
174 | 185 | var tkn []byte
|
175 | 186 |
|
176 | 187 | if err == nil {
|
177 |
| - tkn, err = requestToken(handshakeClient, &flight.HandshakeRequest{Payload: oldToken}) |
| 188 | + tkn, err = requestToken(handshakeClient, "Bearer", oldToken) |
178 | 189 | } else {
|
179 | 190 | log.Println("Old token has an error during token update. Attempting to acquire a fresh token. err=", err)
|
180 |
| - tkn, err = requestToken(handshakeClient, &flight.HandshakeRequest{Payload: []byte(authString)}) |
| 191 | + tkn, err = requestToken(handshakeClient, authType, []byte(authToken)) |
181 | 192 | }
|
182 | 193 |
|
183 | 194 | if err != nil {
|
|
0 commit comments